Tuesday, 26 April 2016

A Distributed, Asynchronous RESTful Framework Prototype


The concept of asynchronous RESTful operations was already doing the rounds (check here) even before JAX-RS 2.0 introduced the server-side asynchronous processing API. The basic principle behind an asynchronous RESTful API is pretty simple: the server accepts the request immediately and makes the result available later at a separate resource. This is an article describing how Jersey has implemented JAX-RS 2.0, along with some information on the specification.

Taking a cue from the concepts, we propose a prototype architecture for an asynchronous and distributed JAX-RS provider:
  1. Instead of dispatching the request to a JAX-RS annotated service method, suspend the operation somehow. Then, instead of returning a 201 or 200 HTTP response, issue a 202 (Accepted) response.
  2. Set a redirection URL with a 'Location' header in the response.
  3. The suspended operation should be handled by a delegate in an asynchronous manner. Once complete, the response resource should be created at the above mentioned URL.
  4. The client would then need to do an HTTP GET on the redirection URL to get the actual response. The GET would return either the response resource or a 204 or 404, depending on the request processing state.
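The four steps above can be sketched with a purely in-memory stand-in. This is only a minimal sketch and not the prototype's code: AcceptedFlowSketch, Reply, the /results/ path prefix, and the choice of 204 for "still processing" versus 404 for "unknown id" are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** In-memory sketch of the accept-then-poll flow; all names here are hypothetical. */
final class AcceptedFlowSketch {

    /** Minimal stand-in for an HTTP response: status, optional Location, optional body. */
    static final class Reply {
        final int status;
        final String location;
        final Object body;

        Reply(int status, String location, Object body) {
            this.status = status;
            this.location = location;
            this.body = body;
        }
    }

    private static final String PREFIX = "/results/";
    private static final Object PENDING = new Object();

    private final Map<String, Object> results = new ConcurrentHashMap<>();
    private final ExecutorService workers = Executors.newFixedThreadPool(2);

    /** Steps 1-3: suspend the call, answer 202 with a Location, run the work elsewhere. */
    Reply accept(Supplier<Object> operation) {
        String correlationId = UUID.randomUUID().toString();
        results.put(correlationId, PENDING);
        // Assumption: the operation returns a non-null value and does not throw.
        workers.execute(() -> results.put(correlationId, operation.get()));
        return new Reply(202, PREFIX + correlationId, null);
    }

    /** Step 4: GET on the redirection URL. */
    Reply poll(String location) {
        String correlationId = location.substring(PREFIX.length());
        Object value = results.get(correlationId);
        if (value == null) return new Reply(404, null, null);    // unknown id
        if (value == PENDING) return new Reply(204, null, null); // still processing
        return new Reply(200, null, value);                      // response resource is ready
    }

    /** Waits briefly for submitted work to finish. */
    void shutdown() {
        workers.shutdown();
        try {
            workers.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

A client would call accept(...), remember the returned location, and keep calling poll(location) until the status is 200.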
Invocation delegation
Suspending the invocation implies dispatching the processing asynchronously to some other component of the provider. From a JAX-RS perspective, this processing component would need to invoke an (annotated) method on a plain Java class via reflection, and probably pass the query/path params and/or POST content as arguments to it.
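As a rough illustration of invoking an annotated method on a plain Java class, here is a minimal sketch. The @Route annotation is a hypothetical stand-in for a JAX-RS annotation such as @Path (so that the example needs nothing beyond the JDK), and GreetingService and dispatch are likewise made up for this example.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

final class ReflectiveInvokeSketch {

    /** Hypothetical stand-in for a JAX-RS annotation such as @Path. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Route {
        String value();
    }

    /** A plain Java class with one annotated service method. */
    static final class GreetingService {
        @Route("/hello")
        public String hello(String name) {
            return "Hello, " + name;
        }
    }

    /** Finds the public method annotated with the given route and invokes it with the arguments. */
    static Object dispatch(Object service, String route, Object... args) {
        for (Method method : service.getClass().getMethods()) {
            Route annotation = method.getAnnotation(Route.class);
            if (annotation != null && annotation.value().equals(route)) {
                try {
                    return method.invoke(service, args);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Invocation failed for " + route, e);
                }
            }
        }
        throw new IllegalArgumentException("No method mapped to " + route);
    }
}
```

In a real provider the arguments would come from the parsed query/path params or the deserialized POST body rather than being passed in directly.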

If we build the provider on top of a data distribution platform (datagrid), then we can distribute the request data (arguments, request details, correlationid). A receiving member would then need to dispatch the request data to the processor component, which would actually execute the invocation. The response would be stored in the datagrid, corresponding to the correlationid.
This correlationid would have been sent back to the client, as a part of the redirection URL.

Hazelcast
Hazelcast was used as the datagrid provider in this project. The distributed IMap provided by Hazelcast can have local entry listeners attached to it. Entry listeners receive callbacks on map entry events such as add, update and remove. Thus, whenever request data is added (put) to a distributed map, an entry listener on the node owning the data will be notified. The received data is then delegated to the processing component, which performs the dispatch to JAX-RS annotated service methods.
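The listener-driven hand-off can be illustrated without a datagrid at all. The sketch below is a plain-Java stand-in and deliberately NOT the Hazelcast API: ListenableMapSketch and its methods are hypothetical, everything runs in one JVM, and callbacks fire synchronously on the calling thread, whereas a real datagrid would deliver them on the node that owns the entry.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

/** Plain-Java stand-in for a map with entry-added callbacks; this is NOT the Hazelcast API. */
final class ListenableMapSketch<K, V> {
    private final Map<K, V> entries = new ConcurrentHashMap<>();
    private final List<BiConsumer<K, V>> addedListeners = new CopyOnWriteArrayList<>();

    /** Registers a callback that is invoked whenever a new key is added. */
    void addEntryAddedListener(BiConsumer<K, V> listener) {
        addedListeners.add(listener);
    }

    /** Stores the entry and notifies listeners only when the key was not present before. */
    void put(K key, V value) {
        V previous = entries.put(key, value);
        if (previous == null) {
            for (BiConsumer<K, V> listener : addedListeners) {
                listener.accept(key, value);
            }
        }
    }

    V get(K key) {
        return entries.get(key);
    }
}
```

With two such maps, one for requests and one for responses, a listener on the request map can process each new entry and store the outcome in the response map under the same correlation id.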

I have tried to represent it pictorially using a sequence diagram. Well, this may not be the best example of a SD usage, however, it will probably bring out the basic idea.



Design
The following logical components would be required in order to design the platform:
  • An HTTP server component that would be able to map a RESTful URI to a method invocation (REST Listener)
    • Intercept HTTP requests and parse request parameters/body
    • Prepare serializable request data structure
    • Generate HTTP response
  • A discovery component to auto discover and register JAX-RS annotated elements (RequestDispatcher)
    • Auto discover JAX-RS annotated elements and prepare method invocation metadata
    • Map the method invocation metadata to URI pattern
    • Execute method invocation
  • A Hazelcast component capable of registering local entry listeners and performing distributed map operations (HazelcastService)
    • Register local entry listener callback that gets notified on distributed map entry event
    • Provide a facade for map operations on Hazelcast datagrid
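To make the "map invocation metadata to URI pattern" responsibility a little more concrete, here is a minimal sketch of matching a request URI against a template with {name} placeholders. UriTemplateSketch is a hypothetical helper written for this post; it is not how the prototype or any JAX-RS implementation does it, and it only handles simple single-segment variables.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: compiles "/orders/{id}" style templates into a regex. */
final class UriTemplateSketch {
    private static final Pattern VARIABLE = Pattern.compile("\\{(\\w+)\\}");

    private final Pattern pattern;
    private final List<String> names = new ArrayList<>();

    UriTemplateSketch(String template) {
        Matcher m = VARIABLE.matcher(template);
        StringBuilder regex = new StringBuilder();
        int last = 0;
        while (m.find()) {
            // Literal text is quoted; each {name} becomes a capturing group for one path segment.
            regex.append(Pattern.quote(template.substring(last, m.start())));
            regex.append("([^/]+)");
            names.add(m.group(1));
            last = m.end();
        }
        regex.append(Pattern.quote(template.substring(last)));
        pattern = Pattern.compile(regex.toString());
    }

    /** Returns the path parameters by name, or null when the URI does not match. */
    Map<String, String> match(String uri) {
        Matcher m = pattern.matcher(uri);
        if (!m.matches()) {
            return null;
        }
        Map<String, String> params = new LinkedHashMap<>();
        for (int i = 0; i < names.size(); i++) {
            params.put(names.get(i), m.group(i + 1));
        }
        return params;
    }
}
```

A dispatcher could keep one such template per discovered method and pass the extracted parameters on as invocation arguments.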
Using this approach for asynchronous RESTful processing, we can delegate execution not only to separate worker threads, but also to separate worker nodes. As an afterthought, the framework could well be extended to implement a distributed microservice provider.

Code samples
A Spring Boot project is available on GitHub. The REST Listener is built on top of Webbit, a lightweight, open source, Netty based library.
Being a prototype project, it has some limitations as of now. Basically the project implements a small subset of the JAX-RS 2.0 specification.
  1. Only JSON request/response is consumed/produced.
  2. Only GET/POST/DELETE are supported.
  3. For the asynchronous server API AsyncResponse, not all operations are supported. Details are documented in the code.
Given below is an example of an asynchronous REST service. The interesting point to note here is that the actual method invocation is guaranteed to be executed on a separate thread (and probably in a separate JVM altogether) asynchronously.

On invocation of asyncResponse.resume(), the response is made available at the resource URI that was shared in the initial response.

  @GET
  @Path("/async")
  //the first argument has to be @Suspended AsyncResponse type
  //an optional second argument, if present, is the deserialized JSON
  //content of the request (for POST)
  public void helloAsync(@Suspended AsyncResponse asyncResponse)
  {
    //No separate asynchronous strategy is required here.
    //This method will automatically be executed in a separate worker (thread/node)

    Object returned = invokeMySuperHeavyService();
    asyncResponse.resume(returned);

    //Now the response is committed and made available as JSON

  }

Wednesday, 23 December 2015

My adventures with Cassandra

This post is a follow-up to the Weblogs post, wherein I had proposed a time-series based log repository in Cassandra with the following data model:
CREATE TABLE data_points (
    app_id text,
    event_ts timeuuid,
    level text,
    log_text text,
    lucene text,
    PRIMARY KEY (app_id, event_ts)
) WITH CLUSTERING ORDER BY (event_ts DESC)
Given a single partition key of app_id, this model has a potential drawback: it can lead to extremely wide rows or hotspots. For more detail, check the issue raised on GitHub. This is a valid concern, and best practices in Cassandra time-series modelling suggest using a composite partition key for better distribution of the data across the cluster.

To serve a query, Cassandra needs all the partition key columns in order to compute the hash that locates the nodes containing the partition. Cassandra therefore requires that you either restrict all the partition key columns or none of them, unless the query can use a secondary index.

However, from the primary use case perspective, a user would simply want to search logs within a given time frame. The minimal information s/he should need to be aware of is, probably, which application's log to look at and within what date range.

Selecting a second partition key

Thus, the second partition key should be something the application can compute from the user's query parameters. As mentioned above, the mandatory query parameters for our system would be
  1. Application ID
  2. Date range
Since a date range will always be provided in the query, a formatted date string can possibly be used as a second partition key. While inserting data, it can be computed from the event timestamp, and while querying for data, it can be computed from the date range parameters.

So, a modified data model can be as follows:
CREATE TABLE data_points (
    app_id text,
    bucket text,
    event_ts timeuuid,
    level text,
    log_text text,
    lucene text,
    PRIMARY KEY ((app_id, bucket), event_ts)
) WITH CLUSTERING ORDER BY (event_ts DESC) 
 The bucket will simply be a yyyyMMdd string computed on event_ts.

Querying for data

Firstly, the buckets need to be computed from the date range parameter and that should be a trivial task.
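A minimal sketch of that computation follows, covering both the insert side (bucket from an event timestamp) and the query side (buckets from a date range). BucketSketch and its method names are made up for this example, and deriving the day in UTC is an assumption; the original project may well use a different time zone.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for computing yyyyMMdd buckets. */
final class BucketSketch {
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyyMMdd");

    /** Bucket for a single event, as needed at insert time (UTC is an assumption). */
    static String bucketOf(Instant eventTime) {
        return eventTime.atZone(ZoneOffset.UTC).toLocalDate().format(FORMAT);
    }

    /** One bucket per day in [from, to], newest first to match the DESC clustering order. */
    static List<String> bucketsBetween(LocalDate from, LocalDate to) {
        List<String> buckets = new ArrayList<>();
        for (LocalDate day = to; !day.isBefore(from); day = day.minusDays(1)) {
            buckets.add(day.format(FORMAT));
        }
        return buckets;
    }
}
```

Note that the number of buckets grows linearly with the width of the date range, which matters for the querying attempts discussed next.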

Attempt 1:

Using an IN clause for the buckets, given the app_id and date range
select (..) from data_points where app_id = <app_id> and bucket in (<b1>,..,<bn>)
However, it has a couple of drawbacks:
  1. Select on indexed columns and with IN clause for the PRIMARY KEY are not supported. Since our searches also restrict the lucene column, which is a secondary indexed field, a query that combines it with an IN clause on bucket will be rejected by Cassandra
  2. An IN clause with a large number of values is generally an anti-pattern with Cassandra. Check this article for some discussion
Attempt 2:
Using the TOKEN() function to range query partition key
select (..) from data_points where token(app_id, bucket) > token(<app_id>,<b1>) and token(app_id,bucket) < token(<app_id>,<bn>)
Well, this can only work with the ByteOrderedPartitioner, since the hashing partitioners do not keep tokens in the same order as the bucket values. Then again, using the ByteOrderedPartitioner is strongly discouraged.

Finally, I opted for an asynchronous distributed querying technique. The algorithm for such a query would be like:
for each bucket in buckets
loop
  execute async:
   select (..) from data_points where app_id=<app_id> and bucket=<bucket> 
   collect resultset future
end loop;
for each future in resultset futures
loop
  get resultset
  collate 
end loop;
merge collated;
For concurrent execution, a couple of data structures needed to be developed: a QueryAggregator for collating async result sets, and a sorted bounded buffer to efficiently merge the result sets.
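The pseudocode above could look roughly like this in plain Java. This is a simplified sketch and not the project's QueryAggregator: FanOutQuerySketch is a made-up name, queryBucket is a hypothetical stand-in for the per-bucket SELECT, and the merge here simply sorts everything and truncates rather than using a sorted bounded buffer.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical fan-out/merge helper; not the project's QueryAggregator. */
final class FanOutQuerySketch {

    /**
     * Runs one query per bucket asynchronously, waits for all of them, then merges
     * the partial results in the given order and keeps at most `limit` rows.
     */
    static <T> List<T> query(List<String> buckets,
                             Function<String, List<T>> queryBucket,
                             Comparator<T> newestFirst,
                             int limit) {
        // Fan out: one asynchronous task per bucket (runs on the common ForkJoinPool).
        List<CompletableFuture<List<T>>> futures = new ArrayList<>();
        for (String bucket : buckets) {
            futures.add(CompletableFuture.supplyAsync(() -> queryBucket.apply(bucket)));
        }
        // Collect and merge: join each future, flatten, sort, and truncate.
        return futures.stream()
                .map(CompletableFuture::join)
                .flatMap(List::stream)
                .sorted(newestFirst)
                .limit(limit)
                .collect(Collectors.toList());
    }
}
```

Sorting the full union is fine for small pages; a bounded buffer becomes worthwhile once each bucket can return many more rows than the page size.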

Tuesday, 8 December 2015

Full text searchable log repository using Cassandra + Lucene

Abstract:

Logging is a cross-cutting concern in almost all applications. We have robust libraries like log4j/logback or the JDK logging available for it. For many projects the logs are stored in some database for later analysis.

In this article we propose such a persistent, searchable, scalable logging infrastructure that can be customized, or simply used as a plugin to extend the existing logging framework of an application. We will discuss it as a Log4j plugin, using a custom Log4j appender to utilize the framework.

Working code can be found at:
https://github.com/javanotes/weblogs
https://github.com/javanotes/weblog4jappender

The project uses the Stratio secondary index plugin for Cassandra, a Lucene based full text search implementation on Cassandra. The core project is developed as a Spring Boot application that can run with an embedded servlet container, or be deployed as a webapp. It provides:
  1. A RESTful API for log request ingestion. For example, the Log4j appender would POST logging requests to the API
  2. A web based console for viewing and searching through logs, and some data visualization
The Problem:

A logging service, that should be:
  • Persistent - Logs generated should be persisted as time series data, onto disk system
  • Scalable - Logs generated can grow arbitrarily. So the infrastructure should be scalable enough to persist a massive amount of data.
  • Non intrusive - Logging should be a 'non intrusive' cross cutting concern of an application. That is to say, the performance/functionality of the application should not be affected, or only negligibly affected, by the logging service
  • Centralized - The infrastructure should be a centralized one with multiple logging clients allowed to utilize it
  • Searchable - The persisted log data should be full text searchable and date wise searchable, and the results should be pageable.
  • Pluggable - The solution should be extensible so that it can be plugged/adapted with any popular logging framework easily.
  • Analyzable - Some analytics, like error count trends, should be available
The Solution:

A persistent and searchable data store needs to be a database. While some traditional RDBMS (MySQL, that I know of) do support full text search, storing a massive amount of data at a high write rate can quickly become a bottleneck. So we move away from an RDBMS solution.

Lucene based datastores like Solr/Elasticsearch can be a good proposition, preferably Elasticsearch if we consider ease of use. However, these tools index document wise; that is to say, a complete record with all its fields is indexed, simplistically speaking. These solutions are a good fit for searching structurally complex records, but can be overkill for a limited search facility ('phrase' or 'term' search only) without any need for, say, relevance search.

This takes us to the NoSQL options. The immediate candidates that come to mind, purely on the basis of popularity (well, in internet search results at least!) are Cassandra and MongoDB. In fact Mongo has built in support for full text search as well. However, keeping in mind the time series nature of the data, it seems a wide-column store (Cassandra) should be a better fit than a document oriented one (Mongo). Also, keeping in mind the write-heavy nature of the workload, it felt to me that Cassandra is a better candidate. The last inference is simply based on a previous project that I had worked on, where we were prototyping a big data ingestion platform on Cassandra. So.. Cassandra!

Approach:

We need to keep the following things in mind when using Cassandra:
  • A basic challenge is, well, Cassandra (v2.2.3) has no out of the box support for full text search
  • Dataset pagination is not trivial using Cassandra and has some limitations on previous/next fetches.
  • Cassandra data model needs to be designed top down, that is, we design how we store the data based on what we want to see and not the other way round
  • The partition key needs to be based on something that will always be provided while querying. It should also be good enough to distribute data evenly across the cluster.
  • The timestamp column should be the first clustering key with a timeuuid datatype. Using a timeuuid serves the dual purpose of naturally ordering time-series data and providing a unique 'rowid' that can be useful for pagination queries
  • Any other searchable field, say logging level, can be kept as a subsequent clustering key
Full Text Search

For full text searching capability, we will use a custom secondary index plugin for Cassandra. A custom secondary index in Cassandra is an external Java extension that Cassandra loads as a dynamic library. Some discussions here.

Stratio has developed a Lucene based custom secondary index implementation as part of their core big data platform and it is open sourced. From their GitHub wiki:
It is a plugin for Apache Cassandra that extends its index functionality to provide near real time search such as ElasticSearch or Solr, including full text search capabilities and free multivariable, geospatial and bitemporal search.
 Keeping in mind the above points, a simple data model can be as follows:
CREATE TABLE data_points (
    app_id text,
    event_ts timeuuid,
    level text,
    log_text text,
    lucene text,
    PRIMARY KEY (app_id, event_ts)
) WITH CLUSTERING ORDER BY (event_ts DESC)
The column 'lucene' is a dummy column that will be used by the Stratio plugin to index designated fields. More details can be found in their documentation.

Then queries can be formed to fetch time-series ordered data as follows:

SELECT * FROM data_points WHERE app_id=<app_log_id> AND lucene='{filter:{type:"boolean", must:[{type:"phrase",field:"log_text",value:"<search_phrase_or_term>"},{type:"match",field:"level",value:"INFO"}],should:[],not:[]},sort:{fields:[{field:"event_ts",reverse:false}]}}' AND event_ts>=minTimeuuid(<lower_bound_timeuuid>) AND event_ts<=maxTimeuuid(<upper_bound_timeuuid>) LIMIT <page_size>;
The important points to note for pagination:
  1. For NEXT - <upper_bound> is taken from the last record in the current page
  2. For PREVIOUS - <lower_bound> is taken from the first record in the current page
  3. For NEXT, the limit is applied from the 'head' and for PREVIOUS it is applied from the 'tail'. Accordingly, for a PREVIOUS query, the timeuuid should be sorted in reverse
  4. Skipping to page numbers is not supported, since we do not have a concept of 'offset'
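The NEXT/PREVIOUS rules above can be tried out on an in-memory sorted set. This is only a sketch of the paging logic, not of the Cassandra query: KeysetPagingSketch is a hypothetical class, plain long timestamps stand in for timeuuid values, and the bounds are treated as exclusive here, which is an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

/** Hypothetical in-memory model of bound-based paging over newest-first data. */
final class KeysetPagingSketch {
    private final NavigableSet<Long> timestamps = new TreeSet<>();

    void add(long timestamp) {
        timestamps.add(timestamp);
    }

    /** NEXT: rows strictly older than the last row of the current page, newest first. */
    List<Long> next(long lastOfCurrentPage, int pageSize) {
        return take(timestamps.headSet(lastOfCurrentPage, false).descendingIterator(), pageSize);
    }

    /** PREVIOUS: rows strictly newer than the first row, read oldest first, then reversed for display. */
    List<Long> previous(long firstOfCurrentPage, int pageSize) {
        List<Long> page = take(timestamps.tailSet(firstOfCurrentPage, false).iterator(), pageSize);
        Collections.reverse(page);
        return page;
    }

    /** Copies at most pageSize rows from the iterator. */
    private static List<Long> take(Iterator<Long> rows, int pageSize) {
        List<Long> page = new ArrayList<>();
        while (rows.hasNext() && page.size() < pageSize) {
            page.add(rows.next());
        }
        return page;
    }
}
```

The reversal in previous() is the in-memory counterpart of sorting the timeuuid in reverse: the rows closest to the current page have to be picked first, and only then put back into display order.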
Screenshots:



Installation for plugging the infrastructure to a Log4j based application
  1. Set up Cassandra. It can be downloaded from here
  2. Set up the Stratio Cassandra secondary index plugin. The latest release can be found here. Follow the instructions to build a composite jar and include it in the Cassandra lib directory. NOTE: For a Windows system running Datastax Cassandra as a service (using Apache commons daemon), the classpath specified in registry path HKEY_LOCAL_MACHINE\SOFTWARE\Wow6432Node\Apache Software Foundation\Procrun 2.0\DataStax_Cassandra_Community_Server\Parameters\Java needs to be modified to include the plugin jar
  3. Run the application as Spring Boot standalone with an embedded container, or deploy it as a WAR file in a J2EE server
  4. Include weblog4jappender as a jar in an existing log4j application and configure log4j.properties as follows:
log4j.appender.weblogs=com.weblogs.log4j.WeblogsAppender
log4j.appender.weblogs.serviceUrl=http://<host:port>/weblogs/api/ingestbatch
log4j.appender.weblogs.batchSize=100
log4j.appender.weblogs.applicationId=<application_id>
#log4j.appender.weblogs.flushSecs=60 -- optional

Tuesday, 18 August 2015

A piped stream implementation

Java IO has the concept of PipedInputStream and PipedOutputStream from JDK 1.0 onwards, to be used as communication channels amongst threads in the same JVM. You write to the OutputStream from a particular thread, and read from the InputStream from another thread, thus creating a 'pipe' of communication.

Creating a pipe using Java IO is done via the PipedOutputStream and PipedInputStream classes. A PipedInputStream should be connected to a PipedOutputStream. The data written to the PipedOutputStream by one thread can be read from the connected PipedInputStream by another thread. 
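For reference, a minimal sketch of the standard JDK pipe in use: one thread writes, the calling thread reads until end of stream. JdkPipeSketch and roundTrip are names made up for this example; the stream classes and their methods are the regular java.io ones.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical demo class around the standard java.io piped streams. */
final class JdkPipeSketch {

    /** Writes the text on a producer thread and reads it back on the calling thread. */
    static String roundTrip(String text) {
        try {
            PipedOutputStream out = new PipedOutputStream();
            PipedInputStream in = new PipedInputStream(out); // connects the two ends

            Thread producer = new Thread(() -> {
                // Closing the output end is what lets the reader see end of stream.
                try (PipedOutputStream o = out) {
                    o.write(text.getBytes(StandardCharsets.UTF_8));
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            producer.start();

            ByteArrayOutputStream received = new ByteArrayOutputStream();
            int b;
            while ((b = in.read()) != -1) { // read() blocks until data or end of stream
                received.write(b);
            }
            producer.join();
            in.close();
            return new String(received.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Reading and writing from the same thread is best avoided with these classes, since the reader can block forever waiting for data that only it could have written.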

Under the hood, PipedInputStream has synchronized receive(int) and receive(byte[], int, int) methods, which the connected PipedOutputStream calls as the producer while the reading thread acts as the consumer. As in any producer-consumer scenario, there is a wait-notify mechanism in place.

We try to improve on this by using a ring buffer in the form of an ArrayBlockingQueue, thus leveraging the java.util.concurrent improvements.


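import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.nio.channels.InterruptibleChannel;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class NPipedOutputStream extends OutputStream implements InterruptibleChannel {
    // Bounded queue shared with the connected NPipedInputStream
    private BlockingQueue<Byte> circularBuffer;
    // Constants is a class from the project that holds the default buffer size
    private int bufferSize = Constants.READ_CHUNK_SIZE_BYTES;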
    private long position = 0;
    private boolean connected;
   
    public long getPosition() {
        return position;
    }
    /**
     *
     */
    public NPipedOutputStream()
    {
       
    }
    /**
     *
     * @param sink
     * @param size
     * @throws IOException
     */
    public NPipedOutputStream(NPipedInputStream sink, int size) throws IOException
    {
        bufferSize = size;
        connect(sink);
    }
    public NPipedOutputStream(NPipedInputStream sink) throws IOException {
        connect(sink);
    }
    /**
     * Connect to sink
     * @param sink
     * @throws IOException
     */
    public void connect(NPipedInputStream sink) throws IOException {
        if (sink == null) {
            throw new NullPointerException();
        } else if (sink.connected) {
            throw new IOException("NPipedInputStream provided is already connected to a source");
        }
        if(connected)
            throw new IOException("NPipedOutputStream already connected to a sink NPipedInputStream");
       
        circularBuffer = new ArrayBlockingQueue<>(bufferSize);
        sink.circularBuffer = this.circularBuffer;
        sink.setConnected(true);
        connected = true;
    }
    @Override
    public void write(int b) throws IOException {
        writeByte((byte) b);
    }
    private void writeByte(byte b) throws IOException
    {
        try {
            writeToChannel(b);
        } catch (InterruptedException e) {
            throw new InterruptedIOException();
        }
    }
    public boolean isBlocked()
    {
        while(!b1.compareAndSet(false, true));
        try {
            return writer != null && writer.isAlive();
        } finally {
            b1.compareAndSet(true, false);
        }
       
    }
    private Thread writer;
    /**
     * blocking call
     * @throws InterruptedException
     */
    private void writeToChannel(byte b) throws InterruptedException {
        try {
           
            writer = Thread.currentThread();
            circularBuffer.put(b);
            position++;
           
        } finally {
            while(!b1.compareAndSet(false, true));
            writer = null;
            b1.compareAndSet(true, false);
        }
       
    }
    public int getBufferSize() {
        return bufferSize;
    }
    public void setBufferSize(int bufferSize) {
        this.bufferSize = bufferSize;
    }
    private final AtomicBoolean b1 = new AtomicBoolean();
    public void close() throws IOException {
        while(!b1.compareAndSet(false, true));
        if(writer != null)
        {
            writer.interrupt();
        }
        b1.compareAndSet(true, false);
        closed = true;       
    }
    private boolean closed = false;
    @Override
    public boolean isOpen() {
        return !closed;
    }
}


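import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.nio.channels.InterruptibleChannel;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class NPipedInputStream extends InputStream implements InterruptibleChannel {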
    BlockingQueue<Byte> circularBuffer;
    boolean connected = false;
    void setConnected(boolean connected) {
        this.connected = connected;
    }
    public NPipedInputStream()
    {
       
    }
    NPipedInputStream(BlockingQueue<Byte> commChannel)
    {
        this.circularBuffer = commChannel;
    }
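    @Override
    public int read() throws IOException {
        if(!connected)
            throw new IOException("NPipedInputStream not connected to any source");
        // InputStream.read() must return 0-255; without the mask, bytes >= 0x80
        // would come back negative and 0xFF would look like end of stream (-1)
        return readByte() & 0xFF;
    }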
    private byte readByte() throws InterruptedIOException
    {
        try {
            return readFromChannel();
        } catch (InterruptedException e) {
            throw new InterruptedIOException();
        }
    }
    public boolean isBlocked()
    {
        while(!b1.compareAndSet(false, true));
        try {
            return reader != null && reader.isAlive();
        } finally {
            b1.compareAndSet(true, false);
        }
       
    }
    private final AtomicBoolean b1 = new AtomicBoolean();
    private Thread reader;
    private Byte readFromChannel() throws InterruptedException
    {
        Byte bytes;
        try {
            reader = Thread.currentThread();
            bytes = circularBuffer.take();
        } finally {
            while(!b1.compareAndSet(false, true));
            reader = null;
            b1.compareAndSet(true, false);
        }
        return bytes;
    }
    private boolean closed = false;
    public void close() throws IOException {
        while(!b1.compareAndSet(false, true));
        if(reader != null)
        {
            reader.interrupt();
        }
        b1.compareAndSet(true, false);
        closed = true;
    }
    @Override
    public boolean isOpen() {
        return !closed;
    }
}