Multiprocess queue blocked? #1474
I'm trying to continuously consume messages and send them to a client in batches, but the batch upload may take longer than it takes to receive new messages, and we're expecting a throughput of 600K+ messages per day. I'm using a multiprocessing queue because I'd rather risk duplicates than lose information, which is why I don't auto-commit; if that's bad practice, please let me know. It looks like I'm always getting None values, even though I receive messages just fine when everything is in a single for-loop. This also works well with the kafka-python module, since its consumer works like an iterable, but I'd rather use this module for speed.

```python
"""
Continuously consume and process while consuming
"""
import confluent_kafka
import json
import logging
import logging.config
import multiprocessing
import queue
import sys
import time

# in-house
import config
import _deserialize

logging.basicConfig(level=logging.DEBUG)


def get_messages(instance_queue):
    logging.info("Calling consumer")
    while True:
        message = instance_consumer.poll(1.0)
        if message is None:
            continue
        if message.value() is None:
            continue
        if message.error():
            logging.exception(message.error(), exc_info=True)
            continue
        logging.debug(message)
        message_decoded = _deserialize.deserialize_json(message.value())
        if message_decoded is None:
            continue
        logging.debug(f"Topic Name={message.topic()}, Message={message_decoded['id']}")
        instance_queue.put(message_decoded)


def process_messages(instance_queue):
    """Send to outbound client"""
    while True:
        try:
            message = instance_queue.get(timeout=1)
        except queue.Empty:
            pass
        else:
            pass  # do stuff with messages


# Run Kafka =======================================================================================
def callback_error(data):
    logging.error(data)


time.sleep(3)  # give the server time to start

instance_consumer = confluent_kafka.Consumer({
    'bootstrap.servers': ",".join(config.dict_setup_aws['servers']),
    'auto.offset.reset': 'earliest',
    'group.id': 'Client-Consumer',
    'error_cb': callback_error,
    'enable.auto.commit': False,
    # 'auto.commit.interval.ms': 30 * 1000,  # adjust later
})
instance_consumer.subscribe(config.dict_setup_aws['topics'])

q = multiprocessing.Queue()
process_consume = multiprocessing.Process(target=get_messages, args=[q])
process_consume.start()
process_upload = multiprocessing.Process(target=process_messages, args=[q])
process_upload.start()
process_consume.join()
process_upload.join()
instance_consumer.close()
sys.exit()
```

The logs just hang at "Calling consumer".
Replies: 1 comment 1 reply
Client instances can't be used across forks, and from a superficial glance it looks like multiprocessing will call get_messages from multiple forked sub-processes.
I would recommend polling messages in the main process and passing their key, payload, topic, partition, and offset to the child processes.
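A minimal sketch of that pattern, assuming the same in-house `config` module as the question; `send_batch` and the batch size of 100 are hypothetical stand-ins for the real outbound upload:

```python
"""
Poll in the main process; hand plain tuples to worker processes.
"""
import multiprocessing
import queue

import confluent_kafka

import config  # same in-house module as in the question


def send_batch(batch):
    """Placeholder for the real upload to the outbound client."""
    print(f"uploading {len(batch)} messages")


def process_messages(q):
    """Worker: drain tuples from the queue and upload in batches."""
    batch = []
    while True:
        try:
            # Each item is a tuple of picklable primitives, so it crosses
            # the process boundary safely; no Kafka client is needed here.
            key, payload, topic, partition, offset = q.get(timeout=1)
        except queue.Empty:
            continue
        batch.append(payload)
        if len(batch) >= 100:  # hypothetical batch size
            send_batch(batch)
            batch.clear()


if __name__ == "__main__":
    # The Consumer exists only in the main process; the workers never see it.
    consumer = confluent_kafka.Consumer({
        'bootstrap.servers': ",".join(config.dict_setup_aws['servers']),
        'auto.offset.reset': 'earliest',
        'group.id': 'Client-Consumer',
        'enable.auto.commit': False,
    })
    consumer.subscribe(config.dict_setup_aws['topics'])

    q = multiprocessing.Queue()
    workers = [multiprocessing.Process(target=process_messages, args=(q,))
               for _ in range(2)]
    for w in workers:
        w.start()

    try:
        while True:
            message = consumer.poll(1.0)
            if message is None or message.error():
                continue
            # Ship only plain values, never the Message or Consumer objects.
            q.put((message.key(), message.value(), message.topic(),
                   message.partition(), message.offset()))
    finally:
        consumer.close()
```

Since auto-commit is disabled, offsets would still need to be committed from the main process once a batch is confirmed uploaded, for example by having the workers report completed offsets back on a second queue.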