
pyarrow/adbc: no support for binding DECIMAL in Snowflake driver #2084

Open
pkit opened this issue Aug 19, 2024 · 26 comments
Labels
Type: enhancement New feature or request

Comments

@pkit

pkit commented Aug 19, 2024

What feature or improvement would you like to see?

I'm not sure the driver is even usable in its current state. Almost all numeric types in Snowflake are decimals.

adbc_driver_manager.NotSupportedError: NOT_IMPLEMENTED: [Snowflake] Unsupported bind param '0' type decimal(1, 0)

I've tried binding directly from a RecordBatch and through the DBAPI "interface", but neither works.

@pkit pkit added the Type: enhancement New feature or request label Aug 19, 2024
@lidavidm
Member

CC @zeroshade

@zeroshade
Member

@pkit Currently, decimal is not supported when binding Arrow record batch data as query parameters, e.g. running SELECT * FROM table WHERE col = ? and substituting a decimal value for the placeholder.

If you're doing a bulk insert, you should use cursor.adbc_ingest, which has no issues with decimal data:

>>> tbl
pyarrow.Table
NUMBERTYPE: decimal128(38, 0)
NUMBERFLOAT: decimal128(15, 2)
----
NUMBERTYPE: [[1, 12345678901234567890123456789012345678]]
NUMBERFLOAT: [[1234567.89,9876543210.99]]
>>> conn = adbc_driver_snowflake.dbapi.connect(uri)
>>> cur = conn.cursor()
>>> cur.adbc_ingest('NUMBER_TEST', tbl)
0
>>> cur.execute('SELECT * FROM NUMBER_TEST')
>>> cur.fetch_arrow_table()
pyarrow.Table
NUMBERTYPE: decimal128(38, 0)
NUMBERFLOAT: decimal128(15, 2)
----
NUMBERTYPE: [[1, 12345678901234567890123456789012345678]]
NUMBERFLOAT: [[1234567.89,9876543210.99]]

By the same token, by default I believe we return all NUMBER(38, 0) columns as decimal128, but there is an option, adbc.snowflake.sql.client_option.use_high_precision, which can be set to "false" to have fixed-point data with a scale of 0 returned as int64 columns.
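
For reference, a minimal sketch of passing that option at connection time, assuming it is accepted as a database option via db_kwargs; uri stands in for your Snowflake connection string:

import adbc_driver_snowflake.dbapi

# Sketch only: request low-precision mode so scale-0 fixed-point columns come
# back as int64 (assumes the option is honored at the database level).
conn = adbc_driver_snowflake.dbapi.connect(uri, db_kwargs={
    "adbc.snowflake.sql.client_option.use_high_precision": "false",
})
cur = conn.cursor()
cur.execute("SELECT * FROM NUMBER_TEST")
print(cur.fetch_arrow_table().schema)  # NUMBER(38, 0) columns should now appear as int64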

If this doesn't sufficiently answer your issue, could you share the code that produced the error?

@pkit
Author

pkit commented Aug 20, 2024

cursor.adbc_ingest has additional role requirements: CREATE STAGE.
Beyond that, it can be pretty sub-optimal if I do a lot of asynchronous inserts before commit, unless I misunderstand how it works underneath.
Obviously I would prefer not to manage my own serialization, or to keep all the RecordBatches in memory.
Essentially my use case is akin to classic ETL: stream in batches of data (SELECT), transform, stream back (INSERT), all while keeping to strict memory limits (for example, at most 4×16 MB batches at a time).

@pkit
Author

pkit commented Aug 20, 2024

@zeroshade It would also be nice to know where the limitation comes from. Is it Snowflake's server-side ADBC implementation?

@zeroshade
Member

cursor.adbc_ingest is going to use CREATE STAGE and COPY INTO ... FROM @stage ..., as documented in Snowflake's documentation at https://docs.snowflake.com/en/user-guide/data-load-local-file-system, for efficient bulk loading of data. It will be significantly more performant than using INSERT INTO with bind parameters due to the way Snowflake's API works.

Essentially my use case is akin to classic ETL: stream in batches of data (SELECT), transform, stream back (INSERT), all while keeping to strict memory limits (for example, at most 4×16 MB batches at a time).

That's precisely what the adbc_ingest functionality is designed to be optimal for. Essentially, the record batches are written to Parquet files in parallel (both the level of concurrency and the size of the Parquet files are configurable), which are uploaded directly to Snowflake for staging before being loaded into the table (again, following the steps in the documentation linked above, but with concurrency).

It would also be nice to know where the limitation comes from. Is it Snowflake's server-side ADBC implementation?

The Snowflake ADBC driver uses Snowflake's Go client for communication, which does not support any decimal type as a bind parameter, and ADBC currently makes no attempt to perform a cast for binding (we can optionally perform casting on receiving). Snowflake's server implementation currently does not accept Arrow data directly for bind parameter input. That's part of why adbc_ingest uploads Parquet files (aside from the fact that it also performs compression, etc., which is another reason it's more performant than using INSERT INTO with bind parameters).

My recommendation here would be to use adbc_ingest for your inserts if at all possible. Otherwise, you may need to cast your decimal data to int64/float64 in the Arrow record batches before you bind the stream.
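
For the cast-before-bind workaround, a rough sketch along these lines could work (assuming pyarrow is available, the decimal values actually fit in int64/float64, and the helper name is purely illustrative):

import pyarrow as pa
import pyarrow.compute as pc

def decimals_to_native(batch: pa.RecordBatch) -> pa.RecordBatch:
    # Cast decimal columns to int64 (scale 0) or float64 (otherwise) so the
    # driver can bind them; the cast raises or loses precision for values that
    # don't fit, so validate your data first.
    columns, fields = [], []
    for i, field in enumerate(batch.schema):
        if pa.types.is_decimal(field.type):
            new_type = pa.int64() if field.type.scale == 0 else pa.float64()
            columns.append(pc.cast(batch.column(i), new_type))
            fields.append(pa.field(field.name, new_type))
        else:
            columns.append(batch.column(i))
            fields.append(field)
    return pa.RecordBatch.from_arrays(columns, schema=pa.schema(fields))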

@pkit
Author

pkit commented Aug 20, 2024

@zeroshade Thanks. It all makes sense.
I've already found that the actual implementation lives in the go/adbc directory. I would prefer not to ser/de just for the sake of decimal support, mostly because I usually pass these columns through unchanged anyway.

Some questions: at which point is the data considered sent to Snowflake when adbc_ingest is used? How is backpressure handled?
If I call adbc_ingest 1000 times with 4KB batches, is there a way to know how many actual Parquet files/copy streams were created?

@zeroshade
Member

Some questions: at which point is the data considered sent to Snowflake when adbc_ingest is used? How is backpressure handled?

Backpressure and concurrency are handled in two ways:

  1. The RecordBatchReader which is passed to adbc_ingest is read from a single thread that will continuously call next on the reader and then push the record batch onto a channel. The buffer queue size (i.e. the max number of records queued for writing) is determined by the number of writers, controlled by the adbc.snowflake.statement.ingest_writer_concurrency option, which defaults to the number of CPUs.
  2. The number of concurrent file uploads and copy tasks on the Snowflake side, controlled by the adbc.snowflake.statement.ingest_upload_concurrency and adbc.snowflake.statement.ingest_copy_concurrency options.

If I call adbc_ingest 1000 times with 4KB batches, is there a way to know how many actual Parquet files/copy streams were created?

My personal recommendation would be to consolidate batches into fewer streams and call adbc_ingest with a consolidated stream of those batches, rather than calling it 1000 times with 4KB batches; that would also let you keep fewer batches in memory at a time. That said, you should be able to see how many Parquet files/copy streams were actually created from your Snowflake monitoring, which will show you all the copy tasks and files uploaded for the stage if you examine the queries.
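
As a concrete illustration of those knobs, something like the following could be set on the ingesting cursor. This is a sketch only: it assumes the options are accepted as statement options via adbc_statement.set_options, the values are arbitrary examples rather than recommendations, and cursor_write, reader, and MY_TABLE are placeholders.

# Sketch: tune ingest concurrency before calling adbc_ingest.
cursor_write.adbc_statement.set_options(**{
    "adbc.snowflake.statement.ingest_writer_concurrency": "4",
    "adbc.snowflake.statement.ingest_upload_concurrency": "4",
    "adbc.snowflake.statement.ingest_copy_concurrency": "2",
})
cursor_write.adbc_ingest("MY_TABLE", reader, mode="append")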

@pkit
Author

pkit commented Aug 20, 2024

Nice!
Interestingly though, the following simple code fails:

from adbc_driver_snowflake.dbapi import connect  # import needed for the snippet to run

def main():
    conn_read = connect(f"{user}:{password}@{account}/{database}", db_kwargs={
        "adbc.snowflake.sql.schema": "PUBLIC",
    })
    conn_write = connect(f"{user}:{password}@{account}/{database}", db_kwargs={
        "adbc.snowflake.sql.schema": "PUBLIC",
    })
    with conn_read.cursor() as cursor_read:
        with conn_write.cursor() as cursor_write:
            cursor_read.adbc_statement.set_options(**{"adbc.snowflake.rpc.prefetch_concurrency": 2, "adbc.rpc.result_queue_size": 10})
            cursor_read.execute("SELECT * FROM T1")
            for batch in cursor_read.fetch_record_batch():
                print(batch)
                cursor_write.adbc_ingest("T2", batch, mode="append")

The failure is:

ERRO[0005]connection.go:275 gosnowflake.(*snowflakeConn).Close context canceled                              LOG_SESSION_ID=1685662994679674

And that's it.

@zeroshade
Member

try doing:

cursor_read.execute("SELECT * FROM T1")
cursor_write.adbc_ingest("T2", cursor_read.fetch_record_batch(), mode="append")

Also: we have an upstream PR waiting to be merged that addresses that specific issue: snowflakedb/gosnowflake#1196

@lidavidm
Member

@zeroshade can we get some of these recommendations documented?

@zeroshade
Member

zeroshade commented Aug 20, 2024

@lidavidm Does our documentation not already recommend using adbc_ingest over repeated calls to cursor.execute('INSERT INTO ..... (?, ?, ?)') using bind? If not, then yeah, we should definitely get that documented.

@lidavidm
Member

Also any clarifications about backpressure or data types

@pkit
Author

pkit commented Aug 20, 2024

@zeroshade
cursor_write.adbc_ingest("T2", cursor_read.fetch_record_batch(), mode="append") defeats the purpose.
I need to preprocess each batch before sending it to adbc_ingest.

@zeroshade
Member

zeroshade commented Aug 20, 2024

Okay. While multiple calls to adbc_ingest would work, depending on various factors (how many sources, how large the records are, and so on), my recommendation would be to use a generator/iterator to construct a RecordBatchReader that you can pass to adbc_ingest instead.

i.e. something like the following:

import pyarrow

def process_record_batches(input):
    for batch in input:
        # whatever pre-processing you want to perform on the batch
        print(batch)
        yield batch

with conn_read.cursor() as cursor_read:
    with conn_write.cursor() as cursor_write:
        cursor_read.adbc_statement.set_options(**{"adbc.snowflake.rpc.prefetch_concurrency": 2, "adbc.rpc.result_queue_size": 10})
        cursor_read.execute("SELECT * FROM T1")
        input = cursor_read.fetch_record_batch()
        reader = pyarrow.RecordBatchReader.from_batches(input.schema, process_record_batches(input))
        cursor_write.adbc_ingest("T2", reader, mode="append")

This way you pay the overhead only once, and it effectively creates a full push/pull pipeline that handles backpressure, since each part waits for the previous stage (governed by the buffer and queue sizes set via the options I mentioned earlier).

@pkit
Author

pkit commented Aug 20, 2024

@zeroshade Ok, that makes sense, although I'm then at the mercy of adbc_ingest pulling the next generator value.
I would prefer to front-load the pre-processing, because sometimes it can take quite a lot of time; I hope the stage creation or other work it does underneath will not time out.
But I suppose it will work, I will check it out.

@pkit
Author

pkit commented Aug 20, 2024

@zeroshade Just FYI: calling adbc_ingest once, as in your generator example, still produces the context error.

@zeroshade
Member

@joellubi is there a workaround for that context error until your upstream change is merged?

@joellubi
Member

@zeroshade The simplest change we could make ourselves would be to set db.SetMaxIdleConns(0) on the database/sql DB instance, but this may increase the time spent opening/closing connections, as it effectively disables the connection pool. There does appear to be some activity on the upstream PR, though; it just got a second approval a few hours ago. I'll keep a close eye on it and make sure a fix is included in the next release one way or another.

@pkit Is the data still ingested when the context error is produced? In all our reproductions the error comes from a log rather than an exception, and the ingestion itself is still successful.

@joellubi
Member

@zeroshade Ok, that makes sense, although I'm then at the mercy of adbc_ingest pulling the next generator value. I would prefer to front-load the pre-processing, because sometimes it can take quite a lot of time; I hope the stage creation or other work it does underneath will not time out. But I suppose it will work, I will check it out.

By "front-load the pre-processing" do you mean that you would like to process the next batch while the current batch is being uploaded by adbc_ingest? Regardless of whether adbc_ingest "pulls" the batch or it is "pushed" as in your example, python will inherently only do one thing at a time assuming your pre-processing work is CPU-bound. In either case, you would need to use threading or multiprocessing to have python pre-process the next batch while the current batch is ingesting. One potential way to do this would be to use the python generator to wrap a Queue which can offload the pre-processing to another thread or process.
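
To make the queue idea concrete, here is a rough, hedged sketch. The helper and the preprocess function are hypothetical, the transform is assumed to preserve the batch schema, error handling is omitted, and (as noted above) threads only help if the pre-processing is I/O-bound or releases the GIL:

import queue
import threading
import pyarrow

def preprocessed_reader(input_reader, preprocess, max_queued=4):
    # Pre-process batches on a background thread while adbc_ingest consumes
    # the reader; the bounded queue provides backpressure.
    q = queue.Queue(maxsize=max_queued)
    sentinel = object()

    def worker():
        try:
            for batch in input_reader:
                q.put(preprocess(batch))  # preprocess must keep the same schema
        finally:
            q.put(sentinel)

    threading.Thread(target=worker, daemon=True).start()

    def gen():
        while True:
            item = q.get()
            if item is sentinel:
                return
            yield item

    return pyarrow.RecordBatchReader.from_batches(input_reader.schema, gen())

# Usage, reusing the names from the earlier example:
# reader = preprocessed_reader(cursor_read.fetch_record_batch(), my_transform)
# cursor_write.adbc_ingest("T2", reader, mode="append")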

@joellubi
Member

Upstream PR (snowflakedb/gosnowflake#1196) was just merged.

@pkit
Author

pkit commented Aug 20, 2024

@joellubi Although I see "Success" for the COPY operation from the stage in the Snowflake log, the data is not ingested.

python will inherently only do one thing at a time

Yes. But in this case I just send it for preprocessing somewhere else. So it's purely I/O wait.
But you're right. adbc_ingest is a blocking call, so it's probably better to decouple read and write cursors into multiple processes and do preprocessing while reading.

@zeroshade
Member

Upstream PR (snowflakedb/gosnowflake#1196) was just merged.

@joellubi let's bump our version of gosnowflake to pull in the fix so we can see if this fixes @pkit's issue.

But you're right. adbc_ingest is a blocking call, so it's probably better to decouple read and write cursors into multiple processes and do preprocessing while reading.

We are in the process of working out an async interface that we can implement to allow for a non-blocking adbc_ingest call, but it'll be a while for that.

@joellubi
Member

@zeroshade I've opened PR: #2091

@pkit
Author

pkit commented Aug 20, 2024

@zeroshade I can confirm that the fix works for suppressing the error (I've built the Python module and friends from PR #2091).
Unfortunately nothing gets inserted, although I see 10 Parquet files created and everything looks fine in the query log.

@pkit
Author

pkit commented Aug 20, 2024

Ok, it works. I noticed that COMMIT was not sent; sending a commit explicitly worked.
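
For anyone hitting the same thing, a minimal sketch of the explicit commit, assuming the default non-autocommit DBAPI connection and reusing names from the earlier snippets:

cursor_write.adbc_ingest("T2", reader, mode="append")
conn_write.commit()  # make the staged COPY visible; DBAPI connections don't autocommit by default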

@CurtHagenlocher
Contributor

The conversation here has strayed quite a bit from the original problem report. It's good that the user's needs (may) have been solved, but the original problem still exists: you can only bind a few types to a Snowflake statement. This seems to be because the driver wants to convert them to types in the sql package, and that package has a decidedly minimal level of type support. I don't really know Go, but my read of gosnowflake suggests that there's no extension mechanism by which it would support types not in the sql package, which means we're stuck until gosnowflake is updated. Does that look correct?

And should we close this issue and open a new issue instead?
