Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AWS: Add SNS and STS #134

Open
wants to merge 1 commit into
base: f/timestream-unit-test
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 111 additions & 32 deletions kensu/boto3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
from kensu.requests.models import ksu_str
from kensu.itertools import kensu_list
from kensu.utils.kensu_provider import KensuProvider
from kensu.utils.helpers import logical_naming_batch
from kensu.utils.helpers import logical_naming_batch, flatten
from kensu.utils.dsl.extractors.external_lineage_dtos import KensuDatasourceAndSchema


class ksu_dict(dict):
ksu_metadata = {}
Expand Down Expand Up @@ -45,6 +47,21 @@ def create_timestream_ds_schema(db, table, schema_dict):
ds_name=ds_name,
)

def report_lean_output(ds):
    """Report ``ds`` as an output datasource of the current Kensu instance.

    The datasource and its schema are always reported. When the Kensu
    instance runs in lean mode, the reported schema is additionally appended
    to ``outputs_lean``, its guid is mapped in ``real_schema_df`` to the stats
    stored under the datasource name in ``ds_name_stats`` (``None`` when there
    are none), and ``report_without_mapping()`` is invoked.

    :param ds: object exposing ``ksu_ds`` and ``ksu_schema``
    """
    tracker = KensuProvider().instance()
    ds.ksu_ds._report()
    reported_schema = ds.ksu_schema._report()
    # TODO: Create Stats
    if not tracker.lean_mode:
        return
    tracker.outputs_lean.append(reported_schema)
    stats_json = (
        tracker.ds_name_stats[ds.ksu_ds.name]
        if ds.ksu_ds.name in tracker.ds_name_stats
        else None
    )
    tracker.real_schema_df[reported_schema.to_guid()] = stats_json
    tracker.report_without_mapping()


def kensu_put(event_params, event_ctx, **kwargs):
if isinstance(event_params.get('Body'), ksu_str):
kensu = KensuProvider().instance()
Expand Down Expand Up @@ -211,13 +228,18 @@ def kensu_timestream_query(event_params):
ksu = KensuProvider().instance()
for ((db, table), schema_dict) in validated_input_columns.items():
ds = create_timestream_ds_schema(db, table, schema_dict)
if ksu.lean_mode:
ds.ksu_ds._report()
ds.ksu_schema._report()
ksu.real_schema_df[ds.ksu_schema.to_guid()] = None # is this one needed?
ksu.register_input_lean_mode(ds.ksu_schema)
else:
logging.warning("Timestream-query.query() kensu tracking is supported only in kensu_lean_mode=True")
register_lean_input(ds, 'Timestream-query.query()')


def register_lean_input(ds, tracking_what):
    """Register ``ds`` as an input of the current Kensu instance.

    Tracking only happens in lean mode; otherwise a warning naming the
    untracked operation is logged and nothing is reported.

    :param ds: object exposing ``ksu_ds`` and ``ksu_schema``
    :param tracking_what: label of the tracked call, used in the warning text
    """
    tracker = KensuProvider().instance()
    if not tracker.lean_mode:
        logging.warning(f"kensu tracking for {tracking_what} is supported only in kensu_lean_mode=True")
        return
    ds.ksu_ds._report()
    ds.ksu_schema._report()
    tracker.real_schema_df[ds.ksu_schema.to_guid()] = None  # is this one needed?
    tracker.register_input_lean_mode(ds.ksu_schema)


def dim_to_schema(dim):
Expand Down Expand Up @@ -273,20 +295,7 @@ def kensu_write_records(event_params):
# Creation of the output datasource (stored in S3)
schema_dict = extract_timestream_write_schema(records, event_params.get('CommonAttributes'))
ds = create_timestream_ds_schema(database, table, schema_dict)
ds.ksu_ds._report()
schema = ds.ksu_schema._report()

# TODO: Create Stats

if kensu.lean_mode:
kensu.outputs_lean.append(schema)
if ds.ksu_ds.name in kensu.ds_name_stats:
stats_json = kensu.ds_name_stats[ds.ksu_ds.name]
else:
stats_json = None
kensu.real_schema_df[schema.to_guid()]= stats_json
kensu.report_without_mapping()

report_lean_output(ds)



Expand Down Expand Up @@ -320,13 +329,69 @@ def kensu_get(event_params, **kwargs):

class_attributes['get'] = kensu_get

boto3._get_default_session().events.register("creating-resource-class.s3.Object",
add_custom_method)

def extract_sns_schema(msg):
    """Derive a best-effort schema from an SNS message payload.

    The message is parsed as JSON and flattened with ``flatten``. Top-level
    values that are themselves JSON strings, e.g.
    ``{'default': '{"data": {"msg": "my_test_message"}, "type": "io.kensu.example"}'}``,
    are parsed too and their flattened items are appended, prefixed with the
    top-level key.

    :param msg: the SNS ``Message`` value; anything but ``str`` is not parsed
    :return: list of the ``(key, value)`` items produced by ``flatten``, or
        ``[('unknown', 'unknown')]`` when ``msg`` is not a string, is not
        valid JSON, does not decode to a JSON object, or cannot be flattened
    """
    # Local imports keep this block self-contained; the file's top-level
    # import section is not fully visible from here.
    import json
    import logging

    unknown_schema = [('unknown', 'unknown')]
    if not isinstance(msg, str):
        return unknown_schema

    try:
        parsed_msg = json.loads(msg)
        if not isinstance(parsed_msg, dict):
            # A JSON scalar or array has no named fields to report.
            return unknown_schema
        schema = list(flatten(parsed_msg).items())
    except Exception:
        # Schema extraction is best-effort and must never break the publish
        # call; unlike a bare `except:` this lets KeyboardInterrupt and
        # SystemExit propagate.
        logging.debug("Could not extract a schema from the SNS message", exc_info=True)
        return unknown_schema

    for root_key, sub_msg in parsed_msg.items():
        # Only string values can hold an embedded JSON document.
        if not isinstance(sub_msg, str):
            continue
        try:
            parsed_submsg = json.loads(sub_msg)
            if isinstance(parsed_submsg, dict):
                schema.extend(flatten(parsed_submsg, parent_key=root_key).items())
        except Exception:
            # Plain-text value (not JSON) or not flattenable: keep what we have.
            continue
    return schema


def kensu_sns_publish(event_params):
    """Report an SNS ``Publish`` call as a Kensu output datasource.

    The topic ARN and message are read from the ``body`` entry of the event
    parameters. The datasource is located by the topic ARN, its schema is
    derived from the message with ``extract_sns_schema``, and the result is
    handed to ``report_lean_output``.

    Calls without a ``TopicArn`` in the body are skipped instead of raising,
    so that tracking cannot break the publish call itself.

    :param event_params: the ``params`` of the event; expected to be a dict
        with a dict ``body``; ``None`` is tolerated
    :return: ``None``
    """
    import logging

    body = event_params.get('body') if event_params else None
    if not isinstance(body, dict):
        body = {}
    topic_arn = body.get('TopicArn')
    if not topic_arn:
        # SNS also allows publishing to a TargetArn or PhoneNumber, in which
        # case there is no topic to report.
        logging.debug("SNS publish without TopicArn - skipping kensu tracking")
        return
    msg = body.get('Message')
    # FIXME: should we extract msg `type` as part of Kensu DS URI?
    # Only the ARN is logged: the payload may be sensitive and must not be
    # written to stdout or logs by a tracking hook.
    logging.debug("SNS publish to %s", topic_arn)
    schema = extract_sns_schema(msg)

    ksu = KensuProvider().instance()
    ds_name = f"SNS msg in {topic_arn.replace('arn:aws:sns:', '')}"
    ds = KensuDatasourceAndSchema.for_path_with_opt_schema(
        ksu=ksu,
        ds_path=f"{topic_arn}",
        format="AWS SNS message",
        categories=['logical::' + ds_name],
        maybe_schema=schema,
        ds_name=ds_name,
    )
    report_lean_output(ds)


def kensu_sts_GetCallerIdentity(event_params):
    """Register an STS ``get_caller_identity()`` call as a Kensu input.

    A fixed datasource name and a single placeholder ``unknown`` field are
    used; ``event_params`` is accepted for handler-signature compatibility
    but is not read.

    :param event_params: the ``params`` of the event (unused)
    """
    tracker = KensuProvider().instance()
    sts_ds_name = "AWS::STS::get_caller_identity()"
    placeholder_schema = {"unknown": "unknown"}
    sts_ds = KensuDatasourceAndSchema.for_path_with_opt_schema(
        ksu=tracker,
        ds_path=sts_ds_name,
        format="AWS STS request",
        categories=[f'logical::{sts_ds_name}'],
        maybe_schema=placeholder_schema.items(),
        ds_name=sts_ds_name,
    )
    register_lean_input(sts_ds, 'AWS client("sts").get_caller_identity()')


def kensu_tracker(*class_attributes, **kwargs):
import logging
global TRACKED_EVENTS_DICT

param_types = [
]
import pprint
Expand All @@ -342,24 +407,38 @@ def kensu_tracker(*class_attributes, **kwargs):
event_name = kwargs.get('event_name')
event_params = kwargs.get('params')
event_ctx = kwargs.get('context')

if event_name == 'provide-client-params.s3.PutObject' and event_params:
kensu_put(event_params=event_params, event_ctx=event_ctx, **kwargs)
if event_name == 'provide-client-params.timestream-write.WriteRecords' and event_params:
kensu_write_records(event_params)
if event_name == 'provide-client-params.timestream-query.Query' and event_params:
kensu_timestream_query(event_params)
else:
for handler_event_name, handler_fn in TRACKED_EVENTS_DICT.items():
if event_name == handler_event_name:
handler_fn(event_params)


# Maps a boto3/botocore event name to the Kensu handler for that event.
# kensu_tracker looks the incoming event name up in this dict and calls the
# matching handler with the event's `params`; every key is also registered
# with kensu_tracker on the default boto3 session.
# NOTE(review): that lookup calls handlers with a single positional argument,
# while kensu_put is declared as (event_params, event_ctx, **kwargs) - confirm
# the PutObject entry is never reached through the one-argument call path.
TRACKED_EVENTS_DICT = {
'provide-client-params.s3.PutObject': kensu_put,
'provide-client-params.timestream-write.WriteRecords': kensu_write_records,
'provide-client-params.timestream-query.Query': kensu_timestream_query,
'before-call.sns.Publish': kensu_sns_publish,
'provide-client-params.sts.GetCallerIdentity': kensu_sts_GetCallerIdentity
}


#boto3._get_default_session().events.register('creating-resource-class.s3.ServiceResource',kensu_tracker)
#boto3._get_default_session().events.register('before-send.s3.PutObject', kensu_tracker)
boto3._get_default_session().events.register('provide-client-params.s3.PutObject', kensu_tracker)

# in case we wanted to see all events - use *
#S3 = boto3.resource('s3' , region_name='eu-west-1')
#event_system = S3.meta.client.meta.events
#event_system.register("*",kensu_tracker)
#event_system.register('creating-resource-class.s3.*', prt)

#boto3._get_default_session().events.register("*",kensu_tracker)
#boto3._get_default_session().events.register("*", kensu_tracker)


boto3._get_default_session().events.register("creating-resource-class.s3.Object",
add_custom_method)

boto3._get_default_session().events.register('provide-client-params.timestream-write.WriteRecords', kensu_tracker)
boto3._get_default_session().events.register('provide-client-params.timestream-query.Query', kensu_tracker)
# Subscribe kensu_tracker to every event that has a handler in
# TRACKED_EVENTS_DICT, resolving the default session's event emitter once.
_default_session_events = boto3._get_default_session().events
for event_id in TRACKED_EVENTS_DICT:
    _default_session_events.register(event_id, kensu_tracker)
2 changes: 1 addition & 1 deletion kensu/utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def flatten(d, parent_key='', sep='.'):
new_key = parent_key + sep + k if parent_key else k
if isinstance(v, dict):
items.extend(flatten(v, new_key, sep=sep).items())
elif isinstance(v, list):
elif isinstance(v, list) and v:
if isinstance(v[0], dict):
new_key = new_key + '[]'
for i in v:
Expand Down