This is about producers, consumers and the cluster itself, i.e. topic partitions, replicas etc.
It's not about, e.g., Kafka Connect, Kafka Streams or third-party client libraries like spring-kafka or reactor-kafka.
Presented By - Deepak Sood
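Kafka is an event streaming platform, combining three key capabilities:
a) to publish (write) and subscribe to (read) streams of events
b) to store streams of events durably and reliably
c) to process streams of events as they occur or retrospectively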
num.partitions=3 – default number of log partitions per topic (default: 1)
default.replication.factor=3 – default replication factor for automatically created topics (Kafka default: 1, MSK default: 3 for 3-AZ clusters, 2 for 2-AZ clusters)
log.retention.hours – time to keep a log file before deleting it (default: 168 = 7 days)
auto.create.topics.enable – allow automatic topic creation on the broker when subscribing to or assigning a topic that does not exist yet (Kafka default: true, MSK default: false)
Source: Kafka config / MSK default config
$ bin/kafka-topics.sh --create --topic my-topic \
  --bootstrap-server localhost:9092 \
  --partitions 5 \
  --replication-factor 3 \
  --config retention.ms=864000000
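The retention settings mix units (hours broker-wide, milliseconds per topic), so a quick conversion helps. A minimal Java sketch using only java.time.Duration (the class RetentionMath is hypothetical) shows that log.retention.hours=168 is 7 days and that the retention.ms=864000000 override in the command above is 10 days.

```java
import java.time.Duration;

// Hypothetical helper: converts Kafka retention settings into readable durations.
class RetentionMath {
    // log.retention.hours (broker default) is expressed in hours
    static long hoursToDays(long retentionHours) {
        return Duration.ofHours(retentionHours).toDays();
    }

    // retention.ms (topic-level override) is expressed in milliseconds
    static long millisToDays(long retentionMs) {
        return Duration.ofMillis(retentionMs).toDays();
    }

    // Inverse direction: the value to pass as --config retention.ms=... for N days
    static long daysToMillis(long days) {
        return Duration.ofDays(days).toMillis();
    }

    public static void main(String[] args) {
        System.out.println("log.retention.hours=168 -> " + hoursToDays(168) + " days");          // 7 days
        System.out.println("retention.ms=864000000  -> " + millisToDays(864_000_000L) + " days"); // 10 days
        System.out.println("14 days as retention.ms -> " + daysToMillis(14));                     // 1209600000
    }
}
```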
min.insync.replicas – the minimum number of replicas that must acknowledge a write for it to be considered successful when the producer uses acks=all (Kafka default: 1, MSK default: 2 for 3-AZ clusters, 1 for 2-AZ clusters)
A common durable setup: replication factor 3, min.insync.replicas=2, and produce with acks=all
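Why this combination works can be sketched with a small model (class and method names are hypothetical; timing details such as replica.lag.time.max.ms are ignored). With acks=all, every acknowledged write is on at least min.insync.replicas replicas, and the partition keeps accepting writes as long as that many replicas are in sync, so replication factor 3 with min.insync.replicas=2 tolerates one unavailable replica without rejecting writes.

```java
// Simplified model of the acks=all / min.insync.replicas interplay (hypothetical names).
class DurabilityModel {
    // Replicas that can drop out of the ISR while acks=all writes still succeed
    static int toleratedFailuresForWrites(int replicationFactor, int minInsyncReplicas) {
        if (minInsyncReplicas < 1 || minInsyncReplicas > replicationFactor) {
            throw new IllegalArgumentException("need 1 <= min.insync.replicas <= replication factor");
        }
        return replicationFactor - minInsyncReplicas;
    }

    // The leader accepts an acks=all write only if the ISR (leader included) is large enough;
    // otherwise the producer receives NotEnoughReplicas errors
    static boolean acceptsAcksAllWrite(int inSyncReplicas, int minInsyncReplicas) {
        return inSyncReplicas >= minInsyncReplicas;
    }

    public static void main(String[] args) {
        int rf = 3, minIsr = 2;
        System.out.println("tolerated failures: " + toleratedFailuresForWrites(rf, minIsr)); // 1
        for (int isr = rf; isr >= 1; isr--) {
            System.out.println("ISR=" + isr + " accepts acks=all write: " + acceptsAcksAllWrite(isr, minIsr));
        }
    }
}
```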
replication.factor – the more replicas, the more fault-tolerant the partition is
min.insync.replicas – helps prevent data loss in case of leader failure
unclean.leader.election.enable – controls whether the leader can be elected from an out-of-sync replica; if true, availability is preferred over consistency/durability (Kafka default: false, MSK default: true)
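The effect of unclean.leader.election.enable can be illustrated with a toy model of leader selection for one partition. It is a simplification under stated assumptions (replica ids as integers, first eligible replica wins, names hypothetical), not the controller's actual algorithm: a leader is taken from the in-sync replicas if any are alive; otherwise an out-of-sync replica is chosen only when unclean election is enabled, at the risk of losing records it never received.

```java
import java.util.List;
import java.util.Optional;

// Toy model of leader selection for one partition (not Kafka's controller code).
class LeaderElectionModel {
    // isr: live in-sync replicas; liveReplicas: all live replicas, in sync or not
    static Optional<Integer> electLeader(List<Integer> isr, List<Integer> liveReplicas,
                                         boolean uncleanLeaderElectionEnable) {
        if (!isr.isEmpty()) {
            return Optional.of(isr.get(0));           // clean election: no acknowledged data lost
        }
        if (uncleanLeaderElectionEnable && !liveReplicas.isEmpty()) {
            return Optional.of(liveReplicas.get(0));  // unclean: available again, but may lose records
        }
        return Optional.empty();                      // offline until an in-sync replica returns
    }

    public static void main(String[] args) {
        List<Integer> isr = List.of();       // every in-sync replica is down
        List<Integer> live = List.of(3);     // only an out-of-sync replica survives
        System.out.println(electLeader(isr, live, false)); // Optional.empty
        System.out.println(electLeader(isr, live, true));  // Optional[3]
    }
}
```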
bootstrap.servers – list of host/port pairs used to establish the initial connection to the cluster; it is used to discover the full set of servers/brokers
metadata.max.age.ms – force a metadata refresh after this period even if no partition leadership change has been seen, to proactively discover new brokers or topics/partitions (default: 5 min)
If timestamp is not provided, the producer will stamp the record with its current time
value can be null – a tombstone for compacted topics
key.serializer – serializer class for the key, e.g. ...StringSerializer
value.serializer – serializer class for the value, e.g. ...ByteArraySerializer
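Put together, the connection and serializer settings form a minimal producer configuration. A sketch with plain java.util.Properties, buildable without Kafka on the classpath (host names and the class ProducerConfigSketch are hypothetical). Handing these properties to new KafkaProducer<>(props) requires the kafka-clients library, where the serializer classes live in the package org.apache.kafka.common.serialization.

```java
import java.util.Properties;

// Minimal producer configuration as plain Properties (only java.util is needed to build it).
class ProducerConfigSketch {
    static Properties basicProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        // Initial contact points only; the client discovers the remaining brokers via metadata
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return props;
    }

    public static void main(String[] args) {
        // Two brokers listed so bootstrapping still works if one of them is down (hypothetical hosts)
        Properties props = basicProducerProps("broker1:9092,broker2:9092");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```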
Messages with the same key are written to the same partition – ordering is guaranteed within a partition, and therefore per key
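How a key is mapped to a partition can be sketched as follows, modeled on the murmur2-based hashing that the Java client's default partitioner applies to non-null keys (verify against the client's own Utils.murmur2 before relying on exact partition numbers; the class name is hypothetical). The mapping depends only on the key bytes and the partition count, so adding partitions to a topic changes which partition a key lands on.

```java
import java.nio.charset.StandardCharsets;

// Sketch of key-based partition selection, modeled on the client's murmur2 hashing.
class KeyPartitioner {
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the remaining 1-3 bytes (intentional fall-through)
        switch (length % 4) {
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1: h ^= data[length & ~3] & 0xff;
                    h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Same key bytes -> same hash -> same partition, as long as the partition count is unchanged
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); // what a UTF-8 string serializer produces
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions; // force non-negative, then modulo
    }

    public static void main(String[] args) {
        for (String key : new String[] {"customer-42", "customer-7", "customer-42"}) {
            System.out.println(key + " -> partition " + partitionFor(key, 5));
        }
    }
}
```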
The send() method is asynchronous: it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.
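A toy model of that behavior (plain Java, names hypothetical, not the KafkaProducer API): send() only appends to an in-memory buffer and hands back an incomplete future, and a separate flush step, standing in for the producer's background I/O thread, completes all buffered sends with one request. In the real client, send() returns a Future<RecordMetadata> and optionally accepts a Callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Toy model of an asynchronous, buffered send() (hypothetical, single-threaded for clarity).
class AsyncBufferModel {
    private final List<String> buffer = new ArrayList<>();
    private final List<CompletableFuture<Long>> pending = new ArrayList<>();
    private long nextOffset = 0;

    CompletableFuture<Long> send(String record) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        buffer.add(record);   // no network I/O here, so send() returns right away
        pending.add(result);
        return result;
    }

    // Stand-in for the background sender: one request carries the whole buffered batch
    int flushBatch() {
        int batchSize = buffer.size();
        for (CompletableFuture<Long> f : pending) {
            f.complete(nextOffset++); // pretend the broker assigned consecutive offsets
        }
        buffer.clear();
        pending.clear();
        return batchSize;
    }

    public static void main(String[] args) {
        AsyncBufferModel producer = new AsyncBufferModel();
        CompletableFuture<Long> first = producer.send("a");
        producer.send("b");
        System.out.println("done before flush? " + first.isDone());   // false
        System.out.println("records in batch: " + producer.flushBatch()); // 2
        System.out.println("offset of first: " + first.join());       // 0
    }
}
```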
batch.size – upper bound of the batch size in bytes (default: 16 kB)
linger.ms – time to give batches to fill, to make batching more likely even under moderate load (default: 0)
buffer.memory – total amount of memory available for buffering; if records are sent faster than they can be transmitted to the server, this buffer space will be exhausted (default: 32 MB)
max.block.ms – max time to block send() calls when the buffer space is exhausted, after which a TimeoutException is thrown (default: 1 min)
Batches are grouped by broker and sent when either batch.size or linger.ms is reached.
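The ready condition can be modeled for a single partition. In this simplified sketch (names hypothetical; grouping by broker, buffer.memory pressure and retries are left out) a batch is sent once it reaches batch.size, once a new record would overflow it, or once its first record has waited linger.ms. Time is passed in explicitly so the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified single-partition model of size- and time-based batching.
class BatchingModel {
    private final int batchSize;   // mirrors batch.size (bytes)
    private final long lingerMs;   // mirrors linger.ms
    private int openBatchBytes = 0;
    private long openedAtMs = 0;
    final List<Integer> sentBatches = new ArrayList<>(); // sizes of batches handed to the network

    BatchingModel(int batchSize, long lingerMs) {
        this.batchSize = batchSize;
        this.lingerMs = lingerMs;
    }

    void append(int recordBytes, long nowMs) {
        // A record that does not fit closes the current batch first
        if (openBatchBytes > 0 && openBatchBytes + recordBytes > batchSize) {
            sendOpenBatch();
        }
        if (openBatchBytes == 0) {
            openedAtMs = nowMs;
        }
        openBatchBytes += recordBytes;
        poll(nowMs);
    }

    // Stand-in for the sender thread checking whether the open batch is ready
    void poll(long nowMs) {
        boolean full = openBatchBytes >= batchSize;
        boolean lingered = openBatchBytes > 0 && nowMs - openedAtMs >= lingerMs;
        if (full || lingered) {
            sendOpenBatch();
        }
    }

    private void sendOpenBatch() {
        sentBatches.add(openBatchBytes);
        openBatchBytes = 0;
    }

    public static void main(String[] args) {
        BatchingModel lingering = new BatchingModel(1000, 5);
        lingering.append(400, 0);
        lingering.append(400, 1);
        lingering.append(400, 2); // does not fit: the 800-byte batch is sent
        lingering.poll(7);        // 5 ms after the new batch opened: linger expired
        System.out.println(lingering.sentBatches); // [800, 400]

        BatchingModel noLinger = new BatchingModel(1000, 0);
        noLinger.append(400, 0);
        noLinger.append(400, 1);
        noLinger.append(400, 2);
        System.out.println(noLinger.sentBatches);  // [400, 400, 400]
    }
}
```

With linger.ms=0 every record leaves in its own batch unless records pile up while the sender is busy, which is why a small linger.ms makes batching more likely.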
max.in.flight.requests.per.connection – maximum number of unacknowledged requests the client will send on a single connection before blocking (default: 5)
With max.in.flight.requests.per.connection > 1, there could be gaps if a request fails.
retries – number of times to resend a failed request (default: Integer.MAX_VALUE)
request.timeout.ms – max time the client will wait for the response to a request (default: 30 sec)
delivery.timeout.ms – upper bound on the time to report success or failure after send() returns, including retries (default: 2 min)
With max.in.flight.requests.per.connection > 1 and retries > 0, message reordering could occur.
To allow retries and prevent message reordering, set max.in.flight.requests.per.connection = 1.
If the response gets lost, enabled retries could lead to duplicate messages (at-least-once delivery semantics).
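The reordering risk can be reproduced with a round-based toy model of a producer without idempotence. Assumptions (names hypothetical): up to max.in.flight.requests.per.connection batches are sent before any response is read, a failed batch is put back ahead of not-yet-sent batches, and each listed batch fails exactly once. The lost-response duplicate case is not modeled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model: returns the order in which batches end up appended to the partition log.
class RetryOrderingModel {
    static List<Integer> appendOrder(int batches, Set<Integer> failOnce, int maxInFlight) {
        Deque<Integer> queue = new ArrayDeque<>();
        for (int b = 1; b <= batches; b++) queue.addLast(b);
        Set<Integer> stillFailing = new HashSet<>(failOnce);
        List<Integer> log = new ArrayList<>();
        while (!queue.isEmpty()) {
            // Send up to maxInFlight batches without waiting for responses
            List<Integer> inFlight = new ArrayList<>();
            while (inFlight.size() < maxInFlight && !queue.isEmpty()) {
                inFlight.add(queue.pollFirst());
            }
            List<Integer> toRetry = new ArrayList<>();
            for (int b : inFlight) {
                if (stillFailing.remove(b)) toRetry.add(b); // transient error: nothing appended
                else log.add(b);                          // broker appended this batch
            }
            // Retries go to the front of the queue, but batches that were in flight
            // behind a failed one have already been appended
            for (int i = toRetry.size() - 1; i >= 0; i--) queue.addFirst(toRetry.get(i));
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(appendOrder(3, Set.of(1), 1)); // [1, 2, 3]
        System.out.println(appendOrder(3, Set.of(1), 5)); // [2, 3, 1]
    }
}
```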
enable.idempotence – prevents message duplication and reordering for a single producer (!), to achieve exactly-once delivery semantics (default: true for Kafka 3.x)
Requires max.in.flight.requests.per.connection <= 5 (with message ordering preserved), retries > 0 and acks=all; if idempotence is explicitly enabled together with conflicting values, a ConfigException is thrown
acks – number of acknowledgments the producer requires the leader to have received before considering a request complete (default: all for Kafka 3.x)
acks=0 – do not wait for any acknowledgment
acks=1 – the leader writes the record and responds without awaiting acks from followers
acks=all – the leader waits for the full set of in-sync replicas to acknowledge, and rejects the write if fewer than min.insync.replicas replicas are in sync.
This guarantees that the record will not be lost as long as at least one in-sync replica remains alive.
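The duplicate and reordering protection of enable.idempotence rests on sequence numbers: the producer gets a producer id, numbers its batches per partition, and the partition leader refuses to append a batch twice or out of order. A toy model of that check for one partition (simplified: the real broker also tracks several recent batches and the producer epoch; names hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of broker-side de-duplication for idempotent producers on one partition.
class IdempotentPartitionModel {
    enum Result { APPENDED, DUPLICATE, OUT_OF_ORDER }

    private final Map<Long, Integer> lastSeqByProducer = new HashMap<>();

    Result append(long producerId, int sequence) {
        int last = lastSeqByProducer.getOrDefault(producerId, -1);
        if (sequence <= last) return Result.DUPLICATE;         // retry of a batch already written
        if (sequence != last + 1) return Result.OUT_OF_ORDER;  // gap: an earlier batch is missing
        lastSeqByProducer.put(producerId, sequence);
        return Result.APPENDED;
    }

    public static void main(String[] args) {
        IdempotentPartitionModel leader = new IdempotentPartitionModel();
        System.out.println(leader.append(1000L, 0)); // APPENDED
        System.out.println(leader.append(1000L, 0)); // DUPLICATE (ack was lost, producer retried)
        System.out.println(leader.append(1000L, 2)); // OUT_OF_ORDER (sequence 1 not written yet)
        System.out.println(leader.append(1000L, 1)); // APPENDED
    }
}
```

Because the check is keyed by producer id, it only protects against duplicates from the same producer instance, which is the "(!)" caveat above.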
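import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
// Deserializers are required; without them the constructor throws a ConfigException
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    // poll(long) is deprecated; the Duration overload bounds the total blocking time
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record); // application-specific handling (not defined here)
    }
}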
auto.offset.reset – how to behave when no offsets have been committed, or when a committed offset is no longer valid (earliest/latest/none, default: latest)
enable.auto.commit=true means that offsets are committed automatically with a frequency of auto.commit.interval.ms (default: 5 sec)
With auto-commit, all records returned by poll() must be processed before any subsequent call to poll(), or before closing the consumer (otherwise, the committed offset could get ahead of the consumed position)
Alternatively, set enable.auto.commit=false and commit an offset manually with commitSync or commitAsync:
commitSync: the consumer is blocked until the commit request returns successfully, including retries
commitAsync: send the commit request and return immediately to process the next message or batch of messages
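Why the commit position matters can be shown with a toy model (names hypothetical): one poll() returns offsets 0 to n-1, the consumer crashes part-way through processing, and a restarted consumer resumes from the last committed offset. Committing after processing yields duplicates (at-least-once); committing before processing lets the committed offset get ahead of the consumed position, so records are missed.

```java
import java.util.Arrays;

// Toy model of commit ordering across one crash and one restart.
class CommitOrderingModel {
    // Returns how often each offset 0..n-1 was processed in total
    static int[] processedCounts(int n, int crashAfter, boolean commitBeforeProcessing) {
        int[] counts = new int[n];
        int committed = 0; // position a restarted consumer resumes from

        // First instance: one poll() returned offsets 0..n-1
        if (commitBeforeProcessing) {
            committed = n; // committed position is now ahead of what has been processed
        }
        for (int offset = 0; offset < crashAfter; offset++) {
            counts[offset]++;
        }
        // Crash here: with commit-after-processing, the commit of n is never reached

        // Restarted instance resumes from the last committed offset
        for (int offset = committed; offset < n; offset++) {
            counts[offset]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Commit after processing: offsets 0 and 1 are processed twice (at-least-once)
        System.out.println(Arrays.toString(processedCounts(5, 2, false))); // [2, 2, 1, 1, 1]
        // Commit before processing: offsets 2..4 are never processed (records missed)
        System.out.println(Arrays.toString(processedCounts(5, 2, true)));  // [1, 1, 0, 0, 0]
    }
}
```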
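Committed offsets are stored in the internal topic __consumer_offsets
offsets.retention.minutes – retention period for committed offsets (default: 7 days)
Offsets can also be stored outside Kafka; the consumer then calls seek(TopicPartition, long) on restart to resume from the stored position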
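fetch.max.wait.ms – sets a maximum threshold for time-based batching: the broker answers a fetch request after at most this long (default: 500 ms)
fetch.min.bytes – sets a minimum threshold for size-based batching: the broker waits until at least this much data is available (default: 1 byte)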
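Both thresholds apply together: the broker answers a fetch request as soon as fetch.min.bytes of data are available or fetch.max.wait.ms has passed, whichever comes first. A sketch of that rule, assuming a steady inflow rate (names hypothetical):

```java
// Simplified model of when a broker answers a consumer fetch request.
class FetchWaitModel {
    // Respond once enough data is available or the maximum wait has elapsed
    static boolean shouldRespond(long availableBytes, long waitedMs, long fetchMinBytes, long fetchMaxWaitMs) {
        return availableBytes >= fetchMinBytes || waitedMs >= fetchMaxWaitMs;
    }

    // Delay before the response, assuming data arrives at a steady bytesPerMs rate
    static long responseDelayMs(double bytesPerMs, long fetchMinBytes, long fetchMaxWaitMs) {
        if (bytesPerMs <= 0) {
            return fetchMaxWaitMs; // nothing arrives: the wait limit decides
        }
        long msToFill = (long) Math.ceil(fetchMinBytes / bytesPerMs);
        return Math.min(msToFill, fetchMaxWaitMs);
    }

    public static void main(String[] args) {
        // Defaults (fetch.min.bytes=1): a single available byte is enough to respond
        System.out.println(shouldRespond(1, 0, 1, 500));              // true
        // fetch.min.bytes=100000 with 10000 bytes/ms inflow: filled after 10 ms
        System.out.println(responseDelayMs(10_000.0, 100_000, 500));  // 10
        // fetch.min.bytes=1000000 with 100 bytes/ms inflow: capped by fetch.max.wait.ms
        System.out.println(responseDelayMs(100.0, 1_000_000, 500));   // 500
    }
}
```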
A consumer group ensures that each topic partition is processed by only one consumer of the group.
A consumer group is identified by a group id.
The group coordinator is responsible for managing
a) group members and
b) their partition assignments.
partition.assignment.strategy – a list of class names, ordered by preference, of supported partition assignment strategies used to distribute partition ownership (default: [RangeAssignor, CooperativeStickyAssignor])
The default uses the RangeAssignor, but allows upgrading to the CooperativeStickyAssignor with just a single rolling bounce that removes the RangeAssignor from the list (see also KIP-726 to change the default)
CooperativeStickyAssignor should be preferred to get cooperative rebalancing
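For intuition on what the RangeAssignor does, here is a sketch of range assignment for one topic (class name hypothetical): consumers are sorted, each receives a contiguous range of partitions, and the first numPartitions % numConsumers consumers get one extra partition. The real assignor repeats this per subscribed topic; the CooperativeStickyAssignor instead aims to keep existing assignments stable, so a rebalance only revokes partitions that actually move.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of range assignment for a single topic (simplified).
class RangeAssignmentSketch {
    static Map<String, List<Integer>> assign(List<String> consumerIds, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        if (consumerIds.isEmpty()) {
            return assignment;
        }
        List<String> consumers = new ArrayList<>(consumerIds);
        Collections.sort(consumers); // deterministic order
        int perConsumer = numPartitions / consumers.size();
        int extra = numPartitions % consumers.size(); // the first `extra` consumers get one more
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = perConsumer + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = next; p < next + count; p++) {
                partitions.add(p);
            }
            assignment.put(consumers.get(i), partitions);
            next += count;
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(assign(List.of("consumer-b", "consumer-a"), 5));
        // {consumer-a=[0, 1, 2], consumer-b=[3, 4]}
    }
}
```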
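session.timeout.ms – max time that can pass without a heartbeat before the consumer is considered dead (default: 45 sec, increased from 10 sec in 3.0, see KIP-735)
Always close() the consumer on shutdown, so that it leaves the group right away and its partitions are reassigned immediately instead of only after session.timeout.ms expires. This keeps processing latency low!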
heartbeat.interval.ms – time between heartbeats sent to the consumer coordinator; must be < session.timeout.ms and is recommended to be no more than 1/3 of it; also used for discovering that a rebalance is in progress (default: 3 sec)
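The relationship between the two liveness timeouts can be checked mechanically. A hypothetical helper applying the rules above; with the defaults (3 sec heartbeat, 45 sec session timeout) 15 heartbeat intervals fit into one session timeout.

```java
// Hypothetical helper that applies the heartbeat/session rules described above.
class ConsumerTimeoutCheck {
    // "INVALID": violates heartbeat.interval.ms < session.timeout.ms
    // "RISKY":   valid, but above the recommended 1/3 of session.timeout.ms
    // "OK":      follows the recommendation
    static String check(long heartbeatIntervalMs, long sessionTimeoutMs) {
        if (heartbeatIntervalMs >= sessionTimeoutMs) {
            return "INVALID";
        }
        if (heartbeatIntervalMs * 3 > sessionTimeoutMs) {
            return "RISKY";
        }
        return "OK";
    }

    // How many heartbeat intervals fit into one session timeout
    static long heartbeatsPerSession(long heartbeatIntervalMs, long sessionTimeoutMs) {
        return sessionTimeoutMs / heartbeatIntervalMs;
    }

    public static void main(String[] args) {
        System.out.println(check(3_000, 45_000));                // OK (current defaults)
        System.out.println(heartbeatsPerSession(3_000, 45_000)); // 15
        System.out.println(check(5_000, 10_000));                // RISKY
        System.out.println(check(10_000, 10_000));               // INVALID
    }
}
```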
max.poll.interval.ms – max delay between invocations of poll(). If exceeded, the consumer is considered failed and the group will rebalance (default: 5 min)
max.poll.records – max number of records that a single poll() will return; because the consumer will only join a rebalance inside poll(), reducing this config reduces the delay of a group rebalance (default: 500)
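A worked example of how the two poll settings interact (helper names hypothetical): every record returned by one poll() must be processed before the next poll() is due, so in the worst case max.poll.records times the per-record processing time must stay below max.poll.interval.ms. With the defaults (500 records, 5 min) that leaves 600 ms per record; if a record takes 2 sec, at most 150 records per poll fit.

```java
// Hypothetical helpers for sizing max.poll.records against max.poll.interval.ms.
class PollBudget {
    // Worst-case processing time per record that still fits between two poll() calls
    static long maxMillisPerRecord(long maxPollIntervalMs, int maxPollRecords) {
        return maxPollIntervalMs / maxPollRecords;
    }

    // Largest max.poll.records that fits if each record takes millisPerRecord;
    // 0 means even a single record exceeds max.poll.interval.ms
    static int maxSafePollRecords(long maxPollIntervalMs, long millisPerRecord) {
        if (millisPerRecord <= 0) {
            throw new IllegalArgumentException("millisPerRecord must be positive");
        }
        return (int) (maxPollIntervalMs / millisPerRecord);
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000; // default: 5 min
        System.out.println(maxMillisPerRecord(maxPollIntervalMs, 500));   // 600
        System.out.println(maxSafePollRecords(maxPollIntervalMs, 2_000)); // 150
    }
}
```

In practice a margin below these limits is advisable, since processing time per record is rarely constant.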
A collection of links that could be of interest