confluent-kafka
Confluent's Python client for Apache Kafka (confluent-kafka-python)
pip install confluent-kafka
pip install "confluent-kafka [avro]"
Consumer
import json
import logging
import time

from confluent_kafka import Consumer, KafkaError
consumer_config = {
    'bootstrap.servers': 'my-cluster-kafka-brokers.kafka:9092',
    'partition.assignment.strategy': 'roundrobin',
    'group.id': 'test_bank_data_consumer',
    'auto.offset.reset': 'earliest',  # earliest/latest
    'enable.auto.commit': 'false',
    # limit the amount of message data pre-fetched by librdkafka
    'queued.max.messages.kbytes': '32000',
    'fetch.message.max.bytes': '15728640',
}
c = Consumer(consumer_config)
# rebalance callbacks
def print_on_assign(consumer, partitions):
    logging.info(f'Assignment: {partitions}')
    for partition in partitions:
        logging.info(f'watermark: {consumer.get_watermark_offsets(partition)}')
    logging.info(f'committed offsets for all partitions: {consumer.committed(partitions)}')
    logging.info(f'position: {consumer.position(partitions)}')

def print_on_revoke(consumer, partitions):
    logging.info(f'Revoke Assignment: {partitions}')

c.subscribe(['bank_data'], on_assign=print_on_assign, on_revoke=print_on_revoke)
timeout_seconds = 1
while True:
    msg = c.poll(1.0)
    # initial error handling
    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        logging.error(f'consumer error: {msg.error()}')
        break
    logging.debug(f'{msg.topic()} [{msg.partition()}] at offset {msg.offset()}')
    try:
        # decode the message value (bytes) into a dict
        final_data = json.loads(msg.value().decode('utf-8'))
        # ... process final_data here ...
        c.commit()
    except Exception:
        try:
            logging.error(f'data/msg: {msg.value()}')
        except Exception:
            logging.exception('cannot print data')
        logging.exception(
            f'global exception occurred, will not attempt for another {timeout_seconds} seconds')
        # exponential back-off after an exception
        time.sleep(timeout_seconds)
        timeout_seconds *= 2
    else:
        timeout_seconds = 1  # reset the back-off after a successful message
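In practice the poll loop should be wrapped so the consumer leaves the group cleanly on exit; a minimal sketch, assuming the loop body above and KeyboardInterrupt as the stop signal:
try:
    while True:
        msg = c.poll(1.0)
        if msg is None:
            continue
        # ... error handling and processing as in the loop above ...
except KeyboardInterrupt:
    logging.info('interrupted, shutting down consumer')
finally:
    # leaves the group and revokes assigned partitions cleanly
    c.close()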
Producer
import json
import logging

from confluent_kafka import Producer

p = Producer({
    'bootstrap.servers': 'my-cluster-kafka-brokers.kafka:9092',
    'queue.buffering.max.messages': '1000000',
    'queue.buffering.max.kbytes': '1048576',
    'message.max.bytes': '15728640',
    'delivery.timeout.ms': '10000',
    'request.timeout.ms': '5000'
})
def delivery_report(err, msg):
    """Called once for each produced message to indicate the delivery result.
    Triggered by poll() or flush()."""
    if err is not None:
        logging.error(f'kafka delivery_report error: {err}')
    else:
        logging.debug(f'Message delivered topic: {msg.topic()} partition: {msg.partition()} offset: {msg.offset()}')

payload = {'account_id': 42, 'amount': 100.0}  # example payload (assumed shape)
p.produce('bank_data', json.dumps(payload), callback=delivery_report)
p.flush()
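Note that produce() is asynchronous: it only enqueues the message into a local buffer, and once queue.buffering.max.messages (set above) is hit it raises BufferError. A small sketch of handling that back-pressure, reusing p and delivery_report from above; records is a hypothetical iterable of payload dicts:
for record in records:
    while True:
        try:
            p.produce('bank_data', json.dumps(record), callback=delivery_report)
            break
        except BufferError:
            # local queue is full: serve delivery callbacks to free up space
            p.poll(1.0)
    p.poll(0)  # serve delivery callbacks for earlier messages without blocking
p.flush()  # block until every outstanding message is delivered or fails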
Others
Resources
- https://github.com/confluentinc/confluent-kafka-python
- https://docs.confluent.io/current/clients/confluent-kafka-python
- https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION
- https://towardsdatascience.com/3-libraries-you-should-know-to-master-apache-kafka-in-python-c95fdf8700f2
- Configure Kafka clients for LDAP Authentication in Confluent Platform | Confluent Documentation
- Confluent Cloud Cluster Types | Confluent Documentation