Skip to content

Commit

Permalink
feat(api): event list (#1486)
Browse files Browse the repository at this point in the history
  • Loading branch information
hekike authored Sep 9, 2024
1 parent d8bdd34 commit 6dd8d34
Show file tree
Hide file tree
Showing 8 changed files with 960 additions and 612 deletions.
660 changes: 362 additions & 298 deletions api/api.gen.go

Large diffs are not rendered by default.

700 changes: 402 additions & 298 deletions api/client/go/client.gen.go

Large diffs are not rendered by default.

49 changes: 49 additions & 0 deletions api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,59 @@ paths:
summary: List ingested events
description: |
List ingested events within a time range.
If the from query param is not provided it defaults to last 72 hours.
operationId: listEvents
parameters:
- $ref: "#/components/parameters/queryFrom"
- $ref: "#/components/parameters/queryTo"
- name: ingestedAtFrom
in: query
required: false
description: |
Start date-time in RFC 3339 format.
Inclusive.
schema:
type: string
format: date-time
example: "2023-01-01T00:00:00Z"
- name: ingestedAtTo
in: query
required: false
description: |
End date-time in RFC 3339 format.
Inclusive.
schema:
type: string
format: date-time
example: "2023-01-02T00:00:00Z"
- name: hasError
in: query
required: false
description: |
If not provided lists all events.
If provided with true, only list events with processing error.
If provided with false, only list events without processing error.
schema:
type: boolean
example: false
- name: id
in: query
required: false
description: |
The event ID.
Accepts partial ID.
schema:
type: string
example: my-event-id
- name: subject
in: query
required: false
description: |
The event subject.
Accepts partial subject.
schema:
type: string
example: customer-1
- name: limit
in: query
required: false
Expand Down
58 changes: 55 additions & 3 deletions openmeter/server/router/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package router
import (
"fmt"
"net/http"
"time"

"github.com/go-chi/render"

Expand All @@ -13,6 +14,9 @@ import (
"github.com/openmeterio/openmeter/pkg/models"
)

// 32 days
const maximumFromDuration = time.Hour * 24 * 32

func (a *Router) IngestEvents(w http.ResponseWriter, r *http.Request) {
a.config.IngestHandler.ServeHTTP(w, r)
}
Expand All @@ -21,13 +25,61 @@ func (a *Router) ListEvents(w http.ResponseWriter, r *http.Request, params api.L
ctx := contextx.WithAttr(r.Context(), "operation", "queryEvents")

namespace := a.config.NamespaceManager.GetDefaultNamespace()
minimumFrom := time.Now().Add(-maximumFromDuration)

// Set default values
from := defaultx.WithDefault(params.From, minimumFrom)
limit := defaultx.WithDefault(params.Limit, 100)

// Validate params
if from.Before(minimumFrom) {
err := fmt.Errorf("from date is too old: %s", from)

a.config.ErrorHandler.HandleContext(ctx, err)
models.NewStatusProblem(ctx, err, http.StatusBadRequest).Respond(w)

return
}

if params.To != nil && params.To.Before(from) {
err := fmt.Errorf("to date is before from date: %s < %s", params.To, params.From)

a.config.ErrorHandler.HandleContext(ctx, err)
models.NewStatusProblem(ctx, err, http.StatusBadRequest).Respond(w)

return
}

if params.IngestedAtFrom != nil && params.IngestedAtFrom.Before(minimumFrom) {
err := fmt.Errorf("ingestedAtFrom date is too old: %s", params.IngestedAtFrom)

a.config.ErrorHandler.HandleContext(ctx, err)
models.NewStatusProblem(ctx, err, http.StatusBadRequest).Respond(w)

return
}

if params.IngestedAtFrom != nil && params.IngestedAtTo != nil && params.IngestedAtTo.Before(*params.IngestedAtFrom) {
err := fmt.Errorf("ingestedAtTo date is before ingestedAtFrom date: %s < %s", params.IngestedAtTo, params.IngestedAtFrom)

a.config.ErrorHandler.HandleContext(ctx, err)
models.NewStatusProblem(ctx, err, http.StatusBadRequest).Respond(w)

return
}

queryParams := streaming.ListEventsParams{
From: params.From,
To: params.To,
Limit: defaultx.WithDefault(params.Limit, 100),
From: &from,
To: params.To,
IngestedAtFrom: params.IngestedAtFrom,
IngestedAtTo: params.IngestedAtTo,
ID: params.Id,
Subject: params.Subject,
HasError: params.HasError,
Limit: limit,
}

// Query events
events, err := a.config.StreamingConnector.ListEvents(ctx, namespace, queryParams)
if err != nil {
err := fmt.Errorf("query events: %w", err)
Expand Down
15 changes: 10 additions & 5 deletions openmeter/streaming/clickhouse_connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,16 @@ func (c *ClickhouseConnector) createEventsTable(ctx context.Context) error {

func (c *ClickhouseConnector) queryEventsTable(ctx context.Context, namespace string, params streaming.ListEventsParams) ([]api.IngestedEvent, error) {
table := queryEventsTable{
Database: c.config.Database,
Namespace: namespace,
From: params.From,
To: params.To,
Limit: params.Limit,
Database: c.config.Database,
Namespace: namespace,
From: params.From,
To: params.To,
IngestedAtFrom: params.IngestedAtFrom,
IngestedAtTo: params.IngestedAtTo,
ID: params.ID,
Subject: params.Subject,
HasError: params.HasError,
Limit: params.Limit,
}

sql, args := table.toSQL()
Expand Down
34 changes: 29 additions & 5 deletions openmeter/streaming/clickhouse_connector/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,16 @@ func (d createEventsTable) toSQL() string {
}

type queryEventsTable struct {
Database string
Namespace string
From *time.Time
To *time.Time
Limit int
Database string
Namespace string
From *time.Time
To *time.Time
IngestedAtFrom *time.Time
IngestedAtTo *time.Time
ID *string
Subject *string
HasError *bool
Limit int
}

func (d queryEventsTable) toSQL() (string, []interface{}) {
Expand All @@ -70,6 +75,25 @@ func (d queryEventsTable) toSQL() (string, []interface{}) {
if d.To != nil {
where = append(where, query.LessEqualThan("time", d.To.Unix()))
}
if d.IngestedAtFrom != nil {
where = append(where, query.GreaterEqualThan("ingested_at", d.IngestedAtFrom.Unix()))
}
if d.IngestedAtTo != nil {
where = append(where, query.LessEqualThan("ingested_at", d.IngestedAtTo.Unix()))
}
if d.ID != nil {
where = append(where, query.Like("id", fmt.Sprintf("%%%s%%", *d.ID)))
}
if d.Subject != nil {
where = append(where, query.Like("subject", fmt.Sprintf("%%%s%%", *d.Subject)))
}
if d.HasError != nil {
if *d.HasError {
where = append(where, "notEmpty(validation_error) = 1")
} else {
where = append(where, "empty(validation_error) = 1")
}
}
query.Where(where...)

query.Desc().OrderBy("time")
Expand Down
45 changes: 45 additions & 0 deletions openmeter/streaming/clickhouse_connector/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ func TestCreateEventsTable(t *testing.T) {
}

func TestQueryEventsTable(t *testing.T) {
subjectFilter := "customer-1"
idFilter := "event-id-1"
hasErrorTrue := true
hasErrorFalse := false

tests := []struct {
query queryEventsTable
wantSQL string
Expand All @@ -46,6 +51,46 @@ func TestQueryEventsTable(t *testing.T) {
wantSQL: "SELECT id, type, subject, source, time, data, validation_error, ingested_at, stored_at FROM openmeter.om_events WHERE namespace = ? ORDER BY time DESC LIMIT 100",
wantArgs: []interface{}{"my_namespace"},
},
{
query: queryEventsTable{
Database: "openmeter",
Namespace: "my_namespace",
Limit: 100,
Subject: &subjectFilter,
},
wantSQL: "SELECT id, type, subject, source, time, data, validation_error, ingested_at, stored_at FROM openmeter.om_events WHERE namespace = ? AND subject LIKE ? ORDER BY time DESC LIMIT 100",
wantArgs: []interface{}{"my_namespace", "%customer-1%"},
},
{
query: queryEventsTable{
Database: "openmeter",
Namespace: "my_namespace",
Limit: 100,
ID: &idFilter,
},
wantSQL: "SELECT id, type, subject, source, time, data, validation_error, ingested_at, stored_at FROM openmeter.om_events WHERE namespace = ? AND id LIKE ? ORDER BY time DESC LIMIT 100",
wantArgs: []interface{}{"my_namespace", "%event-id-1%"},
},
{
query: queryEventsTable{
Database: "openmeter",
Namespace: "my_namespace",
Limit: 100,
HasError: &hasErrorTrue,
},
wantSQL: "SELECT id, type, subject, source, time, data, validation_error, ingested_at, stored_at FROM openmeter.om_events WHERE namespace = ? AND notEmpty(validation_error) = 1 ORDER BY time DESC LIMIT 100",
wantArgs: []interface{}{"my_namespace"},
},
{
query: queryEventsTable{
Database: "openmeter",
Namespace: "my_namespace",
Limit: 100,
HasError: &hasErrorFalse,
},
wantSQL: "SELECT id, type, subject, source, time, data, validation_error, ingested_at, stored_at FROM openmeter.om_events WHERE namespace = ? AND empty(validation_error) = 1 ORDER BY time DESC LIMIT 100",
wantArgs: []interface{}{"my_namespace"},
},
}

for _, tt := range tests {
Expand Down
11 changes: 8 additions & 3 deletions openmeter/streaming/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,14 @@ import (
)

type ListEventsParams struct {
From *time.Time
To *time.Time
Limit int
From *time.Time
To *time.Time
IngestedAtFrom *time.Time
IngestedAtTo *time.Time
ID *string
Subject *string
HasError *bool
Limit int
}

type CountEventsParams struct {
Expand Down

0 comments on commit 6dd8d34

Please sign in to comment.