CouchDB with RxJava – ChangesFeed got even sweeter

CouchDB is a real-time database where data is stored as JSON documents. One thing I particularly like about CouchDB is its ChangesFeed. I like the idea of having a real-time feed of all the changes to the data I’m interested in.

There are different ways to access the ChangesFeed and I won’t go into details, but mostly you can either poll the ChangesFeed for changes, which I find archaic and not really up to current standards and technologies, or you can use the continuous changes feed. As stated in the CouchDB docs, a continuous feed stays open and connected to the database until explicitly closed, and changes are sent to the client as they happen, i.e. in near real time. This is neat, but it can be neater with RxJava.
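To make the continuous mode a bit more concrete: CouchDB sends a continuous feed as one JSON object per line over an open HTTP response, with empty lines as heartbeats. Below is a minimal sketch of consuming such a stream using only the JDK. The `ContinuousLineReader` class, the sample payload, and the `StringReader` standing in for the HTTP response body are assumptions made for illustration; this is not Ektorp code.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Ektorp: reads a line-delimited changes stream.
class ContinuousLineReader {

    // Returns every non-empty line; empty lines are treated as heartbeats and skipped.
    static List<String> readChanges(Reader body) {
        List<String> changes = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(body)) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue; // heartbeat, nothing to emit
                }
                changes.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return changes;
    }

    public static void main(String[] args) {
        // Made-up sample payload standing in for an open HTTP response body.
        String sample = "{\"seq\":1,\"id\":\"doc-a\",\"changes\":[{\"rev\":\"1-abc\"}]}\n"
                + "\n"
                + "{\"seq\":2,\"id\":\"doc-b\",\"changes\":[{\"rev\":\"1-def\"}]}\n";
        for (String change : readChanges(new StringReader(sample))) {
            System.out.println("Change: " + change);
        }
    }
}
```

With a real feed the loop would keep running for as long as the connection stays open, which is why each line is a natural unit to turn into an event.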

So, I thought: why not observe the ChangesFeed and emit an event as soon as there is a new item in it? Unfortunately, the Java driver for CouchDB (Ektorp) does not have RxJava support. Still, it shouldn’t be that difficult to add some kind of observable to the ChangesFeed.

After digging around the Ektorp source code, I found out that ChangesFeed is an interface that is implemented by ContinuousChangesFeed. Inspecting the ContinuousChangesFeed class, I see a

private final BlockingQueue changes = new LinkedBlockingQueue(100);

and… bingo!

I just need to observe this LinkedBlockingQueue for changes and emit them!

So, I created a PublishSubject<StdDocumentChange> that will emit all new items added to the LinkedBlockingQueue. The PublishSubject<StdDocumentChange> also serves as the Observable to which we can subscribe for changes. Sweet. This is all we need to add to the ContinuousChangesFeed class:

// Requires rx.Observable and rx.subjects.PublishSubject (RxJava 1.x) on the classpath.

// Declare our PublishSubject and create it
private final PublishSubject<StdDocumentChange> onAdded = PublishSubject.create();

// Call the PublishSubject's onNext() when there is a new item in the LinkedBlockingQueue
private void handleChange(String line) throws IOException, InterruptedException, JsonParseException, JsonMappingException {
    StdDocumentChange stdDocumentChange = new StdDocumentChange(OBJECT_MAPPER.readTree(line));
    changes.put(stdDocumentChange);
    onAdded.onNext(stdDocumentChange);
}

// Provide an Observable to which we can subscribe for events
public Observable<StdDocumentChange> onAdded() {
    return onAdded;
}
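If you want to play with the pattern without Ektorp or RxJava on the classpath, here is a rough JDK-only sketch of the same idea. Everything in it is a stand-in: `MiniChangesFeed` is a hypothetical class, plain strings replace StdDocumentChange, and a list of listeners plays the role of the PublishSubject. Like a PublishSubject, it only delivers changes that arrive after a listener has subscribed.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical stand-in for the patched feed; plain strings replace StdDocumentChange.
class MiniChangesFeed {
    private final BlockingQueue<String> changes = new LinkedBlockingQueue<>(100);
    // Plays the role of the PublishSubject: the listeners registered right now.
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    // Registers a listener and returns a handle that removes it again.
    Runnable onAdded(Consumer<String> listener) {
        listeners.add(listener);
        return () -> listeners.remove(listener);
    }

    // Mirrors handleChange: enqueue first, then notify whoever is listening.
    // Uses offer() instead of put() so the sketch never blocks.
    void handleChange(String change) {
        if (!changes.offer(change)) {
            throw new IllegalStateException("queue full");
        }
        for (Consumer<String> listener : listeners) {
            listener.accept(change);
        }
    }

    // Number of changes still sitting in the queue.
    int queued() {
        return changes.size();
    }

    public static void main(String[] args) {
        MiniChangesFeed feed = new MiniChangesFeed();
        feed.handleChange("doc-1"); // nobody is listening yet, so this is not delivered
        Runnable unsubscribe = feed.onAdded(c -> System.out.println("Received: " + c));
        feed.handleChange("doc-2"); // delivered to the listener
        unsubscribe.run();
        feed.handleChange("doc-3"); // listener removed, so this is not delivered
        System.out.println("Queued: " + feed.queued());
    }
}
```

Note that all three changes remain in the queue regardless of who was listening; the notification is purely an addition on top of the existing queue.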

To consume the changes we just need to subscribe to the Observable provided by the PublishSubject we defined above. We can do that this way:

// Get a reference to the ChangesFeed 
ContinuousChangesFeed continuousChangesFeed = (ContinuousChangesFeed) couchDBQuery.changesFeed(myCouchDd.getDbConnector());

// Subscribe! - Remember, onAdded() returns an Observable. 
subscription = continuousChangesFeed.onAdded()
        .subscribe(new Action1<StdDocumentChange>() {
            @Override
            public void call(StdDocumentChange document) {
                System.out.println("Received: " + document);
            }
        });

Or, with Java 8 lambdas:

subscription = continuousChangesFeed.onAdded()
        .subscribe((StdDocumentChange document) -> {
            System.out.println("Received: " + document);
        });
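One thing worth keeping in mind with this approach: handleChange still calls changes.put(...) before onNext(...), and the queue is bounded at 100 items. Calling put() on a full LinkedBlockingQueue blocks, so if the subscriber is the only consumer and nothing drains the queue, the feed could stall once the queue fills up. This is not verified against Ektorp itself; the sketch below only demonstrates the JDK queue behaviour. The `BoundedQueueDemo` class and the capacity of 2 are made up for the example, and it uses a timed offer() so that it does not hang.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the small capacity is chosen only to keep the example short.
class BoundedQueueDemo {

    // Tries to enqueue `count` items without ever draining; returns how many were accepted.
    static int fillWithoutDraining(int capacity, int count) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);
        int accepted = 0;
        try {
            for (int i = 0; i < count; i++) {
                // Timed offer gives up after 50 ms; put() would wait indefinitely here.
                if (queue.offer(i, 50, TimeUnit.MILLISECONDS)) {
                    accepted++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println("Accepted: " + fillWithoutDraining(2, 5));
    }
}
```

A possible way around this would be to keep draining the queue alongside the subscription, or to skip the queue entirely when only the Observable is used.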

I’ve forked Ektorp and applied these changes. You can find the forked repository with the RxJava implementation here. Either way, I’ll soon submit a pull request to the Ektorp repository and I’ll update this post with the outcome.

I’ve also created a small app that observes the ChangesFeed. In the app you can insert documents into a CouchDB database and observe the ChangesFeed. To run the app you’ll need to set (in the code) the CouchDB server IP address, port and database name. You can get the source code from GitHub here.

 

EDIT: The pull request was rejected with the following comment: “This feature might be nice, but it does not justify another dependency on a quite big lib as RxJava. I would rather see a complete Rx’fied CouchDb driver, that is based on non-blocking code from the ground up.”

Link to the pull request.
