Python microservice for MQTT

Hello,

our Python middleware is currently receiving around 10-15 MQTT messages per minute, while the expectation is around 166 messages per minute (500 tags, each sending data every 3 minutes). This number will scale further, to around 3,333 messages per minute (10,000 tags), which is why we are exploring multi-threading as a way to overcome a potential slowness causing the issue.

is the logic below correct?
//////////////////////////////////////////////////////////////////////

from concurrent.futures import ThreadPoolExecutor

# ThreadPoolExecutor to handle message processing concurrently
executor = ThreadPoolExecutor(max_workers=X)  # Adjust max_workers as needed

def process_message(message):
    # process_message contains the actual Python code handling a new MQTT message
    ...

def on_message(client, userdata, message):
    # Submit message processing to the thread pool so on_message returns quickly
    executor.submit(process_message, message)

///////////////////////////////////////////////////////////////////////////////////////

Concerning "max_workers=X": how do we choose X? Should it be based on the expected 3,333 MQTT messages per minute, or on the possibly higher future rate? And for this microservice, what CPU/memory allocation should we request? Is 4 vCPU and 12 GB RAM enough?

////////////////////////////////////////////////////////

Hello @bdm,

I don’t know the details of your use case, but even the maximum rate you mention seems quite low (3,333 msg/min is only ~55.5 msg/sec), so unless I’m missing something really fundamental about what you’re trying to do, moving to a multi-threaded architecture might be overkill. Likewise, the resource requirements you mention feel at least an order of magnitude too large for this message rate.

Are you able to share some more information about your use case? For example, what clients are publishing and subscribing to the middleware? How large are the messages? Are you doing some very expensive per-message processing that could impact the throughput? Are you seeing queues/backlogs building up in the middleware?

Best regards,

Scott


Hi Scott, the middleware is simple and the messages it receives from the MQTT broker are very small. We are trying to free the on_message thread as much as possible so it is available to receive new messages, which is why we are testing multi-threading for process_message (which is called from on_message).

Hi, sure, I understand the motivation for keeping processing off the receiver thread. However, my point was that even at 3,333 msg/min (55.5 msg/sec), you still have a whole second available to process each message, so you are unlikely to need a huge thread pool. My advice would be first to measure how much time you actually need to process each message. Then use that to determine how many messages you will need to process in parallel to keep up with your input rate, with some headroom for occasional spikes in load.
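As a sketch of that measurement (the process_message body here is a hypothetical stand-in for your real handler; the sleep just simulates work):

```python
import statistics
import time

def process_message(message):
    # Stand-in for the real handler; replace with your actual logic.
    time.sleep(0.01)  # simulates ~10 ms of work

# Time a sample of messages to estimate the per-message service time.
durations = []
for msg in range(100):  # in practice, sample real MQTT messages
    start = time.perf_counter()
    process_message(msg)
    durations.append(time.perf_counter() - start)

avg = statistics.mean(durations)
p95 = statistics.quantiles(durations, n=20)[-1]  # ~95th percentile
print(f"avg={avg * 1000:.1f} ms, p95={p95 * 1000:.1f} ms")
```

Multiplying your arrival rate by this average service time then gives the number of messages that must be in flight at once.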

Also, it looks like you are using Paho, which will auto-ack QoS 1 messages when the on_message callback returns, even if your process_message is still running in the background. If that’s not the behaviour you want, you can set manual_ack=True on the client and explicitly acknowledge each message in process_message.
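As a rough sketch of that pattern, assuming paho-mqtt 2.x (the CallbackAPIVersion argument, the manual_ack constructor option, and the client.ack() method; the broker address and topic are placeholders, and I would double-check the exact API against the Paho docs for your installed version):

```python
import paho.mqtt.client as mqtt
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=8)

def process_message(client, message):
    ...  # actual handling of the MQTT payload goes here
    # Acknowledge only after processing finishes (QoS 1), so that an
    # unprocessed message is not silently dropped if the worker fails.
    client.ack(message.mid, message.qos)

def on_message(client, userdata, message):
    # Hand off to the pool so the network thread is free for the next message.
    executor.submit(process_message, client, message)

client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, manual_ack=True)
client.on_message = on_message
client.connect("broker.example.com")  # placeholder broker address
client.subscribe("node_data_json", qos=1)
client.loop_forever()
```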


Thanks Scott, I would definitely prefer keeping a single thread if it is enough, and optimizing the current logic further if there is room for improvement. Would you please elaborate on the calculations? For 3,333 msg/min (55.5 msg/sec), why do we consider that we have a whole second to process each of those 55.5 msg/sec?

Room for optimization: let's break the on_message thread into some high-level steps:

  1. Receive the MQTT message from the MQTT broker; it contains an external ID.
  2. Since Cumulocity needs the internal ID to create the event, perform a GET API call to resolve the internal ID from the external ID. Cumulocity is on the cloud; the network delay is ~100 ms and the API call response time is around 600 ms.
  3. Based on #2 (the internal ID) and the MQTT message content, make a second API call (this time a POST) to create the event (so another 600 ms).

So if the thread is busy with one MQTT message, the next message has to wait at least 1.2 seconds before it can be processed.

what do you suggest?

Now for the auto-ack QoS 1: yes, I am using Paho, and QoS 1 is set when subscribing:
client.subscribe("node_data_json", qos=1)

Apparently I have lost the ability to do basic arithmetic. Thinking is hard and I somehow managed to confuse myself between msg/min and msg/sec. Obviously 55 msg/second gives you ~18ms per message, not a whole second. I do apologise for lowering the IQ of the whole conversation there :face_with_spiral_eyes:

Anyway, thanks for the explanation of what your middleware is doing, that really helps to understand the problem. So, if we assume 55 msg/sec and 1.2s processing time per message, you would need at least 55 * 1.2 = 66 threads to keep up. You will want some headroom so call it at least 80 in practice. That should work, but it seems pretty inefficient, and I think we can do better.
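That back-of-the-envelope calculation is essentially Little's law (required concurrency = arrival rate × service time); as a sketch, with the figures from this thread:

```python
import math

def size_thread_pool(msgs_per_sec, service_time_s, headroom=1.2):
    """Minimum worker count needed to keep up with the arrival rate."""
    return math.ceil(msgs_per_sec * service_time_s * headroom)

print(size_thread_pool(55, 1.2, headroom=1.0))  # bare minimum -> 66
print(size_thread_pool(55, 1.2))                # with 20% headroom -> 80
```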

The first thing I would consider doing is to cache the external-to-internal ID mapping so that you don’t have to do a lookup on every message. That immediately halves the number of round-trips you need to make to the server.
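A minimal sketch of such a cache using functools.lru_cache; fetch_internal_id_from_api is a hypothetical stand-in for the real GET call (if mappings can change on the server, you would also need some expiry or invalidation):

```python
from functools import lru_cache

def fetch_internal_id_from_api(external_id):
    # Hypothetical stand-in for the GET call to the Cumulocity identity API.
    return "internal-" + external_id

@lru_cache(maxsize=10_000)  # roughly one entry per tag at the 10,000-tag scale
def resolve_internal_id(external_id):
    return fetch_internal_id_from_api(external_id)

resolve_internal_id("tag-42")                 # first call hits the API
resolve_internal_id("tag-42")                 # repeat is served from the cache
print(resolve_internal_id.cache_info().hits)  # -> 1
```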

Then I would look at using SmartREST to deliver your POST requests to Cumulocity. This would let you stream the requests into Cumulocity and avoid having to wait for a complete network round-trip on every request.

Finally, it’s not quite ready yet but within the next couple of weeks we plan to make the new Cumulocity MQTT Service publicly available on our public cloud instances. Assuming your devices use MQTT, this would let them connect directly to a new MQTT endpoint hosted by Cumulocity and continue to publish the same payloads on the same topics that they do now. Your existing logic for mapping those payloads to the Cumulocity API would move into a microservice hosted by Cumulocity. As I said it’s not publicly available yet but you can see the early-access documentation to learn more about it.

Thanks, and apologies again for my bad maths,

Scott


Thanks Scott, no worries.

so our initial plan for optimization would be:

  1. Put the Cumulocity part in a separate function. I am even thinking about a separate function for the GET and another for the POST (which would release on_message sooner). What do you think?
  2. Implement multi-threading. I shared the prepared code for it in my first message; is it correct? Do you have any comments?
  3. Cache the external-to-internal ID mapping.
  4. Explore SmartREST in order to stream POSTs to the Cumulocity API instead of sending them one by one.

Hi Scott,
Currently, even after multi-threading, we are facing a very weird problem. The broker is generating around 120 MQTT messages/minute, while on the middleware (Python, using Paho) we are seeing only around 45 messages/minute received; sometimes we see a burst of 4 messages/second and sometimes 1 message/second. We tried QoS 0 instead of QoS 1 with the same result. What is weird is that even if the Python middleware were introducing some delay, with QoS 1 re-transmission should occur, yet in reality some MQTT messages are completely lost. To troubleshoot, I completely removed the Cumulocity part and kept only the MQTT subscriber; nothing changed, still not exceeding 45 messages/minute. What do you think? What could be the reason? Could it be on the MQTT broker side?

Do I need to set anything related to CPU/memory for this Python application inside the code, or is that not needed?

Hi Scott, kind reminder.


Hi @bdm,

That’s weird, but nothing comes immediately to mind. Have you tried removing the multi-threading implementation to check that a really simple client can actually receive all the messages the broker is sending? As I mentioned before, your message rate is really very small, so without the high latency calls into Cumulocity your client should have no problem keeping up.
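For that check, one way to see the raw receive rate without any processing is a small sliding-window counter called from a bare on_message (all names here are illustrative):

```python
import threading
import time
from collections import deque

class RateCounter:
    """Counts events in a sliding time window; safe to call from callback threads."""

    def __init__(self, window_s=60.0):
        self.window_s = window_s
        self._stamps = deque()
        self._lock = threading.Lock()

    def hit(self, now=None):
        # Call this once per received message, e.g. from on_message.
        now = time.monotonic() if now is None else now
        with self._lock:
            self._stamps.append(now)

    def rate(self, now=None):
        # Number of messages seen within the last window_s seconds.
        now = time.monotonic() if now is None else now
        with self._lock:
            while self._stamps and now - self._stamps[0] > self.window_s:
                self._stamps.popleft()
            return len(self._stamps)

counter = RateCounter(window_s=60.0)
for _ in range(45):
    counter.hit()
print(counter.rate())  # -> 45
```

Logging counter.rate() once a minute from the stripped-down subscriber would show whether the 45 msg/min ceiling is already there before any Cumulocity calls.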

By the way, the plan you presented in an earlier message broadly makes sense. I’m not personally familiar with the concurrent.futures Python library but based on a quick skim of the documentation I think it will do what you need.

Thanks,

Scott

OK thanks. Any recommended documentation/references for moving from REST (currently async REST) to SmartREST?

And in the meantime, before implementing SmartREST, I sometimes see a log on the middleware about max retries exceeded for a URL. What is the default maximum, and is it possible to tune it?

Hi @bdm,

I would start here for the SmartREST docs: Introduction - Cumulocity documentation.

Thanks,

Scott