I was really hoping for some pearls of wisdom from you @Greenberet . 🙂
I can't use NodeRed for this particular requirement as it is multi-tenant, and the complete lack of security in NodeRed makes it unsuitable.
Since you added async support to the Meraki library, so many things that I had previously written off are now possible. I really want to try to swim with the flow that the Meraki API team has created.
I have made my first splash with Paho. Let me share what I learned:
- Paho uses a separate thread for all callbacks, such as receiving a message.
- Many, many Python libraries are not thread-safe.
- Some data types, like arrays, are not thread-safe.
- The only thing you want to do in the callback is any fast pre-processing, then put the data into a queue from the "queue" module and process it later in the main thread. The "queue" module is thread-safe (see the queue sketch just after this list).
- asyncio.Queue is NOT thread-safe.
- You can't mix async functions between threads, even if they are thread-safe. For example, you can't call an async function in one thread (such as the Paho message callback) and then await it in the main thread. (There is a run_coroutine_threadsafe sketch after this list showing the standard-library bridge for this.)
- Make sure you use the "global" keyword in the callback functions if there is any chance you might be updating a global. I used it on everything I referenced globally, just to be safe. Be warned again: some data types are not thread-safe.
- The way Paho works really encourages you to use threads for everything. But with so many things not being thread-safe, you could only get away with that approach on the smallest of projects. If you want to use asyncio with anything - DANGER.
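Here is a minimal sketch of the thread-safe hand-off I mean (all names and values are made up for illustration): a background thread stands in for the Paho callback thread and puts items onto a queue.Queue, and the main thread drains it.

import queue
import threading

q = queue.Queue()                  # queue.Queue is safe to share across threads

def producer():
    # stands in for the Paho callback thread: only hand the data off
    for i in range(3):
        q.put({"msg": i})
    q.put(None)                    # sentinel so the consumer knows we are done

threading.Thread(target=producer, daemon=True).start()

# the main thread drains the queue; q.get() blocks until an item arrives
while True:
    item = q.get()
    if item is None:
        break
    print("processing", item)
# an asyncio.Queue could NOT be shared like this - it is not thread-safe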
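And for completeness on the "can't await across threads" point: the standard library does give you asyncio.run_coroutine_threadsafe to schedule a coroutine on the main thread's loop from another thread. This is just a sketch of that bridge with made-up names - the example further down sticks with queue.Queue instead.

import asyncio
import threading

async def handle(msg):
    # pretend this is real async processing running on the main loop
    await asyncio.sleep(0.1)
    return "handled " + msg

def worker(loop):
    # runs in a non-asyncio thread, like a Paho callback
    future = asyncio.run_coroutine_threadsafe(handle("snapshot"), loop)
    print(future.result(timeout=5))   # blocks this worker thread, not the loop

async def main():
    loop = asyncio.get_running_loop()
    threading.Thread(target=worker, args=(loop,), daemon=True).start()
    await asyncio.sleep(1)            # keep the loop alive while the worker runs

asyncio.run(main())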
This is the gist of how to make it work:
import asyncio
import json
from queue import Queue

import paho.mqtt.client as mqtt

q = Queue()  # thread-safe hand-off between the Paho thread and the main thread

# Sample async function used to process inbound MQTT messages
async def processSnapshot(camera_serial):
    await asyncio.sleep(1)

# This gets called whenever we connect to the MQTT broker
def mqtt_on_connect(client, userdata, flags, rc):
    print("Connected to MQTT broker with result code " + str(rc))
    client.subscribe("/merakimv/#")

# This gets called whenever we get an MQTT message (runs in the Paho thread)
def mqtt_on_message(client, userdata, msg):
    global xxx  # Reference anything you might update that is a global variable
    print(msg.topic + " " + str(msg.payload))
    data = json.loads(msg.payload.decode('utf-8'))
    split_topic = msg.topic.split("/", 3)
    ...
    q.put(data)  # Add to the queue whatever you want it to process

async def main():
    ...
    mqtt_client = mqtt.Client("name-of-your-client")
    mqtt_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
    mqtt_client.on_connect = mqtt_on_connect
    mqtt_client.on_message = mqtt_on_message
    mqtt_client.connect("your-server", port=1883, keepalive=60)
    mqtt_client.loop_start()
    # In this case, loop forever. q.get() will block if the queue is empty.
    while True:
        await processSnapshot(q.get())

if __name__ == "__main__":
    # execute only if run as a script
    asyncio.run(main())
The plus side of this approach is that it *may* be able to use two CPU cores: one for the callbacks when you receive a message, and another for processing the messages. So this approach may scale better if you have a large number of incoming messages AND if you can safely do some kind of pre-processing on those messages in the callback thread. If all you do in the callback is receive the message and put it into the queue, it has to be slower than just doing everything in one thread. I would only do pre-processing that involves your own code and no libraries, so you can be confident you won't run into thread-safety issues (roughly as sketched below).
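To make that pre-processing point concrete, here is roughly what I mean by keeping only your own code in the callback thread. The topic layout and the "counts" check are just placeholder assumptions for illustration:

# hypothetical callback doing thread-side pre-processing before queueing
def mqtt_on_message(client, userdata, msg):
    # only our own parsing here - no third-party library calls in this thread
    serial = msg.topic.split("/")[2]              # e.g. /merakimv/<serial>/...
    data = json.loads(msg.payload.decode("utf-8"))
    if "counts" not in data:                      # drop messages we don't care about
        return
    q.put((serial, data))                         # only the reduced record crosses threads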