feat: add maintenance mode for upgrades (#2211)
- Introduces maintenance mode (status is available via the dynamic config
`PEERDB_MAINTENANCE_MODE_ENABLED`)
- Maintenance mode consists of two workflows:
  - `StartMaintenance` - for pre-upgrade, responsible for:
    - Waiting for running snapshots
    - Updating the dynamic config to true
    - Pausing and backing up currently running mirrors
  - `EndMaintenance` - for post-upgrade, responsible for:
    - Resuming the backed-up mirrors
    - Updating the dynamic config to false
- During the upgrade (between `Start` and `End`), mirrors cannot be
mutated or created in any way.
- There is also an instance info API which returns `Ready`/`Maintenance`
and can be used for UI changes later.


There are two ways to trigger these workflows:
1. An API call to flow-api
2. Running the new `maintenance` entrypoint with the respective args (see the sketch below)
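
For illustration, a minimal sketch (Go, Temporal SDK) of what triggering amounts to: start the pre-upgrade workflow and wait for it to complete. The workflow identifier `StartMaintenance`, the workflow ID `start-maintenance`, and the queue name `maintenance-flow-task-queue` are assumptions for this sketch, not necessarily the exact names this PR registers:

```go
package main

import (
	"context"
	"log"

	"go.temporal.io/sdk/client"
)

func main() {
	// Connects to localhost:7233 by default.
	c, err := client.Dial(client.Options{})
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	// Assumed identifiers for this sketch; the real ones live in this PR's code.
	run, err := c.ExecuteWorkflow(context.Background(), client.StartWorkflowOptions{
		ID:        "start-maintenance",
		TaskQueue: "maintenance-flow-task-queue",
	}, "StartMaintenance")
	if err != nil {
		log.Fatal(err)
	}
	// Block until pre-upgrade maintenance completes before rolling out new images.
	if err := run.Get(context.Background(), nil); err != nil {
		log.Fatal(err)
	}
}
```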

A new task queue is added so that the maintenance tasks can be spun up
even during pre-upgrade hooks (from versions earlier than the ones
containing this PR). This also ensures that the latest version of the
maintenance workflows always runs, regardless of the version being
upgraded from; a worker sketch for such a dedicated queue follows.
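
A minimal sketch of a worker polling such a dedicated queue, assuming the queue name `maintenance-flow-task-queue` and using stub workflow functions in place of the real ones this PR adds:

```go
package main

import (
	"log"

	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/worker"
	"go.temporal.io/sdk/workflow"
)

// Stub workflows for the sketch; the real StartMaintenance/EndMaintenance
// implementations are added by this PR.
func StartMaintenance(ctx workflow.Context) error { return nil }

func EndMaintenance(ctx workflow.Context) error { return nil }

func main() {
	c, err := client.Dial(client.Options{})
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	// A queue served only by this dedicated worker: old-version flow-workers
	// never poll it, so the newest maintenance code always handles these tasks.
	w := worker.New(c, "maintenance-flow-task-queue", worker.Options{})
	w.RegisterWorkflow(StartMaintenance)
	w.RegisterWorkflow(EndMaintenance)

	if err := w.Run(worker.InterruptCh()); err != nil {
		log.Fatal(err)
	}
}
```

Presumably this worker is deployed from the newest image during the pre-upgrade hook, which is what lets `StartMaintenance` run before the old workers are torn down.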
iamKunalGupta authored Nov 14, 2024
1 parent 350686f commit 42b0208
Showing 27 changed files with 1,393 additions and 71 deletions.
30 changes: 30 additions & 0 deletions docker-bake.hcl
@@ -16,6 +16,7 @@ group "default" {
"flow-worker",
"flow-api",
"flow-snapshot-worker",
"flow-maintenance",
"peerdb-ui"
]
}
@@ -45,6 +46,9 @@ target "flow-snapshot-worker" {
"linux/amd64",
"linux/arm64",
]
args = {
PEERDB_VERSION_SHA_SHORT = "${SHA_SHORT}"
}
tags = [
"${REGISTRY}/flow-snapshot-worker:${TAG}",
"${REGISTRY}/flow-snapshot-worker:${SHA_SHORT}",
@@ -59,19 +63,42 @@ target "flow-worker" {
"linux/amd64",
"linux/arm64",
]
args = {
PEERDB_VERSION_SHA_SHORT = "${SHA_SHORT}"
}
tags = [
"${REGISTRY}/flow-worker:${TAG}",
"${REGISTRY}/flow-worker:${SHA_SHORT}",
]
}

target "flow-maintenance" {
context = "."
dockerfile = "stacks/flow.Dockerfile"
target = "flow-maintenance"
platforms = [
"linux/amd64",
"linux/arm64",
]
args = {
PEERDB_VERSION_SHA_SHORT = "${SHA_SHORT}"
}
tags = [
"${REGISTRY}/flow-maintenance:${TAG}",
"${REGISTRY}/flow-maintenance:${SHA_SHORT}",
]
}

target "peerdb" {
context = "."
dockerfile = "stacks/peerdb-server.Dockerfile"
platforms = [
"linux/amd64",
"linux/arm64",
]
args = {
PEERDB_VERSION_SHA_SHORT = "${SHA_SHORT}"
}
tags = [
"${REGISTRY}/peerdb-server:${TAG}",
"${REGISTRY}/peerdb-server:${SHA_SHORT}",
@@ -85,6 +112,9 @@ target "peerdb-ui" {
"linux/amd64",
"linux/arm64",
]
args = {
PEERDB_VERSION_SHA_SHORT = "${SHA_SHORT}"
}
tags = [
"${REGISTRY}/peerdb-ui:${TAG}",
"${REGISTRY}/peerdb-ui:${SHA_SHORT}",
284 changes: 284 additions & 0 deletions flow/activities/maintenance_activity.go
@@ -0,0 +1,284 @@
package activities

import (
"context"
"fmt"
"log/slog"
"time"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/client"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/PeerDB-io/peer-flow/alerting"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/PeerDB-io/peer-flow/shared/telemetry"
)

const (
mirrorStateBackup = "backup"
mirrorStateRestored = "restore"
)

type MaintenanceActivity struct {
CatalogPool *pgxpool.Pool
Alerter *alerting.Alerter
TemporalClient client.Client
}

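// GetAllMirrors lists every mirror recorded in the catalog (one row per
// flow name), including whether each one is a CDC mirror.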
func (a *MaintenanceActivity) GetAllMirrors(ctx context.Context) (*protos.MaintenanceMirrors, error) {
rows, err := a.CatalogPool.Query(ctx, `
select distinct on(name)
id, name, workflow_id,
created_at, coalesce(query_string, '') = '' as is_cdc
from flows
`)
if err != nil {
return &protos.MaintenanceMirrors{}, err
}

maintenanceMirrorItems, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.MaintenanceMirror, error) {
var info protos.MaintenanceMirror
var createdAt time.Time
err := row.Scan(&info.MirrorId, &info.MirrorName, &info.WorkflowId, &createdAt, &info.IsCdc)
info.MirrorCreatedAt = timestamppb.New(createdAt)
return &info, err
})
return &protos.MaintenanceMirrors{
Mirrors: maintenanceMirrorItems,
}, err
}

func (a *MaintenanceActivity) getMirrorStatus(ctx context.Context, mirror *protos.MaintenanceMirror) (protos.FlowStatus, error) {
return shared.GetWorkflowStatus(ctx, a.TemporalClient, mirror.WorkflowId)
}

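// WaitForRunningSnapshots blocks until no mirror is in the setup or
// snapshot phase, then returns the full list of mirrors.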
func (a *MaintenanceActivity) WaitForRunningSnapshots(ctx context.Context) (*protos.MaintenanceMirrors, error) {
mirrors, err := a.GetAllMirrors(ctx)
if err != nil {
return &protos.MaintenanceMirrors{}, err
}

slog.Info("Found mirrors for snapshot check", "mirrors", mirrors, "len", len(mirrors.Mirrors))

for _, mirror := range mirrors.Mirrors {
lastStatus, err := a.checkAndWaitIfSnapshot(ctx, mirror, 2*time.Minute)
if err != nil {
return &protos.MaintenanceMirrors{}, err
}
slog.Info("Finished checking and waiting for snapshot",
"mirror", mirror.MirrorName, "workflowId", mirror.WorkflowId, "lastStatus", lastStatus.String())
}
slog.Info("Finished checking and waiting for all mirrors to finish snapshot")
return mirrors, nil
}

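// checkAndWaitIfSnapshot polls the mirror's workflow status every 10 seconds
// until it has left the setup/snapshot phase, logging progress every logEvery.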
func (a *MaintenanceActivity) checkAndWaitIfSnapshot(
ctx context.Context,
mirror *protos.MaintenanceMirror,
logEvery time.Duration,
) (protos.FlowStatus, error) {
// In case a mirror was just kicked off, it shows up in the running state; wait for a bit before checking for a snapshot
if mirror.MirrorCreatedAt.AsTime().After(time.Now().Add(-30 * time.Second)) {
slog.Info("Mirror was created less than 30 seconds ago, waiting for it to be ready before checking for snapshot",
"mirror", mirror.MirrorName, "workflowId", mirror.WorkflowId)
time.Sleep(30 * time.Second)
}

flowStatus, err := RunEveryIntervalUntilFinish(ctx, func() (bool, protos.FlowStatus, error) {
activity.RecordHeartbeat(ctx, fmt.Sprintf("Waiting for mirror %s to finish snapshot", mirror.MirrorName))
mirrorStatus, err := a.getMirrorStatus(ctx, mirror)
if err != nil {
return false, mirrorStatus, err
}
if mirrorStatus == protos.FlowStatus_STATUS_SNAPSHOT || mirrorStatus == protos.FlowStatus_STATUS_SETUP {
return false, mirrorStatus, nil
}
return true, mirrorStatus, nil
}, 10*time.Second, fmt.Sprintf("Waiting for mirror %s to finish snapshot", mirror.MirrorName), logEvery)
return flowStatus, err
}

func (a *MaintenanceActivity) EnableMaintenanceMode(ctx context.Context) error {
slog.Info("Enabling maintenance mode")
return peerdbenv.UpdatePeerDBMaintenanceModeEnabled(ctx, a.CatalogPool, true)
}

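// BackupAllPreviouslyRunningFlows records the given mirrors in
// maintenance.maintenance_flows in the 'backup' state so that
// EndMaintenance can resume them after the upgrade.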
func (a *MaintenanceActivity) BackupAllPreviouslyRunningFlows(ctx context.Context, mirrors *protos.MaintenanceMirrors) error {
tx, err := a.CatalogPool.Begin(ctx)
if err != nil {
return err
}
defer shared.RollbackTx(tx, slog.Default())

for _, mirror := range mirrors.Mirrors {
_, err := tx.Exec(ctx, `
insert into maintenance.maintenance_flows
(flow_id, flow_name, workflow_id, flow_created_at, is_cdc, state, from_version)
values
($1, $2, $3, $4, $5, $6, $7)
`, mirror.MirrorId, mirror.MirrorName, mirror.WorkflowId, mirror.MirrorCreatedAt.AsTime(), mirror.IsCdc, mirrorStateBackup,
peerdbenv.PeerDBVersionShaShort())
if err != nil {
return err
}
}
return tx.Commit(ctx)
}

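// PauseMirrorIfRunning signals a running mirror to pause and waits until it
// reports the paused status; it returns whether the mirror was paused by this call.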
func (a *MaintenanceActivity) PauseMirrorIfRunning(ctx context.Context, mirror *protos.MaintenanceMirror) (bool, error) {
mirrorStatus, err := a.getMirrorStatus(ctx, mirror)
if err != nil {
return false, err
}

slog.Info("Checking if mirror is running", "mirror", mirror.MirrorName, "workflowId", mirror.WorkflowId, "status", mirrorStatus.String())

if mirrorStatus != protos.FlowStatus_STATUS_RUNNING {
return false, nil
}

slog.Info("Pausing mirror for maintenance", "mirror", mirror.MirrorName, "workflowId", mirror.WorkflowId)

if err := model.FlowSignal.SignalClientWorkflow(ctx, a.TemporalClient, mirror.WorkflowId, "", model.PauseSignal); err != nil {
slog.Error("Error signaling mirror running to pause for maintenance",
"mirror", mirror.MirrorName, "workflowId", mirror.WorkflowId, "error", err)
return false, err
}

return RunEveryIntervalUntilFinish(ctx, func() (bool, bool, error) {
updatedMirrorStatus, statusErr := a.getMirrorStatus(ctx, mirror)
if statusErr != nil {
return false, false, statusErr
}
activity.RecordHeartbeat(ctx, "Waiting for mirror to pause with current status "+updatedMirrorStatus.String())
if statusErr := model.FlowSignal.SignalClientWorkflow(ctx, a.TemporalClient, mirror.WorkflowId, "",
model.PauseSignal); statusErr != nil {
return false, false, statusErr
}
if updatedMirrorStatus == protos.FlowStatus_STATUS_PAUSED {
return true, true, nil
}
return false, false, nil
}, 10*time.Second, "Waiting for mirror to pause", 30*time.Second)
}

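// CleanBackedUpFlows marks all backed-up flows as restored and stamps the
// version they were restored to.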
func (a *MaintenanceActivity) CleanBackedUpFlows(ctx context.Context) error {
_, err := a.CatalogPool.Exec(ctx, `
update maintenance.maintenance_flows
set state = $1,
restored_at = now(),
to_version = $2
where state = $3
`, mirrorStateRestored, peerdbenv.PeerDBVersionShaShort(), mirrorStateBackup)
return err
}

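// GetBackedUpFlows returns the mirrors that are still recorded in the
// 'backup' state.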
func (a *MaintenanceActivity) GetBackedUpFlows(ctx context.Context) (*protos.MaintenanceMirrors, error) {
rows, err := a.CatalogPool.Query(ctx, `
select flow_id, flow_name, workflow_id, flow_created_at, is_cdc
from maintenance.maintenance_flows
where state = $1
`, mirrorStateBackup)
if err != nil {
return nil, err
}

maintenanceMirrorItems, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.MaintenanceMirror, error) {
var info protos.MaintenanceMirror
var createdAt time.Time
err := row.Scan(&info.MirrorId, &info.MirrorName, &info.WorkflowId, &createdAt, &info.IsCdc)
info.MirrorCreatedAt = timestamppb.New(createdAt)
return &info, err
})
if err != nil {
return nil, err
}

return &protos.MaintenanceMirrors{
Mirrors: maintenanceMirrorItems,
}, nil
}

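// ResumeMirror un-pauses a previously paused mirror by sending it a no-op
// signal; mirrors that are not currently paused are left untouched.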
func (a *MaintenanceActivity) ResumeMirror(ctx context.Context, mirror *protos.MaintenanceMirror) error {
mirrorStatus, err := a.getMirrorStatus(ctx, mirror)
if err != nil {
return err
}

if mirrorStatus != protos.FlowStatus_STATUS_PAUSED {
slog.Error("Cannot resume mirror that is not paused",
"mirror", mirror.MirrorName, "workflowId", mirror.WorkflowId, "status", mirrorStatus.String())
return nil
}

// There can also be "workflow already completed" errors; what should we do in that case?
if err := model.FlowSignal.SignalClientWorkflow(ctx, a.TemporalClient, mirror.WorkflowId, "", model.NoopSignal); err != nil {
slog.Error("Error signaling mirror to resume for maintenance",
"mirror", mirror.MirrorName, "workflowId", mirror.WorkflowId, "error", err)
return err
}
return nil
}

func (a *MaintenanceActivity) DisableMaintenanceMode(ctx context.Context) error {
slog.Info("Disabling maintenance mode")
return peerdbenv.UpdatePeerDBMaintenanceModeEnabled(ctx, a.CatalogPool, false)
}

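// BackgroundAlerter heartbeats every 30 seconds and emits a warning alert at
// a configurable interval for as long as maintenance is still running.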
func (a *MaintenanceActivity) BackgroundAlerter(ctx context.Context) error {
heartbeatTicker := time.NewTicker(30 * time.Second)
defer heartbeatTicker.Stop()

alertTicker := time.NewTicker(time.Duration(peerdbenv.PeerDBMaintenanceModeWaitAlertSeconds()) * time.Second)
defer alertTicker.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-heartbeatTicker.C:
activity.RecordHeartbeat(ctx, "Maintenance Workflow is still running")
case <-alertTicker.C:
slog.Warn("Maintenance Workflow is still running")
a.Alerter.LogNonFlowWarning(ctx, telemetry.MaintenanceWait, "Waiting", "Maintenance mode is still running")
}
}
}

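// RunEveryIntervalUntilFinish invokes runFunc every runInterval until it
// reports finished or returns an error, logging the last result every
// logInterval; the last result is returned in either case.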
func RunEveryIntervalUntilFinish[T any](
ctx context.Context,
runFunc func() (finished bool, result T, err error),
runInterval time.Duration,
logMessage string,
logInterval time.Duration,
) (T, error) {
runTicker := time.NewTicker(runInterval)
defer runTicker.Stop()

logTicker := time.NewTicker(logInterval)
defer logTicker.Stop()
var lastResult T
for {
select {
case <-ctx.Done():
return lastResult, ctx.Err()
case <-runTicker.C:
finished, result, err := runFunc()
lastResult = result
if err != nil {
return lastResult, err
}
if finished {
return lastResult, err
}
case <-logTicker.C:
slog.Info(logMessage, "lastResult", lastResult)
}
}
}
6 changes: 3 additions & 3 deletions flow/alerting/alerting.go
@@ -377,18 +377,18 @@ func (a *Alerter) sendTelemetryMessage(
 	}

 	if a.snsTelemetrySender != nil {
-		if status, err := a.snsTelemetrySender.SendMessage(ctx, details, details, attributes); err != nil {
+		if response, err := a.snsTelemetrySender.SendMessage(ctx, details, details, attributes); err != nil {
 			logger.Warn("failed to send message to snsTelemetrySender", slog.Any("error", err))
 		} else {
-			logger.Info("received status from snsTelemetrySender", slog.String("status", status))
+			logger.Info("received response from snsTelemetrySender", slog.String("response", response))
 		}
 	}

 	if a.incidentIoTelemetrySender != nil {
 		if status, err := a.incidentIoTelemetrySender.SendMessage(ctx, details, details, attributes); err != nil {
 			logger.Warn("failed to send message to incidentIoTelemetrySender", slog.Any("error", err))
 		} else {
-			logger.Info("received status from incident.io", slog.String("status", status))
+			logger.Info("received response from incident.io", slog.String("response", status))
 		}
 	}
 }
