back to listing index

Building A Relational Database Using Kafka

[web search]
Original source (yokota.blog)
Tags: distributed-systems psql relational-database kafka key-value
Clipped on: 2019-09-23

Skip to content

Building A Relational Database Using Kafka

In a previous post, I showed how Kafka can be used as the persistent storage for an embedded key-value store, called KCache. Once you have a key-value store, it can be used as the basis for other models such as documents, graphs, and even SQL. For example, CockroachDB is a SQL layer built on top of the RocksDB key-value store and YugaByteDB is both a document and SQL layer built on top of RocksDB. Other databases such as FoundationDB claim to be multi-model, because they support several types of models at once, using the key-value store as a foundation.

In this post I will show how KCache can be extended to implement a fully-functional relational database, called KarelDB1. In addition, I will show how today a database architecture can be assembled from existing open-source components, much like how web frameworks like Dropwizard came into being by assembling components such as a web server (Jetty), RESTful API framework (Jersey), JSON serialization framework (Jackson), and an object-relational mapping layer (JDBI or Hibernate).

Hello, KarelDB

Before I drill into the components that comprise KarelDB, first let me show you how to quickly get it up and running. To get started, download a release, unpack it, and then modify config/kareldb.properties to point to an existing Kafka broker. Then run the following:

1
$ bin/kareldb-start config/kareldb.properties

While KarelDB is still running, at a separate terminal, enter the following command to start up sqlline, a command-line utility for accessing JDBC databases.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
$ bin/sqlline
sqlline version 1.8.0
 
sqlline> !connect jdbc:avatica:remote:url=http://localhost:8765 admin admin
 
sqlline> create table books (id int, name varchar, author varchar);
No rows affected (0.114 seconds)
 
sqlline> insert into books values (1, 'The Trial', 'Franz Kafka');
1 row affected (0.576 seconds)
 
sqlline> select * from books;
+----+-----------+-------------+
| ID |   NAME    |   AUTHOR    |
+----+-----------+-------------+
| 1  | The Trial | Franz Kafka |
+----+-----------+-------------+
1 row selected (0.133 seconds)

KarelDB is now at your service.

Kafka for Persistence

At the heart of KarelDB is KCache, an embedded key-value store that is backed by Kafka.  Many components use Kafka as a simple key-value store, including Kafka Connect and Confluent Schema Registry.  KCache not only generalizes this functionality, but provides a simple Map based API for ease of use.  In addition, KCache can use different implementations for the embedded key-value store that is backed by Kafka.

In the case of KarelDB, by default KCache is configured as a RocksDB cache that is backed by Kafka. This allows KarelDB to support larger datasets and faster startup times. KCache can also be configured to use an in-memory cache instead of RocksDB if desired.

Avro for Serialization and Schema Evolution

Kafka has pretty much adopted Apache Avro as its de facto data format, and for good reason.  Not only does Avro provide a compact binary format, but it has excellent support for schema evolution.  Such support is why the Confluent Schema Registry has chosen Avro as the first format for which it provides schema management.

KarelDB uses Avro to both define relations (tables), and serialize the data for those relations.  By using Avro, KarelDB gets schema evolution for free when executing an ALTER TABLE command.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
sqlline> !connect jdbc:avatica:remote:url=http://localhost:8765 admin admin
 
sqlline> create table customers (id int, name varchar);
No rows affected (1.311 seconds)
 
sqlline> alter table customers add address varchar not null;
Error: Error -1 (00000) :
Error while executing SQL "alter table customers add address varchar not null":
org.apache.avro.SchemaValidationException: Unable to read schema:
{
  "type" : "record",
  "name" : "CUSTOMERS",
  "fields" : [ {
    "name" : "ID",
    "type" : "int",
    "sql.key.index" : 0
  }, {
    "name" : "NAME",
    "type" : [ "null", "string" ],
    "default" : null
  } ]
}
using schema:
{
  "type" : "record",
  "name" : "CUSTOMERS",
  "fields" : [ {
    "name" : "ID",
    "type" : "int",
    "sql.key.index" : 0
  }, {
    "name" : "NAME",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "ADDRESS",
    "type" : "string"
  } ]
}
 
sqlline> alter table customers add address varchar null;
No rows affected (0.024 seconds)

As you can see above, when we first try to add a column with a NOT NULL constraint, Avro rejects the schema change, because adding a new field with a NOT NULL constraint would cause deserialization to fail for older records that don’t have that field. When we instead add the same column with a NULL constraint, the ALTER TABLE command succeeds.

By using Avro for deserialization, a field (without a NOT NULL constraint) that is added to a schema will be appropriately populated with a default, or null if the field is optional. This is all automatically handled by the underlying Avro framework.

Another important aspect of Avro is that it defines a standard sort order for data, as well as a comparison function that operates directly on the binary-encoded data, without first deserializing it. This allows KarelDB to efficiently handle key range queries, for example.

Calcite for SQL

Apache Calcite is a SQL framework that handles query parsing, optimization, and execution, but leaves out the data store. Calcite allows for relational expressions to be pushed down to the data store for more efficient processing. Otherwise, Calcite can process the query using a built-in enumerable calling convention, that allows the data store to be represented as a set of tuples that can be accessed through an iterator interface. An embedded key-value store is a perfect representation for such a set of tuples, so KarelDB will handle key lookups and key range filtering (using Avro’s sort order support) but otherwise defer query processing to Calcite’s enumerable convention. One nice aspect of the Calcite project is that it continues to develop optimizations for the enumerable convention, which will automatically benefit KarelDB moving forward.

Calcite supports ANSI-compliant SQL, including some newer functions such as JSON_VALUE and JSON_QUERY.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
sqlline> create table authors (id int, json varchar);
No rows affected (0.132 seconds)
 
sqlline> insert into authors
       > values (1, '{"name":"Franz Kafka", "book":"The Trial"}');
1 row affected (0.086 seconds)
 
sqlline> insert into authors
       > values (2, '{"name":"Karel Capek", "book":"R.U.R."}');
1 row affected (0.036 seconds)
 
sqlline> select json_value(json, 'lax $.name') as author from authors;
+-------------+
|   AUTHOR    |
+-------------+
| Franz Kafka |
| Karel Capek |
+-------------+
2 rows selected (0.027 seconds)

Omid for Transactions and MVCC

Although Apache Omid was originally designed to work with HBase, it is a general framework for supporting transactions on a key-value store. In addition, Omid uses the underlying key-value store to persist metadata concerning transactions. This makes it especially easy to integrate Omid with an existing key-value store such as KCache.

Omid actually requires a few features from the key-value store, namely multi-versioned data and atomic compare-and-set capability. KarelDB layers these features atop KCache so that it can take advantage of Omid’s support for transaction management. Omid utilizes these features of the key-value store in order to provide snapshot isolation using multi-version concurrency control (MVCC). MVCC is a common technique used to implement snapshot isolation in other relational databases, such as Oracle and PostgreSQL.

Below we can see an example of how rolling back a transaction will restore the state of the database before the transaction began.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
sqlline> !autocommit off
 
sqlline> select * from books;
+----+-----------+-------------+
| ID |   NAME    |   AUTHOR    |
+----+-----------+-------------+
| 1  | The Trial | Franz Kafka |
+----+-----------+-------------+
1 row selected (0.045 seconds)
 
sqlline> update books set name ='The Castle' where id = 1;
1 row affected (0.346 seconds)
 
sqlline> select * from books;
+----+------------+-------------+
| ID |    NAME    |   AUTHOR    |
+----+------------+-------------+
| 1  | The Castle | Franz Kafka |
+----+------------+-------------+
1 row selected (0.038 seconds)
 
sqlline> !rollback
Rollback complete (0.059 seconds)
 
sqlline> select * from books;
+----+-----------+-------------+
| ID |   NAME    |   AUTHOR    |
+----+-----------+-------------+
| 1  | The Trial | Franz Kafka |
+----+-----------+-------------+
1 row selected (0.032 seconds)

Transactions can of course span multiple rows and multiple tables.

Avatica for JDBC

KarelDB can actually be run in two modes, as an embedded database or as a server. In the case of a server, KarelDB uses Apache Avatica to provide RPC protocol support. Avatica provides both a server framework that wraps KarelDB, as well as a JDBC driver that can communicate with the server using Avatica RPC.

One advantage of using Kafka is that multiple servers can all “tail” the same set of topics. This allows multiple KarelDB servers to run as a cluster, with no single-point of failure. In this case, one of the servers will be elected as the leader while the others will be followers (or replicas). When a follower receives a JDBC request, it will use the Avatica JDBC driver to forward the JDBC request to the leader. If the leader fails, one of the followers will be elected as a new leader.

Database by Components

Today, open-source libraries have achieved what component-based software development was hoping to do many years ago. With open-source libraries, complex systems such as relational databases can be assembled by integrating a few well-designed components, each of which specializes in one thing that it does particularly well.

Above I’ve shown how KarelDB is an assemblage of several existing open-source components:

Currently, KarelDB is designed as a single-node database, which can be replicated, but it is not a distributed database. Also, KarelDB is a plain-old relational database, and does not handle stream processing. For a distributed, stream-relational database, please consider using KSQL instead, which is production-proven.

KarelDB is still in its early stages, but give it a try if you’re interesting in using Kafka to back your plain-old relational data.

  1. KarelDB is named after Karel Capek, a Czech science-fiction author who invented the word “robot”. There is also a programming language named after him. Franz Kafka is sometimes referred to as a Czech writer.  He was born in Prague, which is the capital of what is now the Czech Republic.

Like this:

Loading...

Leave a Reply