In any distributed service solution, you have many options for enabling services to communicate with each other. In the python world, you could use HTTP requests and the Flask framework. Other options include gRPC, XML web services, or raw sockets. In some of my personal Python hacking, I wanted to experiment with longer running Python jobs. This post provides some motivations for communication in distributed systems using message queues. I’m hoping this pattern will help machine learning jobs, IoT data captures, or data transformation processes.
https://blog.iron.io/introduction-to-message-queue-architecture/
Install Python tools
In your working folder, let’s start by installing tools.
In this tutorial, we’re using natural language toolkit(nltk) to execute part of speech tagging of sentences. We’re using the pandas data library to manage simple tables of data. For this recipe, we will use RabbitMQ and the related pika library to provide inter-process communication.
pip install nltk
python -m nltk.downloader popular
pip install pandas
pip install pika
Install RabbitMQ using docker-compose
Using the following docker-compose file, we can start a test instance of RabbitMQ
Make sure that you have installed Docker compose tools properly.
version: "3.2"
services:
rabbitmq:
image: rabbitmq:3-management-alpine
container_name: 'rabbitmq'
ports:
- 5672:5672
- 15672:15672
volumes:
- ~/.docker-conf/rabbitmq/data/:/someDataFolderOnHostComputer/
- ~/.docker-conf/rabbitmq/log/:/someDataFolderOnHostComputer
networks:
- rabbitmq_go_net
networks:
rabbitmq_go_net:
driver: bridge
Let’s make test data.
For this exercise, let’s make a test data file called “data.json.”
[
{"sentence": "Cheese burgers and ham are yummy!", "foo": 42},
{"sentence": "We don't talk about bruno", "foo": 42},
{"sentence": "I see you!", "foo": 42}
]
How to send data for processing
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
queue = "input_queue"
file = open('data.json','r')
json_text = file.read()
channel.queue_declare(queue=queue)
channel.basic_publish(exchange='', routing_key=queue, body=json_text)
print(" [x] Sent json")
connection.close()
Using the pika library, we start a connection to the RabbitMQ message on localhost. After reading the input data (data.json) into a variable, we send a message to RabbitMQ with the data. In the send operation, we needed to specify the queue name, an empty exchange value, and the message body.
Receiving data and processing it
#!/usr/bin/env python
import pandas as pd
import json
import nltk
import pika, sys, os
input_queue = "input_queue"
output_queue = "output_queue"
def processSentencesJson(json_text):
df = pd.read_json(json_text)
results = []
for index, row in df.iterrows():
sentence = row[0]
tokens = nltk.word_tokenize(sentence)
tagged = nltk.pos_tag(tokens)
results.append(tagged)
return results
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue=input_queue)
def callback(ch, method, properties, body):
print(" [x] Received json")
results = processSentencesJson(body)
channel.basic_publish(exchange='', routing_key=output_queue, body=json.dumps(results))
channel.basic_consume(queue=input_queue, on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)
In the receiving python process, we start things off by naming the input and output message queue names.
input_queue = "input_queue"
output_queue = "output_queue"
In the main function, we establish a connection to the RabbitMQ. We also define a call back function to execute when we receive a message on the input queue. In the call back function, we execute the “processSentencesJson” function and return results to the output message queue.
I do admire the brief and concise nature of this solution.
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue=input_queue)
def callback(ch, method, properties, body):
print(" [x] Received json")
results = processSentencesJson(body)
channel.basic_publish(exchange='', routing_key=output_queue, body=json.dumps(results))
channel.basic_consume(queue=input_queue, on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
In the following function, we accept a JSON string of sentences. Using the pandas library, we convert the JSON string into a data frame structure. We loop over the data frame rows and execute part of speech tagging using NLTK. For each iteration of the loop, we contribute tagged sentences into the results array.
def processSentencesJson(json_text):
df = pd.read_json(json_text)
results = []
for index, row in df.iterrows():
sentence = row[0]
tokens = nltk.word_tokenize(sentence)
tagged = nltk.pos_tag(tokens)
print(tagged)
results.append(tagged)
return results
The system will execute part of speech tagging for each sentence using the following format.
[('We', 'PRP'), ('do', 'VBP'), ("n't", 'RB'), ('talk', 'VB'), ('about', 'IN'), ('bruno', 'NN')]
Photo credit to Rgourley on Flickr
Leave a Reply