I’m starting a new challenge: every weekend (or thereabouts) I’ll maintain and blog a curated list of links I’ve stumbled upon during my work week. As I’m a JVM guy, this will be mostly related to Java development, but there may be a few links about general topics here and there. So, today is #1.
Lately I’ve been looking into distributed messaging systems. With Kafka being the new kid on the block, I decided to start with Apache Kafka, and I was surprised by how easy it is to get started with the basics.
With Kafka,
Producers push and Consumers pull messages.
Messages are persisted (filesystem) and organized into topics.
Topics can have multiple partitions (on different machines, if needed).
Runs in clusters.
Nodes/servers are called brokers.
Each partition of a topic keeps its own sequence of offsets, which allows you to start consuming a topic from a specified offset. Every new message is appended to the head of a partition, and a specific message is identified by its topic, partition and offset. Kafka allows multiple consumers to consume the same partition of the same topic, because messages are not removed once they are consumed. E.g. ConsumerA is consuming from offset 3, while ConsumerB also started consuming from offset 3 but is currently reading offset 10; when ConsumerA reaches offset 10, it will have consumed exactly the same messages as ConsumerB.
Note, however, that retention is configurable: Kafka persists messages only for a predefined amount of time, so in the previous example, if ConsumerA takes too long, it may not get to consume exactly the same messages. Kafka doesn’t keep track of consumers, i.e. it is each consumer’s responsibility to keep track of what it has consumed and where it currently is in the topic (partition and offset).
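To make offsets and retention a bit more concrete, here is a minimal in-memory sketch of a single partition. It is not Kafka code: the `PartitionLog` class and its methods are hypothetical, and retention is simplified to "keep the last N messages" (Kafka actually retains by time or size). It shows that reading doesn’t remove anything, that each consumer just holds its own offset, and that a slow consumer can miss messages once they fall out of retention.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of one Kafka partition (not Kafka's implementation). */
public class PartitionLog {
    private final List<String> messages = new ArrayList<>();
    private final int maxRetained;   // stand-in for time/size-based retention
    private long startOffset = 0;    // offset of the oldest message still retained

    public PartitionLog(int maxRetained) {
        this.maxRetained = maxRetained;
    }

    /** Appends a message at the head of the log and returns its offset. */
    public long append(String message) {
        messages.add(message);
        if (messages.size() > maxRetained) {
            messages.remove(0);      // the oldest message expires
            startOffset++;
        }
        return startOffset + messages.size() - 1;
    }

    /** Offset that the next appended message will get. */
    public long endOffset() {
        return startOffset + messages.size();
    }

    /** Reads without removing; returns null if the offset is expired or not written yet. */
    public String read(long offset) {
        if (offset < startOffset || offset >= endOffset()) {
            return null;
        }
        return messages.get((int) (offset - startOffset));
    }

    public static void main(String[] args) {
        PartitionLog log = new PartitionLog(5);
        for (int i = 0; i < 8; i++) {
            log.append("msg-" + i);
        }
        // Each consumer simply holds its own offset.
        long consumerA = 5;
        long consumerB = 5;
        System.out.println(log.read(consumerA)); // msg-5
        System.out.println(log.read(consumerB)); // msg-5 again: reading did not remove it
        System.out.println(log.read(1));         // null: offset 1 is past retention
    }
}
```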
Each broker hosts one or more partitions, and each partition has a leader. Producers and consumers only talk to a partition’s leader, which replicates the data to its followers.
Producers load-balance across partitions: when a message has no key, the producer randomly picks a partition and sends to that partition’s leader for a certain amount of time; when that time expires, it picks another one. You can also customise how partitions are chosen to suit your needs.
Consumers can be grouped into consumer groups. Each consumer in a group takes a share of the data, so that the group as a whole does the work. This is handy for scaling: if a consumer is struggling to keep up with the workload, you can fire up a new consumer to help out.
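The partition choice described above can be sketched as a small standalone class. This is a simulation, not Kafka’s code: `PartitionChooser`, its constructor arguments and the injected clock are hypothetical. For a null key it sticks to one randomly chosen partition until an interval expires; for a non-null key it hashes the key modulo the number of partitions, in the spirit of a key-based partitioner. With the 0.8 producer, a custom strategy is plugged in through the `partitioner.class` property.

```java
import java.util.Random;
import java.util.function.LongSupplier;

/** Hypothetical simulation of how a producer might pick a partition (not Kafka's implementation). */
public class PartitionChooser {
    private final int numPartitions;
    private final long stickyIntervalMs;
    private final LongSupplier clock;   // injected so the behaviour can be tested
    private final Random random;
    private int stickyPartition = -1;
    private long stickySince;

    public PartitionChooser(int numPartitions, long stickyIntervalMs, LongSupplier clock, Random random) {
        this.numPartitions = numPartitions;
        this.stickyIntervalMs = stickyIntervalMs;
        this.clock = clock;
        this.random = random;
    }

    public int choose(String key) {
        if (key != null) {
            // Same key -> same partition; floorMod keeps the result non-negative.
            return Math.floorMod(key.hashCode(), numPartitions);
        }
        long now = clock.getAsLong();
        if (stickyPartition < 0 || now - stickySince >= stickyIntervalMs) {
            // First call or interval expired: pick a (possibly different) random partition.
            stickyPartition = random.nextInt(numPartitions);
            stickySince = now;
        }
        return stickyPartition;
    }
}
```

Injecting the clock keeps the "switch every X minutes" rule testable without actually waiting.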
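To see how a group splits the work, here is a sketch of a range-style assignment: each partition goes to exactly one consumer of the group, and when the division is uneven the first consumers get one extra partition. The class and method names are illustrative only; Kafka does this inside the client and the exact strategy depends on the version and configuration. Note that with more consumers than partitions, some consumers get nothing and sit idle.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative range-style assignment of partitions to the consumers of one group. */
public class RangeAssignment {

    public static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int perConsumer = numPartitions / consumers.size();
        int withExtra = numPartitions % consumers.size();
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            // The first `withExtra` consumers get one additional partition.
            int count = perConsumer + (i < withExtra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                partitions.add(next++);
            }
            result.put(consumers.get(i), partitions);
        }
        return result;
    }

    public static void main(String[] args) {
        // Two consumers share five partitions...
        System.out.println(assign(List.of("c1", "c2"), 5));       // {c1=[0, 1, 2], c2=[3, 4]}
        // ...and firing up a third one spreads the load further.
        System.out.println(assign(List.of("c1", "c2", "c3"), 5)); // {c1=[0, 1], c2=[2, 3], c3=[4]}
    }
}
```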
OK, enough cheap talk. Let’s get our hands dirty.
First, you need to download Kafka and extract it on your server/computer. Kafka ships scripts for both Windows and Linux, and the procedure is much the same on both operating systems; on Windows, use the scripts in the “yourKafkaFolderLocation\bin\windows” folder.
First things first, let’s have a look at the configuration file. Go to the config folder and open server.properties. The content is pretty much self-explanatory. Each Kafka instance requires its own .properties file: if you intend to run more than one Kafka instance on your machine/server, you need a copy of server.properties for each one, and each file must have a unique broker.id and port number.
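If you’d rather not copy and edit the file by hand, a small helper can derive a second broker’s configuration from the first. This is a hypothetical sketch using only `java.util.Properties`; the keys `broker.id`, `port` and `log.dirs` are the ones found in the 0.8-era server.properties, and the paths and values are placeholders. Giving each broker its own `log.dirs` also keeps two brokers on one machine from sharing a data directory.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

/** Hypothetical helper: derive server-N.properties from an existing server.properties. */
public class BrokerConfigCopier {

    public static Properties deriveBroker(Properties base, int brokerId, int port, String logDir) {
        Properties copy = new Properties();
        copy.putAll(base);                                        // keep every other setting
        copy.setProperty("broker.id", String.valueOf(brokerId));  // must be unique per broker
        copy.setProperty("port", String.valueOf(port));           // must be unique per machine
        copy.setProperty("log.dirs", logDir);                     // keep the brokers' data apart
        return copy;
    }

    public static void main(String[] args) throws IOException {
        Properties base = new Properties();
        try (Reader in = Files.newBufferedReader(Path.of("config/server.properties"))) {
            base.load(in);
        }
        Properties second = deriveBroker(base, 1, 9093, "/tmp/kafka-logs-1");
        try (Writer out = Files.newBufferedWriter(Path.of("config/server-1.properties"))) {
            second.store(out, "Second broker, derived from server.properties");
        }
    }
}
```

Keep in mind that `Properties.store()` drops the original comments and ordering, so the generated file is less readable than a hand-edited copy.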
First, let’s create a topic by using the following command for Linux:
Once ZooKeeper is up and running, you can fire up Kafka. To do so on Linux, run the following command:
sh kafka-server-start.sh path-to-your-kafka-config-file
and for Windows:
kafka-server-start path-to-your-kafka-config-file
Now that you can start an instance of Kafka let’s code a producer and a consumer.
For the producer, we need to set some configurations. We do so with the following method.
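import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;

// Set the producer configuration
public void configureAndStart(ProducerType producerType) {
    kafkaProperties = new Properties();
    // Comma-separated host:port list of brokers used to fetch metadata
    kafkaProperties.put("metadata.broker.list", brokerList);
    kafkaProperties.put("serializer.class", "kafka.serializer.StringEncoder");
    // 1 = wait for the leader replica to acknowledge the write
    kafkaProperties.put("request.required.acks", "1");
    // Kafka expects "sync" or "async"
    kafkaProperties.put("producer.type", producerType.getType());
    producerConfig = new ProducerConfig(kafkaProperties);
    // Start the producer
    producer = new Producer<String, String>(producerConfig);
}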
Here we set the list of brokers, the message serializer and the acknowledgement mode (with request.required.acks set to 1, the producer waits for the leader’s acknowledgement once the leader replica has received the data), and we specify the producer type (synchronous or asynchronous).
More information about these and other possible configurations can be found here.
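The snippet above relies on a `ProducerType` with a `getType()` method that isn’t shown in the post. A plausible version is sketched below as an assumption; "sync" and "async" are the values the 0.8 producer accepts for `producer.type`. The sketch also gives names to the three `request.required.acks` settings (0: don’t wait, 1: wait for the leader, -1: wait for all in-sync replicas) and builds the same Properties without touching Kafka, so it can be checked in isolation. `Acks` and `buildProducerProperties` are hypothetical names.

```java
import java.util.Properties;

public class ProducerSettings {

    /** Hypothetical version of the post's ProducerType. */
    public enum ProducerType {
        SYNC("sync"), ASYNC("async");

        private final String type;

        ProducerType(String type) { this.type = type; }

        public String getType() { return type; }
    }

    /** Hypothetical names for the request.required.acks values of the 0.8 producer. */
    public enum Acks {
        NONE("0"),          // don't wait for any acknowledgement
        LEADER("1"),        // the leader has written the message (the post's choice)
        ALL_IN_SYNC("-1");  // all in-sync replicas have the message

        private final String value;

        Acks(String value) { this.value = value; }

        public String value() { return value; }
    }

    public static Properties buildProducerProperties(String brokerList, ProducerType type, Acks acks) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);   // e.g. "host1:9092,host2:9092"
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", acks.value());
        props.put("producer.type", type.getType());
        return props;
    }
}
```

An enum keeps the accepted string values in one place instead of scattering "sync"/"async" literals around the code.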
To publish a message to a Kafka topic, we call the following method.
import kafka.producer.KeyedMessage;

// Create the message and send it over to Kafka
// We use null for the key, which means the message will be sent to a random partition
// The producer will switch over to a different random partition every 10 minutes (by default)
public void produceMessage(String message) {
    KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(topic, null, message);
    producer.send(keyedMessage);
}
And that’s pretty much it.
On the consumer side we also need to set up some configurations. To do so, we use the following method.
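import java.util.Properties;
import kafka.consumer.ConsumerConfig;

/**
 * ZooKeeper is used to coordinate all the consumers in the consumer group and to track the offsets
 */
public void configure() {
    kafkaProperties = new Properties();
    kafkaProperties.put("zookeeper.connect", zookeeperUrl);
    kafkaProperties.put("group.id", groupId);
    // How often (in ms) the consumed offsets are committed
    kafkaProperties.put("auto.commit.interval.ms", "1000");
    // With no valid committed offset, start from the latest message
    kafkaProperties.put("auto.offset.reset", "largest");
    // If you want to manually control the offset commit, set this config to false
    kafkaProperties.put("auto.commit.enable", "true");
    // By default (-1) the consumer waits forever; otherwise it throws ConsumerTimeoutException
    // after waiting this many ms for a new message. Property values must be Strings.
    kafkaProperties.put("consumer.timeout.ms", String.valueOf(waitTime));
    consumerConfig = new ConsumerConfig(kafkaProperties);
}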
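The auto.offset.reset setting only matters when the group has no valid committed offset for a partition (for example, the first time the group runs, or when the committed offset has fallen out of retention). That decision can be sketched like this; `OffsetReset.startingOffset` is a hypothetical helper, while "largest" and "smallest" are the values the 0.8 high-level consumer accepts.

```java
/** Hypothetical sketch of how a starting offset is resolved for one partition. */
public class OffsetReset {

    public static long startingOffset(Long committed, long logStart, long logEnd, String autoOffsetReset) {
        // A committed offset that is still inside the retained log wins.
        if (committed != null && committed >= logStart && committed <= logEnd) {
            return committed;
        }
        switch (autoOffsetReset) {
            case "smallest":
                return logStart;   // replay everything still retained
            case "largest":
                return logEnd;     // only messages produced from now on (the post's setting)
            default:
                throw new IllegalArgumentException("Unsupported auto.offset.reset: " + autoOffsetReset);
        }
    }
}
```

With "largest", a brand-new group only sees messages produced after it starts, which is why a consumer started after the producer may appear to receive nothing.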
Then, we start the consumer by calling the start() method.
import java.util.HashMap;
import java.util.Map;
import kafka.consumer.Consumer;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public void start() {
    consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    /**
     * Define how many threads will read from each topic. We have one topic and one thread.
     *
     * It can be multi-threaded (one topic, 2+ threads), but with the high-level consumer we usually don't do that:
     * if I want to manually commit offsets, the commit applies to all the threads that share this
     * consumer instance, at the same time. To avoid this, we can create multiple consumer instances and give
     * each of them its own thread.
     */
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, 1);
    /**
     * We use a decoder to get Kafka to convert messages to Strings.
     * The charset can be set with the serializer.encoding property;
     * the default, UTF8, works for us.
     */
    StringDecoder decoder = new StringDecoder(new VerifiableProperties());
    /**
     * Kafka gives us a list of message streams for each topic.
     * In this case it is just one topic with a list containing a single stream.
     */
    stream = consumerConnector.createMessageStreams(topicCountMap, decoder, decoder).get(topic).get(0);
}
Whenever we want to fetch a message, we call the following method: it gets the stream’s iterator, uses it to retrieve the next message and returns it.
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;

public String fetchMessage() {
    ConsumerIterator<String, String> it = stream.iterator();
    try {
        // Blocks until a message arrives or consumer.timeout.ms elapses
        return it.next().message();
    }
    catch (ConsumerTimeoutException ex) {
        System.out.println("Waited " + waitTime + " ms but no messages arrived... Exiting...");
        return null;
    }
}
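Because fetchMessage() returns null once consumer.timeout.ms passes without a message, a caller can simply loop until it gets null. The sketch below mimics that contract with a JDK `BlockingQueue` standing in for the Kafka stream, so it runs without a broker; `TimedFetcher` and `drain` are hypothetical names, and `poll(timeout)` plays the role of the iterator’s timeout.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the consumer: a queue polled with a timeout. */
public class TimedFetcher {
    private final BlockingQueue<String> stream;
    private final long waitTimeMs;

    public TimedFetcher(BlockingQueue<String> stream, long waitTimeMs) {
        this.stream = stream;
        this.waitTimeMs = waitTimeMs;
    }

    /** Like fetchMessage(): the next message, or null if none arrives within waitTimeMs. */
    public String fetchMessage() throws InterruptedException {
        return stream.poll(waitTimeMs, TimeUnit.MILLISECONDS);
    }

    /** Keeps fetching until the timeout fires once, collecting everything received. */
    public List<String> drain() throws InterruptedException {
        List<String> received = new ArrayList<>();
        String message;
        while ((message = fetchMessage()) != null) {
            received.add(message);
        }
        return received;
    }
}
```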
Here is the output for the producer.
And here is the consumer output.
The full code can be found on my GitHub repository or by following this link.
For the sake of brevity and simplicity I followed the KISS principle. A lot more can be done with Kafka. I’ll try to come up with slightly more advanced use cases for Kafka in the future.
A data model can be a pain to maintain within a group of developers, especially at the beginning, when it is very likely to change frequently due to new system requirements and specifications.
When using MySQL Workbench you get all those UI niceties: you can see how your tables relate to each other, see table attributes “right there”, and so on. The problem is that Workbench saves your data model as a binary .mwb file. Therefore, if you put it in a version control system like git, you won’t be able to see what changed in each commit.
You may say: let the UI niceties go down the drain, write your data model by hand in SQL and put the resulting .sql file under version control. It looks like one of those choices where you can only pick one. Either you use the UI of Workbench (or any other data modelling tool) for fast and easy development, but lose the ability to see changes to the data model and end up with a rather slow process for committing your changes (you have to ask your team whether anyone else has also updated the data model, so that nobody loses their changes); or you go through the slow, tedious and less descriptive process of writing the data model in SQL, but get all the power your version control system gives you, such as seeing file changes and, most importantly, managing merges and merge conflicts.
What if you could have both? Actually you can, at least with Workbench. It’s as easy as saving your data model as a .sql file (File -> Export -> Forward Engineer SQL CREATE Script) and using that .sql file as your data model in your version control system. When you want to load the .sql file as a graphical data model, just click on the right-pointing arrow and select “Create EER Model From Script”, as shown below, select the .sql file that represents your data model and tick the “Place imported objects on a diagram” box. Workbench will load the file and display your data model as if you were opening the .mwb file.
Now you have both worlds together, the niceties of your UI and the power of your version control system.
CouchDB is a document-oriented database where data is stored as JSON documents. One thing I particularly like in CouchDB is its changes feed (ChangesFeed). I like the idea of having a real-time feed with all the changes to the data I’m interested in.
There are different ways to access the ChangesFeed and I won’t go into details, but essentially you can either poll the ChangesFeed for changes, which I find archaic and not really up to current standards/technologies, or use the continuous changes feed. As stated in the CouchDB docs, a continuous feed stays open and connected to the database until explicitly closed, and changes are sent to the client as they happen, i.e. in near real-time. This is neat, but it can be even neater with RxJava.
So, I thought: why not observe the ChangesFeed and emit an event as soon as there is a new item in it? Unfortunately, the Java driver for CouchDB (Ektorp) does not support RxJava. Still, it shouldn’t be that difficult to build some kind of Observable on top of the ChangesFeed.
After lurking around the Ektorp source code, I found out that ChangesFeed is an interface implemented by ContinuousChangesFeed. Inspecting the ContinuousChangesFeed class, I found a
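With feed=continuous, CouchDB keeps the HTTP response open and writes one JSON object per line for each change, plus empty lines as heartbeats when the heartbeat parameter is set. A client therefore just reads lines and skips the blank ones. The sketch below shows that loop with the JDK only; the host, the database name and the `ContinuousFeedReader` class are placeholders, and JSON parsing is left to whatever library you already use.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

/** Hypothetical reader for CouchDB's continuous changes feed (JDK only). */
public class ContinuousFeedReader {

    /** Builds the _changes URL; base URL and database are placeholders. */
    public static String changesUrl(String baseUrl, String database) {
        return baseUrl + "/" + database + "/_changes?feed=continuous&since=now&heartbeat=10000";
    }

    /** Passes every non-empty line (one JSON change per line) to the handler; blank lines are heartbeats. */
    public static void readChanges(BufferedReader reader, Consumer<String> onChange) throws IOException {
        String line;
        while ((line = reader.readLine()) != null) {
            if (!line.isBlank()) {
                onChange.accept(line);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        URL url = URI.create(changesUrl("http://localhost:5984", "mydb")).toURL();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
            // Blocks and prints changes as they happen, until the connection is closed.
            readChanges(reader, change -> System.out.println("Change: " + change));
        }
    }
}
```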
private final BlockingQueue changes = new LinkedBlockingQueue(100);
and… bingo!
I just need to observe this LinkedBlockingQueue for changes and emit them!
So, I created a PublishSubject<StdDocumentChange> that emits every new item added to the LinkedBlockingQueue. The PublishSubject<StdDocumentChange> also provides an Observable to which we can subscribe for changes. Sweet. This is all we need to add to the ContinuousChangesFeed class:
// At the top of ContinuousChangesFeed.java (RxJava 1.x):
import rx.Observable;
import rx.subjects.PublishSubject;

// Declare our PublishSubject and create it
private final PublishSubject<StdDocumentChange> onAdded = PublishSubject.create();

// Call the PublishSubject's onNext() when there is a new item in the LinkedBlockingQueue
private void handleChange(String line) throws IOException, InterruptedException, JsonParseException, JsonMappingException {
    StdDocumentChange stdDocumentChange = new StdDocumentChange(OBJECT_MAPPER.readTree(line));
    // Note: put() blocks once 100 changes are waiting in the queue, so if nobody calls next(),
    // subscribers stop receiving changes as well.
    changes.put(stdDocumentChange);
    onAdded.onNext(stdDocumentChange);
}

// Provide an Observable to which we can subscribe for events
public Observable<StdDocumentChange> onAdded() {
    return onAdded;
}
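If you want to try the same idea without Ektorp or RxJava, the JDK’s `java.util.concurrent.SubmissionPublisher` (Java 9+) behaves much like a PublishSubject: subscribers only see items submitted after they subscribe. The `ObservableQueue` class below is a hypothetical stand-in for the modified ContinuousChangesFeed, not Ektorp code: `put()` enqueues the item for pull-style `next()` callers and also pushes it to subscribers. Like the queue in ContinuousChangesFeed, it is bounded to 100 items, so `put()` blocks once that many are waiting to be taken.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Consumer;

/** Hypothetical JDK-only analogue of the Rx-enabled ContinuousChangesFeed. */
public class ObservableQueue<T> implements AutoCloseable {
    // Bounded: put() blocks when 100 items are waiting to be taken.
    private final BlockingQueue<T> changes = new LinkedBlockingQueue<>(100);
    // Hot publisher: late subscribers do not receive earlier items (like PublishSubject).
    private final SubmissionPublisher<T> onAdded = new SubmissionPublisher<>();

    /** Enqueue for pull-style consumers, then push to subscribers. */
    public void put(T item) throws InterruptedException {
        changes.put(item);
        onAdded.submit(item);
    }

    /** Pull-style access, like ChangesFeed.next(). */
    public T next() throws InterruptedException {
        return changes.take();
    }

    /** Push-style access, like onAdded() returning an Observable. */
    public Flow.Publisher<T> onAdded() {
        return onAdded;
    }

    /** Convenience: subscribe a callback; the future completes when the queue is closed. */
    public CompletableFuture<Void> forEach(Consumer<? super T> handler) {
        return onAdded.consume(handler);
    }

    @Override
    public void close() {
        onAdded.close();   // subscribers get onComplete after the already submitted items
    }
}
```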
To consume the changes we just need to subscribe to the Observable provided by the PublishSubject we defined above. We can do that this way:
// Requires (RxJava 1.x): import rx.Subscription; import rx.functions.Action1;
// Get a reference to the ChangesFeed
ContinuousChangesFeed continuousChangesFeed = (ContinuousChangesFeed) couchDBQuery.changesFeed(myCouchDd.getDbConnector());
// Subscribe! - Remember, onAdded() returns an Observable.
subscription = continuousChangesFeed.onAdded()
        .subscribe(new Action1<StdDocumentChange>() {
            @Override
            public void call(StdDocumentChange document) {
                System.out.println("Received: " + document);
            }
        });
// When you are no longer interested in changes: subscription.unsubscribe();
I’ve forked Ektorp and applied these changes. You can find the forked repository with the RxJava implementation here. Either way, I’ll soon submit a pull request to the Ektorp repository and update this post with the outcome.
I’ve also created a small app that observes the ChangesFeed. In the app you can insert documents into a CouchDB database and watch them show up in the ChangesFeed. To run the app, you’ll need to set (in the code) the CouchDB server’s IP address, port and database name. You can get the source code from GitHub here.
EDIT: The pull request was rejected with the following comment: “This feature might be nice, but it does not justify another dependency on a quite big lib as RxJava. I would rather see a complete Rx’fied CouchDb driver, that is based on non-blocking code from the ground up.”