This is about (Java) producers, consumers and the cluster itself, i.e. topic partitions, replicas etc.
It's not about e.g. Kafka Connect, Kafka Streams or 3rd party client libraries like spring-kafka or reactor-kafka.
(For Kafka 3.x)
Kafka is an event streaming platform, combining three key capabilities:
to publish (write) and subscribe to (read) streams of events,
to store streams of events durably and reliably,
and to process streams of events as they occur or retrospectively.
num.partitions=3 – default number of log partitions per topic (default: 1)
default.replication.factor=3 – default replication factor for automatically created topics (Kafka default: 1, MSK default: 3 for 3-AZ clusters, 2 for 2-AZ clusters)
log.retention.hours – time to keep a log file (default: 168 = 7d)
auto.create.topics.enable – allow automatic topic creation on the broker when subscribing to or assigning a topic (Kafka default: true, MSK default: false)
source: Kafka config / MSK default config
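For illustration, these broker-side settings could be placed in server.properties like this (values are a sketch, not a recommendation):

# server.properties – illustrative broker defaults
num.partitions=3
default.replication.factor=3
log.retention.hours=168
auto.create.topics.enable=false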
$ bin/kafka-topics.sh --create --topic my-topic \
    --bootstrap-server localhost:9092 \
    --partitions 5 \
    --replication-factor 3 \
    --config retention.ms=864000000
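The same can be done programmatically. A minimal sketch using the Java Admin client (broker address is a placeholder):

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (Admin admin = Admin.create(adminProps)) {
    NewTopic topic = new NewTopic("my-topic", 5, (short) 3)
        .configs(Map.of("retention.ms", "864000000"));
    // createTopics() is asynchronous; all().get() blocks until the topic exists
    admin.createTopics(List.of(topic)).all().get();
}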
min.insync.replicas – the minimum number of replicas that must acknowledge a write for it to be considered successful, when used with acks=all (Kafka default: 1, MSK default: 2 for 3-AZ clusters, 1 for 2-AZ clusters)
A typical durable setup: replication factor 3,
min.insync.replicas=2,
and produce with acks=all
replication.factor – the more replicas, the more fault-tolerant the partition is
min.insync.replicas – helps prevent data loss in case of leader failure
unclean.leader.election.enable – controls whether the leader can be elected from an out-of-sync replica
– if true, availability is preferred over consistency/durability (Kafka default: false, MSK default: true)
bootstrap.servers – list of host/port pairs for establishing the initial connection to the cluster; it is used to discover the full set of servers/brokers
metadata.max.age.ms – period after which a metadata refresh is forced, even if there's no leadership change happening, to discover new brokers or topics/partitions (default: 5 min)
If a timestamp is not provided, the producer will stamp the record with its current time
value can be null – a tombstone for compacted topics
key.serializer – serializer class for the key, e.g. ...StringSerializer
value.serializer – serializer class for the value, e.g. ...ByteArraySerializer
Messages with the same key are written to the same partition – ordering is provided
The send() method is asynchronous: it adds the record to a buffer of pending record sends and returns immediately. This allows the producer to batch together individual records for efficiency.
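A minimal producer sketch (broker address and topic name are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
// send() returns immediately; the callback is invoked once the broker has responded
producer.send(new ProducerRecord<>("my-topic", "key", "value"), (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    }
});
producer.close(); // flushes pending records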
batch.size – batch size in bytes, i.e. the upper bound of the batch size to be sent (default: 16 kB)
linger.ms – time to give for batches to fill, to make batching more likely even under moderate load (default: 0)
buffer.memory – the total amount of memory available for buffering. If records are sent faster than they can be transmitted to the server, this buffer space will be exhausted (default: 32 MB)
max.block.ms – max time to block send() calls when the buffer space is exhausted, after which a TimeoutException is thrown (default: 1 min)
Batches are grouped by broker and sent if either batch.size or linger.ms is reached (see the sketch below).
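Continuing the producer sketch above, batching could be tuned like this (values are illustrative):

props.put("batch.size", 32768);       // 32 kB batches
props.put("linger.ms", 10);           // wait up to 10 ms for a batch to fill
props.put("buffer.memory", 67108864); // 64 MB buffer
props.put("max.block.ms", 30000);     // fail send() after 30 s if the buffer stays full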
max.in.flight.requests.per.connection controls the number of requests to be sent without any acknowledgment (default: 5)
With max.in.flight.requests.per.connection > 1 there could be gaps if a request fails
retries – number of times to resend a failed request (default: Integer.MAX_VALUE)
request.timeout.ms – max time the client will wait for the response (default: 30 sec)
delivery.timeout.ms – max time for send() to complete (default: 2 min)
With max.in.flight.requests.per.connection > 1 and retries > 0, message reordering could occur
To allow retries and prevent message reordering set max.in.flight.requests.per.connection = 1
If the response gets lost, enabled retries could lead to duplicate messages
(at-least-once delivery semantics)
enable.idempotence – prevents message duplication and reordering for a single producer (!),
to achieve exactly-once delivery semantics (default: true for Kafka 3.x)
Requires max.in.flight.requests.per.connection <= 5
(with message ordering preserved), retries > 0 and acks=all;
otherwise a ConfigException is thrown
acks – number of acknowledgments the producer requires the leader to have received before considering a request complete (default: all for Kafka 3.x)
acks=0 – do not wait for any acknowledgment
acks=1 – leader writes the record and responds without awaiting acks from followers
acks=all – leader waits for min.insync.replicas to acknowledge.
This guarantees that the record will not be lost as long as at least one in-sync replica remains alive
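Putting it together, a durability-focused producer configuration might look like this (a sketch; Kafka 3.x already defaults to idempotence and acks=all):

props.put("enable.idempotence", "true"); // default in Kafka 3.x
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);
props.put("max.in.flight.requests.per.connection", 5); // must be <= 5 with idempotence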
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    // poll(long) is deprecated; use the Duration overload
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record); // application-specific processing
    }
}
auto.offset.reset – how to behave when no offsets have been committed,
or when a committed offset is no longer valid (earliest/latest/none, default: latest)
enable.auto.commit=true means that offsets are committed automatically
with auto.commit.interval.ms (default: 5 sec)
Process all records returned by poll() before any subsequent calls, or before closing the consumer (otherwise, the
committed offset could get ahead of the consumed position)
With enable.auto.commit=false you can manually commitSync or commitAsync an offset
commitSync: the consumer is blocked until the commit request returns successfully,
including retries
commitAsync: sends the commit request and returns immediately to process the next message
or batch of messages
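A manual-commit sketch, reusing the consumer from the example above and assuming enable.auto.commit=false:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record);
    }
    // commit the offsets of the records just processed; blocks, including retries
    consumer.commitSync();
}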
Committed offsets are stored in the internal topic __consumer_offsets
offsets.retention.minutes – retention period (default: 7 days)
Alternatively, store offsets externally and seek(TopicPartition, long) on restart (sketched below)
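A sketch of restoring an externally stored offset on restart with a fresh consumer (loadOffsetFromStore is a hypothetical helper, not a Kafka API):

import java.util.Collections;
import org.apache.kafka.common.TopicPartition;

TopicPartition partition = new TopicPartition("my-topic", 0);
consumer.assign(Collections.singletonList(partition));
// loadOffsetFromStore() stands in for your own offset storage
consumer.seek(partition, loadOffsetFromStore(partition));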
fetch.max.wait.ms – sets a maximum threshold for time-based batching (default: 500)
fetch.min.bytes – sets a minimum threshold for size-based batching (default: 1)
Ensure that a single topic partition is processed by one consumer of the group.
A consumer group is identified by a group id.
The group coordinator is responsible for managing
a) group members and
b) their partition assignments.
partition.assignment.strategy – a list of class names, ordered by preference,
of supported partition assignment strategies to distribute partition ownership (default: [RangeAssignor, CooperativeStickyAssignor])
The default uses the RangeAssignor, but allows upgrading to the
CooperativeStickyAssignor with just a single rolling bounce (removing the RangeAssignor from the list)
(see also KIP-726 to change the default)
CooperativeStickyAssignor should be preferred to get cooperative rebalancing
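For example, using the ConsumerConfig constants:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;

props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
          CooperativeStickyAssignor.class.getName());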
session.timeout.ms – max time that can pass without a heartbeat (default: 45 sec,
got increased from 10 sec for 3.0, see KIP-735)
close() the consumer on shutdown, so the group rebalances immediately instead of waiting for session.timeout.ms – this keeps processing latency low!
heartbeat.interval.ms – time between heartbeats sent to the consumer coordinator;
must be < session.timeout.ms, recommended to be < 1/3 of it;
also used for discovering that a rebalance is in progress
(default: 3 sec)
max.poll.interval.ms – max delay between invocations of poll().
If exceeded, the consumer is considered failed and the group will rebalance (default: 5 min)
max.poll.records – max number of records that poll() will return;
because the consumer will only join a rebalance inside poll(),
reducing this config reduces the delay of a group rebalance (default: 500)
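These settings might be tuned together, e.g. (values are illustrative, not recommendations):

props.put("session.timeout.ms", 45000);
props.put("heartbeat.interval.ms", 5000);  // well below 1/3 of session.timeout.ms
props.put("max.poll.interval.ms", 300000); // 5 min
props.put("max.poll.records", 100);        // smaller batches -> faster rebalance join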
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("enable.idempotence", "true");
props.put("transactional.id", "producer-1");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
// ...
producer.beginTransaction();
// ...
producer.commitTransaction();
If a transactional.id is specified, all messages sent by the producer must be part of a transaction
try {
    producer.beginTransaction();
    producer.send(topic1Record);
    producer.send(topic2Record);
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException e) {
    // We can't recover, so our only option is to close and exit
    producer.close();
} catch (KafkaException e) {
    // For all other exceptions, just abort the transaction and try again
    producer.abortTransaction();
}
All messages sent between beginTransaction() and commitTransaction() will be part of a single transaction
isolation.level must be set to read_committed (default: read_uncommitted):
poll() will only return transactional messages which have been committed
With read_uncommitted, poll() will return all messages, even transactional
messages which have been aborted
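For example, in the consumer properties (consProps is the properties object used in the loop below):

consProps.put("isolation.level", "read_committed");
consProps.put("enable.auto.commit", "false"); // offsets are committed via sendOffsetsToTransaction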
KafkaConsumer<String, String> c = new KafkaConsumer<>(consProps);
KafkaProducer<String, String> p = new KafkaProducer<>(prodProps);
c.subscribe(Arrays.asList("input-topic")); // topic name is a placeholder
p.initTransactions();
while (true) {
    ConsumerRecords<String, String> inputs = c.poll(Duration.ofMillis(100));
    p.beginTransaction();
    process(inputs).forEach(p::send);
    // sendOffsetsToTransaction returns void; offsets(inputs) is a helper, sketched below
    p.sendOffsetsToTransaction(
        offsets(inputs), c.groupMetadata()
    );
    p.commitTransaction();
}
sendOffsetsToTransaction – sends a list of specified offsets to the consumer group coordinator,
and also marks those offsets as part of the current transaction
The offset committed for each partition must be lastProcessedMessageOffset + 1
This requires enable.auto.commit=false
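The offsets(inputs) helper used above is not part of the Kafka API; a sketch might look like this:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

static Map<TopicPartition, OffsetAndMetadata> offsets(ConsumerRecords<String, String> records) {
    Map<TopicPartition, OffsetAndMetadata> result = new HashMap<>();
    for (TopicPartition tp : records.partitions()) {
        List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
        long last = partitionRecords.get(partitionRecords.size() - 1).offset();
        // commit the position of the next record to be read: lastProcessedMessageOffset + 1
        result.put(tp, new OffsetAndMetadata(last + 1));
    }
    return result;
}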
Properties props = new Properties();
props.put("enable.idempotence", "true");
props.put("transactional.id", stableUniqueId);
Producer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
// ...
transactional.id – unique id used for transactional delivery; it is used to provide
continuity of transactional state across application restarts. Since zombie instances
(e.g. due to network partitions) must be assumed, for exactly-once semantics the
transactional.id must be stable across multiple producer sessions
initTransactions() – Ensures any transactions initiated by previous instances of the
producer with the same transactional.id are completed. If the previous instance had failed
with a transaction in progress, it will be aborted. If the last transaction had begun completion, but not
yet finished, this method awaits its completion
Usually a more robust approach is to use at-least-once semantics and let consumers deduplicate.
Further reading
A collection of links that could be of interest