Skip to content

Commit

Permalink
feat(bigquery): explicitly define gcp projects of bigquery logs (#277)
Browse files Browse the repository at this point in the history
  • Loading branch information
mabdh authored Dec 10, 2021
1 parent 8cea52b commit 898c309
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 17 deletions.
12 changes: 8 additions & 4 deletions plugins/extractors/bigquery/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ source:
}
collect_table_usage: false
usage_period_in_day: 7
usage_project_ids:
- google-project-id
- other-google-project-id
```
## Inputs
Expand All @@ -34,8 +37,9 @@ source:
| `table_pattern` | `string` | `gofood.fact_` | Regex pattern to filter which bigquery table to scan (whitelist) | *optional* |
| `include_column_profile` | `bool` | `true` | true if you want to profile the column values, such as min, max, med, avg, top, and freq | *optional* |
| `max_preview_rows` | `int` | `30` | max number of preview rows to fetch, `0` will skip preview fetching. Default to `30`. | *optional* |
| `collect_table_usage` | `bools` | `false` | toggle feature to collect table usage, `true` will enable collecting table usage. Default to `false`. | *optional* |
| `collect_table_usage` | `boolean` | `false` | toggle feature to collect table usage, `true` will enable collecting table usage. Default to `false`. | *optional* |
| `usage_period_in_day` | `int` | `7` | collects logs from `(now - usage_period_in_day)` until `now`. Only matters if `collect_table_usage` is true. Defaults to `7`. | *optional* |
| `usage_project_ids` | `[]string` | `[google-project-id, other-google-project-id]` | collects logs from the listed GCP project IDs. Defaults to the BigQuery project ID (`project_id`) when left empty. | *optional* |

### *Notes*

Expand All @@ -52,8 +56,8 @@ source:
| `description` | `table description` |
| `profile.total_rows` | `2100` |
| `profile.usage_count` | `15` |
| `profile.common_join` | [][CommonJoin](#CommonJoin) |
| `profile.filter_conditions` | [`"WHERE t.param_3 = 'the_param' AND t.column_1 = \"xxxxxx-xxxx-xxxx-xxxx-xxxxxxxxx\""`,`"WHERE event_timestamp >= TIMESTAMP(\"2021-10-29\", \"UTC\") AND event_timestamp < TIMESTAMP(\"2021-11-22T02:01:06Z\")"`] |
| `profile.joins` | [][Join](#Join) |
| `profile.filters` | [`"WHERE t.param_3 = 'the_param' AND t.column_1 = \"xxxxxx-xxxx-xxxx-xxxx-xxxxxxxxx\""`,`"WHERE event_timestamp >= TIMESTAMP(\"2021-10-29\", \"UTC\") AND event_timestamp < TIMESTAMP(\"2021-11-22T02:01:06Z\")"`] |
| `schema` | [][Column](#column) |

### Column
Expand All @@ -67,7 +71,7 @@ source:
| `length` | `12,2` |
| `profile` | `{"min":...,"max": ...,"unique": ...}` |

### CommonJoin
### Join

| Field | Sample Value |
| :---- | :---- |
Expand Down
15 changes: 10 additions & 5 deletions plugins/extractors/bigquery/auditlog/auditlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ import (
)

type Config struct {
ProjectID string `mapstructure:"project_id" validate:"required"`
ServiceAccountJSON string `mapstructure:"service_account_json"`
IsCollectTableUsage bool `mapstructure:"collect_table_usage" default:"false"`
UsagePeriodInDay int64 `mapstructure:"usage_period_in_day" default:"7"`
ProjectID string
ServiceAccountJSON string
IsCollectTableUsage bool
UsagePeriodInDay int64
UsageProjectIDs []string
}

const advancedFilterTemplate = `protoPayload.methodName="jobservice.jobcompleted" AND ` +
Expand All @@ -40,6 +41,9 @@ func New(logger log.Logger) *AuditLog {
}

func (l *AuditLog) Init(ctx context.Context, cfg Config) (err error) {
if len(cfg.UsageProjectIDs) == 0 {
cfg.UsageProjectIDs = []string{cfg.ProjectID}
}
l.config = cfg
l.client, err = l.createClient(ctx)
if err != nil {
Expand Down Expand Up @@ -69,9 +73,10 @@ func (l *AuditLog) Collect(ctx context.Context) (tableStats *TableStats, err err

filter := l.buildFilter()
it := l.client.Entries(ctx,
logadmin.ProjectIDs([]string{l.config.ProjectID}),
logadmin.ProjectIDs(l.config.UsageProjectIDs),
logadmin.Filter(filter))

l.logger.Info("getting logs in these projects", "projects", l.config.UsageProjectIDs)
l.logger.Info("getting logs with the filter", "filter", filter)

for {
Expand Down
18 changes: 10 additions & 8 deletions plugins/extractors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@ var summary string

// Config hold the set of configuration for the bigquery extractor
type Config struct {
ProjectID string `mapstructure:"project_id" validate:"required"`
ServiceAccountJSON string `mapstructure:"service_account_json"`
TablePattern string `mapstructure:"table_pattern"`
IncludeColumnProfile bool `mapstructure:"include_column_profile"`
MaxPreviewRows int `mapstructure:"max_preview_rows" default:"30"`
IsCollectTableUsage bool `mapstructure:"collect_table_usage" default:"false"`
UsagePeriodInDay int64 `mapstructure:"usage_period_in_day" default:"7"`
ProjectID string `mapstructure:"project_id" validate:"required"`
ServiceAccountJSON string `mapstructure:"service_account_json"`
TablePattern string `mapstructure:"table_pattern"`
IncludeColumnProfile bool `mapstructure:"include_column_profile"`
MaxPreviewRows int `mapstructure:"max_preview_rows" default:"30"`
IsCollectTableUsage bool `mapstructure:"collect_table_usage" default:"false"`
UsagePeriodInDay int64 `mapstructure:"usage_period_in_day" default:"7"`
UsageProjectIDs []string `mapstructure:"usage_project_ids"`
}

var sampleConfig = `
Expand Down Expand Up @@ -108,6 +109,7 @@ func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{})
ServiceAccountJSON: e.config.ServiceAccountJSON,
IsCollectTableUsage: e.config.IsCollectTableUsage,
UsagePeriodInDay: e.config.UsagePeriodInDay,
UsageProjectIDs: e.config.UsageProjectIDs,
})
if errL != nil {
e.logger.Error("failed to create google audit log client", "err", errL)
Expand All @@ -124,7 +126,7 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error)
ts, errL := e.galClient.Collect(ctx)
e.tableStats = ts
if errL != nil {
e.logger.Error("error populating table stats usage", errL)
e.logger.Warn("error populating table stats usage", "error", errL)
}
}

Expand Down

0 comments on commit 898c309

Please sign in to comment.