From d9cdbfd490b0985bbed50c202bbf7f50f9d60144 Mon Sep 17 00:00:00 2001 From: Alexey Bedonik Date: Mon, 24 Apr 2023 16:58:03 +0200 Subject: [PATCH] fix: Errors handling and objects fetching from s3. --- .../kayenta/s3/storage/S3StorageService.java | 10 ++++- .../sql/migration/StorageDataMigrator.java | 37 +++++++++++++------ 2 files changed, 35 insertions(+), 12 deletions(-) diff --git a/kayenta-s3/src/main/java/com/netflix/kayenta/s3/storage/S3StorageService.java b/kayenta-s3/src/main/java/com/netflix/kayenta/s3/storage/S3StorageService.java index eb36d8000..5b74c0134 100644 --- a/kayenta-s3/src/main/java/com/netflix/kayenta/s3/storage/S3StorageService.java +++ b/kayenta-s3/src/main/java/com/netflix/kayenta/s3/storage/S3StorageService.java @@ -37,6 +37,7 @@ import java.io.IOException; import java.time.Instant; import java.util.*; +import java.util.stream.Collectors; import javax.validation.constraints.NotNull; import lombok.Builder; import lombok.Getter; @@ -136,7 +137,10 @@ private String resolveSingularPath( String rootFolder = daoRoot(credentials, objectType.getGroup()) + "/" + objectKey; ObjectListing bucketListing = amazonS3.listObjects(new ListObjectsRequest(bucket, rootFolder, null, null, 10000)); - List summaries = bucketListing.getObjectSummaries(); + List summaries = + bucketListing.getObjectSummaries().stream() + .filter(summary -> !summary.getKey().endsWith("/")) + .collect(Collectors.toList()); if (summaries != null && summaries.size() == 1) { return summaries.get(0).getKey(); @@ -387,6 +391,10 @@ public List> listObjectKeys( if (summaries != null) { for (S3ObjectSummary summary : summaries) { + if (summary.getKey().endsWith("/")) { + continue; + } + String itemName = summary.getKey(); int indexOfLastSlash = itemName.lastIndexOf("/"); Map objectMetadataMap = new HashMap<>(); diff --git a/kayenta-sql/src/main/java/com/netflix/kayenta/sql/migration/StorageDataMigrator.java b/kayenta-sql/src/main/java/com/netflix/kayenta/sql/migration/StorageDataMigrator.java index 092b7d774..2fa6b1aea 100644 --- a/kayenta-sql/src/main/java/com/netflix/kayenta/sql/migration/StorageDataMigrator.java +++ b/kayenta-sql/src/main/java/com/netflix/kayenta/sql/migration/StorageDataMigrator.java @@ -20,6 +20,7 @@ import com.netflix.kayenta.storage.ObjectType; import com.netflix.kayenta.storage.StorageService; import java.util.Objects; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; @@ -81,17 +82,31 @@ private void migrate(ObjectType objectType) { var errors = new ConcurrentLinkedQueue(); - for (var objectKey : objectKeysToMigrate) { - executorService.submit( - () -> { - try { - var object = - sourceStorageService.loadObject(sourceAccountName, objectType, objectKey); - targetStorageService.storeObject(targetAccountName, objectType, objectKey, object); - } catch (Exception e) { - errors.add(String.format("[objectType: %s, objectKey: %s]", objectType, objectKey)); - } - }); + var tasks = + objectKeysToMigrate.stream() + .map( + objectKey -> + (Callable) + () -> { + try { + var object = + sourceStorageService.loadObject( + sourceAccountName, objectType, objectKey); + targetStorageService.storeObject( + targetAccountName, objectType, objectKey, object); + } catch (Exception e) { + errors.add( + String.format( + "[objectType: %s, objectKey: %s]", objectType, objectKey)); + } + return null; + }) + .collect(Collectors.toList()); + + try { + executorService.invokeAll(tasks); + } catch (InterruptedException e) { + log.error("Unable to migrate objects", e); } if (!errors.isEmpty()) {