Skip to content

Commit

Permalink
services/horizon/internal/db2/history: Include lower bound on descend…
Browse files Browse the repository at this point in the history
…ing history queries (stellar#5465)
  • Loading branch information
tamirms authored Oct 8, 2024
1 parent 7950d42 commit fe25b61
Show file tree
Hide file tree
Showing 16 changed files with 588 additions and 178 deletions.
10 changes: 5 additions & 5 deletions services/horizon/internal/actions/effects.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (handler GetEffectsHandler) GetResourcePage(w HeaderWriter, r *http.Request
return nil, err
}

records, err := loadEffectRecords(r.Context(), historyQ, qp, pq)
records, err := loadEffectRecords(r.Context(), historyQ, qp, pq, handler.LedgerState.CurrentStatus().HistoryElder)
if err != nil {
return nil, errors.Wrap(err, "loading transaction records")
}
Expand All @@ -94,20 +94,20 @@ func (handler GetEffectsHandler) GetResourcePage(w HeaderWriter, r *http.Request
return result, nil
}

func loadEffectRecords(ctx context.Context, hq *history.Q, qp EffectsQuery, pq db2.PageQuery) ([]history.Effect, error) {
func loadEffectRecords(ctx context.Context, hq *history.Q, qp EffectsQuery, pq db2.PageQuery, oldestLedger int32) ([]history.Effect, error) {
switch {
case qp.AccountID != "":
return hq.EffectsForAccount(ctx, qp.AccountID, pq)
return hq.EffectsForAccount(ctx, qp.AccountID, pq, oldestLedger)
case qp.LiquidityPoolID != "":
return hq.EffectsForLiquidityPool(ctx, qp.LiquidityPoolID, pq)
return hq.EffectsForLiquidityPool(ctx, qp.LiquidityPoolID, pq, oldestLedger)
case qp.OperationID > 0:
return hq.EffectsForOperation(ctx, int64(qp.OperationID), pq)
case qp.LedgerID > 0:
return hq.EffectsForLedger(ctx, int32(qp.LedgerID), pq)
case qp.TxHash != "":
return hq.EffectsForTransaction(ctx, qp.TxHash, pq)
default:
return hq.Effects(ctx, pq)
return hq.Effects(ctx, pq, oldestLedger)
}
}

Expand Down
3 changes: 2 additions & 1 deletion services/horizon/internal/actions/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ func (handler GetLedgersHandler) GetResourcePage(w HeaderWriter, r *http.Request
}

var records []history.Ledger
if err = historyQ.Ledgers().Page(pq).Select(r.Context(), &records); err != nil {
err = historyQ.Ledgers().Page(pq, handler.LedgerState.CurrentStatus().HistoryElder).Select(r.Context(), &records)
if err != nil {
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/actions/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (handler GetOperationsHandler) GetResourcePage(w HeaderWriter, r *http.Requ
query.OnlyPayments()
}

ops, txs, err := query.Page(pq).Fetch(ctx)
ops, txs, err := query.Page(pq, handler.LedgerState.CurrentStatus().HistoryElder).Fetch(ctx)
if err != nil {
return nil, err
}
Expand Down
9 changes: 5 additions & 4 deletions services/horizon/internal/actions/trade.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,19 +189,20 @@ func (handler GetTradesHandler) GetResourcePage(w HeaderWriter, r *http.Request)
return nil, err
}

oldestLedger := handler.LedgerState.CurrentStatus().HistoryElder
if baseAsset != nil {
counterAsset, err = qp.Counter()
if err != nil {
return nil, err
}

records, err = historyQ.GetTradesForAssets(ctx, pq, qp.AccountID, qp.TradeType, *baseAsset, *counterAsset)
records, err = historyQ.GetTradesForAssets(ctx, pq, oldestLedger, qp.AccountID, qp.TradeType, *baseAsset, *counterAsset)
} else if qp.OfferID != 0 {
records, err = historyQ.GetTradesForOffer(ctx, pq, int64(qp.OfferID))
records, err = historyQ.GetTradesForOffer(ctx, pq, oldestLedger, int64(qp.OfferID))
} else if qp.PoolID != "" {
records, err = historyQ.GetTradesForLiquidityPool(ctx, pq, qp.PoolID)
records, err = historyQ.GetTradesForLiquidityPool(ctx, pq, oldestLedger, qp.PoolID)
} else {
records, err = historyQ.GetTrades(ctx, pq, qp.AccountID, qp.TradeType)
records, err = historyQ.GetTrades(ctx, pq, oldestLedger, qp.AccountID, qp.TradeType)
}
if err != nil {
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions services/horizon/internal/actions/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (handler GetTransactionsHandler) GetResourcePage(w HeaderWriter, r *http.Re
return nil, err
}

records, err := loadTransactionRecords(ctx, historyQ, qp, pq)
records, err := loadTransactionRecords(ctx, historyQ, qp, pq, handler.LedgerState.CurrentStatus().HistoryElder)
if err != nil {
return nil, errors.Wrap(err, "loading transaction records")
}
Expand All @@ -141,7 +141,7 @@ func (handler GetTransactionsHandler) GetResourcePage(w HeaderWriter, r *http.Re
// loadTransactionRecords returns a slice of transaction records of an
// account/ledger identified by accountID/ledgerID based on pq and
// includeFailedTx.
func loadTransactionRecords(ctx context.Context, hq *history.Q, qp TransactionsQuery, pq db2.PageQuery) ([]history.Transaction, error) {
func loadTransactionRecords(ctx context.Context, hq *history.Q, qp TransactionsQuery, pq db2.PageQuery, oldestLedger int32) ([]history.Transaction, error) {
var records []history.Transaction

txs := hq.Transactions()
Expand All @@ -160,7 +160,7 @@ func loadTransactionRecords(ctx context.Context, hq *history.Q, qp TransactionsQ
txs.IncludeFailed()
}

err := txs.Page(pq).Select(ctx, &records)
err := txs.Page(pq, oldestLedger).Select(ctx, &records)
if err != nil {
return nil, errors.Wrap(err, "executing transaction records query")
}
Expand Down
31 changes: 24 additions & 7 deletions services/horizon/internal/db2/history/effect.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/stellar/go/toid"
)

const genesisLedger = 2

// UnmarshalDetails unmarshals the details of this effect into `dest`
func (r *Effect) UnmarshalDetails(dest interface{}) error {
if !r.DetailsString.Valid {
Expand Down Expand Up @@ -70,7 +72,7 @@ func (r *Effect) PagingToken() string {
}

// Effects returns a page of effects without any filters besides the cursor
func (q *Q) Effects(ctx context.Context, page db2.PageQuery) ([]Effect, error) {
func (q *Q) Effects(ctx context.Context, page db2.PageQuery, oldestLedger int32) ([]Effect, error) {
op, idx, err := parseEffectsCursor(page)
if err != nil {
return nil, err
Expand All @@ -87,6 +89,9 @@ func (q *Q) Effects(ctx context.Context, page db2.PageQuery) ([]Effect, error) {
Where("(heff.history_operation_id, heff.order) > (?, ?)", op, idx).
OrderBy("heff.history_operation_id asc, heff.order asc")
case "desc":
if lowerBound := lowestLedgerBound(oldestLedger); lowerBound > 0 {
query = query.Where("heff.history_operation_id > ?", lowerBound)
}
query = query.
Where("(heff.history_operation_id, heff.order) < (?, ?)", op, idx).
OrderBy("heff.history_operation_id desc, heff.order desc")
Expand All @@ -101,14 +106,14 @@ func (q *Q) Effects(ctx context.Context, page db2.PageQuery) ([]Effect, error) {
}

// EffectsForAccount returns a page of effects for a given account
func (q *Q) EffectsForAccount(ctx context.Context, aid string, page db2.PageQuery) ([]Effect, error) {
func (q *Q) EffectsForAccount(ctx context.Context, aid string, page db2.PageQuery, oldestLedger int32) ([]Effect, error) {
var account Account
if err := q.AccountByAddress(ctx, &account, aid); err != nil {
return nil, err
}

query := selectEffect.Where("heff.history_account_id = ?", account.ID)
return q.selectEffectsPage(ctx, query, page)
return q.selectEffectsPage(ctx, query, page, oldestLedger)
}

// EffectsForLedger returns a page of effects for a given ledger sequence
Expand All @@ -125,7 +130,7 @@ func (q *Q) EffectsForLedger(ctx context.Context, seq int32, page db2.PageQuery)
start.ToInt64(),
end.ToInt64(),
)
return q.selectEffectsPage(ctx, query, page)
return q.selectEffectsPage(ctx, query, page, 0)
}

// EffectsForOperation returns a page of effects for a given operation id.
Expand All @@ -138,11 +143,11 @@ func (q *Q) EffectsForOperation(ctx context.Context, id int64, page db2.PageQuer
start.ToInt64(),
end.ToInt64(),
)
return q.selectEffectsPage(ctx, query, page)
return q.selectEffectsPage(ctx, query, page, 0)
}

// EffectsForLiquidityPool returns a page of effects for a given liquidity pool.
func (q *Q) EffectsForLiquidityPool(ctx context.Context, id string, page db2.PageQuery) ([]Effect, error) {
func (q *Q) EffectsForLiquidityPool(ctx context.Context, id string, page db2.PageQuery, oldestLedger int32) ([]Effect, error) {
op, _, err := page.CursorInt64Pair(db2.DefaultPairSep)
if err != nil {
return nil, err
Expand Down Expand Up @@ -173,6 +178,7 @@ func (q *Q) EffectsForLiquidityPool(ctx context.Context, id string, page db2.Pag
"heff.history_operation_id": liquidityPoolOperationIDs,
}),
page,
oldestLedger,
)
}

Expand All @@ -194,6 +200,7 @@ func (q *Q) EffectsForTransaction(ctx context.Context, hash string, page db2.Pag
end.ToInt64(),
),
page,
0,
)
}

Expand All @@ -209,7 +216,14 @@ func parseEffectsCursor(page db2.PageQuery) (int64, int64, error) {
return op, idx, nil
}

func (q *Q) selectEffectsPage(ctx context.Context, query sq.SelectBuilder, page db2.PageQuery) ([]Effect, error) {
func lowestLedgerBound(oldestLedger int32) int64 {
if oldestLedger <= genesisLedger {
return 0
}
return toid.AfterLedger(oldestLedger - 1).ToInt64()
}

func (q *Q) selectEffectsPage(ctx context.Context, query sq.SelectBuilder, page db2.PageQuery, oldestLedger int32) ([]Effect, error) {
op, idx, err := parseEffectsCursor(page)
if err != nil {
return nil, err
Expand All @@ -230,6 +244,9 @@ func (q *Q) selectEffectsPage(ctx context.Context, query sq.SelectBuilder, page
))`, op, op, op, idx).
OrderBy("heff.history_operation_id asc, heff.order asc")
case "desc":
if lowerBound := lowestLedgerBound(oldestLedger); lowerBound > 0 {
query = query.Where("heff.history_operation_id > ?", lowerBound)
}
query = query.
Where(`(
heff.history_operation_id <= ?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package history

import (
"encoding/json"
"fmt"
"testing"

"github.com/guregu/null"
Expand All @@ -16,7 +17,7 @@ func TestAddEffect(t *testing.T) {
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}
tt.Assert.NoError(q.Begin(tt.Ctx))
tt.Require.NoError(q.Begin(tt.Ctx))

address := "GAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSTVY"
muxedAddres := "MAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSAAAAAAAAAAE2LP26"
Expand All @@ -37,25 +38,42 @@ func TestAddEffect(t *testing.T) {
3,
details,
)
tt.Assert.NoError(err)
tt.Require.NoError(err)

tt.Assert.NoError(accountLoader.Exec(tt.Ctx, q))
tt.Assert.NoError(builder.Exec(tt.Ctx, q))
tt.Assert.NoError(q.Commit())
tt.Require.NoError(accountLoader.Exec(tt.Ctx, q))
tt.Require.NoError(builder.Exec(tt.Ctx, q))
tt.Require.NoError(q.Commit())

effects, err := q.Effects(tt.Ctx, db2.PageQuery{
Cursor: "0-0",
Order: "asc",
Limit: 200,
})
}, 0)
tt.Require.NoError(err)
tt.Assert.Len(effects, 1)
tt.Require.Len(effects, 1)

effect := effects[0]
tt.Assert.Equal(address, effect.Account)
tt.Assert.Equal(muxedAddres, effect.AccountMuxed.String)
tt.Assert.Equal(int64(240518172673), effect.HistoryOperationID)
tt.Assert.Equal(int32(1), effect.Order)
tt.Assert.Equal(EffectType(3), effect.Type)
tt.Assert.Equal("{\"amount\": \"1000.0000000\", \"asset_type\": \"native\"}", effect.DetailsString.String)
tt.Require.Equal(address, effect.Account)
tt.Require.Equal(muxedAddres, effect.AccountMuxed.String)
tt.Require.Equal(int64(240518172673), effect.HistoryOperationID)
tt.Require.Equal(int32(1), effect.Order)
tt.Require.Equal(EffectType(3), effect.Type)
tt.Require.Equal("{\"amount\": \"1000.0000000\", \"asset_type\": \"native\"}", effect.DetailsString.String)

effects, err = q.Effects(tt.Ctx, db2.PageQuery{
Cursor: fmt.Sprintf("%d-0", toid.New(sequence+2, 0, 0).ToInt64()),
Order: "desc",
Limit: 200,
}, sequence-3)
tt.Require.NoError(err)
tt.Require.Len(effects, 1)
tt.Require.Equal(effects[0], effect)

effects, err = q.Effects(tt.Ctx, db2.PageQuery{
Cursor: fmt.Sprintf("%d-0", toid.New(sequence+5, 0, 0).ToInt64()),
Order: "desc",
Limit: 200,
}, sequence+2)
tt.Require.NoError(err)
tt.Require.Empty(effects)
}
29 changes: 23 additions & 6 deletions services/horizon/internal/db2/history/effect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,34 @@ func TestEffectsForLiquidityPool(t *testing.T) {

tt.Assert.NoError(q.Commit())

var result []Effect
result, err = q.EffectsForLiquidityPool(tt.Ctx, liquidityPoolID, db2.PageQuery{
var effects []Effect
effects, err = q.EffectsForLiquidityPool(tt.Ctx, liquidityPoolID, db2.PageQuery{
Cursor: "0-0",
Order: "asc",
Limit: 10,
})
}, 0)
tt.Assert.NoError(err)

tt.Assert.Len(result, 1)
tt.Assert.Equal(result[0].Account, address)
tt.Assert.Len(effects, 1)
effect := effects[0]
tt.Assert.Equal(effect.Account, address)

effects, err = q.EffectsForLiquidityPool(tt.Ctx, liquidityPoolID, db2.PageQuery{
Cursor: fmt.Sprintf("%d-0", toid.New(sequence+2, 0, 0).ToInt64()),
Order: "desc",
Limit: 200,
}, sequence-3)
tt.Require.NoError(err)
tt.Require.Len(effects, 1)
tt.Require.Equal(effects[0], effect)

effects, err = q.EffectsForLiquidityPool(tt.Ctx, liquidityPoolID, db2.PageQuery{
Cursor: fmt.Sprintf("%d-0", toid.New(sequence+5, 0, 0).ToInt64()),
Order: "desc",
Limit: 200,
}, sequence+2)
tt.Require.NoError(err)
tt.Require.Empty(effects)
}

func TestEffectsForTrustlinesSponsorshipEmptyAssetType(t *testing.T) {
Expand Down Expand Up @@ -160,7 +177,7 @@ func TestEffectsForTrustlinesSponsorshipEmptyAssetType(t *testing.T) {
Cursor: "0-0",
Order: "asc",
Limit: 200,
})
}, 0)
tt.Require.NoError(err)
tt.Require.Len(results, len(tests))

Expand Down
6 changes: 5 additions & 1 deletion services/horizon/internal/db2/history/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,15 @@ func (q *Q) LedgerCapacityUsageStats(ctx context.Context, currentSeq int32, dest
}

// Page specifies the paging constraints for the query being built by `q`.
func (q *LedgersQ) Page(page db2.PageQuery) *LedgersQ {
func (q *LedgersQ) Page(page db2.PageQuery, oldestLedger int32) *LedgersQ {
if q.Err != nil {
return q
}

if lowerBound := lowestLedgerBound(oldestLedger); lowerBound > 0 && page.Order == "desc" {
q.sql = q.sql.
Where("hl.id > ?", lowerBound)
}
q.sql, q.Err = page.ApplyTo(q.sql, "hl.id")
return q
}
Expand Down
12 changes: 7 additions & 5 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,7 @@ type OperationsQ struct {
opIdCol string
includeFailed bool
includeTransactions bool
boundedIdQuery bool
}

// Q is a helper struct on which to hang common_trades queries against a history
Expand Down Expand Up @@ -878,11 +879,12 @@ func (t *Transaction) HasPreconditions() bool {
// TransactionsQ is a helper struct to aid in configuring queries that loads
// slices of transaction structs.
type TransactionsQ struct {
Err error
parent *Q
sql sq.SelectBuilder
includeFailed bool
txIdCol string
Err error
parent *Q
sql sq.SelectBuilder
includeFailed bool
txIdCol string
boundedIdQuery bool
}

// TrustLine is row of data from the `trust_lines` table from horizon DB
Expand Down
Loading

0 comments on commit fe25b61

Please sign in to comment.