Advanced Time Series with Cassandra

Original source (www.datastax.com)
Tags: database cassandra indices time-series-databases materialized-views events www.datastax.com
Clipped on: 2015-12-18

DataStax Developer Blog

By Tyler Hobbs - March 28, 2012

Cassandra is an excellent fit for time series data, and it’s widely used for storing many types of data that follow the time series pattern: performance metrics, fleet tracking, sensor data, logs, financial data (pricing and ratings histories), user activity, and so on.

A great introduction to this topic is Kelley Reynolds’ Basic Time Series with Cassandra. If you haven’t read that yet, I highly recommend starting with it. This post builds on that material, covering a few more details, corner cases, and advanced techniques.

Indexes vs Materialized Views

When working with time series data, one of two strategies is typically employed: either the column values contain row keys pointing to a separate column family which contains the actual data for events, or the complete set of data for each event is stored in the timeline itself. The latter strategy can be implemented by serializing the entire event into a single column value or by using composite column names of the form <timestamp>:<event_field>.

With the first strategy, which is similar to building an index, you first fetch a set of row keys from a timeline and then multiget the matching data rows from a separate column family. This approach is appealing to many at first because it is more normalized; it allows for easy updates of events, doesn’t require you to repeat the same data in multiple timelines, and lets you easily add built-in secondary indexes to your main data column family. However, the second step of the data fetching process, the multiget, is fairly expensive and slow. It requires querying many nodes where each node will need to perform many disk seeks to fetch the rows if they aren’t well cached. This approach will not scale well with large data sets.

[Figure: The top column family contains only a timeline index; the bottom, the actual data for the events.]

The second strategy, which resembles maintaining a materialized view, provides much more efficient reads. Fetching a time slice of events only requires reading a contiguous portion of a row on one set of replicas. If the same event is tracked in multiple timelines, it’s okay to denormalize and store all of the event data in each of those timelines. One of the main principles that Cassandra was built on is that disk space is a very cheap resource; minimizing disk seeks at the cost of higher space consumption is a good tradeoff. Unless the data for each event is very large, I always prefer this strategy over the index strategy.

[Figure: All event data is serialized as JSON in the column values.]
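
As a minimal sketch of the two ways to store complete events in the timeline itself, the snippet below models a row as a plain Python dict (no Cassandra client is involved). The event fields and the helper names `json_columns` and `composite_columns` are hypothetical, chosen only for illustration.

```python
import json

def json_columns(events):
    """One column per event: the name is the timestamp, the value is the whole event as JSON."""
    return {ts: json.dumps(event, sort_keys=True) for ts, event in events}

def composite_columns(events):
    """One column per event field: the name is the pair (timestamp, field)."""
    return {(ts, field): value
            for ts, event in events
            for field, value in event.items()}

# Hypothetical events: (timestamp, event fields)
events = [
    (1332959000, {"user": "jbellis", "action": "login"}),
    (1332959010, {"user": "jbellis", "action": "post"}),
]
row_json = json_columns(events)
row_composite = composite_columns(events)
```

With JSON values, changing one field means rewriting the whole column; with composite names, each field lives in its own column and can be overwritten independently.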

Reversed Column Comparators

Since Cassandra 0.8, column comparators can easily be reversed. This means that if you’re using timestamps or TimeUUIDs as column names, you can choose to have them sorted in reverse chronological order.

If the majority of your queries ask for the N most recent events in a timeline or N events immediately before a point in time, using a reversed comparator will give you a small performance boost over always setting reversed=True when fetching row slices from the timeline.
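
To illustrate what a reversed comparator changes, here is a small in-memory sketch (plain Python, not a Cassandra client call). Columns are kept newest-first, so "the N events immediately before a point in time" becomes a forward slice. The function name `events_before` and the list-based row are assumptions made for this example.

```python
import bisect

def events_before(columns_desc, before_ts, count):
    """Return up to `count` (timestamp, value) columns strictly older than `before_ts`.

    `columns_desc` must be sorted newest-first, the order a reversed
    comparator would keep the columns in.
    """
    # bisect expects ascending keys, so search on negated timestamps.
    keys = [-ts for ts, _ in columns_desc]
    start = bisect.bisect_right(keys, -before_ts)
    # A plain forward slice from the starting point; no reversal needed.
    return columns_desc[start:start + count]

row = [(50, "e"), (40, "d"), (30, "c"), (20, "b"), (10, "a")]
latest_two = events_before(row, before_ts=45, count=2)
```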

Timeline Starting Points

To support queries that ask for all events before a given time, your application usually needs to know when the timeline was first started. Otherwise, if you aren’t guaranteed to have events in every bucket, you cannot just fetch buckets further and further back in time until you get back an empty row; there’s no way to distinguish between a bucket that just happens to contain no events and one that falls before the timeline even began.

To prevent unnecessary searching through empty rows, we can keep track of when the earliest event was inserted for a given timeline using a metadata row. When an application writes to a timeline for the first time after starting up, it can read the metadata row, find out the current earliest timestamp, and write a new timestamp if it ever inserts an earlier event. To avoid race conditions, add a new column to the metadata row each time a new earliest event is inserted. I suggest using TimeUUIDs with a timestamp matching the event’s timestamp for the column name so that the earliest timestamp will always be at the beginning of the metadata row.

After reading only the first column from the metadata row (either on startup or the first time it’s required, refreshing periodically), the application can know exactly how far in the past it should look for events in a given timeline.
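
The bookkeeping described above can be sketched roughly as follows. This is an in-memory stand-in, not real Cassandra code: the class `TimelineStart`, the helper `buckets_back_to_start`, and the use of an `(event_ts, random suffix)` tuple in place of a TimeUUID column name are all assumptions for illustration.

```python
import bisect
import uuid

class TimelineStart:
    """In-memory stand-in for the metadata row tracking the earliest event."""

    def __init__(self):
        # Sorted column names; (event_ts, unique suffix) stands in for a TimeUUID.
        self.columns = []

    def record_if_earlier(self, event_ts):
        """Add a new column only when event_ts precedes the known earliest."""
        earliest = self.earliest()
        if earliest is None or event_ts < earliest:
            # Append-only: concurrent writers never overwrite each other's columns.
            bisect.insort(self.columns, (event_ts, uuid.uuid4().hex))

    def earliest(self):
        """Read only the first column of the row."""
        return self.columns[0][0] if self.columns else None

def buckets_back_to_start(now_ts, start_ts, bucket_size):
    """Bucket start times from the current bucket back to the first bucket."""
    current = now_ts - now_ts % bucket_size
    first = start_ts - start_ts % bucket_size
    return list(range(current, first - 1, -bucket_size))
```

Because columns are only ever added, two writers that both believe they hold the earliest event simply leave two columns behind; the first column in sort order is still the true earliest.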

High Throughput Timelines

Each row in a timeline will be handled by a single set of replicas, so they may become hotspots while the row holding the current time bucket falls in their range. It’s not very common, but occasionally a single timeline may grow at such a rate that a single node cannot easily handle it. This may happen if tens of thousands of events are being inserted per second or at a lower rate if the column values are large. Sometimes, by reducing the size of the time bucket enough, a single set of replicas will only have to ingest writes for a short enough period of time that the throughput is sustainable, but this isn’t always a feasible option.

In order to spread the write load among more nodes in the cluster, we can split each time bucket into multiple rows. We can use row keys of the form <timeline>:<bucket>:<partition>, where partition is a number between 1 and the number of rows we want to split the bucket across. When writing, clients should append new events to each of the partitions in round robin fashion so that all partitions grow at a similar rate. When reading, clients should fetch slices from all of the partition rows for the time bucket they are interested in and merge the results client-side, similar to the merge step of merge-sort.
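
A minimal sketch of the split-bucket scheme, using only the Python standard library. The class `SplitBucketWriter` and the function `merge_partition_slices` are hypothetical names; actual reads and writes against Cassandra are left out, and each slice is assumed to arrive as a list of `(timestamp, value)` pairs already sorted by timestamp.

```python
import heapq
import itertools

class SplitBucketWriter:
    """Chooses row keys of the form <timeline>:<bucket>:<partition> round robin."""

    def __init__(self, timeline, splitting_factor):
        self.timeline = timeline
        # Partitions are numbered 1..splitting_factor, as in the text.
        self._partitions = itertools.cycle(range(1, splitting_factor + 1))

    def row_key_for_next_write(self, bucket):
        return f"{self.timeline}:{bucket}:{next(self._partitions)}"

def merge_partition_slices(slices):
    """Merge per-partition slices (each sorted by timestamp) into one sorted list."""
    return list(heapq.merge(*slices, key=lambda column: column[0]))

writer = SplitBucketWriter("jbellis", splitting_factor=3)
keys = [writer.row_key_for_next_write(1332959000) for _ in range(4)]
```

`heapq.merge` performs the same lazy k-way merge as the merge step of merge-sort, so the client never has to re-sort the combined result.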

If some timelines require splitting while others do not, or if you need to be able to adjust the number of rows a timeline is split across periodically, I suggest storing info about the splits in a metadata row for the timeline in a separate column family (see the notes at the end of this post). The metadata row might have one column for each time the splitting factor is adjusted, something like {<timestamp>: <splitting_factor>}, where timestamp should align with the beginning of a time bucket after which clients should use the new splitting factor. When reading a time slice, clients can know how many partition rows to ask for during a given range of time based on this metadata.

[Figure: The "jbellis" timeline has increased its splitting factor over time; it currently spans three rows for each time bucket.]
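
One way a reader might use the splitting-factor metadata is sketched below. The metadata row is modeled as a sorted list of `(timestamp, splitting_factor)` pairs; the helper names, the sample timestamps, and the default factor of 1 for buckets that precede any recorded change are assumptions, not part of the original design.

```python
import bisect

def value_at(changes, bucket_start, default):
    """Return the value in effect for a bucket.

    `changes` is a sorted list of (effective_from_ts, value) pairs.
    """
    timestamps = [ts for ts, _ in changes]
    index = bisect.bisect_right(timestamps, bucket_start) - 1
    return changes[index][1] if index >= 0 else default

def partition_row_keys(timeline, bucket_start, changes):
    """Row keys to read for one time bucket, given the splitting-factor history."""
    factor = value_at(changes, bucket_start, default=1)
    return [f"{timeline}:{bucket_start}:{p}" for p in range(1, factor + 1)]

# Hypothetical history: split in two from 1332950000, in three from 1332960000.
split_changes = [(1332950000, 2), (1332960000, 3)]
```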

Variable Time Bucket Sizes

For some applications, the rate of events for different timelines may differ drastically. If some timelines have an incoming event rate that is 100x or 1000x higher than other timelines, you may want to use a different time bucket size for different timelines to prevent extremely wide rows for the busy timelines or a very sparse set of rows for the slow timelines. In other cases, a single timeline may increase or decrease its rate of events over time; eventually, this timeline may need to change its bucket size to keep rows from growing too wide or too sparse.

Similar to the timeline metadata suggestion for high throughput timelines (above), we can track time bucket sizes and their changes for individual timelines with a metadata row. Use a column of the form {<timestamp>: <bucket_size>}, where timestamp aligns with the start of a time bucket, and bucket_size is the bucket size to use after that point in time, measured in a number of seconds. When reading a time slice of events, calculate the appropriate set of row keys based on the bucket size during that time period.

[Figure: At time 1332959000, the "jbellis" timeline switched from using 1000 second time buckets to 10 second buckets.]
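
As a rough sketch of the row key calculation, the function below walks the bucket-size history and lists every bucket that overlaps a requested time range. It assumes row keys of the form `<timeline>:<bucket_start>`, a half-open range `[start_ts, end_ts)`, and that buckets are aligned relative to the timestamp at which each size took effect; the function name and the starting timestamp of the first period are hypothetical.

```python
def bucket_row_keys(timeline, size_changes, start_ts, end_ts):
    """Row keys of every bucket overlapping [start_ts, end_ts).

    `size_changes` is a sorted list of (effective_from_ts, bucket_size_seconds)
    pairs; each effective_from_ts is assumed to fall on a bucket boundary.
    """
    keys = []
    for i, (from_ts, size) in enumerate(size_changes):
        # A bucket size applies until the next change (or the end of the query).
        until_ts = size_changes[i + 1][0] if i + 1 < len(size_changes) else end_ts
        lo = max(start_ts, from_ts)
        hi = min(end_ts, until_ts)
        if lo >= hi:
            continue  # the query does not touch this period
        # Round down to the start of the bucket that contains `lo`.
        first = from_ts + (lo - from_ts) // size * size
        keys.extend(f"{timeline}:{bucket}" for bucket in range(first, hi, size))
    return keys

# 1000 second buckets at first, 10 second buckets from 1332959000 onward.
size_changes = [(1332950000, 1000), (1332959000, 10)]
keys = bucket_row_keys("jbellis", size_changes, 1332957500, 1332959025)
```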

Notes on Timeline Metadata

When using timeline metadata for high throughput timelines or variable bucket size timelines, the metadata rows should typically be stored in a separate column family to allow for cache tuning. I suggest using a fair amount of key cache on the metadata column family if it will be queried frequently.

The timeline metadata should generally be written by a process external to the application to avoid race conditions, unless the application operates in such a fashion that this isn’t a concern. The application can read the metadata row on startup or on demand for a particular timeline; if the application is long lived, it should periodically poll the metadata row for updates. If this is done, a new splitting factor or bucket size can safely be set to start with a new time bucket that begins shortly in the future; the application processes should see the updated metadata in advance, before the new bucket begins, allowing them to change their behavior right on time.
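
A long-lived application could poll the metadata along these lines. This is only a sketch: `PolledMetadata` is a hypothetical helper, `load` stands for whatever function actually reads the metadata row, and the clock is injectable purely to make the behavior easy to test.

```python
import time

class PolledMetadata:
    """Caches a timeline's metadata and re-reads it at most once per interval."""

    def __init__(self, load, refresh_seconds, clock=time.monotonic):
        self._load = load  # hypothetical callable that reads the metadata row
        self._refresh_seconds = refresh_seconds
        self._clock = clock
        self._value = None
        self._loaded_at = None

    def get(self):
        """Return the cached metadata, reloading it when the interval has elapsed."""
        now = self._clock()
        if self._loaded_at is None or now - self._loaded_at >= self._refresh_seconds:
            self._value = self._load()
            self._loaded_at = now
        return self._value
```

For the "right on time" switch to work, the refresh interval needs to be shorter than the lead time given to a new splitting factor or bucket size, so that every process reloads at least once before the new bucket begins.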

Further Data Modeling Advice

If you want to learn more about data modeling, I recommend taking DataStax’s free, self-paced online data modeling course (DS220).





Comments

  1. Carlos says:

    Will composite keys be efficient on large data sets in Cassandra 1.1?

    E.g.

    CREATE TABLE actions(
    user_id varchar,
    time_uuid uuid,

    PRIMARY KEY (user_id, time_uuid)
    );

  2. Tyler Hobbs says:

    Carlos, I’m not sure what you mean. There’s no reason that I can think of that would make composites inefficient now or in Cassandra 1.1. If you are using CQL, what you’re describing is only possible in CQL 3.0, which you can choose to use in Cassandra 1.1.

    Can you be more specific with your question?

  3. Carlos says:

    I was reading “Schema in Cassandra 1.1”. In the section “Clustering, composite keys, and more”, it describes defining CFs with composite primary keys. Will composite primary keys scale well with large data sets for time series?

  4. Tyler Hobbs says:

    Yes, they shouldn’t have any problems. The first component of the composite will be used as the row key; the remaining components become part of a composite column name. Queries for a slice of these will only read a portion of a single row, so it will be very efficient. You should have no problems scaling it.

  5. Howdy,

    Can I ask a data model question here?

    We have a book table with 20 columns, 300 million rows.

    create table book(
    book_id,
    isbn,
    price,
    author,
    title,
    col_n1,
    col_n2,

    col_nm,
    );

    Data usage:

    We need to query data by each column, do pagination as below,

    select * from book where isbn < "XYZ" order by ISBN descending;
    select * from book where price < 992 order by price descending;
    select * from book where col_n1 < 992 order by col_n1 descending;
    select * from book where col_n2 < 992 order by col_n2 descending;

    select * from book where col_nm < 992 order by col_nm descending;

    We update the book table at a high rate, about 100 million updates a day.

    If we choose the materialized views approach, we have to update 20 columns in each materialized view
    column family for each base row update.
    Will Cassandra’s write performance be acceptable?

    Redis recommends building an index for the query on each column, which is your first strategy:
    http://redis.io/topics/data-types-intro
    (see the section “Pushing IDs instead of the actual data in Redis lists”)

    Should we just normalize the data, create a base book table with book_id as the primary key, and then
    build 20 index column families using the wide-row column slicing approach, with the indexed column’s value as the column name and book_id as the value?
    This way, we only need to update the column families whose column value changed, not all 20 materialized view CFs.

    What do you think of it?

    Thanks,
    Charlie | DBA developer

  6. Bhaskar says:

    Can the two approaches, i.e. high-throughput timelines and variable bucket sizes, be combined or used together?

    Or are they orthogonal, in that they achieve the same goal (avoiding hotspots by distributing the data across nodes) but go about it in different ways?

    Secondly, if the entire data is stored in the timeline, i.e. the materialized view approach, doesn’t that make the data difficult to search, since you can’t create a secondary index? How would you index some of the keys that are part of the data?

  7. Tyler Hobbs says:

    Those are great questions!

    The two approaches can definitely be used in conjunction; they are not mutually exclusive. They have different goals: the variable bucket size approach helps to control the total width of rows, while the high-throughput approach minimizes the hotspot effect by distributing each time bucket in the timeline across multiple nodes. If you have concerns about both of these, you can combine the two approaches.

    Regarding the materialized view approach, you are correct, you can’t use the built-in secondary indexes directly on this data. Typically, what you do instead is create a separate materialized view column family for each type of query that you need to support on that data and denormalize so that the data exists separately in each of those column families. So, if you need to be able to query by time and also query by some second attribute, you will use one column family for the timelines and a second column family that will support queries based on the second attribute efficiently.

  8. Redis4You says:

    @Charlie
    Redis is a different planet. It pays no price for disk I/O lookups.
    With Cassandra you should denormalize in order to do as few reads as possible, without making rows too long.

  9. rikAtee says:

    In the materialized table it appears you are using super columns. Is this correct or am I misinterpreting the diagram?

    I understand super columns will be replaced ultimately with composites.

    If my interpretation of the diagram is correct, how will using composite types change the materialized table?

    will it result in something like this?

    timeline_ids = [32232,45545,76566]
    columns_names = ['foo', 'bar', 'baz']
    columns = [(id,name) for id in timeline_ids for name in columns_names]

    column_family_name.get(user, columns=columns)

    Is this the most appropriate means?

  10. Tyler Hobbs says:

    @rikAtee I am not using super columns in the materialized view example, just storing a JSON blob. If you need to be able to update individual fields in those columns frequently *and* it’s not easy or efficient to overwrite the entire thing, you could use either supercolumns (not recommended) or composites (recommended), especially through CQL 3.

    For the composites-with-cql3 approach, I suggest checking out some of the recent blog posts on CQL 3: http://www.datastax.com/dev/blog/whats-new-in-cql-3-0, http://www.datastax.com/dev/blog/thrift-to-cql3, http://www.datastax.com/dev/blog/cql3_collections, and http://www.datastax.com/dev/blog/cql3-for-cassandra-experts.

    Whatever you end up doing, I don’t suggest trying to fetch a lot of columns by name, like your example code. Always try to use slices if you’re fetching a lot of columns.

  11. Scott says:

    We are planning a major time series database with Cassandra and wanted to thank you for this article. We made a few modifications, but one thing I would note is that using JSON for your data will take up a lot of space. There are alternatives out there but I won’t name them, a simple Google search should suffice though.

  12. hardik says:

    While using a counter column family, we can manually update the values of counters, say c1 and c2, using the CLI.

    How can I achieve this using the Hector client? I want a counter column to accept only 20 (e.g.) records, and after that a new column c2 (with 20 records), then c3, and so on.

    select * from timecontentcounter;

     key        | column1 | value
    ------------+---------+-------
     2013053015 |      c1 |    20
     2013053015 |      c2 |    20
    In the above example, a new column c3 should be generated to accept counter increments up to 20 records. A Hector solution is needed.

  13. Subodh Nijsure says:

    I am trying to store data with following schema:

    CREATE TABLE temp_humidity_data (
    asset_id text,
    date text,
    event_time timestamp,
    temprature int,
    humidity int,
    PRIMARY KEY((asset_id, date),event_time)
    )

    I have followed datastax article ‘Getting Started with Time Series Modeling’ – http://planetcassandra.org/blog/post/getting-started-with-time-series-data-modeling/

    however with this data model one thing that is not working is query that returns me data between two dates. How do I do that?

    If I do this:

    select * from temp_humidity_data
    where asset_id='1234' AND date >= '2010-04-02' AND date <= '2011-04-03';

    It gives me following error:

    code=2200 [Invalid query] message="Only EQ and IN relation are supported on the partition key (unless you use the token() function)"

    I understand there is a way to use the IN operator, but I don't want to put all those dates in an 'IN' clause. Is there a way to query data between two dates when using the above table definition?

    1. Tyler Hobbs says:

      Subodh,

      With that schema, you will need to put the dates (and asset_id) into an IN clause or issue a separate query per day. (If you’re using one of the Datastax drivers, I suggest doing a separate asynchronous query for each partition, as this will put less strain on the coordinator node and still achieve a high level of concurrency.)

      If the number of events you have per-asset per-day is low (say, less than 10k), you may want to consider using a week or month instead of a single day for the partition key. If you do this, you will have fewer partitions to query. Just be careful not to let partitions grow too large. I would try to keep them under 100k rows.

      1. Firdousi says:

        @Tyler

        What is the issue when you have too many rows or partitions (as you mentioned, more than 100k)?

        1. Tyler Hobbs says:

          Wide partitions aren’t always handled well by compaction and repair. Cassandra 2.1 should handle compactions on wide partitions more efficiently, but repairs still repair the entire partition if any part of it is inconsistent.

          1. Hi, I got a similar question.

            Let’s say you want to get ALL the data from this ‘temp_humidity_data’ table, and this table, like the schema above, combines a date (month/year, whatever) with the asset_id as one partition key.

            What would the query look like to get all the data, for every date, from e.g. asset_id=2?

            I will put it differently: is the second partition key component mandatory? If so, how do I get all the data?

          2. Tyler Hobbs says:

            @Melroy there are basically two or three options:

            First, if you know the date range that the asset_id has data for, I suggest simply fetching all of the valid combinations of (date, asset_id) in parallel (with asynchronous queries if possible, or an IN clause if you must).

            If you don’t know the date range, you can do the same thing, but try all dates for which you have any data. A lot of these will not exist, but Cassandra can determine that they don’t exist very efficiently (through bloom filters). Another option is to create an index on the asset_id column, which would allow you to execute queries like SELECT * FROM mytable WHERE asset_id=?. However, that may not be as efficient as the other option, especially if you have vnodes enabled and you’re not using Cassandra 2.1+.

  14. @Tyler
    In my specific case it’s about stock data (timeseries). And I’m using the latest Cassandra version.

    CREATE TABLE ats.raw_ts (
    listings_id uuid, // Indirect the Symbol (eg. APPLE), it can handle symbol changes
    insertion_time timeuuid,
    type text, // Eg. ‘tick’
    price int,
    ask int,
    bid int,
    bid_size bigint,
    ask_size bigint,
    high int,
    low int,
    volume bigint,
    PRIMARY KEY(listings_id, insertion_time, type)
    )
    WITH CLUSTERING ORDER BY(insertion_time desc);

    Now my question is: Is the symbol as partition key enough? Will Cassandra spread the data around the cluster efficiently, by using the WITH CLUSTERING ORDER BY?

    Or do I need to set up a SECOND partition key? Like this:

    CREATE TABLE ats.raw_ts_2 (
    listings_id uuid,
    date text,
    insertion_time timeuuid,
    type text,
    price int,
    ask int,
    bid int,
    bid_size bigint,
    ask_size bigint,
    high int,
    low int,
    volume bigint,
    PRIMARY KEY((listings_id, date), insertion_time, type)
    )
    WITH CLUSTERING ORDER BY(insertion_time desc);

    As you can see, the partition key is now a combination of the symbol & year.

    The pro is that the data will be spread on the year. Let’s say you need 2 nodes to store the data. 1 node could be 2014 data & another 2015 of the same symbol (eg. AAPL). The big con is here; if you want to get ALL the data over 2 years or more, you need to get the data from several nodes. As you said, using IN.

    Another performance issue could be around old/new year. Where you definitely want some data from the months of previous year, together with the data from the next year.

    So, what is the best solution? Is Cassandra smart enough with CLUSTERING ORDER BY? Do I really need this second partition key?

    1. Melroy van den Berg says:

      Because there is a 2 billion cell limit (row * columns). Is it also better to replace ask, bid, high, … by 1 value? Let’s say blob…

      1. Melroy van den Berg says:

        Ps.
        And I got +/- 120 rows per second. So symbol, date (year+month) satisfies.

        120*60*60*24*31*4

        (120 writes/sec * 60 * 60 * 24 * days in month * 4 columns, if I use “value blob”.)

  15. anu says:

    Hello Tyler – if I stored the data in a JSON blob, how would I query it (to get the individual values) using CQL? Or is there a better way to query it? Please suggest.

    1. Tyler Hobbs says:

      You can use the new user-defined functions in Cassandra 2.2 to extract data from a JSON blob. However, you can’t do that inside of a WHERE clause right now.

  16. anu says:

    Hi

    This is really very helpful. can you please explain this with an example, like take a table where we need to store the time series data and suggest the DDL and best partition keys. you may like to assume that we need to select values based on timestamp field for the sake of simplicity.

    What is the meaning of “Variable Time Bucket Sizes” , can you please give an example, how to actually implement it.

  17. anu says:

    Hi Tyler, This is a great post. very helpful.

    I am modeling to store time series data, i am new to cassandra, have following questions

    1. I want to partition my table on timestamp in such a way that one partition will always keep the latest 36 hours of data and the other partitions each hold 1 day of data.

    Can you please give an example on how to do this.

    2. My model will keep the parameter data from different machines – machine_id is the key . what should be the best primary key

    an example would be very helpful to understand. if you can please take a sample table structure and show us how to model it , that would be very helpful.

  18. JD says:

    Ok, try it again. Why the composition key is ‘timeline:bucket:partition’ but not ‘bucket:timeline:partition’?

  19. Tyler Hobbs says:

    Either one would work fine. If you’re using RandomPartitioner or Murmur3Partitioner (which you should be using) then keys aren’t sorted anyway.

