This post is a follow-up to the Weblogs post, wherein I had proposed a time-series based log repository in Cassandra with the following data model:
CREATE TABLE data_points (
    app_id text,
    event_ts timeuuid,
    level text,
    log_text text,
    lucene text,
    PRIMARY KEY (app_id, event_ts)
) WITH CLUSTERING ORDER BY (event_ts DESC)

Given a single partition key of app_id, this model has a potential drawback: it can lead to extremely wide rows or hotspots. For more detail, check the issue raised on GitHub. This is a valid concern, and best practices in Cassandra time-series modelling will always suggest a composite partition key for better distribution of the data across the cluster.
To query data in Cassandra, all of the partition key columns are needed to compute the hash that lets Cassandra locate the nodes containing the partition. Cassandra therefore requires that you restrict either all of the partition key columns or none of them, unless the query can use a secondary index.
However, from the primary use case perspective, a user simply wants to search logs within a given time frame, and the minimal information they can be expected to provide is which application's log to look at and within what date range.
Selecting a second partition key
Thus, the second partition key should be one that the application can compute from the user's query parameters. As mentioned above, the mandatory query parameters for our system would be:
- Application ID
- Date range
So, a modified data model can be as follows:
CREATE TABLE data_points (
    app_id text,
    bucket text,
    event_ts timeuuid,
    level text,
    log_text text,
    lucene text,
    PRIMARY KEY ((app_id, bucket), event_ts)
) WITH CLUSTERING ORDER BY (event_ts DESC)

The bucket will simply be a yyyyMMdd string computed on event_ts.
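As a minimal sketch of that computation (the helper name and the assumption that the event time is already available as epoch milliseconds are mine, not part of the original design), the bucket for a row can be derived at write time like this:

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Illustrative helper: derives the partition bucket (yyyyMMdd, UTC) from the
// event's epoch-millisecond timestamp, e.g. the timestamp extracted from the
// event_ts timeuuid before the row is written.
final class BucketCalculator {

    private static final DateTimeFormatter BUCKET_FORMAT =
            DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

    static String bucketFor(long epochMillis) {
        return BUCKET_FORMAT.format(Instant.ofEpochMilli(epochMillis));
    }
}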
Querying for data
First, the buckets need to be computed from the date range parameter, which should be a trivial task.
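A sketch of that expansion, assuming the date range arrives as two LocalDate values with both ends inclusive (class and method names are illustrative):

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: expands a [from, to] date range into the list of
// yyyyMMdd bucket strings the query has to touch, one per day.
final class BucketRange {

    private static final DateTimeFormatter BUCKET_FORMAT =
            DateTimeFormatter.ofPattern("yyyyMMdd");

    static List<String> bucketsBetween(LocalDate from, LocalDate to) {
        List<String> buckets = new ArrayList<>();
        for (LocalDate day = from; !day.isAfter(to); day = day.plusDays(1)) {
            buckets.add(day.format(BUCKET_FORMAT));
        }
        return buckets;
    }
}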
Attempt 1:
Using an IN clause for the buckets, given the app_id and date range:

select (..) from data_points where app_id = <app_id> and bucket in (<b1>,..,<bn>)

However, it has a couple of drawbacks:
- Select on indexed columns combined with an IN clause on the PRIMARY KEY is not supported. Since the lucene column is a secondary-indexed field, an IN query involving that column will be rejected by Cassandra.
- An IN clause whose parameter list grows sufficiently large is generally considered an anti-pattern in Cassandra. Check this article for some discussion.
Attempt 2:
Using the TOKEN() function to range-query the partition key:

select (..) from data_points where token(app_id, bucket) > token(<app_id>,<b1>) and token(app_id,bucket) < token(<app_id>,<bn>)

Well, this can only work with a ByteOrderedPartitioner schema, and then again, using a ByteOrderedPartitioner is strongly discouraged.
Finally, I opted for an asynchronous, distributed querying technique. The algorithm for such a query would be as follows:
for each bucket in buckets
loop
execute async:
select (..) from data_points where app_id=<app_id> and bucket=<bucket>
collect resultset future
end loop;
for each future in resultset futures
loop
get resultset
collate
end loop;
merge collated;
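As a concrete sketch of that loop, assuming the DataStax Java driver 3.x API (Session.executeAsync returning a ResultSetFuture); the class is illustrative, and collation is reduced here to appending rows into a single list, whereas the real implementation relies on the aggregation structures described next:

import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import java.util.ArrayList;
import java.util.List;

// Illustrative fan-out: one async query per bucket; futures are collected
// first and drained afterwards, so the per-partition reads run concurrently.
final class DistributedQuery {

    private final Session session;
    private final PreparedStatement select;

    DistributedQuery(Session session) {
        this.session = session;
        this.select = session.prepare(
                "select * from data_points where app_id = ? and bucket = ?");
    }

    List<Row> fetch(String appId, List<String> buckets) {
        List<ResultSetFuture> futures = new ArrayList<>();
        for (String bucket : buckets) {
            futures.add(session.executeAsync(select.bind(appId, bucket)));
        }
        List<Row> collated = new ArrayList<>();
        for (ResultSetFuture future : futures) {
            ResultSet rs = future.getUninterruptibly();
            for (Row row : rs) {
                collated.add(row);
            }
        }
        return collated;
    }
}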
For concurrent execution, a couple of data structures had to be developed: a QueryAggregator for collating the async result sets, and a sorted bounded buffer to efficiently merge them.
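The post does not show those structures, but purely as an illustration of the idea, a sorted bounded buffer could look something like the following (the comparator is expected to order entries newest-first and to break ties, e.g. on the timeuuid, so that distinct entries never compare as equal):

import java.util.Comparator;
import java.util.TreeSet;

// Illustration only, not the actual implementation: a buffer that keeps at
// most `capacity` entries in sorted order and evicts the last (oldest) entry
// on overflow. Merging N partial result sets then costs at most N * capacity
// insertions while never holding more than `capacity` entries in memory.
final class SortedBoundedBuffer<T> {

    private final int capacity;
    private final TreeSet<T> entries;

    SortedBoundedBuffer(int capacity, Comparator<T> newestFirst) {
        this.capacity = capacity;
        this.entries = new TreeSet<>(newestFirst);
    }

    void offer(T entry) {
        entries.add(entry);
        if (entries.size() > capacity) {
            entries.pollLast(); // drop the entry that sorts last, i.e. the oldest
        }
    }

    Iterable<T> snapshot() {
        return entries;
    }
}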