This is about producers, consumers and the cluster itself, i.e. topic partitions, replicas, etc.
It's not about e.g. Kafka Connect, Kafka Streams or 3rd party client libraries like
spring-kafka or reactor-kafka.
Presented By - Deepak Sood
Kafka is an event streaming platform, combining three key capabilities:
num.partitions=3 – default number of log partitions per topic (default: 1)
default.replication.factor=3 – default replication factor for automatically created topics (Kafka default: 1, MSK default: 3 for 3-AZ clusters, 2 for 2-AZ clusters)
log.retention.hours – time to keep a log file (default: 168 = 7 days)
auto.create.topics.enable – allow automatic topic creation on the broker when subscribing to or assigning a topic (Kafka default: true, MSK default: false)
Source: Kafka config / MSK default config
$ bin/kafka-topics.sh --create --topic my-topic \
--bootstrap-server localhost:9092 \
--partitions 5 \
--replication-factor 3 \
--config retention.ms=864000000
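Topic-level retention.ms is given in milliseconds, so the 864000000 above corresponds to 10 days. The following quick check uses java.time.Duration. The class name RetentionCheck is only for illustration.

```java
import java.time.Duration;

class RetentionCheck {
    // Convert a whole number of days into the millisecond value expected by retention.ms
    static long retentionMs(long days) {
        return Duration.ofDays(days).toMillis();
    }

    public static void main(String[] args) {
        System.out.println(retentionMs(10));                  // 864000000
        // The broker default log.retention.hours=168, expressed in days
        System.out.println(Duration.ofHours(168).toDays());   // 7
    }
}
```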
min.insync.replicas – the minimum number of replicas that must acknowledge a write for it to be considered successful when the producer uses acks=all (Kafka default: 1, MSK default: 2 for 3-AZ clusters, 1 for 2-AZ clusters)
A common setup: replication factor 3, min.insync.replicas=2, and produce with acks=all
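Why replication factor 3 and min.insync.replicas=2 pair well: with acks=all, the leader accepts a write only while at least min.insync.replicas replicas are in sync. The partition therefore keeps accepting writes with up to replication.factor minus min.insync.replicas replicas down. The sketch below is a minimal model of that rule. IsrModel and its methods are hypothetical names, not Kafka APIs.

```java
class IsrModel {
    // acks=all (or -1): the leader rejects the write if the ISR is smaller than min.insync.replicas.
    // acks=0 / acks=1: min.insync.replicas is not checked, a live leader is enough.
    static boolean acceptsWrite(String acks, int inSyncReplicas, int minInsyncReplicas) {
        if (acks.equals("all") || acks.equals("-1")) {
            return inSyncReplicas >= minInsyncReplicas;
        }
        return inSyncReplicas >= 1;
    }

    // How many replicas may fail while acks=all writes still succeed
    static int tolerableFailures(int replicationFactor, int minInsyncReplicas) {
        return replicationFactor - minInsyncReplicas;
    }

    public static void main(String[] args) {
        System.out.println(tolerableFailures(3, 2));    // 1
        System.out.println(acceptsWrite("all", 2, 2));  // true
        System.out.println(acceptsWrite("all", 1, 2));  // false
    }
}
```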
replication.factor – the more replicas, the more fault-tolerant the partition is
min.insync.replicas – helps prevent data loss in case of leader failure
unclean.leader.election.enable – controls whether the leader can be elected from an out-of-sync replica
– if true, availability is preferred over consistency/durability
(Kafka default: false, MSK default: true)
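The trade-off behind unclean.leader.election.enable can be shown with a simplified election model. The model prefers the first live replica that is in the ISR. Only when no such replica exists, and unclean election is enabled, does it fall back to any live replica, which may be missing acknowledged writes. The model assumes liveReplicas is given in assignment order, and LeaderElection is a hypothetical name.

```java
import java.util.List;
import java.util.Optional;

class LeaderElection {
    static Optional<Integer> electLeader(List<Integer> liveReplicas, List<Integer> isr, boolean uncleanEnabled) {
        // Clean election: first live replica that is still in sync
        for (int r : liveReplicas) {
            if (isr.contains(r)) {
                return Optional.of(r);
            }
        }
        // Unclean election: availability over durability, acknowledged data may be lost
        if (uncleanEnabled && !liveReplicas.isEmpty()) {
            return Optional.of(liveReplicas.get(0));
        }
        // Otherwise the partition stays offline until an ISR member comes back
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Replica 1 (the only in-sync one) is down; replicas 2 and 3 are alive but lagging
        System.out.println(electLeader(List.of(2, 3), List.of(1), false)); // Optional.empty
        System.out.println(electLeader(List.of(2, 3), List.of(1), true));  // Optional[2]
    }
}
```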
bootstrap.servers – list of host/port pairs for establishing the initial connection to the cluster; it is only used to discover the full set of brokers
metadata.max.age.ms – period after which metadata is refreshed even if no partition leadership changes have been seen, to proactively discover new brokers, topics or partitions (default: 5 min)
If no timestamp is provided, the producer stamps the record with its current time.
The value can be null – a tombstone for compacted topics.
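The following minimal in-memory model of log compaction shows what a null value (tombstone) means for a compacted topic. Only the latest value per key survives, and a key whose latest value is null disappears. Real compaction runs in the background on closed segments and keeps tombstones for delete.retention.ms, which this sketch ignores. LogCompaction and Entry are illustrative names only.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class LogCompaction {
    // value == null marks a tombstone
    record Entry(String key, String value) {}

    static Map<String, String> compact(List<Entry> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Entry e : log) {
            if (e.value() == null) {
                latest.remove(e.key());          // tombstone: the key is deleted
            } else {
                latest.put(e.key(), e.value());  // a newer value replaces the older one
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Entry> log = List.of(
                new Entry("a", "1"), new Entry("b", "2"),
                new Entry("a", "3"), new Entry("b", null));
        System.out.println(compact(log)); // {a=3}
    }
}
```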
key.serializer – serializer class for the key, e.g. ...StringSerializer
value.serializer – serializer class for the value, e.g. ...ByteArraySerializer
Messages with the same key are written to the same partition (as long as the number of partitions does not change) – ordering is guaranteed within a partition
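Per-key ordering comes from key-based partitioning. The partition is derived deterministically from the serialized key, so every record with that key lands in the same partition. Kafka's default partitioner hashes the key bytes with murmur2. The sketch below substitutes java.util.Arrays.hashCode to stay short, so its partition numbers will not match a real cluster. KeyPartitioner is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class KeyPartitioner {
    // Stand-in for the murmur2-based choice: hash the key bytes,
    // force the hash non-negative, then take it modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"order-1", "order-2", "order-1"}) {
            System.out.println(key + " -> partition " + partitionFor(key, 5));
        }
    }
}
```

Because the result depends on numPartitions, adding partitions to a topic remaps keys. This is why the ordering guarantee only holds while the partition count stays the same.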
The send() method is asynchronous: it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.
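The return-immediately behaviour of send() can be modelled with a CompletableFuture. The call only enqueues the record, and the future completes later, when the buffered batch is shipped. AsyncBuffer, send and flush here are hypothetical names that only loosely mirror the real producer, whose send() returns a Future&lt;RecordMetadata&gt; and optionally takes a Callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AsyncBuffer {
    private final List<String> pending = new ArrayList<>();
    private final List<CompletableFuture<Long>> futures = new ArrayList<>();
    private final List<String> log = new ArrayList<>(); // stands in for the partition log on the broker

    // Enqueue the record and return at once; nothing has been sent yet
    CompletableFuture<Long> send(String record) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        pending.add(record);
        futures.add(future);
        return future;
    }

    // Ship all pending records as one batch and complete each future with the record's offset
    void flush() {
        for (int i = 0; i < pending.size(); i++) {
            log.add(pending.get(i));
            futures.get(i).complete((long) (log.size() - 1));
        }
        pending.clear();
        futures.clear();
    }

    public static void main(String[] args) {
        AsyncBuffer producer = new AsyncBuffer();
        CompletableFuture<Long> f = producer.send("hello");
        f.thenAccept(offset -> System.out.println("acked at offset " + offset)); // like a send() callback
        System.out.println("send returned, done=" + f.isDone());               // false
        producer.flush();
    }
}
```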
batch.size – upper bound of the batch size in bytes (default: 16 kB)
linger.ms – time to wait for batches to fill, making batching more likely even under moderate load (default: 0)
buffer.memory – total amount of memory available for buffering records; if records are sent faster than they can be transmitted to the server, this buffer space will be exhausted (default: 32 MB)
max.block.ms – max time send() blocks when the buffer space is exhausted, after which a TimeoutException is thrown (default: 1 min)
Batches are grouped by broker and sent when either batch.size or linger.ms is reached.
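The rule "send when either batch.size or linger.ms is reached" can be written as a small predicate. This is a simplified model: a real producer also sends early in other situations, for example when buffer memory is exhausted or flush() is called. BatchPolicy is a hypothetical name, and linger.ms=20 is an assumed example value.

```java
class BatchPolicy {
    final int batchSizeBytes;
    final long lingerMs;

    BatchPolicy(int batchSizeBytes, long lingerMs) {
        this.batchSizeBytes = batchSizeBytes;
        this.lingerMs = lingerMs;
    }

    // A batch is ready when it is full OR has waited at least linger.ms
    boolean ready(int batchBytes, long batchAgeMs) {
        return batchBytes >= batchSizeBytes || batchAgeMs >= lingerMs;
    }

    public static void main(String[] args) {
        BatchPolicy defaults = new BatchPolicy(16_384, 0);   // batch.size=16 kB, linger.ms=0
        BatchPolicy lingering = new BatchPolicy(16_384, 20); // assumed linger.ms=20
        System.out.println(defaults.ready(100, 0));          // true: sent right away
        System.out.println(lingering.ready(100, 5));         // false: still filling
        System.out.println(lingering.ready(16_384, 5));      // true: batch is full
    }
}
```

With the default linger.ms=0 the predicate is always true. Batching then only happens when records arrive while earlier requests are still being sent.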
max.in.flight.requests.per.connection – max number of unacknowledged requests the client sends on a single connection before blocking (default: 5)
With max.in.flight.requests.per.connection > 1 there could be
gaps if a request fails
retries – number of times to resend a failed request (default: Integer.MAX_VALUE)
request.timeout.ms – max time the client waits for the response to a request (default: 30 sec)
delivery.timeout.ms – upper bound on the time to report success or failure after a call to send() returns, including retries (default: 2 min)
With max.in.flight.requests.per.connection > 1 and retries > 0 (and idempotence disabled), message reordering could occur
To allow retries and prevent message reordering, set max.in.flight.requests.per.connection = 1
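Retries plus more than one in-flight request can reorder messages. If batch A fails while batch B, sent right behind it on the same connection, succeeds, the retried A lands after B. The following deterministic toy model of one partition illustrates this; the class name and the failure injection are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class InFlightReordering {
    // Returns the order in which batches end up in the partition log.
    // Every batch in failFirstAttempt fails once and is then retried.
    static List<String> simulate(List<String> batches, Set<String> failFirstAttempt, int maxInFlight) {
        Deque<String> queue = new ArrayDeque<>(batches);
        Set<String> failedOnce = new HashSet<>();
        List<String> log = new ArrayList<>();
        while (!queue.isEmpty()) {
            // Send up to maxInFlight requests before any response arrives
            List<String> inFlight = new ArrayList<>();
            while (inFlight.size() < maxInFlight && !queue.isEmpty()) {
                inFlight.add(queue.poll());
            }
            List<String> retries = new ArrayList<>();
            for (String batch : inFlight) {
                if (failFirstAttempt.contains(batch) && failedOnce.add(batch)) {
                    retries.add(batch);   // first attempt fails, will be resent
                } else {
                    log.add(batch);       // broker appends the batch
                }
            }
            // Retried batches are resent next, but after the ones already written
            for (int i = retries.size() - 1; i >= 0; i--) {
                queue.addFirst(retries.get(i));
            }
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(simulate(List.of("A", "B"), Set.of("A"), 1)); // [A, B]
        System.out.println(simulate(List.of("A", "B"), Set.of("A"), 2)); // [B, A]
    }
}
```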
If the response gets lost, enabled retries could lead to duplicate messages
(at-least-once delivery semantics)
enable.idempotence – prevents message duplication and reordering for a single producer (!), to achieve exactly-once delivery semantics per partition (default: true for Kafka 3.x)
Idempotence requires max.in.flight.requests.per.connection <= 5 (with message ordering preserved), retries > 0 and acks=all; if enable.idempotence=true is set together with conflicting values, a ConfigException is thrown
acks – number of acknowledgments the producer requires the leader to have received before considering a request complete (default: all for Kafka 3.x)
acks=0 – do not wait for any acknowledgment
acks=1 – the leader writes the record and responds without awaiting acks from followers
acks=all – the leader waits for the full set of in-sync replicas to acknowledge, and rejects the write if fewer than min.insync.replicas are in sync.
This guarantees that the record will not be lost as long as at least one in-sync replica remains alive
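On the broker side, the deduplication behind enable.idempotence can be sketched as sequence-number checking. Each producer has a producer id, every batch carries a per-partition sequence number, and the broker only appends a batch whose sequence is exactly one more than the last one it stored. This is a simplified single-partition model with hypothetical names. The real broker only remembers the last few batches per producer, which is related to the limit of 5 in-flight requests.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class IdempotentPartition {
    enum Result { APPENDED, DUPLICATE, OUT_OF_ORDER }

    private final Map<Long, Integer> lastSequence = new HashMap<>(); // producerId -> last appended sequence
    final List<String> log = new ArrayList<>();

    Result append(long producerId, int sequence, String value) {
        int last = lastSequence.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return Result.DUPLICATE;     // a retry of a batch already written: acknowledge, do not append
        }
        if (sequence != last + 1) {
            return Result.OUT_OF_ORDER;  // a gap: reject so the producer resends in order
        }
        log.add(value);
        lastSequence.put(producerId, sequence);
        return Result.APPENDED;
    }

    public static void main(String[] args) {
        IdempotentPartition partition = new IdempotentPartition();
        partition.append(1L, 0, "a");
        partition.append(1L, 1, "b");
        partition.append(1L, 1, "b");         // response was lost, producer retried
        System.out.println(partition.log);    // [a, b]
    }
}
```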
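import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
// Deserializers are required; without them the KafkaConsumer constructor throws a ConfigException
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    // poll(long) is deprecated; use the Duration overload
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record); // application-specific processing
    }
}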
auto.offset.reset – how to behave when no offsets have been committed,
or when a committed offset is no longer valid
(earliest/latest/none, default: latest)
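The auto.offset.reset decision can be sketched as a function of the committed offset and the range of offsets currently in the log. The real client signals the none case with an exception such as NoOffsetForPartitionException. This sketch uses IllegalStateException instead, and OffsetReset is a hypothetical name.

```java
import java.util.OptionalLong;

class OffsetReset {
    // committed: last committed offset (empty if none); logStart..logEnd is what the log currently holds
    static long startingOffset(OptionalLong committed, long logStart, long logEnd, String autoOffsetReset) {
        if (committed.isPresent()) {
            long c = committed.getAsLong();
            if (c >= logStart && c <= logEnd) {
                return c; // valid committed offset: resume there
            }
        }
        // No committed offset, or it is no longer valid (e.g. removed by retention)
        return switch (autoOffsetReset) {
            case "earliest" -> logStart;
            case "latest" -> logEnd;
            case "none" -> throw new IllegalStateException("no valid offset and auto.offset.reset=none");
            default -> throw new IllegalArgumentException("unknown policy: " + autoOffsetReset);
        };
    }

    public static void main(String[] args) {
        System.out.println(startingOffset(OptionalLong.of(42), 0, 100, "latest"));   // 42
        System.out.println(startingOffset(OptionalLong.empty(), 10, 100, "latest")); // 100
        System.out.println(startingOffset(OptionalLong.of(5), 10, 100, "earliest")); // 10
    }
}
```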
enable.auto.commit=true means that offsets are committed automatically every auto.commit.interval.ms (default: 5 sec)
All records returned by poll() should be processed before any subsequent call, or before closing the consumer (otherwise, the committed offset could get ahead of the consumed position)
Alternatively, disable auto commit and call commitSync or commitAsync to commit an offset explicitly
commitSync: the consumer is blocked until the commit request returns successfully, including retries
commitAsync: sends the commit request and returns immediately to process the next message or batch of messages
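One detail matters for both commitSync and commitAsync: the committed offset is the offset of the next record to consume, i.e. the last processed offset plus one. The sketch below computes, per partition, the offsets to commit after a poll batch has been fully processed; committing after processing gives at-least-once semantics. CommitTracker and its string partition keys are hypothetical stand-ins for TopicPartition and OffsetAndMetadata.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CommitTracker {
    record Processed(String partition, long offset) {}

    // For each partition, commit (highest processed offset + 1)
    static Map<String, Long> offsetsToCommit(List<Processed> processed) {
        Map<String, Long> commits = new HashMap<>();
        for (Processed p : processed) {
            commits.merge(p.partition(), p.offset() + 1, Math::max);
        }
        return commits;
    }

    public static void main(String[] args) {
        List<Processed> batch = List.of(
                new Processed("foo-0", 5), new Processed("foo-0", 7), new Processed("bar-1", 0));
        System.out.println(offsetsToCommit(batch).get("foo-0")); // 8
    }
}
```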
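Committed offsets are stored in the internal topic __consumer_offsets
offsets.retention.minutes – retention period for committed offsets (default: 7 days)
To resume from a specific position (e.g. offsets stored outside Kafka), call seek(TopicPartition, long) on restart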
fetch.max.wait.ms – sets a maximum threshold for time-based batching (default: 500 ms)
fetch.min.bytes – sets a minimum threshold for size-based batching (default: 1 byte)
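The two settings form an either/or condition, which the broker evaluates while it holds the fetch request. Below is a simplified model; FetchPolicy is a hypothetical name, and the raised fetch.min.bytes in the usage example is an assumed value that trades latency for larger batches.

```java
class FetchPolicy {
    // Answer the fetch as soon as enough data is available,
    // or when the wait limit expires, whichever comes first
    static boolean respondNow(long availableBytes, long waitedMs, int fetchMinBytes, long fetchMaxWaitMs) {
        return availableBytes >= fetchMinBytes || waitedMs >= fetchMaxWaitMs;
    }

    public static void main(String[] args) {
        System.out.println(respondNow(1, 0, 1, 500));           // true: defaults answer on any data
        System.out.println(respondNow(0, 100, 1, 500));         // false: nothing yet, keep waiting
        System.out.println(respondNow(10_000, 100, 65_536, 500)); // false: below the assumed 64 kB minimum
    }
}
```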
A consumer group ensures that each topic partition is processed by only one consumer of the group.
A consumer group is identified by a group id.
The group coordinator is responsible for managing
a) group members and
b) their partition assignments.
partition.assignment.strategy – a list of class names, ordered by preference, of supported partition assignment strategies to distribute partition ownership (default: [RangeAssignor, CooperativeStickyAssignor])
This default uses RangeAssignor, but allows upgrading to CooperativeStickyAssignor with just a single rolling bounce (removing RangeAssignor from the list)
(see also KIP-726 to change the default)
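Whatever strategy is configured, each partition ends up owned by exactly one member of the group. To illustrate, here is a sketch of how RangeAssignor splits one topic. Members are sorted, each gets a contiguous range of numPartitions / numMembers partitions, and the first numPartitions % numMembers members get one extra. This is a single-topic simplification (the real assignor repeats it per topic), and the member ids are made up.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RangeAssignment {
    // Assumes at least one member
    static Map<String, List<Integer>> assign(List<String> members, int numPartitions) {
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted); // members in lexicographic order of their ids
        int perMember = numPartitions / sorted.size();
        int extra = numPartitions % sorted.size();
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = perMember + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = next; p < next + count; p++) {
                partitions.add(p);
            }
            result.put(sorted.get(i), partitions);
            next += count;
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(List.of("c2", "c1"), 5));       // {c1=[0, 1, 2], c2=[3, 4]}
        System.out.println(assign(List.of("a", "b", "c"), 2));    // {a=[0], b=[1], c=[]}
    }
}
```

The second call shows that consumers beyond the partition count stay idle.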
CooperativeStickyAssignor should be preferred to get
cooperative rebalancing
session.timeout.ms – max time that can pass without a heartbeat before the consumer is considered dead and removed from the group (default: 45 sec, increased from 10 sec in 3.0, see KIP-735)
close() the consumer on shutdown, so that it leaves the group immediately instead of only after session.timeout.ms expires, to keep processing latency low!
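With the real client, the usual shutdown pattern has three parts. Another thread (for example a JVM shutdown hook) calls consumer.wakeup(), the poll loop catches the resulting WakeupException, and close() runs in a finally block. The runnable sketch below reproduces that control flow with a hypothetical FakeConsumer stand-in and an AtomicBoolean instead of wakeup(), so it does not need the Kafka client on the classpath.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class GracefulShutdown {
    // Hypothetical stand-in for KafkaConsumer; in the real client, close() is
    // where the consumer leaves the group (and commits offsets if auto-commit is on)
    static class FakeConsumer implements AutoCloseable {
        boolean closed = false;
        int polls = 0;

        void poll() { polls++; }

        @Override
        public void close() { closed = true; }
    }

    static FakeConsumer runUntilStopped(AtomicBoolean running, int stopAfterPolls) {
        FakeConsumer consumer = new FakeConsumer();
        try (consumer) {
            while (running.get()) {
                consumer.poll();
                if (consumer.polls >= stopAfterPolls) {
                    running.set(false); // in real code another thread (e.g. a shutdown hook) does this
                }
            }
        } // try-with-resources guarantees close(), even if processing throws
        return consumer;
    }

    public static void main(String[] args) {
        FakeConsumer c = runUntilStopped(new AtomicBoolean(true), 3);
        System.out.println("polls=" + c.polls + ", closed=" + c.closed); // polls=3, closed=true
    }
}
```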
heartbeat.interval.ms – time between heartbeats sent to the consumer
coordinator;
must be < session.timeout.ms, recommended to be < 1/3 of it;
also used for discovering that a rebalance is in progress
(default: 3 sec)
max.poll.interval.ms – max delay between invocations of
poll().
If exceeded, the consumer is considered failed and the group will rebalance (default: 5 min)
max.poll.records – max number of records that poll() will
return;
because the consumer will only join a rebalance inside poll(),
reducing this config reduces the delay of a group rebalance (default: 500)
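max.poll.records and max.poll.interval.ms together bound how long each record may take. If a full batch of max.poll.records must be processed before the next poll(), the average per-record budget is max.poll.interval.ms / max.poll.records. With the defaults that is 300000 ms / 500 = 600 ms per record. The hypothetical helper below can check a planned setting.

```java
class PollBudget {
    // Average time each record may take before the next poll() would exceed max.poll.interval.ms
    static long perRecordBudgetMs(long maxPollIntervalMs, int maxPollRecords) {
        return maxPollIntervalMs / maxPollRecords;
    }

    public static void main(String[] args) {
        System.out.println(perRecordBudgetMs(300_000, 500)); // 600 with the defaults
        System.out.println(perRecordBudgetMs(300_000, 50));  // 6000 after lowering max.poll.records
    }
}
```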