Python and MQTT

Solved
PhilipDAth
Kind of a big deal
Kind of a big deal

Python and MQTT

I used to use node.js to process MQTT.  It's brilliant.  Everything is async.

 

I have been trying to wean myself off it and onto Python since the Meraki node.js library is no longer supported, and it's going to create a headache in the future using the Meraki V0 API which will be eventually discontinued.

 

I often use MQTT with MVs where you get a notification of a person or vehicle, you grab a snapshot (which takes ages) and then process it in some way.

 

I can't settle on a good way to do this in Python.  Because the snapshot process is slow I need something non-blocking.  What are you using, and how are you handling it?

 

There is the super popular Paho library.  Zero async support.  Uses callbacks.  I could use queues and add the message to a queue, and then use async to process from there.  Haven't tried that way yet.  That will probably be my next experiment.

 

There are async mqtt engines like asyncio-mqtt (which has great notes like "Expect API changes until we reach version 1.0.0") and HBMQTT.  They don't seem that popular.  I worry if I start using one of these I might end up in the same camp with node.js, of having to change later because they get abandoned.

 

Your advice for me?

1 Accepted Solution
Greenberet
Head in the Cloud


@PhilipDAth wrote:

I've found asyncio processing of MQTT messages in Python is almost pointless.

 

Typically you construct an array of tasks, and then wait for them all to complete.

 

The problem is MQTT messages come in one at a time.  You can't construct an array.

Sure if the first message takes 10s to process, and you get messages every second, then after that first 10s you could batch all messages waiting in the queue.

And to do this, you can't use the native MQTT async functions.  You have to use the original code I gave, which runs it in a thread and adds it to a thread-safe queue.

 

It's a real shame Meraki dropped support for node.js.


Why do you need to queue them?

you could also use asyncio.ensure_future and let it do it's work. With await you are just blocking & waiting for it to be finished, but you aren't doing anything with the result, therefore you can just let it run.

 

from your asyncio-mqtt example:

 

 

import asyncio
from asyncio_mqtt import Client, MqttError

# Sample async function to be called
async def processSnapshot(camera_serial):
    print("processSnapshot")
    await asyncio.sleep(1)

# This gets called whenever when we get an MQTT message
async def mqtt_on_message(msg):
    ...
    data = json.loads(msg.payload.decode('utf-8'))
    split_topic = msg.topic.split("/", 3)
    ...
    return await processSnapshot(camera_serial)


async def main():
    ...
    async with Client(hostname="127.0.0.1",port=1883,username=MQTT_USERNAME,password=MQTT
_PASSWORD) as client:
        async with client.filtered_messages("/merakimv/#") as messages:
            await client.subscribe("/merakimv/#")
            async for message in messages:
                asyncio.ensure_future(mqtt_on_message(message)) #I've just changed this line of code

if __name__ == "__main__":
    # execute only if run as a script
    asyncio.run(main())

 

 

you could also use asyncio.create_task as they would basically do the same thing here.
personally I prefer to use ensure_future for tasks on which I don't care about the result, and create_task, when I want to "await" them later

View solution in original post

10 Replies 10
Greenberet
Head in the Cloud

I'm in a similiar situation like you.

I've solved it with node-red. The MQTT related tasks (and currently even more) are automated with node-red and then I'm calling the python scripts either via exec node or via a rest api.

PhilipDAth
Kind of a big deal
Kind of a big deal

I was really hoping for some pearls of wisdom from you @Greenberet .  🙂

 

I can't use NodeRed for this particular requirement as it is multi-tenant, and the complete lack of security in NodeRed makes it unsuitable.

Since you added the async support to the Meraki library it has made so many things possible that I had previously written it off for.  I really want to try and swim with the flow that the Meraki API team has created.

 

I have done my first splash with Paho.  Let me share my learnings:

  • Paho uses a separate thread for all callbacks, such as receiving a message.
    • Many many Python libraries are not thread safe.
    • Some data types, like arrays, are not thread safe.
    • The only thing you want to do in the callback is use the "queue" module, do any fast processing and then put the data into the queue, and then process later.  The "queue" module is thread-safe.
      • asyncio.queue is not thread-safe.
    • You can't mix async functions between threads even if they are threadsafe.  For example, you can't call an async function in one thread (such as the Paho message callback), and then await for that in the main thread.
    • Make sure you use the "global" keyword in the callback functions if there is any chance you might be updating something.  I used it on everything I referenced globally to be safe.  Be warned again, some datatypes are not thread safe.
  • The way Paho works, it really encourages you to use threads for everything.  But with so many things not being thread safe, you could really only use this approach on the smallest of projects.  If you want to use asyncio with anything - DANGER.

 

This is the jist of how to make it work:

 

 

import os,asyncio
import paho.mqtt.client as mqtt
from queue import Queue

# Sample async function to be used to processing inbound MQTT messages
async def processSnapshot(camera_serial):
    await asyncio.sleep(1)

# This gets called whenever we connect to the MQTT broker server
def mqtt_on_connect(client, userdata, flags, rc):
    print("Connected to MQTT broker with result code "+str(rc))
    client.subscribe("/merakimv/#")

# This gets called whenever get get an MQTT message
def mqtt_on_message(client, userdata, msg):
    global xxx # Reference anything you might update that is a global variable

    print(msg.topic+" "+str(msg.payload))
    data = json.loads(msg.payload.decode('utf-8'))
    split_topic = msg.topic.split("/", 3)
    ...
    q.put(data);  # Add to the queue whatever you want it to process


async def main():
    ...
    mqtt_client=mqtt.Client("name-of-your-client")
    mqtt_client.username_pw_set(MQTT_USERNAME,MQTT_PASSWORD)
    mqtt_client.on_connect=mqtt_on_connect
    mqtt_client.on_message=mqtt_on_message
    mqtt_client.connect("your-server",port=1883,keepalive=60)

    mqtt_client.loop_start()

    # In this case, loop forever.  q.get() will block if queue is empty.
    while True:
        await processSnapshot(q.get())


if __name__ == "__main__":
    # execute only if run as a script
    asyncio.run(main())

 

The plus side of this approach is it *may* be able to use two CPU cores, one for the callbacks when you receive a message, and then another for the processing of the messages.  So this approach may scale better if you have a large number of incoming messages AND if you can do safely do some kind of pre-processing on those messages in that same thread.  If all you do is receive the message and then put it into the queue - it has to be slower.  I think I would only do pre-processing that involved only your code and no libraries to be confident you weren't going to run into thread-safety issues.

PhilipDAth
Kind of a big deal
Kind of a big deal

I've had a look over the asyncio MQTT solutions now.

 

  • asyncio-mqtt
    • uses paho-mqtt unmodified and just extends it
    • less than 600 lines of code
    • fully embraces ayncio style
    • uses mature paho-mqtt unmodified and just extends it
  •  aiomqtt
    • based on paho-mqtt
    • Requires the paho "loop_forver" style mechanism
    • Has a lot of concepts as asyncio, but does not fully embrace it
  •  hbmqtt
    • Own implementation of MQTT
    • Embraces asyncio style well
    • Hasn't had any commits for a while so either it works good is not supported that well
  •  mqttools
    • Immature.  Only implements bits of the MQTT protocol.
  •  gmqtt
    • uses its own implementation of MQTT
    • seems to have regular maintenance and releases
    • has some support of asyncio concepts but doesn't embrace it completely.

 

Overall, I like asyncio-mqtt best because it uses the stable and mature paho-mqtt library, un-modified.  It simply extends it.  The extension is a small amount of code.

PhilipDAth
Kind of a big deal
Kind of a big deal

An asyncio_mqtt example looks like:

 

 

 

 

 

import asyncio
from asyncio_mqtt import Client, MqttError

# Sample async function to be called
async def processSnapshot(camera_serial):
    print("processSnapshot")
    await asyncio.sleep(1)

# This gets called whenever when we get an MQTT message
async def mqtt_on_message(msg):
    ...
    data = json.loads(msg.payload.decode('utf-8'))
    split_topic = msg.topic.split("/", 3)
    ...
    return await processSnapshot(camera_serial)


async def main():
    ...
    async with Client(hostname="127.0.0.1",port=1883,username=MQTT_USERNAME,password=MQTT
_PASSWORD) as client:
        async with client.filtered_messages("/merakimv/#") as messages:
            await client.subscribe("/merakimv/#")
            async for message in messages:
                await mqtt_on_message(message)

if __name__ == "__main__":
    # execute only if run as a script
    asyncio.run(main())

 

 

 

 

PhilipDAth
Kind of a big deal
Kind of a big deal

I've found asyncio processing of MQTT messages in Python is almost pointless.

 

Typically you construct an array of tasks, and then wait for them all to complete.

 

The problem is MQTT messages come in one at a time.  You can't construct an array.

Sure if the first message takes 10s to process, and you get messages every second, then after that first 10s you could batch all messages waiting in the queue.

And to do this, you can't use the native MQTT async functions.  You have to use the original code I gave, which runs it in a thread and adds it to a thread-safe queue.

 

It's a real shame Meraki dropped support for node.js.

PhilipDAth
Kind of a big deal
Kind of a big deal

I ended up using the original code and the Paho MQTT library, with this modified main() function:

 

async def main():
    ...
    mqtt_client=mqtt.Client("...")
    mqtt_client.username_pw_set(MQTT_USERNAME,MQTT_PASSWORD)
    mqtt_client.on_connect=mqtt_on_connect
    mqtt_client.on_message=mqtt_on_message
    mqtt_client.connect("127.0.0.1",port=1883,keepalive=60)

    mqtt_client.loop_start()

    while True:
        # At a minimum wait a second before checking for items in the queue
        tasks = [asyncio.sleep(1)]

        print(f"Queue size={q.qsize()}")
        try:
            # Get all waiting jobs in the queue
            while True:
                message=q.get(False)
                tasks.append(processSnapshot(message))
        except queue.Empty:
            # Wait for all the jobs to finish
            await asyncio.gather(*tasks)
Greenberet
Head in the Cloud


@PhilipDAth wrote:

I've found asyncio processing of MQTT messages in Python is almost pointless.

 

Typically you construct an array of tasks, and then wait for them all to complete.

 

The problem is MQTT messages come in one at a time.  You can't construct an array.

Sure if the first message takes 10s to process, and you get messages every second, then after that first 10s you could batch all messages waiting in the queue.

And to do this, you can't use the native MQTT async functions.  You have to use the original code I gave, which runs it in a thread and adds it to a thread-safe queue.

 

It's a real shame Meraki dropped support for node.js.


Why do you need to queue them?

you could also use asyncio.ensure_future and let it do it's work. With await you are just blocking & waiting for it to be finished, but you aren't doing anything with the result, therefore you can just let it run.

 

from your asyncio-mqtt example:

 

 

import asyncio
from asyncio_mqtt import Client, MqttError

# Sample async function to be called
async def processSnapshot(camera_serial):
    print("processSnapshot")
    await asyncio.sleep(1)

# This gets called whenever when we get an MQTT message
async def mqtt_on_message(msg):
    ...
    data = json.loads(msg.payload.decode('utf-8'))
    split_topic = msg.topic.split("/", 3)
    ...
    return await processSnapshot(camera_serial)


async def main():
    ...
    async with Client(hostname="127.0.0.1",port=1883,username=MQTT_USERNAME,password=MQTT
_PASSWORD) as client:
        async with client.filtered_messages("/merakimv/#") as messages:
            await client.subscribe("/merakimv/#")
            async for message in messages:
                asyncio.ensure_future(mqtt_on_message(message)) #I've just changed this line of code

if __name__ == "__main__":
    # execute only if run as a script
    asyncio.run(main())

 

 

you could also use asyncio.create_task as they would basically do the same thing here.
personally I prefer to use ensure_future for tasks on which I don't care about the result, and create_task, when I want to "await" them later

PhilipDAth
Kind of a big deal
Kind of a big deal

You are a star!

 

What I want is that when an MQTT message comes in, it starts getting processed regardless of what else the system is doing.

 

I had played with creating tasks, but I just couldn't get the above outcome to happen reliably.  I also thought you had to collect the task ID that it produces for later cleanup.

 

The ensure_future now makes it work perfectly.  For me, this makes asyncio-mqtt the hands-down winner for processing MQTT messages in Python.

Greenberet
Head in the Cloud

oh and a little warning!

when "main" is finished, and you are still having tasks/futures active, then they will be cancelled.

 

so you might want to do a little await asyncio.sleep(10) at the end of the main function. just to be sure, that every future is finished.

PhilipDAth
Kind of a big deal
Kind of a big deal

I never intend for the script to ever complete execution.

 

I've wrapped it in a little loop now.

 

# The main code starts running from here
async def mqtt_loop():
    async with Client(hostname="127.0.0.1",port=1883,username=MQTT_USERNAME,password=MQTT
_PASSWORD) as client:
        async with client.unfiltered_messages() as messages:
            # Subscribe to add the cameras we are interested in
            for camera_serial in inventory:
                print(f"Subscribing to camera {camera_serial}")
                await client.subscribe(f"/merakimv/{camera_serial}/0")

            async for message in messages:
                asyncio.ensure_future(mqtt_on_message(message))


async def main():
    reconnect_interval = 3
...
    # Start the main loop.  If an error happens, reconnect
    while True:
        try:
            await mqtt_loop()
        except MqttError as error:
            print(f'Error "{error}". Reconnecting in {reconnect_interval} seconds.')
        finally:
            await asyncio.sleep(reconnect_interval)


if __name__ == "__main__":
    # Execute only if run as a script
    asyncio.run(main())
Get notified when there are additional replies to this discussion.