Merge pull request #6 from oslokommune/collect-measurements
Initial function for importing KPIs
petterhj authored Mar 20, 2024
2 parents dc86b43 + 9d784d2 commit 0781b8e
Showing 10 changed files with 235 additions and 1 deletion.
1 change: 1 addition & 0 deletions README.md
@@ -7,6 +7,7 @@ The following sources are currently implemented:
- `agresso`: Economy data from Agresso.
- `better_uptime`: Service uptime data from Better Stack.
- `disruptive`: Refrigerator sensor data from Disruptive.
- `measurements`: Measurements (KPIs) from the OKR Tracker.

## Setup

2 changes: 1 addition & 1 deletion common/dataplatform.py
@@ -30,7 +30,7 @@ def upload_dataset(dataset_id, filename):
version = dataset.get_latest_version(dataset_id)["version"]
except HTTPError as e:
if e.response.status_code == 404:
print(f"Dataset '{dataset_id}' not found; skipping import")
logger.error(f"Dataset '{dataset_id}' not found; skipping import")
return
raise

12 changes: 12 additions & 0 deletions common/util.py
@@ -1,4 +1,5 @@
import os
import csv


def getenv(name):
@@ -12,3 +13,14 @@ def getenv(name):
raise OSError(f"Environment variable {name} is not set")

return env


def write_dict_to_csv(filename, data, fieldnames, **kwargs):
    # newline="" is required when writing with the csv module to avoid
    # spurious blank lines on some platforms.
    with open(filename, "w", newline="") as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames, **kwargs)
        writer.writeheader()

        for row in data:
            writer.writerow(row)

    return csv_file
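
For reference, a minimal usage sketch of write_dict_to_csv (hypothetical, not part of this diff; the filename and rows are invented):

    from common.util import write_dict_to_csv

    rows = [
        {"date": "2024-02-18", "value": 0.14, "comment": "foo", "created": "2024-02-18T20:03:12Z"},
        {"date": "2024-01-10", "value": 0.7, "comment": ""},
    ]
    # extrasaction="ignore" drops keys not listed in fieldnames (here: "created").
    csv_file = write_dict_to_csv(
        "/tmp/example_values.csv",
        rows,
        fieldnames=["date", "value", "comment"],
        extrasaction="ignore",
    )
    print(csv_file.name)  # /tmp/example_values.csv
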
Empty file added measurements/__init__.py
91 changes: 91 additions & 0 deletions measurements/handler.py
@@ -0,0 +1,91 @@
import asyncio
import logging
from pathlib import Path

from aiohttp import ClientSession, ClientResponseError
from aws_xray_sdk.core import patch_all, xray_recorder
from okdata.aws import ssm
from okdata.aws.logging import logging_wrapper

from common import dataplatform, util

patch_all()
logging.basicConfig()
logger = logging.getLogger()
logger.setLevel(logging.INFO)

MEASUREMENTS = {
"fdeYObXg1OpTh1XxXPJ4": "dataspeilet-antall-malinger-i-origo",
"iRznHuvBqx2ketpA8keT": "dataspeilet-etterlevelse-av-oppdateringshyppighet",
}


async def collect_measurements(measurements):
# Fetch values for all measurements
logger.info(f"Fetching data for {len(measurements)} measurements")
api_key = ssm.get_secret("/dataplatform/okr-tracker/api-key")

async with ClientSession(
headers={"x-api-key": api_key},
base_url=util.getenv("OKR_TRACKER_API_BASE_URL"),
raise_for_status=True,
) as session:
kpis = await asyncio.gather(
*[fetch_measurement(session, kpi_id) for kpi_id in measurements.keys()]
)

measurements_data = {measurement_id: values for measurement_id, values in kpis}

# Upload data to dataset
for measurement_id, dataset_id in measurements.items():
measurement_values = measurements_data[measurement_id]

if measurement_values is None:
logger.warning(
f"No data for measurement '{measurement_id}'; skipping import!"
)
continue

        logger.info(
            f"Uploading {len(measurement_values)} measurement values to dataset '{dataset_id}'"
        )

csv_file = util.write_dict_to_csv(
filename=Path("/") / "tmp" / f"{dataset_id}_values.csv",
data=measurement_values,
fieldnames=["date", "value", "comment"],
extrasaction="ignore",
)

        # open() was given a Path, so csv_file.name is a Path; pass a plain string.
        dataplatform.upload_dataset(dataset_id, str(csv_file.name))


async def fetch_measurement(session, measurement_id):
try:
async with session.get(f"/kpi/{measurement_id}/values") as response:
response_data = await response.json()
return (
measurement_id,
[
{
"date": v["date"],
"value": v["value"],
"comment": (v["comment"] or "").replace("\n", " "),
}
for v in response_data
],
)
except ClientResponseError as e:
logger.error(
"Error while fetching measurement `{}`: {}".format(
measurement_id,
str(e),
)
)
return (measurement_id, None)


@logging_wrapper
@xray_recorder.capture("collect_measurements")
def import_data(event, context):
asyncio.run(collect_measurements(MEASUREMENTS))
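
For local testing, the handler can be exercised without the Lambda runtime (a sketch, assuming OKR_TRACKER_API_BASE_URL is set and the SSM secret is reachable):

    import asyncio
    from measurements.handler import MEASUREMENTS, collect_measurements

    # Does the same work as the scheduled entry point, minus the logging/X-Ray wrappers.
    asyncio.run(collect_measurements(MEASUREMENTS))
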
8 changes: 8 additions & 0 deletions serverless.yaml
@@ -40,9 +40,17 @@ package:
- agresso/data/**
- better_uptime/*.py
- common/*.py
- measurements/*.py
- okdata_disruptive/*.py

functions:
collect-measurements:
handler: measurements.handler.import_data
events:
- schedule: cron(0 2 * * ? *)
timeout: 60
environment:
OKR_TRACKER_API_BASE_URL: ${ssm:/dataplatform/okr-tracker/api-url}
collect-better-uptime-monitors:
handler: better_uptime.handler.collect_monitors
events:
Empty file added test/measurements/__init__.py
40 changes: 40 additions & 0 deletions test/measurements/conftest.py
@@ -0,0 +1,40 @@
import re

import pytest
from unittest.mock import Mock, patch

from aiohttp import ClientResponseError
from okdata.aws import ssm


@pytest.fixture(autouse=True)
def mocked_ssm_get_secret():
with patch.object(ssm, "get_secret", return_value="foo-token"):
yield


class AsyncMock:
    def __init__(self, status_code, data):
        self.status = status_code
        self._response_data = data

    async def __aenter__(self):
        # Mimic the real session's `raise_for_status=True` behaviour, so the
        # handler's ClientResponseError handling is actually exercised.
        if self.status >= 400:
            raise ClientResponseError(Mock(), (), status=self.status)
        return self

    async def __aexit__(self, *error_info):
        # Return False so exceptions raised inside the block propagate;
        # a truthy return value would silently suppress them.
        return False

    async def json(self):
        return self._response_data


@pytest.fixture(scope="function")
def mock_client(monkeypatch, response_data):
def mock_client_get(self, url):
# /kpi/{measurement_id}/values
if match := re.search(r"\/kpi\/(?P<measurement_id>[a-zA-Z0-9]+)\/values$", url):
measurement_values = response_data.get(match.group("measurement_id"))
if measurement_values is not None:
return AsyncMock(200, measurement_values)
return AsyncMock(404, None)

monkeypatch.setattr("aiohttp.ClientSession.get", mock_client_get)
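
Note that the response_data argument requested by mock_client is not defined in this conftest; pytest resolves it from each test's own parametrization. A hypothetical example (not part of this diff):

    @pytest.mark.parametrize("response_data", [{"foo456": []}])
    def test_mocked_get(mock_client, response_data):
        # With mock_client active, a GET against /kpi/foo456/values
        # returns AsyncMock(200, []).
        assert response_data == {"foo456": []}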
81 changes: 81 additions & 0 deletions test/measurements/test_handler.py
@@ -0,0 +1,81 @@
import csv
from pathlib import Path

import pytest
from aws_xray_sdk.core import xray_recorder
from unittest.mock import patch

import measurements.handler as handler
from common import dataplatform

xray_recorder.begin_segment("Test")


@pytest.mark.asyncio
@pytest.mark.parametrize(
"response_data",
[
{
"foo456": [
{
"value": 0.14,
"date": "2024-02-18",
"comment": "foooo",
"created": "2024-02-18T20:03:12.871Z",
},
{
"value": 0.7,
"date": "2024-01-10",
"comment": None,
"created": "2024-01-10T20:03:12.871Z",
},
],
"bar456": [],
}
],
)
@patch.object(dataplatform, "upload_dataset")
async def test_collect_measurements(
mock_dataset_upload,
mock_client,
response_data,
mocker,
):
measurements = {
"foo456": "foo-dataset",
"bar456": "bar-dataset",
"baz789": "baz-dataset",
}

await handler.collect_measurements(measurements)

for measurement_id, dataset_id in measurements.items():
csv_file_path = Path("/") / "tmp" / f"{dataset_id}_values.csv"

response_values = response_data.get(measurement_id)

if response_values is None:
assert not csv_file_path.exists()
continue

with open(csv_file_path, "r") as csv_file:
reader = csv.DictReader(csv_file)
records = list(reader)

assert reader.dialect == "excel"
assert reader.fieldnames == ["date", "value", "comment"]

assert len(records) == len(response_values)

for i, record in enumerate(records):
assert record == {
field: str(response_values[i][field] or "")
for field in reader.fieldnames
}

assert (
mocker.call(dataset_id, str(csv_file_path))
in mock_dataset_upload.call_args_list
)

assert mock_dataset_upload.call_count == len(response_data)
1 change: 1 addition & 0 deletions tox.ini
@@ -22,6 +22,7 @@ setenv =
#DISRUPTIVE_SERVICE_ACCOUNT_EMAIL=
#DISRUPTIVE_SERVICE_ACCOUNT_KEY_ID=
#DISRUPTIVE_SERVICE_ACCOUNT_SECRET=
OKR_TRACKER_API_BASE_URL=http://okr-tracker.api.arpa

[testenv:flake8]
skip_install = true