authors: ["team"]
tags: ["cypher"]
---

After several very busy months, we are very happy to release Kùzu 0.7.0!
This release is primarily focused on performance and scalability improvements at the very
core of the system. The highlights are the following:
- **New and much faster recursive path finding algorithms** that implement relationship patterns with the Kleene star.
- **Data spilling during loading**, which enables loading very large graphs on machines with limited RAM.
- **Zone maps**, which enable much faster scans of node/rel properties when there is
a filter on numeric properties.

Importantly, we can finally scale Kùzu to load and query graphs with billions of edges
(see our microbenchmarks on LDBC1000 and Graph500-30, which respectively contain 2.2B and 17B edges).

On the usability side:
- **Improved CSV sniffing** to automatically detect several CSV configurations during data ingest.
- **Skipping and reporting erroneous CSV lines**.
- **New JSON data type** that you can store as a node/rel property.
- **New official Golang API**.

There are many other features in the release, so let's dive into the details!

## Performance and scalability improvements
### New recursive path finding algorithms
We spent a considerable amount of time rewriting the query processing algorithms
that evaluate recursive relationship patterns (or recursive joins) in Cypher. These are the clauses
that contain the Kleene star (`*`) syntax in relationship patterns, such as
`(a:Person)-[e:Knows*1..10]->(b:Person)` or `(a:Person)-[e:Knows* SHORTEST]->(b:Person)`.
These are perhaps the most expensive joins that graph DBMSs evaluate: they are recursive,
and because each node connects to many other nodes (i.e., the joins are many-to-many),
the search space grows very quickly. Even searches with relatively small path lengths, say 4 or 5,
from a single source can end up scanning and processing large chunks of, or even entire, databases.
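
To make the syntax concrete, here is a minimal sketch on a hypothetical Person/Knows schema (the table definitions and the id values are illustrative, not part of any benchmark):

```cypher
// Hypothetical schema: Person nodes connected by Knows relationships.
CREATE NODE TABLE Person(id INT64, name STRING, PRIMARY KEY (id));
CREATE REL TABLE Knows(FROM Person TO Person);

// Variable-length (Kleene star) join: paths of length 1 to 10 between two people.
MATCH (a:Person)-[e:Knows*1..10]->(b:Person)
WHERE a.id = 0 AND b.id = 1
RETURN len(e), count(*);

// Shortest-path variant of the same pattern.
MATCH (a:Person)-[e:Knows* SHORTEST]->(b:Person)
WHERE a.id = 0 AND b.id = 1
RETURN len(e);
```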

Prior to this release, our implementation was not very optimized: we did not
parallelize the evaluation of these queries, and we used sparse intermediate data structures,
e.g., the data structures storing reached nodes during a shortest path computation
would grow as the number of reached nodes grew.
This works fine if the depth of the recursion is small, but it was very slow
on harder queries with longer recursion depths on large datasets. We also lacked many other optimizations, such
as stopping early when all of the possible destination nodes have been reached.
For example, in the query `MATCH (a:Person)-[e:Knows* SHORTEST]->(b:Person) WHERE a.id = 0 AND b.id = 1 RETURN *`,
if we start a recursive path finding routine from the Person node with id 0, we can stop the computation
as soon as we reach the node with id 1.

In the last few months, we switched to a parallel implementation that uses dense data structures and implemented many
other optimizations, e.g., storing the intermediate paths during the computation very compactly and pushing down
LIMIT and acyclic/trail/walk semantic checks into the computation.
For readers familiar with the low-level details of query processing techniques, our new
implementation mimics the implementations in parallel graph analytics systems, such as [Ligra](https://jshun.csail.mit.edu/ligra.pdf),
[Pregel](https://dl.acm.org/doi/10.1145/1807167.1807184), and [GraphChi](https://www.usenix.org/system/files/conference/osdi12/osdi12-final-126.pdf).
The exact technical details are beyond the scope of this release post, and we plan to write
a separate blog post on this topic later. From your perspective nothing changes, except that you should observe
your queries using recursive path patterns getting faster and parallelizing better.
All of the previous recursive path features are supported as before,
e.g., putting filters on intermediate nodes and relationships, as well as acyclic, trail, and walk semantics.
If you observe a slowdown, it's certainly a performance bug, so let us know and we will fix it! Oh, and all of
the recursive computations happen by scanning data from disk, so we are not building any in-memory
index structures on the fly that could limit scalability. Kùzu's relationship/adjacency list storage is disk-based but already
optimized for good scan performance. So if your graphs are very large and don't fit into
Kùzu's buffer manager, we will scale beyond memory transparently.

Here is a demonstrative micro-benchmark of the performance improvements on large databases.
Let's take a query that finds, from a single source, the shortest path lengths to all other nodes. We use two input graphs:

- LDBC1000-PK: LDBC1000 is [LDBC's social network benchmark](https://ldbcouncil.org/benchmarks/snb/) dataset at scale 1000 (i.e., a 1TB-scale database).
In LDBC1000-PK, we use only the subgraph of Person nodes and Knows edges between Person nodes. LDBC1000-PK contains **Xx** nodes and **202M** edges;

- Graph500-30: This is the largest graph in [LDBC's graphalytics benchmarks](https://ldbcouncil.org/benchmarks/graphalytics/). This graph contains
448M nodes and 17B edges. When loaded into Kùzu, it takes 495GB on disk.

All experiments we present in this post use a machine with 2 AMD EPYC 7551 processors (32 cores), 512GB DDR4 memory, and **XX** SSDs.
We run Kùzu with its default settings.

In the next experiment, we find the length of shortest paths from a single source to all other nodes and count the
number of nodes at each length. The query template looks as follows:
```cypher
MATCH p = (n1:person)-[e:knows* SHORTEST]->(n2:person)
WHERE n1.ID={source_id}
RETURN len(e), count(*);
```

| # Threads | 1 | 2 | 4 | 8 | 16 | 32 |
|------------|------|-------|--------|-------|-------|-------|
| 0.6.1 (LDBC1000-PK) | 43.0s | X | 38.0s | X | 48.5s | X |
| 0.7.0 (LDBC1000-PK) | 3.0s | X | 1.1s | 0.62s | 0.39s | 0.30s |
| 0.6.1 (Graph500-30) | X | X | X | X | X | X |
| 0.7.0 (Graph500-30) | 170s | X | 49s | 29s | 16.6s | 13.5s |

So on the 202M-edge LDBC sub-graph we can compute shortest path lengths to all destinations in 0.3s using 32 threads. This is
a relatively small part of the entire LDBC database, and Kùzu does not scan much data from disk.
On the 17B-edge graph, we can compute shortest path lengths in 13.5s. Note that we're finding shortest paths to 448M other nodes,
and this is happening on a completely disk-based implementation, so all scans happen through the
buffer manager. At this scale, there is actual I/O happening at each iteration of the shortest path computation, and
the computation takes **Xx** iterations.
As you can see, both queries also parallelize quite well and are much faster than in the previous version of Kùzu.

There are still many more optimizations in our pipeline to make these even faster, so keep an eye on how these
numbers improve over the following releases.

### Data spilling during bulk relationship ingestion (COPY FROM)
Kùzu's fast path for ingesting large amounts of data is the [`COPY FROM`](https://docs.kuzudb.com/import/) statement.
`COPY FROM` can be used to ingest records from a source table into a node or relationship table.
The source table can be a local or remote file in a tabular format
such as CSV or Parquet, an in-memory object such as a Pandas
data frame, or the result of another subquery.
Prior to this release, we had a scalability problem when using `COPY FROM` to ingest records into a relationship table.
Specifically, Kùzu required storing all of the relationship records in the source table in memory before it could start
creating its persistent disk-based structures. This meant that on machines with relatively little RAM, you would have
to chunk the source table into multiple parts; otherwise, the `COPY FROM` pipeline could run out of memory and fail.

In this release, we address this issue. Kùzu now automatically spills the records it scans from the source table into
a temporary file on disk during `COPY FROM`. For readers interested in the internal implementation, this
is in fact done automatically by the buffer manager of the system as `COPY FROM` demands more and more memory.
Kùzu can now ingest very large sets of relationship records (such as the edges of the 17B-edge Graph500-30 graph in the above experiment)
when it is running on a machine with limited memory.
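
As a small, hypothetical example of the kind of workload that benefits (table names and file paths are illustrative):

```cypher
// Hypothetical node and relationship tables for a large graph.
CREATE NODE TABLE Account(id INT64, PRIMARY KEY (id));
CREATE REL TABLE Transfer(FROM Account TO Account);

// Bulk ingest: nodes first, then the (much larger) relationship file.
// If the relationship records exceed the buffer manager's capacity,
// Kùzu now spills them to a temporary file on disk instead of failing.
COPY Account FROM 'nodes.csv';
COPY Transfer FROM 'edges.csv';
```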

If you are curious about how this performs, here is a demonstrative example of the loading times when we limit
the buffer manager capacity of the system while loading the Graph500-30 graph. We are using 32 threads on the same machine as
in the above experiment:

| Buffer manager (BM) size | Loading time | Amount of spilled data |
|------------|------|-------|
| 410GB | 3613s | 420GB |
| 205GB | 4081s | 638GB |
| 102GB | 4276s | 736GB |

So using 32 threads, we can load 17B edges in 1 hour when giving 410GB of memory to the BM, 1 hour 8 minutes when giving 205GB,
and 1 hour and 10 minutes when using 102GB. These numbers should look very good to anyone working with large datasets
and existing GDBMSs. We are very proud of the performance of the `COPY FROM` pipeline and highly recommend Kùzu and
its `COPY FROM` pipeline if you need a GDBMS that can ingest and work on very large databases!

There are several ways you can configure the spilling behavior, e.g., the file to which Kùzu should spill.
By default, Kùzu spills to a `copy.tmp` file under the database directory; note that spilling only works with the
local file system and is disabled in in-memory mode. Please see [this documentation page](XXX) for details.
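
For example, the spill path can be changed, or spilling turned off, with a `CALL` statement; the option name below is an assumption, so double-check it against the documentation page:

```cypher
// Redirect spilled data to a different temporary file
// (path is illustrative; option name is an assumption, check the docs).
CALL spill_to_file_tmp_path='/scratch/kuzu_copy.tmp';

// Setting the path to an empty string turns spilling off entirely.
CALL spill_to_file_tmp_path='';
```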

### Zone maps
In this release we also finally enabled the zone map optimization. [Zone maps](https://dl.acm.org/doi/abs/10.14778/3137765.3137769) are a widely adopted optimization
in columnar systems to speed up scans when a query has a filter on a numeric column.
The idea is to store statistics, specifically the minimum and maximum values, per chunk of the column (in our case
[node groups](https://github.com/kuzudb/kuzu/issues/1474), which contain roughly 130K values each).
When there is a filter on the property stored in the column,
the system can use the minimum and maximum values to infer whether the chunk can contain any value that passes the filter.
For example, suppose the query is `MATCH (a:Person) WHERE a.age < 10 RETURN *`. To evaluate this query, Kùzu needs to scan the age column
of the Person node table. If, for a particular node group j, the minimum value stored in the zone map is greater than or equal to 10, say 15,
then the system can skip scanning that node group entirely, since no value in node group j can pass the filter.

Here is a simple single-threaded experiment demonstrating the performance benefits of enabling zone maps.
We run the following query template on LDBC1000's Comment node table, which contains 2.2B nodes:
```cypher
MATCH (c:Comment)
WHERE c.length > $length
RETURN *;
```

| $length | # output tuples | w/out zone maps | w/ zone maps |
|---------|--------------|-----------------|--------------|
| 2000 | 0 | 12.8s | 0.13s |
| 1900 | 4579 | 12.8s | 3.56s |
| 1500 | 23195 | 12.3s | 8.98s |
| 1000 | 29914 | 12.0s | 11.1s |

So if your query is searching for a very selective property range, as in the first row, large chunks of the column
can be skipped during the scan, improving your query's performance significantly! Zone maps are automatically enabled
on numeric columns and automatically used, so you do not have to change anything in your applications to benefit from this optimization.

### Other Optimizations
**ALP floating point data compression:** We are continuing to improve our storage by incorporating new compression algorithms. This release implements
the ALP ([Adaptive Lossless floating-Point Compression](https://dl.acm.org/doi/pdf/10.1145/3626717)) compression algorithm for FLOAT and DOUBLE values in the system.
Here is an example to demonstrate the compression ratio of ALP. We use two datasets, [US Stocks and CommonGovernment_1](XXX).
US Stocks contains **Xx** tuples consisting of a single DOUBLE column.
CommonGovernment_1 contains **Xx** tuples consisting of 9 DOUBLE columns. We create node tables with the same
schemas as these datasets and run a `COPY FROM` statement to bulk ingest the data from CSVs into their corresponding node tables.
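
For reference, the setup for the single-column US Stocks dataset looks roughly like the following sketch (the column names, the SERIAL key, and the file path are made up, since the datasets' exact schemas are not listed here):

```cypher
// Hypothetical schema for the US Stocks dataset: a single DOUBLE column,
// plus an auto-generated SERIAL primary key since node tables need one.
CREATE NODE TABLE USStocks(id SERIAL, price DOUBLE, PRIMARY KEY (id));

// Bulk ingest from CSV; FLOAT/DOUBLE columns are now stored ALP-compressed.
COPY USStocks FROM 'us_stocks.csv';
```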

| Dataset | v 0.6.1 (uncompressed) DB size | v 0.6.1 Copy time | v 0.7.0 (w/ ALP) DB size | v 0.7.0 Copy time |
|--------------------|--------------------------------|-------------------|--------------------------|--------------------------|
| US Stocks | 99 MB | 0.66s | 45MB (2.2x) | 0.67s (1.01x slowdown) |
| CommonGovernment_1 | 825MB | 1.41s | 290.5MB (2.8x) | 1.81s (1.28x slowdown) |

So we get, respectively, 2.2x and 2.8x compression ratios for the two datasets, with a 1.01x and 1.28x slowdown
of the bulk ingestion operation. Aside from the reduction in database size, as always, compression should improve
general query performance because it leads to less I/O and allows a larger fraction of the database to be kept in memory.
As with zone maps, this optimization does not require any configuration, and you will automatically benefit from it.

**Filter/projection push down to attached relational databases:** Our final performance optimization is about
attaching to external DuckDB, Postgres, and SQLite databases using Kùzu's [external RDBMSs extension](https://docs.kuzudb.com/extensions/attach/rdbms/).
This extension allows you to attach and scan a table in an external RDBMS directly from Cypher using the `LOAD FROM` clause.
Suppose you have attached to a remote Postgres database `uw` that has a Person table and you would like to copy the records
in this table to a Kùzu Person node table. You can run the following query: `COPY Person FROM (LOAD FROM uw.Person WHERE age > 10 RETURN name)`.
The `age > 10` predicate in the above query is part of the `LOAD FROM` and filters the tuples in the Postgres table based on
their `age` values.
Part of the above query's execution requires sending a SQL statement to Postgres to scan the `uw.Person` records.
Previously, Kùzu would scan all tuples in `uw.Person` using a simple `SELECT * FROM uw.person` query
and run any filters and projections in its own query processor.
Starting from v0.7.0, we now push down filters in the `WHERE` clause to the SQL query sent to Postgres.
The same happens for projections: when possible, projections are also pushed down to the SQL queries sent to the external RDBMSs.
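
Putting this end to end, a sketch of such a workload looks like the following (the connection string and `ATTACH` options are illustrative; see the extension documentation for the exact syntax):

```cypher
// Attach an external Postgres database under the alias `uw`
// (connection string is illustrative).
ATTACH 'dbname=uw host=localhost user=postgres' AS uw (dbtype postgres);

// Scan uw.Person through LOAD FROM. Starting with v0.7.0, the WHERE filter
// and the projection on `name` are pushed down into the SQL sent to Postgres.
COPY Person FROM (LOAD FROM uw.Person WHERE age > 10 RETURN name);
```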


## Features and usability improvements
### JSON data type
We now natively support JSON as a data type in the system.
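
Here is a minimal sketch of what this enables, assuming the JSON functionality ships as an extension that must be installed and loaded, and using a hypothetical schema:

```cypher
// Assumption: JSON support is provided via an extension.
INSTALL json;
LOAD EXTENSION json;

// Hypothetical node table with a JSON property.
CREATE NODE TABLE Person(id INT64, profile JSON, PRIMARY KEY (id));
CREATE (:Person {id: 1, profile: to_json({name: 'Alice', hobbies: ['chess', 'running']})});
```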