Wednesday, 23 December 2015

My adventures with Cassandra

This post is a follow-up to the Weblogs post, in which I proposed a time-series based log repository in Cassandra with the following data model:
CREATE TABLE data_points (
    app_id text,
    event_ts timeuuid,
    level text,
    log_text text,
    lucene text,
    PRIMARY KEY (app_id, event_ts)
) WITH CLUSTERING ORDER BY (event_ts DESC)
With app_id as the only partition key, this model has a potential drawback: it can lead to extremely wide rows or hotspots. For more detail, check the issue raised on GitHub. This is a valid concern, and best practices in Cassandra time-series modelling generally recommend a composite partition key for better distribution of data across the cluster.

To serve a query, Cassandra needs values for all the partition key columns so that it can compute the hash that locates the nodes containing the partition. It therefore requires that you restrict either all the partition key columns or none of them, unless the query can use a secondary index.

However, from the perspective of the primary use case, a user simply wants to search logs within a given time frame. The minimal information they can be expected to know is which application's logs to look at and within what date range.

Selecting a second partition key

Thus, the second partition key should be something the application can compute from the user's query parameters. As mentioned above, the mandatory query parameters for our system would be:
  1. Application ID
  2. Date range
Since a date range will always be provided in the query, a formatted date string is a natural candidate for the second partition key. When inserting data, it can be computed from the event timestamp; when querying, it can be computed from the date range parameters.

So, a modified data model can be as follows:
CREATE TABLE data_points (
    app_id text,
    bucket text,
    event_ts timeuuid,
    level text,
    log_text text,
    lucene text,
    PRIMARY KEY ((app_id, bucket), event_ts)
) WITH CLUSTERING ORDER BY (event_ts DESC) 
The bucket is simply a yyyyMMdd string computed from event_ts.
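
As a minimal sketch of that computation in Python: the helper name bucket_for and the choice of UTC are assumptions made for illustration, and the sketch assumes the application still has the event time as a datetime when it builds the event_ts timeuuid.

```python
from datetime import datetime, timezone

def bucket_for(event_time: datetime) -> str:
    """Return the yyyyMMdd bucket for a timezone-aware event timestamp.

    The timestamp is normalised to UTC first (an assumption), so that the
    writer and the reader agree on which day an event belongs to.
    """
    return event_time.astimezone(timezone.utc).strftime("%Y%m%d")

print(bucket_for(datetime(2015, 12, 23, 10, 30, tzinfo=timezone.utc)))  # 20151223
```

Whatever time zone is chosen, the insert path and the query path have to use the same one, otherwise events near midnight end up in a bucket the query never reads.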

Querying for data

First, the buckets need to be computed from the date range parameters, which should be a trivial task.
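
A rough sketch of that step in Python, assuming one bucket per calendar day and an inclusive date range (the function name buckets_for_range is hypothetical):

```python
from datetime import date, timedelta
from typing import List

def buckets_for_range(start: date, end: date) -> List[str]:
    """List the yyyyMMdd buckets covering start..end, both ends inclusive."""
    if end < start:
        raise ValueError("end must not be before start")
    days = (end - start).days
    return [(start + timedelta(days=i)).strftime("%Y%m%d") for i in range(days + 1)]

print(buckets_for_range(date(2015, 12, 30), date(2016, 1, 2)))
# ['20151230', '20151231', '20160101', '20160102']
```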

Attempt 1:

Using an IN clause for the buckets, given the app_id and date range
select (..) from data_points where app_id = <app_id> and bucket in (<b1>,..,<bn>)
However, it has a couple of drawbacks:
  1. Cassandra rejects a query that combines a restriction on an indexed column with an IN clause on the primary key ("Select on indexed columns and with IN clause for the PRIMARY KEY are not supported"). Since the lucene column is a secondary indexed field, a search that filters on it together with an IN clause on bucket will be rejected.
  2. An IN clause with a large number of values is generally an anti-pattern in Cassandra. Check this article for some discussion.
Attempt 2:
Using the TOKEN() function to range query partition key
select (..) from data_points where token(app_id, bucket) > token(<app_id>,<b1>) and token(app_id,bucket) < token(<app_id>,<bn>)
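Well, this can only work on a cluster configured with ByteOrderedPartitioner, because the hashing partitioners do not preserve the order of partition keys in their tokens. And then again, using a ByteOrderedPartitioner is strongly discouraged.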
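
To illustrate why a token range does not select consecutive buckets under a hashing partitioner, here is a small Python sketch. It uses SHA-256 purely as a stand-in hash; toy_token is a hypothetical helper and is not how Cassandra computes tokens. The point is only that the order of the hashes has nothing to do with the order of the keys.

```python
import hashlib

def toy_token(app_id: str, bucket: str) -> int:
    """Stand-in token: an integer derived from a hash of the partition key."""
    digest = hashlib.sha256(f"{app_id}:{bucket}".encode("utf-8")).digest()
    return int.from_bytes(digest, "big")

# One bucket per day of December 2015.
buckets = [f"201512{day:02d}" for day in range(1, 32)]
by_key = sorted(buckets)
by_token = sorted(buckets, key=lambda b: toy_token("app1", b))

# Token range spanned by the first and last bucket of the date range.
first, last = toy_token("app1", buckets[0]), toy_token("app1", buckets[-1])
low, high = min(first, last), max(first, last)
inside = [b for b in buckets[1:-1] if low < toy_token("app1", b) < high]

print("key order equals token order:", by_key == by_token)
print(len(inside), "of", len(buckets) - 2, "intermediate buckets fall inside the token range")
```

With an order-preserving partitioner every intermediate bucket would fall inside the range; with a hash, which ones do is effectively arbitrary.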

Finally, I opted for an asynchronous distributed querying technique. The algorithm looks like this:
for each bucket in buckets
loop
  execute async:
   select (..) from data_points where app_id=<app_id> and bucket=<bucket> 
   collect resultset future
end loop;
for each future in resultset futures
loop
  get resultset
  collate 
end loop;
merge collated;
For concurrent execution, a couple of data structures needed to be developed: a QueryAggregator for collating the async result sets, and a sorted bounded buffer to efficiently merge them.
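
The overall shape of that algorithm can be sketched in Python as follows. This is not the QueryAggregator or the sorted bounded buffer described above. It is a stand-in under stated assumptions: a thread pool plays the role of the driver's asynchronous execution, heapq.merge does the sorted merge, and itertools.islice applies the bound. The names query_buckets, fake_fetch and FAKE_TABLE are hypothetical, and rows are reduced to (timestamp, text) pairs.

```python
import heapq
from concurrent.futures import ThreadPoolExecutor
from itertools import islice
from typing import Callable, Iterable, List, Tuple

Row = Tuple[int, str]  # (event timestamp, log text): a simplified stand-in row

def query_buckets(fetch_bucket: Callable[[str, str], List[Row]],
                  app_id: str, buckets: Iterable[str], limit: int) -> List[Row]:
    """Run one query per bucket concurrently and return the newest `limit` rows."""
    bucket_list = list(buckets)
    if not bucket_list:
        return []
    with ThreadPoolExecutor(max_workers=min(8, len(bucket_list))) as pool:
        # Submit every per-bucket query first, collecting the futures.
        futures = [pool.submit(fetch_bucket, app_id, b) for b in bucket_list]
        # Then resolve them; each result is assumed sorted newest-first,
        # matching CLUSTERING ORDER BY (event_ts DESC).
        per_bucket = [f.result() for f in futures]
    # k-way merge of the descending lists, cut off at the requested bound.
    merged = heapq.merge(*per_bucket, key=lambda row: row[0], reverse=True)
    return list(islice(merged, limit))

# In-memory stand-in for the per-partition SELECT.
FAKE_TABLE = {
    ("app1", "20151222"): [(5, "e"), (2, "b")],
    ("app1", "20151223"): [(9, "i"), (7, "g"), (6, "f")],
}

def fake_fetch(app_id: str, bucket: str) -> List[Row]:
    return FAKE_TABLE.get((app_id, bucket), [])

print(query_buckets(fake_fetch, "app1", ["20151222", "20151223", "20151224"], limit=4))
# [(9, 'i'), (7, 'g'), (6, 'f'), (5, 'e')]
```

Because each partition already returns its rows in descending event_ts order, the merge only ever compares the head of each result set, so a bounded result can be produced without sorting everything that was fetched.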
