diff --git a/pkg/cache/repository.go b/pkg/cache/repository.go index 5dad725d..80649934 100644 --- a/pkg/cache/repository.go +++ b/pkg/cache/repository.go @@ -338,12 +338,13 @@ func (r *cachedRepository) Close() error { // There isn't much use in returning an error here, so we just log it // and create a PackageRevisionMeta with just name and namespace. This // makes sure that the Delete event is sent. - klog.Warningf("Error looking up PackageRev CR for %s: %v", nn.Name, err) + klog.Warningf("repo %s: error deleting packagerev for %s: %v", r.id, nn.Name, err) pkgRevMeta = meta.PackageRevisionMeta{ Name: nn.Name, Namespace: nn.Namespace, } } + klog.Infof("repo %s: successfully deleted packagerev %s/%s", r.id, nn.Namespace, nn.Name) sent += r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, pr, pkgRevMeta) } klog.Infof("repo %s: sent %d notifications for %d package revisions during close", r.id, sent, len(r.cachedPackageRevisions)) diff --git a/pkg/meta/store.go b/pkg/meta/store.go index 6ff13ae1..bbb6e402 100644 --- a/pkg/meta/store.go +++ b/pkg/meta/store.go @@ -25,6 +25,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -194,16 +195,22 @@ func (c *crdMetadataStore) Delete(ctx context.Context, namespacedName types.Name defer span.End() var internalPkgRev internalapi.PackageRev - err := c.coreClient.Get(ctx, namespacedName, &internalPkgRev) - if err != nil { - return PackageRevisionMeta{}, err - } + retriedErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { + err := c.coreClient.Get(ctx, namespacedName, &internalPkgRev) + if err != nil { + return err + } - if clearFinalizers { - internalPkgRev.Finalizers = []string{} - if err = c.coreClient.Update(ctx, &internalPkgRev); err != nil { - return PackageRevisionMeta{}, err + if clearFinalizers { + internalPkgRev.Finalizers = []string{} + if err = c.coreClient.Update(ctx, &internalPkgRev); err != nil { + return err + } } + return nil + }) + if retriedErr != nil { + return PackageRevisionMeta{}, retriedErr } klog.Infof("Deleting packagerev %s/%s", internalPkgRev.Namespace, internalPkgRev.Name) diff --git a/pkg/registry/porch/packagecommon.go b/pkg/registry/porch/packagecommon.go index 32b26b95..9a6c6254 100644 --- a/pkg/registry/porch/packagecommon.go +++ b/pkg/registry/porch/packagecommon.go @@ -17,6 +17,7 @@ package porch import ( "context" "fmt" + "sync" unversionedapi "github.com/nephio-project/porch/api/porch" api "github.com/nephio-project/porch/api/porch/v1alpha1" @@ -34,6 +35,13 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) +const ConflictErrorMsgBase = "another request is already in progress %s" + +var GenericConflictErrorMsg = fmt.Sprintf(ConflictErrorMsgBase, "on %s \"%s\"") + +var mutexMapMutex sync.Mutex +var pkgRevOperationMutexes = map[string]*sync.Mutex{} + type packageCommon struct { // scheme holds our scheme, for type conversions etc scheme *runtime.Scheme @@ -210,9 +218,21 @@ func (r *packageCommon) updatePackageRevision(ctx context.Context, name string, return nil, false, apierrors.NewBadRequest("namespace must be specified") } + pkgMutexKey := getPackageMutexKey(ns, name) + pkgMutex := getMutexForPackage(pkgMutexKey) + + locked := pkgMutex.TryLock() + if !locked { + return nil, false, + apierrors.NewConflict( + api.Resource("packagerevisions"), + name, + fmt.Errorf(GenericConflictErrorMsg, "package revision", pkgMutexKey)) + } + defer pkgMutex.Unlock() + // isCreate tracks whether this is an update that creates an object (this happens in server-side apply) isCreate := false - oldRepoPkgRev, err := r.getRepoPkgRev(ctx, name) if err != nil { if forceAllowCreate && apierrors.IsNotFound(err) { @@ -469,3 +489,18 @@ func (r *packageCommon) validateUpdate(ctx context.Context, newRuntimeObj runtim r.updateStrategy.Canonicalize(newRuntimeObj) return nil } + +func getPackageMutexKey(namespace, name string) string { + return fmt.Sprintf("%s/%s", namespace, name) +} + +func getMutexForPackage(pkgMutexKey string) *sync.Mutex { + mutexMapMutex.Lock() + defer mutexMapMutex.Unlock() + pkgMutex, alreadyPresent := pkgRevOperationMutexes[pkgMutexKey] + if !alreadyPresent { + pkgMutex = &sync.Mutex{} + pkgRevOperationMutexes[pkgMutexKey] = pkgMutex + } + return pkgMutex +} diff --git a/pkg/registry/porch/packagerevision.go b/pkg/registry/porch/packagerevision.go index 51dd98cd..dc4e7cbd 100644 --- a/pkg/registry/porch/packagerevision.go +++ b/pkg/registry/porch/packagerevision.go @@ -49,13 +49,11 @@ var _ rest.GracefulDeleter = &packageRevisions{} var _ rest.Watcher = &packageRevisions{} var _ rest.SingularNameProvider = &packageRevisions{} - // GetSingularName implements the SingularNameProvider interface -func (r *packageRevisions) GetSingularName() (string) { +func (r *packageRevisions) GetSingularName() string { return "packagerevision" } - func (r *packageRevisions) New() runtime.Object { return &api.PackageRevision{} } @@ -120,7 +118,7 @@ func (r *packageRevisions) Get(ctx context.Context, name string, options *metav1 } // Create implements the Creater interface. -func (r *packageRevisions) Create(ctx context.Context, runtimeObject runtime.Object, createValidation rest.ValidateObjectFunc, +func (r *packageRevisions) Create(ctx context.Context, runtimeObject runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) { ctx, span := tracer.Start(ctx, "packageRevisions::Create", trace.WithAttributes()) defer span.End() @@ -166,6 +164,20 @@ func (r *packageRevisions) Create(ctx context.Context, runtimeObject runtime.Obj parentPackage = p } + pkgMutexKey := uncreatedPackageMutexKey(newApiPkgRev) + pkgMutex := getMutexForPackage(pkgMutexKey) + + locked := pkgMutex.TryLock() + if !locked { + conflictError := creationConflictError(newApiPkgRev) + return nil, + apierrors.NewConflict( + api.Resource("packagerevisions"), + "(new creation)", + conflictError) + } + defer pkgMutex.Unlock() + createdRepoPkgRev, err := r.cad.CreatePackageRevision(ctx, repositoryObj, newApiPkgRev, parentPackage) if err != nil { return nil, apierrors.NewInternalError(err) @@ -184,13 +196,12 @@ func (r *packageRevisions) Create(ctx context.Context, runtimeObject runtime.Obj // Update finds a resource in the storage and updates it. Some implementations // may allow updates creates the object - they should set the created boolean // to true. -func (r *packageRevisions) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, +func (r *packageRevisions) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) { ctx, span := tracer.Start(ctx, "packageRevisions::Update", trace.WithAttributes()) defer span.End() - return r.packageCommon.updatePackageRevision(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, - ) + return r.packageCommon.updatePackageRevision(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate) } // Delete implements the GracefulDeleter interface. @@ -228,6 +239,19 @@ func (r *packageRevisions) Delete(ctx context.Context, name string, deleteValida return nil, false, err } + pkgMutexKey := getPackageMutexKey(ns, name) + pkgMutex := getMutexForPackage(pkgMutexKey) + + locked := pkgMutex.TryLock() + if !locked { + return nil, false, + apierrors.NewConflict( + api.Resource("packagerevisions"), + name, + fmt.Errorf(GenericConflictErrorMsg, "package revision", pkgMutexKey)) + } + defer pkgMutex.Unlock() + if err := r.cad.DeletePackageRevision(ctx, repositoryObj, repoPkgRev); err != nil { return nil, false, apierrors.NewInternalError(err) } @@ -235,3 +259,25 @@ func (r *packageRevisions) Delete(ctx context.Context, name string, deleteValida // TODO: Should we do an async delete? return apiPkgRev, true, nil } + +func uncreatedPackageMutexKey(newApiPkgRev *api.PackageRevision) string { + return fmt.Sprintf("%s-%s-%s-%s", + newApiPkgRev.Namespace, + newApiPkgRev.Spec.RepositoryName, + newApiPkgRev.Spec.PackageName, + newApiPkgRev.Spec.WorkspaceName, + ) +} + +func creationConflictError(newApiPkgRev *api.PackageRevision) error { + return fmt.Errorf( + fmt.Sprintf( + ConflictErrorMsgBase, + "to create package revision with details namespace=%q, repository=%q, package=%q,workspace=%q", + ), + newApiPkgRev.Namespace, + newApiPkgRev.Spec.RepositoryName, + newApiPkgRev.Spec.PackageName, + newApiPkgRev.Spec.WorkspaceName, + ) +} diff --git a/pkg/registry/porch/packagerevisionresources.go b/pkg/registry/porch/packagerevisionresources.go index e19135c1..4f71a443 100644 --- a/pkg/registry/porch/packagerevisionresources.go +++ b/pkg/registry/porch/packagerevisionresources.go @@ -46,13 +46,11 @@ var _ rest.Scoper = &packageRevisionResources{} var _ rest.Updater = &packageRevisionResources{} var _ rest.SingularNameProvider = &packageRevisionResources{} - // GetSingularName implements the SingularNameProvider interface -func (r *packageRevisionResources) GetSingularName() (string) { +func (r *packageRevisionResources) GetSingularName() string { return "packagerevisionresources" } - func (r *packageRevisionResources) New() runtime.Object { return &api.PackageRevisionResources{} } @@ -127,6 +125,18 @@ func (r *packageRevisionResources) Update(ctx context.Context, name string, objI return nil, false, apierrors.NewBadRequest("namespace must be specified") } + pkgMutexKey := getPackageMutexKey(ns, name) + pkgMutex := getMutexForPackage(pkgMutexKey) + locked := pkgMutex.TryLock() + if !locked { + return nil, false, + apierrors.NewConflict( + api.Resource("packagerevisionresources"), + name, + fmt.Errorf(GenericConflictErrorMsg, "package revision resources", pkgMutexKey)) + } + defer pkgMutex.Unlock() + oldRepoPkgRev, err := r.packageCommon.getRepoPkgRev(ctx, name) if err != nil { return nil, false, err diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 5233073d..d939a6d4 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -21,6 +21,7 @@ import ( "os" "path/filepath" "reflect" + "slices" "strings" "testing" "time" @@ -151,13 +152,13 @@ func (t *PorchSuite) TestCloneFromUpstream(ctx context.Context) { var list porchapi.PackageRevisionList t.ListE(ctx, &list, client.InNamespace(t.Namespace)) - basens := MustFindPackageRevision(t.T, &list, repository.PackageRevisionKey{Repository: "test-blueprints", Package: "basens", Revision: "v1"}) + upstreamPr := MustFindPackageRevision(t.T, &list, repository.PackageRevisionKey{Repository: "test-blueprints", Package: "basens", Revision: "v1"}) // Register the repository as 'downstream' t.RegisterMainGitRepositoryF(ctx, "downstream") // Create PackageRevision from upstream repo - pr := &porchapi.PackageRevision{ + clonedPr := &porchapi.PackageRevision{ TypeMeta: metav1.TypeMeta{ Kind: "PackageRevision", APIVersion: porchapi.SchemeGroupVersion.String(), @@ -175,7 +176,7 @@ func (t *PorchSuite) TestCloneFromUpstream(ctx context.Context) { Clone: &porchapi.PackageCloneTaskSpec{ Upstream: porchapi.UpstreamPackage{ UpstreamRef: &porchapi.PackageRevisionRef{ - Name: basens.Name, + Name: upstreamPr.Name, }, }, }, @@ -184,13 +185,13 @@ func (t *PorchSuite) TestCloneFromUpstream(ctx context.Context) { }, } - t.CreateF(ctx, pr) + t.CreateF(ctx, clonedPr) // Get istions resources var istions porchapi.PackageRevisionResources t.GetF(ctx, client.ObjectKey{ Namespace: t.Namespace, - Name: pr.Name, + Name: clonedPr.Name, }, &istions) kptfile := t.ParseKptfileF(&istions) @@ -237,6 +238,57 @@ func (t *PorchSuite) TestCloneFromUpstream(ctx context.Context) { } } +func (t *PorchSuite) TestConcurrentClones(ctx context.Context) { + const ( + upstreamRepository = "upstream" + upstreamPackage = "basens" + downstreamRepository = "downstream" + downstreamPackage = "istions-concurrent" + workspace = "test-workspace" + ) + + // Register Upstream and Downstream Repositories + t.RegisterGitRepositoryF(ctx, testBlueprintsRepo, upstreamRepository, "") + t.RegisterMainGitRepositoryF(ctx, downstreamRepository) + + var list porchapi.PackageRevisionList + t.ListE(ctx, &list, client.InNamespace(t.Namespace)) + upstreamPr := MustFindPackageRevision(t.T, + &list, + repository.PackageRevisionKey{ + Repository: upstreamRepository, + Package: upstreamPackage, + Revision: "v1"}) + + // Create PackageRevision from upstream repo + clonedPr := t.CreatePackageSkeleton(downstreamRepository, downstreamPackage, workspace) + clonedPr.Spec.Tasks = []porchapi.Task{ + { + Type: porchapi.TaskTypeClone, + Clone: &porchapi.PackageCloneTaskSpec{ + Upstream: porchapi.UpstreamPackage{ + UpstreamRef: &porchapi.PackageRevisionRef{ + Name: upstreamPr.Name, + }, + }, + }, + }, + } + + // Two clients at the same time try to run the Create operation for the clone + cloneFunction := func() any { + return t.Client.Create(ctx, clonedPr) + } + results := RunInParallel(cloneFunction, cloneFunction) + + assert.Contains(t, results, nil, "expected one request to succeed, but did not happen - results: %v", results) + + conflictFailurePresent := slices.ContainsFunc(results, func(eachResult any) bool { + return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress") + }) + assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen - results: %v", results) +} + func (t *PorchSuite) TestInitEmptyPackage(ctx context.Context) { // Create a new package via init, no task specified const ( @@ -260,7 +312,7 @@ func (t *PorchSuite) TestInitEmptyPackage(ctx context.Context) { Namespace: t.Namespace, }, Spec: porchapi.PackageRevisionSpec{ - PackageName: "empty-package", + PackageName: packageName, WorkspaceName: workspace, RepositoryName: repository, }, @@ -285,6 +337,39 @@ func (t *PorchSuite) TestInitEmptyPackage(ctx context.Context) { } } +func (t *PorchSuite) TestConcurrentInits(ctx context.Context) { + // Create a new package via init, no task specified + const ( + repository = "git-concurrent" + packageName = "empty-package-concurrent" + revision = "v1" + workspace = "test-workspace" + description = "empty-package description" + ) + + // Register the repository + t.RegisterMainGitRepositoryF(ctx, repository) + + // Two clients try to create the same new draft package + pr := t.CreatePackageSkeleton(repository, packageName, workspace) + creationFunction := func() any { + return t.Client.Create(ctx, pr) + } + results := RunInParallel(creationFunction, creationFunction) + + // one client succeeds; one receives a conflict error + expectedResultCount := 2 + actualResultCount := len(results) + assert.Equal(t, expectedResultCount, actualResultCount, "expected %d results but was %d", expectedResultCount, actualResultCount) + + assert.Contains(t, results, nil, "expected one request to succeed, but did not happen - results: %v", results) + + conflictFailurePresent := slices.ContainsFunc(results, func(eachResult any) bool { + return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress") + }) + assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen - results: %v", results) +} + func (t *PorchSuite) TestInitTaskPackage(ctx context.Context) { const ( repository = "git" @@ -568,6 +653,65 @@ func (t *PorchSuite) TestEditPackageRevision(ctx context.Context) { assert.Equal(t, 2, len(tasks)) } +func (t *PorchSuite) TestConcurrentEdits(ctx context.Context) { + const ( + repository = "edit-test" + packageName = "simple-package-concurrent" + workspace = "workspace" + workspace2 = "workspace2" + ) + + t.RegisterMainGitRepositoryF(ctx, repository) + + // Create a new package (via init) + pr := t.CreatePackageSkeleton(repository, packageName, workspace) + t.CreateF(ctx, pr) + + // Publish and approve the source package to make it a valid source for edit. + pr.Spec.Lifecycle = porchapi.PackageRevisionLifecycleProposed + t.UpdateF(ctx, pr) + pr.Spec.Lifecycle = porchapi.PackageRevisionLifecyclePublished + t.UpdateApprovalF(ctx, pr, metav1.UpdateOptions{}) + + // Create a new revision of the package with a source that is a revision + // of the same package. + editPR := t.CreatePackageSkeleton(repository, packageName, workspace2) + editPR.Spec.Tasks = []porchapi.Task{ + { + Type: porchapi.TaskTypeEdit, + Edit: &porchapi.PackageEditTaskSpec{ + Source: &porchapi.PackageRevisionRef{ + Name: pr.Name, + }, + }, + }, + } + + // Two clients try to create the new package at the same time + editFunction := func() any { + return t.Client.Create(ctx, editPR) + } + results := RunInParallel( + editFunction, + editFunction) + + assert.Contains(t, results, nil, "expected one request to succeed, but did not happen - results: %v", results) + + conflictFailurePresent := slices.ContainsFunc(results, func(eachResult any) bool { + return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress") + }) + assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen - results: %v", results) + + // Check its task list + var pkgRev porchapi.PackageRevision + t.GetF(ctx, client.ObjectKey{ + Namespace: t.Namespace, + Name: editPR.Name, + }, &pkgRev) + tasks := pkgRev.Spec.Tasks + assert.Equal(t, 2, len(tasks)) +} + // Test will initialize an empty package, update its resources, adding a function // to the Kptfile's pipeline, and then check that the package was re-rendered. func (t *PorchSuite) TestUpdateResources(ctx context.Context) { @@ -703,6 +847,40 @@ func (t *PorchSuite) TestUpdateResourcesEmptyPatch(ctx context.Context) { assert.True(t, reflect.DeepEqual(tasksBeforeUpdate, tasksAfterUpdate)) } +func (t *PorchSuite) TestConcurrentResourceUpdates(ctx context.Context) { + const ( + repository = "concurrent-update-test" + packageName = "simple-package" + workspace = "workspace" + ) + + t.RegisterMainGitRepositoryF(ctx, repository) + + // Create a new package (via init) + pr := t.CreatePackageSkeleton(repository, packageName, workspace) + t.CreateF(ctx, pr) + + // Get the package resources + var newPackageResources porchapi.PackageRevisionResources + t.GetF(ctx, client.ObjectKey{ + Namespace: t.Namespace, + Name: pr.Name, + }, &newPackageResources) + + // "Update" the package resources with two clients at the same time + updateFunction := func() any { + return t.Client.Update(ctx, &newPackageResources) + } + results := RunInParallel(updateFunction, updateFunction) + + assert.Contains(t, results, nil, "expected one request to succeed, but did not happen - results: %v", results) + + conflictFailurePresent := slices.ContainsFunc(results, func(eachResult any) bool { + return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress") + }) + assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen - results: %v", results) +} + func (t *PorchSuite) TestFunctionRepository(ctx context.Context) { repo := &configapi.Repository{ ObjectMeta: metav1.ObjectMeta{ @@ -864,6 +1042,68 @@ func (t *PorchSuite) TestProposeApprove(ctx context.Context) { } } +func (t *PorchSuite) TestConcurrentProposeApprove(ctx context.Context) { + const ( + repository = "lifecycle" + packageName = "test-package-concurrent" + workspace = "workspace" + ) + + // Register the repository + t.RegisterMainGitRepositoryF(ctx, repository) + + // Create a new package (via init) + pr := t.CreatePackageSkeleton(repository, packageName, workspace) + pr.Spec.Tasks = []porchapi.Task{ + { + Type: porchapi.TaskTypeInit, + Init: &porchapi.PackageInitTaskSpec{}, + }, + } + t.CreateF(ctx, pr) + + var pkg porchapi.PackageRevision + t.GetF(ctx, client.ObjectKey{ + Namespace: t.Namespace, + Name: pr.Name, + }, &pkg) + + // Propose the package revision to be finalized + pkg.Spec.Lifecycle = porchapi.PackageRevisionLifecycleProposed + proposeFunction := func() any { + return t.Client.Update(ctx, &pkg) + } + proposeResults := RunInParallel(proposeFunction, proposeFunction) + + assert.Contains(t, proposeResults, nil, "expected one 'propose' request to succeed, but did not happen - results: %v", proposeResults) + + conflictFailurePresent := slices.ContainsFunc(proposeResults, func(eachResult any) bool { + return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress") + }) + assert.True(t, conflictFailurePresent, "expected one 'propose' request to fail with a conflict, but did not happen - results: %v", proposeResults) + + var proposed porchapi.PackageRevision + t.GetF(ctx, client.ObjectKey{ + Namespace: t.Namespace, + Name: pr.Name, + }, &proposed) + + // Approve the package + proposed.Spec.Lifecycle = porchapi.PackageRevisionLifecyclePublished + approveFunction := func() any { + _, err := t.Clientset.PorchV1alpha1().PackageRevisions(proposed.Namespace).UpdateApproval(ctx, proposed.Name, &proposed, metav1.UpdateOptions{}) + return err + } + approveResults := RunInParallel(approveFunction, approveFunction) + + assert.Contains(t, approveResults, nil, "expected one 'approve' request to succeed, but did not happen - results: %v", approveResults) + + conflictFailurePresent = slices.ContainsFunc(approveResults, func(eachResult any) bool { + return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress") + }) + assert.True(t, conflictFailurePresent, "expected one 'approve' request to fail with a conflict, but did not happen - results: %v", approveResults) +} + func (t *PorchSuite) TestDeleteDraft(ctx context.Context) { const ( repository = "delete-draft" @@ -893,6 +1133,48 @@ func (t *PorchSuite) TestDeleteDraft(ctx context.Context) { t.MustNotExist(ctx, &draft) } +func (t *PorchSuite) TestConcurrentDeletes(ctx context.Context) { + const ( + repository = "delete-draft" + packageName = "test-delete-draft-concurrent" + revision = "v1" + workspace = "test-workspace" + ) + + // Register the repository and create a draft package + t.RegisterMainGitRepositoryF(ctx, repository) + created := t.CreatePackageDraftF(ctx, repository, packageName, workspace) + + // Check the package exists + var draft porchapi.PackageRevision + t.MustExist(ctx, client.ObjectKey{Namespace: t.Namespace, Name: created.Name}, &draft) + + // Delete the same package with two clients at the same time + deleteFunction := func() any { + return t.Client.Delete(ctx, &porchapi.PackageRevision{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: t.Namespace, + Name: created.Name, + }, + }) + } + results := RunInParallel( + deleteFunction, + deleteFunction) + + expectedResultCount := 2 + actualResultCount := len(results) + assert.Equal(t, expectedResultCount, actualResultCount, "expected %d results but was %d", expectedResultCount, actualResultCount) + + assert.Contains(t, results, nil, "expected one request to succeed, but did not happen - results: %v", results) + + conflictFailurePresent := slices.ContainsFunc(results, func(eachResult any) bool { + return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress") + }) + assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen - results: %v", results) + t.MustNotExist(ctx, &draft) +} + func (t *PorchSuite) TestDeleteProposed(ctx context.Context) { const ( repository = "delete-proposed" @@ -980,6 +1262,44 @@ func (t *PorchSuite) TestDeleteFinal(ctx context.Context) { t.MustNotExist(ctx, &pkg) } +func (t *PorchSuite) TestConcurrentProposeDeletes(ctx context.Context) { + const ( + repository = "delete-final" + packageName = "test-delete-final-concurrent" + workspace = "workspace" + ) + + // Register the repository and create a draft package + t.RegisterMainGitRepositoryF(ctx, repository) + created := t.CreatePackageDraftF(ctx, repository, packageName, workspace) + // Check the package exists + var pkg porchapi.PackageRevision + t.MustExist(ctx, client.ObjectKey{Namespace: t.Namespace, Name: created.Name}, &pkg) + + // Propose and approve the package revision to be finalized + pkg.Spec.Lifecycle = porchapi.PackageRevisionLifecycleProposed + t.UpdateF(ctx, &pkg) + pkg.Spec.Lifecycle = porchapi.PackageRevisionLifecyclePublished + t.UpdateApprovalF(ctx, &pkg, metav1.UpdateOptions{}) + + t.MustExist(ctx, client.ObjectKey{Namespace: t.Namespace, Name: created.Name}, &pkg) + + // Propose deletion with two clients at once + pkg.Spec.Lifecycle = porchapi.PackageRevisionLifecycleDeletionProposed + proposeDeleteFunction := func() any { + _, err := t.Clientset.PorchV1alpha1().PackageRevisions(pkg.Namespace).UpdateApproval(ctx, pkg.Name, &pkg, metav1.UpdateOptions{}) + return err + } + proposeDeleteResults := RunInParallel(proposeDeleteFunction, proposeDeleteFunction) + + assert.Contains(t, proposeDeleteResults, nil, "expected one 'propose-delete' request to succeed, but did not happen - results: %v", proposeDeleteResults) + + conflictFailurePresent := slices.ContainsFunc(proposeDeleteResults, func(eachResult any) bool { + return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress") + }) + assert.True(t, conflictFailurePresent, "expected one 'propose-delete' request to fail with a conflict, but did not happen") +} + func (t *PorchSuite) TestProposeDeleteAndUndo(ctx context.Context) { const ( repository = "test-propose-delete-and-undo" @@ -1353,7 +1673,63 @@ func (t *PorchSuite) TestPackageUpdate(ctx context.Context) { if _, found := revisionResources.Spec.Resources["resourcequota.yaml"]; !found { t.Errorf("Updated package should contain 'resourcequota.yaml` file") } +} + +func (t *PorchSuite) TestConcurrentPackageUpdates(ctx context.Context) { + const ( + gitRepository = "package-update" + packageName = "testns-concurrent" + workspace = "test-workspace" + ) + t.RegisterGitRepositoryF(ctx, testBlueprintsRepo, "test-blueprints", "") + + var list porchapi.PackageRevisionList + t.ListE(ctx, &list, client.InNamespace(t.Namespace)) + + basensV1 := MustFindPackageRevision(t.T, &list, repository.PackageRevisionKey{Repository: "test-blueprints", Package: "basens", Revision: "v1"}) + basensV2 := MustFindPackageRevision(t.T, &list, repository.PackageRevisionKey{Repository: "test-blueprints", Package: "basens", Revision: "v2"}) + + // Register the repository as 'downstream' + t.RegisterMainGitRepositoryF(ctx, gitRepository) + + // Create PackageRevision from upstream repo + pr := t.CreatePackageSkeleton(gitRepository, packageName, workspace) + pr.Spec.Tasks = []porchapi.Task{ + { + Type: porchapi.TaskTypeClone, + Clone: &porchapi.PackageCloneTaskSpec{ + Upstream: porchapi.UpstreamPackage{ + UpstreamRef: &porchapi.PackageRevisionRef{ + Name: basensV1.Name, + }, + }, + }, + }, + } + t.CreateF(ctx, pr) + + upstream := pr.Spec.Tasks[0].Clone.Upstream.DeepCopy() + upstream.UpstreamRef.Name = basensV2.Name + pr.Spec.Tasks = append(pr.Spec.Tasks, porchapi.Task{ + Type: porchapi.TaskTypeUpdate, + Update: &porchapi.PackageUpdateTaskSpec{ + Upstream: *upstream, + }, + }) + + // Two clients at the same time try to update the downstream package + updateFunction := func() any { + return t.Client.Update(ctx, pr, &client.UpdateOptions{}) + } + results := RunInParallel(updateFunction, updateFunction) + + assert.Contains(t, results, nil, "expected one request to succeed, but did not happen - results: %v", results) + + conflictFailurePresent := slices.ContainsFunc(results, func(eachResult any) bool { + return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress") + }) + assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen - results: %v", results) } func (t *PorchSuite) TestRegisterRepository(ctx context.Context) { diff --git a/test/e2e/suite.go b/test/e2e/suite.go index 2fcadc9f..db6c8ffb 100644 --- a/test/e2e/suite.go +++ b/test/e2e/suite.go @@ -127,6 +127,27 @@ func RunSuite(suite interface{}, t *testing.T) { }) } +func RunInParallel(functions ...func() any) []any { + var group sync.WaitGroup + var results []any + for _, eachFunction := range functions { + group.Add(1) + go func() { + defer group.Done() + if reflect.TypeOf(eachFunction).NumOut() == 0 { + results = append(results, nil) + eachFunction() + } else { + eachResult := eachFunction() + + results = append(results, eachResult) + } + }() + } + group.Wait() + return results +} + func (t *TestSuite) SetT(tt *testing.T) { t.T = tt } diff --git a/test/e2e/suite_utils.go b/test/e2e/suite_utils.go index 801325a0..def36972 100644 --- a/test/e2e/suite_utils.go +++ b/test/e2e/suite_utils.go @@ -213,7 +213,7 @@ func (t *TestSuite) registerGitRepositoryFromConfigF(ctx context.Context, name s t.Cleanup(func() { t.DeleteE(ctx, repository) t.WaitUntilRepositoryDeleted(ctx, name, t.Namespace) - t.WaitUntilAllPackagesDeleted(ctx, name) + t.WaitUntilAllPackagesDeleted(ctx, name, t.Namespace) }) // Make sure the repository is ready before we test to (hopefully) @@ -249,9 +249,21 @@ func InNamespace(ns string) RepositoryOption { } // Creates an empty package draft by initializing an empty package -func (t *TestSuite) CreatePackageDraftF(ctx context.Context, repository, name, workspace string) *porchapi.PackageRevision { +func (t *TestSuite) CreatePackageDraftF(ctx context.Context, repository, packageName, workspace string) *porchapi.PackageRevision { t.Helper() - pr := &porchapi.PackageRevision{ + pr := t.CreatePackageSkeleton(repository, packageName, workspace) + pr.Spec.Tasks = []porchapi.Task{ + { + Type: porchapi.TaskTypeInit, + Init: &porchapi.PackageInitTaskSpec{}, + }, + } + t.CreateF(ctx, pr) + return pr +} + +func (t *TestSuite) CreatePackageSkeleton(repoName, packageName, workspace string) *porchapi.PackageRevision { + return &porchapi.PackageRevision{ TypeMeta: metav1.TypeMeta{ Kind: "PackageRevision", APIVersion: porchapi.SchemeGroupVersion.String(), @@ -260,19 +272,13 @@ func (t *TestSuite) CreatePackageDraftF(ctx context.Context, repository, name, w Namespace: t.Namespace, }, Spec: porchapi.PackageRevisionSpec{ - PackageName: name, + PackageName: packageName, WorkspaceName: porchapi.WorkspaceName(workspace), - RepositoryName: repository, - Tasks: []porchapi.Task{ - { - Type: porchapi.TaskTypeInit, - Init: &porchapi.PackageInitTaskSpec{}, - }, - }, + RepositoryName: repoName, + // empty tasks list - set them as needed in the particular usage + Tasks: []porchapi.Task{}, }, } - t.CreateF(ctx, pr) - return pr } func (t *TestSuite) MustExist(ctx context.Context, key client.ObjectKey, obj client.Object) { @@ -365,7 +371,7 @@ func (t *TestSuite) WaitUntilRepositoryDeleted(ctx context.Context, name, namesp } } -func (t *TestSuite) WaitUntilAllPackagesDeleted(ctx context.Context, repoName string) { +func (t *TestSuite) WaitUntilAllPackagesDeleted(ctx context.Context, repoName string, namespace string) { t.Helper() err := wait.PollUntilContextTimeout(ctx, time.Second, 60*time.Second, true, func(ctx context.Context) (done bool, err error) { t.Helper() @@ -375,7 +381,7 @@ func (t *TestSuite) WaitUntilAllPackagesDeleted(ctx context.Context, repoName st return false, nil } for _, pkgRev := range pkgRevList.Items { - if strings.HasPrefix(fmt.Sprintf("%s-", pkgRev.Name), repoName) { + if pkgRev.Namespace == namespace && strings.HasPrefix(fmt.Sprintf("%s-", pkgRev.Name), repoName) { t.Logf("Found package %s from repo %s", pkgRev.Name, repoName) return false, nil } @@ -387,8 +393,8 @@ func (t *TestSuite) WaitUntilAllPackagesDeleted(ctx context.Context, repoName st return false, nil } for _, internalPkgRev := range internalPkgRevList.Items { - if strings.HasPrefix(fmt.Sprintf("%s-", internalPkgRev.Name), repoName) { - t.Logf("Found internalPkg %s from repo %s", internalPkgRev.Name, repoName) + if internalPkgRev.Namespace == namespace && strings.HasPrefix(fmt.Sprintf("%s-", internalPkgRev.Name), repoName) { + t.Logf("Found internalPkg %s/%s from repo %s", internalPkgRev.Namespace, internalPkgRev.Name, repoName) return false, nil } }