Skip to content

Commit

Permalink
feat(optimus): use grpc to connect (#273)
Browse files Browse the repository at this point in the history
* feat(models): sync models with latest structure

* refactor: create test utils to assert JSON

* feat(optimus): create Optimus extractor

* docs: update guides

* refactor(optimus): fix linter error

* feat(optimus): connect using GRPC instead of HTTP

* feat(optimus): use grpc to connect instead of http
  • Loading branch information
StewartJingga authored Nov 30, 2021
1 parent 5f7cd14 commit 7a18654
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 61 deletions.
8 changes: 4 additions & 4 deletions docs/docs/reference/extractors.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ Meteor currently support metadata extraction on these data sources. To perform e

| Type | Url | Chart | Lineage | Tags | Custom |
| :--- | :--- | :--- | :--- | :--- | :--- |
| [`grafana`](https://github.com/odpf/meteor/tree/cb12c3ecf8904cf3f4ce365ca8981ccd132f35d0/plugins/extractors/grafana/README.md) ||||||
| [`metabase`](https://github.com/odpf/meteor/tree/cb12c3ecf8904cf3f4ce365ca8981ccd132f35d0/plugins/extractors/metabase/README.md) ||||||
| [`superset`](https://github.com/odpf/meteor/tree/cb12c3ecf8904cf3f4ce365ca8981ccd132f35d0/plugins/extractors/superset/README.md) ||||||
| [`tableau`](https://github.com/odpf/meteor/tree/cb12c3ecf8904cf3f4ce365ca8981ccd132f35d0/plugins/extractors/tableau/README.md) ||||||
| [`grafana`](https://github.com/odpf/meteor/tree/main/plugins/extractors/grafana/README.md) ||||||
| [`metabase`](https://github.com/odpf/meteor/tree/main/plugins/extractors/metabase/README.md) ||||||
| [`superset`](https://github.com/odpf/meteor/tree/main/plugins/extractors/superset/README.md) ||||||
| [`tableau`](https://github.com/odpf/meteor/tree/main/plugins/extractors/tableau/README.md) ||||||

### Topic

Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,15 @@ require (
github.com/hashicorp/go-hclog v0.16.1
github.com/hashicorp/go-plugin v1.4.2
github.com/klauspost/compress v1.13.6 // indirect
github.com/knadh/koanf v1.1.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/lib/pq v1.10.2
github.com/mcuadros/go-defaults v1.2.0
github.com/mitchellh/mapstructure v1.4.1
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 // indirect
github.com/muesli/reflow v0.3.0 // indirect
github.com/nsf/jsondiff v0.0.0-20210926074059-1e845ec5d249
github.com/odpf/optimus v0.0.3
github.com/odpf/optimus v0.0.4-0.20211125235320-9d1751152006
github.com/odpf/salt v0.0.0-20210919015538-3fd8ab22acea
github.com/opencontainers/runc v1.0.1 // indirect
github.com/ory/dockertest/v3 v3.7.0
Expand All @@ -51,6 +52,7 @@ require (
gitlab.com/flimzy/testy v0.8.0 // indirect
go.mongodb.org/mongo-driver v1.5.3
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.25.0
go.opentelemetry.io/otel/sdk/metric v0.24.0 // indirect
golang.org/x/net v0.0.0-20211101193420-4a448f8816b3 // indirect
golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f
golang.org/x/sys v0.0.0-20210903071746-97244b99971b // indirect
Expand Down
70 changes: 70 additions & 0 deletions go.sum

Large diffs are not rendered by default.

17 changes: 8 additions & 9 deletions plugins/extractors/optimus/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ const (
type Client interface {
pb.RuntimeServiceClient
Connect(ctx context.Context, host string) error
// TODO: Remove GetJobTask function and use Optimus' when it is already available.
GetJobTask(ctx context.Context, in *GetJobTaskRequest, opts ...grpc.CallOption) (*GetJobTaskResponse, error)
Close() error
}

func newClient() Client {
Expand All @@ -33,25 +32,25 @@ func newClient() Client {

type client struct {
pb.RuntimeServiceClient
conn *grpc.ClientConn
}

func (c *client) Connect(ctx context.Context, host string) (err error) {
dialTimeoutCtx, dialCancel := context.WithTimeout(ctx, time.Second*2)
defer dialCancel()

var conn *grpc.ClientConn
if conn, err = c.createConnection(dialTimeoutCtx, host); err != nil {
return errors.Wrap(err, "error creating connection")
if c.conn, err = c.createConnection(dialTimeoutCtx, host); err != nil {
err = errors.Wrap(err, "error creating connection")
return
}
defer conn.Close()

c.RuntimeServiceClient = pb.NewRuntimeServiceClient(conn)
c.RuntimeServiceClient = pb.NewRuntimeServiceClient(c.conn)

return
}

func (c *client) GetJobTask(ctx context.Context, in *GetJobTaskRequest, opts ...grpc.CallOption) (*GetJobTaskResponse, error) {
return nil, nil
func (c *client) Close() error {
return c.conn.Close()
}

func (c *client) createConnection(ctx context.Context, host string) (*grpc.ClientConn, error) {
Expand Down
30 changes: 0 additions & 30 deletions plugins/extractors/optimus/models.go

This file was deleted.

14 changes: 10 additions & 4 deletions plugins/extractors/optimus/optimus.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{})

// Extract checks if the table is valid and extracts the table schema
func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
defer e.client.Close()

projResp, err := e.client.ListProjects(ctx, &pb.ListProjectsRequest{})
if err != nil {
return errors.Wrap(err, "error fetching projects")
Expand Down Expand Up @@ -118,7 +120,7 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
}

func (e *Extractor) buildJob(ctx context.Context, jobSpec *pb.JobSpecification, project, namespace string) (job *assets.Job, err error) {
jobResp, err := e.client.GetJobTask(ctx, &GetJobTaskRequest{
jobResp, err := e.client.GetJobTask(ctx, &pb.GetJobTaskRequest{
ProjectName: project,
Namespace: namespace,
JobName: jobSpec.Name,
Expand Down Expand Up @@ -184,7 +186,7 @@ func (e *Extractor) buildJob(ctx context.Context, jobSpec *pb.JobSpecification,
return
}

func (e *Extractor) buildLineage(task *JobTask) (upstreams, downstreams []*common.Resource, err error) {
func (e *Extractor) buildLineage(task *pb.JobTask) (upstreams, downstreams []*common.Resource, err error) {
upstreams, err = e.buildUpstreams(task)
if err != nil {
err = errors.Wrap(err, "error building upstreams")
Expand All @@ -199,7 +201,7 @@ func (e *Extractor) buildLineage(task *JobTask) (upstreams, downstreams []*commo
return
}

func (e *Extractor) buildUpstreams(task *JobTask) (upstreams []*common.Resource, err error) {
func (e *Extractor) buildUpstreams(task *pb.JobTask) (upstreams []*common.Resource, err error) {
for _, dependency := range task.Dependencies {
var urn string
urn, err = e.mapURN(dependency.Dependency)
Expand All @@ -217,7 +219,11 @@ func (e *Extractor) buildUpstreams(task *JobTask) (upstreams []*common.Resource,
return
}

func (e *Extractor) buildDownstreams(task *JobTask) (downstreams []*common.Resource, err error) {
func (e *Extractor) buildDownstreams(task *pb.JobTask) (downstreams []*common.Resource, err error) {
if task.Destination == nil {
return
}

var urn string
urn, err = e.mapURN(task.Destination.Destination)
if err != nil {
Expand Down
32 changes: 19 additions & 13 deletions plugins/extractors/optimus/optimus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func TestExtract(t *testing.T) {

client := new(mockClient)
setupExtractExpectation(ctx, client)
client.On("Close").Return(nil, nil).Once()
defer client.AssertExpectations(t)

extr := optimus.New(testutils.Logger, client)
Expand All @@ -77,6 +78,11 @@ func (c *mockClient) Connect(ctx context.Context, host string) (err error) {
return args.Error(0)
}

func (c *mockClient) Close() error {
args := c.Called()
return args.Error(0)
}

func (c *mockClient) ListProjects(ctx context.Context, in *pb.ListProjectsRequest, opts ...grpc.CallOption) (*pb.ListProjectsResponse, error) {
args := c.Called(ctx, in, opts)

Expand All @@ -97,12 +103,12 @@ func (c *mockClient) ListJobSpecification(ctx context.Context, in *pb.ListJobSpe

func (c *mockClient) GetJobTask(
ctx context.Context,
in *optimus.GetJobTaskRequest,
in *pb.GetJobTaskRequest,
opts ...grpc.CallOption,
) (*optimus.GetJobTaskResponse, error) {
) (*pb.GetJobTaskResponse, error) {
args := c.Called(ctx, in, opts)

return args.Get(0).(*optimus.GetJobTaskResponse), args.Error(1)
return args.Get(0).(*pb.GetJobTaskResponse), args.Error(1)
}

func setupExtractExpectation(ctx context.Context, client *mockClient) {
Expand Down Expand Up @@ -238,41 +244,41 @@ func setupExtractExpectation(ctx context.Context, client *mockClient) {
},
}, nil).Once()

client.On("GetJobTask", ctx, &optimus.GetJobTaskRequest{
client.On("GetJobTask", ctx, &pb.GetJobTaskRequest{
ProjectName: "project-A",
Namespace: "namespace-A",
JobName: "job-A",
}, mock.Anything).Return(&optimus.GetJobTaskResponse{
Task: &optimus.JobTask{
}, mock.Anything).Return(&pb.GetJobTaskResponse{
Task: &pb.JobTask{
Name: "task-A",
Description: "task's description",
Image: "task's image",
Destination: &optimus.JobTaskDestination{
Destination: &pb.JobTask_Destination{
Destination: "bigquery://dst-project:dst-dataset.dst-table",
Type: "bigquery",
},
Dependencies: []*optimus.JobTaskDependency{
Dependencies: []*pb.JobTask_Dependency{
{
Dependency: "bigquery://src-project:src-dataset.src-table",
},
},
},
}, nil).Once()

client.On("GetJobTask", ctx, &optimus.GetJobTaskRequest{
client.On("GetJobTask", ctx, &pb.GetJobTaskRequest{
ProjectName: "project-A",
Namespace: "namespace-A",
JobName: "job-B",
}, mock.Anything).Return(&optimus.GetJobTaskResponse{
Task: &optimus.JobTask{
}, mock.Anything).Return(&pb.GetJobTaskResponse{
Task: &pb.JobTask{
Name: "task-B",
Description: "task's description B",
Image: "task's image B",
Destination: &optimus.JobTaskDestination{
Destination: &pb.JobTask_Destination{
Destination: "bigquery://dst-b-project:dst-b-dataset.dst-b-table",
Type: "bigquery",
},
Dependencies: []*optimus.JobTaskDependency{
Dependencies: []*pb.JobTask_Dependency{
{Dependency: "bigquery://src-b1-project:src-b1-dataset.src-b1-table"},
{Dependency: "bigquery://src-b2-project:src-b2-dataset.src-b2-table"},
},
Expand Down

0 comments on commit 7a18654

Please sign in to comment.