Skip to content

Commit

Permalink
Merge pull request #145 from tonytheleg/ARO-7410-add-sub-data-to-cosmos
Browse files Browse the repository at this point in the history
adds db calls for subscriptions
  • Loading branch information
mbarnes authored May 29, 2024
2 parents aa1394a + a6fc6c4 commit 436907c
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 48 deletions.
86 changes: 71 additions & 15 deletions frontend/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package main
import (
"context"
"encoding/json"
"fmt"
"errors"
"os"

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
Expand All @@ -12,8 +12,13 @@ import (

const (
clustersContainer = "Clusters"
subsContainer = "Subscriptions"
billingContainer = "Billing"
asyncContainer = "AsyncOperations"
)

var ErrNotFound = errors.New("DocumentNotFound")

// DBClient defines the needed values to perform CRUD operations against the async DB
type DBClient struct {
client *azcosmos.Client
Expand Down Expand Up @@ -75,11 +80,11 @@ func (d *DBClient) DBConnectionTest(ctx context.Context) (string, error) {
return result.DatabaseProperties.ID, nil
}

// GetCluster retreives a cluster document from async DB using resource ID
func (d *DBClient) GetClusterDoc(ctx context.Context, resourceID string, partitionKey string) (*HCPOpenShiftClusterDocument, bool, error) {
// GetClusterDoc retreives a cluster document from async DB using resource ID
func (d *DBClient) GetClusterDoc(ctx context.Context, resourceID string, partitionKey string) (*HCPOpenShiftClusterDocument, error) {
container, err := d.client.NewContainer(d.config.DBName, clustersContainer)
if err != nil {
return nil, false, err
return nil, err
}

query := "SELECT * FROM c WHERE c.key = @key"
Expand All @@ -95,24 +100,23 @@ func (d *DBClient) GetClusterDoc(ctx context.Context, resourceID string, partiti
for queryPager.More() {
queryResponse, err := queryPager.NextPage(ctx)
if err != nil {
return nil, false, err
return nil, err
}

for _, item := range queryResponse.Items {
err = json.Unmarshal(item, &doc)
if err != nil {
return nil, false, err
return nil, err
}
}
}
if doc != nil {
return doc, true, nil
return doc, nil
}
return nil, false, nil

return nil, ErrNotFound
}

// SetCluster creates/updates a cluster document in the async DB during cluster creation/patching
// SetClusterDoc creates/updates a cluster document in the async DB during cluster creation/patching
func (d *DBClient) SetClusterDoc(ctx context.Context, doc *HCPOpenShiftClusterDocument) error {
data, err := json.Marshal(doc)
if err != nil {
Expand All @@ -132,12 +136,9 @@ func (d *DBClient) SetClusterDoc(ctx context.Context, doc *HCPOpenShiftClusterDo
return nil
}

// DeleteCluster removes a cluter document from the async DB using resource ID
// DeleteClusterDoc removes a cluter document from the async DB using resource ID
func (d *DBClient) DeleteClusterDoc(ctx context.Context, resourceID string, partitionKey string) error {
doc, found, err := d.GetClusterDoc(ctx, resourceID, partitionKey)
if !found {
return fmt.Errorf("document with key %s not found", partitionKey)
}
doc, err := d.GetClusterDoc(ctx, resourceID, partitionKey)
if err != nil {
return err
}
Expand All @@ -153,3 +154,58 @@ func (d *DBClient) DeleteClusterDoc(ctx context.Context, resourceID string, part
}
return nil
}

// GetSubscriptionDoc retreives a subscription document from async DB using the subscription ID
func (d *DBClient) GetSubscriptionDoc(ctx context.Context, partitionKey string) (*SubscriptionDocument, error) {
container, err := d.client.NewContainer(d.config.DBName, subsContainer)
if err != nil {
return nil, err
}

query := "SELECT * FROM c WHERE c.partitionKey = @partitionKey"
opt := azcosmos.QueryOptions{
PageSizeHint: 1,
QueryParameters: []azcosmos.QueryParameter{{Name: "@partitionKey", Value: partitionKey}},
}

pk := azcosmos.NewPartitionKeyString(partitionKey)
queryPager := container.NewQueryItemsPager(query, pk, &opt)

var doc *SubscriptionDocument
for queryPager.More() {
queryResponse, err := queryPager.NextPage(ctx)
if err != nil {
return nil, err
}

for _, item := range queryResponse.Items {
err = json.Unmarshal(item, &doc)
if err != nil {
return nil, err
}
}
}
if doc != nil {
return doc, nil
}
return nil, ErrNotFound
}

// SetClusterDoc creates/updates a subscription document in the async DB during cluster creation/patching
func (d *DBClient) SetSubscriptionDoc(ctx context.Context, doc *SubscriptionDocument) error {
data, err := json.Marshal(doc)
if err != nil {
return err
}

container, err := d.client.NewContainer(d.config.DBName, subsContainer)
if err != nil {
return err
}

_, err = container.UpsertItem(ctx, azcosmos.NewPartitionKeyString(doc.PartitionKey), data, nil)
if err != nil {
return err
}
return nil
}
32 changes: 32 additions & 0 deletions frontend/document.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package main

import "github.com/Azure/ARO-HCP/internal/api/arm"

// HCPOpenShiftClusterDocument represents an HCP OpenShift cluster document.
type HCPOpenShiftClusterDocument struct {
ID string `json:"id,omitempty"`
Key string `json:"key,omitempty"`
PartitionKey string `json:"partitionKey,omitempty"`
ClusterID string `json:"clusterid,omitempty"`

// Values provided by Cosmos after doc creation
ResourceID string `json:"_rid,omitempty"`
Self string `json:"_self,omitempty"`
ETag string `json:"_etag,omitempty"`
Attachments string `json:"_attachments,omitempty"`
Timestamp int `json:"_ts,omitempty"`
}

// SubscriptionDocument represents an Azure Subscription document.
type SubscriptionDocument struct {
ID string `json:"id,omitempty"`
PartitionKey string `json:"partitionKey,omitempty"`
Subscription *arm.Subscription `json:"subscription,omitempty"`

// Values provided by Cosmos after doc creation
ResourceID string `json:"_rid,omitempty"`
Self string `json:"_self,omitempty"`
ETag string `json:"_etag,omitempty"`
Attachments string `json:"_attachments,omitempty"`
Timestamp int `json:"_ts,omitempty"`
}
102 changes: 85 additions & 17 deletions frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net"
Expand Down Expand Up @@ -90,7 +91,8 @@ func NewFrontend(logger *slog.Logger, listener net.Listener, emitter metrics.Emi
mux.HandleFunc("/", f.NotFound)
mux.HandleFunc(MuxPattern(http.MethodGet, "healthz", "ready"), f.HealthzReady)
// TODO: determine where in the auth chain we should allow for this endpoint to be called by ARM
mux.HandleFunc(MuxPattern(http.MethodPut, PatternSubscriptions), f.ArmSubscriptionAction)
mux.HandleFunc(MuxPattern(http.MethodGet, PatternSubscriptions), f.ArmSubscriptionGet)
mux.HandleFunc(MuxPattern(http.MethodPut, PatternSubscriptions), f.ArmSubscriptionPut)

// Expose Prometheus metrics endpoint
mux.Handle(MuxPattern(http.MethodGet, "metrics"), promhttp.Handler())
Expand Down Expand Up @@ -313,21 +315,23 @@ func (f *Frontend) ArmResourceCreateOrUpdate(writer http.ResponseWriter, request

parsed, _ := azure.ParseResourceID(resourceID)
var doc *HCPOpenShiftClusterDocument
doc, found, err := f.dbClient.GetClusterDoc(ctx, resourceID, parsed.SubscriptionID)
doc, err = f.dbClient.GetClusterDoc(ctx, resourceID, parsed.SubscriptionID)
if err != nil {
f.logger.Error("failed to fetch document for %s: %v", resourceID, err)
arm.WriteInternalServerError(writer)
return
}
if !found {
f.logger.Info(fmt.Sprintf("existing document not found for cluster - creating one for %s", resourceID))
doc = &HCPOpenShiftClusterDocument{
ID: uuid.New().String(),
Key: resourceID,
ClusterID: NewUID(),
PartitionKey: parsed.SubscriptionID,
if errors.Is(err, ErrNotFound) {
f.logger.Info(fmt.Sprintf("existing document not found for cluster - creating one for %s", resourceID))
doc = &HCPOpenShiftClusterDocument{
ID: uuid.New().String(),
Key: resourceID,
ClusterID: NewUID(),
PartitionKey: parsed.SubscriptionID,
}
} else {
f.logger.Error("failed to fetch document for %s: %v", resourceID, err)
arm.WriteInternalServerError(writer)
return
}
}

err = f.dbClient.SetClusterDoc(ctx, doc)
if err != nil {
f.logger.Error("failed to create document for resource %s: %v", resourceID, err)
Expand Down Expand Up @@ -389,9 +393,15 @@ func (f *Frontend) ArmResourceDelete(writer http.ResponseWriter, request *http.R
parsed, _ := azure.ParseResourceID(resourceID)
err = f.dbClient.DeleteClusterDoc(ctx, resourceID, parsed.SubscriptionID)
if err != nil {
f.logger.Error(err.Error())
arm.WriteInternalServerError(writer)
return
if errors.Is(err, ErrNotFound) {
f.logger.Info(fmt.Sprintf("cluster document cannot be deleted -- document not found for %s", resourceID))
writer.WriteHeader(http.StatusNoContent)
return
} else {
f.logger.Error(err.Error())
arm.WriteInternalServerError(writer)
return
}
}
f.logger.Info(fmt.Sprintf("document deleted for resource %s", resourceID))

Expand All @@ -413,7 +423,38 @@ func (f *Frontend) ArmResourceAction(writer http.ResponseWriter, request *http.R
writer.WriteHeader(http.StatusOK)
}

func (f *Frontend) ArmSubscriptionAction(writer http.ResponseWriter, request *http.Request) {
func (f *Frontend) ArmSubscriptionGet(writer http.ResponseWriter, request *http.Request) {
ctx := request.Context()
subId := request.PathValue(PathSegmentSubscriptionID)

doc, err := f.dbClient.GetSubscriptionDoc(ctx, subId)
if err != nil {
if errors.Is(err, ErrNotFound) {
f.logger.Error(fmt.Sprintf("document not found for subscription %s", subId))
writer.WriteHeader(http.StatusNotFound)
return
} else {
f.logger.Error(err.Error())
writer.WriteHeader(http.StatusInternalServerError)
return
}
}

resp, err := json.Marshal(&doc)
if err != nil {
f.logger.Error(err.Error())
writer.WriteHeader(http.StatusInternalServerError)
return
}
_, err = writer.Write(resp)
if err != nil {
f.logger.Error(err.Error())
}

writer.WriteHeader(http.StatusOK)
}

func (f *Frontend) ArmSubscriptionPut(writer http.ResponseWriter, request *http.Request) {
ctx := request.Context()

body, err := BodyFromContext(ctx)
Expand Down Expand Up @@ -441,6 +482,31 @@ func (f *Frontend) ArmSubscriptionAction(writer http.ResponseWriter, request *ht
"state": string(subscription.State),
})

var doc *SubscriptionDocument
doc, err = f.dbClient.GetSubscriptionDoc(ctx, subId)
if err != nil {
if errors.Is(err, ErrNotFound) {
f.logger.Info(fmt.Sprintf("existing document not found for subscription - creating one for %s", subId))
doc = &SubscriptionDocument{
ID: uuid.New().String(),
PartitionKey: subId,
Subscription: &subscription,
}
} else {
f.logger.Error("failed to fetch document for %s: %v", subId, err)
arm.WriteInternalServerError(writer)
return
}
} else {
f.logger.Info(fmt.Sprintf("existing document found for subscription - will update document for subscription %s", subId))
doc.Subscription = &subscription
}

err = f.dbClient.SetSubscriptionDoc(ctx, doc)
if err != nil {
f.logger.Error("failed to create document for subscription %s: %v", subId, err)
}

resp, err := json.Marshal(subscription)
if err != nil {
f.logger.Error(err.Error())
Expand All @@ -451,6 +517,8 @@ func (f *Frontend) ArmSubscriptionAction(writer http.ResponseWriter, request *ht
if err != nil {
f.logger.Error(err.Error())
}

writer.WriteHeader(http.StatusCreated)
}

func (f *Frontend) ArmDeploymentPreflight(writer http.ResponseWriter, request *http.Request) {
Expand Down
16 changes: 0 additions & 16 deletions frontend/hcpclusterdocument.go

This file was deleted.

0 comments on commit 436907c

Please sign in to comment.