Artificial Unintelligence.

Six Ways Your Kafka Design will Fail.


In our blog series on data engineering and data science, we decided to use Kafka as a distributed data plane for the platform. For a fairly simple system, there is quite a lot to say about Kafka, and plenty of ways for you to embarrass yourself while implementing whatever business-value-adding-doovalacky is important this week. In this post we’ll cover some of the most common, most dangerous, and strangest of these.

NB: I promise this will be the last conceptual post for a while and the next several will be pure implementation with substantially more code. While I personally think this post is a bit dry, I was explaining so many Kafka concepts in the following few that I decided to consolidate them all in a single post. I have tried to add some levity by swearing a lot; hopefully that helps.

Kafka Basics

Kafka is a key-value setup, where keys and values are understood only as arbitrary bytes. There are a few concepts to understand in order to use it effectively, including brokers, topics, partitions, replications and consumer groups. Kafka itself does relatively little: it doesn’t understand any data type other than raw bytes, makes no decisions about how to assign data across a cluster, and doesn’t decide how to feed that data back to consumers. Typically, you’ll be told that keys in Kafka are for either (a) partitioning/distributing the data on the cluster; or (b) something to do with indexing, like a database table index. Both statements are bullshit, and we’ll explore why below.
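
To make that concrete, here’s a minimal sketch using the confluent-kafka Python client (the broker address, topic and group names are placeholders I’ve made up, not anything from a real deployment). Note that both key and value go in and come out as raw bytes:

```python
from confluent_kafka import Producer, Consumer

# Kafka only ever sees bytes; any serialisation (Avro, JSON, ...) is our problem.
producer = Producer({"bootstrap.servers": "localhost:9092"})
producer.produce("example-topic", key=b"some-key", value=b"some-value")
producer.flush()

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "example-group",      # consumer groups are covered below
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["example-topic"])

msg = consumer.poll(timeout=10.0)
if msg is not None and msg.error() is None:
    print(msg.key(), msg.value())     # the raw bytes come straight back out
consumer.close()
```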

facepalm.
I fucked up, so you don't have to.

One of the reasons there might be so many misconceptions about the system is that there are a variety of other tools in the ecosystem which are often referred to in the same breath as Kafka (e.g. the Kafka Streams library for Scala and Java, or the Confluent Avro serialisers which interface with the schema registry - another component commonly deployed with Kafka). All of these tools use Kafka’s primitives (partitions, topics, consumer groups) in different ways.

The schema registry in particular deserves a quick explanation, as it stores the Avro schemata which are often used to serialise messages on Kafka. The schemata are required for both serialisation and deserialisation, and the schema registry is an external system which can provide them. The Confluent Avro serialiser makes use of it to automatically retrieve and register schemata in Java and Scala; similar clients exist for Python too.
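
As a rough sketch (assuming the confluent-kafka Python client and its schema registry module; the registry URL, topic and schema here are invented for illustration), wiring the serialiser to the registry looks something like this:

```python
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

schema_str = """
{"type": "record", "name": "Tweet",
 "fields": [{"name": "user_id", "type": "string"},
            {"name": "text",    "type": "string"}]}
"""

registry = SchemaRegistryClient({"url": "http://localhost:8081"})
# The serialiser talks to the registry for us when encoding each message.
serializer = AvroSerializer(schema_registry_client=registry, schema_str=schema_str)

producer = Producer({"bootstrap.servers": "localhost:9092"})
value = serializer({"user_id": "123", "text": "hello"},
                   SerializationContext("tweets", MessageField.VALUE))
producer.produce("tweets", key=b"123", value=value)
producer.flush()
```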

Consumers and consumer groups

Consumers (of which there may be many for a given topic) subscribe to a partition (or several) within a topic to receive data as it arrives at the brokers. Once data is received, the consumer “commits” an offset to track where it is up to in the stream. In case of failure, this offset can be used to resume processing from where it left off.

Offsets are committed!

Great, but committed where, exactly? Another fact that is often missed about Kafka is that the consumers actually make that decision. While traditionally Zookeeper was often used for this purpose, we might equally decide to use some other database (like etcd). As of Kafka 0.9 or thereabouts, offsets are stored by default in an internal Kafka topic (__consumer_offsets), but that isn’t always optimal.

For a stateless consumer, there’s no reason to track offsets at all. For high-performance consumers, or where there are a large number of consumers (e.g. Kafka is backing data delivered via a RESTful web API to a browser app), you might want to make the consumers responsible for tracking their own offsets locally.
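
A sketch of a consumer that owns its offsets entirely (the topic name, partition number and the place the offset is kept are all hypothetical - in practice it might live in etcd, a local file, or browser state):

```python
from confluent_kafka import Consumer, TopicPartition

# Hypothetical starting point: in practice, load this from wherever you store it.
last_offset = 0

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "self-managed-reader",
    "enable.auto.commit": False,    # we never commit anything back to Kafka
})

# assign() rather than subscribe(): we choose the partition and starting offset ourselves.
consumer.assign([TopicPartition("example-topic", 0, last_offset)])

while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None or msg.error():
        continue
    print(msg.value())              # stand-in for real processing
    last_offset = msg.offset() + 1  # track progress ourselves, not in Kafka
```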

Commit behaviour is often overlooked, but it can be a powerful tool. For example, a microservice might do some processing on each record and send the result to an external REST API. It could be configured to commit an offset only once a 200 OK response has been received. In the event of failure in the external API, no offsets are committed and the data remains “unread” as far as Kafka is concerned. Even if the consumer crashes at the same time, we can ensure reliable delivery to the third party and data will not be lost.
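
A sketch of that pattern with the confluent-kafka Python client and requests (the API endpoint and topic name are placeholders):

```python
import requests
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "rest-forwarder",
    "enable.auto.commit": False,   # we decide when an offset counts as committed
})
consumer.subscribe(["example-topic"])

while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None or msg.error():
        continue
    resp = requests.post("https://api.example.com/records", data=msg.value())
    if resp.status_code == 200:
        # Only now does Kafka consider this record "read" by our group.
        consumer.commit(message=msg, asynchronous=False)
    # On any failure we don't commit, so the record is re-delivered after a restart.
```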

Consumers form consumer groups, and a given consumer group receives each message only once. Not every consumer will receive every message on a topic, but each consumer will receive every message on the partitions assigned to it. You can see how this naturally leads to thinking of a service or application as a consumer group, and the consumers as instances of the application.

Partitions

Partitions are the primary way Kafka parallelises computations. A partition is just a segment of data which has been allocated to a broker. The broker handles all consumer and producer related activity for that particular segment/shard/partition of data.

Because consumers, consumer groups, the application logic relying on the consumer, and the number of instances of the application are all tightly linked, it is highly desirable to avoid changing the number of partitions in production. All partitions are always spread across however many consumers you run, so it makes sense to overprovision partitions for future data growth. You can’t usefully run more application instances than you have partitions; any extras will simply sit idle.

For that reason, I usually suggest a minimum of 9 partitions, or 2–3 times the number of brokers for larger clusters. A cluster is rarely smaller than 3 brokers, hence the lower limit of 9. A couple of partitions per broker does no real harm, and it caters for the possibility of growth in the volumes of data (and if your data volume isn’t growing, best start looking for a new job).

One further benefit of overprovisioning your partitions is that it helps with cluster rebalancing. Anyone who’s ever looked at discrete optimisation for any packing problem (or just taken an overseas flight on a budget airline) can attest that the most efficient optimisation is simply to make the components to be packed as small as possible. If we have 9 small partitions instead of 3 big ones, it becomes much easier to fit them onto brokers that might otherwise not have capacity for them.

Historically fewer partitions were desirable for performance reasons, but more recent versions of Kafka suffer few performance drawbacks from increasing the number.

Finally, it bears mentioning that different Kafka designs can make use of different partitioning arrangements. One common misconception (explored below) is that partitioning always happens the same way. Actually, it is the consumers and producers which decide how partitioning occurs. Kafka can map from partitions to brokers (and therefore retrieve the data in a particular partition), but it does not keep track of which data is in which partition.
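
For example (a sketch only - the hash function, partition count, topic and data are all made up), a producer can route records to an explicit partition however it likes, and Kafka will happily store them wherever it is told:

```python
import zlib
from confluent_kafka import Producer

NUM_PARTITIONS = 9   # must match the partition count the topic was created with

def partition_for(user_id: str) -> int:
    # Any deterministic function of the key works; the consumers just need to know about it.
    return zlib.crc32(user_id.encode("utf-8")) % NUM_PARTITIONS

producer = Producer({"bootstrap.servers": "localhost:9092"})

user_id = "user-123"
producer.produce(
    "tweets",
    key=user_id.encode("utf-8"),
    value=b'{"text": "hello"}',
    partition=partition_for(user_id),   # the routing decision is entirely ours
)
producer.flush()
```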

Topics

Topics are the most visible entity in Kafka, so of course, everyone thinks they understand them. They are usually wrong. Many of the misconceptions below are about topics, and those who repeat them are usually highly confident in their pronouncements.

Order!
Keep it orderly.

While there are analogies to be made between topics and database tables, and while we do often see that a topic will be linked to an Avro schema (and indeed, it might be only one schema, not several), none of these facts actually captures what a topic is for. In a nutshell, topics (at least in a well-designed Kafka system) actually encapsulate an ordering of their messages.

Kafka is a streaming system, so delivery of messages in-order is important. Kafka guarantees that a given consumer will receive all messages on a given partition in the order they were written. There are two catches here -

  1. Messages on different topics enjoy no such guarantee.
  2. Messages on different partitions of the same topic receive no such guarantee either.

As a result, if you want to preserve the ordering of a particular data stream within your system (for example, you’re implementing an event sourcing system), that stream needs to be mapped (through the partitioning strategy mentioned above) to a single partition. We give some examples of how this plays out later in this post, as out-of-order delivery is one consequence of Kafka-fuck-ups numbers five and six, below. They are particularly interesting because solutions are not widely available for all language APIs, and the documentation surrounding them is economical in its insights (if not outright nonsense and probably written by someone in marketing).

So why do we still talk about topics providing the ordering? Because a topic holds both the partitions and the partitioning strategy, so provided the partitioning strategy is rational (and people tend not to think about these very much, since they are often provided by Confluent and the like), we can assume that we get the ordering guarantees we’re looking for.

Replications?

Replications differ from partitions: while partitions split data across the cluster to shard it out amongst the Kafka instances, replications copy the entire topic (with the same partitioning scheme) to other brokers. This is for durability and availability - so that broker failure cannot cause data loss - but one strange implication is that sometimes brokers will hold the entire dataset, which seems at odds with the idea of using a distributed system. For example, if we have 3 brokers, 3 partitions and 3 replications, every replication has to be on a different broker, which means that each broker will hold a copy of every partition. While this seems to defeat the point of using a distributed system, it is actually no different to the way striped RAID used to work, and once we add more brokers (if we decide we need to) we will find that each partition of each replication (9 partition copies in all) can be placed anywhere within the cluster.
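
For reference, here’s a sketch of creating such a topic (3 partitions, 3 replicas) with the Python admin client; the broker address and topic name are placeholders:

```python
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "localhost:9092"})

futures = admin.create_topics(
    [NewTopic("tweets", num_partitions=3, replication_factor=3)]
)
for topic, future in futures.items():
    future.result()   # raises if creation failed (e.g. fewer than 3 brokers available)
    print(f"created {topic}")
```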

Top misconceptions about Kafka

Ah, the part you were waiting for… What are some of the ways all of this can go wrong? Things normally start to go wrong when someone misunderstands how the system works and what’s implied by that. There are plenty of ways for this to happen even in a relatively simple system like Kafka.

1. Partitioning happens according to the key, therefore…

Actually, partitioning happens according to whatever partitioning strategy our producers are using. You’d also better hope the consumers know about this, or they won’t know how to process the data they receive. I had this debate with some colleagues at one stage, when we discussed whether using differently named Avro schemata for a key would land messages on different partitions (because the hash of the key would differ due to the differing magic numbers in the two schemata).

From recollection, some further research turned up the fact that the Confluent Avro Serialiser actually does some work to hash the logical (not serialised) value of the Avro message, in order to correctly allocate it to a partition irrespective of the magic number, schema name, or other considerations.
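
If you want to check your own assumptions about where keyed messages land, a delivery callback on the producer will report the assigned partition. A sketch (the topic and keys are invented):

```python
from confluent_kafka import Producer

def on_delivery(err, msg):
    if err is not None:
        print(f"delivery failed: {err}")
    else:
        print(f"key={msg.key()} -> partition {msg.partition()}")

producer = Producer({"bootstrap.servers": "localhost:9092"})
for key in [b"alice", b"bob", b"carol"]:
    producer.produce("example-topic", key=key, value=b"payload",
                     on_delivery=on_delivery)
producer.flush()   # delivery callbacks fire during flush()/poll()
```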

2. All of our topics just have three partitions, there’s no point running more than three brokers…

The mistake seems obvious when you’re looking for it, but I’ve heard this more than once. Naturally, if you have more than one topic, you can distribute partitions for different topics to brokers across the cluster. Not every broker needs to hold the same set of topics.

3. A topic is just like a table

Absolutely not. Topics support multiple data types (see below), and they do not support mutation (there’s no update, only inserts and subsequent deletes).

Topics should be thought of as a “slice of state” (to borrow from the React.JS community’s writings on flux libraries). That slice can contain a duplicate of data held elsewhere; that’s fine. Normalisation is not important; what is important is the ordering of the events relative to one another within the topic.

Partitioning and in-order delivery - an example

If you were streaming tweets, and had included a particular type of message indicating that a tweet was deleted, it would be important to ensure that the original tweet and the record indicating that it was deleted resided within the same partition.

Naturally, this causes a good deal of complexity. Imagine a system with users, tweets and tweet deletion functionality. If each were stored in a database table, and we did change data capture (CDC) to track changes to the underlying rows, a naive approach might:

  1. Push these into separate Kafka topics with their own defined data structures.
  2. Key the events according to the database keys.

Both would be wrong. There should be a single topic and a key that makes sense across all three entities (probably a user ID, given it is the highest-level entity). In reality, to retain consistency we would need all data related to a given user to eventually end up on a single partition. If users were on a different partition to tweets, it would be impossible to ascertain whether user data had changed before or after a tweet; and if tweets and deletions were on separate partitions, it would be impossible for a given recipient of a tweet to determine whether that tweet had been deleted.
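
As a sketch of what this looks like in practice (topic name, event shapes and IDs are invented, and JSON stands in for Avro to keep it short), both event types share one topic and are keyed by the user ID so they land on the same partition:

```python
import json
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})
TOPIC = "user-events"

def emit(user_id: str, event: dict) -> None:
    producer.produce(TOPIC,
                     key=user_id.encode("utf-8"),   # same user, same partition
                     value=json.dumps(event).encode("utf-8"))

emit("user-123", {"type": "TweetCreated", "tweet_id": "t-1", "text": "hello"})
emit("user-123", {"type": "TweetDeleted", "tweet_id": "t-1"})
producer.flush()
```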

Event sourcing is a set of architectural patterns which can help resolve these issues. Under an ES approach, we’d ensure that any changes from the tweet and deletion tables were joined to some user key field prior to being sent to Kafka. The user is the actor entity in this situation, and only a single user can create a tweet. Things become even more complex with multiple primary entities. If we were talking about DMs between users, both need to be considered primary, and ES might advocate that we replicate the data under two keys, one belonging to each user. In this case, we’d say that only a single user can create or receive a DM, and would probably create SentMessage and ReceivedMessage event types which would hold mirror images of the same data. The important thing in both cases is that there is a primary actor entity which can be referred to with a single key, so that its data can be routed to the correct partition.
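
A sketch of the DM case under the same assumptions (names and payloads invented): the same message is written twice, once under each user’s key.

```python
import json
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})

def send_dm(sender_id: str, recipient_id: str, text: str) -> None:
    sent = {"type": "SentMessage", "to": recipient_id, "text": text}
    received = {"type": "ReceivedMessage", "from": sender_id, "text": text}
    # Each copy is keyed by its own primary actor, so each user's partition
    # holds a complete, ordered picture of their conversation.
    producer.produce("user-events", key=sender_id.encode("utf-8"),
                     value=json.dumps(sent).encode("utf-8"))
    producer.produce("user-events", key=recipient_id.encode("utf-8"),
                     value=json.dumps(received).encode("utf-8"))

send_dm("user-123", "user-456", "hey!")
producer.flush()
```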

As these examples demonstrate, it is normal to see data replicated and stored in a very denormalised format within event based systems.

4. Oh, but when it is log compacted, THEN a topic is just like a table, right?

No, still no. Tables are still tables and topics are still topics. One is for normalising subsets of your data to avoid duplication (if you’re using an RDBMS; if you aren’t, you don’t have tables) and one is for ensuring an ordering over subsets of that data. They are not the same, there has been no prestidigitation performed, no rabbits have been pulled out of hats.

For all the reasons above, this is again incorrect. When you update a key in a log-compacted topic, you are doing an INSERT followed by a DELETE of the older record some unspecified length of time later. You are not, under any circumstances, doing an UPDATE.

5. A topic should contain a single type of data

I’m guilty of this one, and it wasn’t until I started asking why the registry allowed multiple schemata per topic that I figured it out. As you can see from the example above, there are situations where you’ll need to have multiple event types in a single partition - mostly when those event types relate to each other in some clearly defined way which is dependent on their ordering. You can’t have a refund before you’ve made a sale, you can’t delete an email before you’ve received it, you can’t end a call or web session before you’ve commenced it.

The events indicating that these things have happened will often have different schemata and (if you use a type-heavy language like Scala) different types, as the fields required might be quite different. Those required to describe a refund will clearly differ from those required to describe a sale. But there is no way that a refund should ever occur in advance of a sale, so they really should be in the same topic (unless you really like being a victim of fraud, in which case go for gold).

This is further evidence of how a topic is not-like-a-table, so people who subscribe to the former fallacy are probably also vulnerable to the latter.

*Note that if you do have to take this path, it should be a conscious decision, and you should probably turn off automatic schema registration in your Kafka producer. Where this functionality is not required, the schema registry should be configured to only allow one schema per topic. Failing to do so (especially if working in a team) means that any change to the schema can break compatibility for any producers which haven’t received the change. It is painful.
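
With the confluent-kafka Python serialiser, turning off automatic registration looks roughly like this (a sketch; the registry URL and schema are invented, and the option name is the one I recall the client using):

```python
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

schema_str = """
{"type": "record", "name": "Sale",
 "fields": [{"name": "sale_id", "type": "string"},
            {"name": "amount",  "type": "double"}]}
"""

registry = SchemaRegistryClient({"url": "http://localhost:8081"})
serializer = AvroSerializer(
    schema_registry_client=registry,
    schema_str=schema_str,
    conf={"auto.register.schemas": False},   # fail fast rather than silently registering new versions
)
```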

6. We can join data between different topics

As we’ve established in depth above, topics are for ordering events. We often need to both:

  1. Think carefully about partitioning strategy to ensure messages are on the same partition where their ordering matters.
  2. Allow several varieties of messages on a single topic, even if this means allowing several Avro schemata for that topic.

This is particularly unfortunate when a single topic has been split into several, and someone then decides that recombining them is required. Kafka Streams claims to do roughly this, but the documentation glosses over all of these nuances in favour of bullshit. The reality is that this is really not possible. For example, if additional processing steps (or simple network latency) delay the messages from one stream, joins may be missed. Kafka Streams can use the timestamps on the messages in various ways, but it cannot use them to reimpose proper ordering on a stream which has been split. And if that exact wording could be added to the doco I think we’d see far fewer poor implementations, bluntly.

So there you go, a bunch of ways to stuff up data systems based on Kafka. If you have seen anything else terrible, hilarious, or outright scary when developing systems on Kafka, let me know on Twitter and we can swap war stories.