Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support native S3 conditional writes #6682

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion .github/workflows/object_store.yml
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ jobs:

- name: Setup LocalStack (AWS emulation)
run: |
echo "LOCALSTACK_CONTAINER=$(docker run -d -p 4566:4566 localstack/localstack:3.3.0)" >> $GITHUB_ENV
echo "LOCALSTACK_CONTAINER=$(docker run -d -p 4566:4566 localstack/localstack:3.8.1)" >> $GITHUB_ENV
echo "EC2_METADATA_CONTAINER=$(docker run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2)" >> $GITHUB_ENV
aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket
aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
Expand All @@ -161,6 +161,12 @@ jobs:
- name: Run object_store tests
run: cargo test --features=aws,azure,gcp,http

- name: Run object_store tests (AWS native conditional put)
run: cargo test --features=aws
env:
AWS_CONDITIONAL_PUT: etag-put-if-not-exists
AWS_COPY_IF_NOT_EXISTS: multipart

- name: GCS Output
if: ${{ !cancelled() }}
run: docker logs $GCS_CONTAINER
Expand Down
95 changes: 81 additions & 14 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use crate::client::header::{get_put_result, get_version};
use crate::client::list::ListClient;
use crate::client::retry::RetryExt;
use crate::client::s3::{
CompleteMultipartUpload, CompleteMultipartUploadResult, InitiateMultipartUploadResult,
ListResponse,
CompleteMultipartUpload, CompleteMultipartUploadResult, CopyPartResult,
InitiateMultipartUploadResult, ListResponse,
};
use crate::client::GetOptionsExt;
use crate::multipart::PartId;
Expand Down Expand Up @@ -98,8 +98,11 @@ pub(crate) enum Error {
#[snafu(display("Error getting create multipart response body: {}", source))]
CreateMultipartResponseBody { source: reqwest::Error },

#[snafu(display("Error performing complete multipart request: {}", source))]
CompleteMultipartRequest { source: crate::client::retry::Error },
#[snafu(display("Error performing complete multipart request: {}: {}", path, source))]
CompleteMultipartRequest {
source: crate::client::retry::Error,
path: String,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should the path also be in the displayed error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call, done.

},

#[snafu(display("Error getting complete multipart response body: {}", source))]
CompleteMultipartResponseBody { source: reqwest::Error },
Expand All @@ -118,13 +121,32 @@ pub(crate) enum Error {

impl From<Error> for crate::Error {
fn from(err: Error) -> Self {
Self::Generic {
store: STORE,
source: Box::new(err),
match err {
Error::CompleteMultipartRequest { source, path } => source.error(STORE, path),
_ => Self::Generic {
store: STORE,
source: Box::new(err),
},
}
}
}

pub(crate) enum PutPartPayload<'a> {
Part(PutPayload),
Copy(&'a Path),
}

impl Default for PutPartPayload<'_> {
fn default() -> Self {
Self::Part(PutPayload::default())
}
}

pub(crate) enum CompleteMultipartMode {
Overwrite,
Create,
}

#[derive(Deserialize)]
#[serde(rename_all = "PascalCase", rename = "DeleteResult")]
struct BatchDeleteResponse {
Expand Down Expand Up @@ -605,15 +627,24 @@ impl S3Client {
path: &Path,
upload_id: &MultipartId,
part_idx: usize,
data: PutPayload,
data: PutPartPayload<'_>,
) -> Result<PartId> {
let is_copy = matches!(data, PutPartPayload::Copy(_));
let part = (part_idx + 1).to_string();

let mut request = self
.request(Method::PUT, path)
.with_payload(data)
.query(&[("partNumber", &part), ("uploadId", upload_id)])
.idempotent(true);

request = match data {
PutPartPayload::Part(payload) => request.with_payload(payload),
PutPartPayload::Copy(path) => request.header(
"x-amz-copy-source",
&format!("{}/{}", self.config.bucket, encode_path(path)),
),
};

if self
.config
.encryption_headers
Expand All @@ -625,21 +656,48 @@ impl S3Client {
}
let response = request.send().await?;

let content_id = get_etag(response.headers()).context(MetadataSnafu)?;
let content_id = match is_copy {
false => get_etag(response.headers()).context(MetadataSnafu)?,
true => {
let response = response
tustvold marked this conversation as resolved.
Show resolved Hide resolved
.bytes()
.await
.context(CreateMultipartResponseBodySnafu)?;
let response: CopyPartResult = quick_xml::de::from_reader(response.reader())
.context(InvalidMultipartResponseSnafu)?;
response.e_tag
}
};
Ok(PartId { content_id })
}

pub(crate) async fn abort_multipart(&self, location: &Path, upload_id: &str) -> Result<()> {
self.request(Method::DELETE, location)
.query(&[("uploadId", upload_id)])
.with_encryption_headers()
.send()
.await?;

Ok(())
}

pub(crate) async fn complete_multipart(
&self,
location: &Path,
upload_id: &str,
parts: Vec<PartId>,
mode: CompleteMultipartMode,
) -> Result<PutResult> {
let parts = if parts.is_empty() {
// If no parts were uploaded, upload an empty part
// otherwise the completion request will fail
let part = self
.put_part(location, &upload_id.to_string(), 0, PutPayload::default())
.put_part(
location,
&upload_id.to_string(),
0,
PutPartPayload::default(),
)
.await?;
vec![part]
} else {
Expand All @@ -651,18 +709,27 @@ impl S3Client {
let credential = self.config.get_session_credential().await?;
let url = self.config.path_url(location);

let response = self
let request = self
.client
.request(Method::POST, url)
.query(&[("uploadId", upload_id)])
.body(body)
.with_aws_sigv4(credential.authorizer(), None)
.with_aws_sigv4(credential.authorizer(), None);

let request = match mode {
CompleteMultipartMode::Overwrite => request,
CompleteMultipartMode::Create => request.header("If-None-Match", "*"),
};

let response = request
.retryable(&self.config.retry_config)
.idempotent(true)
.retry_error_body(true)
.send()
.await
.context(CompleteMultipartRequestSnafu)?;
.context(CompleteMultipartRequestSnafu {
path: location.as_ref(),
})?;

let version = get_version(response.headers(), VERSION_HEADER).context(MetadataSnafu)?;

Expand Down
77 changes: 68 additions & 9 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use reqwest::{Method, StatusCode};
use std::{sync::Arc, time::Duration};
use url::Url;

use crate::aws::client::{RequestError, S3Client};
use crate::aws::client::{CompleteMultipartMode, PutPartPayload, RequestError, S3Client};
use crate::client::get::GetClientExt;
use crate::client::list::ListClientExt;
use crate::client::CredentialProvider;
Expand Down Expand Up @@ -169,7 +169,10 @@ impl ObjectStore for AmazonS3 {
match (opts.mode, &self.client.config.conditional_put) {
(PutMode::Overwrite, _) => request.idempotent(true).do_put().await,
(PutMode::Create | PutMode::Update(_), None) => Err(Error::NotImplemented),
(PutMode::Create, Some(S3ConditionalPut::ETagMatch)) => {
(
PutMode::Create,
Some(S3ConditionalPut::ETagMatch | S3ConditionalPut::ETagPutIfNotExists),
) => {
match request.header(&IF_NONE_MATCH, "*").do_put().await {
// Technically If-None-Match should return NotModified but some stores,
// such as R2, instead return PreconditionFailed
Expand All @@ -193,6 +196,7 @@ impl ObjectStore for AmazonS3 {
source: "ETag required for conditional put".to_string().into(),
})?;
match put {
S3ConditionalPut::ETagPutIfNotExists => Err(Error::NotImplemented),
S3ConditionalPut::ETagMatch => {
request.header(&IF_MATCH, etag.as_str()).do_put().await
}
Expand Down Expand Up @@ -293,6 +297,47 @@ impl ObjectStore for AmazonS3 {
let (k, v, status) = match &self.client.config.copy_if_not_exists {
Some(S3CopyIfNotExists::Header(k, v)) => (k, v, StatusCode::PRECONDITION_FAILED),
Some(S3CopyIfNotExists::HeaderWithStatus(k, v, status)) => (k, v, *status),
Some(S3CopyIfNotExists::Multipart) => {
let upload_id = self
.client
.create_multipart(to, PutMultipartOpts::default())
.await?;

let res = async {
let part_id = self
.client
.put_part(to, &upload_id, 0, PutPartPayload::Copy(from))
.await?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this error should also trigger cleanup, might be worth encapsulating the steps into a function

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does, doesn't it? If anything in this async block (L306-329) returns an error, the cleanup on L336 should fire.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I missed that this is wrapped in an async block, cunning

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fact that async blocks can function like try blocks is one of my favorite Rust tricks!

match self
.client
.complete_multipart(
to,
&upload_id,
vec![part_id],
CompleteMultipartMode::Create,
)
.await
{
Err(e @ Error::Precondition { .. }) => Err(Error::AlreadyExists {
path: to.to_string(),
source: Box::new(e),
}),
Ok(_) => Ok(()),
Err(e) => Err(e),
}
}
.await;

// If the multipart upload failed, make a best effort attempt to
// clean it up. It's the caller's responsibility to add a
// lifecycle rule if guaranteed cleanup is required, as we
// cannot protect against an ill-timed process crash.
if res.is_err() {
let _ = self.client.abort_multipart(to, &upload_id).await;
}

return res;
}
Some(S3CopyIfNotExists::Dynamo(lock)) => {
return lock.copy_if_not_exists(&self.client, from, to).await
}
Expand Down Expand Up @@ -340,7 +385,12 @@ impl MultipartUpload for S3MultiPartUpload {
Box::pin(async move {
let part = state
.client
.put_part(&state.location, &state.upload_id, idx, data)
.put_part(
&state.location,
&state.upload_id,
idx,
PutPartPayload::Part(data),
)
.await?;
state.parts.put(idx, part);
Ok(())
Expand All @@ -352,7 +402,12 @@ impl MultipartUpload for S3MultiPartUpload {

self.state
.client
.complete_multipart(&self.state.location, &self.state.upload_id, parts)
.complete_multipart(
&self.state.location,
&self.state.upload_id,
parts,
CompleteMultipartMode::Overwrite,
)
.await
}

Expand Down Expand Up @@ -384,7 +439,9 @@ impl MultipartStore for AmazonS3 {
part_idx: usize,
data: PutPayload,
) -> Result<PartId> {
self.client.put_part(path, id, part_idx, data).await
self.client
.put_part(path, id, part_idx, PutPartPayload::Part(data))
.await
}

async fn complete_multipart(
Expand All @@ -393,7 +450,9 @@ impl MultipartStore for AmazonS3 {
id: &MultipartId,
parts: Vec<PartId>,
) -> Result<PutResult> {
self.client.complete_multipart(path, id, parts).await
self.client
.complete_multipart(path, id, parts, CompleteMultipartMode::Overwrite)
.await
}

async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> {
Expand Down Expand Up @@ -427,7 +486,6 @@ mod tests {
let integration = config.build().unwrap();
let config = &integration.client.config;
let test_not_exists = config.copy_if_not_exists.is_some();
let test_conditional_put = config.conditional_put.is_some();

put_get_delete_list(&integration).await;
get_opts(&integration).await;
Expand Down Expand Up @@ -458,8 +516,9 @@ mod tests {
if test_not_exists {
copy_if_not_exists(&integration).await;
}
if test_conditional_put {
put_opts(&integration, true).await;
if let Some(conditional_put) = &config.conditional_put {
let supports_update = !matches!(conditional_put, S3ConditionalPut::ETagPutIfNotExists);
put_opts(&integration, supports_update).await;
}

// run integration test with unsigned payload enabled
Expand Down
Loading
Loading