From 502304f016500cdaae500df3ddb42d77cd7e8884 Mon Sep 17 00:00:00 2001 From: James McDermott Date: Thu, 26 Sep 2024 11:33:44 +0100 Subject: [PATCH 01/11] Issue #657 - session conflict handling - added mutex-based handling to PackageRevision.Create operation - if another request is already creating a package revision with the same details, fails with HTTP 409 - set cachedRepository to poll once more for PackageRevs to make sure of fresh state when trying to delete them from Kubernetes - otherwise they can't be updated to remove the finalizers and unstick the deletion https://github.com/nephio-project/nephio/issues/657 --- .vscode/launch.json | 22 +++- pkg/cache/repository.go | 4 +- pkg/registry/porch/packagerevision.go | 48 +++++++-- test/e2e/e2e_test.go | 141 +++++++++++++++++++++++++- test/e2e/suite.go | 21 ++++ test/e2e/suite_utils.go | 32 +++++- 6 files changed, 253 insertions(+), 15 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index a72ff53f..9979c12c 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -37,7 +37,18 @@ } }, { - "name": "Launch test function", + "name": "Run Porchctl command", + "type": "go", + "request": "launch", + "mode": "auto", + "program": "${workspaceFolder}/cmd/porchctl/main.go", + "args": [ + "-n", "porch", "rpkg", "approve", "blueprints-835222fccff6e9e042e3b01b8c554f394d6e55d1" + ], + "cwd": "${workspaceFolder}" + }, + { + "name": "Launch E2E tests", "type": "go", "request": "launch", "mode": "test", @@ -45,7 +56,7 @@ "args": [ "-test.v", "-test.run", - "TestE2E/PorchSuite/TestGitRepositoryWithReleaseTagsAndDirectory" + "TestE2E/PorchSuite/TestConcurrentInits" ], "env": { "E2E": "1"} }, @@ -63,5 +74,12 @@ "namespace=foo" ] } + ], + "compounds": [ + { + "name": "Launch Server and Controllers", + "configurations": ["Launch Server", "Launch Controllers"], + "stopAll": true + } ] } \ No newline at end of file diff --git a/pkg/cache/repository.go b/pkg/cache/repository.go index 5dad725d..55f39b2c 100644 --- a/pkg/cache/repository.go +++ b/pkg/cache/repository.go @@ -319,6 +319,7 @@ func (r *cachedRepository) DeletePackage(ctx context.Context, old repository.Pac } func (r *cachedRepository) Close() error { + r.pollOnce(context.TODO()) r.cancel() // Make sure that watch events are sent for packagerevisions that are @@ -338,12 +339,13 @@ func (r *cachedRepository) Close() error { // There isn't much use in returning an error here, so we just log it // and create a PackageRevisionMeta with just name and namespace. This // makes sure that the Delete event is sent. - klog.Warningf("Error looking up PackageRev CR for %s: %v", nn.Name, err) + klog.Warningf("Error deleting PackageRev CR for %s: %v", nn.Name, err) pkgRevMeta = meta.PackageRevisionMeta{ Name: nn.Name, Namespace: nn.Namespace, } } + klog.Infof("repo %s: successfully deleted packagerev %s/%s", r.id, nn.Namespace, nn.Name) sent += r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, pr, pkgRevMeta) } klog.Infof("repo %s: sent %d notifications for %d package revisions during close", r.id, sent, len(r.cachedPackageRevisions)) diff --git a/pkg/registry/porch/packagerevision.go b/pkg/registry/porch/packagerevision.go index 51dd98cd..1226875b 100644 --- a/pkg/registry/porch/packagerevision.go +++ b/pkg/registry/porch/packagerevision.go @@ -17,6 +17,7 @@ package porch import ( "context" "fmt" + "sync" api "github.com/nephio-project/porch/api/porch/v1alpha1" "github.com/nephio-project/porch/pkg/engine" @@ -34,6 +35,13 @@ import ( var tracer = otel.Tracer("apiserver") +var mutexMapMutex sync.Mutex +var packageRevisionCreationMutexes = map[string]*sync.Mutex{} + +const ( + ConflictErrorMsg = "another request is already in progress to create %s with details %s" +) + type packageRevisions struct { packageCommon rest.TableConvertor @@ -49,13 +57,11 @@ var _ rest.GracefulDeleter = &packageRevisions{} var _ rest.Watcher = &packageRevisions{} var _ rest.SingularNameProvider = &packageRevisions{} - // GetSingularName implements the SingularNameProvider interface -func (r *packageRevisions) GetSingularName() (string) { +func (r *packageRevisions) GetSingularName() string { return "packagerevision" } - func (r *packageRevisions) New() runtime.Object { return &api.PackageRevision{} } @@ -120,7 +126,7 @@ func (r *packageRevisions) Get(ctx context.Context, name string, options *metav1 } // Create implements the Creater interface. -func (r *packageRevisions) Create(ctx context.Context, runtimeObject runtime.Object, createValidation rest.ValidateObjectFunc, +func (r *packageRevisions) Create(ctx context.Context, runtimeObject runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) { ctx, span := tracer.Start(ctx, "packageRevisions::Create", trace.WithAttributes()) defer span.End() @@ -166,6 +172,35 @@ func (r *packageRevisions) Create(ctx context.Context, runtimeObject runtime.Obj parentPackage = p } + uncreatedPackageKey := fmt.Sprintf("%s-%s-%s-%s", + newApiPkgRev.Namespace, + newApiPkgRev.Spec.RepositoryName, + newApiPkgRev.Spec.PackageName, + newApiPkgRev.Spec.WorkspaceName) + + mutexMapMutex.Lock() + packageMutex, alreadyPresent := packageRevisionCreationMutexes[uncreatedPackageKey] + if !alreadyPresent { + packageMutex = &sync.Mutex{} + packageRevisionCreationMutexes[uncreatedPackageKey] = packageMutex + } + mutexMapMutex.Unlock() + + lockAcquired := packageMutex.TryLock() + if !lockAcquired { + return nil, + apierrors.NewConflict( + api.Resource("packagerevisions"), + fmt.Sprintf("(new creation)"), + fmt.Errorf(ConflictErrorMsg, "package revision", fmt.Sprintf( + "namespace=%s, repository=%s, package=%s,workspace=%s", + newApiPkgRev.Namespace, + newApiPkgRev.Spec.RepositoryName, + newApiPkgRev.Spec.PackageName, + newApiPkgRev.Spec.WorkspaceName))) + } + defer packageMutex.Unlock() + createdRepoPkgRev, err := r.cad.CreatePackageRevision(ctx, repositoryObj, newApiPkgRev, parentPackage) if err != nil { return nil, apierrors.NewInternalError(err) @@ -184,13 +219,12 @@ func (r *packageRevisions) Create(ctx context.Context, runtimeObject runtime.Obj // Update finds a resource in the storage and updates it. Some implementations // may allow updates creates the object - they should set the created boolean // to true. -func (r *packageRevisions) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, +func (r *packageRevisions) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) { ctx, span := tracer.Start(ctx, "packageRevisions::Update", trace.WithAttributes()) defer span.End() - return r.packageCommon.updatePackageRevision(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, - ) + return r.packageCommon.updatePackageRevision(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate) } // Delete implements the GracefulDeleter interface. diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 5233073d..f4c7b0d0 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -21,6 +21,7 @@ import ( "os" "path/filepath" "reflect" + "slices" "strings" "testing" "time" @@ -260,7 +261,7 @@ func (t *PorchSuite) TestInitEmptyPackage(ctx context.Context) { Namespace: t.Namespace, }, Spec: porchapi.PackageRevisionSpec{ - PackageName: "empty-package", + PackageName: packageName, WorkspaceName: workspace, RepositoryName: repository, }, @@ -285,6 +286,52 @@ func (t *PorchSuite) TestInitEmptyPackage(ctx context.Context) { } } +func (t *PorchSuite) TestConcurrentInits(ctx context.Context) { + // Create a new package via init, no task specified + const ( + repository = "git" + packageName = "empty-package-concurrent" + revision = "v1" + workspace = "test-workspace" + description = "empty-package description" + ) + + // Register the repository + t.RegisterMainGitRepositoryF(ctx, repository) + + // Create a new package (via init) + pr := &porchapi.PackageRevision{ + TypeMeta: metav1.TypeMeta{ + Kind: "PackageRevision", + APIVersion: porchapi.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: t.Namespace, + }, + Spec: porchapi.PackageRevisionSpec{ + PackageName: packageName, + WorkspaceName: workspace, + RepositoryName: repository, + }, + } + // Two clients try to create it at the same time + creationFunction := func() any { + return t.Client.Create(ctx, pr) + } + results := RunInParallel(creationFunction, creationFunction) + + expectedResultCount := 2 + actualResultCount := len(results) + assert.Equal(t, expectedResultCount, actualResultCount, "expected %d results but was %d", expectedResultCount, actualResultCount) + + assert.Contains(t, results, nil, "expected one request to succeed, but did not happen - results: %v", results) + + conflictFailurePresent := slices.ContainsFunc(results, func(eachResult any) bool { + return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress") + }) + assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen") +} + func (t *PorchSuite) TestInitTaskPackage(ctx context.Context) { const ( repository = "git" @@ -568,6 +615,98 @@ func (t *PorchSuite) TestEditPackageRevision(ctx context.Context) { assert.Equal(t, 2, len(tasks)) } +func (t *PorchSuite) TestConcurrentEdits(ctx context.Context) { + const ( + repository = "edit-test" + packageName = "simple-package-concurrent" + workspace = "workspace" + workspace2 = "workspace2" + ) + + t.RegisterMainGitRepositoryF(ctx, repository) + + // Create a new package (via init) + pr := &porchapi.PackageRevision{ + TypeMeta: metav1.TypeMeta{ + Kind: "PackageRevision", + APIVersion: porchapi.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: t.Namespace, + }, + Spec: porchapi.PackageRevisionSpec{ + PackageName: packageName, + WorkspaceName: workspace, + RepositoryName: repository, + }, + } + t.CreateF(ctx, pr) + + // Publish and approve the source package to make it a valid source for edit. + pr.Spec.Lifecycle = porchapi.PackageRevisionLifecycleProposed + t.UpdateF(ctx, pr) + pr.Spec.Lifecycle = porchapi.PackageRevisionLifecyclePublished + t.UpdateApprovalF(ctx, pr, metav1.UpdateOptions{}) + + // Create a new revision of the package with a source that is a revision + // of the same package. + editPR := &porchapi.PackageRevision{ + TypeMeta: metav1.TypeMeta{ + Kind: "PackageRevision", + APIVersion: porchapi.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: t.Namespace, + }, + Spec: porchapi.PackageRevisionSpec{ + PackageName: packageName, + WorkspaceName: workspace2, + RepositoryName: repository, + Tasks: []porchapi.Task{ + { + Type: porchapi.TaskTypeEdit, + Edit: &porchapi.PackageEditTaskSpec{ + Source: &porchapi.PackageRevisionRef{ + Name: pr.Name, + }, + }, + }, + }, + }, + } + + // Two clients try to create the new package at the same time + editOperation := func() any { + return t.Client.Create(ctx, editPR) + } + results := RunInParallel( + editOperation, + editOperation) + + expectedResultCount := 2 + actualResultCount := len(results) + assert.Equal(t, expectedResultCount, actualResultCount, "expected %d results but was %d", expectedResultCount, actualResultCount) + + assert.Contains(t, results, nil, "expected one request to succeed, but did not happen - results: %v", results) + + conflictFailurePresent := slices.ContainsFunc(results, func(eachResult any) bool { + return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress") + }) + assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen") + + // Check its task list + var pkgRev porchapi.PackageRevision + t.GetF(ctx, client.ObjectKey{ + Namespace: t.Namespace, + Name: editPR.Name, + }, &pkgRev) + tasks := pkgRev.Spec.Tasks + for _, tsk := range tasks { + t.Logf("Task: %s", tsk.Type) + } + assert.Equal(t, 2, len(tasks)) +} + // Test will initialize an empty package, update its resources, adding a function // to the Kptfile's pipeline, and then check that the package was re-rendered. func (t *PorchSuite) TestUpdateResources(ctx context.Context) { diff --git a/test/e2e/suite.go b/test/e2e/suite.go index 2fcadc9f..db6c8ffb 100644 --- a/test/e2e/suite.go +++ b/test/e2e/suite.go @@ -127,6 +127,27 @@ func RunSuite(suite interface{}, t *testing.T) { }) } +func RunInParallel(functions ...func() any) []any { + var group sync.WaitGroup + var results []any + for _, eachFunction := range functions { + group.Add(1) + go func() { + defer group.Done() + if reflect.TypeOf(eachFunction).NumOut() == 0 { + results = append(results, nil) + eachFunction() + } else { + eachResult := eachFunction() + + results = append(results, eachResult) + } + }() + } + group.Wait() + return results +} + func (t *TestSuite) SetT(tt *testing.T) { t.T = tt } diff --git a/test/e2e/suite_utils.go b/test/e2e/suite_utils.go index 801325a0..afc9e80b 100644 --- a/test/e2e/suite_utils.go +++ b/test/e2e/suite_utils.go @@ -213,7 +213,7 @@ func (t *TestSuite) registerGitRepositoryFromConfigF(ctx context.Context, name s t.Cleanup(func() { t.DeleteE(ctx, repository) t.WaitUntilRepositoryDeleted(ctx, name, t.Namespace) - t.WaitUntilAllPackagesDeleted(ctx, name) + t.WaitUntilAllPackagesDeleted(ctx, name, t.Namespace) }) // Make sure the repository is ready before we test to (hopefully) @@ -365,7 +365,31 @@ func (t *TestSuite) WaitUntilRepositoryDeleted(ctx context.Context, name, namesp } } +<<<<<<< HEAD:test/e2e/suite_utils.go func (t *TestSuite) WaitUntilAllPackagesDeleted(ctx context.Context, repoName string) { +======= +<<<<<<< HEAD:test/e2e/e2e_utils_test.go +func (t *TestSuite) waitUntilAllPackagesDeleted(ctx context.Context, repoName string) { +======= +<<<<<<< HEAD:test/e2e/suite_utils.go +func (t *TestSuite) WaitUntilAllPackagesDeleted(ctx context.Context, repoName string) { +======= +<<<<<<< HEAD:test/e2e/e2e_utils_test.go +func (t *TestSuite) waitUntilAllPackagesDeleted(ctx context.Context, repoName string) { +======= +<<<<<<< HEAD:test/e2e/suite_utils.go +func (t *TestSuite) WaitUntilAllPackagesDeleted(ctx context.Context, repoName string) { +======= +<<<<<<< HEAD:test/e2e/e2e_utils_test.go +func (t *TestSuite) waitUntilAllPackagesDeleted(ctx context.Context, repoName string) { +======= +func (t *TestSuite) WaitUntilAllPackagesDeleted(ctx context.Context, repoName string, namespace string) { +>>>>>>> 2ed49bf (Issue #657 - session conflict handling):test/e2e/suite_utils.go +>>>>>>> 02825a0 (Issue #657 - session conflict handling):test/e2e/e2e_utils_test.go +>>>>>>> 9835dc1 (Issue #657 - session conflict handling):test/e2e/suite_utils.go +>>>>>>> 021ed5a (Issue #657 - session conflict handling):test/e2e/e2e_utils_test.go +>>>>>>> aadf3eb (Issue #657 - session conflict handling):test/e2e/suite_utils.go +>>>>>>> 6983ea7 (Issue #657 - session conflict handling):test/e2e/e2e_utils_test.go t.Helper() err := wait.PollUntilContextTimeout(ctx, time.Second, 60*time.Second, true, func(ctx context.Context) (done bool, err error) { t.Helper() @@ -375,7 +399,7 @@ func (t *TestSuite) WaitUntilAllPackagesDeleted(ctx context.Context, repoName st return false, nil } for _, pkgRev := range pkgRevList.Items { - if strings.HasPrefix(fmt.Sprintf("%s-", pkgRev.Name), repoName) { + if pkgRev.Namespace == namespace && strings.HasPrefix(fmt.Sprintf("%s-", pkgRev.Name), repoName) { t.Logf("Found package %s from repo %s", pkgRev.Name, repoName) return false, nil } @@ -387,8 +411,8 @@ func (t *TestSuite) WaitUntilAllPackagesDeleted(ctx context.Context, repoName st return false, nil } for _, internalPkgRev := range internalPkgRevList.Items { - if strings.HasPrefix(fmt.Sprintf("%s-", internalPkgRev.Name), repoName) { - t.Logf("Found internalPkg %s from repo %s", internalPkgRev.Name, repoName) + if internalPkgRev.Namespace == namespace && strings.HasPrefix(fmt.Sprintf("%s-", internalPkgRev.Name), repoName) { + t.Logf("Found internalPkg %s/%s from repo %s", internalPkgRev.Namespace, internalPkgRev.Name, repoName) return false, nil } } From 34d393faf31cdf4a74f1c482e446b02c19522408 Mon Sep 17 00:00:00 2001 From: James McDermott Date: Fri, 27 Sep 2024 17:30:35 +0100 Subject: [PATCH 02/11] Issue #657 - session conflict handling - added mutex-based handling to PackageRevision.Delete operation - added tests for delete, edit, and clone PackageRevision https://github.com/nephio-project/nephio/issues/657 --- .vscode/launch.json | 2 +- pkg/registry/porch/packagerevision.go | 32 ++++- test/e2e/e2e_test.go | 172 +++++++++++++++++--------- test/e2e/suite_utils.go | 44 +++++-- 4 files changed, 174 insertions(+), 76 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index 9979c12c..c26fcad6 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -56,7 +56,7 @@ "args": [ "-test.v", "-test.run", - "TestE2E/PorchSuite/TestConcurrentInits" + "TestE2E/PorchSuite/TestConcurrentDeletes" ], "env": { "E2E": "1"} }, diff --git a/pkg/registry/porch/packagerevision.go b/pkg/registry/porch/packagerevision.go index 1226875b..a1a47ea0 100644 --- a/pkg/registry/porch/packagerevision.go +++ b/pkg/registry/porch/packagerevision.go @@ -36,10 +36,11 @@ import ( var tracer = otel.Tracer("apiserver") var mutexMapMutex sync.Mutex -var packageRevisionCreationMutexes = map[string]*sync.Mutex{} +var pkgRevOperationMutexes = map[string]*sync.Mutex{} const ( - ConflictErrorMsg = "another request is already in progress to create %s with details %s" + CreateConflictErrorMsg = "another request is already in progress to create %s with details %s" + DeleteConflictErrorMsg = "another request is already in progress to delete %s \"%s\"" ) type packageRevisions struct { @@ -172,17 +173,17 @@ func (r *packageRevisions) Create(ctx context.Context, runtimeObject runtime.Obj parentPackage = p } - uncreatedPackageKey := fmt.Sprintf("%s-%s-%s-%s", + packageMutexKey := fmt.Sprintf("%s-%s-%s-%s", newApiPkgRev.Namespace, newApiPkgRev.Spec.RepositoryName, newApiPkgRev.Spec.PackageName, newApiPkgRev.Spec.WorkspaceName) mutexMapMutex.Lock() - packageMutex, alreadyPresent := packageRevisionCreationMutexes[uncreatedPackageKey] + packageMutex, alreadyPresent := pkgRevOperationMutexes[packageMutexKey] if !alreadyPresent { packageMutex = &sync.Mutex{} - packageRevisionCreationMutexes[uncreatedPackageKey] = packageMutex + pkgRevOperationMutexes[packageMutexKey] = packageMutex } mutexMapMutex.Unlock() @@ -192,7 +193,7 @@ func (r *packageRevisions) Create(ctx context.Context, runtimeObject runtime.Obj apierrors.NewConflict( api.Resource("packagerevisions"), fmt.Sprintf("(new creation)"), - fmt.Errorf(ConflictErrorMsg, "package revision", fmt.Sprintf( + fmt.Errorf(CreateConflictErrorMsg, "package revision", fmt.Sprintf( "namespace=%s, repository=%s, package=%s,workspace=%s", newApiPkgRev.Namespace, newApiPkgRev.Spec.RepositoryName, @@ -262,6 +263,25 @@ func (r *packageRevisions) Delete(ctx context.Context, name string, deleteValida return nil, false, err } + packageMutexKey := fmt.Sprintf("%s/%s", ns, name) + mutexMapMutex.Lock() + packageMutex, alreadyPresent := pkgRevOperationMutexes[packageMutexKey] + if !alreadyPresent { + packageMutex = &sync.Mutex{} + pkgRevOperationMutexes[packageMutexKey] = packageMutex + } + mutexMapMutex.Unlock() + + lockAcquired := packageMutex.TryLock() + if !lockAcquired { + return nil, false, + apierrors.NewConflict( + api.Resource("packagerevisions"), + name, + fmt.Errorf(DeleteConflictErrorMsg, "package revision", packageMutexKey)) + } + defer packageMutex.Unlock() + if err := r.cad.DeletePackageRevision(ctx, repositoryObj, repoPkgRev); err != nil { return nil, false, apierrors.NewInternalError(err) } diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index f4c7b0d0..f5a92252 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -152,13 +152,13 @@ func (t *PorchSuite) TestCloneFromUpstream(ctx context.Context) { var list porchapi.PackageRevisionList t.ListE(ctx, &list, client.InNamespace(t.Namespace)) - basens := MustFindPackageRevision(t.T, &list, repository.PackageRevisionKey{Repository: "test-blueprints", Package: "basens", Revision: "v1"}) + upstreamPr := MustFindPackageRevision(t.T, &list, repository.PackageRevisionKey{Repository: "test-blueprints", Package: "basens", Revision: "v1"}) // Register the repository as 'downstream' t.RegisterMainGitRepositoryF(ctx, "downstream") // Create PackageRevision from upstream repo - pr := &porchapi.PackageRevision{ + clonedPr := &porchapi.PackageRevision{ TypeMeta: metav1.TypeMeta{ Kind: "PackageRevision", APIVersion: porchapi.SchemeGroupVersion.String(), @@ -176,7 +176,7 @@ func (t *PorchSuite) TestCloneFromUpstream(ctx context.Context) { Clone: &porchapi.PackageCloneTaskSpec{ Upstream: porchapi.UpstreamPackage{ UpstreamRef: &porchapi.PackageRevisionRef{ - Name: basens.Name, + Name: upstreamPr.Name, }, }, }, @@ -185,13 +185,13 @@ func (t *PorchSuite) TestCloneFromUpstream(ctx context.Context) { }, } - t.CreateF(ctx, pr) + t.CreateF(ctx, clonedPr) // Get istions resources var istions porchapi.PackageRevisionResources t.GetF(ctx, client.ObjectKey{ Namespace: t.Namespace, - Name: pr.Name, + Name: clonedPr.Name, }, &istions) kptfile := t.ParseKptfileF(&istions) @@ -238,6 +238,62 @@ func (t *PorchSuite) TestCloneFromUpstream(ctx context.Context) { } } +func (t *PorchSuite) TestConcurrentClones(ctx context.Context) { + const ( + upstreamRepository = "upstream" + upstreamPackage = "basens" + downstreamRepository = "downstream" + downstreamPackage = "istions-concurrent" + workspace = "test-workspace" + ) + + // Register Upstream and Downstream Repositories + t.RegisterGitRepositoryF(ctx, testBlueprintsRepo, upstreamRepository, "") + t.RegisterMainGitRepositoryF(ctx, downstreamRepository) + + var list porchapi.PackageRevisionList + t.ListE(ctx, &list, client.InNamespace(t.Namespace)) + upstreamPr := MustFindPackageRevision(t.T, + &list, + repository.PackageRevisionKey{ + Repository: upstreamRepository, + Package: upstreamPackage, + Revision: "v1"}) + + // Create PackageRevision from upstream repo + clonedPr := t.CreatePackageSkeleton(downstreamRepository, downstreamPackage, workspace) + clonedPr.Spec.Tasks = []porchapi.Task{ + { + Type: porchapi.TaskTypeClone, + Clone: &porchapi.PackageCloneTaskSpec{ + Upstream: porchapi.UpstreamPackage{ + UpstreamRef: &porchapi.PackageRevisionRef{ + Name: upstreamPr.Name, + }, + }, + }, + }, + } + + // Two clients at the same time try to run the Create operation for the clone + cloneFunction := func() any { + return t.Client.Create(ctx, clonedPr) + } + results := RunInParallel(cloneFunction, cloneFunction) + + expectedResultCount := 2 + actualResultCount := len(results) + assert.Equal(t, expectedResultCount, actualResultCount, "expected %d results but was %d", expectedResultCount, actualResultCount) + + assert.Contains(t, results, nil, "expected one request to succeed, but did not happen - results: %v", results) + + conflictFailurePresent := slices.ContainsFunc(results, func(eachResult any) bool { + return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress") + }) + assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen") + +} + func (t *PorchSuite) TestInitEmptyPackage(ctx context.Context) { // Create a new package via init, no task specified const ( @@ -299,27 +355,14 @@ func (t *PorchSuite) TestConcurrentInits(ctx context.Context) { // Register the repository t.RegisterMainGitRepositoryF(ctx, repository) - // Create a new package (via init) - pr := &porchapi.PackageRevision{ - TypeMeta: metav1.TypeMeta{ - Kind: "PackageRevision", - APIVersion: porchapi.SchemeGroupVersion.String(), - }, - ObjectMeta: metav1.ObjectMeta{ - Namespace: t.Namespace, - }, - Spec: porchapi.PackageRevisionSpec{ - PackageName: packageName, - WorkspaceName: workspace, - RepositoryName: repository, - }, - } - // Two clients try to create it at the same time + // Two clients try to create the same new draft package + pr := t.CreatePackageSkeleton(repository, packageName, workspace) creationFunction := func() any { return t.Client.Create(ctx, pr) } results := RunInParallel(creationFunction, creationFunction) + // one client succeeds; one receives a conflict error expectedResultCount := 2 actualResultCount := len(results) assert.Equal(t, expectedResultCount, actualResultCount, "expected %d results but was %d", expectedResultCount, actualResultCount) @@ -626,20 +669,7 @@ func (t *PorchSuite) TestConcurrentEdits(ctx context.Context) { t.RegisterMainGitRepositoryF(ctx, repository) // Create a new package (via init) - pr := &porchapi.PackageRevision{ - TypeMeta: metav1.TypeMeta{ - Kind: "PackageRevision", - APIVersion: porchapi.SchemeGroupVersion.String(), - }, - ObjectMeta: metav1.ObjectMeta{ - Namespace: t.Namespace, - }, - Spec: porchapi.PackageRevisionSpec{ - PackageName: packageName, - WorkspaceName: workspace, - RepositoryName: repository, - }, - } + pr := t.CreatePackageSkeleton(repository, packageName, workspace) t.CreateF(ctx, pr) // Publish and approve the source package to make it a valid source for edit. @@ -650,26 +680,13 @@ func (t *PorchSuite) TestConcurrentEdits(ctx context.Context) { // Create a new revision of the package with a source that is a revision // of the same package. - editPR := &porchapi.PackageRevision{ - TypeMeta: metav1.TypeMeta{ - Kind: "PackageRevision", - APIVersion: porchapi.SchemeGroupVersion.String(), - }, - ObjectMeta: metav1.ObjectMeta{ - Namespace: t.Namespace, - }, - Spec: porchapi.PackageRevisionSpec{ - PackageName: packageName, - WorkspaceName: workspace2, - RepositoryName: repository, - Tasks: []porchapi.Task{ - { - Type: porchapi.TaskTypeEdit, - Edit: &porchapi.PackageEditTaskSpec{ - Source: &porchapi.PackageRevisionRef{ - Name: pr.Name, - }, - }, + editPR := t.CreatePackageSkeleton(repository, packageName, workspace2) + editPR.Spec.Tasks = []porchapi.Task{ + { + Type: porchapi.TaskTypeEdit, + Edit: &porchapi.PackageEditTaskSpec{ + Source: &porchapi.PackageRevisionRef{ + Name: pr.Name, }, }, }, @@ -701,9 +718,6 @@ func (t *PorchSuite) TestConcurrentEdits(ctx context.Context) { Name: editPR.Name, }, &pkgRev) tasks := pkgRev.Spec.Tasks - for _, tsk := range tasks { - t.Logf("Task: %s", tsk.Type) - } assert.Equal(t, 2, len(tasks)) } @@ -1032,6 +1046,48 @@ func (t *PorchSuite) TestDeleteDraft(ctx context.Context) { t.MustNotExist(ctx, &draft) } +func (t *PorchSuite) TestConcurrentDeletes(ctx context.Context) { + const ( + repository = "delete-draft" + packageName = "test-delete-draft-concurrent" + revision = "v1" + workspace = "test-workspace" + ) + + // Register the repository and create a draft package + t.RegisterMainGitRepositoryF(ctx, repository) + created := t.CreatePackageDraftF(ctx, repository, packageName, workspace) + + // Check the package exists + var draft porchapi.PackageRevision + t.MustExist(ctx, client.ObjectKey{Namespace: t.Namespace, Name: created.Name}, &draft) + + // Delete the same package with two clients at the same time + deleteOperation := func() any { + return t.Client.Delete(ctx, &porchapi.PackageRevision{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: t.Namespace, + Name: created.Name, + }, + }) + } + results := RunInParallel( + deleteOperation, + deleteOperation) + + expectedResultCount := 2 + actualResultCount := len(results) + assert.Equal(t, expectedResultCount, actualResultCount, "expected %d results but was %d", expectedResultCount, actualResultCount) + + assert.Contains(t, results, nil, "expected one request to succeed, but did not happen - results: %v", results) + + conflictFailurePresent := slices.ContainsFunc(results, func(eachResult any) bool { + return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress") + }) + assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen") + t.MustNotExist(ctx, &draft) +} + func (t *PorchSuite) TestDeleteProposed(ctx context.Context) { const ( repository = "delete-proposed" diff --git a/test/e2e/suite_utils.go b/test/e2e/suite_utils.go index afc9e80b..d22d526c 100644 --- a/test/e2e/suite_utils.go +++ b/test/e2e/suite_utils.go @@ -249,9 +249,21 @@ func InNamespace(ns string) RepositoryOption { } // Creates an empty package draft by initializing an empty package -func (t *TestSuite) CreatePackageDraftF(ctx context.Context, repository, name, workspace string) *porchapi.PackageRevision { +func (t *TestSuite) CreatePackageDraftF(ctx context.Context, repository, packageName, workspace string) *porchapi.PackageRevision { t.Helper() - pr := &porchapi.PackageRevision{ + pr := t.CreatePackageSkeleton(repository, packageName, workspace) + pr.Spec.Tasks = []porchapi.Task{ + { + Type: porchapi.TaskTypeInit, + Init: &porchapi.PackageInitTaskSpec{}, + }, + } + t.CreateF(ctx, pr) + return pr +} + +func (t *TestSuite) CreatePackageSkeleton(repository, packageName, workspace string) *porchapi.PackageRevision { + return &porchapi.PackageRevision{ TypeMeta: metav1.TypeMeta{ Kind: "PackageRevision", APIVersion: porchapi.SchemeGroupVersion.String(), @@ -260,19 +272,13 @@ func (t *TestSuite) CreatePackageDraftF(ctx context.Context, repository, name, w Namespace: t.Namespace, }, Spec: porchapi.PackageRevisionSpec{ - PackageName: name, + PackageName: packageName, WorkspaceName: porchapi.WorkspaceName(workspace), RepositoryName: repository, - Tasks: []porchapi.Task{ - { - Type: porchapi.TaskTypeInit, - Init: &porchapi.PackageInitTaskSpec{}, - }, - }, + // empty tasks list - set them as needed in the particular usage + Tasks: []porchapi.Task{}, }, } - t.CreateF(ctx, pr) - return pr } func (t *TestSuite) MustExist(ctx context.Context, key client.ObjectKey, obj client.Object) { @@ -378,6 +384,7 @@ func (t *TestSuite) WaitUntilAllPackagesDeleted(ctx context.Context, repoName st func (t *TestSuite) waitUntilAllPackagesDeleted(ctx context.Context, repoName string) { ======= <<<<<<< HEAD:test/e2e/suite_utils.go +<<<<<<< HEAD:test/e2e/suite_utils.go func (t *TestSuite) WaitUntilAllPackagesDeleted(ctx context.Context, repoName string) { ======= <<<<<<< HEAD:test/e2e/e2e_utils_test.go @@ -386,10 +393,25 @@ func (t *TestSuite) waitUntilAllPackagesDeleted(ctx context.Context, repoName st func (t *TestSuite) WaitUntilAllPackagesDeleted(ctx context.Context, repoName string, namespace string) { >>>>>>> 2ed49bf (Issue #657 - session conflict handling):test/e2e/suite_utils.go >>>>>>> 02825a0 (Issue #657 - session conflict handling):test/e2e/e2e_utils_test.go +<<<<<<< HEAD:test/e2e/e2e_utils_test.go >>>>>>> 9835dc1 (Issue #657 - session conflict handling):test/e2e/suite_utils.go +<<<<<<< HEAD:test/e2e/suite_utils.go >>>>>>> 021ed5a (Issue #657 - session conflict handling):test/e2e/e2e_utils_test.go +<<<<<<< HEAD:test/e2e/e2e_utils_test.go >>>>>>> aadf3eb (Issue #657 - session conflict handling):test/e2e/suite_utils.go +<<<<<<< HEAD:test/e2e/suite_utils.go >>>>>>> 6983ea7 (Issue #657 - session conflict handling):test/e2e/e2e_utils_test.go +======= +======= +======= +======= +======= +func (t *TestSuite) WaitUntilAllPackagesDeleted(ctx context.Context, repoName string, namespace string) { +>>>>>>> 90101d4 (Issue #657 - session conflict handling):test/e2e/e2e_utils_test.go +>>>>>>> 5286ef6 (Issue #657 - session conflict handling):test/e2e/suite_utils.go +>>>>>>> 7ed9d28 (Issue #657 - session conflict handling):test/e2e/e2e_utils_test.go +>>>>>>> a9e3581 (Issue #657 - session conflict handling):test/e2e/suite_utils.go +>>>>>>> 2bf829c (Issue #657 - session conflict handling):test/e2e/e2e_utils_test.go t.Helper() err := wait.PollUntilContextTimeout(ctx, time.Second, 60*time.Second, true, func(ctx context.Context) (done bool, err error) { t.Helper() From 233c8e7ed84542a333e24737d0ae5e47a8114b24 Mon Sep 17 00:00:00 2001 From: James McDermott Date: Mon, 30 Sep 2024 18:01:34 +0100 Subject: [PATCH 03/11] Issue #657 - session conflict handling - added mutex-based handling to PackageRevision.Update operation - the meat of the operation is in packagecommon.go, so it covers some other operations as well: - propose - packageRevisionsApproval.Update (approve, propose-delete) - added tests for operations mentioned above https://github.com/nephio-project/nephio/issues/657 --- pkg/registry/porch/packagecommon.go | 13 ++ pkg/registry/porch/packagerevision.go | 33 +++-- test/e2e/e2e_test.go | 173 ++++++++++++++++++++++++-- 3 files changed, 189 insertions(+), 30 deletions(-) diff --git a/pkg/registry/porch/packagecommon.go b/pkg/registry/porch/packagecommon.go index 32b26b95..25875ff5 100644 --- a/pkg/registry/porch/packagecommon.go +++ b/pkg/registry/porch/packagecommon.go @@ -210,6 +210,19 @@ func (r *packageCommon) updatePackageRevision(ctx context.Context, name string, return nil, false, apierrors.NewBadRequest("namespace must be specified") } + packageMutexKey := fmt.Sprintf("%s/%s", ns, name) + packageMutex := getMutexForPackage(packageMutexKey) + + lockAcquired := packageMutex.TryLock() + if !lockAcquired { + return nil, false, + apierrors.NewConflict( + api.Resource("packagerevisions"), + name, + fmt.Errorf(GenericConflictErrorMsg, "package revision", packageMutexKey)) + } + defer packageMutex.Unlock() + // isCreate tracks whether this is an update that creates an object (this happens in server-side apply) isCreate := false diff --git a/pkg/registry/porch/packagerevision.go b/pkg/registry/porch/packagerevision.go index a1a47ea0..ab103b4d 100644 --- a/pkg/registry/porch/packagerevision.go +++ b/pkg/registry/porch/packagerevision.go @@ -39,8 +39,8 @@ var mutexMapMutex sync.Mutex var pkgRevOperationMutexes = map[string]*sync.Mutex{} const ( - CreateConflictErrorMsg = "another request is already in progress to create %s with details %s" - DeleteConflictErrorMsg = "another request is already in progress to delete %s \"%s\"" + CreateConflictErrorMsg = "another request is already in progress to create %s with details %s" + GenericConflictErrorMsg = "another request is already in progress on %s \"%s\"" ) type packageRevisions struct { @@ -179,13 +179,7 @@ func (r *packageRevisions) Create(ctx context.Context, runtimeObject runtime.Obj newApiPkgRev.Spec.PackageName, newApiPkgRev.Spec.WorkspaceName) - mutexMapMutex.Lock() - packageMutex, alreadyPresent := pkgRevOperationMutexes[packageMutexKey] - if !alreadyPresent { - packageMutex = &sync.Mutex{} - pkgRevOperationMutexes[packageMutexKey] = packageMutex - } - mutexMapMutex.Unlock() + packageMutex := getMutexForPackage(packageMutexKey) lockAcquired := packageMutex.TryLock() if !lockAcquired { @@ -264,13 +258,7 @@ func (r *packageRevisions) Delete(ctx context.Context, name string, deleteValida } packageMutexKey := fmt.Sprintf("%s/%s", ns, name) - mutexMapMutex.Lock() - packageMutex, alreadyPresent := pkgRevOperationMutexes[packageMutexKey] - if !alreadyPresent { - packageMutex = &sync.Mutex{} - pkgRevOperationMutexes[packageMutexKey] = packageMutex - } - mutexMapMutex.Unlock() + packageMutex := getMutexForPackage(packageMutexKey) lockAcquired := packageMutex.TryLock() if !lockAcquired { @@ -278,7 +266,7 @@ func (r *packageRevisions) Delete(ctx context.Context, name string, deleteValida apierrors.NewConflict( api.Resource("packagerevisions"), name, - fmt.Errorf(DeleteConflictErrorMsg, "package revision", packageMutexKey)) + fmt.Errorf(GenericConflictErrorMsg, "package revision", packageMutexKey)) } defer packageMutex.Unlock() @@ -289,3 +277,14 @@ func (r *packageRevisions) Delete(ctx context.Context, name string, deleteValida // TODO: Should we do an async delete? return apiPkgRev, true, nil } + +func getMutexForPackage(packageMutexKey string) *sync.Mutex { + mutexMapMutex.Lock() + defer mutexMapMutex.Unlock() + packageMutex, alreadyPresent := pkgRevOperationMutexes[packageMutexKey] + if !alreadyPresent { + packageMutex = &sync.Mutex{} + pkgRevOperationMutexes[packageMutexKey] = packageMutex + } + return packageMutex +} diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index f5a92252..6e06d626 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -281,17 +281,12 @@ func (t *PorchSuite) TestConcurrentClones(ctx context.Context) { } results := RunInParallel(cloneFunction, cloneFunction) - expectedResultCount := 2 - actualResultCount := len(results) - assert.Equal(t, expectedResultCount, actualResultCount, "expected %d results but was %d", expectedResultCount, actualResultCount) - assert.Contains(t, results, nil, "expected one request to succeed, but did not happen - results: %v", results) conflictFailurePresent := slices.ContainsFunc(results, func(eachResult any) bool { return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress") }) - assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen") - + assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen - results: %v", results) } func (t *PorchSuite) TestInitEmptyPackage(ctx context.Context) { @@ -372,7 +367,7 @@ func (t *PorchSuite) TestConcurrentInits(ctx context.Context) { conflictFailurePresent := slices.ContainsFunc(results, func(eachResult any) bool { return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress") }) - assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen") + assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen - results: %v", results) } func (t *PorchSuite) TestInitTaskPackage(ctx context.Context) { @@ -700,16 +695,12 @@ func (t *PorchSuite) TestConcurrentEdits(ctx context.Context) { editOperation, editOperation) - expectedResultCount := 2 - actualResultCount := len(results) - assert.Equal(t, expectedResultCount, actualResultCount, "expected %d results but was %d", expectedResultCount, actualResultCount) - assert.Contains(t, results, nil, "expected one request to succeed, but did not happen - results: %v", results) conflictFailurePresent := slices.ContainsFunc(results, func(eachResult any) bool { return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress") }) - assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen") + assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen - results: %v", results) // Check its task list var pkgRev porchapi.PackageRevision @@ -1017,6 +1008,68 @@ func (t *PorchSuite) TestProposeApprove(ctx context.Context) { } } +func (t *PorchSuite) TestConcurrentProposeApprove(ctx context.Context) { + const ( + repository = "lifecycle" + packageName = "test-package" + workspace = "workspace" + ) + + // Register the repository + t.RegisterMainGitRepositoryF(ctx, repository) + + // Create a new package (via init) + pr := t.CreatePackageSkeleton(repository, packageName, workspace) + pr.Spec.Tasks = []porchapi.Task{ + { + Type: porchapi.TaskTypeInit, + Init: &porchapi.PackageInitTaskSpec{}, + }, + } + t.CreateF(ctx, pr) + + var pkg porchapi.PackageRevision + t.GetF(ctx, client.ObjectKey{ + Namespace: t.Namespace, + Name: pr.Name, + }, &pkg) + + // Propose the package revision to be finalized + pkg.Spec.Lifecycle = porchapi.PackageRevisionLifecycleProposed + proposeFunction := func() any { + return t.Client.Update(ctx, &pkg) + } + proposeResults := RunInParallel(proposeFunction, proposeFunction) + + assert.Contains(t, proposeResults, nil, "expected one 'propose' request to succeed, but did not happen - results: %v", proposeResults) + + conflictFailurePresent := slices.ContainsFunc(proposeResults, func(eachResult any) bool { + return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress") + }) + assert.True(t, conflictFailurePresent, "expected one 'propose' request to fail with a conflict, but did not happen - results: %v", proposeResults) + + var proposed porchapi.PackageRevision + t.GetF(ctx, client.ObjectKey{ + Namespace: t.Namespace, + Name: pr.Name, + }, &proposed) + + // Approve the package + proposed.Spec.Lifecycle = porchapi.PackageRevisionLifecyclePublished + approveFunction := func() any { + _, err := t.Clientset.PorchV1alpha1().PackageRevisions(proposed.Namespace).UpdateApproval(ctx, proposed.Name, &proposed, metav1.UpdateOptions{}) + return err + } + approveResults := RunInParallel(approveFunction, approveFunction) + + assert.Contains(t, approveResults, nil, "expected one 'approve' request to succeed, but did not happen - results: %v", approveResults) + + conflictFailurePresent = slices.ContainsFunc(approveResults, func(eachResult any) bool { + return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress") + }) + assert.True(t, conflictFailurePresent, "expected one 'approve' request to fail with a conflict, but did not happen - results: %v", approveResults) +} + func (t *PorchSuite) TestDeleteDraft(ctx context.Context) { const ( repository = "delete-draft" @@ -1084,7 +1137,7 @@ func (t *PorchSuite) TestConcurrentDeletes(ctx context.Context) { conflictFailurePresent := slices.ContainsFunc(results, func(eachResult any) bool { return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress") }) - assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen") + assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen - results: %v", results) t.MustNotExist(ctx, &draft) } @@ -1175,6 +1228,44 @@ func (t *PorchSuite) TestDeleteFinal(ctx context.Context) { t.MustNotExist(ctx, &pkg) } +func (t *PorchSuite) TestConcurrentProposeDeletes(ctx context.Context) { + const ( + repository = "delete-final" + packageName = "test-delete-final" + workspace = "workspace" + ) + + // Register the repository and create a draft package + t.RegisterMainGitRepositoryF(ctx, repository) + created := t.CreatePackageDraftF(ctx, repository, packageName, workspace) + // Check the package exists + var pkg porchapi.PackageRevision + t.MustExist(ctx, client.ObjectKey{Namespace: t.Namespace, Name: created.Name}, &pkg) + + // Propose and approve the package revision to be finalized + pkg.Spec.Lifecycle = porchapi.PackageRevisionLifecycleProposed + t.UpdateF(ctx, &pkg) + pkg.Spec.Lifecycle = porchapi.PackageRevisionLifecyclePublished + t.UpdateApprovalF(ctx, &pkg, metav1.UpdateOptions{}) + + t.MustExist(ctx, client.ObjectKey{Namespace: t.Namespace, Name: created.Name}, &pkg) + + // Propose deletion with two clients at once + pkg.Spec.Lifecycle = porchapi.PackageRevisionLifecycleDeletionProposed + proposeDeleteFunction := func() any { + _, err := t.Clientset.PorchV1alpha1().PackageRevisions(pkg.Namespace).UpdateApproval(ctx, pkg.Name, &pkg, metav1.UpdateOptions{}) + return err + } + proposeDeleteResults := RunInParallel(proposeDeleteFunction, proposeDeleteFunction) + + assert.Contains(t, proposeDeleteResults, nil, "expected one 'propose-delete' request to succeed, but did not happen - results: %v", proposeDeleteResults) + + conflictFailurePresent := slices.ContainsFunc(proposeDeleteResults, func(eachResult any) bool { + return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress") + }) + assert.True(t, conflictFailurePresent, "expected one 'propose-delete' request to fail with a conflict, but did not happen") +} + func (t *PorchSuite) TestProposeDeleteAndUndo(ctx context.Context) { const ( repository = "test-propose-delete-and-undo" @@ -1548,7 +1639,63 @@ func (t *PorchSuite) TestPackageUpdate(ctx context.Context) { if _, found := revisionResources.Spec.Resources["resourcequota.yaml"]; !found { t.Errorf("Updated package should contain 'resourcequota.yaml` file") } +} + +func (t *PorchSuite) TestConcurrentPackageUpdates(ctx context.Context) { + const ( + gitRepository = "package-update" + packageName = "testns" + workspace = "test-workspace" + ) + + t.RegisterGitRepositoryF(ctx, testBlueprintsRepo, "test-blueprints", "") + + var list porchapi.PackageRevisionList + t.ListE(ctx, &list, client.InNamespace(t.Namespace)) + + basensV1 := MustFindPackageRevision(t.T, &list, repository.PackageRevisionKey{Repository: "test-blueprints", Package: "basens", Revision: "v1"}) + basensV2 := MustFindPackageRevision(t.T, &list, repository.PackageRevisionKey{Repository: "test-blueprints", Package: "basens", Revision: "v2"}) + + // Register the repository as 'downstream' + t.RegisterMainGitRepositoryF(ctx, gitRepository) + + // Create PackageRevision from upstream repo + pr := t.CreatePackageSkeleton(gitRepository, packageName, workspace) + pr.Spec.Tasks = []porchapi.Task{ + { + Type: porchapi.TaskTypeClone, + Clone: &porchapi.PackageCloneTaskSpec{ + Upstream: porchapi.UpstreamPackage{ + UpstreamRef: &porchapi.PackageRevisionRef{ + Name: basensV1.Name, + }, + }, + }, + }, + } + t.CreateF(ctx, pr) + + upstream := pr.Spec.Tasks[0].Clone.Upstream.DeepCopy() + upstream.UpstreamRef.Name = basensV2.Name + pr.Spec.Tasks = append(pr.Spec.Tasks, porchapi.Task{ + Type: porchapi.TaskTypeUpdate, + Update: &porchapi.PackageUpdateTaskSpec{ + Upstream: *upstream, + }, + }) + + // Two clients at the same time try to update the downstream package + cloneFunction := func() any { + return t.Client.Update(ctx, pr, &client.UpdateOptions{}) + } + results := RunInParallel(cloneFunction, cloneFunction) + + assert.Contains(t, results, nil, "expected one request to succeed, but did not happen - results: %v", results) + conflictFailurePresent := slices.ContainsFunc(results, func(eachResult any) bool { + return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress") + }) + assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen - results: %v", results) } func (t *PorchSuite) TestRegisterRepository(ctx context.Context) { From 5b848be9eafc74e514f7217ca1edf8889c8e60a8 Mon Sep 17 00:00:00 2001 From: James McDermott Date: Mon, 30 Sep 2024 22:34:57 +0100 Subject: [PATCH 04/11] Issue #657 - session conflict handling - fixed compilation error from conflict debris accidentally pushed https://github.com/nephio-project/nephio/issues/657 --- test/e2e/suite_utils.go | 40 ---------------------------------------- 1 file changed, 40 deletions(-) diff --git a/test/e2e/suite_utils.go b/test/e2e/suite_utils.go index d22d526c..7fc612ea 100644 --- a/test/e2e/suite_utils.go +++ b/test/e2e/suite_utils.go @@ -371,47 +371,7 @@ func (t *TestSuite) WaitUntilRepositoryDeleted(ctx context.Context, name, namesp } } -<<<<<<< HEAD:test/e2e/suite_utils.go -func (t *TestSuite) WaitUntilAllPackagesDeleted(ctx context.Context, repoName string) { -======= -<<<<<<< HEAD:test/e2e/e2e_utils_test.go -func (t *TestSuite) waitUntilAllPackagesDeleted(ctx context.Context, repoName string) { -======= -<<<<<<< HEAD:test/e2e/suite_utils.go -func (t *TestSuite) WaitUntilAllPackagesDeleted(ctx context.Context, repoName string) { -======= -<<<<<<< HEAD:test/e2e/e2e_utils_test.go -func (t *TestSuite) waitUntilAllPackagesDeleted(ctx context.Context, repoName string) { -======= -<<<<<<< HEAD:test/e2e/suite_utils.go -<<<<<<< HEAD:test/e2e/suite_utils.go -func (t *TestSuite) WaitUntilAllPackagesDeleted(ctx context.Context, repoName string) { -======= -<<<<<<< HEAD:test/e2e/e2e_utils_test.go -func (t *TestSuite) waitUntilAllPackagesDeleted(ctx context.Context, repoName string) { -======= func (t *TestSuite) WaitUntilAllPackagesDeleted(ctx context.Context, repoName string, namespace string) { ->>>>>>> 2ed49bf (Issue #657 - session conflict handling):test/e2e/suite_utils.go ->>>>>>> 02825a0 (Issue #657 - session conflict handling):test/e2e/e2e_utils_test.go -<<<<<<< HEAD:test/e2e/e2e_utils_test.go ->>>>>>> 9835dc1 (Issue #657 - session conflict handling):test/e2e/suite_utils.go -<<<<<<< HEAD:test/e2e/suite_utils.go ->>>>>>> 021ed5a (Issue #657 - session conflict handling):test/e2e/e2e_utils_test.go -<<<<<<< HEAD:test/e2e/e2e_utils_test.go ->>>>>>> aadf3eb (Issue #657 - session conflict handling):test/e2e/suite_utils.go -<<<<<<< HEAD:test/e2e/suite_utils.go ->>>>>>> 6983ea7 (Issue #657 - session conflict handling):test/e2e/e2e_utils_test.go -======= -======= -======= -======= -======= -func (t *TestSuite) WaitUntilAllPackagesDeleted(ctx context.Context, repoName string, namespace string) { ->>>>>>> 90101d4 (Issue #657 - session conflict handling):test/e2e/e2e_utils_test.go ->>>>>>> 5286ef6 (Issue #657 - session conflict handling):test/e2e/suite_utils.go ->>>>>>> 7ed9d28 (Issue #657 - session conflict handling):test/e2e/e2e_utils_test.go ->>>>>>> a9e3581 (Issue #657 - session conflict handling):test/e2e/suite_utils.go ->>>>>>> 2bf829c (Issue #657 - session conflict handling):test/e2e/e2e_utils_test.go t.Helper() err := wait.PollUntilContextTimeout(ctx, time.Second, 60*time.Second, true, func(ctx context.Context) (done bool, err error) { t.Helper() From 112c3ee9c7aa70eea8e06fecd0e0b06a0075c1e5 Mon Sep 17 00:00:00 2001 From: James McDermott Date: Tue, 1 Oct 2024 09:13:07 +0100 Subject: [PATCH 05/11] Issue #657 - session conflict handling - deduplicated package names in new tests https://github.com/nephio-project/nephio/issues/657 --- test/e2e/e2e_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 6e06d626..62939310 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -1011,7 +1011,7 @@ func (t *PorchSuite) TestProposeApprove(ctx context.Context) { func (t *PorchSuite) TestConcurrentProposeApprove(ctx context.Context) { const ( repository = "lifecycle" - packageName = "test-package" + packageName = "test-package-concurrent" workspace = "workspace" ) @@ -1231,7 +1231,7 @@ func (t *PorchSuite) TestDeleteFinal(ctx context.Context) { func (t *PorchSuite) TestConcurrentProposeDeletes(ctx context.Context) { const ( repository = "delete-final" - packageName = "test-delete-final" + packageName = "test-delete-final-concurrent" workspace = "workspace" ) @@ -1644,7 +1644,7 @@ func (t *PorchSuite) TestPackageUpdate(ctx context.Context) { func (t *PorchSuite) TestConcurrentPackageUpdates(ctx context.Context) { const ( gitRepository = "package-update" - packageName = "testns" + packageName = "testns-concurrent" workspace = "test-workspace" ) From 425ef8d7a2cde266a8f7c1c1b45848b2d09025b1 Mon Sep 17 00:00:00 2001 From: James McDermott Date: Tue, 1 Oct 2024 19:15:34 +0100 Subject: [PATCH 06/11] Issue #657 - session conflict handling - last bit of mutex-based handling - PackageRevisionResources.Update operation https://github.com/nephio-project/nephio/issues/657 --- pkg/registry/porch/packagerevision.go | 9 ++--- .../porch/packagerevisionresources.go | 16 ++++++-- test/e2e/e2e_test.go | 38 ++++++++++++++++++- 3 files changed, 53 insertions(+), 10 deletions(-) diff --git a/pkg/registry/porch/packagerevision.go b/pkg/registry/porch/packagerevision.go index ab103b4d..231b6a01 100644 --- a/pkg/registry/porch/packagerevision.go +++ b/pkg/registry/porch/packagerevision.go @@ -39,7 +39,7 @@ var mutexMapMutex sync.Mutex var pkgRevOperationMutexes = map[string]*sync.Mutex{} const ( - CreateConflictErrorMsg = "another request is already in progress to create %s with details %s" + CreateConflictErrorMsg = "another request is already in progress to create package revision with details namespace=%q, repository=%q, package=%q,workspace=%q" GenericConflictErrorMsg = "another request is already in progress on %s \"%s\"" ) @@ -186,13 +186,12 @@ func (r *packageRevisions) Create(ctx context.Context, runtimeObject runtime.Obj return nil, apierrors.NewConflict( api.Resource("packagerevisions"), - fmt.Sprintf("(new creation)"), - fmt.Errorf(CreateConflictErrorMsg, "package revision", fmt.Sprintf( - "namespace=%s, repository=%s, package=%s,workspace=%s", + "(new creation)", + fmt.Errorf(CreateConflictErrorMsg, newApiPkgRev.Namespace, newApiPkgRev.Spec.RepositoryName, newApiPkgRev.Spec.PackageName, - newApiPkgRev.Spec.WorkspaceName))) + newApiPkgRev.Spec.WorkspaceName)) } defer packageMutex.Unlock() diff --git a/pkg/registry/porch/packagerevisionresources.go b/pkg/registry/porch/packagerevisionresources.go index e19135c1..88395758 100644 --- a/pkg/registry/porch/packagerevisionresources.go +++ b/pkg/registry/porch/packagerevisionresources.go @@ -46,13 +46,11 @@ var _ rest.Scoper = &packageRevisionResources{} var _ rest.Updater = &packageRevisionResources{} var _ rest.SingularNameProvider = &packageRevisionResources{} - // GetSingularName implements the SingularNameProvider interface -func (r *packageRevisionResources) GetSingularName() (string) { +func (r *packageRevisionResources) GetSingularName() string { return "packagerevisionresources" } - func (r *packageRevisionResources) New() runtime.Object { return &api.PackageRevisionResources{} } @@ -127,6 +125,18 @@ func (r *packageRevisionResources) Update(ctx context.Context, name string, objI return nil, false, apierrors.NewBadRequest("namespace must be specified") } + packageMutexKey := fmt.Sprintf("%s/%s", ns, name) + packageMutex := getMutexForPackage(packageMutexKey) + lockAcquired := packageMutex.TryLock() + if !lockAcquired { + return nil, false, + apierrors.NewConflict( + api.Resource("packagerevisionresources"), + name, + fmt.Errorf(GenericConflictErrorMsg, "package revision resources", packageMutexKey)) + } + defer packageMutex.Unlock() + oldRepoPkgRev, err := r.packageCommon.getRepoPkgRev(ctx, name) if err != nil { return nil, false, err diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 62939310..a38c632c 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -847,6 +847,40 @@ func (t *PorchSuite) TestUpdateResourcesEmptyPatch(ctx context.Context) { assert.True(t, reflect.DeepEqual(tasksBeforeUpdate, tasksAfterUpdate)) } +func (t *PorchSuite) TestConcurrentResourceUpdates(ctx context.Context) { + const ( + repository = "concurrent-update-test" + packageName = "simple-package" + workspace = "workspace" + ) + + t.RegisterMainGitRepositoryF(ctx, repository) + + // Create a new package (via init) + pr := t.CreatePackageSkeleton(repository, packageName, workspace) + t.CreateF(ctx, pr) + + // Get the package resources + var newPackageResources porchapi.PackageRevisionResources + t.GetF(ctx, client.ObjectKey{ + Namespace: t.Namespace, + Name: pr.Name, + }, &newPackageResources) + + // "Update" the package resources with two clients at the same time + updateFunction := func() any { + return t.Client.Update(ctx, &newPackageResources) + } + results := RunInParallel(updateFunction, updateFunction) + + assert.Contains(t, results, nil, "expected one request to succeed, but did not happen - results: %v", results) + + conflictFailurePresent := slices.ContainsFunc(results, func(eachResult any) bool { + return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress") + }) + assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen - results: %v", results) +} + func (t *PorchSuite) TestFunctionRepository(ctx context.Context) { repo := &configapi.Repository{ ObjectMeta: metav1.ObjectMeta{ @@ -1685,10 +1719,10 @@ func (t *PorchSuite) TestConcurrentPackageUpdates(ctx context.Context) { }) // Two clients at the same time try to update the downstream package - cloneFunction := func() any { + updateFunction := func() any { return t.Client.Update(ctx, pr, &client.UpdateOptions{}) } - results := RunInParallel(cloneFunction, cloneFunction) + results := RunInParallel(updateFunction, updateFunction) assert.Contains(t, results, nil, "expected one request to succeed, but did not happen - results: %v", results) From a30818a5720b8317fdecaed44bae215c3aed0202 Mon Sep 17 00:00:00 2001 From: James McDermott Date: Thu, 3 Oct 2024 10:29:13 +0100 Subject: [PATCH 07/11] Issue #657 - session conflict handling - backed out launch.json changes - abbreviated some variable names https://github.com/nephio-project/nephio/issues/657 --- .vscode/launch.json | 26 +++------------ pkg/registry/porch/packagecommon.go | 13 ++++---- pkg/registry/porch/packagerevision.go | 32 +++++++++---------- .../porch/packagerevisionresources.go | 12 +++---- test/e2e/e2e_test.go | 12 +++---- 5 files changed, 39 insertions(+), 56 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index c26fcad6..908e4d51 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -14,14 +14,16 @@ "--secure-port=4443", "--kubeconfig=${env:KUBECONFIG}", "--cache-directory=${workspaceFolder}/.cache", - "--function-runner=${env:FUNCTION_RUNNER_IP}:9445", + "--function-runner=172.18.255.201:9445", "--repo-sync-frequency=60s" ], "cwd": "${workspaceFolder}", "env": { "CERT_STORAGE_DIR": "${workspaceFolder}/.build/pki/tmp", "WEBHOOK_HOST": "localhost", - "GOOGLE_API_GO_EXPERIMENTAL_DISABLE_NEW_AUTH_LIB": "true" + "GOOGLE_API_GO_EXPERIMENTAL_DISABLE_NEW_AUTH_LIB": "true", + "OTEL_SERVICE_NAME": "porch-server", + "OTEL": "otel://localhost:4317" } }, { @@ -36,17 +38,6 @@ "ENABLE_PACKAGEVARIANTSETS": "true" } }, - { - "name": "Run Porchctl command", - "type": "go", - "request": "launch", - "mode": "auto", - "program": "${workspaceFolder}/cmd/porchctl/main.go", - "args": [ - "-n", "porch", "rpkg", "approve", "blueprints-835222fccff6e9e042e3b01b8c554f394d6e55d1" - ], - "cwd": "${workspaceFolder}" - }, { "name": "Launch E2E tests", "type": "go", @@ -56,7 +47,7 @@ "args": [ "-test.v", "-test.run", - "TestE2E/PorchSuite/TestConcurrentDeletes" + "TestE2E/PorchSuite/TestGitRepositoryWithReleaseTagsAndDirectory" ], "env": { "E2E": "1"} }, @@ -74,12 +65,5 @@ "namespace=foo" ] } - ], - "compounds": [ - { - "name": "Launch Server and Controllers", - "configurations": ["Launch Server", "Launch Controllers"], - "stopAll": true - } ] } \ No newline at end of file diff --git a/pkg/registry/porch/packagecommon.go b/pkg/registry/porch/packagecommon.go index 25875ff5..794b6763 100644 --- a/pkg/registry/porch/packagecommon.go +++ b/pkg/registry/porch/packagecommon.go @@ -210,22 +210,21 @@ func (r *packageCommon) updatePackageRevision(ctx context.Context, name string, return nil, false, apierrors.NewBadRequest("namespace must be specified") } - packageMutexKey := fmt.Sprintf("%s/%s", ns, name) - packageMutex := getMutexForPackage(packageMutexKey) + pkgMutexKey := fmt.Sprintf("%s/%s", ns, name) + pkgMutex := getMutexForPackage(pkgMutexKey) - lockAcquired := packageMutex.TryLock() - if !lockAcquired { + locked := pkgMutex.TryLock() + if !locked { return nil, false, apierrors.NewConflict( api.Resource("packagerevisions"), name, - fmt.Errorf(GenericConflictErrorMsg, "package revision", packageMutexKey)) + fmt.Errorf(GenericConflictErrorMsg, "package revision", pkgMutexKey)) } - defer packageMutex.Unlock() + defer pkgMutex.Unlock() // isCreate tracks whether this is an update that creates an object (this happens in server-side apply) isCreate := false - oldRepoPkgRev, err := r.getRepoPkgRev(ctx, name) if err != nil { if forceAllowCreate && apierrors.IsNotFound(err) { diff --git a/pkg/registry/porch/packagerevision.go b/pkg/registry/porch/packagerevision.go index 231b6a01..526723bc 100644 --- a/pkg/registry/porch/packagerevision.go +++ b/pkg/registry/porch/packagerevision.go @@ -173,16 +173,16 @@ func (r *packageRevisions) Create(ctx context.Context, runtimeObject runtime.Obj parentPackage = p } - packageMutexKey := fmt.Sprintf("%s-%s-%s-%s", + pkgMutexKey := fmt.Sprintf("%s-%s-%s-%s", newApiPkgRev.Namespace, newApiPkgRev.Spec.RepositoryName, newApiPkgRev.Spec.PackageName, newApiPkgRev.Spec.WorkspaceName) - packageMutex := getMutexForPackage(packageMutexKey) + pkgMutex := getMutexForPackage(pkgMutexKey) - lockAcquired := packageMutex.TryLock() - if !lockAcquired { + locked := pkgMutex.TryLock() + if !locked { return nil, apierrors.NewConflict( api.Resource("packagerevisions"), @@ -193,7 +193,7 @@ func (r *packageRevisions) Create(ctx context.Context, runtimeObject runtime.Obj newApiPkgRev.Spec.PackageName, newApiPkgRev.Spec.WorkspaceName)) } - defer packageMutex.Unlock() + defer pkgMutex.Unlock() createdRepoPkgRev, err := r.cad.CreatePackageRevision(ctx, repositoryObj, newApiPkgRev, parentPackage) if err != nil { @@ -256,18 +256,18 @@ func (r *packageRevisions) Delete(ctx context.Context, name string, deleteValida return nil, false, err } - packageMutexKey := fmt.Sprintf("%s/%s", ns, name) - packageMutex := getMutexForPackage(packageMutexKey) + pkgMutexKey := fmt.Sprintf("%s/%s", ns, name) + pkgMutex := getMutexForPackage(pkgMutexKey) - lockAcquired := packageMutex.TryLock() - if !lockAcquired { + locked := pkgMutex.TryLock() + if !locked { return nil, false, apierrors.NewConflict( api.Resource("packagerevisions"), name, - fmt.Errorf(GenericConflictErrorMsg, "package revision", packageMutexKey)) + fmt.Errorf(GenericConflictErrorMsg, "package revision", pkgMutexKey)) } - defer packageMutex.Unlock() + defer pkgMutex.Unlock() if err := r.cad.DeletePackageRevision(ctx, repositoryObj, repoPkgRev); err != nil { return nil, false, apierrors.NewInternalError(err) @@ -277,13 +277,13 @@ func (r *packageRevisions) Delete(ctx context.Context, name string, deleteValida return apiPkgRev, true, nil } -func getMutexForPackage(packageMutexKey string) *sync.Mutex { +func getMutexForPackage(pkgMutexKey string) *sync.Mutex { mutexMapMutex.Lock() defer mutexMapMutex.Unlock() - packageMutex, alreadyPresent := pkgRevOperationMutexes[packageMutexKey] + pkgMutex, alreadyPresent := pkgRevOperationMutexes[pkgMutexKey] if !alreadyPresent { - packageMutex = &sync.Mutex{} - pkgRevOperationMutexes[packageMutexKey] = packageMutex + pkgMutex = &sync.Mutex{} + pkgRevOperationMutexes[pkgMutexKey] = pkgMutex } - return packageMutex + return pkgMutex } diff --git a/pkg/registry/porch/packagerevisionresources.go b/pkg/registry/porch/packagerevisionresources.go index 88395758..400671f4 100644 --- a/pkg/registry/porch/packagerevisionresources.go +++ b/pkg/registry/porch/packagerevisionresources.go @@ -125,17 +125,17 @@ func (r *packageRevisionResources) Update(ctx context.Context, name string, objI return nil, false, apierrors.NewBadRequest("namespace must be specified") } - packageMutexKey := fmt.Sprintf("%s/%s", ns, name) - packageMutex := getMutexForPackage(packageMutexKey) - lockAcquired := packageMutex.TryLock() - if !lockAcquired { + pkgMutexKey := fmt.Sprintf("%s/%s", ns, name) + pkgMutex := getMutexForPackage(pkgMutexKey) + locked := pkgMutex.TryLock() + if !locked { return nil, false, apierrors.NewConflict( api.Resource("packagerevisionresources"), name, - fmt.Errorf(GenericConflictErrorMsg, "package revision resources", packageMutexKey)) + fmt.Errorf(GenericConflictErrorMsg, "package revision resources", pkgMutexKey)) } - defer packageMutex.Unlock() + defer pkgMutex.Unlock() oldRepoPkgRev, err := r.packageCommon.getRepoPkgRev(ctx, name) if err != nil { diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index a38c632c..c18325a0 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -688,12 +688,12 @@ func (t *PorchSuite) TestConcurrentEdits(ctx context.Context) { } // Two clients try to create the new package at the same time - editOperation := func() any { + editFunction := func() any { return t.Client.Create(ctx, editPR) } results := RunInParallel( - editOperation, - editOperation) + editFunction, + editFunction) assert.Contains(t, results, nil, "expected one request to succeed, but did not happen - results: %v", results) @@ -1150,7 +1150,7 @@ func (t *PorchSuite) TestConcurrentDeletes(ctx context.Context) { t.MustExist(ctx, client.ObjectKey{Namespace: t.Namespace, Name: created.Name}, &draft) // Delete the same package with two clients at the same time - deleteOperation := func() any { + deleteFunction := func() any { return t.Client.Delete(ctx, &porchapi.PackageRevision{ ObjectMeta: metav1.ObjectMeta{ Namespace: t.Namespace, @@ -1159,8 +1159,8 @@ func (t *PorchSuite) TestConcurrentDeletes(ctx context.Context) { }) } results := RunInParallel( - deleteOperation, - deleteOperation) + deleteFunction, + deleteFunction) expectedResultCount := 2 actualResultCount := len(results) From 50a99f64acc0594265a9a642598296153620f2c4 Mon Sep 17 00:00:00 2001 From: James McDermott Date: Thu, 3 Oct 2024 10:46:18 +0100 Subject: [PATCH 08/11] Issue #657 - session conflict handling - backed out remaining launch.json changes https://github.com/nephio-project/nephio/issues/657 --- .vscode/launch.json | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index 908e4d51..a72ff53f 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -14,16 +14,14 @@ "--secure-port=4443", "--kubeconfig=${env:KUBECONFIG}", "--cache-directory=${workspaceFolder}/.cache", - "--function-runner=172.18.255.201:9445", + "--function-runner=${env:FUNCTION_RUNNER_IP}:9445", "--repo-sync-frequency=60s" ], "cwd": "${workspaceFolder}", "env": { "CERT_STORAGE_DIR": "${workspaceFolder}/.build/pki/tmp", "WEBHOOK_HOST": "localhost", - "GOOGLE_API_GO_EXPERIMENTAL_DISABLE_NEW_AUTH_LIB": "true", - "OTEL_SERVICE_NAME": "porch-server", - "OTEL": "otel://localhost:4317" + "GOOGLE_API_GO_EXPERIMENTAL_DISABLE_NEW_AUTH_LIB": "true" } }, { @@ -39,7 +37,7 @@ } }, { - "name": "Launch E2E tests", + "name": "Launch test function", "type": "go", "request": "launch", "mode": "test", From 03ee3e14ef3d4621e7a1e8490ea8b7741bc8b99b Mon Sep 17 00:00:00 2001 From: ezmcdja Date: Tue, 8 Oct 2024 21:50:38 +0100 Subject: [PATCH 09/11] Issue #657 - PR comment changes - moved package mutexes into packagecommon.go - reworked construction of "request already in progress" error messages - replaced cachedRepository polling with retrying the PackageRev update to allow PackageRev deletion without redundant polling https://github.com/nephio-project/nephio/issues/657 --- pkg/cache/repository.go | 3 +-- pkg/meta/store.go | 26 ++++++++++++--------- pkg/registry/porch/packagecommon.go | 19 +++++++++++++++ pkg/registry/porch/packagerevision.go | 33 +++++++-------------------- test/e2e/e2e_test.go | 2 +- test/e2e/suite_utils.go | 4 ++-- 6 files changed, 46 insertions(+), 41 deletions(-) diff --git a/pkg/cache/repository.go b/pkg/cache/repository.go index 55f39b2c..80649934 100644 --- a/pkg/cache/repository.go +++ b/pkg/cache/repository.go @@ -319,7 +319,6 @@ func (r *cachedRepository) DeletePackage(ctx context.Context, old repository.Pac } func (r *cachedRepository) Close() error { - r.pollOnce(context.TODO()) r.cancel() // Make sure that watch events are sent for packagerevisions that are @@ -339,7 +338,7 @@ func (r *cachedRepository) Close() error { // There isn't much use in returning an error here, so we just log it // and create a PackageRevisionMeta with just name and namespace. This // makes sure that the Delete event is sent. - klog.Warningf("Error deleting PackageRev CR for %s: %v", nn.Name, err) + klog.Warningf("repo %s: error deleting packagerev for %s: %v", r.id, nn.Name, err) pkgRevMeta = meta.PackageRevisionMeta{ Name: nn.Name, Namespace: nn.Namespace, diff --git a/pkg/meta/store.go b/pkg/meta/store.go index 6ff13ae1..bfe2bf04 100644 --- a/pkg/meta/store.go +++ b/pkg/meta/store.go @@ -25,6 +25,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -194,21 +195,24 @@ func (c *crdMetadataStore) Delete(ctx context.Context, namespacedName types.Name defer span.End() var internalPkgRev internalapi.PackageRev - err := c.coreClient.Get(ctx, namespacedName, &internalPkgRev) - if err != nil { - return PackageRevisionMeta{}, err - } + retriedErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { + err := c.coreClient.Get(ctx, namespacedName, &internalPkgRev) + if err != nil { + return err + } - if clearFinalizers { - internalPkgRev.Finalizers = []string{} - if err = c.coreClient.Update(ctx, &internalPkgRev); err != nil { - return PackageRevisionMeta{}, err + if clearFinalizers { + internalPkgRev.Finalizers = []string{} + if err = c.coreClient.Update(ctx, &internalPkgRev); err != nil { + return err + } } - } + return nil + }) klog.Infof("Deleting packagerev %s/%s", internalPkgRev.Namespace, internalPkgRev.Name) - if err := c.coreClient.Delete(ctx, &internalPkgRev); err != nil { - return PackageRevisionMeta{}, err + if retriedErr = c.coreClient.Delete(ctx, &internalPkgRev); retriedErr != nil { + return PackageRevisionMeta{}, retriedErr } return toPackageRevisionMeta(&internalPkgRev), nil } diff --git a/pkg/registry/porch/packagecommon.go b/pkg/registry/porch/packagecommon.go index 794b6763..3ecce293 100644 --- a/pkg/registry/porch/packagecommon.go +++ b/pkg/registry/porch/packagecommon.go @@ -17,6 +17,7 @@ package porch import ( "context" "fmt" + "sync" unversionedapi "github.com/nephio-project/porch/api/porch" api "github.com/nephio-project/porch/api/porch/v1alpha1" @@ -34,6 +35,13 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) +const ConflictErrorMsgBase = "another request is already in progress %s" + +var GenericConflictErrorMsg = fmt.Sprintf(ConflictErrorMsgBase, "on %s \"%s\"") + +var mutexMapMutex sync.Mutex +var pkgRevOperationMutexes = map[string]*sync.Mutex{} + type packageCommon struct { // scheme holds our scheme, for type conversions etc scheme *runtime.Scheme @@ -481,3 +489,14 @@ func (r *packageCommon) validateUpdate(ctx context.Context, newRuntimeObj runtim r.updateStrategy.Canonicalize(newRuntimeObj) return nil } + +func getMutexForPackage(pkgMutexKey string) *sync.Mutex { + mutexMapMutex.Lock() + defer mutexMapMutex.Unlock() + pkgMutex, alreadyPresent := pkgRevOperationMutexes[pkgMutexKey] + if !alreadyPresent { + pkgMutex = &sync.Mutex{} + pkgRevOperationMutexes[pkgMutexKey] = pkgMutex + } + return pkgMutex +} diff --git a/pkg/registry/porch/packagerevision.go b/pkg/registry/porch/packagerevision.go index 526723bc..d8be2d24 100644 --- a/pkg/registry/porch/packagerevision.go +++ b/pkg/registry/porch/packagerevision.go @@ -17,7 +17,6 @@ package porch import ( "context" "fmt" - "sync" api "github.com/nephio-project/porch/api/porch/v1alpha1" "github.com/nephio-project/porch/pkg/engine" @@ -35,14 +34,6 @@ import ( var tracer = otel.Tracer("apiserver") -var mutexMapMutex sync.Mutex -var pkgRevOperationMutexes = map[string]*sync.Mutex{} - -const ( - CreateConflictErrorMsg = "another request is already in progress to create package revision with details namespace=%q, repository=%q, package=%q,workspace=%q" - GenericConflictErrorMsg = "another request is already in progress on %s \"%s\"" -) - type packageRevisions struct { packageCommon rest.TableConvertor @@ -183,15 +174,18 @@ func (r *packageRevisions) Create(ctx context.Context, runtimeObject runtime.Obj locked := pkgMutex.TryLock() if !locked { + + conflictError := fmt.Errorf( + fmt.Sprintf(ConflictErrorMsgBase, "to create package revision with details namespace=%q, repository=%q, package=%q,workspace=%q"), + newApiPkgRev.Namespace, + newApiPkgRev.Spec.RepositoryName, + newApiPkgRev.Spec.PackageName, + newApiPkgRev.Spec.WorkspaceName) return nil, apierrors.NewConflict( api.Resource("packagerevisions"), "(new creation)", - fmt.Errorf(CreateConflictErrorMsg, - newApiPkgRev.Namespace, - newApiPkgRev.Spec.RepositoryName, - newApiPkgRev.Spec.PackageName, - newApiPkgRev.Spec.WorkspaceName)) + conflictError) } defer pkgMutex.Unlock() @@ -276,14 +270,3 @@ func (r *packageRevisions) Delete(ctx context.Context, name string, deleteValida // TODO: Should we do an async delete? return apiPkgRev, true, nil } - -func getMutexForPackage(pkgMutexKey string) *sync.Mutex { - mutexMapMutex.Lock() - defer mutexMapMutex.Unlock() - pkgMutex, alreadyPresent := pkgRevOperationMutexes[pkgMutexKey] - if !alreadyPresent { - pkgMutex = &sync.Mutex{} - pkgRevOperationMutexes[pkgMutexKey] = pkgMutex - } - return pkgMutex -} diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index c18325a0..d939a6d4 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -340,7 +340,7 @@ func (t *PorchSuite) TestInitEmptyPackage(ctx context.Context) { func (t *PorchSuite) TestConcurrentInits(ctx context.Context) { // Create a new package via init, no task specified const ( - repository = "git" + repository = "git-concurrent" packageName = "empty-package-concurrent" revision = "v1" workspace = "test-workspace" diff --git a/test/e2e/suite_utils.go b/test/e2e/suite_utils.go index 7fc612ea..def36972 100644 --- a/test/e2e/suite_utils.go +++ b/test/e2e/suite_utils.go @@ -262,7 +262,7 @@ func (t *TestSuite) CreatePackageDraftF(ctx context.Context, repository, package return pr } -func (t *TestSuite) CreatePackageSkeleton(repository, packageName, workspace string) *porchapi.PackageRevision { +func (t *TestSuite) CreatePackageSkeleton(repoName, packageName, workspace string) *porchapi.PackageRevision { return &porchapi.PackageRevision{ TypeMeta: metav1.TypeMeta{ Kind: "PackageRevision", @@ -274,7 +274,7 @@ func (t *TestSuite) CreatePackageSkeleton(repository, packageName, workspace str Spec: porchapi.PackageRevisionSpec{ PackageName: packageName, WorkspaceName: porchapi.WorkspaceName(workspace), - RepositoryName: repository, + RepositoryName: repoName, // empty tasks list - set them as needed in the particular usage Tasks: []porchapi.Task{}, }, From 08d2babd6822503db8e3283dbb23b251bb5b7a11 Mon Sep 17 00:00:00 2001 From: ezmcdja Date: Wed, 9 Oct 2024 09:51:04 +0100 Subject: [PATCH 10/11] Issue #657 - extract functions to assemble package mutex keys - needed a separate one in packagerevision.go for the Create() - since we're putting that key together from parts as yet uncombined into a proper PackageRevision name - also extracted conflict error message assembly for the Create operation https://github.com/nephio-project/nephio/issues/657 --- pkg/registry/porch/packagecommon.go | 6 ++- pkg/registry/porch/packagerevision.go | 39 ++++++++++++------- .../porch/packagerevisionresources.go | 2 +- 3 files changed, 31 insertions(+), 16 deletions(-) diff --git a/pkg/registry/porch/packagecommon.go b/pkg/registry/porch/packagecommon.go index 3ecce293..9a6c6254 100644 --- a/pkg/registry/porch/packagecommon.go +++ b/pkg/registry/porch/packagecommon.go @@ -218,7 +218,7 @@ func (r *packageCommon) updatePackageRevision(ctx context.Context, name string, return nil, false, apierrors.NewBadRequest("namespace must be specified") } - pkgMutexKey := fmt.Sprintf("%s/%s", ns, name) + pkgMutexKey := getPackageMutexKey(ns, name) pkgMutex := getMutexForPackage(pkgMutexKey) locked := pkgMutex.TryLock() @@ -490,6 +490,10 @@ func (r *packageCommon) validateUpdate(ctx context.Context, newRuntimeObj runtim return nil } +func getPackageMutexKey(namespace, name string) string { + return fmt.Sprintf("%s/%s", namespace, name) +} + func getMutexForPackage(pkgMutexKey string) *sync.Mutex { mutexMapMutex.Lock() defer mutexMapMutex.Unlock() diff --git a/pkg/registry/porch/packagerevision.go b/pkg/registry/porch/packagerevision.go index d8be2d24..dc4e7cbd 100644 --- a/pkg/registry/porch/packagerevision.go +++ b/pkg/registry/porch/packagerevision.go @@ -164,23 +164,12 @@ func (r *packageRevisions) Create(ctx context.Context, runtimeObject runtime.Obj parentPackage = p } - pkgMutexKey := fmt.Sprintf("%s-%s-%s-%s", - newApiPkgRev.Namespace, - newApiPkgRev.Spec.RepositoryName, - newApiPkgRev.Spec.PackageName, - newApiPkgRev.Spec.WorkspaceName) - + pkgMutexKey := uncreatedPackageMutexKey(newApiPkgRev) pkgMutex := getMutexForPackage(pkgMutexKey) locked := pkgMutex.TryLock() if !locked { - - conflictError := fmt.Errorf( - fmt.Sprintf(ConflictErrorMsgBase, "to create package revision with details namespace=%q, repository=%q, package=%q,workspace=%q"), - newApiPkgRev.Namespace, - newApiPkgRev.Spec.RepositoryName, - newApiPkgRev.Spec.PackageName, - newApiPkgRev.Spec.WorkspaceName) + conflictError := creationConflictError(newApiPkgRev) return nil, apierrors.NewConflict( api.Resource("packagerevisions"), @@ -250,7 +239,7 @@ func (r *packageRevisions) Delete(ctx context.Context, name string, deleteValida return nil, false, err } - pkgMutexKey := fmt.Sprintf("%s/%s", ns, name) + pkgMutexKey := getPackageMutexKey(ns, name) pkgMutex := getMutexForPackage(pkgMutexKey) locked := pkgMutex.TryLock() @@ -270,3 +259,25 @@ func (r *packageRevisions) Delete(ctx context.Context, name string, deleteValida // TODO: Should we do an async delete? return apiPkgRev, true, nil } + +func uncreatedPackageMutexKey(newApiPkgRev *api.PackageRevision) string { + return fmt.Sprintf("%s-%s-%s-%s", + newApiPkgRev.Namespace, + newApiPkgRev.Spec.RepositoryName, + newApiPkgRev.Spec.PackageName, + newApiPkgRev.Spec.WorkspaceName, + ) +} + +func creationConflictError(newApiPkgRev *api.PackageRevision) error { + return fmt.Errorf( + fmt.Sprintf( + ConflictErrorMsgBase, + "to create package revision with details namespace=%q, repository=%q, package=%q,workspace=%q", + ), + newApiPkgRev.Namespace, + newApiPkgRev.Spec.RepositoryName, + newApiPkgRev.Spec.PackageName, + newApiPkgRev.Spec.WorkspaceName, + ) +} diff --git a/pkg/registry/porch/packagerevisionresources.go b/pkg/registry/porch/packagerevisionresources.go index 400671f4..4f71a443 100644 --- a/pkg/registry/porch/packagerevisionresources.go +++ b/pkg/registry/porch/packagerevisionresources.go @@ -125,7 +125,7 @@ func (r *packageRevisionResources) Update(ctx context.Context, name string, objI return nil, false, apierrors.NewBadRequest("namespace must be specified") } - pkgMutexKey := fmt.Sprintf("%s/%s", ns, name) + pkgMutexKey := getPackageMutexKey(ns, name) pkgMutex := getMutexForPackage(pkgMutexKey) locked := pkgMutex.TryLock() if !locked { From 5fee3d139edcceb47e495acbd6509f074af6864f Mon Sep 17 00:00:00 2001 From: James McDermott Date: Thu, 10 Oct 2024 12:16:47 +0100 Subject: [PATCH 11/11] Issue #657 - return error from retried PackageRev metadata update - extra return in crdMetadataStore.Delete() is case of retry exhaustion and an error from the RetryOnConflict block https://github.com/nephio-project/nephio/issues/657 --- pkg/meta/store.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/meta/store.go b/pkg/meta/store.go index bfe2bf04..bbb6e402 100644 --- a/pkg/meta/store.go +++ b/pkg/meta/store.go @@ -209,10 +209,13 @@ func (c *crdMetadataStore) Delete(ctx context.Context, namespacedName types.Name } return nil }) + if retriedErr != nil { + return PackageRevisionMeta{}, retriedErr + } klog.Infof("Deleting packagerev %s/%s", internalPkgRev.Namespace, internalPkgRev.Name) - if retriedErr = c.coreClient.Delete(ctx, &internalPkgRev); retriedErr != nil { - return PackageRevisionMeta{}, retriedErr + if err := c.coreClient.Delete(ctx, &internalPkgRev); err != nil { + return PackageRevisionMeta{}, err } return toPackageRevisionMeta(&internalPkgRev), nil }