Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[preview] sql,pg: catch db backend failure #1081

Merged
merged 2 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions build/package/docker/postgres.dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM postgres:16.2
FROM postgres:16.5

# Inject the init script that makes the kwild superuser and a kwild database
# owned by that kwild user, as well as a kwil_test_db database for tests.
Expand All @@ -14,4 +14,5 @@ COPY ./pginit.sql /docker-entrypoint-initdb.d/init.sql

# Override the default entrypoint/command to include the additional configuration
CMD ["postgres", "-c", "wal_level=logical", "-c", "max_wal_senders=10", "-c", "max_replication_slots=10", \
"-c", "track_commit_timestamp=true", "-c", "wal_sender_timeout=0", "-c", "max_prepared_transactions=2"]
"-c", "track_commit_timestamp=true", "-c", "wal_sender_timeout=0", "-c", "max_prepared_transactions=2", \
"-c", "max_locks_per_transaction=256", "-c", "max_connections=128"]
17 changes: 17 additions & 0 deletions common/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,30 @@ package sql
import (
"context"
"errors"
"strings"

"github.com/jackc/pgx/v5/pgconn"
)

var (
ErrNoTransaction = errors.New("no transaction")
ErrNoRows = errors.New("no rows in result set")
ErrDBFailure = errors.New("database failure") // fatal! should kill node
)

func IsFatalDBError(err error) bool {
if errors.Is(err, ErrDBFailure) {
return true
}
// In case it wasn't already joined with ErrDBFailure, catch a fatal PgError by code.
var pgErr *pgconn.PgError
return errors.As(err, &pgErr) &&
(strings.HasPrefix(pgErr.Code, "53") || // Class 53 — Insufficient Resources
strings.HasPrefix(pgErr.Code, "58") || // Class 58 — System Error (errors external to PostgreSQL itself)
strings.HasPrefix(pgErr.Code, "XX")) // Class XX — Internal Error (internal_error, data_corrupted, index_corrupted)

}

// ResultSet is the result of a query or execution.
// It contains the returned columns and the rows.
type ResultSet struct {
Expand Down
65 changes: 65 additions & 0 deletions common/sql/sql_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package sql

import (
"errors"
"testing"

"github.com/jackc/pgx/v5/pgconn"
)

func TestIsFatalDBError(t *testing.T) {
tests := []struct {
name string
err error
want bool
}{
{
name: "nil error",
err: nil,
want: false,
},
{
name: "ErrDBFailure",
err: ErrDBFailure,
want: true,
},
{
name: "wrapped ErrDBFailure",
err: errors.Join(errors.New("wrapped"), ErrDBFailure),
want: true,
},
{
name: "insufficient resources error",
err: &pgconn.PgError{Code: "53100"},
want: true,
},
{
name: "system error",
err: &pgconn.PgError{Code: "58000"},
want: true,
},
{
name: "internal error",
err: &pgconn.PgError{Code: "XX000"},
want: true,
},
{
name: "non-fatal pg error",
err: &pgconn.PgError{Code: "23505"},
want: false,
},
{
name: "generic error",
err: errors.New("some error"),
want: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := IsFatalDBError(tt.err); got != tt.want {
t.Errorf("IsFatalDBError() = %v, want %v", got, tt.want)
}
})
}
}
8 changes: 8 additions & 0 deletions internal/abci/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/kwilteam/kwil-db/common/chain"
"github.com/kwilteam/kwil-db/common/chain/forks"
"github.com/kwilteam/kwil-db/common/ident"
"github.com/kwilteam/kwil-db/common/sql"
"github.com/kwilteam/kwil-db/core/log"
"github.com/kwilteam/kwil-db/core/types"
"github.com/kwilteam/kwil-db/core/types/serialize"
Expand Down Expand Up @@ -367,6 +368,13 @@ func (a *AbciApp) FinalizeBlock(ctx context.Context, req *abciTypes.RequestFinal

abciRes := &abciTypes.ExecTxResult{}
if txRes.Error != nil {
// Ensure the node halt if a fatal DB error occurs (e.g. out of memory/disk,
// corruption, etc.), rather than allowing non-determinism with a failed txn.
if sql.IsFatalDBError(txRes.Error) {
return nil, txRes.Error
}

// If the transaction failed for user reasons, we still want to include it in the block.
abciRes.Log = txRes.Error.Error()
a.log.Warn("failed to execute transaction", zap.Error(txRes.Error))
} else {
Expand Down
2 changes: 1 addition & 1 deletion internal/engine/generate/actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestGenerateActionBody(t *testing.T) {
Statement: `
INSERT INTO test_schema.id_and_int (id, intval)
VALUES
(uuid_generate_v5('31276fd4-105f-4ff7-9f64-644942c14b79'::UUID, format('%s-%s'::TEXT, $1::TEXT, $2::TEXT)), $2::INT8);`,
(uuid_generate_v5('31276fd4-105f-4ff7-9f64-644942c14b79'::UUID, format('%s-%s', $1::TEXT, $2::TEXT)), $2::INT8);`,
},
},
},
Expand Down
6 changes: 6 additions & 0 deletions internal/sql/pg/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ func query(ctx context.Context, cq connQueryer, stmt string, args ...any) (*sql.

rows, err := q(ctx, stmt, args...)
if err != nil {
if sql.IsFatalDBError(err) {
return nil, errors.Join(err, sql.ErrDBFailure)
}
if errors.Is(err, pgx.ErrNoRows) {
return nil, sql.ErrNoRows
}
Expand Down Expand Up @@ -202,6 +205,9 @@ func query(ctx context.Context, cq connQueryer, stmt string, args ...any) (*sql.
}
return decodeFromPGType(pgxVals...)
})
if sql.IsFatalDBError(err) { // would probably happen above when executing, but maybe here too
return nil, errors.Join(err, sql.ErrDBFailure)
}
if errors.Is(err, pgx.ErrNoRows) {
return nil, sql.ErrNoRows
}
Expand Down
2 changes: 1 addition & 1 deletion internal/sql/pg/repl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func Test_repl(t *testing.T) {
t.Fatal(err)
}

wantCommitHash, _ := hex.DecodeString("cb390afbf808256307ee0927999805ee3d5af193772e2c9b71823fbc1fe8867f")
wantCommitHash, _ := hex.DecodeString("1fd01e9d38e322285723643f27123762c3d7fd22761f7ab4de57e0a947f8127b")

var wg sync.WaitGroup
wg.Add(1)
Expand Down
13 changes: 11 additions & 2 deletions internal/sql/pg/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package pg

import (
"context"
"errors"

"github.com/jackc/pgx/v5"
common "github.com/kwilteam/kwil-db/common/sql"
Expand Down Expand Up @@ -40,13 +41,21 @@ func (tx *nestedTx) BeginTx(ctx context.Context) (common.Tx, error) {
}

func (tx *nestedTx) Query(ctx context.Context, stmt string, args ...any) (*common.ResultSet, error) {
return query(ctx, tx.Tx, stmt, args...)
resSet, err := query(ctx, tx.Tx, stmt, args...)
if errors.Is(err, common.ErrDBFailure) {
tx.Tx.Conn().Close(ctx) // do not allow the outer txn to commit!
}
return resSet, err
}

// Execute is now literally identical to Query in both semantics and syntax. We
// might remove one or the other in this context (transaction methods).
func (tx *nestedTx) Execute(ctx context.Context, stmt string, args ...any) (*common.ResultSet, error) {
return query(ctx, tx.Tx, stmt, args...)
resSet, err := query(ctx, tx.Tx, stmt, args...)
if errors.Is(err, common.ErrDBFailure) {
tx.Tx.Conn().Close(ctx) // do not allow the outer txn to commit!
}
return resSet, err
}

// AccessMode returns the access mode of the transaction.
Expand Down