diff --git a/Makefile b/Makefile index 16bdcb3e..fb1ef72d 100644 --- a/Makefile +++ b/Makefile @@ -125,7 +125,7 @@ server: internal/pb/fs.pb.go internal/pb/fs_grpc.pb.go server-profile: export DL_ENV=dev server-profile: internal/pb/fs.pb.go internal/pb/fs_grpc.pb.go - go run cmd/server/main.go --dburi $(DB_URI) --port $(GRPC_PORT) --profile cpu.prof --log-level info + go run cmd/server/main.go --dburi $(DB_URI) --port $(GRPC_PORT) --profile cpu.prof --memprofile mem.pb.gz --log-level info cached: export DL_ENV=dev cached: export DL_TOKEN=$(DEV_SHARED_READER_TOKEN) diff --git a/internal/db/content.go b/internal/db/content.go index 8fc69246..e596245f 100644 --- a/internal/db/content.go +++ b/internal/db/content.go @@ -235,6 +235,11 @@ func (cl *ContentLookup) Lookup(ctx context.Context, tx pgx.Tx, hashesToLookup m contents[hash] = value } } + + err = rows.Err() + if err != nil { + return nil, fmt.Errorf("failed to iterate rows: %w", err) + } } return contents, nil @@ -262,5 +267,10 @@ func RandomContents(ctx context.Context, conn DbConnector, sample float32) ([]Ha hashes = append(hashes, hash) } + err = rows.Err() + if err != nil { + return nil, fmt.Errorf("failed to iterate rows: %w", err) + } + return hashes, nil } diff --git a/internal/db/gc.go b/internal/db/gc.go index dfb60070..7905adf1 100644 --- a/internal/db/gc.go +++ b/internal/db/gc.go @@ -46,6 +46,11 @@ func GcProjectObjects(ctx context.Context, conn DbConnector, project int64, keep hashes = append(hashes, hash) } + err = rows.Err() + if err != nil { + return nil, fmt.Errorf("failed to iterate rows: %w", err) + } + return hashes, nil } diff --git a/internal/db/project.go b/internal/db/project.go index 45a37860..323b26e5 100644 --- a/internal/db/project.go +++ b/internal/db/project.go @@ -68,6 +68,11 @@ func ListProjects(ctx context.Context, tx pgx.Tx) ([]*pb.Project, error) { projects = append(projects, &pb.Project{Id: id, Version: version}) } + err = rows.Err() + if err != nil { + return nil, fmt.Errorf("failed to iterate rows: %w", err) + } + return projects, nil } @@ -96,6 +101,11 @@ func RandomProjects(ctx context.Context, conn DbConnector, sample float32) ([]in projects = append(projects, project) } + err = rows.Err() + if err != nil { + return nil, fmt.Errorf("failed to iterate rows: %w", err) + } + // With only few projects this sometimes returns no results. if len(projects) > 0 { break diff --git a/internal/db/query.go b/internal/db/query.go index 75e54c3c..04c6a771 100644 --- a/internal/db/query.go +++ b/internal/db/query.go @@ -121,11 +121,8 @@ func executeQuery(ctx context.Context, tx pgx.Tx, queryBuilder *queryBuilder) ([ defer rows.Close() var dbObjects []DbObject - for { - if !rows.Next() { - break - } + for rows.Next() { var object DbObject err := rows.Scan(&object.path, &object.mode, &object.size, &object.cached, &object.packed, &object.deleted, &object.hash.H1, &object.hash.H2) @@ -136,6 +133,11 @@ func executeQuery(ctx context.Context, tx pgx.Tx, queryBuilder *queryBuilder) ([ dbObjects = append(dbObjects, object) } + err = rows.Err() + if err != nil { + return nil, fmt.Errorf("failed to iterate rows: %w", err) + } + return dbObjects, nil } @@ -351,6 +353,10 @@ func GetCacheTars(ctx context.Context, tx pgx.Tx) (cacheTarStream, CloseFunc, er return func() (int64, []byte, *Hash, error) { if !rows.Next() { + err := rows.Err() + if err != nil { + return 0, nil, nil, fmt.Errorf("failed to iterate rows: %w", err) + } return 0, nil, nil, io.EOF } diff --git a/internal/db/update.go b/internal/db/update.go index bf53f1cd..781f3246 100644 --- a/internal/db/update.go +++ b/internal/db/update.go @@ -128,6 +128,11 @@ func UpdatePackedObjects(ctx context.Context, tx pgx.Tx, conn DbConnector, proje } rows.Close() + err = rows.Err() + if err != nil { + return false, fmt.Errorf("failed to iterate rows: %w", err) + } + shouldInsert := true updated, err := updateObjects(content, updates) if errors.Is(err, ErrEmptyPack) { diff --git a/pkg/cli/server.go b/pkg/cli/server.go index c99df734..d653c70f 100644 --- a/pkg/cli/server.go +++ b/pkg/cli/server.go @@ -11,6 +11,7 @@ import ( "net" "os" "os/signal" + "runtime" "runtime/pprof" "syscall" @@ -34,15 +35,16 @@ func NewServerCommand() *cobra.Command { ) var ( - level *zapcore.Level - encoding string - tracing bool - profilePath string - port int - dbUri string - certFile string - keyFile string - pasetoFile string + level *zapcore.Level + encoding string + tracing bool + profilePath string + memProfilePath string + port int + dbUri string + certFile string + keyFile string + pasetoFile string ) cmd := &cobra.Command{ @@ -128,6 +130,21 @@ func NewServerCommand() *cobra.Command { signal.Notify(osSignals, os.Interrupt, syscall.SIGTERM) go func() { <-osSignals + + if memProfilePath != "" { + memProfile, err := os.Create(memProfilePath) + if err != nil { + logger.Error(ctx, "cannot create heap profile file", zap.Error(err), zap.String("path", memProfilePath)) + } + defer memProfile.Close() + runtime.GC() + + err = pprof.WriteHeapProfile(memProfile) + if err != nil { + logger.Error(ctx, "cannot write heap profile", zap.Error(err), zap.String("path", memProfilePath)) + } + } + s.Grpc.GracefulStop() }() @@ -153,7 +170,8 @@ func NewServerCommand() *cobra.Command { flags.AddGoFlag(flag.CommandLine.Lookup("log-level")) flags.StringVar(&encoding, "log-encoding", "console", "Log encoding (console | json)") flags.BoolVar(&tracing, "tracing", false, "Whether tracing is enabled") - flags.StringVar(&profilePath, "profile", "", "CPU profile output path (profiling enabled if set)") + flags.StringVar(&profilePath, "profile", "", "CPU profile output path (CPU profiling enabled if set)") + flags.StringVar(&memProfilePath, "memprofile", "", "Memory profile output path") flags.IntVar(&port, "port", 5051, "GRPC server port") flags.StringVar(&dbUri, "dburi", "postgres://postgres@127.0.0.1:5432/dl", "Postgres URI") diff --git a/pkg/server/server.go b/pkg/server/server.go index a48b0ba4..5da3c213 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -103,8 +103,12 @@ func (d *DbPoolConnector) Connect(ctx context.Context) (pgx.Tx, db.CloseFunc, er return nil, nil, err } + typeMap := conn.Conn().TypeMap() for _, extraType := range d.extraTypes { - conn.Conn().TypeMap().RegisterType(extraType) + _, found := typeMap.TypeForOID(extraType.OID) + if !found { + typeMap.RegisterType(extraType) + } } tx, err := conn.Begin(ctx) @@ -177,15 +181,14 @@ func NewServer(ctx context.Context, dbConn *DbPoolConnector, cert *tls.Certifica } func (s *Server) monitorDbPool(ctx context.Context, dbConn *DbPoolConnector) { - ticker := time.NewTicker(time.Second) - go func() { for { + timer := time.NewTimer(time.Second) select { case <-ctx.Done(): s.Health.SetServingStatus("dateilager.server", healthpb.HealthCheckResponse_NOT_SERVING) return - case <-ticker.C: + case <-timer.C: ctxTimeout, cancel := context.WithTimeout(ctx, 800*time.Millisecond) status := healthpb.HealthCheckResponse_SERVING diff --git a/test/db_test.go b/test/db_test.go index 66575888..c7feed9b 100644 --- a/test/db_test.go +++ b/test/db_test.go @@ -53,6 +53,7 @@ func latestCacheVersionHashes(tc util.TestCtx) (int64, []db.Hash) { hashes = append(hashes, hash) } + require.NoError(tc.T(), rows.Err(), "iterate rows") return version, hashes } diff --git a/test/shared_test.go b/test/shared_test.go index 97ce77e2..048053bb 100644 --- a/test/shared_test.go +++ b/test/shared_test.go @@ -727,6 +727,7 @@ func debugProjects(tc util.TestCtx) { fmt.Printf("%d,\t%d,\t\t%v\n", id, latestVersion, packPatterns) } + require.NoError(tc.T(), rows.Err(), "iterate rows") fmt.Println() } @@ -755,6 +756,7 @@ func debugObjects(tc util.TestCtx) { fmt.Printf("%d,\t\t%d,\t\t%s,\t\t%s,\t%s,\t%d,\t%v,\t(%x, %x)\n", project, start_version, formatPtr(stop_version), path, formatMode(mode), size, packed, h1, h2) } + require.NoError(tc.T(), rows.Err(), "iterate rows") fmt.Println() }