Python/MQTT
![]() |
Completion status: this resource has reached a high level of completion. |
Objective
This tutorial will guide you to work with MQTT protocol in Python by using Paho MQTT client library (paho-mqtt). This tutorial is specifically for anyone who intends to use a third party library for MQTT to send and receive data between devices. The Python programming skills required for this tutorial is very basic. This tutorial will also cover a short introduction to ThingsBoard, a free-for-testing IoT platform, which you can use as a MQTT broker to test your MQTT client's program.
As using 3rd lib paho-mqtt, this tutorial will not include any details explanation to implement MQTT client from TCP socket API as its lower level. However, user need to have some basic knowledge on MQTT protocol before proceeding with this tutorial.
Introduction
MQTT is a publish/subscribe communication protocol which runs on port 1883. For MQTT official document, refer to MQTT v3.1.1.
A MQTT system must includes a broker, and the client(s) connect to that broker. The MQTT broker acts as a post office which the MQTT clients connect to. MQTT clients can be subscribers and publisher. A subscriber subscribes to a topic on the MQTT broker while the publisher publishes message to a topic on the broker. For example:
- MQTT client A is a subscriber and subscribes to topic "data" of a local MQTT broker at 127.0.0.1:1883
- MQTT client B is a publisher and publish message "Hello, World !" to topic "data" of this local MQTT broker
- MQTT client A then receives the published message "Hello, World !" from client B.
For the local MQTT broker for testing, in this tutorial, we use Mosquitto. Install (on Linux):
sudo apt install mosquitto
After installing successfully, local MQTT broker Mosquitto runs on port 1883 by default. Then, we install paho-mqtt as the MQTT client:
pip install paho-mqtt
Publish and subscribe to a local MQTT broker
For both publisher and subscriber, we connects to the MQTT broker with the connect() function:
connect(host, port=1883, keepalive=60, bind_address="")
- keepalive: maximum period in seconds allowed between communications with the broker. If no other messages are being exchanged, this controls the rate at which the client will send ping messages to the broker
In paho-mqtt, the maximum packet size is 256MB
MQTT client A publishes text message to a MQTT broker
import paho.mqtt.client as mqtt
import time
import json
MQTT_BROKER = "192.168.230.220"
send_string = 0
number = 0
client = mqtt.Client("this is MQTT ID")
def on_connect(client, userdata, flags, rc):
if rc==0:
print("connected OK") #print out when connect sucessfully
else:
print("bad connection")
client.on_connect = on_connect
client.connect(MQTT_BROKER, 1883, 60)
client.loop_start()
try:
while True:
#Send text message
send_string = "Hello, World !" + str(number)
client.publish("test/message", send_string)
number += 1
time.sleep(1)
except KeyboardInterrupt:
pass
client.loop_stop()
client.disconnect()
MQTT client B subscribes to topic "test/message" to get the published message of MQTT client A:
import paho.mqtt.client as mqtt
MQTT_BROKER = "192.168.230.220"
client = mqtt.Client("this is MQTT ID")
def on_connect(client, userdata, flags, rc):
if rc==0:
print("MQTT client has published")
else:
print("bad connection")
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
print("Received message")
print(msg.topic+" "+str(msg.payload.decode()))
client.on_connect = on_connect
client.on_message = on_message
client.connect(MQTT_BROKER, 1883)
client.subscribe("test/message")
client.loop_forever()
Use userdata
# other operations are like MQTT B's subsriber program
user_data_string = "User data string"
def on_connect(client, userdata, flags, rc):
if rc==0:
print("connected OK, userdata: ", userdata)
else:
print("bad connection")
client.on_connect = on_connect
client.user_data_set(user_data_string)
client.connect(MQTT_BROKER, 1883, 60)
client.loop_start()
# other operations are like MQTT B's subsriber program
A process works both as publisher and subscriber:
import paho.mqtt.client as mqtt
import time
MQTT_BROKER = "localhost"
client = mqtt.Client("this is MQTT ID")
send_number = 0
def on_connect(client, userdata, flags, rc):
if rc==0:
print("MQTT client has published")
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
print(str(msg.payload))
client.on_connect = on_connect
client.on_message = on_message
client.connect(MQTT_BROKER, 1883, 60)
client.subscribe("test/message", 2)
client.loop_start()
try:
while True:
client.publish("test/message", send_number)
send_number += 1
time.sleep(1)
except KeyboardInterrupt:
pass
client.loop_stop()
client.disconnect()
Use paho.mqtt subscribe.py
import paho.mqtt.client as mqtt
mqttBroker = "localhost"
client = mqtt.Client("this is MQTT ID")
def on_connect(client, userdata, flags, rc):
client.subscribe("test/message")
if rc==0:
print("MQTT client has published")
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
print(str(msg.payload))
client.on_connect = on_connect
client.on_message = on_message
client.connect(mqttBroker, 1883, 60)
client.loop_forever()
publish.py
import paho.mqtt.publish as publish
import time
import json
mqttBroker = "localhost"
send_number = 0
try:
while True:
publish.single("test/message", send_number, hostname=mqttBroker)
send_number += 1
time.sleep(1)
except KeyboardInterrupt:
pass
Both subscribe.py and publish.py can't both use import paho.mqtt.client as mqtt
, if using this, subscriber or publisher will not be able to receive message.
Publish and subscribe to a cloud MQTT broker
There are multiple free-for-testing cloud MQTT brokers, such as Eclipse.org and ThingsBoard (which we will discuss later). In this part, we will test the publish, subscribe feature with the Eclipse MQTT broker located at mqtt.eclipse.org
Publisher:
import paho.mqtt.client as mqtt
import time
import json
mqttBroker = "mqtt.eclipse.org"
send_number = 0
client = mqtt.Client("this is MQTT ID")
def on_connect(client, userdata, flags, rc):
if rc==0:
print("connected OK") #print out when connect sucessfully
else:
print("bad connection")
client.on_connect = on_connect
client.connect(mqttBroker, 1883, 60)
client.loop_start()
try:
while True:
client.publish("test/message", send_number)
send_number += 1
time.sleep(1)
except KeyboardInterrupt:
pass
client.loop_stop()
client.disconnect()
Subscriber:
callback()
: work every time when a message is published
import paho.mqtt.subscribe as subscribe
# other operations are like MQTT B's subsriber program
def on_message_print(client, userdata, message):
print("%s %s" % (message.topic, message.payload))
# other operations are like MQTT B's subsriber program
subscribe.callback(on_message_print, "test/message", hostname="mqtt.eclipse.org")
simple()
: only work one time, receive one message then stops
msg = subscribe.simple("test/message", hostname="mqtt.eclipse.org")
print("%s %s" % (msg.topic, msg.payload))
Publish and subscribe with publish and subscribe class
publish.py
import paho.mqtt.publish as publish
import time
import json
mqttBroker = "mqtt.eclipse.org"
send_number = 0
try:
while True:
publish.single("test/message", send_number, hostname=mqttBroker)
send_number += 1
time.sleep(1)
except KeyboardInterrupt:
pass
subscribe.py
import paho.mqtt.subscribe as subscribe
def on_message_print(client, userdata, message):
print("%s %s" % (message.topic, message.payload))
subscribe.callback(on_message_print, "test/message", hostname="mqtt.eclipse.org")
Working with ThingsBoard
ThingsBoard is a IoT platform which can be use for free testing with multiple communication protocols like HTTP, MQTT, WebSocket,... or using for enterprise purpose. In this tutorial, we will use the ThingsBoard live demo server MQTT broker to testing.
At first, we need to know how to setup authentication for a MQTT client with username and password in paho-mqtt:
client = mqtt.Client("this is MQTT ID")
client.username_pw_set("sw", "1_Abc_123")
client.connect(mqttBroker, 1883, 60)
Telemetry
MQTT client sends increasing number to ThingsBoard livedemo MQTT broker
import paho.mqtt.client as mqtt
import time
import json
MQTT_BROKER = "thingsboard.sysats.tech"
TOKEN = "RbeNuZuf86rJ4zlY369i" # A device token in ThingsBoard
client = mqtt.Client("this is MQTT ID")
payload = {"number-value": 0}
def on_connect(client, userdata, flags, rc):
if rc==0:
print("connected OK") #print out when connect sucessfully
else:
print("bad connection")
client.username_pw_set(TOKEN)
client.on_connect = on_connect
client.connect(MQTT_BROKER, 1883, 60)
client.loop_start()
try:
while True:
client.publish("v1/devices/me/telemetry", json.dumps(payload))
payload['number-value'] += 1
time.sleep(1)
except KeyboardInterrupt:
pass
client.loop_stop()
client.disconnect()
Map widget
Simply send the valid latitude and longitude value as JSON field to client.publish()
client.publish("v1/devices/me/telemetry", "{'latitude': 10.786174956334452, 'longitude': 106.71002292803459}")
RPC
Control GPIO on Basic GPIO panel
Control 3 GPIO on Basic GPIO panel and save its previous setup status (i.e Able to display the previous setup status when dashboard is reloaded)
import paho.mqtt.client as mqtt
import json
MQTT_BROKER = "thingsboard.sysats.tech"
TOKEN = "TTf3zmVacJI4dUQsYQwh"
# We assume that all GPIOs are LOW
gpio_state = {1: 0, 2: 0, 3: 0}
client = mqtt.Client("this is MQTT ID")
def on_connect(client, userdata, flags, rc):
if rc==0:
print("connected OK") #print out when connect sucessfully
else:
print("bad connection")
#Must put those 2 functions inside on_connect() callback function
client.subscribe("v1/devices/me/rpc/request/+")
client.publish("v1/devices/me/attributes", get_gpio_status())
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
print("topic: " + msg.topic)
print("Received message: " + str(msg.payload.decode()))
data = json.loads(msg.payload)
if data['method'] == "setGpioStatus":
set_gpio_status(data['params']['pin'], data['params']['enabled'])
client.publish(msg.topic.replace('request', 'response'), get_gpio_status())
client.publish("v1/devices/me/attributes", get_gpio_status())
print(get_gpio_status())#Print GPIO status to debug
elif data['method'] == "getGpioStatus":
client.publish(msg.topic.replace('request', 'response'), get_gpio_status())
client.publish("v1/devices/me/attributes", get_gpio_status())
print(get_gpio_status())#Print GPIO status to debug
def set_gpio_status(pin, status):
gpio_state[pin] = status
def get_gpio_status():
return json.dumps(gpio_state)
client.username_pw_set(TOKEN)
client.on_connect = on_connect
client.on_message = on_message
client.connect(MQTT_BROKER, 1883)
client.loop_forever()
Control multiple devices by multithread
To control multiple devices, use multithread.
Features:
- Control 2 devices on ThingsBoard with MQTT client
- Print out the received message with specific device name that publishes that MQTT message
Control a GPIO with mode selection in Basic GPIO panel
Basic GPIO panel includes 2 GPIO, RELAY and MODE.
- If MODE is MANUAL (0): Control RELAY normally
- If MODE is AUTOMATION (1): RELAY can't be control and keep its previously set value
Conclusion
Congratulation ! You have reached the end of the tutorial. The tutorial is very basic for hobbyist and inexperience developers who will gain enough basic knowledge with MQTT with paho-mqtt library to work on their hobbyist projects. Hobbyist will need to investigate more on the paho-mqtt library API based on their projects' features.
For developers and students who want to deep dive into the MQTT protocols, please start by implementing the whole MQTT client publish and subscribe program from the raw TCP socket API. The whole protocol descriptions for that implementation can be found in MQTT v3.1.1.