Wednesday, 23 December 2015

My adventures with Cassandra

This post is a follow up on the Weblogs post. Wherein I had proposed a time-series based log repository in Cassandra with the following data model:
CREATE TABLE data_points (
    app_id text,
    event_ts timeuuid,
    level text,
    log_text text,
    lucene text,
    PRIMARY KEY (app_id, event_ts)
) WITH CLUSTERING ORDER BY (event_ts DESC)
Given a single partition key of app_id, this model has a potential drawback that can lead to extremely wide rows or hotspots. For more detail, check the issue raised in Github. This is a valid concern, and best practises in Cassandra time-series modelling will always suggest to go for a composite partition key for better distribution of the data in cluster.

For Cassandra data querying, all the partition key columns are needed to be able to compute the hash that will allow it to locate the nodes containing the partition. Cassandra will require that you either restrict all the partition key columns, or none of them unless the query can use a secondary index.

However from the primary use case perspective, an user would want to simply search logs within a given time frame, and the minimal information thing s/he should be aware of is, probably, which application log to be looked at and within what date range.

Selecting a second partition key

Thus, the second partition key should be such that the application should be able to compute it from the user query parameters. As mentioned above, the mandatory query parameters for our system would be
  1. Application ID
  2. Date range
Since a date range will always be provided in the query, a formatted date string can possibly be used as a second partition key. While inserting data, it can be computed from the event timestamp, and while querying for data, it can be computed from the date range parameters.

So, a modified data model can be as follows:
CREATE TABLE data_points (
    app_id text,
    bucket text,
    event_ts timeuuid,
    level text,
    log_text text,
    lucene text,
    PRIMARY KEY ((app_id, bucket), event_ts)
) WITH CLUSTERING ORDER BY (event_ts DESC) 
 The bucket will simply be a yyyyMMdd string computed on event_ts.

Querying for data

Firstly, the buckets need to be computed from the date range parameter and that should be a trivial task.

Attempt 1:

Using a IN clause for the buckets, given the app_id and date range
select (..) from data_points where app_id = <app_id> and buckets in (<b1>,..,<bn>)
 However, it has a couple of drawbacks,
  1. Select on indexed columns and with IN clause for the PRIMARY KEY are not supported. Since the lucene column is a secondary indexed field, IN clause involving that column will be rejected by Cassandra
  2. With the IN query parameters growing sufficiently large, is generally an anti-pattern with Cassandra. Check this article for some discussion
Attempt 2:
Using the TOKEN() function to range query partition key
select (..) from data_points where token(app_id, bucket) > token(<app_id>,<b1>) and token(app_id,bucket) < token(<app_id>,<bn>)
Well, this can only work with a ByteOrderedPartitioner schema, and then again using a ByteOrderedPartitioner is strongly discouraged.

Finally, I opted to an asynchronous distributed querying technique. The algorithm for such a querying would be like,
for each bucket in buckets
loop
  execute async:
   select (..) from data_points where app_id=<app_id> and bucket=<bucket> 
   collect resultset future
end loop;
for each future in resultset futures
loop
  get resultset
  collate 
end loop;
merge collated;
 For concurrent execution, a couple of data structures needed to be developed, a QueryAggregator for collating async result sets; and a sorted bounded buffer to efficiently merge the result sets.

Tuesday, 8 December 2015

Full text searchable log repository using Cassandra + Lucene

Abstract:

Logging is a general cross cutting concern with almost all applications. We have robust libraries like log4j/logback or the jdk logging present for it. For many projects the logs are stored in some database for analysis afterwards.

In this article we propose such a persistent, searchable, scalable logging infrastructure that can be customized, or simply used as a plugin to extend the existing logging framework of an application. We will discuss it as a Log4j plugin, using a custom Log4j appender to utilize the framework.

Working code can be found at:
https://github.com/javanotes/weblogs
https://github.com/javanotes/weblog4jappender

The project uses Stratio secondary index plugin for Cassandra; a Lucene based full text search implementation on Cassandra. The core project is developed as a Spring boot application that can run as an embedded servlet container, or deployed as webapp. It provides:
  1. A RESTful api for log request ingestion. For e.g, the Log4j appender would POST logging requests to the api
  2. A web based console for viewing and searching through logs, and some data visualization
The Problem:

A logging service, that should be:
  • Persistent - Logs generated should be persisted as time series data, onto disk system
  • Scalable - Logs generated can grow arbitrarily. So the infrastructure should be scalable enough to persist massive amount of data.
  • Non intrusive - Logging should be a 'non intrusive' cross cutting concern of an application. That is to say, the performance/functionality of the application should not be affected/negligibly affected by the logging service
  • Centralized - The infrastructure should be a centralized one with multiple logging clients allowed to utilize it
  • Searchable - The persisted log data should be full text searchable, and date wise searchable, and can be paged through results.
  • Pluggable - The solution should be extensible so that it can be plugged/adapted with any popular logging framework easily.
  • Analyzable - Some analytics like error count trends
The Solution:

A persistent and searchable data store needs to be a database. While some traditional RDBMS (MySql, that I know of) do support full text search, but storing a massive amount of data with high write operation rate can quickly become a bottleneck. So we move out from a RDBMS solution.

Lucene based datastores, like SolR/Elastic Search can be a good proposition, preferably ElasticSearch, if we take the ease of use. However, these tools index document wise; that is to say, a complete record with all its fields is indexed, simplistically speaking. These solutions are a good fit for complex record (structurally) search capabilities, but can be an overkill for a limited search facility ('phrase' or 'term' search only) and without any need of, say relevance search.

This takes us to the options of nosql. The immediate candidates that come to mind, purely on the basis of popularity (well, in internet search results at least!) are Cassandra and Mongodb. In fact Mongo has built in support for full text search as well. However, keeping in mind the time series nature of the data, it seems a key-value store (Cassandra) should be a better fit than a document oriented (Mongo). Also, keeping in mind the high write operation nature, it felt to me Cassandra is a better candidate. The last inference is simply based on a previous project that I had worked on, where we were prototyping a big data ingestion platform on Cassandra. So.. Cassandra!

Approach:

We need to keep the following thing in mind in using Cassandra:
  • A basic challenge is, well, Cassandra (v2.2.3) has no out of the box support for full text search
  • Dataset pagination, which is not trivial using Cassandra and has some limitation on previous/next fetches.
  • Cassandra data model needs to be designed top down, that is we design how we store the data based on what we want to see and not the other way
  • The partition key, needs to be based on something that will always be provided while querying. As well it should be good enough to distribute data evenly across the cluster.
  • The timestamp column should be the first clustering key with a timeuuid datatype. Using a timeuuid will serve a dual purpose of natural ordering a time-series data, as well as provide for a unique 'rowid' that can be useful for pagination queries
  • Any other searchable field, say logging level, can be kept as subsequent clustering key
Full Text Search

For full text searching capability, we will use a custom secondary index plugin for Cassandra. A custom secondary index in Cassandra is an external java extension that Cassandra uses as a dynamic library. Some discussions here.

Stratio has developed a lucene based custom secondary index implementation as part of their core big data platform and it is open sourced. From their Github wiki:
It is a plugin for Apache Cassandra that extends its index functionality to provide near real time search such as ElasticSearch or Solr, including full text search capabilities and free multivariable, geospatial and bitemporal search.
 Keeping in mind the above points, a simple data model can be as follows:
CREATE TABLE data_points (
    app_id text,
    event_ts timeuuid,
    level text,
    log_text text,
    lucene text,
    PRIMARY KEY (app_id, event_ts)
) WITH CLUSTERING ORDER BY (event_ts DESC)
The column 'lucene' is a dummy column that will be used by Stratio plugin to index designated fields. More details can be found in their documentation.

Then queries can be formed to fetch time-series ordered data as follows:

SELECT * FROM data_points WHERE app_id=<app_log_id> AND lucene='{filter:{type:"boolean", must:[{type:"phrase",field:"log_text",value:"<search_phrase_or_term>"},{type:"match",field:"level",value:"INFO"}],should:[],not:[]},sort:{fields:[{field:"event_ts",reverse:false}]}}' AND event_ts>=minTimeuuid(<lower_bound_timeuuid>) AND event_ts<=maxTimeuuid(<upper_bound_timeuuid>) LIMIT <page_size>;
 The important points to note for pagination:
  1. For NEXT - <upper_bound> is from the last record in current page
  2. For PREVIOUS - <lower_bound> is from the first record in current page
  3. For NEXT, the limit is from 'head' and for PREVIOUS it will be from 'tail'. Accordingly for a PREVIOUS query, the timeuuid should be sorted in reverse
  4. Skipping to page numbers is not supported, since we do not have a concept of 'offset'
Screenshots:



Installation for plugging the infrastructure to a Log4j based application
  1. Setup Cassandra. Can be downloaded from here
  2. Setup Stratio Cassandra secondary index plugin. The latest release can be found here. Follow instructions to build a composite jar and include in Cassandra lib. NOTE: For Windows system running Datastax Cassandra as service (using Apache commons daemon), the classpath specified in registry path HKEY_LOCAL_MACHINE\SOFTWARE\Wow6432Node\Apache Software Foundation\Procrun 2.0\DataStax_Cassandra_Community_Server\Parameters\Java need to be modified to include the plugin jar
  3. Run application as Spring boot standalone with embedded container/ or deploy as a WAR file in a J2EE server
  4. Include weblog4jappender as a jar to an existing log4j application and configure log4j.properties as follows:
log4j.appender.weblogs=com.weblogs.log4j.WeblogsAppender
log4j.appender.weblogs.serviceUrl=http://<host:port>/weblogs/api/ingestbatch
log4j.appender.weblogs.batchSize=100
log4j.appender.weblogs.applicationId=<application_id>
#log4j.appender.weblogs.flushSecs=60 -- optional