Skip to content

Commit

Permalink
squash
Browse files Browse the repository at this point in the history
  • Loading branch information
avantgardnerio committed Dec 2, 2024
1 parent b3f9355 commit c5ea1d0
Show file tree
Hide file tree
Showing 12 changed files with 204 additions and 60 deletions.
1 change: 1 addition & 0 deletions .github/workflows/object_store.yml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ jobs:
echo "LOCALSTACK_CONTAINER=$(docker run -d -p 4566:4566 localstack/localstack:4.0.3)" >> $GITHUB_ENV
echo "EC2_METADATA_CONTAINER=$(docker run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2)" >> $GITHUB_ENV
aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket
aws --endpoint-url=http://localhost:4566 s3api create-bucket --bucket test-object-lock --object-lock-enabled-for-bucket
aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
KMS_KEY=$(aws --endpoint-url=http://localhost:4566 kms create-key --description "test key")
Expand Down
47 changes: 41 additions & 6 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::client::s3::{
InitiateMultipartUploadResult, ListResponse,
};
use crate::client::GetOptionsExt;
use crate::multipart::PartId;
use crate::multipart::MultipartInfo;
use crate::path::DELIMITER;
use crate::{
Attribute, Attributes, ClientOptions, GetOptions, ListResult, MultipartId, Path,
Expand Down Expand Up @@ -186,7 +186,7 @@ impl From<DeleteError> for Error {
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) struct S3Config {
pub region: String,
pub endpoint: Option<String>,
Expand Down Expand Up @@ -462,6 +462,27 @@ impl S3Client {
}
}

#[allow(dead_code)]
pub(crate) fn request_with_config<'a>(
&'a self,
method: Method,
path: &'a Path,
config: &'a S3Config,
) -> Request<'a> {
let url = self.config.path_url(path);
Request {
path,
builder: self.client.request(method, url),
payload: None,
payload_sha256: None,
config,
use_session_creds: true,
idempotent: false,
retry_on_conflict: false,
retry_error_body: false,
}
}

/// Make an S3 Delete Objects request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html>
///
/// Produces a vector of results, one for each path in the input vector. If
Expand Down Expand Up @@ -619,6 +640,7 @@ impl S3Client {
) -> Result<MultipartId> {
let response = self
.request(Method::POST, location)
.header("x-amz-checksum-algorithm", "SHA256")
.query(&[("uploads", "")])
.with_encryption_headers()
.with_attributes(opts.attributes)
Expand All @@ -642,12 +664,16 @@ impl S3Client {
upload_id: &MultipartId,
part_idx: usize,
data: PutPartPayload<'_>,
) -> Result<PartId> {
) -> Result<MultipartInfo> {
let is_copy = matches!(data, PutPartPayload::Copy(_));
let part = (part_idx + 1).to_string();
let config = S3Config {
checksum: Some(Checksum::SHA256),
..self.config.clone()
};

let mut request = self
.request(Method::PUT, path)
.request_with_config(Method::PUT, path, &config)
.query(&[("partNumber", &part), ("uploadId", upload_id)])
.idempotent(true);

Expand All @@ -669,6 +695,10 @@ impl S3Client {
request = request.with_encryption_headers();
}
let response = request.send().await?;
let checksum_sha256 = response
.headers()
.get("x-amz-checksum-sha256")
.map(|v| v.to_str().unwrap().to_string());

let content_id = match is_copy {
false => get_etag(response.headers()).context(MetadataSnafu)?,
Expand All @@ -682,7 +712,12 @@ impl S3Client {
response.e_tag
}
};
Ok(PartId { content_id })
let part = MultipartInfo {
e_tag: content_id,
part_number: part_idx + 1,
checksum_sha256,
};
Ok(part)
}

pub(crate) async fn abort_multipart(&self, location: &Path, upload_id: &str) -> Result<()> {
Expand All @@ -699,7 +734,7 @@ impl S3Client {
&self,
location: &Path,
upload_id: &str,
parts: Vec<PartId>,
parts: Vec<MultipartInfo>,
mode: CompleteMultipartMode,
) -> Result<PutResult> {
let parts = if parts.is_empty() {
Expand Down
72 changes: 66 additions & 6 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::aws::client::{CompleteMultipartMode, PutPartPayload, RequestError, S3
use crate::client::get::GetClientExt;
use crate::client::list::ListClientExt;
use crate::client::CredentialProvider;
use crate::multipart::{MultipartStore, PartId};
use crate::multipart::{MultipartInfo, MultipartStore};
use crate::signer::Signer;
use crate::util::STRICT_ENCODE_SET;
use crate::{
Expand Down Expand Up @@ -319,7 +319,7 @@ impl ObjectStore for AmazonS3 {
.await?;

let res = async {
let part_id = self
let part = self
.client
.put_part(to, &upload_id, 0, PutPartPayload::Copy(from))
.await?;
Expand All @@ -328,7 +328,7 @@ impl ObjectStore for AmazonS3 {
.complete_multipart(
to,
&upload_id,
vec![part_id],
vec![part],
CompleteMultipartMode::Create,
)
.await
Expand Down Expand Up @@ -407,7 +407,7 @@ impl MultipartUpload for S3MultiPartUpload {
PutPartPayload::Part(data),
)
.await?;
state.parts.put(idx, part);
state.parts.put(part);
Ok(())
})
}
Expand Down Expand Up @@ -453,7 +453,7 @@ impl MultipartStore for AmazonS3 {
id: &MultipartId,
part_idx: usize,
data: PutPayload,
) -> Result<PartId> {
) -> Result<MultipartInfo> {
self.client
.put_part(path, id, part_idx, PutPartPayload::Part(data))
.await
Expand All @@ -463,7 +463,7 @@ impl MultipartStore for AmazonS3 {
&self,
path: &Path,
id: &MultipartId,
parts: Vec<PartId>,
parts: Vec<MultipartInfo>,
) -> Result<PutResult> {
self.client
.complete_multipart(path, id, parts, CompleteMultipartMode::Overwrite)
Expand Down Expand Up @@ -493,6 +493,66 @@ mod tests {

const NON_EXISTENT_NAME: &str = "nonexistentname";

#[tokio::test]
async fn write_multipart_file_with_signature() {
maybe_skip_integration!();

let store = AmazonS3Builder::from_env()
.with_checksum_algorithm(Checksum::SHA256)
.build()
.unwrap();

let str = "test.bin";
let path = Path::parse(str).unwrap();
let opts = PutMultipartOpts::default();
let mut upload = store.put_multipart_opts(&path, opts).await.unwrap();

upload
.put_part(PutPayload::from(vec![0u8; 10_000_000]))
.await
.unwrap();
upload
.put_part(PutPayload::from(vec![0u8; 5_000_000]))
.await
.unwrap();

let res = upload.complete().await.unwrap();
assert!(res.e_tag.is_some(), "Should have valid etag");

store.delete(&path).await.unwrap();
}

#[tokio::test]
async fn write_multipart_file_with_signature_object_lock() {
maybe_skip_integration!();

let bucket = "test-object-lock";
let store = AmazonS3Builder::from_env()
.with_bucket_name(bucket)
.with_checksum_algorithm(Checksum::SHA256)
.build()
.unwrap();

let str = "test.bin";
let path = Path::parse(str).unwrap();
let opts = PutMultipartOpts::default();
let mut upload = store.put_multipart_opts(&path, opts).await.unwrap();

upload
.put_part(PutPayload::from(vec![0u8; 10_000_000]))
.await
.unwrap();
upload
.put_part(PutPayload::from(vec![0u8; 5_000_000]))
.await
.unwrap();

let res = upload.complete().await.unwrap();
assert!(res.e_tag.is_some(), "Should have valid etag");

store.delete(&path).await.unwrap();
}

#[tokio::test]
async fn s3_test() {
maybe_skip_integration!();
Expand Down
11 changes: 8 additions & 3 deletions object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::client::header::{get_put_result, HeaderConfig};
use crate::client::list::ListClient;
use crate::client::retry::RetryExt;
use crate::client::GetOptionsExt;
use crate::multipart::PartId;
use crate::multipart::{MultipartInfo, PartId};
use crate::path::DELIMITER;
use crate::util::{deserialize_rfc1123, GetRange};
use crate::{
Expand Down Expand Up @@ -558,7 +558,7 @@ impl AzureClient {
path: &Path,
part_idx: usize,
payload: PutPayload,
) -> Result<PartId> {
) -> Result<MultipartInfo> {
let content_id = format!("{part_idx:20}");
let block_id = BASE64_STANDARD.encode(&content_id);

Expand All @@ -568,7 +568,12 @@ impl AzureClient {
.send()
.await?;

Ok(PartId { content_id })
let part = MultipartInfo {
e_tag: content_id,
part_number: part_idx + 1,
checksum_sha256: None,
};
Ok(part)
}

/// PUT a block list <https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list>
Expand Down
17 changes: 9 additions & 8 deletions object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@
//!
//! Unused blocks will automatically be dropped after 7 days.
use crate::{
multipart::{MultipartStore, PartId},
path::Path,
signer::Signer,
GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, ObjectStore,
PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart,
multipart::MultipartStore, path::Path, signer::Signer, GetOptions, GetResult, ListResult,
MultipartId, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions,
PutPayload, PutResult, Result, UploadPart,
};
use async_trait::async_trait;
use futures::stream::{BoxStream, StreamExt, TryStreamExt};
Expand All @@ -50,6 +48,7 @@ mod credential;
pub type AzureCredentialProvider = Arc<dyn CredentialProvider<Credential = AzureCredential>>;
use crate::azure::client::AzureClient;
use crate::client::parts::Parts;
use crate::multipart::MultipartInfo;
pub use builder::{AzureConfigKey, MicrosoftAzureBuilder};
pub use credential::AzureCredential;

Expand Down Expand Up @@ -239,13 +238,14 @@ impl MultipartUpload for AzureMultiPartUpload {
let state = Arc::clone(&self.state);
Box::pin(async move {
let part = state.client.put_block(&state.location, idx, data).await?;
state.parts.put(idx, part);
state.parts.put(part);
Ok(())
})
}

async fn complete(&mut self) -> Result<PutResult> {
let parts = self.state.parts.finish(self.part_idx)?;
let parts = parts.into_iter().map(|part| part.into()).collect();

self.state
.client
Expand All @@ -271,16 +271,17 @@ impl MultipartStore for MicrosoftAzure {
_: &MultipartId,
part_idx: usize,
data: PutPayload,
) -> Result<PartId> {
) -> Result<MultipartInfo> {
self.client.put_block(path, part_idx, data).await
}

async fn complete_multipart(
&self,
path: &Path,
_: &MultipartId,
parts: Vec<PartId>,
parts: Vec<MultipartInfo>,
) -> Result<PutResult> {
let parts = parts.into_iter().map(|p| p.into()).collect();
self.client
.put_block_list(path, parts, Default::default())
.await
Expand Down
20 changes: 10 additions & 10 deletions object_store/src/client/parts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,34 @@
// specific language governing permissions and limitations
// under the License.

use crate::multipart::PartId;
use crate::multipart::MultipartInfo;
use parking_lot::Mutex;

/// An interior mutable collection of upload parts and their corresponding part index
#[derive(Debug, Default)]
pub(crate) struct Parts(Mutex<Vec<(usize, PartId)>>);
pub(crate) struct Parts(Mutex<Vec<MultipartInfo>>);

impl Parts {
/// Record the [`PartId`] for a given index
/// Record the [`MultipartInfo`] for a given index
///
/// Note: calling this method multiple times with the same `part_idx`
/// will result in multiple [`PartId`] in the final output
pub(crate) fn put(&self, part_idx: usize, id: PartId) {
self.0.lock().push((part_idx, id))
/// will result in multiple [`MultipartInfo`] in the final output
pub(crate) fn put(&self, part: MultipartInfo) {
self.0.lock().push(part)
}

/// Produce the final list of [`PartId`] ordered by `part_idx`
/// Produce the final list of [`MultipartInfo`] ordered by `part_idx`
///
/// `expected` is the number of parts expected in the final result
pub(crate) fn finish(&self, expected: usize) -> crate::Result<Vec<PartId>> {
pub(crate) fn finish(&self, expected: usize) -> crate::Result<Vec<MultipartInfo>> {
let mut parts = self.0.lock();
if parts.len() != expected {
return Err(crate::Error::Generic {
store: "Parts",
source: "Missing part".to_string().into(),
});
}
parts.sort_unstable_by_key(|(idx, _)| *idx);
Ok(parts.drain(..).map(|(_, v)| v).collect())
parts.sort_unstable_by_key(|part| part.part_number);
Ok(parts.drain(..).collect())
}
}
Loading

0 comments on commit c5ea1d0

Please sign in to comment.