diff --git a/pkg/gcsstore/gcsservice.go b/pkg/gcsstore/gcsservice.go
index 191ae5f84..e9688d5af 100644
--- a/pkg/gcsstore/gcsservice.go
+++ b/pkg/gcsstore/gcsservice.go
@@ -7,7 +7,6 @@ import (
 	"hash/crc32"
 	"io"
 	"math"
-	"strconv"
 	"strings"
 
 	"cloud.google.com/go/storage"
@@ -43,6 +42,9 @@ type GCSFilterParams struct {
 
 	// Prefix specifies the prefix of which you want to filter object names with.
 	Prefix string
+
+	// IncludeInfoObject indicates whether to include or ignore the info object.
+	IncludeInfoObject bool
 }
 
 // GCSReader implements cloud.google.com/go/storage.Reader.
@@ -202,8 +204,9 @@ func (service *GCSService) recursiveCompose(ctx context.Context, srcs []string,
 	// Remove all the temporary composition objects
 	prefix := fmt.Sprintf("%s_tmp", params.Destination)
 	filterParams := GCSFilterParams{
-		Bucket: params.Bucket,
-		Prefix: prefix,
+		Bucket:            params.Bucket,
+		Prefix:            prefix,
+		IncludeInfoObject: false,
 	}
 
 	err = service.DeleteObjectsWithFilter(ctx, filterParams)
@@ -340,57 +343,52 @@ func (service *GCSService) FilterObjects(ctx context.Context, params GCSFilterPa
 		Versions: false,
 	}
 
-	it := bkt.Objects(ctx, &q)
-	names := make([]string, 0)
-loop:
-	for {
-		objAttrs, err := it.Next()
-		if err == iterator.Done {
-			break
-		}
-		if err != nil {
-			return nil, err
-		}
-
-		if strings.HasSuffix(objAttrs.Name, "info") {
-			continue
-		}
-
-		fileNameParts := strings.Split(objAttrs.Name, "/")
+	// If the object name does not split on "_", we have a composed object.
+	// If the object name splits on "_" in to four pieces we
+	// know the object name we are working with is in the format
+	// [uid]_tmp_[recursion_lvl]_[chunk_idx]. The only time we filter
+	// these temporary objects is on a delete operation so we can just
+	// append and continue without worrying about index order
+	isValidObject := func(name string) (bool, error) {
+		fileNameParts := strings.Split(name, "/")
 		fileName := fileNameParts[len(fileNameParts)-1]
 
 		split := strings.Split(fileName, "_")
 
-		// If the object name does not split on "_", we have a composed object.
-		// If the object name splits on "_" in to four pieces we
-		// know the object name we are working with is in the format
-		// [uid]_tmp_[recursion_lvl]_[chunk_idx]. The only time we filter
-		// these temporary objects is on a delete operation so we can just
-		// append and continue without worrying about index order
-
 		switch len(split) {
 		case 1:
-			names = []string{objAttrs.Name}
-			break loop
 		case 2:
 		case 4:
-			names = append(names, objAttrs.Name)
-			continue
 		default:
-			err := errors.New("invalid filter format for object name")
-			return nil, err
+			return false, errors.New("invalid filter format for object name")
 		}
 
-		idx, err := strconv.Atoi(split[1])
+		return true, nil
+	}
+
+	it := bkt.Objects(ctx, &q)
+	var names []string
+
+	for {
+		objAttrs, err := it.Next()
+		if err == iterator.Done {
+			break
+		}
 		if err != nil {
 			return nil, err
 		}
 
-		if len(names) <= idx {
-			names = append(names, make([]string, idx-len(names)+1)...)
+		if !params.IncludeInfoObject && strings.HasSuffix(objAttrs.Name, "info") {
+			continue
 		}
 
-		names[idx] = objAttrs.Name
+		ok, err := isValidObject(objAttrs.Name)
+		if err != nil {
+			return nil, err
+		}
+		if ok {
+			names = append(names, objAttrs.Name)
+		}
 	}
 
 	return names, nil
diff --git a/pkg/gcsstore/gcsservice_test.go b/pkg/gcsstore/gcsservice_test.go
index ecbb7ea3f..8b1fa018c 100644
--- a/pkg/gcsstore/gcsservice_test.go
+++ b/pkg/gcsstore/gcsservice_test.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"context"
 	"net/http"
+	reflect "reflect"
 	"testing"
 
 	"gopkg.in/h2non/gock.v1"
@@ -99,8 +100,9 @@ func TestDeleteObjectWithFilter(t *testing.T) {
 	}
 
 	err = service.DeleteObjectsWithFilter(ctx, GCSFilterParams{
-		Bucket: "test-bucket",
-		Prefix: "test-prefix",
+		Bucket:            "test-bucket",
+		Prefix:            "test-prefix",
+		IncludeInfoObject: true,
 	})
 
 	if err != nil {
@@ -479,6 +481,7 @@ func TestFilterObject(t *testing.T) {
 
 	resp := googleBucketResponse{[]googleObjectResponse{
 		{Name: "test_directory/test-prefix_1"},
+		{Name: "test_directory/test-prefix_2"},
 	}}
 
 	gock.New("https://storage.googleapis.com").
@@ -510,8 +513,165 @@ func TestFilterObject(t *testing.T) {
 	}
 
 	objects, err := service.FilterObjects(ctx, GCSFilterParams{
-		Bucket: "test-bucket",
-		Prefix: "test-prefix",
+		Bucket:            "test-bucket",
+		Prefix:            "test-prefix",
+		IncludeInfoObject: false,
+	})
+
+	if err != nil {
+		t.Errorf("Error: %v", err)
+		return
+	}
+
+	if !reflect.DeepEqual(objects, []string{"test_directory/test-prefix_1", "test_directory/test-prefix_2"}) {
+		t.Errorf("Didn't get appropriate objects back: got %v from %v", len(objects), objects)
+	}
+}
+
+func TestFilterObject_WithoutChunk(t *testing.T) {
+	defer gock.Off()
+
+	resp := googleBucketResponse{[]googleObjectResponse{
+		{Name: "test_directory/test-prefix"},
+	}}
+
+	gock.New("https://storage.googleapis.com").
+		Get("/storage/v1/b/test-bucket/o").
+		MatchParam("alt", "json").
+		MatchParam("pageToken", "").
+		MatchParam("prefix", "test-prefix").
+		MatchParam("projection", "full").
+		Reply(200).
+		JSON(resp)
+
+	gock.New("https://accounts.google.com/").
+		Post("/o/oauth2/token").Reply(200).JSON(map[string]string{
+		"access_token":  "H3l5321N123sdI4HLY/RF39FjrCRF39FjrCRF39FjrCRF39FjrC_RF39FjrCRF39FjrC",
+		"token_type":    "Bearer",
+		"refresh_token": "1/smWJksmWJksmWJksmWJksmWJk_smWJksmWJksmWJksmWJksmWJk",
+		"expiry_date":   "1425333671141",
+	})
+
+	ctx := context.Background()
+	client, err := storage.NewClient(ctx, option.WithHTTPClient(http.DefaultClient), option.WithAPIKey("foo"))
+	if err != nil {
+		t.Fatal(err)
+		return
+	}
+
+	service := GCSService{
+		Client: client,
+	}
+
+	objects, err := service.FilterObjects(ctx, GCSFilterParams{
+		Bucket:            "test-bucket",
+		Prefix:            "test-prefix",
+		IncludeInfoObject: false,
+	})
+
+	if err != nil {
+		t.Errorf("Error: %v", err)
+		return
+	}
+
+	if !reflect.DeepEqual(objects, []string{"test_directory/test-prefix"}) {
+		t.Errorf("Didn't get appropriate objects back: got %v from %v", len(objects), objects)
+	}
+}
+
+func TestFilterObject_IncludeInfoObject(t *testing.T) {
+	defer gock.Off()
+
+	resp := googleBucketResponse{[]googleObjectResponse{
+		{Name: "test_directory/test-prefix_1"},
+		{Name: "test_directory/test-prefix.info"},
+		{Name: "test_directory/test-prefix_2"},
+	}}
+
+	gock.New("https://storage.googleapis.com").
+		Get("/storage/v1/b/test-bucket/o").
+		MatchParam("alt", "json").
+		MatchParam("pageToken", "").
+		MatchParam("prefix", "test-prefix").
+		MatchParam("projection", "full").
+		Reply(200).
+		JSON(resp)
+
+	gock.New("https://accounts.google.com/").
+		Post("/o/oauth2/token").Reply(200).JSON(map[string]string{
+		"access_token":  "H3l5321N123sdI4HLY/RF39FjrCRF39FjrCRF39FjrCRF39FjrC_RF39FjrCRF39FjrC",
+		"token_type":    "Bearer",
+		"refresh_token": "1/smWJksmWJksmWJksmWJksmWJk_smWJksmWJksmWJksmWJksmWJk",
+		"expiry_date":   "1425333671141",
+	})
+
+	ctx := context.Background()
+	client, err := storage.NewClient(ctx, option.WithHTTPClient(http.DefaultClient), option.WithAPIKey("foo"))
+	if err != nil {
+		t.Fatal(err)
+		return
+	}
+
+	service := GCSService{
+		Client: client,
+	}
+
+	objects, err := service.FilterObjects(ctx, GCSFilterParams{
+		Bucket:            "test-bucket",
+		Prefix:            "test-prefix",
+		IncludeInfoObject: true,
+	})
+
+	if err != nil {
+		t.Errorf("Error: %v", err)
+		return
+	}
+
+	if !reflect.DeepEqual(objects, []string{"test_directory/test-prefix_1", "test_directory/test-prefix.info", "test_directory/test-prefix_2"}) {
+		t.Errorf("Didn't get appropriate objects back: got %v from %v", len(objects), objects)
+	}
+}
+
+func TestFilterObject_IncludeInfoObject_NonChunked(t *testing.T) {
+	defer gock.Off()
+
+	resp := googleBucketResponse{[]googleObjectResponse{
+		{Name: "test_directory/test-prefix"},
+		{Name: "test_directory/test-prefix.info"},
+	}}
+
+	gock.New("https://storage.googleapis.com").
+		Get("/storage/v1/b/test-bucket/o").
+		MatchParam("alt", "json").
+		MatchParam("pageToken", "").
+		MatchParam("prefix", "test-prefix").
+		MatchParam("projection", "full").
+		Reply(200).
+		JSON(resp)
+
+	gock.New("https://accounts.google.com/").
+		Post("/o/oauth2/token").Reply(200).JSON(map[string]string{
+		"access_token":  "H3l5321N123sdI4HLY/RF39FjrCRF39FjrCRF39FjrCRF39FjrC_RF39FjrCRF39FjrC",
+		"token_type":    "Bearer",
+		"refresh_token": "1/smWJksmWJksmWJksmWJksmWJk_smWJksmWJksmWJksmWJksmWJk",
+		"expiry_date":   "1425333671141",
+	})
+
+	ctx := context.Background()
+	client, err := storage.NewClient(ctx, option.WithHTTPClient(http.DefaultClient), option.WithAPIKey("foo"))
+	if err != nil {
+		t.Fatal(err)
+		return
+	}
+
+	service := GCSService{
+		Client: client,
+	}
+
+	objects, err := service.FilterObjects(ctx, GCSFilterParams{
+		Bucket:            "test-bucket",
+		Prefix:            "test-prefix",
+		IncludeInfoObject: true,
 	})
 
 	if err != nil {
@@ -519,7 +679,7 @@ func TestFilterObject(t *testing.T) {
 		return
 	}
 
-	if len(objects) != 2 {
-		t.Errorf("Didn't get appropriate amount of objects back: got %v from %v", len(objects), objects)
+	if !reflect.DeepEqual(objects, []string{"test_directory/test-prefix", "test_directory/test-prefix.info"}) {
+		t.Errorf("Didn't get appropriate objects back: got %v from %v", len(objects), objects)
 	}
 }
diff --git a/pkg/gcsstore/gcsstore.go b/pkg/gcsstore/gcsstore.go
index 835fd63cb..1a9f3f0b9 100644
--- a/pkg/gcsstore/gcsstore.go
+++ b/pkg/gcsstore/gcsstore.go
@@ -94,8 +94,9 @@ func (upload gcsUpload) WriteChunk(ctx context.Context, offset int64, src io.Rea
 
 	prefix := fmt.Sprintf("%s_", store.keyWithPrefix(id))
 	filterParams := GCSFilterParams{
-		Bucket: store.Bucket,
-		Prefix: prefix,
+		Bucket:            store.Bucket,
+		Prefix:            prefix,
+		IncludeInfoObject: false,
 	}
 
 	names, err := store.Service.FilterObjects(ctx, filterParams)
@@ -165,8 +166,9 @@ func (upload gcsUpload) GetInfo(ctx context.Context) (handler.FileInfo, error) {
 
 	prefix := store.keyWithPrefix(id)
 	filterParams := GCSFilterParams{
-		Bucket: store.Bucket,
-		Prefix: prefix,
+		Bucket:            store.Bucket,
+		Prefix:            prefix,
+		IncludeInfoObject: false,
 	}
 
 	names, err := store.Service.FilterObjects(ctx, filterParams)
@@ -261,8 +263,9 @@ func (upload gcsUpload) FinishUpload(ctx context.Context) error {
 
 	prefix := fmt.Sprintf("%s_", store.keyWithPrefix(id))
 	filterParams := GCSFilterParams{
-		Bucket: store.Bucket,
-		Prefix: prefix,
+		Bucket:            store.Bucket,
+		Prefix:            prefix,
+		IncludeInfoObject: false,
 	}
 
 	names, err := store.Service.FilterObjects(ctx, filterParams)
@@ -313,8 +316,9 @@ func (upload gcsUpload) Terminate(ctx context.Context) error {
 	store := upload.store
 
 	filterParams := GCSFilterParams{
-		Bucket: store.Bucket,
-		Prefix: store.keyWithPrefix(id),
+		Bucket:            store.Bucket,
+		Prefix:            store.keyWithPrefix(id),
+		IncludeInfoObject: true,
 	}
 
 	err := store.Service.DeleteObjectsWithFilter(ctx, filterParams)
diff --git a/pkg/gcsstore/gcsstore_test.go b/pkg/gcsstore/gcsstore_test.go
index 20ea1fe24..72f4fd9fc 100644
--- a/pkg/gcsstore/gcsstore_test.go
+++ b/pkg/gcsstore/gcsstore_test.go
@@ -145,8 +145,9 @@ func TestGetInfo(t *testing.T) {
 	r := MockGetInfoReader{}
 
 	filterParams := gcsstore.GCSFilterParams{
-		Bucket: store.Bucket,
-		Prefix: mockID,
+		Bucket:            store.Bucket,
+		Prefix:            mockID,
+		IncludeInfoObject: false,
 	}
 
 	mockObjectParams0 := gcsstore.GCSObjectParams{
@@ -288,8 +289,9 @@ func TestTerminate(t *testing.T) {
 	assert.Equal(store.Bucket, mockBucket)
 
 	filterParams := gcsstore.GCSFilterParams{
-		Bucket: store.Bucket,
-		Prefix: mockID,
+		Bucket:            store.Bucket,
+		Prefix:            mockID,
+		IncludeInfoObject: true,
 	}
 
 	ctx := context.Background()
@@ -313,13 +315,15 @@ func TestFinishUpload(t *testing.T) {
 	assert.Equal(store.Bucket, mockBucket)
 
 	filterParams := gcsstore.GCSFilterParams{
-		Bucket: store.Bucket,
-		Prefix: fmt.Sprintf("%s_", mockID),
+		Bucket:            store.Bucket,
+		Prefix:            fmt.Sprintf("%s_", mockID),
+		IncludeInfoObject: false,
 	}
 
 	filterParams2 := gcsstore.GCSFilterParams{
-		Bucket: store.Bucket,
-		Prefix: mockID,
+		Bucket:            store.Bucket,
+		Prefix:            mockID,
+		IncludeInfoObject: false,
 	}
 
 	composeParams := gcsstore.GCSComposeParams{
@@ -406,8 +410,9 @@ func TestWriteChunk(t *testing.T) {
 
 	// filter objects
 	filterParams := gcsstore.GCSFilterParams{
-		Bucket: store.Bucket,
-		Prefix: fmt.Sprintf("%s_", mockID),
+		Bucket:            store.Bucket,
+		Prefix:            fmt.Sprintf("%s_", mockID),
+		IncludeInfoObject: false,
 	}
 
 	var partials = []string{mockPartial0}
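
Usage sketch (not part of the patch above): a minimal example of how a caller might drive the new IncludeInfoObject flag, following the same pattern the patch applies in WriteChunk and Terminate. It assumes the tusd import path github.com/tus/tusd/pkg/gcsstore and an already-constructed *gcsstore.GCSService; cleanupUpload, bucket, and id are illustrative names, not identifiers taken from the diff.

package gcsexample

import (
	"context"
	"fmt"

	"github.com/tus/tusd/pkg/gcsstore" // assumed import path for the gcsstore package
)

// cleanupUpload first lists only the chunk objects for an upload (the
// [uid].info object is skipped because IncludeInfoObject is false), then
// deletes everything belonging to the upload, info object included,
// mirroring how Terminate sets IncludeInfoObject to true.
func cleanupUpload(ctx context.Context, service *gcsstore.GCSService, bucket, id string) error {
	chunkParams := gcsstore.GCSFilterParams{
		Bucket:            bucket,
		Prefix:            fmt.Sprintf("%s_", id), // chunk objects: [uid]_[idx]
		IncludeInfoObject: false,                  // ignore [uid].info while listing chunks
	}
	names, err := service.FilterObjects(ctx, chunkParams)
	if err != nil {
		return err
	}
	fmt.Println("chunk objects:", names)

	deleteParams := gcsstore.GCSFilterParams{
		Bucket:            bucket,
		Prefix:            id,
		IncludeInfoObject: true, // remove the info object along with the chunks
	}
	return service.DeleteObjectsWithFilter(ctx, deleteParams)
}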