Kafka Fundamentals

This covers the (Java) producers and consumers and the cluster itself, i.e. topic partitions, replicas etc.
It's not about e.g. Kafka Connect, Kafka Streams or 3rd party client libraries like spring-kafka or reactor-kafka.

(For Kafka 3.x)

(if you find a potential improvement please submit a pull request here)

agenda

  1. High level overview
  2. Kafka cluster
  3. Producers
  4. Consumers
  5. Transactional Messaging

high level overview

what is kafka?

Kafka is an event streaming platform, combining three key capabilities:

  1. To publish (write) and subscribe to (read) streams of events
  2. To store streams of events durably and reliably for as long as you want
  3. To process streams of events as they occur or retrospectively

it's a log: an ordered, persistent sequence of events

log

events are organized in topics

topics

topics are split into partitions

partitions

producers write to topic partitions

producer

consumers read from topic partitions

consumer

that's it?

kafka cluster

replicated logs — leaders and replicas

leaders and replicas

replicated logs — broker (default) config

leaders and replicas
  • num.partitions=3 – default number of log partitions per topic (default: 1)
  • default.replication.factor=3 – default replication factor for automatically created topics (Kafka default: 1, MSK default: 3 for 3-AZ clusters, 2 for 2-AZ clusters)
  • log.retention.hours – time to keep a log file (default: 168 = 7d)
  • auto.create.topics.enable – allow automatic topic creation on the broker when subscribing to or assigning a topic (Kafka default: true, MSK default: false)

source: Kafka config / MSK default config

replicated logs — topic creation/config

leaders and replicas

$ bin/kafka-topics.sh --create --topic my-topic \
--bootstrap-server localhost:9092 \
--partitions 5 \
--replication-factor 3 \
--config retention.ms=864000000

replicated logs — in-sync replicas (ISR)

insync-replicas
  • min.insync.replicas – the minimum number of replicas that must acknowledge a write to be considered successful, if used with "acks=all" (Kafka default: 1, MSK default: 2 for 3-AZ clusters, 1 for 2-AZ clusters)
  • A typical scenario would be replication factor 3, min.insync.replicas=2, and produce with acks=all
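
The arithmetic behind this typical scenario can be sketched as follows. This is a simplified model, not broker code: the class and method names are hypothetical, and it only captures the rule that acks=all writes are accepted while the ISR holds at least min.insync.replicas members.

```java
final class InSyncReplicas {
    /** With acks=all, a write is accepted only while the ISR has at least minInsyncReplicas members. */
    static boolean acceptsAcksAllWrite(int inSyncReplicaCount, int minInsyncReplicas) {
        return inSyncReplicaCount >= minInsyncReplicas;
    }

    /** Number of replicas that may drop out of the ISR before acks=all writes are rejected. */
    static int tolerableReplicaFailures(int replicationFactor, int minInsyncReplicas) {
        return replicationFactor - minInsyncReplicas;
    }
}
```

With replication factor 3 and min.insync.replicas=2, one replica can fail while acks=all writes are still accepted; with only one replica left in sync, writes are rejected but the data stays readable.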

leader election

broker unavailable
  • Is triggered when a leader replica fails, or when a new replica is added to a partition
  • Controlled by the Kafka broker that acts as the (*active) controller for the cluster

leader election - related config

  • replication.factor – the more replicas, the more fault-tolerant the partition is
  • min.insync.replicas – helps prevent data loss in case of leader failure
  • unclean.leader.election.enable – controls whether the leader can be elected from an out-of-sync replica – if true, availability is preferred over consistency/durability (Kafka default: false, MSK default: true)

that's the cluster (?)

the java producer

producer flow

java producer overview

  • connect to broker(s) and cache metadata
  • send records to broker(s) / leaders
    1. serialize key/value
    2. select topic partition to write to
    3. maybe compress data
    4. buffer/accumulate messages
    5. send message batches

connect to broker(s) and cache metadata

  • bootstrap.servers – list of host/port pairs for establishing the initial connection to the cluster. Is used to discover the full set of servers/brokers
  • At least 2 are recommended for fault-tolerance
  • The bootstrap server returns the list of all brokers in the cluster and all metadata like topics, partitions, partition leader, replication factor etc.
  • metadata.max.age.ms – forces a metadata refresh after this period even if no partition leadership change was seen, to proactively discover new brokers, topics or partitions (default: 5 min)

producer record

producer record

  • If timestamp is not provided, the producer will stamp the record with its current time
  • Headers are useful to additionally describe the message or value, e.g. to allow consumers to decide about the deserializer, or information useful for monitoring
  • The value can also be null: a null value acts as a tombstone (delete marker) on compacted topics

serialize key/value

  • key.serializer – Serializer class for key, e.g. ...StringSerializer
  • value.serializer – Serializer class for value, e.g. ...ByteArraySerializer
  • Not covered here: Confluent Schema Registry

select topic partition

select partition

Messages with the same key are written to the same partition (as long as the number of partitions does not change), so ordering per key is preserved
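
A minimal sketch of key-based partition selection, assuming a "hash the key bytes, then take the modulo" scheme. The class name is hypothetical, and Arrays.hashCode is only a stand-in for the murmur2 hash that Kafka's default partitioner applies to the serialized key, so the concrete partition numbers will differ from a real producer.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class KeyPartitioning {
    /** Same key bytes and same partition count always give the same partition. */
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);       // stand-in for Kafka's murmur2 hash
        return (hash & 0x7fffffff) % numPartitions; // clear the sign bit, then take the modulo
    }
}
```

Because the partition count is part of the formula, adding partitions to a topic can move a key to a different partition, which is why per-key ordering only holds while the partition count is stable.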

buffer/accumulate messages

producer buffer

The send() method is asynchronous: it adds the record to a buffer of pending record sends and returns immediately. This allows the producer to batch together individual records for efficiency.

buffer/accumulate messages (config)

producer buffer
  • batch.size – batch size in bytes, i.e. the upper bound of the batch size to be sent (default: 16 kB)
  • linger.ms – time to give for batches to fill, to make batching more likely even under moderate load (default: 0)
  • buffer.memory – the total amount of memory available for buffering. If records are sent faster than they can be transmitted to the server this buffer space will be exhausted (default: 32 MB)
  • max.block.ms – max time to block send() calls when the buffer space is exhausted, after which a TimeoutException is thrown (default: 1 min)

send message batches

producer sender thread

Batches are grouped by broker and sent if either batch.size or linger.ms is reached.
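
The send condition can be sketched as a simple predicate. This is a simplified model with hypothetical names; the real sender also considers other conditions (for example memory pressure or an explicit flush()).

```java
final class BatchReadiness {
    /** A batch is ready to send when it is full or has waited at least lingerMs. */
    static boolean isReady(int batchBytes, int batchSizeBytes, long waitedMs, long lingerMs) {
        boolean full = batchBytes >= batchSizeBytes;
        boolean lingered = waitedMs >= lingerMs;
        return full || lingered;
    }
}
```

With the default linger.ms=0 every batch counts as ready immediately, so batching only happens when records arrive faster than they can be sent.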

send message batches in parallel

producer parallel requests

max.in.flight.requests.per.connection controls the maximum number of unacknowledged requests the client sends on a single connection before blocking (default: 5)

message batch send failures

producer request fails

With max.in.flight.requests.per.connection > 1 there could be gaps if a request fails

handling send failures (retries)

producer request retries
  • retries – number of times to resend a failed request (default: Integer.MAX_VALUE)
  • request.timeout.ms – max time the client will wait for the response (default: 30 sec)
  • delivery.timeout.ms – upper bound on the time to report success or failure after send() returns, including retries (default: 2 min)
  • Note: with max.in.flight.requests.per.connection > 1 and retries > 0, message reordering could occur (unless idempotence is enabled)

handling send failures (retries)

producer request retries with guaranteed ordering

To allow retries and prevent message reordering set max.in.flight.requests.per.connection = 1
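
The effect of the in-flight limit on ordering can be illustrated with a toy simulation. It is a sketch under simplifying assumptions (one partition, rounds of up to maxInFlight batches, selected batches fail exactly once, no idempotence); all names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class InFlightReordering {
    /**
     * Sends batches 0..batchCount-1 to one partition. Batches in failOnce are
     * rejected on their first attempt and retried. Returns the resulting log order.
     */
    static List<Integer> send(int batchCount, int maxInFlight, Set<Integer> failOnce) {
        Deque<Integer> pending = new ArrayDeque<>();
        for (int i = 0; i < batchCount; i++) pending.add(i);
        Set<Integer> alreadyFailed = new HashSet<>();
        List<Integer> log = new ArrayList<>();
        while (!pending.isEmpty()) {
            // Send up to maxInFlight batches without waiting for acknowledgments.
            List<Integer> inFlight = new ArrayList<>();
            while (inFlight.size() < maxInFlight && !pending.isEmpty()) inFlight.add(pending.poll());
            List<Integer> retry = new ArrayList<>();
            for (int batch : inFlight) {
                boolean failsNow = failOnce.contains(batch) && alreadyFailed.add(batch);
                if (failsNow) retry.add(batch); else log.add(batch);
            }
            // Failed batches are retried first in the next round.
            for (int i = retry.size() - 1; i >= 0; i--) pending.addFirst(retry.get(i));
        }
        return log;
    }
}
```

If batch 0 fails once, a limit of 5 lets batches 1 and 2 reach the log before the retry of batch 0, while a limit of 1 keeps the original order at the cost of throughput.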

send failure - duplicates

producer send failure duplicates

If the response gets lost, enabled retries could lead to duplicate messages
(at-least-once delivery semantics)

idempotent producer

idempotent producer
  • enable.idempotence – prevents message duplication and reordering for a single producer (!), to achieve exactly-once delivery semantics (default: true for Kafka 3.x)
  • Requires max.in.flight.requests.per.connection <= 5 (with message ordering preserved), retries > 0 and acks='all'
  • If conflicting configurations are set and idempotence is not explicitly enabled, it is disabled. If explicitly enabled and conflicting configs are set, a ConfigException is thrown
  • Application level retries, e.g. due to an app/producer crash, could still lead to duplicates
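
The idea behind the idempotent producer can be sketched as a partition that tracks the last sequence number per producer. This is a strongly simplified model with hypothetical names (the real broker also tracks a producer epoch and more than one recent batch), but it shows why a retried batch is not appended twice and why a new producer instance after a crash is not protected.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SequenceCheckedPartition {
    private final Map<Long, Integer> lastSequenceByProducer = new HashMap<>();
    private final List<String> log = new ArrayList<>();

    /** Returns true if the record was appended, false if it was recognised as a duplicate retry. */
    boolean append(long producerId, int sequence, String value) {
        int last = lastSequenceByProducer.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return false; // already written: acknowledge again, but do not append twice
        }
        if (sequence != last + 1) {
            throw new IllegalStateException("gap in sequence numbers, expected " + (last + 1));
        }
        log.add(value);
        lastSequenceByProducer.put(producerId, sequence);
        return true;
    }

    List<String> log() {
        return log;
    }
}
```

A restarted application gets a new producer id in this model, so resending the same value after a crash is appended again; that is the case the last bullet above refers to.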

message durability / consistency

producer acks
  • acks – number of acknowledgments the producer requires the leader to have received before considering a request complete (default: all for Kafka 3.x)
  • acks=0 – do not wait for any acknowledgment
  • acks=1 – leader writes the record and responds without awaiting acks from followers
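  • acks=all – leader waits for all in-sync replicas to acknowledge; min.insync.replicas sets the minimum number of in-sync replicas required for the write to be accepted. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive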

that's the producer (?)

the java consumer

java consumer overview

  • connect to broker(s) and cache metadata
  • subscribe to topic(s)
  • join consumer group
  • continuously: poll records from broker(s) and commit offsets
							
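    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "test");
    // Deserializers are required; StringDeserializer matches the <String, String> types
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("foo", "bar"));
    while (true) {
        // poll(long) is deprecated; pass a Duration instead
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            process(record); // application-specific handler
        }
    }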

consumer offsets

consumer offsets
  • committed offset: where to resume from in the event of a restart
  • auto.offset.reset – how to behave when no offsets have been committed, or when a committed offset is no longer valid (earliest/latest/none, default: latest)
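
How the starting position is resolved can be sketched as below. This is a simplified model with hypothetical names: the real client signals the "none" case with its own exception types, for which IllegalStateException is only a stand-in here.

```java
import java.util.OptionalLong;

final class OffsetReset {
    /** Resolves where a consumer starts reading a partition. */
    static long startPosition(OptionalLong committed, long logStartOffset, long logEndOffset,
                              String autoOffsetReset) {
        if (committed.isPresent()
                && committed.getAsLong() >= logStartOffset
                && committed.getAsLong() <= logEndOffset) {
            return committed.getAsLong(); // valid committed offset: resume there
        }
        switch (autoOffsetReset) {
            case "earliest":
                return logStartOffset;    // oldest record still retained
            case "latest":
                return logEndOffset;      // only records written from now on
            default:
                throw new IllegalStateException("no valid offset and auto.offset.reset=" + autoOffsetReset);
        }
    }
}
```

Note that with the default "latest", a consumer whose committed offset has expired skips everything written before it rejoins.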

consumer offset management

  • The default enable.auto.commit=true means that offsets are committed automatically every auto.commit.interval.ms (default: 5 sec)
    ⚠️ For at-least-once delivery you must consume all data returned from poll() before any subsequent calls, or before closing the consumer (otherwise, the committed offset could get ahead of the consumed position)
  • It's also possible to "manually" commitSync or commitAsync an offset
    • commitSync: the consumer is blocked until the commit request returns successfully, including retries
    • commitAsync: send the commit request and return immediately to process the next message or batch of messages

consumer offset storage

consumer committed offsets
  • Kafka stores offsets in a compacted topic called __consumer_offsets
  • offsets.retention.minutes – retention period (default: 7 days)
  • Alternative: when processing to an external system, the consumer's offset can be stored together with the output (atomically), to be used with seek(TopicPartition, long) on restart

consumer offset lags

consumer offset lag
  • With a growing consumer lag, messages could eventually be deleted by log retention before they are consumed, i.e. get lost for this consumer
  • Increasing the log retention time could be a temporary workaround
  • Adding consumers (and/or partitions) could increase throughput
  • Increasing fetched batches could increase throughput (at the cost of latency)
    • fetch.max.wait.ms – sets a maximum threshold for time-based batching (default: 500)
    • fetch.min.bytes – sets a minimum threshold for size-based batching (default: 1)
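
Consumer lag itself is just a difference of offsets per partition. The following sketch uses hypothetical names and treats a partition without a committed offset as committed at 0, which is an assumption made for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

final class ConsumerLag {
    /** Lag per partition = log end offset minus committed offset. */
    static Map<Integer, Long> lagPerPartition(Map<Integer, Long> logEndOffsets,
                                              Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> lag = new HashMap<>();
        for (Map.Entry<Integer, Long> entry : logEndOffsets.entrySet()) {
            // Assumption: nothing committed yet is treated as offset 0
            long committed = committedOffsets.getOrDefault(entry.getKey(), 0L);
            lag.put(entry.getKey(), entry.getValue() - committed);
        }
        return lag;
    }
}
```

A lag that keeps growing over time, rather than its absolute value, is the signal that consumers cannot keep up.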

consumer groups

Consumer groups ensure that each topic partition is processed by exactly one consumer of the group.
A consumer group is identified by a group id.
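
A minimal sketch of how partitions of one topic could be spread over the members of a group, in the style of a range assignment: contiguous blocks, with the first consumers taking one extra partition if the division is uneven. Names are hypothetical and the consumer list is assumed to be already sorted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RangeStyleAssignment {
    /** Assigns partitions 0..numPartitions-1 so that each one has exactly one owner. */
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        int perConsumer = numPartitions / consumers.size();
        int withExtra = numPartitions % consumers.size();
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = perConsumer + (i < withExtra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = 0; p < count; p++) {
                partitions.add(next++);
            }
            assignment.put(consumers.get(i), partitions);
        }
        return assignment;
    }
}
```

With more consumers than partitions, the surplus consumers receive an empty assignment and stay idle, which is why the partition count caps the parallelism of a group.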

2 consumer groups

consumer group rebalancing

[figures: consumer group with 1, 2, 3 and 4 consumers]

The group coordinator is responsible for managing
a) group members and b) their partition assignments.

consumer group rebalancing

[figures: consumer group with 1 and 2 consumers]
  • In the past, rebalancing was "stop the world"; Kafka 2.4 introduced incremental cooperative rebalancing to reduce consumption downtime
  • partition.assignment.strategy – a list of class names, ordered by preference, of supported partition assignment strategies to distribute partition ownership (default: [RangeAssignor, CooperativeStickyAssignor])
  • I.e. the default config (Kafka 3.3) uses RangeAssignor, but allows upgrading to CooperativeStickyAssignor with just a single rolling bounce (removing RangeAssignor from the list) (see also KIP-726 to change the default)
  • For newer clusters CooperativeStickyAssignor should be preferred to get cooperative rebalancing
  • A nice explanation / visualization of different strategies can be found in this blog post

group management: consumer liveness

  • Each member in the group must send heartbeats to the coordinator to indicate its liveness. If no heartbeats are received before the expiration of the configured session timeout, the coordinator will remove this client from the group and initiate a rebalance.
  • session.timeout.ms – max time that can pass without a heartbeat (default: 45 sec, got increased from 10 sec for 3.0, see KIP-735)
    ⚠️ Make sure to close() the consumer on shutdown: it then leaves the group right away, so its partitions are reassigned without waiting for the session timeout, which keeps processing latency low!
  • heartbeat.interval.ms – time between heartbeats sent to the consumer coordinator; must be < session.timeout.ms, recommended to be ≤ 1/3 of it; also used for discovering that a rebalance is in progress (default: 3 sec)
  • max.poll.interval.ms – max delay between invocations of poll(). If exceeded, the consumer is considered failed and the group will rebalance (default: 5 min)
  • max.poll.records – max number of records that poll() will return; because the consumer will only join a rebalance inside poll(), reducing this config reduces the delay of a group rebalance (default: 500)
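
The two liveness conditions can be sketched as one predicate. This is a simplification with hypothetical names: in the real client the heartbeat check happens on the coordinator and the poll interval check inside the consumer, but the combined effect on group membership is the same.

```java
final class ConsumerLiveness {
    /** A member stays in the group only if both heartbeats and poll() calls arrive in time. */
    static boolean isConsideredAlive(long nowMs, long lastHeartbeatMs, long lastPollMs,
                                     long sessionTimeoutMs, long maxPollIntervalMs) {
        boolean heartbeatInTime = nowMs - lastHeartbeatMs <= sessionTimeoutMs;
        boolean pollInTime = nowMs - lastPollMs <= maxPollIntervalMs;
        return heartbeatInTime && pollInTime;
    }
}
```

The second condition catches a consumer whose process is still running and sending heartbeats but whose processing loop is stuck, e.g. because handling one batch of max.poll.records takes longer than max.poll.interval.ms.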

that's the consumer (?)

transactional messaging

transactional messaging "use cases"

  1. Atomic writes across multiple partitions: consumers see all or nothing
  2. Exactly-once processing in "read-process-write" cycles from Kafka topics to Kafka topics, committing offsets atomically
  3. Exactly-once processing & zombie fencing: if a "zombie instance" (unavailable/dangling) is "replaced" with a new one, input topics could be processed by both instances, causing duplicates in output topics

transactional producer

							
    Properties props = new Properties();
    props.put("enable.idempotence", "true");
    props.put("transactional.id", "producer-1");
    Producer<String, String> producer = new KafkaProducer<>(props);
    producer.initTransactions();
    // ...
    producer.beginTransaction();
    // ...
    producer.commitTransaction();

  • When the transactional.id is specified, all messages sent by the producer must be part of a transaction
  • There can be only one open transaction per producer

1) atomic writes across partitions

							
    try {
        producer.beginTransaction();
        producer.send(topic1Record);
        producer.send(topic2Record);
        producer.commitTransaction();
    } catch (ProducerFencedException | OutOfOrderSequenceException e) {
        // We can't recover, so our only option is to close and exit
        producer.close();
    } catch (KafkaException e) {
        // For all other exceptions, just abort the transaction and try again
        producer.abortTransaction();
    }

All messages sent between beginTransaction() and commitTransaction() will be part of a single transaction

1.1) Reading Transactional Messages

  • For atomic writes across partitions to work (from consumer perspective), consumer isolation.level must be set to read_committed (default: read_uncommitted): poll() will only return transactional messages which have been committed
  • If set to read_uncommitted, poll() will return all messages, even transactional messages which have been aborted
  • Non-transactional messages will be returned unconditionally in either mode
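
The visibility rules above can be sketched as a filter over one partition's log. All names are hypothetical and the model is simplified. It also includes one detail the bullets do not spell out: in read_committed mode a consumer does not read past the first still-open transaction.

```java
import java.util.ArrayList;
import java.util.List;

final class IsolationLevelSketch {
    enum TxState { NONE, OPEN, COMMITTED, ABORTED }

    static final class LogRecord {
        final String value;
        final TxState state;

        LogRecord(String value, TxState state) {
            this.value = value;
            this.state = state;
        }
    }

    /** Returns the values a consumer would see for the given isolation level. */
    static List<String> visible(List<LogRecord> partitionLog, boolean readCommitted) {
        List<String> result = new ArrayList<>();
        for (LogRecord record : partitionLog) {
            if (!readCommitted) {
                result.add(record.value);   // read_uncommitted: everything, even aborted
                continue;
            }
            if (record.state == TxState.OPEN) {
                break;                      // stop at the first open transaction
            }
            if (record.state != TxState.ABORTED) {
                result.add(record.value);   // non-transactional or committed
            }
        }
        return result;
    }
}
```

One consequence of this rule is that a long-running transaction delays read_committed consumers for all later records in the partition, including non-transactional ones.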

2) committing offsets in "read-process-write"

							
    KafkaConsumer<String, String> c = new KafkaConsumer<>(consProps);
    KafkaProducer<String, String> p = new KafkaProducer<>(prodProps);
    p.initTransactions();
    while (true) {
        ConsumerRecords<String, String> inputs = c.poll(Duration.ofMillis(100));
        p.beginTransaction();

        // process(...) and offsets(...) are application-specific helpers
        process(inputs).forEach(p::send);

        // offsets(inputs): Map<TopicPartition, OffsetAndMetadata>, last processed offset + 1
        p.sendOffsetsToTransaction(offsets(inputs), c.groupMetadata());
        p.commitTransaction();
    }

  • sendOffsetsToTransaction – sends a list of specified offsets to the consumer group coordinator, and also marks those offsets as part of the current transaction
  • The committed offset should be the next message your application will consume, i.e. lastProcessedMessageOffset + 1
  • Note: the consumer should have set enable.auto.commit=false
  • These offsets will be committed only if the transaction is committed
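
The "last processed offset + 1" rule can be sketched as follows. This is a JDK-only illustration with hypothetical names; a real implementation would build a Map<TopicPartition, OffsetAndMetadata> from the polled ConsumerRecords instead of the plain strings and longs used here.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class OffsetsToCommit {
    /** Minimal stand-in for a consumed record: where it came from and its offset. */
    static final class Consumed {
        final String topicPartition;
        final long offset;

        Consumed(String topicPartition, long offset) {
            this.topicPartition = topicPartition;
            this.offset = offset;
        }
    }

    /** For each partition, the offset to commit is the highest processed offset plus one. */
    static Map<String, Long> nextOffsets(List<Consumed> processed) {
        Map<String, Long> next = new HashMap<>();
        for (Consumed record : processed) {
            next.merge(record.topicPartition, record.offset + 1, Long::max);
        }
        return next;
    }
}
```

Committing the last processed offset itself, without the + 1, would cause that record to be consumed again after a restart.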

3) exactly-once processing & zombie fencing

							
    Properties props = new Properties();
    props.put("enable.idempotence", "true");
    props.put("transactional.id", stableUniqueId);
    Producer<String, String> producer = new KafkaProducer<>(props);
    producer.initTransactions();
    // ...

  • transactional.id – unique id used for transactional delivery; it provides continuity of transactional state across application restarts. Since zombie instances (e.g. caused by network partitions) must be expected, exactly-once semantics require the transactional.id to stay stable across producer sessions
  • initTransactions() – Ensures any transactions initiated by previous instances of the producer with the same transactional.id are completed. If the previous instance had failed with a transaction in progress, it will be aborted. If the last transaction had begun completion, but not yet finished, this method awaits its completion
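
Zombie fencing is commonly explained with an epoch per transactional.id, which the sketch below models. The names are hypothetical and the model is reduced to the bare idea: each initTransactions() call bumps the epoch, and writes carrying an older epoch are rejected (the real producer would see a ProducerFencedException).

```java
import java.util.HashMap;
import java.util.Map;

final class EpochFencing {
    private final Map<String, Integer> currentEpoch = new HashMap<>();

    /** Models initTransactions(): registers the transactional.id and returns its new epoch. */
    int initTransactions(String transactionalId) {
        // First registration starts at epoch 0, every later one increments it
        return currentEpoch.merge(transactionalId, 0, (oldEpoch, ignored) -> oldEpoch + 1);
    }

    /** Only the producer instance holding the latest epoch may write. */
    boolean acceptsWrite(String transactionalId, int producerEpoch) {
        return currentEpoch.getOrDefault(transactionalId, -1) == producerEpoch;
    }
}
```

This only works if the replacement instance uses the same transactional.id as the zombie; with a fresh random id, both instances would hold a valid epoch and could write.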

that's transactional messaging (?)

⚠️ Disclaimer: this was a high-level overview. To use it please learn more and dig deeper to see how it fits your application, workload and operational model!
If not used correctly you might not get what you want...

Usually a more robust approach is to use at-least-once semantics and let consumers deduplicate.

Further reading

the end

p.s. if you found a potential improvement please submit a pull request here

Further Reading

A collection of links that could be of interest