GH-38255: [Go][C++] Implement Flight SQL Bulk Ingestion #38385
Could you also add a new integration test to arrow/dev/archery/archery/integration/runner.py Lines 572 to 630 in f489996
@kou I added my tests under the existing scenario
We should create a new integration test scenario so that it can be individually skipped/enabled for each implementation
format/FlightSql.proto
Outdated
/*
 * Retrieves a boolean value indicating whether transactions are supported for bulk ingestion. If not, invoking
 * the method commit in the context of a bulk ingestion is a noop, and the isolation level is
 * `arrow.flight.protocol.sql.SqlTransactionIsolationLevel.TRANSACTION_NONE`.
 *
 * Returns:
 * - false: if bulk ingestion transactions are unsupported;
 * - true: if bulk ingestion transactions are supported.
 */
INGEST_TRANSACTIONS_SUPPORTED = 577;
I think this should go in the 0-500 range
Makes sense. Done.
format/FlightSql.proto
Outdated
/*
 * Represents a bulk ingestion request. Used in the command member of FlightDescriptor
 * for the RPC call DoPut to cause the server to load the contents of the stream's
 * FlightData into the target destination.
 */
nit: why the extra indent?
Fixed this
go/arrow/flight/flightsql/client.go
Outdated
if err = proto.Unmarshal(res.GetAppMetadata(), &updateResult); err != nil {
	return
}
I believe you have to drain the read-side of the stream here, even if technically the server should only have sent one message
Updated. Let me know what you think of the approach. We don't currently ensure the stream is drained in the other DoPut methods FWIW, but I can make changes there as well if it's preferred.
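For readers following the thread, here is a minimal sketch of what draining the read side can look like. It is not the code from this PR: `receiver`, `fakeStream`, and `readFirstAndDrain` are hypothetical names, and the message type is a plain string rather than a Flight `PutResult`. The idea is to keep the first message and then keep calling `Recv` until `io.EOF`, discarding anything unexpected.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// receiver is a hypothetical stand-in for the read side of a DoPut stream.
type receiver interface {
	Recv() (string, error)
}

// fakeStream replays a fixed list of messages, then reports io.EOF.
type fakeStream struct {
	msgs []string
}

func (s *fakeStream) Recv() (string, error) {
	if len(s.msgs) == 0 {
		return "", io.EOF
	}
	m := s.msgs[0]
	s.msgs = s.msgs[1:]
	return m, nil
}

// readFirstAndDrain returns the first message, then reads until io.EOF so the
// stream is fully consumed. It also counts how many extra messages were dropped.
func readFirstAndDrain(r receiver) (first string, extra int, err error) {
	first, err = r.Recv()
	if err != nil {
		return "", 0, err
	}
	for {
		_, err = r.Recv()
		if errors.Is(err, io.EOF) {
			// Normal termination: the server has closed its side.
			return first, extra, nil
		}
		if err != nil {
			return first, extra, err
		}
		extra++
	}
}

func main() {
	s := &fakeStream{msgs: []string{"update-result", "stray"}}
	first, extra, err := readFirstAndDrain(s)
	fmt.Println(first, extra, err)
}
```

Counting the extra messages is only there to make the behavior observable in the sketch; a real client could just as well ignore them or treat them as a protocol error.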
Yes. Sorry for not clarifying this.
@kou No problem, I created a separate scenario.
func (m *flightSqlIngestionScenarioTester) flightInfoForCommand(desc *flight.FlightDescriptor, schema *arrow.Schema) *flight.FlightInfo {
	return &flight.FlightInfo{
		Endpoint: []*flight.FlightEndpoint{
			{Ticket: &flight.Ticket{Ticket: desc.Cmd}},
		},
		Schema:           flight.SerializeSchema(schema, memory.DefaultAllocator),
		FlightDescriptor: desc,
		TotalRecords:     -1,
		TotalBytes:       -1,
	}
}
this isn't actually being used anywhere, that's why the CI is failing
Sorry, just removed.
go/arrow/flight/flightsql/client.go
Outdated
wr = flight.NewRecordWriter(stream, ipc.WithAllocator(c.Alloc), ipc.WithSchema(rdr.Schema()))
wr.SetFlightDescriptor(desc)
Don't we need defer wr.Close()?
Good catch, forgot to do so. Just added.
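As an illustration of why the deferred close matters, the sketch below uses a hypothetical `recordWriter` (not the Flight writer from this PR) with a failure path. The deferred call guarantees the writer is closed whether the loop finishes or returns early with an error.

```go
package main

import (
	"errors"
	"fmt"
)

// recordWriter is a hypothetical stand-in for a stream writer that must be
// closed once writing is finished.
type recordWriter struct {
	written []string
	closed  bool
}

// Write rejects empty rows so the example has a failure path to exercise.
func (w *recordWriter) Write(row string) error {
	if row == "" {
		return errors.New("empty row")
	}
	w.written = append(w.written, row)
	return nil
}

// Close marks the writer as closed.
func (w *recordWriter) Close() error {
	w.closed = true
	return nil
}

// sendAll writes every row and relies on defer to close the writer on both
// the success path and the early-return error path.
func sendAll(w *recordWriter, rows []string) (err error) {
	defer func() {
		// Keep the first error: a Close failure is only reported when the
		// writes themselves succeeded.
		if cerr := w.Close(); err == nil {
			err = cerr
		}
	}()
	for _, row := range rows {
		if err = w.Write(row); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	w := &recordWriter{}
	err := sendAll(w, []string{"a", "", "c"})
	fmt.Println(err, w.closed)
}
```

Using a named return value lets the deferred function surface an error from `Close` without hiding an earlier write error.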
This LGTM generally, though we need a second implementation before it can be voted on in the Mailing List to officially add this to the spec.
Thanks @zeroshade. I'm planning to get another implementation out in the next week or so.
434d537
to
db70c20
Compare
@zeroshade or others, any last comments?
}

// Options to assert before/after mocked ingest call
func getIngestOptions() flightsql.ExecuteIngestOpts {
shouldn't this return a pointer to avoid the error about copying the mutex?
Updated
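To make the mutex concern concrete, here is a small sketch under the assumption that the options type wraps a generated protobuf message, which carries a no-copy marker that `go vet` reports when the value is copied. The `ingestOpts` type and `newIngestOpts` helper are hypothetical and use an explicit `sync.Mutex` to stand in for that marker.

```go
package main

import (
	"fmt"
	"sync"
)

// ingestOpts is a hypothetical options struct. Like a generated protobuf
// message, it contains a lock and therefore should never be copied by value.
type ingestOpts struct {
	mu    sync.Mutex
	Table string
}

// newIngestOpts returns a pointer, so callers share a single value instead of
// copying the struct (and the mutex inside it) on return.
func newIngestOpts(table string) *ingestOpts {
	return &ingestOpts{Table: table}
}

// SetTable updates the table name under the lock.
func (o *ingestOpts) SetTable(table string) {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.Table = table
}

func main() {
	opts := newIngestOpts("before")
	same := opts // copies the pointer, not the struct
	same.SetTable("after")
	fmt.Println(opts.Table)
}
```

Returning `ingestOpts` by value would compile, but the `copylocks` check in `go vet` flags it, which is the kind of error the review comment refers to.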
go/arrow/flight/flightsql/client.go
Outdated
// Servers cannot infer defaults for these parameters, so we validate the request to ensure they are set.
if reqOptions.TableDefinitionOptions == nil {
Should we validate that reqOptions is not nil first to avoid a potential panic?
Good point. Since we cast to the protobuf command type anyway, I did so a bit earlier to let it take care of nil handling for us. I added a nil test for completeness too.
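The nil handling mentioned here relies on the fact that generated protobuf getters in Go are safe to call on a nil receiver. The sketch below imitates that pattern with hand-written types; `ingestCommand`, `tableDefinitionOptions`, and `validate` are hypothetical names, not the generated code or the validation logic from this PR.

```go
package main

import (
	"errors"
	"fmt"
)

// tableDefinitionOptions is a hypothetical stand-in for the nested options message.
type tableDefinitionOptions struct {
	IfNotExist int
}

// ingestCommand is a hypothetical stand-in for the protobuf command type.
type ingestCommand struct {
	TableDefinitionOptions *tableDefinitionOptions
	Table                  string
}

// GetTableDefinitionOptions follows the style of generated protobuf getters:
// it returns the zero value when the receiver is nil instead of panicking.
func (c *ingestCommand) GetTableDefinitionOptions() *tableDefinitionOptions {
	if c != nil {
		return c.TableDefinitionOptions
	}
	return nil
}

// GetTable is nil-safe in the same way.
func (c *ingestCommand) GetTable() string {
	if c != nil {
		return c.Table
	}
	return ""
}

// validate only goes through getters, so a nil command is reported as an
// ordinary validation error rather than a nil-pointer panic.
func validate(c *ingestCommand) error {
	if c.GetTableDefinitionOptions() == nil {
		return errors.New("table definition options are required")
	}
	if c.GetTable() == "" {
		return errors.New("table is required")
	}
	return nil
}

func main() {
	fmt.Println(validate(nil))
	fmt.Println(validate(&ingestCommand{
		TableDefinitionOptions: &tableDefinitionOptions{},
		Table:                  "t",
	}))
}
```

Accessing `c.TableDefinitionOptions` directly on a nil pointer would panic, which is the case the reviewer was asking about.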
go/arrow/flight/flightsql/client.go
Outdated
rdr.Retain()
defer rdr.Release()
since you aren't passing the reader to anything outside of this function, I don't think this is necessary at all
Good catch, updated
go/arrow/flight/flightsql/types.go
Outdated
type CreatePreparedStatementResult = pb.ActionCreatePreparedStatementResult

type TableDefinitionOptions = pb.CommandStatementIngest_TableDefinitionOptions

type TableDefinitionOptionsTableNotExistOption = pb.CommandStatementIngest_TableDefinitionOptions_TableNotExistOption
type TableDefinitionOptionsTableExistsOption = pb.CommandStatementIngest_TableDefinitionOptions_TableExistsOption
For clarity, should we gather these into a single declaration block?
type CreatePreparedStatementResult = pb.ActionCreatePreparedStatementResult
type TableDefinitionOptions = pb.CommandStatementIngest_TableDefinitionOptions
type TableDefinitionOptionsTableNotExistOption = pb.CommandStatementIngest_TableDefinitionOptions_TableNotExistOption
type TableDefinitionOptionsTableExistsOption = pb.CommandStatementIngest_TableDefinitionOptions_TableExistsOption
type (
	CreatePreparedStatementResult             = pb.ActionCreatePreparedStatementResult
	TableDefinitionOptions                    = pb.CommandStatementIngest_TableDefinitionOptions
	TableDefinitionOptionsTableNotExistOption = pb.CommandStatementIngest_TableDefinitionOptions_TableNotExistOption
	TableDefinitionOptionsTableExistsOption   = pb.CommandStatementIngest_TableDefinitionOptions_TableExistsOption
)
Sure, I gathered the ones related to TableDefinitionOptions.
The CreatePreparedStatementResult type is unrelated.
Outside of my nitpicks, this all LGTM
I addressed the remaining comments. There are 2 failing workflows but they appear unrelated. If this is fine, would one of you please merge this or let me know if I should do so?
You're right, the failures are unrelated to your changes. This looks good to me. If no one else objects I'll merge this later this evening
@zeroshade It looks like you're good with this merging and there have been no objections. I'll merge this shortly.
After merging your PR, Conbench analyzed the 5 benchmarking runs that have been run so far on merge-commit 0d1ea5d. There were no benchmark performance regressions. 🎉 The full Conbench report has more details. It also includes information about 12 possible false positives for unstable benchmarks that are known to sometimes produce them.
…#38385)

### Rationale for this change

It was suggested in the discussion around apache/arrow-adbc#1107 for the Flight SQL ADBC driver that an "Ingest" command would be a helpful addition to the Flight SQL specification. This command would enable a Flight SQL client to provide a FlightData stream to the server without needing to know its SQL syntax, and have that stream loaded into a target table by whichever means the server deems appropriate.

### What changes are included in this PR?

- Format:
  - Add CommandStatementIngest message type to Flight SQL proto definition
  - Add FLIGHT_SQL_SERVER_BULK_INGESTION and FLIGHT_SQL_SERVER_INGEST_TRANSACTIONS_SUPPORTED options for SqlInfo
- Go:
  - Generate pb
  - Server-side implementation
  - Client-side implementation
  - Unit + integration tests
- C++:
  - Server-side implementation
  - Client-side implementation
  - Integration tests

### Are these changes tested?

Yes, see `server_test.go`, `scenario.go`, and `test_integration.cc`.

### Are there any user-facing changes?

Yes, new Flight SQL client and server functionality is being added. Changes are not expected to break existing users.

* Closes: apache#38255

Lead-authored-by: Joel Lubinitsky <[email protected]>
Co-authored-by: Joel Lubinitsky <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
Please look at #38255 for details on this functionality. Support for Go and C++ was added as part of #38385. This pull request is to add the required support for Java. * GitHub Issue: #38255 Lead-authored-by: Amit Mittal <[email protected]> Co-authored-by: Amit Mittal <[email protected]> Signed-off-by: David Li <[email protected]>
Rationale for this change
It was suggested in the discussion around apache/arrow-adbc#1107 for the Flight SQL ADBC driver that an "Ingest" command would be a helpful addition to the Flight SQL specification. This command would enable a Flight SQL client to provide a FlightData stream to the server without needing to know its SQL syntax, and have that stream loaded into a target table by whichever means the server deems appropriate.
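What changes are included in this PR?
- Format:
  - Add CommandStatementIngest message type to Flight SQL proto definition
  - Add FLIGHT_SQL_SERVER_BULK_INGESTION and FLIGHT_SQL_SERVER_INGEST_TRANSACTIONS_SUPPORTED options for SqlInfo
- Go:
  - Generate pb
  - Server-side implementation
  - Client-side implementation
  - Unit + integration tests
- C++:
  - Server-side implementation
  - Client-side implementation
  - Integration tests
Are these changes tested?
Yes, see server_test.go, scenario.go, and test_integration.cc.
Are there any user-facing changes?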
Yes, new Flight SQL client and server functionality is being added. Changes are not expected to break existing users.