Publish/subscribe messaging is a pattern characterized by the sender (publisher) of a piece of data (message) not directing it to a specific receiver. Instead, the publisher classifies the message somehow, and the receiver (subscriber) subscribes to receive certain classes of messages. Pub/sub systems often have a broker, a central point where messages are published, to facilitate this.
Apache Kafka is a publish/subscribe messaging system. It is often described as a “distributed commit log” or more recently as a “distributed streaming platform.” Data within Kafka is stored durably, in order, and can be read deterministically. In addition, the data can be distributed within the system to provide additional protections against failures, as well as significant opportunities for scaling performance.
The unit of data within Kafka is called a message => an array of bytes. Key (optional) => message metadata => byte array. Value => message payload => byte array.
Batches: Messages are written into Kafka in batches. Batching reduces the overhead of individual round trips across the network for each message. Batches are typically compressed => more efficient data transfer.
Broker => Kafka server. Brokers operate as part of a cluster.
A broker receives messages from producers => assigns offsets to them => commits the messages to storage on disk.
Brokers serve consumers => respond to fetch requests for partitions, returning the messages.
broker.id - Every Kafka broker must have an integer identifier - unique within a single Kafka cluster
port - the TCP port the broker listens on - DEFAULT 9092
zookeeper.connect - The location of the Zookeeper ensemble used for storing the broker metadata - a semicolon-separated list of hostname:port/path strings
log.dirs - Log segments are stored in the directories specified in the log.dirs configuration - a comma-separated list of paths. If more than one path is specified, the broker will store partitions on them in a “least-used” fashion, with one partition’s log segments stored within the same path. Note that the broker will place a new partition in the path that has the least number of partitions currently stored in it, not the one using the least amount of disk space.
num.recovery.threads.per.data.dir - the pool of threads used for handling log segments (opening at startup, recovery after a crash, closing at shutdown). The number configured is per log directory specified with log.dirs. By default, only one thread per log directory is used.
auto.create.topics.enable - DEFAULT true
default.replication.factor - the replication factor for automatically created topics. A replication factor of N allows you to lose N-1 brokers while still being able to read and write data to the topic reliably. A higher replication factor leads to higher availability, higher reliability, and fewer disasters.
unclean.leader.election.enable -- default true. If we allow out-of-sync replicas to become leaders, we risk data loss and data inconsistencies. If we set it to false, we choose to wait for the original leader to come back online, resulting in lower availability
min.insync.replicas - the topic-level and broker-level configurations are both called min.insync.replicas. If set to 2, you can only write to a partition in the topic if at least two out of the three replicas are in-sync; otherwise the partition goes into read-only mode for producers. min.insync.replicas only matters if acks=all.
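A minimal sketch of how these replication settings come together when creating a topic with the Java AdminClient; the topic name "payments" and the broker address localhost:9092 are placeholders:

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            // 6 partitions, replication factor 3 => survives the loss of 2 brokers
            NewTopic topic = new NewTopic("payments", 6, (short) 3);
            // With min.insync.replicas=2 and acks=all, writes succeed only while
            // at least 2 of the 3 replicas are in-sync
            topic.configs(Map.of("min.insync.replicas", "2"));
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```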
The controller is one of the Kafka brokers that, in addition to the usual broker functionality, is responsible for electing partition leaders. The first broker that starts in the cluster becomes the controller by creating an ephemeral node in ZooKeeper called /controller.
Kafka uses Zookeeper’s ephemeral node feature to elect a controller and to notify the controller when nodes join and leave the cluster. The controller is responsible for electing leaders among the partitions and replicas whenever it notices nodes join and leave the cluster. The controller uses the epoch number to prevent a “split brain” scenario where two nodes believe each is the current controller.
Controller => one broker also functions as the cluster controller - there is only one controller in a cluster at all times. It handles administrative operations, such as assigning partitions to brokers and monitoring for broker failures.
The leader of the partition => each partition is owned by a single broker in the cluster, called the leader. A partition may be assigned to multiple brokers, which will result in the partition being replicated. Another broker can take over leadership if there is a broker failure. However, all consumers and producers operating on that partition must connect to the leader.
Messages in Kafka are categorized into topics. Topics are additionally broken down into a number of partitions. Because a topic typically has multiple partitions, there is no guarantee of message time-ordering across the entire topic, just within a single partition.
Replicas are spread across available brokers, and each replica lives on a different broker. RF 3 therefore needs at least 3 brokers.
Dynamic topic configurations are maintained in Zookeeper.
Partitions are also the way that Kafka provides redundancy and scalability. Each partition can be hosted on a different server, which means that a single topic can be scaled horizontally across multiple servers to provide performance far beyond the ability of a single server.
log-retention settings operate on log segments
The most common configuration for how long Kafka will retain messages is by time: log.retention.hours (DEFAULT 168 hours, or one week), log.retention.minutes, and log.retention.ms. If more than one is specified, the parameter with the smaller unit size takes precedence, so log.retention.ms wins.
log.retention.bytes - applied per-partition. If both log.retention.bytes and log.retention.ms are set, messages may be removed when either criterion is met.
log.segment.bytes - DEFAULT 1 GB
log.segment.ms - The amount of time after which a log segment should be closed. Not set by default, so segments are closed based on size alone.
message.max.bytes - DEFAULT 1 MB (1000000 bytes) - larger messages impact I/O throughput. Must be coordinated with the fetch.message.max.bytes configuration on consumers and replica.fetch.max.bytes on brokers.
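As a sketch, per-topic overrides of these retention settings can be applied with the AdminClient. Note that the topic-level config names are retention.ms and retention.bytes (without the log. prefix); the topic name and broker address are placeholders:

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class RetentionOverrideSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "payments");
            // Keep messages for 3 days OR until a partition reaches 1 GiB,
            // whichever limit is hit first
            Collection<AlterConfigOp> ops = List.of(
                new AlterConfigOp(new ConfigEntry("retention.ms", "259200000"),
                                  AlterConfigOp.OpType.SET),
                new AlterConfigOp(new ConfigEntry("retention.bytes", "1073741824"),
                                  AlterConfigOp.OpType.SET));
            admin.incrementalAlterConfigs(Map.of(topic, ops)).all().get();
        }
    }
}
```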
Producers create new messages (publishers/writers). A message will be produced to a specific topic.
The producer does not care what partition a specific message is written to and will balance messages over all partitions of a topic evenly. In some cases, the producer will direct messages to specific partitions. This is typically done using the message key and a partitioner that will generate a hash of the key and map it to a specific partition. This assures that all messages produced with a given key will get written to the same partition
Keys are necessary if you require strong ordering or grouping for messages that share the same key. If you require that messages with the same key are always seen in the correct order, attaching a key to messages will ensure messages with the same key always go to the same partition in a topic. Kafka guarantees order within a partition, but not across partitions in a topic, so alternatively not providing a key - which will result in round-robin distribution across partitions - will not maintain such order.
ProducerRecord, which must include the topic we want to send the record to and a value. Optionally, we can also specify a key and/or a partition. Once we send the ProducerRecord, the first thing the producer will do is serialize the key and value objects to ByteArrays so they can be sent over the network. Next, the data is sent to a partitioner. If we specified a partition in the ProducerRecord, the partitioner doesn’t do anything and simply returns the partition we specified. If we didn’t, the partitioner will choose a partition for us, usually based on the ProducerRecord key. Once a partition is selected, the producer knows which topic and partition the record will go to. It then adds the record to a batch of records that will also be sent to the same topic and partition. A separate thread is responsible for sending those batches of records to the appropriate Kafka brokers.
When the broker receives the messages, it sends back a response. If the messages were successfully written to Kafka, it will return a RecordMetadata object with the topic, partition, and the offset of the record within the partition. If the broker failed to write the messages, it will return an error. When the producer receives an error, it may retry sending the message a few more times before giving up and returning an error.
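A sketch of this send-and-respond cycle with the Java client; topic, key, value, and broker address are placeholders:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerSendSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Key "customer-42" hashes to a partition; every message with this
            // key lands on the same partition, preserving per-key order.
            ProducerRecord<String, String> record =
                new ProducerRecord<>("payments", "customer-42", "{\"amount\": 10}");

            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    // Retriable errors were already retried internally;
                    // reaching here means the send ultimately failed.
                    exception.printStackTrace();
                } else {
                    // RecordMetadata carries topic, partition, and offset
                    System.out.printf("wrote to %s-%d @ offset %d%n",
                        metadata.topic(), metadata.partition(), metadata.offset());
                }
            });
        } // close() flushes any buffered batches
    }
}
```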
A producer object can be used by multiple threads to send messages
Errors can occur before the message is even sent to Kafka. KafkaProducer has two types of errors: retriable errors, which can be resolved by sending the message again (for example, a connection error), and nonretriable errors, which will not be resolved by retrying (for example, “message size too large”).
acks controls how many partition replicas must receive the record before the producer can consider the write successful.
buffer.memory: This sets the amount of memory the producer will use to buffer messages waiting to be sent to brokers. When the buffer is full, send() behavior is governed by the block.on.buffer.full parameter (replaced with max.block.ms in release 0.9.0.0, which allows blocking for a certain time and then throwing an exception).
compression.type: By default, messages are sent uncompressed. This parameter can be set to snappy, gzip, or lz4.
retries: The value of the retries parameter will control how many times the producer will retry sending the message before giving up and notifying the client of an issue. By default, the producer will wait 100ms between retries, but you can control this using the retry.backoff.ms parameter.
batch.size (in bytes!) controls how many bytes of data to collect before sending messages to the Kafka broker. Set this as high as possible, without exceeding available memory. Enabling compression can also help make more compact batches and increase the throughput of your producer. (The batching and ordering settings appear together in the sketch after this list.)
linger.ms forces the producer to wait to send messages, hence increasing the chance of creating batches
client.id This can be any string, and will be used by the brokers to identify messages sent from the client.
max.in.flight.requests.per.connection how many messages the producer will send to the server without receiving responses. Setting this to 1 will guarantee that messages will be written to the broker in the order in which they were sent.
request.timeout.ms: how long the producer will wait for a reply from the server when sending data
metadata.fetch.timeout.ms: how long the producer will wait for a reply from the server when requesting metadata
timeout.ms: controls the time the broker will wait for in-sync replicas to acknowledge the message in order to meet the acks configuration—the broker will return an error if the time elapses without the necessary acknowledgments.
max.block.ms: how long the producer will block when calling send() and when explicitly requesting metadata via partitionsFor().
max.request.size: This setting controls the size of a produce request sent by the producer. the broker has its own limit on the size of the largest message it will accept (message.max.bytes). It is usually a good idea to have these configurations match, so the producer will not attempt to send messages of a size that will be rejected by the broker.
receive.buffer.bytes: size of the TCP receive buffer
send.buffer.bytes: size of the TCP send buffer
If these are set to -1, the OS defaults will be used.
Ordering Guarantees: If guaranteeing order is critical, we recommend setting max.in.flight.requests.per.connection=1 to make sure that while a batch of messages is retrying, additional messages will not be sent. This will severely limit the throughput of the producer, so only use this when order is important.
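A sketch pulling the producer settings above into one Properties object; the values are illustrative, not recommendations:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerTuningSketch {
    static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.ACKS_CONFIG, "all");           // all in-sync replicas must ack
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100); // wait 100 ms between retries
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);     // 32 KB batch per partition
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);         // wait up to 10 ms to fill a batch
        // Order-critical producers: one in-flight request per connection
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        return props;
    }
}
```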
Consumers read messages. (subscribers/readers). The consumer subscribes to one or more topics and reads the messages in the order in which they were produced. The consumer keeps track of which messages it has already consumed by keeping track of the offset of messages. Kafka does not track acknowledgments from consumers the way many JMS queues do. Instead, it allows consumers to use Kafka to track their position (offset) in each partition.
commit: Action of updating the current position in the partition. Consumer produces a message to Kafka, to a special __consumer_offsets topic, with the committed offset for each partition.
Consumers do not directly write to the __consumer_offsets topic, they instead interact with a broker that has been elected to manage that topic, which is the Group Coordinator broker
offset: integer -- Each message in a given partition has a unique offset.
Consumer group => consumers that work together to consume a topic. When multiple consumers are subscribed to a topic and belong to the same consumer group, each consumer in the group will receive messages from a different subset of the partitions in the topic. Each partition is only consumed by one member. The mapping of a consumer to a partition is often called ownership of the partition by the consumer. If we add more consumers to a single group with a single topic than we have partitions, some of the consumers will be idle and get no messages at all. Multiple topics can be passed as a list or regex pattern. Kafka transfers data with zero-copy and sends the raw bytes it receives from the producer straight to the consumer, leveraging the RAM available as page cache.
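A minimal consumer-group sketch, assuming a "payments" topic and a "payments-app" group (both placeholders); run several copies and the partitions are split among them:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerLoopSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "payments-app");            // consumer group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // start from the beginning

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Partitions of "payments" are split among all members of the group
            consumer.subscribe(Collections.singletonList("payments"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("%s-%d @%d: %s%n",
                        record.topic(), record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```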
Moving partition ownership from one consumer to another is called a rebalance. A rebalance is basically a short window of unavailability of the entire consumer group. When partitions are moved from one consumer to another, the consumer loses its current state; if it was caching any data, it will need to refresh its caches—slowing down the application until the consumer sets up its state again. The way consumers maintain membership in a consumer group and ownership of the partitions assigned to them is by sending heartbeats to a Kafka broker designated as the group coordinator.
If the committed offset is smaller than the offset of the last message the client processed, the messages between the last processed offset and the committed offset will be processed twice
If the committed offset is larger than the offset of the last message the client actually processed, all messages between the last processed offset and the committed offset will be missed by the consumer group
There are many different ways to implement exactly-once semantics by storing offsets and data in an external store, but all of them will need to use the ConsumerRebalanceListener and seek() to make sure offsets are stored in time and that the consumer starts reading messages from the correct location.
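A sketch of that pattern; OffsetStore is a hypothetical interface standing in for whatever external store holds results and offsets in the same transaction:

```java
import java.util.Collection;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Hypothetical store: saves processed results and offsets atomically.
interface OffsetStore {
    void save(TopicPartition tp, long offset);
    long load(TopicPartition tp);
}

class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
    private final KafkaConsumer<String, String> consumer;
    private final OffsetStore store;

    SaveOffsetsOnRebalance(KafkaConsumer<String, String> consumer, OffsetStore store) {
        this.consumer = consumer;
        this.store = store;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Persist our position before another consumer takes over these partitions
        for (TopicPartition tp : partitions) {
            store.save(tp, consumer.position(tp));
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Resume exactly where the external store says we left off
        for (TopicPartition tp : partitions) {
            consumer.seek(tp, store.load(tp));
        }
    }
}
```

It is registered via consumer.subscribe(topics, new SaveOffsetsOnRebalance(consumer, store)), so it fires around every rebalance.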
Closing the consumer will commit offsets if needed and will send the group coordinator a message that the consumer is leaving the group. The group coordinator will trigger rebalancing immediately, and you won’t need to wait for the session to time out before partitions from the consumer you are closing are assigned to another consumer in the group.
assign() can be used for manual assignment of partitions to a consumer, in which case subscribe() must not be used. assign() takes a collection of TopicPartition objects as an argument.
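For example (a sketch; "payments" is a placeholder topic), asking the cluster for all partitions of a topic and claiming them manually:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class ManualAssignSketch {
    // Claim every partition of the topic for this consumer: no consumer group
    // coordination, no rebalances, no other consumers in the picture.
    static void assignAll(KafkaConsumer<String, String> consumer, String topic) {
        List<TopicPartition> partitions = new ArrayList<>();
        for (PartitionInfo info : consumer.partitionsFor(topic)) {
            partitions.add(new TopicPartition(info.topic(), info.partition()));
        }
        consumer.assign(partitions);
    }
}
```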
At-most-once consuming scenario: which offset commit strategy would you recommend? Commit the offsets right after receiving a batch from a call to poll(), before processing the data.
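A sketch of that at-most-once pattern; process() is a hypothetical handler:

```java
import java.time.Duration;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AtMostOnceSketch {
    static void pollOnce(KafkaConsumer<String, String> consumer) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        // Commit BEFORE processing: a crash mid-processing skips the
        // uncommitted work instead of replaying it => at-most-once.
        consumer.commitSync();
        for (ConsumerRecord<String, String> record : records) {
            process(record);
        }
    }

    static void process(ConsumerRecord<String, String> record) {
        // hypothetical processing step
    }
}
```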
Consumer offsets are stored in a Kafka topic __consumer_offsets
If the consumer has the wrong leader for a partition, it will issue a metadata request. Metadata requests can be handled by any node, so clients know afterward which brokers are the designated leaders for the topic’s partitions. Produce and fetch requests can only be sent to the node hosting the partition leader.
bootstrap.servers: List of host:port pairs of brokers.
key.deserializer: classes that can take a byte array and turn it into a Java object.
value.deserializer: classes that can take a byte array and turn it into a Java object
group.id: the consumer group the KafkaConsumer instance belongs to (optional, but almost always used)
One consumer per thread is the rule.
fetch.min.bytes: minimum amount of data that the consumer wants to receive from the broker when fetching records.
fetch.max.wait.ms: control how long to wait. Default 500ms
max.partition.fetch.bytes : This property controls the maximum number of bytes the server will return per partition. Default 1MB. max.partition.fetch.bytes must be larger than the largest message a broker will accept
session.timeout.ms: The amount of time a consumer can be out of contact with the brokers while still considered alive -- defaults to 3 seconds. heartbeat.interval.ms must be lower than session.timeout.ms
auto.offset.reset: Default is latest; earliest to start consuming from the beginning. For KSQL, SET 'auto.offset.reset'='earliest'; auto.offset.reset=none means the consumer will crash if the offsets it is recovering from have been deleted from Kafka
enable.auto.commit: Default is true. auto.commit.interval.ms controls how frequently offsets will be committed; default 5 seconds.
With enable.auto.commit=true, avoiding duplicates is hard: a call to poll() will always commit the last offset returned by the previous poll, so it is critical to always process all the events returned by poll() before calling poll() again.
With enable.auto.commit=false, use commitSync(). This API commits the latest offset returned by poll() and returns once the offset is committed, throwing an exception if the commit fails for some reason. One drawback of manual commit is that the application is blocked until the broker responds to the commit request. Throughput can be improved by committing less frequently, but then we increase the number of potential duplicates that a rebalance will create. commitAsync() avoids the blocking, but while commitSync() will retry the commit until it either succeeds or encounters a nonretriable failure, commitAsync() will not retry.
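A common pattern following from this trade-off (a sketch; process() and the running flag are placeholders): commitAsync() in the steady state for throughput, commitSync() on shutdown when there is no later commit to paper over a failure:

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CommitPatternSketch {
    static final AtomicBoolean running = new AtomicBoolean(true);

    static void runLoop(KafkaConsumer<String, String> consumer) {
        try {
            while (running.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // hypothetical processing step
                }
                // Non-blocking; an occasional failed commit is superseded by the next one
                consumer.commitAsync();
            }
        } finally {
            try {
                // Final commit before leaving the group: commitSync() retries
                // until it succeeds or hits a nonretriable error
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
    }

    static void process(ConsumerRecord<String, String> record) { /* hypothetical */ }
}
```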
partition.assignment.strategy Range: Assigns to each consumer a consecutive subset of partitions from each topic it subscribes to. RoundRobin: Takes all the partitions from all subscribed topics and assigns them to consumers sequentially, one by one.
client.id: This can be any string, and will be used by the brokers to identify messages sent from the client.
max.poll.records: This controls the maximum number of records that a single call to poll() will return.
receive.buffer.bytes: size of the TCP receive buffer
send.buffer.bytes: size of the TCP send buffer
Producer idempotence helps prevent network-introduced duplicates - enable.idempotence=true
Retention - the durable storage of messages for some period of time. Kafka brokers are configured with a default retention setting for topics, either retaining messages for some period of time or until the topic reaches a certain size in bytes. Once these limits are reached, messages are expired and deleted, so the retention configuration is a minimum amount of data available at any time. Individual topics can also be configured with their own retention settings. Topics can also be configured as log compacted => Kafka will retain only the last message produced with a specific key. This can be useful for changelog-type data, where only the last update is interesting.
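A sketch of creating such a compacted changelog topic, reusing the AdminClient setup from the earlier topic-creation sketch; the names are placeholders:

```java
// Assumes an AdminClient named "admin", as in the earlier sketch.
// Kafka keeps at least the latest value per key => the topic acts as a
// durable snapshot of current state.
NewTopic changelog = new NewTopic("customer-profiles", 1, (short) 3)
        .configs(Map.of("cleanup.policy", "compact"));
admin.createTopics(Collections.singleton(changelog)).all().get();
```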
The replication mechanisms within Kafka clusters are designed only to work within a single cluster, not between multiple clusters.
MirrorMaker => Kafka consumer and producer, linked together with a queue. Messages are consumed from one Kafka cluster and produced for another.
Consumers do not always need to work in real time. Durable retention means that if a consumer falls behind, either due to slow processing or a burst in traffic, there is no danger of losing data.
Start with a single broker => expand to a larger cluster. Expansions can be performed while the cluster is online, with no impact.
High performance: Apache Kafka carries messages between the various members of the infrastructure, providing a consistent interface for all clients.
An ensemble is a set of 2n + 1 ZooKeeper servers where n is any number greater than 0. The odd number of servers allows ZooKeeper to perform majority elections for leadership.
Different Kafka components subscribe to the /brokers/ids path in Zookeeper where brokers are registered so they get notified when brokers are added or removed.
ACLs are stored in Zookeeper node /kafka-acls/ by default.
2181 - client port, 2888 - peer port, 3888 - leader port
Kafka components that are watching the list of brokers will be notified that the broker is gone. Even though the node representing the broker is gone when the broker is stopped, the broker ID still exists in other data structures. For example, the list of replicas of each topic contains the broker IDs for the replicas. This way, if you completely lose a broker and start a brand-new broker with the ID of the old one, it will immediately join the cluster in place of the missing broker, with the same partitions and topics assigned to it.
Replication is critical because it is the way Kafka guarantees availability and durability when individual nodes inevitably fail. Each topic is partitioned, and each partition can have multiple replicas. Those replicas are stored on brokers, and each broker typically stores hundreds or even thousands of replicas belonging to different topics and partitions.
The amount of time a follower can be inactive or behind before it is considered out of sync is controlled by the replica.lag.time.max.ms configuration parameter.
Kafka is configured with auto.leader.rebalance.enable=true, which will check if the preferred leader replica is not the current leader but is in-sync and trigger leader election to make the preferred leader the current leader.
All requests sent to the broker from a specific client will be processed in the order in which they were received
The network threads are responsible for taking requests from client connections, placing them in a request queue, and picking up responses from a response queue and sending them back to clients.
Both produce requests and fetch requests have to be sent to the leader replica of a partition
Kafka clients use another request type called a metadata request, which includes a list of topics the client is interested in. The server response specifies which partitions exist in the topics, the replicas for each partition, and which replica is the leader. Metadata requests can be sent to any broker because all brokers have a metadata cache that contains this information. The refresh interval is controlled by the metadata.max.age.ms configuration parameter.
log.dirs: list of directories in which the partitions will be stored
Kafka brokers ship with the DumpLogSegments tool, which allows you to look at a partition segment in the filesystem and examine its contents.
Kafka maintains an index for each partition. The index maps offsets to segment files and positions within the file. Indexes are also broken into segments, so we can delete old index entries when the messages are purged.
Retention policy on a topic (cleanup.policy): delete or compact.
The compact policy never compacts the current (active) segment. Messages are eligible for compaction only on inactive segments.
Having a message written in multiple replicas is how Kafka provides durability of messages in the event of a crash.
Setting unclean.leader.election.enable to true means we allow out-of-sync replicas to become leaders; we will lose messages when this occurs, effectively losing credit card payments and making our customers very angry.