Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

object-store: remove S3ConditionalPut::ETagPutIfNotExists #6802

Merged
merged 4 commits into from
Nov 30, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/object_store.yml
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ jobs:

- name: Setup LocalStack (AWS emulation)
run: |
echo "LOCALSTACK_CONTAINER=$(docker run -d -p 4566:4566 localstack/localstack:3.8.1)" >> $GITHUB_ENV
echo "LOCALSTACK_CONTAINER=$(docker run -d -p 4566:4566 localstack/localstack:latest)" >> $GITHUB_ENV
tustvold marked this conversation as resolved.
Show resolved Hide resolved
echo "EC2_METADATA_CONTAINER=$(docker run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2)" >> $GITHUB_ENV
aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket
aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
Expand All @@ -164,7 +164,7 @@ jobs:
- name: Run object_store tests (AWS native conditional put)
run: cargo test --features=aws
env:
AWS_CONDITIONAL_PUT: etag-put-if-not-exists
AWS_CONDITIONAL_PUT: etag
AWS_COPY_IF_NOT_EXISTS: multipart

- name: GCS Output
Expand Down
10 changes: 10 additions & 0 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ pub(crate) struct Request<'a> {
payload: Option<PutPayload>,
use_session_creds: bool,
idempotent: bool,
retry_on_conflict: bool,
retry_error_body: bool,
}

Expand Down Expand Up @@ -317,6 +318,13 @@ impl<'a> Request<'a> {
Self { idempotent, ..self }
}

pub(crate) fn retry_on_conflict(self, retry_on_conflict: bool) -> Self {
Self {
retry_on_conflict,
..self
}
}

pub(crate) fn retry_error_body(self, retry_error_body: bool) -> Self {
Self {
retry_error_body,
Expand Down Expand Up @@ -412,6 +420,7 @@ impl<'a> Request<'a> {
self.builder
.with_aws_sigv4(credential.authorizer(), sha)
.retryable(&self.config.retry_config)
.retry_on_conflict(self.retry_on_conflict)
.idempotent(self.idempotent)
.retry_error_body(self.retry_error_body)
.payload(self.payload)
Expand Down Expand Up @@ -448,6 +457,7 @@ impl S3Client {
config: &self.config,
use_session_creds: true,
idempotent: false,
retry_on_conflict: false,
retry_error_body: false,
}
}
Expand Down
32 changes: 23 additions & 9 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,7 @@ impl ObjectStore for AmazonS3 {
match (opts.mode, &self.client.config.conditional_put) {
(PutMode::Overwrite, _) => request.idempotent(true).do_put().await,
(PutMode::Create | PutMode::Update(_), None) => Err(Error::NotImplemented),
(
PutMode::Create,
Some(S3ConditionalPut::ETagMatch | S3ConditionalPut::ETagPutIfNotExists),
) => {
(PutMode::Create, Some(S3ConditionalPut::ETagMatch)) => {
match request.header(&IF_NONE_MATCH, "*").do_put().await {
// Technically If-None-Match should return NotModified but some stores,
// such as R2, instead return PreconditionFailed
Expand All @@ -197,9 +194,26 @@ impl ObjectStore for AmazonS3 {
source: "ETag required for conditional put".to_string().into(),
})?;
match put {
S3ConditionalPut::ETagPutIfNotExists => Err(Error::NotImplemented),
S3ConditionalPut::ETagMatch => {
request.header(&IF_MATCH, etag.as_str()).do_put().await
match request
.header(&IF_MATCH, etag.as_str())
// Real S3 will occasionally report 409 Conflict
// if there are concurrent `If-Match` requests
// in flight, so we need to be prepared to retry
// 409 responses.
.retry_on_conflict(true)
.do_put()
.await
{
// Real S3 reports NotFound rather than PreconditionFailed when the
// object doesn't exist. Convert to PreconditionFailed for
// consistency with R2. This also matches what the HTTP spec
// says the behavior should be.
Err(Error::NotFound { path, source }) => {
Err(Error::Precondition { path, source })
}
r => r,
}
}
S3ConditionalPut::Dynamo(d) => {
d.conditional_op(&self.client, location, Some(&etag), move || {
Expand Down Expand Up @@ -487,6 +501,7 @@ mod tests {
let integration = config.build().unwrap();
let config = &integration.client.config;
let test_not_exists = config.copy_if_not_exists.is_some();
let test_conditional_put = config.conditional_put.is_some();

put_get_delete_list(&integration).await;
get_opts(&integration).await;
Expand Down Expand Up @@ -517,9 +532,8 @@ mod tests {
if test_not_exists {
copy_if_not_exists(&integration).await;
}
if let Some(conditional_put) = &config.conditional_put {
let supports_update = !matches!(conditional_put, S3ConditionalPut::ETagPutIfNotExists);
put_opts(&integration, supports_update).await;
if test_conditional_put {
put_opts(&integration, true).await;
}

// run integration test with unsigned payload enabled
Expand Down
13 changes: 0 additions & 13 deletions object_store/src/aws/precondition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,17 +138,6 @@ pub enum S3ConditionalPut {
/// [HTTP precondition]: https://datatracker.ietf.org/doc/html/rfc9110#name-preconditions
ETagMatch,

/// Like `ETagMatch`, but with support for `PutMode::Create` and not
/// `PutMode::Option`.
///
/// This is the limited form of conditional put supported by Amazon S3
/// as of August 2024 ([announcement]).
///
/// Encoded as `etag-put-if-not-exists` ignoring whitespace.
///
/// [announcement]: https://aws.amazon.com/about-aws/whats-new/2024/08/amazon-s3-conditional-writes/
ETagPutIfNotExists,

/// The name of a DynamoDB table to use for coordination
///
/// Encoded as either `dynamo:<TABLE_NAME>` or `dynamo:<TABLE_NAME>:<TIMEOUT_MILLIS>`
Expand All @@ -164,7 +153,6 @@ impl std::fmt::Display for S3ConditionalPut {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ETagMatch => write!(f, "etag"),
Self::ETagPutIfNotExists => write!(f, "etag-put-if-not-exists"),
Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()),
}
}
Expand All @@ -174,7 +162,6 @@ impl S3ConditionalPut {
fn from_str(s: &str) -> Option<Self> {
match s.trim() {
"etag" => Some(Self::ETagMatch),
"etag-put-if-not-exists" => Some(Self::ETagPutIfNotExists),
trimmed => match trimmed.split_once(':')? {
("dynamo", s) => Some(Self::Dynamo(DynamoCommit::from_str(s)?)),
_ => None,
Expand Down
14 changes: 13 additions & 1 deletion object_store/src/client/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ pub(crate) struct RetryableRequest {

sensitive: bool,
idempotent: Option<bool>,
retry_on_conflict: bool,
payload: Option<PutPayload>,

retry_error_body: bool,
Expand All @@ -217,6 +218,15 @@ impl RetryableRequest {
}
}

/// Set whether this request should be retried on a 409 Conflict response.
#[cfg(feature = "aws")]
pub(crate) fn retry_on_conflict(self, retry_on_conflict: bool) -> Self {
Self {
retry_on_conflict,
..self
}
}

/// Set whether this request contains sensitive data
///
/// This will avoid printing out the URL in error messages
Expand Down Expand Up @@ -340,7 +350,8 @@ impl RetryableRequest {
let status = r.status();
if retries == max_retries
|| now.elapsed() > retry_timeout
|| !status.is_server_error()
|| !(status.is_server_error()
|| (self.retry_on_conflict && status == StatusCode::CONFLICT))
{
return Err(match status.is_client_error() {
true => match r.text().await {
Expand Down Expand Up @@ -467,6 +478,7 @@ impl RetryExt for reqwest::RequestBuilder {
idempotent: None,
payload: None,
sensitive: false,
retry_on_conflict: false,
retry_error_body: false,
}
}
Expand Down
Loading