Skip to content

Commit

Permalink
fix snapshot and output
Browse files Browse the repository at this point in the history
  • Loading branch information
pskrbasu committed Oct 3, 2024
1 parent aa64798 commit 30cafd9
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 44 deletions.
75 changes: 46 additions & 29 deletions pkg/query/queryexecute/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/turbot/pipe-fittings/contexthelpers"
"github.com/turbot/pipe-fittings/modconfig"
"github.com/turbot/pipe-fittings/querydisplay"
"github.com/turbot/pipe-fittings/queryresult"
"github.com/turbot/pipe-fittings/steampipeconfig"
"github.com/turbot/pipe-fittings/utils"
"github.com/turbot/steampipe/pkg/cmdconfig"
Expand Down Expand Up @@ -129,42 +130,58 @@ func executeQuery(ctx context.Context, initData *query.InitData, resolvedQuery *
if err != nil {
return err, 0
}
}

// if the output format is snapshot we don't call the querydisplay code in pipe-fittings, instead we
// generate the snapshot and display it to stdout
outputFormat := viper.GetString(pconstants.ArgOutput)
if outputFormat == pconstants.OutputFormatSnapshot || outputFormat == pconstants.OutputFormatSteampipeSnapshotShort {

// display the snapshot as JSON
encoder := json.NewEncoder(os.Stdout)
encoder.SetIndent("", " ")
encoder.SetEscapeHTML(false)
if err := encoder.Encode(snap); err != nil {
//nolint:forbidigo // acceptable
fmt.Print("Error displaying result as snapshot", err)
// re-generate the query result from the snapshot. since the row stream in the actual queryresult has been exhausted(while generating the snapshot),
// we need to re-generate it for other output formats
newQueryResult, err := snapshot.SnapshotToQueryResult[queryresult.TimingContainer](snap, initData.StartTime)
if err != nil {
return err, 0
}
}

// if we need to export the snapshot, we export it directly from here
if viper.IsSet(pconstants.ArgExport) {
exportArgs := viper.GetStringSlice(pconstants.ArgExport)
exportMsg, err := initData.ExportManager.DoExport(ctx, "query", snap, exportArgs)
if err != nil {
return err, 0
// if the output format is snapshot we don't call the querydisplay code in pipe-fittings, instead we
// generate the snapshot and display it to stdout
outputFormat := viper.GetString(pconstants.ArgOutput)
if outputFormat == pconstants.OutputFormatSnapshot || outputFormat == pconstants.OutputFormatSteampipeSnapshotShort {

// display the snapshot as JSON
encoder := json.NewEncoder(os.Stdout)
encoder.SetIndent("", " ")
encoder.SetEscapeHTML(false)
if err := encoder.Encode(snap); err != nil {
//nolint:forbidigo // acceptable
fmt.Print("Error displaying result as snapshot", err)
return err, 0
}
}
// print the location where the file is exported
if len(exportMsg) > 0 && viper.GetBool(pconstants.ArgProgress) {
fmt.Printf("\n") //nolint:forbidigo // intentional use of fmt
fmt.Println(strings.Join(exportMsg, "\n")) //nolint:forbidigo // intentional use of fmt
fmt.Printf("\n") //nolint:forbidigo // intentional use of fmt

// if we need to export the snapshot, we export it directly from here
if viper.IsSet(pconstants.ArgExport) {
exportArgs := viper.GetStringSlice(pconstants.ArgExport)
exportMsg, err := initData.ExportManager.DoExport(ctx, "query", snap, exportArgs)
if err != nil {
return err, 0
}
// print the location where the file is exported
if len(exportMsg) > 0 && viper.GetBool(pconstants.ArgProgress) {
fmt.Printf("\n") //nolint:forbidigo // intentional use of fmt
fmt.Println(strings.Join(exportMsg, "\n")) //nolint:forbidigo // intentional use of fmt
fmt.Printf("\n") //nolint:forbidigo // intentional use of fmt
}
}
}

// if we need to publish the snapshot, we publish it directly from here
if err := publishSnapshotIfNeeded(ctx, snap); err != nil {
return err, 0
// if we need to publish the snapshot, we publish it directly from here
if err := publishSnapshotIfNeeded(ctx, snap); err != nil {
return err, 0
}

// if other output formats are also needed, we call the querydisplay using the re-generated query result
rowCount, _ := querydisplay.ShowOutput(ctx, newQueryResult)
// show timing
display.DisplayTiming(r, rowCount)

// signal to the resultStreamer that we are done with this result
resultsStreamer.AllResultsRead()
return nil, rowErrors
}

// for other output formats, we call the querydisplay code in pipe-fittings
Expand Down
72 changes: 57 additions & 15 deletions pkg/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
pqueryresult "github.com/turbot/pipe-fittings/queryresult"
"github.com/turbot/pipe-fittings/steampipeconfig"
"github.com/turbot/pipe-fittings/utils"
"github.com/turbot/steampipe-plugin-sdk/v5/sperr"
)

const schemaVersion = "20221222"
Expand All @@ -21,15 +22,20 @@ const schemaVersion = "20221222"
// We cannot use the SnapshotPanel interface directly in this package as it references
// powerpipe types that are not available in this package
type PanelData struct {
Dashboard string `json:"dashboard"`
Name string `json:"name"`
PanelType string `json:"panel_type"`
SourceDefinition string `json:"source_definition"`
Status string `json:"status,omitempty"`
Title string `json:"title,omitempty"`
SQL string `json:"sql,omitempty"`
Properties map[string]string `json:"properties,omitempty"`
Data map[string]interface{} `json:"data,omitempty"`
Dashboard string `json:"dashboard"`
Name string `json:"name"`
PanelType string `json:"panel_type"`
SourceDefinition string `json:"source_definition"`
Status string `json:"status,omitempty"`
Title string `json:"title,omitempty"`
SQL string `json:"sql,omitempty"`
Properties map[string]string `json:"properties,omitempty"`
Data LeafData `json:"data,omitempty"`
}

type LeafData struct {
Columns []*queryresult.ColumnDef `json:"columns"`
Rows []map[string]interface{} `json:"rows"`
}

// IsSnapshotPanel implements SnapshotPanel
Expand Down Expand Up @@ -100,15 +106,15 @@ func getPanelTable[T queryresult.TimingContainer](ctx context.Context, result *q
}
}

func getData[T queryresult.TimingContainer](ctx context.Context, result *queryresult.Result[T]) map[string]interface{} {
jsonOutput := querydisplay.NewJSONOutput()
func getData[T queryresult.TimingContainer](ctx context.Context, result *queryresult.Result[T]) LeafData {
jsonOutput := querydisplay.NewSnapshotPanelData()
// Ensure columns are being added
if len(result.Cols) == 0 {
error_helpers.ShowError(ctx, fmt.Errorf("no columns found in the result"))
}
// Add column definitions to the JSON output
for _, col := range result.Cols {
c := pqueryresult.ColumnDef{
c := &pqueryresult.ColumnDef{
Name: col.Name,
OriginalName: col.OriginalName,
DataType: strings.ToUpper(col.DataType),
Expand All @@ -130,9 +136,9 @@ func getData[T queryresult.TimingContainer](ctx context.Context, result *queryre
error_helpers.ShowError(ctx, err)
}
// Return the full data (including columns and rows)
return map[string]interface{}{
"columns": jsonOutput.Columns,
"rows": jsonOutput.Rows,
return LeafData{
Columns: jsonOutput.Columns,
Rows: jsonOutput.Rows,
}
}

Expand All @@ -154,3 +160,39 @@ func getLayout[T queryresult.TimingContainer](result *queryresult.Result[T], res
NodeType: "dashboard",
}
}

// SnapshotToQueryResult function to generate a queryresult with streamed rows from a snapshot
func SnapshotToQueryResult[T queryresult.TimingContainer](snap *steampipeconfig.SteampipeSnapshot, startTime time.Time) (*queryresult.Result[T], error) {
// the table of a snapshot query has a fixed name
tablePanel, ok := snap.Panels[modconfig.SnapshotQueryTableName]
if !ok {
return nil, sperr.New("dashboard does not contain table result for query")
}
chartRun := tablePanel.(*PanelData)
if !ok {
return nil, sperr.New("failed to read query result from snapshot")
}

var tim T
res := queryresult.NewResult[T](chartRun.Data.Columns, tim)

// start a goroutine to stream the results as rows
go func() {
for _, d := range chartRun.Data.Rows {
// we need to allocate a new slice everytime, since this gets read
// asynchronously on the other end and we need to make sure that we don't overwrite
// data already sent
rowVals := make([]interface{}, len(chartRun.Data.Columns))
for i, c := range chartRun.Data.Columns {
rowVals[i] = d[c.Name]
}
res.StreamRow(rowVals)
}
res.Close()
}()

// res.Timing = &queryresult.TimingMetadata{
// Duration: time.Since(startTime),
// }
return res, nil
}

0 comments on commit 30cafd9

Please sign in to comment.