Python/MQTT

Objective

This tutorial shows how to work with the MQTT protocol in Python using the Paho MQTT client library (paho-mqtt). It is aimed at anyone who wants to use a third-party MQTT library to send and receive data between devices, and it requires only very basic Python programming skills. The tutorial also gives a short introduction to ThingsBoard, a free-for-testing IoT platform that you can use as an MQTT broker to test your MQTT client programs.

Because it relies on the third-party library paho-mqtt, this tutorial does not explain how to implement an MQTT client on top of the lower-level TCP socket API. However, you need some basic knowledge of the MQTT protocol before proceeding.

Introduction

MQTT is a publish/subscribe communication protocol that runs over TCP, by default on port 1883. For the official MQTT specification, refer to MQTT v3.1.1.

An MQTT system must include a broker and one or more clients that connect to it. The broker acts as a post office for the clients. MQTT clients can be subscribers, publishers, or both. A subscriber subscribes to a topic on the broker, while a publisher publishes messages to a topic on the broker. For example:

  • MQTT client A is a subscriber and subscribes to topic "data" on a local MQTT broker at 127.0.0.1:1883
  • MQTT client B is a publisher and publishes the message "Hello, World !" to topic "data" on this local MQTT broker
  • MQTT client A then receives the published message "Hello, World !" from client B.
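
The three steps above can be mimicked without any network at all. The following is a minimal in-memory sketch of the post-office idea; `ToyBroker` and its methods are hypothetical names invented for illustration and are not part of Mosquitto or paho-mqtt.

```python
from collections import defaultdict


class ToyBroker:
    """In-memory stand-in for an MQTT broker (illustration only)."""

    def __init__(self):
        # topic name -> list of callbacks registered by subscribers
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, callback):
        self._subscribers[topic].append(callback)

    def publish(self, topic, message):
        # Deliver the message to every subscriber of this exact topic.
        for callback in self._subscribers[topic]:
            callback(topic, message)


broker = ToyBroker()
received = []

# Client A subscribes to topic "data".
broker.subscribe("data", lambda topic, message: received.append((topic, message)))

# Client B publishes to "data"; the broker forwards the message to client A.
broker.publish("data", "Hello, World !")

print(received)
```

A real broker does the same routing over TCP connections, which is why publisher and subscriber never need to know each other's address.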

This tutorial uses Mosquitto as the local MQTT broker for testing. To install it on a Debian-based Linux system:

sudo apt install mosquitto

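After a successful installation, the local Mosquitto broker runs on port 1883 by default. The examples in this tutorial were written for the paho-mqtt 1.x API; if you install a 2.x release, the mqtt.Client() calls need adapting, because the constructor then expects a callback API version (for example mqtt.CallbackAPIVersion.VERSION1) as its first argument. Next, install paho-mqtt as the MQTT client library: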

pip install paho-mqtt

Publish and subscribe to a local MQTT broker

Both the publisher and the subscriber connect to the MQTT broker with the connect() function:

connect(host, port=1883, keepalive=60, bind_address="")
  • keepalive: maximum period in seconds allowed between communications with the broker. If no other messages are being exchanged, this controls the rate at which the client will send ping messages to the broker

The MQTT protocol limits a single packet to about 256 MB, and paho-mqtt follows that limit.
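
The 256 MB figure comes from the way MQTT v3.1.1 encodes the "remaining length" of a packet: at most four bytes, each carrying 7 bits of the length plus one continuation bit. The sketch below follows the encoding algorithm described in the specification; the function name `encode_remaining_length` is chosen here for illustration and is not a paho-mqtt API.

```python
def encode_remaining_length(length):
    """Encode a packet's remaining length with MQTT's variable-length scheme."""
    if not 0 <= length <= 268_435_455:
        raise ValueError("length does not fit in four bytes")
    encoded = bytearray()
    while True:
        byte = length % 128
        length //= 128
        if length > 0:
            byte |= 0x80  # continuation bit: more length bytes follow
        encoded.append(byte)
        if length == 0:
            return bytes(encoded)


# Four bytes of 7 bits each give 128**4 - 1 = 268,435,455 bytes (just under 256 MiB).
MAX_REMAINING_LENGTH = 128 ** 4 - 1

print(encode_remaining_length(127).hex())
print(encode_remaining_length(128).hex())
print(encode_remaining_length(MAX_REMAINING_LENGTH).hex())
```

You never call anything like this when using paho-mqtt; the library builds the packet header for you.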

MQTT client A publishes a text message to an MQTT broker:

import paho.mqtt.client as mqtt
import time

MQTT_BROKER = "192.168.230.220"  # address of the machine running the broker

number = 0  # counter appended to each message

# Every client connected to a broker needs its own client ID
client = mqtt.Client("mqtt-client-a")

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("connected OK")  # printed when the connection succeeds
    else:
        print("bad connection, rc =", rc)

client.on_connect = on_connect

client.connect(MQTT_BROKER, 1883, 60)

client.loop_start()

try:
    while True:
        #Send text message
        send_string = "Hello, World !" + str(number)
        client.publish("test/message", send_string)
        number += 1
        time.sleep(1)

except KeyboardInterrupt:
    pass

client.loop_stop()
client.disconnect()
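
The on_connect callback receives the broker's answer as the return code rc, and only rc == 0 means success. As a minimal sketch, the table below lists the CONNACK return codes defined by MQTT v3.1.1 so that a failed connection can be reported more precisely; `CONNACK_MEANINGS` and `describe_rc` are names made up for this example.

```python
# CONNACK return codes as defined in the MQTT v3.1.1 specification
CONNACK_MEANINGS = {
    0: "connection accepted",
    1: "connection refused: unacceptable protocol version",
    2: "connection refused: identifier rejected",
    3: "connection refused: server unavailable",
    4: "connection refused: bad user name or password",
    5: "connection refused: not authorized",
}


def describe_rc(rc):
    """Return a readable description of a CONNACK return code."""
    return CONNACK_MEANINGS.get(rc, "unknown return code %s" % rc)


def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("connected OK")
    else:
        print("bad connection:", describe_rc(rc))
```

A return code of 4 or 5 is the usual sign of a wrong username, password or device token.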

MQTT client B subscribes to topic "test/message" to get the messages published by MQTT client A:

import paho.mqtt.client as mqtt

MQTT_BROKER = "192.168.230.220"
# Use a client ID different from client A's: a broker keeps one connection per ID
client = mqtt.Client("mqtt-client-b")

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("connected OK")
    else:
        print("bad connection, rc =", rc)

# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    print("Received message")
    # msg.payload is bytes; decode it to get the text that was published
    print(msg.topic + " " + msg.payload.decode())

client.on_connect = on_connect
client.on_message = on_message

client.connect(MQTT_BROKER, 1883)
client.subscribe("test/message")

client.loop_forever()

Use userdata

# other operations are like MQTT client B's subscriber program
user_data_string = "User data string"

def on_connect(client, userdata, flags, rc):
    if rc==0:
        print("connected OK, userdata: ", userdata)
    else:
        print("bad connection")  

client.on_connect = on_connect
client.user_data_set(user_data_string)
client.connect(MQTT_BROKER, 1883, 60)

client.loop_start()

# other operations are like MQTT client B's subscriber program
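
The userdata object is handed to every callback, so it can also hold state that the callbacks update. The sketch below keeps a message counter in a dictionary; the dictionary keys are arbitrary choices for this example, and the incoming messages are simulated with SimpleNamespace objects so that the snippet runs without a broker.

```python
from types import SimpleNamespace


def on_message(client, userdata, msg):
    # userdata is whatever object was passed to client.user_data_set()
    userdata["count"] += 1
    userdata["last_payload"] = msg.payload.decode()


state = {"count": 0, "last_payload": None}
# In a real program: client.user_data_set(state); client.on_message = on_message

# Simulate two incoming messages instead of waiting for a broker.
for raw in (b"first", b"second"):
    fake_msg = SimpleNamespace(topic="test/message", payload=raw)
    on_message(None, state, fake_msg)

print(state)
```

Passing state this way avoids global variables when several clients share the same callback functions.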

A single process can act as both publisher and subscriber:

import paho.mqtt.client as mqtt
import time

MQTT_BROKER = "localhost"
client = mqtt.Client("this is MQTT ID")
send_number = 0

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("connected OK")

# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    # The published integer arrives as bytes, e.g. b"0"; decode it for printing
    print(msg.payload.decode())

client.on_connect = on_connect
client.on_message = on_message

client.connect(MQTT_BROKER, 1883, 60)
client.subscribe("test/message", 2)

client.loop_start()
try:
    while True:
        client.publish("test/message", send_number)
        send_number += 1
        time.sleep(1)

except KeyboardInterrupt:
    pass

client.loop_stop()
client.disconnect()
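
In the program above send_number is an integer, while on_message always receives the payload as bytes. The sketch below imitates that round trip under the assumption that a numeric payload is sent as its decimal text form (which is how paho-mqtt appears to handle int and float payloads); `to_wire` and `parse_counter` are hypothetical helpers, not library functions.

```python
def to_wire(payload):
    """Imitate how a payload ends up as bytes on the wire (assumed behaviour)."""
    if isinstance(payload, (int, float)):
        return str(payload).encode("ascii")
    if isinstance(payload, str):
        return payload.encode("utf-8")
    return bytes(payload)


def parse_counter(raw):
    """Turn the bytes received in msg.payload back into an integer."""
    return int(raw.decode("ascii"))


raw = to_wire(41)
print(raw, parse_counter(raw) + 1)
```

In on_message this would read parse_counter(msg.payload) whenever the subscriber needs the number rather than its text.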

Use paho.mqtt

subscribe.py

import paho.mqtt.client as mqtt

mqttBroker = "localhost"
client = mqtt.Client("mqtt-subscriber")

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("connected OK")
        # Subscribing here renews the subscription after every reconnect
        client.subscribe("test/message")

# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    print(msg.payload.decode())

client.on_connect = on_connect
client.on_message = on_message

client.connect(mqttBroker, 1883, 60)
client.loop_forever()

publish.py

import paho.mqtt.publish as publish
import time

mqttBroker = "localhost"
send_number = 0

try:
    while True:
        publish.single("test/message", send_number, hostname=mqttBroker)
        send_number += 1
        time.sleep(1)

except KeyboardInterrupt:
    pass

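subscribe.py and publish.py can both use import paho.mqtt.client as mqtt, but only if each program connects with its own client ID. A broker keeps a single connection per client ID, so when two programs connect with the same ID the broker drops the earlier connection, and the subscriber or the publisher appears to lose messages.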
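
One way to keep client IDs from colliding is to append a random suffix to a readable prefix. The helper below is a hypothetical sketch, not a paho-mqtt function; it stays within 23 alphanumeric characters, the range that MQTT v3.1.1 requires every broker to accept.

```python
import uuid

MAX_CLIENT_ID_LENGTH = 23  # length every MQTT v3.1.1 broker must accept


def make_client_id(prefix):
    """Return prefix plus random hex digits, at most 23 characters long."""
    if not prefix.isalnum() or len(prefix) > 15:
        raise ValueError("prefix must be 1 to 15 letters or digits")
    # uuid4().hex supplies 32 random hex digits; keep as many as fit.
    return (prefix + uuid.uuid4().hex)[:MAX_CLIENT_ID_LENGTH]


publisher_id = make_client_id("pub")
subscriber_id = make_client_id("sub")
print(publisher_id, subscriber_id)
```

The result would then be passed to the client constructor, for example mqtt.Client(make_client_id("pub")) with the paho-mqtt 1.x API used in this tutorial.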

Publish and subscribe to a cloud MQTT broker

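There are several free-for-testing cloud MQTT brokers, such as the public Eclipse broker and ThingsBoard (which we will discuss later). In this part, we test the publish and subscribe features with the Eclipse MQTT broker at mqtt.eclipse.org. If that host name no longer resolves, check the Eclipse IoT website for the current address of the public broker; it has since been published as mqtt.eclipseprojects.io.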

Publisher:

import paho.mqtt.client as mqtt
import time

mqttBroker = "mqtt.eclipse.org"
send_number = 0

# On a public broker, pick a client ID that nobody else is likely to use
client = mqtt.Client("mqtt-cloud-publisher")

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("connected OK")  # printed when the connection succeeds
    else:
        print("bad connection, rc =", rc)

client.on_connect = on_connect

client.connect(mqttBroker, 1883, 60)

client.loop_start()

try:
    while True:
        client.publish("test/message", send_number)
        send_number += 1
        time.sleep(1)

except KeyboardInterrupt:
    pass

client.loop_stop()
client.disconnect()

Subscriber:

callback(): runs the given function every time a message is published to the topic

import paho.mqtt.subscribe as subscribe

# subscribe.callback() connects, subscribes and then blocks,
# calling on_message_print() for every message it receives

def on_message_print(client, userdata, message):
    print("%s %s" % (message.topic, message.payload))

subscribe.callback(on_message_print, "test/message", hostname="mqtt.eclipse.org")

simple(): receives a single message, then disconnects and returns it

import paho.mqtt.subscribe as subscribe

msg = subscribe.simple("test/message", hostname="mqtt.eclipse.org")
print("%s %s" % (msg.topic, msg.payload))

Publish and subscribe with the publish and subscribe modules

publish.py

import paho.mqtt.publish as publish
import time

mqttBroker = "mqtt.eclipse.org"
send_number = 0

try:
    while True:
        publish.single("test/message", send_number, hostname=mqttBroker)
        send_number += 1
        time.sleep(1)

except KeyboardInterrupt:
    pass

subscribe.py

import paho.mqtt.subscribe as subscribe

def on_message_print(client, userdata, message):
    print("%s %s" % (message.topic, message.payload))

subscribe.callback(on_message_print, "test/message", hostname="mqtt.eclipse.org")

Working with ThingsBoard

ThingsBoard is an IoT platform that can be used free of charge for testing with multiple communication protocols such as HTTP, MQTT and WebSocket, or commercially for enterprise purposes. In this tutorial, we use the MQTT broker of the ThingsBoard live demo server for testing.

First, we need to know how to set up username and password authentication for an MQTT client in paho-mqtt:

client = mqtt.Client("this is MQTT ID")
client.username_pw_set("sw", "1_Abc_123")

client.connect(mqttBroker, 1883, 60)

Telemetry

The MQTT client sends an increasing number to the ThingsBoard live demo MQTT broker:

import paho.mqtt.client as mqtt
import time
import json

MQTT_BROKER         = "thingsboard.sysats.tech"
TOKEN               = "RbeNuZuf86rJ4zlY369i" # A device token in ThingsBoard

client = mqtt.Client("this is MQTT ID")

payload = {"number-value": 0}

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("connected OK")  # printed when the connection succeeds
    else:
        print("bad connection, rc =", rc)

# ThingsBoard identifies the device by its access token, sent as the username
client.username_pw_set(TOKEN)
client.on_connect = on_connect

client.connect(MQTT_BROKER, 1883, 60)

client.loop_start()

try:
    while True:
        client.publish("v1/devices/me/telemetry", json.dumps(payload))
        payload['number-value'] += 1
        time.sleep(1)

except KeyboardInterrupt:
    pass

client.loop_stop()
client.disconnect()
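
The program above lets the server assign the time at which each value was received. According to the ThingsBoard MQTT API documentation, a telemetry message may instead carry its own timestamp in milliseconds, in the form {"ts": ..., "values": {...}}. The helper below is a small sketch of building both variants; `telemetry_payload` is a hypothetical name, and the timestamped format should be checked against the ThingsBoard version you use.

```python
import json
import time


def telemetry_payload(values, ts_ms=None):
    """Build a telemetry JSON string, optionally with a client-side timestamp."""
    if ts_ms is None:
        return json.dumps(values)
    return json.dumps({"ts": ts_ms, "values": values})


now_ms = int(time.time() * 1000)  # ThingsBoard timestamps are in milliseconds
print(telemetry_payload({"number-value": 3}))
print(telemetry_payload({"number-value": 3}, ts_ms=now_ms))
```

Either string can be passed to client.publish("v1/devices/me/telemetry", ...) exactly as in the program above.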

Map widget

Simply pass valid latitude and longitude values as JSON fields to client.publish(). Build the payload with json.dumps(), because JSON requires double-quoted keys:

client.publish("v1/devices/me/telemetry", json.dumps({"latitude": 10.786174956334452, "longitude": 106.71002292803459}))
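
A Python dictionary written out inside a string is not automatically valid JSON, which matters because the server parses the payload as JSON. The short check below compares a payload produced by json.dumps() with a hand-written string that uses single quotes; the variable names are chosen for this example only.

```python
import json

location = {"latitude": 10.786174956334452, "longitude": 106.71002292803459}

# json.dumps() always produces double-quoted keys, as JSON requires.
valid_payload = json.dumps(location)

# A hand-written string with single quotes looks similar but is not JSON.
python_style = "{'latitude': 10.786174956334452, 'longitude': 106.71002292803459}"
try:
    json.loads(python_style)
    python_style_is_json = True
except json.JSONDecodeError:
    python_style_is_json = False

print(valid_payload)
print("single-quoted string is valid JSON:", python_style_is_json)
```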

RPC

Control GPIO on Basic GPIO panel

Control 3 GPIOs on the Basic GPIO panel and save their last state (i.e. the dashboard displays the previously set state when it is reloaded):

import paho.mqtt.client as mqtt
import json

MQTT_BROKER = "thingsboard.sysats.tech"
TOKEN       = "TTf3zmVacJI4dUQsYQwh"

# We assume that all GPIOs start LOW (False, matching the "enabled" flag stored later)
gpio_state = {1: False, 2: False, 3: False}

client = mqtt.Client("this is MQTT ID")

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("connected OK")  # printed when the connection succeeds
    else:
        print("bad connection, rc =", rc)

    # Subscribe and publish the initial state inside on_connect() so that
    # both are repeated whenever the client reconnects
    client.subscribe("v1/devices/me/rpc/request/+")
    client.publish("v1/devices/me/attributes", get_gpio_status())

# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    print("topic: " + msg.topic)
    print("Received message: " + str(msg.payload.decode()))
    data = json.loads(msg.payload)

    if data['method'] == "setGpioStatus":
        set_gpio_status(data['params']['pin'], data['params']['enabled'])
        client.publish(msg.topic.replace('request', 'response'), get_gpio_status())
        client.publish("v1/devices/me/attributes", get_gpio_status())
        print(get_gpio_status())#Print GPIO status to debug
    elif data['method'] == "getGpioStatus":
        client.publish(msg.topic.replace('request', 'response'), get_gpio_status())
        client.publish("v1/devices/me/attributes", get_gpio_status())
        print(get_gpio_status())#Print GPIO status to debug

def set_gpio_status(pin, status):
    gpio_state[pin] = status

def get_gpio_status():
    return json.dumps(gpio_state)

client.username_pw_set(TOKEN)
client.on_connect = on_connect
client.on_message = on_message

client.connect(MQTT_BROKER, 1883)
client.loop_forever()
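
The "+" in "v1/devices/me/rpc/request/+" is a single-level wildcard that stands for the request ID, and the reply must go to the matching response topic. The sketch below isolates that logic in two plain functions that can be tried without a broker; `response_topic` and `handle_rpc` are hypothetical helpers that mirror what on_message does in the program above.

```python
import json

REQUEST_PREFIX = "v1/devices/me/rpc/request/"
RESPONSE_PREFIX = "v1/devices/me/rpc/response/"


def response_topic(request_topic):
    """Map .../rpc/request/<id> to .../rpc/response/<id>."""
    if not request_topic.startswith(REQUEST_PREFIX):
        raise ValueError("not an RPC request topic: " + request_topic)
    request_id = request_topic[len(REQUEST_PREFIX):]
    return RESPONSE_PREFIX + request_id


def handle_rpc(gpio_state, raw_payload):
    """Apply one RPC request to gpio_state; return the JSON reply or None."""
    data = json.loads(raw_payload)
    method = data.get("method")
    if method == "setGpioStatus":
        params = data["params"]
        gpio_state[params["pin"]] = params["enabled"]
    elif method != "getGpioStatus":
        return None  # unknown method: nothing to answer
    return json.dumps(gpio_state)


state = {1: False, 2: False, 3: False}
request = b'{"method": "setGpioStatus", "params": {"pin": 2, "enabled": true}}'
print(response_topic("v1/devices/me/rpc/request/17"))
print(handle_rpc(state, request))
```

Note that json.dumps() turns the integer pin numbers into string keys, so the reply contains "1", "2" and "3".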

Control multiple devices with multithreading

To control multiple devices, use multiple threads.

Features:

  • Control 2 devices on ThingsBoard with MQTT clients
  • Print each received message together with the name of the device that published it

Program: thingsboard_basic_gpio_panel_multiple_devices.py
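
The referenced program is not reproduced here. As a rough sketch of the threading structure only, the code below starts one thread per device and hands each thread the device's name and token; the device names, the placeholder tokens and the functions `start_device_threads` and `demo_worker` are all assumptions made for this example.

```python
import threading


def start_device_threads(devices, worker):
    """Start one daemon thread per device; worker is called as worker(name, token)."""
    threads = []
    for name, token in devices.items():
        thread = threading.Thread(target=worker, args=(name, token), name=name, daemon=True)
        thread.start()
        threads.append(thread)
    return threads


# Hypothetical device names with placeholder tokens
devices = {"device-1": "TOKEN_1", "device-2": "TOKEN_2"}
seen = []
seen_lock = threading.Lock()


def demo_worker(name, token):
    # A real worker would create its own mqtt.Client, call username_pw_set(token),
    # connect to the broker and run loop_forever(); here we only record the name.
    with seen_lock:
        seen.append(name)


for thread in start_device_threads(devices, demo_worker):
    thread.join()

print(sorted(seen))
```

Giving each device its own client and thread keeps the callbacks of one device from blocking the other, and the device name can be passed on through userdata for printing.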

Control a GPIO with mode selection in Basic GPIO panel

The Basic GPIO panel includes 2 GPIOs, RELAY and MODE.

  • If MODE is MANUAL (0): RELAY is controlled normally
  • If MODE is AUTOMATION (1): RELAY can't be controlled and keeps its previously set value

Program: thingsboard_basic_gpio_panel_mode_selection.py
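
The rule described above can be expressed as a small state update, independent of MQTT. The following is a sketch only: the pin numbers and the function `apply_request` are assumptions for illustration and are not taken from the referenced program.

```python
RELAY_PIN = 1  # hypothetical pin number for RELAY
MODE_PIN = 2   # hypothetical pin number for MODE
MANUAL, AUTOMATION = 0, 1


def apply_request(state, pin, enabled):
    """Apply one setGpioStatus request; return True if the state was changed or set."""
    if pin == MODE_PIN:
        state[MODE_PIN] = AUTOMATION if enabled else MANUAL
        return True
    if pin == RELAY_PIN and state[MODE_PIN] == MANUAL:
        state[RELAY_PIN] = 1 if enabled else 0
        return True
    # RELAY requests are ignored while MODE is AUTOMATION.
    return False


state = {RELAY_PIN: 0, MODE_PIN: MANUAL}
apply_request(state, RELAY_PIN, True)    # accepted: MODE is MANUAL
apply_request(state, MODE_PIN, True)     # switch to AUTOMATION
apply_request(state, RELAY_PIN, False)   # ignored: RELAY keeps its value
print(state)
```

In an on_message callback, the return value could decide whether the new state is published back to the attributes topic.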

Conclusion

Congratulations! You have reached the end of the tutorial. It is deliberately basic and aimed at hobbyists and inexperienced developers, who should now know enough about MQTT and the paho-mqtt library to use them in their own projects. Depending on a project's features, you will need to explore the paho-mqtt API further.

Developers and students who want to dive deeper into the MQTT protocol can start by implementing a complete MQTT publish and subscribe client on top of the raw TCP socket API. The full protocol description needed for that implementation can be found in MQTT v3.1.1.

This article is issued from Wikiversity. The text is licensed under Creative Commons - Attribution - Sharealike. Additional terms may apply for the media files.