Skip to content

Commit

Permalink
Merge branch 'main' into stable
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Dec 4, 2024
2 parents 87a529f + a9d33fd commit 821366d
Show file tree
Hide file tree
Showing 56 changed files with 588 additions and 361 deletions.
51 changes: 51 additions & 0 deletions .github/workflows/update-docker-compose-stable.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
name: Update docker-compose.yaml tags

on:
schedule:
- cron: '0 15 * * 1'
workflow_dispatch:
inputs: {}
permissions:
issues: write
pull-requests: write
contents: write


env:
PR_BRANCH: automated/docker-compose-image-tags-upgrade
PR_LABEL: dependencies
PR_TITLE: "feat: upgrade `docker-compose.yml` stable image tags"

jobs:
update-docker-compose-tag:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
ref: main
- name: create-PR
shell: bash
run: |
set -eou pipefail
latest_tag="$(gh api \
-H "Accept: application/vnd.github+json" \
-H "X-GitHub-Api-Version: 2022-11-28" \
/repos/${{ github.repository }}/releases/latest | jq -r '.tag_name')"
sed -i -E 's|(image: ghcr\.io/peerdb\-io/.*?:stable-)(.*$)|\1'"${latest_tag}"'|g' docker-compose.yml
git config --global user.name "${GITHUB_ACTOR}"
git config --global user.email "${GITHUB_ACTOR}@users.noreply.github.com"
git checkout -b "${PR_BRANCH}"
git fetch || true
git add -u
git commit -m 'chore(automated): upgrade docker-compose.yml stable tags'
git push -u origin "${PR_BRANCH}" --force-with-lease
PR_ID=$(gh pr list --label "${PR_LABEL}" --head "${PR_BRANCH}" --json number | jq -r '.[0].number // ""')
if [ "$PR_ID" == "" ]; then
PR_ID=$(gh pr create -l "$PR_LABEL" -t "$PR_TITLE" --body "")
fi
gh pr merge --auto --squash
env:
GH_TOKEN: ${{ github.token }}
2 changes: 1 addition & 1 deletion docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ x-flow-worker-env: &flow-worker-env
services:
catalog:
container_name: catalog
image: postgres:17-alpine@sha256:0d9624535618a135c5453258fd629f4963390338b11aaffb92292c12df3a6c17
image: postgres:17-alpine@sha256:e7897baa70dae1968d23d785adb4aeb699175e0bcaae44f98a7083ecb9668b93
command: -c config_file=/etc/postgresql.conf
ports:
- 9901:5432
Expand Down
12 changes: 6 additions & 6 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ x-flow-worker-env: &flow-worker-env
services:
catalog:
container_name: catalog
image: postgres:17-alpine@sha256:0d9624535618a135c5453258fd629f4963390338b11aaffb92292c12df3a6c17
image: postgres:17-alpine@sha256:e7897baa70dae1968d23d785adb4aeb699175e0bcaae44f98a7083ecb9668b93
command: -c config_file=/etc/postgresql.conf
restart: unless-stopped
ports:
Expand Down Expand Up @@ -112,7 +112,7 @@ services:

flow-api:
container_name: flow_api
image: ghcr.io/peerdb-io/flow-api:stable-v0.19.1
image: ghcr.io/peerdb-io/flow-api:stable-v0.20.0
restart: unless-stopped
ports:
- 8112:8112
Expand All @@ -128,7 +128,7 @@ services:

flow-snapshot-worker:
container_name: flow-snapshot-worker
image: ghcr.io/peerdb-io/flow-snapshot-worker:stable-v0.19.1
image: ghcr.io/peerdb-io/flow-snapshot-worker:stable-v0.20.0
restart: unless-stopped
environment:
<<: [*catalog-config, *flow-worker-env, *minio-config]
Expand All @@ -138,7 +138,7 @@ services:

flow-worker:
container_name: flow-worker
image: ghcr.io/peerdb-io/flow-worker:stable-v0.19.1
image: ghcr.io/peerdb-io/flow-worker:stable-v0.20.0
restart: unless-stopped
environment:
<<: [*catalog-config, *flow-worker-env, *minio-config]
Expand All @@ -151,7 +151,7 @@ services:
peerdb:
container_name: peerdb-server
stop_signal: SIGINT
image: ghcr.io/peerdb-io/peerdb-server:stable-v0.19.1
image: ghcr.io/peerdb-io/peerdb-server:stable-v0.20.0
restart: unless-stopped
environment:
<<: *catalog-config
Expand All @@ -167,7 +167,7 @@ services:

peerdb-ui:
container_name: peerdb-ui
image: ghcr.io/peerdb-io/peerdb-ui:stable-v0.19.1
image: ghcr.io/peerdb-io/peerdb-ui:stable-v0.20.0
restart: unless-stopped
ports:
- 3000:3000
Expand Down
3 changes: 2 additions & 1 deletion flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
}
defer connectors.CloseConnector(ctx, dstConn)

if err := dstConn.ReplayTableSchemaDeltas(ctx, flowName, recordBatchSync.SchemaDeltas); err != nil {
if err := dstConn.ReplayTableSchemaDeltas(ctx, config.Env, flowName, recordBatchSync.SchemaDeltas); err != nil {
return nil, fmt.Errorf("failed to sync schema: %w", err)
}

Expand Down Expand Up @@ -440,6 +440,7 @@ func replicateQRepPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn
})

errGroup.Go(func() error {
var err error
rowsSynced, err = syncRecords(dstConn, errCtx, config, partition, outstream)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
Expand Down
15 changes: 4 additions & 11 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"runtime"

"github.com/grafana/pyroscope-go"
"go.opentelemetry.io/otel/metric"
"go.temporal.io/sdk/client"
temporalotel "go.temporal.io/sdk/contrib/opentelemetry"
"go.temporal.io/sdk/worker"
Expand Down Expand Up @@ -159,18 +158,12 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) {

var otelManager *otel_metrics.OtelManager
if opts.EnableOtelMetrics {
metricsProvider, metricsErr := otel_metrics.SetupPeerDBMetricsProvider("flow-worker")
if metricsErr != nil {
return nil, metricsErr
}
otelManager = &otel_metrics.OtelManager{
MetricsProvider: metricsProvider,
Meter: metricsProvider.Meter("io.peerdb.flow-worker"),
Float64GaugesCache: make(map[string]metric.Float64Gauge),
Int64GaugesCache: make(map[string]metric.Int64Gauge),
Int64CountersCache: make(map[string]metric.Int64Counter),
otelManager, err = otel_metrics.NewOtelManager()
if err != nil {
return nil, fmt.Errorf("unable to create otel manager: %w", err)
}
}

w.RegisterActivity(&activities.FlowableActivity{
CatalogPool: conn,
Alerter: alerting.NewAlerter(context.Background(), conn),
Expand Down
1 change: 1 addition & 0 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func (c *BigQueryConnector) waitForTableReady(ctx context.Context, datasetTable
// This could involve adding or dropping multiple columns.
func (c *BigQueryConnector) ReplayTableSchemaDeltas(
ctx context.Context,
env map[string]string,
flowJobName string,
schemaDeltas []*protos.TableSchemaDelta,
) error {
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/bigquery/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (c *BigQueryConnector) SyncQRepRecords(
partition.PartitionId, destTable))

avroSync := NewQRepAvroSyncMethod(c, config.StagingPath, config.FlowJobName)
return avroSync.SyncQRepRecords(ctx, config.FlowJobName, destTable, partition,
return avroSync.SyncQRepRecords(ctx, config.Env, config.FlowJobName, destTable, partition,
tblMetadata, stream, config.SyncedAtColName, config.SoftDeleteColName)
}

Expand Down Expand Up @@ -80,7 +80,7 @@ func (c *BigQueryConnector) replayTableSchemaDeltasQRep(
}
}

err = c.ReplayTableSchemaDeltas(ctx, config.FlowJobName, []*protos.TableSchemaDelta{tableSchemaDelta})
err = c.ReplayTableSchemaDeltas(ctx, config.Env, config.FlowJobName, []*protos.TableSchemaDelta{tableSchemaDelta})
if err != nil {
return nil, fmt.Errorf("failed to add columns to destination table: %w", err)
}
Expand Down
12 changes: 7 additions & 5 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
}

stagingTable := fmt.Sprintf("%s_%s_staging", rawTableName, strconv.FormatInt(syncBatchID, 10))
numRecords, err := s.writeToStage(ctx, strconv.FormatInt(syncBatchID, 10), rawTableName, avroSchema,
numRecords, err := s.writeToStage(ctx, req.Env, strconv.FormatInt(syncBatchID, 10), rawTableName, avroSchema,
&datasetTable{
project: s.connector.projectID,
dataset: s.connector.datasetID,
Expand Down Expand Up @@ -97,7 +97,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
slog.String(string(shared.FlowNameKey), req.FlowJobName),
slog.String("dstTableName", rawTableName))

err = s.connector.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas)
err = s.connector.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.Records.SchemaDeltas)
if err != nil {
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
}
Expand Down Expand Up @@ -139,6 +139,7 @@ func getTransformedColumns(dstSchema *bigquery.Schema, syncedAtCol string, softD

func (s *QRepAvroSyncMethod) SyncQRepRecords(
ctx context.Context,
env map[string]string,
flowJobName string,
dstTableName string,
partition *protos.QRepPartition,
Expand Down Expand Up @@ -167,7 +168,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
table: fmt.Sprintf("%s_%s_staging", dstDatasetTable.table,
strings.ReplaceAll(partition.PartitionId, "-", "_")),
}
numRecords, err := s.writeToStage(ctx, partition.PartitionId, flowJobName, avroSchema,
numRecords, err := s.writeToStage(ctx, env, partition.PartitionId, flowJobName, avroSchema,
stagingDatasetTable, stream, flowJobName)
if err != nil {
return -1, fmt.Errorf("failed to push to avro stage: %w", err)
Expand Down Expand Up @@ -389,6 +390,7 @@ func GetAvroField(bqField *bigquery.FieldSchema) (AvroField, error) {

func (s *QRepAvroSyncMethod) writeToStage(
ctx context.Context,
env map[string]string,
syncID string,
objectFolder string,
avroSchema *model.QRecordAvroSchemaDefinition,
Expand All @@ -408,7 +410,7 @@ func (s *QRepAvroSyncMethod) writeToStage(
obj := bucket.Object(avroFilePath)
w := obj.NewWriter(ctx)

numRecords, err := ocfWriter.WriteOCF(ctx, w)
numRecords, err := ocfWriter.WriteOCF(ctx, env, w)
if err != nil {
return 0, fmt.Errorf("failed to write records to Avro file on GCS: %w", err)
}
Expand All @@ -426,7 +428,7 @@ func (s *QRepAvroSyncMethod) writeToStage(

avroFilePath := fmt.Sprintf("%s/%s.avro", tmpDir, syncID)
s.connector.logger.Info("writing records to local file", idLog)
avroFile, err = ocfWriter.WriteRecordsToAvroFile(ctx, avroFilePath)
avroFile, err = ocfWriter.WriteRecordsToAvroFile(ctx, env, avroFilePath)
if err != nil {
return 0, fmt.Errorf("failed to write records to local Avro file: %w", err)
}
Expand Down
9 changes: 6 additions & 3 deletions flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (c *ClickHouseConnector) syncRecordsViaAvro(
return nil, err
}

if err := c.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas); err != nil {
if err := c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.Records.SchemaDeltas); err != nil {
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
}

Expand All @@ -120,7 +120,10 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe
return res, nil
}

func (c *ClickHouseConnector) ReplayTableSchemaDeltas(ctx context.Context, flowJobName string,
func (c *ClickHouseConnector) ReplayTableSchemaDeltas(
ctx context.Context,
env map[string]string,
flowJobName string,
schemaDeltas []*protos.TableSchemaDelta,
) error {
if len(schemaDeltas) == 0 {
Expand All @@ -133,7 +136,7 @@ func (c *ClickHouseConnector) ReplayTableSchemaDeltas(ctx context.Context, flowJ
}

for _, addedColumn := range schemaDelta.AddedColumns {
clickHouseColType, err := qvalue.QValueKind(addedColumn.Type).ToDWHColumnType(protos.DBType_CLICKHOUSE)
clickHouseColType, err := qvalue.QValueKind(addedColumn.Type).ToDWHColumnType(ctx, env, protos.DBType_CLICKHOUSE, addedColumn)
if err != nil {
return fmt.Errorf("failed to convert column type %s to ClickHouse type: %w", addedColumn.Type, err)
}
Expand Down
38 changes: 10 additions & 28 deletions flow/connectors/clickhouse/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/ClickHouse/clickhouse-go/v2"
"golang.org/x/sync/errgroup"

"github.com/PeerDB-io/peer-flow/datatypes"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
Expand Down Expand Up @@ -81,16 +80,6 @@ func getColName(overrides map[string]string, name string) string {
return name
}

func getClickhouseTypeForNumericColumn(column *protos.FieldDescription) string {
rawPrecision, _ := datatypes.ParseNumericTypmod(column.TypeModifier)
if rawPrecision > datatypes.PeerDBClickHouseMaxPrecision {
return "String"
} else {
precision, scale := datatypes.GetNumericTypeForWarehouse(column.TypeModifier, datatypes.ClickHouseNumericCompatibility{})
return fmt.Sprintf("Decimal(%d, %d)", precision, scale)
}
}

func generateCreateTableSQLForNormalizedTable(
ctx context.Context,
config *protos.SetupNormalizedTableBatchInput,
Expand Down Expand Up @@ -142,14 +131,10 @@ func generateCreateTableSQLForNormalizedTable(
}

if clickHouseType == "" {
if colType == qvalue.QValueKindNumeric {
clickHouseType = getClickhouseTypeForNumericColumn(column)
} else {
var err error
clickHouseType, err = colType.ToDWHColumnType(protos.DBType_CLICKHOUSE)
if err != nil {
return "", fmt.Errorf("error while converting column type to ClickHouse type: %w", err)
}
var err error
clickHouseType, err = colType.ToDWHColumnType(ctx, config.Env, protos.DBType_CLICKHOUSE, column)
if err != nil {
return "", fmt.Errorf("error while converting column type to ClickHouse type: %w", err)
}
}
if (tableSchema.NullableEnabled || columnNullableEnabled) && column.Nullable && !colType.IsArray() {
Expand Down Expand Up @@ -368,16 +353,13 @@ func (c *ClickHouseConnector) NormalizeRecords(

colSelector.WriteString(fmt.Sprintf("`%s`,", dstColName))
if clickHouseType == "" {
if colType == qvalue.QValueKindNumeric {
clickHouseType = getClickhouseTypeForNumericColumn(column)
} else {
var err error
clickHouseType, err = colType.ToDWHColumnType(protos.DBType_CLICKHOUSE)
if err != nil {
close(queries)
return nil, fmt.Errorf("error while converting column type to clickhouse type: %w", err)
}
var err error
clickHouseType, err = colType.ToDWHColumnType(ctx, req.Env, protos.DBType_CLICKHOUSE, column)
if err != nil {
close(queries)
return nil, fmt.Errorf("error while converting column type to clickhouse type: %w", err)
}

if (schema.NullableEnabled || columnNullableEnabled) && column.Nullable && !colType.IsArray() {
clickHouseType = fmt.Sprintf("Nullable(%s)", clickHouseType)
}
Expand Down
8 changes: 5 additions & 3 deletions flow/connectors/clickhouse/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (s *ClickHouseAvroSyncMethod) SyncRecords(
s.logger.Info("sync function called and schema acquired",
slog.String("dstTable", dstTableName))

avroSchema, err := s.getAvroSchema(dstTableName, schema)
avroSchema, err := s.getAvroSchema(ctx, env, dstTableName, schema)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -106,7 +106,7 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords(
stagingPath := s.credsProvider.BucketPath
startTime := time.Now()

avroSchema, err := s.getAvroSchema(dstTableName, stream.Schema())
avroSchema, err := s.getAvroSchema(ctx, config.Env, dstTableName, stream.Schema())
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -165,10 +165,12 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords(
}

func (s *ClickHouseAvroSyncMethod) getAvroSchema(
ctx context.Context,
env map[string]string,
dstTableName string,
schema qvalue.QRecordSchema,
) (*model.QRecordAvroSchemaDefinition, error) {
avroSchema, err := model.GetAvroSchemaDefinition(dstTableName, schema, protos.DBType_CLICKHOUSE)
avroSchema, err := model.GetAvroSchemaDefinition(ctx, env, dstTableName, schema, protos.DBType_CLICKHOUSE)
if err != nil {
return nil, fmt.Errorf("failed to define Avro schema: %w", err)
}
Expand Down
4 changes: 1 addition & 3 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ type CDCSyncConnectorCore interface {
// ReplayTableSchemaDelta changes a destination table to match the schema at source
// This could involve adding or dropping multiple columns.
// Connectors which are non-normalizing should implement this as a nop.
ReplayTableSchemaDeltas(ctx context.Context, flowJobName string, schemaDeltas []*protos.TableSchemaDelta) error
ReplayTableSchemaDeltas(ctx context.Context, env map[string]string, flowJobName string, schemaDeltas []*protos.TableSchemaDelta) error
}

type CDCSyncConnector interface {
Expand Down Expand Up @@ -463,8 +463,6 @@ var (
_ CDCSyncConnector = &connclickhouse.ClickHouseConnector{}
_ CDCSyncConnector = &connelasticsearch.ElasticsearchConnector{}

_ CDCSyncPgConnector = &connpostgres.PostgresConnector{}

_ CDCNormalizeConnector = &connpostgres.PostgresConnector{}
_ CDCNormalizeConnector = &connbigquery.BigQueryConnector{}
_ CDCNormalizeConnector = &connsnowflake.SnowflakeConnector{}
Expand Down
Loading

0 comments on commit 821366d

Please sign in to comment.