Talaria

This repository contains a fork of TalariaDB, a distributed, highly available, low-latency time-series database for Big Data systems. It was originally designed and implemented at Grab, where millions of transactions and connections take place every day, which requires a scalable platform for data-driven decision making.

Introduction

TalariaDB helped us to overcome the challenge of retrieving and acting upon the information from large amounts of data. It addressed our need to query at least 2-3 terabytes of data per hour with predictable low query latency and low cost. Most importantly, it plays very nicely with the different tools’ ecosystems and lets us query data using SQL.

From the original design, we have extended Talaria so that it can be set up in two possible ways:

  1. As an event ingestion platform. This allows you to track events using a simple gRPC endpoint from almost anywhere.
  2. As a data store for hot data. This allows you to query hot data (e.g. last 6 hours) as it goes through the data pipeline and ultimately ends up in your data lake when compacted.

Talaria is designed around an event-based data model. An event is essentially a set of key-value pairs; however, to keep events consistent we need to define a set of commonly used keys. Each event will consist of the following:

  • Hash key (e.g. using the "event" key). This represents the type of the event and could be prefixed with the source scope (e.g. "table1"), using the dot as a logical separator. The separation and namespacing are not required, but strongly recommended to make your system more usable.
  • Sort key (e.g. using the "time" key). This represents the time at which the update occurred, as a Unix timestamp (as precise as the source allows) encoded as a 64-bit integer value.
  • Other key-value pairs will represent various values of the columns.

Below is an example of what a payload for an event describing a table update might look like.

KEY      VALUE                DATA TYPE
event    table1.update        string
time     1586500157           int64
column1  hello                string
column2  { "name": "roman" }  json

Talaria supports the string, int32, int64, bool, float64, timestamp and json data types, which are used to construct columns that can be exposed to Presto/SQL.
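As a rough illustration of the data model above, the following Python sketch builds an event as a plain dictionary and labels each value with one of the data type names listed above. The `make_event` and `talaria_type` helpers are hypothetical and not part of Talaria, and mapping Python `int` to `int64` and `dict` to `json` is an assumption made only for this example.

```python
import json
import time


def talaria_type(value):
    """Return an illustrative data type name for a Python value (hypothetical helper)."""
    # bool must be checked before int, because bool is a subclass of int in Python.
    if isinstance(value, bool):
        return "bool"
    if isinstance(value, int):
        return "int64"
    if isinstance(value, float):
        return "float64"
    if isinstance(value, str):
        return "string"
    if isinstance(value, (dict, list)):
        return "json"
    raise TypeError(f"unsupported value type: {type(value).__name__}")


def make_event(scope, name, columns, timestamp=None):
    """Build an event dict with a namespaced hash key and a Unix-time sort key."""
    event = {
        "event": f"{scope}.{name}",  # hash key, e.g. "table1.update"
        "time": int(time.time()) if timestamp is None else int(timestamp),  # sort key
    }
    event.update(columns)  # remaining key-value pairs become column values
    return event


payload = make_event(
    "table1",
    "update",
    {"column1": "hello", "column2": {"name": "roman"}},
    timestamp=1586500157,
)

# Print one "key value type" row per entry, mirroring the table above.
for key, value in payload.items():
    shown = json.dumps(value) if isinstance(value, (dict, list)) else value
    print(key, shown, talaria_type(value))
```

How such a dictionary is turned into an actual ingestion request depends on the protobuf definition and the client you use, which this sketch does not cover.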

Event Ingestion with Talaria

If your organisation needs a reliable and scalable data ingestion platform, you can set up Talaria as one. The main advantage is that such a platform is cost-efficient, does not require a complex Kafka setup and even offers in-flight queries if you also point Presto at it. The basic setup allows you to track events using a simple gRPC endpoint from almost anywhere.

To set up Talaria as an ingestion platform, you will need to specify a table, in this case "eventlog", and enable compaction in the configuration, along these lines:

mode: staging
env: staging
domain: "talaria-headless.default.svc.cluster.local"
storage:
  dir: "/data"
tables:
  eventlog:
    compact:                               # enable compaction
      interval: 60                         # compact every 60 seconds
      nameFunc: "s3://bucket/namefunc.lua" # file name function
      s3:                                  # sink to Amazon S3
        region: "ap-southeast-1"
        bucket: "bucket"
...

Once this is set up, you can point a gRPC client (see protobuf definition) directly to the ingestion endpoint. Note that we also offer some pre-generated or pre-made ingestion clients in this repository.

service Ingress {
  rpc Ingest(IngestRequest) returns (IngestResponse) {}
}

Below is a list of currently supported sinks and their example configurations:

For Microsoft Azure Blob Storage and Azure Data Lake Gen 2, we support writing across multiple storage accounts. We support two modes:

  1. Random choice, where each write is directed to a randomly chosen storage account. For this mode you only need to specify a list of storage accounts.
  2. Weighted choice, where a set of weights (positive integers) is assigned and each write is directed to a storage account based on the specified weights.

An example of weighted choice is shown below:

    - azure:
        container: a_container
        prefix: a_prefix
        blobServiceURL: .storage.microsoft.net
        storageAccounts:
            - a_storage_account
            - b_storage_account
        storageAccountWeights: [1, 2]
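
To make the two modes concrete, here is a minimal Python sketch of how a write could be routed to one of several storage accounts. It illustrates the selection idea only and is not Talaria's actual implementation; the `pick_storage_account` function is hypothetical, and the account names and weights are taken from the example configuration above.

```python
import random
from collections import Counter


def pick_storage_account(accounts, weights=None, rng=random):
    """Pick one storage account for a write (hypothetical helper).

    Without weights every account is equally likely (random choice).
    With weights the probability is proportional to each weight (weighted choice).
    """
    if not accounts:
        raise ValueError("at least one storage account is required")
    if weights is None:
        return rng.choice(accounts)
    if len(weights) != len(accounts):
        raise ValueError("need exactly one weight per storage account")
    if any(not isinstance(w, int) or w <= 0 for w in weights):
        raise ValueError("weights must be positive integers")
    return rng.choices(accounts, weights=weights, k=1)[0]


accounts = ["a_storage_account", "b_storage_account"]
rng = random.Random(42)  # seeded only so the demonstration is repeatable

# Simulate many writes with weights [1, 2]: the second account should
# receive roughly twice as many writes as the first.
counts = Counter(pick_storage_account(accounts, [1, 2], rng) for _ in range(30000))
print(counts)
```

With weights of 1 and 2, about one third of the writes go to the first account and two thirds to the second.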

Random choice and weighted choice are particularly useful in some key scenarios:

  • High-throughput deployments where the I/O generated by Talaria exceeds the limits of the storage accounts.
  • Deployments on internal endpoints with multiple VPN links, where you want to split the network traffic across those links.

Hot Data Query with Talaria

If your organisation requires querying of either hot data (e.g. last n hours) or in-flight data (i.e. as ingested), you can also configure Talaria to serve it to Presto using the built-in Presto Thrift connector.

In the example configuration below we're setting up an S3 + SQS writer to continuously ingest files from an S3 bucket, and an "eventlog" table which will be exposed to Presto.

mode: staging
env: staging
domain: "talaria-headless.default.svc.cluster.local"
writers:
  grpc:
    port: 8080
  s3sqs:
    region: "ap-southeast-1"
    queue: "queue-url"
    waitTimeout: 1
    retries: 5
readers:
  presto:
    schema: data
    port: 8042
storage:
  dir: "/data"
tables:
  eventlog:
    ttl: 3600         # data is persisted for 1 hour
    hashBy: event
    sortBy: time
...
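
One way to picture what the `hashBy` and `sortBy` settings refer to is the Python sketch below, which groups events by their hash key and orders each group by its sort key. This is only a mental model built on the event format described earlier; it makes no claim about how Talaria stores or indexes data internally, and the `group_and_sort` helper and sample events are hypothetical.

```python
from collections import defaultdict


def group_and_sort(events, hash_by="event", sort_by="time"):
    """Group events by the hash key and order each group by the sort key."""
    groups = defaultdict(list)
    for event in events:
        groups[event[hash_by]].append(event)
    for rows in groups.values():
        rows.sort(key=lambda row: row[sort_by])
    return dict(groups)


# Hypothetical sample events following the "event" / "time" convention.
events = [
    {"event": "table1.update", "time": 1586500159, "column1": "c"},
    {"event": "table2.insert", "time": 1586500158, "column1": "b"},
    {"event": "table1.update", "time": 1586500157, "column1": "a"},
]

grouped = group_and_sort(events)
for key, rows in grouped.items():
    print(key, [row["time"] for row in rows])
```

In this picture, a query that filters on the "event" column only needs to look at one group, and rows within that group are already ordered by time.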

Once you have set up Talaria, you'll need to configure Presto to talk to it using the Thrift Connector. You will need to make sure that:

  1. The properties file is configured so that Presto talks to Talaria through a Kubernetes load balancer.
  2. Presto can also access the nodes directly, without going through the load balancer.

Once this is done, you should be able to query your data via Presto.

select *
from talaria.data.eventlog
where event = 'table1.update'
limit 1000

Ingesting Files Into Talaria

To ingest existing ORC, CSV or Parquet files from a storage URL (for example, S3 or Azure Blob Storage), use the Talaria File Ingestion Client:

https://github.com/atris/TalariaFileIngestionClient

Quick Start

The easiest way to get started is to use the provided Helm chart.

Contributing

We are open to contributions. Feel free to submit a pull request and we'll review it as quickly as we can. TalariaDB is maintained by:

License

TalariaDB is licensed under the MIT License.