This is about (Java) producers, consumers, and the cluster itself, i.e. topic partitions, replicas, etc.
It's not about e.g. Kafka Connect, Kafka Streams, or third-party client libraries like spring-kafka or reactor-kafka.
(For Kafka 3.x)
Kafka is an event streaming platform, combining three key capabilities:
1. to publish (write) and subscribe to (read) streams of events
2. to store streams of events durably and reliably
3. to process streams of events as they occur or retrospectively
num.partitions=3 – default number of log partitions per topic (default: 1)
default.replication.factor=3 – default replication factor for automatically created topics (Kafka default: 1, MSK default: 3 for 3-AZ clusters, 2 for 2-AZ clusters)
log.retention.hours – time to keep a log file (default: 168 = 7 days)
auto.create.topics.enable – allow automatic topic creation on the broker when subscribing to or assigning a topic (Kafka default: true, MSK default: false)
Source: Kafka config / MSK default config
$ bin/kafka-topics.sh --create --topic my-topic \
--partitions 5 \
--replication-factor 3 \
--config retention.ms=864000000
min.insync.replicas – the minimum number of replicas that must acknowledge a write for it to be considered successful, if used with acks=all (Kafka default: 1, MSK default: 2 for 3-AZ clusters, 1 for 2-AZ clusters)
A typical durable setup: replication factor 3, min.insync.replicas=2, and produce with acks=all; this tolerates the loss of one replica without losing acknowledged writes.
replication.factor – the more replicas, the more fault-tolerant the partition is
min.insync.replicas – helps prevent data loss in case of a leader failure
unclean.leader.election.enable – controls whether the leader can be elected from an out-of-sync replica; if true, availability is preferred over consistency/durability (Kafka default: false, MSK default: true)
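These settings can also be applied per topic at creation time from code; a minimal AdminClient sketch (broker address, topic name, and values are illustrative):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
try (Admin admin = Admin.create(props)) {
    // 5 partitions, replication factor 3, and a per-topic min.insync.replicas override
    NewTopic topic = new NewTopic("my-topic", 5, (short) 3)
        .configs(Map.of("min.insync.replicas", "2"));
    admin.createTopics(List.of(topic)).all().get(); // block until the creation completes
}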
bootstrap.servers – list of host/port pairs for establishing the initial connection to the cluster; used to discover the full set of servers/brokers
metadata.max.age.ms – period after which a metadata refresh is forced, even if there's no leadership change, to proactively discover new brokers or topics/partitions (default: 5 min)
A ProducerRecord consists of a topic, an optional partition, an optional key, a value, an optional timestamp, and optional headers:
If the timestamp is not provided, the producer will stamp the record with its current time.
The value can be null – a tombstone for compacted topics.
key.serializer – serializer class for the key, e.g. ...StringSerializer
value.serializer – serializer class for the value, e.g. ...ByteArraySerializer
Messages with the same key are written to the same partition – ordering is provided.
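A minimal producer sketch tying these together (broker address and topic name are placeholders):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

try (Producer<String, String> producer = new KafkaProducer<>(props)) {
    // same key -> same partition, so ordering per key is preserved
    producer.send(new ProducerRecord<>("my-topic", "user-42", "first"));
    producer.send(new ProducerRecord<>("my-topic", "user-42", "second"));
    // a null value acts as a tombstone on compacted topics
    producer.send(new ProducerRecord<>("my-topic", "user-42", null));
}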
The send() method is asynchronous: it adds the record to a buffer of pending record sends and returns immediately. This allows the producer to batch together individual records for efficiency.
batch.size – batch size in bytes, i.e. the upper bound of the batch size to be sent (default: 16 kB)
linger.ms – time to give for batches to fill, to make batching more likely even under moderate load (default: 0)
buffer.memory – the total amount of memory available for buffering; if records are sent faster than they can be transmitted to the server, this buffer space will be exhausted (default: 32 MB)
max.block.ms – max time to block send() calls when the buffer space is exhausted, after which a TimeoutException is thrown (default: 1 min)
Batches are grouped by broker and sent if either batch.size or linger.ms is reached.
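Since send() returns immediately, a callback (or the returned Future) is the way to observe the eventual outcome; a sketch, assuming a producer and record as above:

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // the send ultimately failed, e.g. after delivery.timeout.ms expired
        exception.printStackTrace();
    } else {
        System.out.printf("stored in %s-%d @ offset %d%n",
            metadata.topic(), metadata.partition(), metadata.offset());
    }
});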
max.in.flight.requests.per.connection – controls the number of requests to be sent without any acknowledgment (default: 5).
With max.in.flight.requests.per.connection > 1 there could be gaps if a request fails.
retries – number of times to resend a failed request (default: Integer.MAX_VALUE)
request.timeout.ms – max time the client will wait for the response (default: 30 sec)
delivery.timeout.ms – max time for send() to complete (default: 2 min)
With max.in.flight.requests.per.connection > 1 and retries > 0, message reordering could occur.
To allow retries and prevent message reordering, set max.in.flight.requests.per.connection = 1.
If the response gets lost, enabled retries could lead to duplicate messages (at-least-once delivery semantics).
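For example, a non-idempotent producer that retries without risking reordering could be configured like this (a sketch; with enable.idempotence=true this restriction is not necessary):

props.put("retries", Integer.MAX_VALUE);               // keep retrying failed requests
props.put("max.in.flight.requests.per.connection", 1); // prevents reordering, costs throughput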
enable.idempotence – prevents message duplication and reordering for a single producer (!), to achieve exactly-once delivery semantics (default: true for Kafka 3.x). It requires max.in.flight.requests.per.connection <= 5 (message ordering is preserved), retries > 0, and acks=all; otherwise a ConfigException is thrown.
acks – number of acknowledgments the producer requires the leader to have received before considering a request complete (default: all for Kafka 3.x)
acks=0 – do not wait for any acknowledgment
acks=1 – the leader writes the record and responds without awaiting acks from followers
acks=all – the leader waits for min.insync.replicas to acknowledge; this guarantees that the record will not be lost as long as at least one in-sync replica remains alive
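Putting the durability-related settings together (the values shown are the Kafka 3.x defaults, spelled out for clarity):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");                // wait for min.insync.replicas acknowledgments
props.put("enable.idempotence", "true"); // no duplicates, no reordering per producer
props.put("retries", Integer.MAX_VALUE); // retry until delivery.timeout.ms expires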
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record);
    }
}
auto.offset.reset – how to behave when no offsets have been committed, or when a committed offset is no longer valid (earliest/latest/none, default: latest)
enable.auto.commit=true means that offsets are committed automatically with auto.commit.interval.ms (default: 5 sec). The auto-commit happens within poll(): make sure all records from the previous poll have been processed before any subsequent calls, or before closing the consumer (otherwise, the committed offset could get ahead of the consumed position).
With enable.auto.commit=false, commit offsets manually via commitSync or commitAsync:
commitSync – the consumer is blocked until the commit request returns successfully, including retries
commitAsync – sends the commit request and returns immediately to process the next message or batch of messages
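A sketch of manual committing, assuming enable.auto.commit=false and the consumer from above:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record);
    }
    // commit only after every polled record has been processed
    consumer.commitSync();
}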
Committed offsets are stored in the internal __consumer_offsets topic.
offsets.retention.minutes – retention period for committed offsets (default: 7 days)
Alternatively, offsets can be stored externally and restored with seek(TopicPartition, long) on restart, as sketched below.
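A sketch of the seek-based approach on restart; loadOffsetFor is a hypothetical helper backed by your own store:

TopicPartition partition = new TopicPartition("my-topic", 0);
consumer.assign(Collections.singletonList(partition));
// resume from the externally stored position (hypothetical helper; lastProcessedMessageOffset + 1)
long nextOffset = loadOffsetFor(partition);
consumer.seek(partition, nextOffset);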
fetch.max.wait.ms – sets a maximum threshold for time-based batching (default: 500)
fetch.min.bytes – sets a minimum threshold for size-based batching (default: 1)
Consumer groups ensure that each topic partition is processed by exactly one consumer of the group.
A consumer group is identified by a group id.
The group coordinator is responsible for managing
a) group members and
b) their partition assignments.
partition.assignment.strategy – a list of class names, ordered by preference, of supported partition assignment strategies to distribute partition ownership (default: [RangeAssignor, CooperativeStickyAssignor])
The default uses RangeAssignor, but allows upgrading to CooperativeStickyAssignor with just a single rolling bounce (removing RangeAssignor from the list); see also KIP-726 to change the default.
CooperativeStickyAssignor should be preferred to get cooperative rebalancing, as sketched below.
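A sketch of the final configuration once all group members support the cooperative protocol:

props.put("partition.assignment.strategy",
          "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");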
session.timeout.ms – max time that can pass without a heartbeat (default: 45 sec, increased from 10 sec in 3.0, see KIP-735)
close() the consumer on shutdown, so the group rebalances immediately instead of waiting for the session timeout; this keeps processing latency low!
heartbeat.interval.ms – time between heartbeats sent to the consumer coordinator; must be < session.timeout.ms, recommended to be < 1/3 of it; also used for discovering that a rebalance is in progress (default: 3 sec)
max.poll.interval.ms – max delay between invocations of poll(); if exceeded, the consumer is considered failed and the group will rebalance (default: 5 min)
max.poll.records – max number of records that poll() will return; because the consumer will only join a rebalance inside poll(), reducing this config reduces the delay of a group rebalance (default: 500)
Properties props = new Properties();
props.put("enable.idempotence", "true");
props.put("transactional.id", "producer-1");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
// ...
producer.beginTransaction();
// ...
producer.commitTransaction();
If a transactional.id is specified, all messages sent by the producer must be part of a transaction.
try {
    producer.beginTransaction();
    producer.send(topic1Record);
    producer.send(topic2Record);
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException e) {
    // We can't recover, so our only option is to close and exit
    producer.close();
} catch (KafkaException e) {
    // For all other exceptions, just abort the transaction and try again
    producer.abortTransaction();
}
All messages sent between beginTransaction() and commitTransaction() will be part of a single transaction.
On the consumer side, isolation.level must be set to read_committed (default: read_uncommitted): poll() will then only return transactional messages which have been committed.
With read_uncommitted, poll() will return all messages, even transactional messages which have been aborted.
KafkaConsumer<String, String> c = new KafkaConsumer<>(consProps);
KafkaProducer<String, String> p = new KafkaProducer<>(prodProps);
p.initTransactions();
while (true) {
    ConsumerRecords<String, String> inputs = c.poll(Duration.ofMillis(100));
    p.beginTransaction();
    process(inputs).forEach(p::send);
    // commit the consumed offsets as part of the same transaction
    p.sendOffsetsToTransaction(offsets(inputs), c.groupMetadata());
    p.commitTransaction();
}
sendOffsetsToTransaction – sends the specified offsets to the consumer group coordinator and marks them as part of the current transaction. The offset committed for each partition should be lastProcessedMessageOffset + 1. The consumer must have enable.auto.commit=false.
Properties props = new Properties();
props.put("enable.idempotence", "true");
props.put("transactional.id", stableUniqueId);
Producer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
// ...
transactional.id – unique id used for transactional delivery; it is used to provide continuity of transactional state across application restarts. I.e. when taking zombie instances (e.g. caused by network partitions) as a given, for exactly-once semantics the transactional.id must be stable across multiple producer sessions.
initTransactions() – ensures that any transactions initiated by previous instances of the producer with the same transactional.id are completed. If the previous instance had failed with a transaction in progress, it will be aborted. If the last transaction had begun completion, but not yet finished, this method awaits its completion.
Usually a more robust approach is to use at-least-once semantics and let consumers deduplicate.
Further reading
A collection of links that could be of interest