Convert PyArrow RecordBatches to Postgres' native binary format.
"""Example for README.md"""
from tempfile import mkdtemp
import psycopg
import pyarrow.dataset as ds
import requests
from pgpq import ArrowToPostgresBinaryEncoder
# let's get some example data
tmpdir = mkdtemp()
with open(f"{tmpdir}/yellow_tripdata_2023-01.parquet", mode="wb") as f:
    resp = requests.get(
        "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet"
    )
    resp.raise_for_status()
    f.write(resp.content)
# load an arrow dataset
# arrow can load datasets from partitioned parquet files locally or in S3/GCS
# it handles buffering, matching globs, etc.
dataset = ds.dataset(tmpdir)
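# a sketch only: ds.dataset also accepts remote URIs when the corresponding
# pyarrow filesystem support is available; the bucket and path below are placeholders
#   dataset = ds.dataset("s3://my-bucket/trip-data/", format="parquet")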
# create an encoder object which will do the encoding
# and give us the expected Postgres table schema
encoder = ArrowToPostgresBinaryEncoder(dataset.schema)
# get the expected Postgres destination schema
# note that this is _not_ the same as the incoming arrow schema
# and not necessarily the schema of your permanent table
# instead it's the schema of the data that will be sent over the wire
# which for example does not have timezones on any timestamps
pg_schema = encoder.schema()
# assemble ddl for a temporary table
# it's often a good idea to bulk load into a temp table to:
# (1) Avoid indexes
# (2) Stay in-memory as long as possible
# (3) Be more flexible with types
# (you can't load a SMALLINT into a BIGINT column without casting)
cols = [f'"{col_name}" {col.data_type.ddl()}' for col_name, col in pg_schema.columns]
ddl = f"CREATE TEMP TABLE data ({','.join(cols)})"
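# for illustration only: ddl now has the shape
#   CREATE TEMP TABLE data ("col_a" INT8,"col_b" TIMESTAMP,...)
# with the actual column names and types derived from the parquet file's schema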
with psycopg.connect("postgres://postgres:postgres@localhost:5432/postgres") as conn:
    with conn.cursor() as cursor:
        cursor.execute(ddl)  # type: ignore
        with cursor.copy("COPY data FROM STDIN WITH (FORMAT BINARY)") as copy:
            copy.write(encoder.write_header())
            for batch in dataset.to_batches():
                copy.write(encoder.write_batch(batch))
            copy.write(encoder.finish())
        # load into your actual table, possibly doing type casts
        # cursor.execute("INSERT INTO \"table\" SELECT * FROM data")
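        # if the destination table's column types differ from the wire schema,
        # the INSERT ... SELECT is the place to cast; a sketch only -- the table
        # name "trips" and the column names below are hypothetical:
        # cursor.execute(
        #     'INSERT INTO "trips" ("some_id", "some_ts") '
        #     'SELECT "some_id", "some_ts"::timestamptz FROM data'
        # )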
"""Showcase defining encoders for fields."""
import pgpq
import psycopg
import pyarrow as pa
from pgpq import encoders
from pgpq import schema
data = [
    pa.array([1, 2, 3, 4]),
    pa.array(['{"age": 33, "name": "alice"}', '{"age": 24, "name": "bob"}', "{}", "null"]),
]
arrow_schema = pa.schema([("id", pa.int64()), ("properties", pa.string())])
record_batch = pa.RecordBatch.from_arrays(data, schema=arrow_schema)
encoder = pgpq.ArrowToPostgresBinaryEncoder(record_batch.schema)
pg_schema_with_text_properties = encoder.schema()
assert [
    (col_name, col.data_type.ddl())
    for col_name, col in pg_schema_with_text_properties.columns
] == [("id", "INT8"), ("properties", "TEXT")]
# To support a different PostgreSQL schema, we change the default encoders generated by pgpq:
# * 'id' encoded as INT8 (BIGINT).
# * 'properties' encoded as JSONB.
field_encoders = {
    "id": encoders.Int64EncoderBuilder(pa.field("id", pa.int64())),
    "properties": encoders.StringEncoderBuilder.new_with_output(
        pa.field("properties", pa.string()), schema.Jsonb()
    ),
}
encoder = pgpq.ArrowToPostgresBinaryEncoder.new_with_encoders(record_batch.schema, field_encoders)
pg_schema_with_jsonb_properties = encoder.schema()
assert [
    (col_name, col.data_type.ddl())
    for col_name, col in pg_schema_with_jsonb_properties.columns
] == [("id", "INT8"), ("properties", "JSONB")]
ddl = """
CREATE TABLE id_properties (
    id INT8, -- Alternative: BIGINT
    properties JSONB
)
"""
# Without the right encoder, PostgreSQL will reject the following COPY with a binary
# format error: it expects the properties column to be encoded as JSONB, not TEXT.
with psycopg.connect("postgres://postgres:postgres@localhost:5432/postgres") as conn:
    with conn.cursor() as cursor:
        cursor.execute(ddl)
        with cursor.copy("COPY id_properties FROM STDIN WITH (FORMAT BINARY)") as copy:
            copy.write(encoder.write_header())
            copy.write(encoder.write_batch(record_batch))
            copy.write(encoder.finish())
# The 'id' field matches our schema, so we can use the default encoder for it,
# but we still need to encode 'properties' as JSONB.
# `infer_encoder` can be used to obtain the default encoder for a field.
field_encoders = {
    "id": pgpq.ArrowToPostgresBinaryEncoder.infer_encoder(record_batch.field("id")),
    "properties": encoders.StringEncoderBuilder.new_with_output(
        pa.field("properties", pa.string()), schema.Jsonb()
    ),
}
encoder = pgpq.ArrowToPostgresBinaryEncoder.new_with_encoders(record_batch.schema, field_encoders)
pg_schema_inferred_id_and_jsonb_properties = encoder.schema()
assert [
    (col_name, col.data_type.ddl())
    for col_name, col in pg_schema_inferred_id_and_jsonb_properties.columns
] == [("id", "INT8"), ("properties", "JSONB")]
with psycopg.connect("postgres://postgres:postgres@localhost:5432/postgres") as conn:
    with conn.cursor() as cursor:
        with cursor.copy("COPY id_properties FROM STDIN WITH (FORMAT BINARY)") as copy:
            copy.write(encoder.write_header())
            copy.write(encoder.write_batch(record_batch))
            copy.write(encoder.finish())
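As a quick sanity check, the column can be queried with a JSONB operator; a minimal sketch, assuming the same local database as above:
import psycopg

with psycopg.connect("postgres://postgres:postgres@localhost:5432/postgres") as conn:
    with conn.cursor() as cursor:
        # ->> is a JSON/JSONB operator; it would fail on a plain TEXT column
        cursor.execute("SELECT id, properties->>'name' FROM id_properties ORDER BY id")
        print(cursor.fetchall())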