Skip to content

Commit

Permalink
Merge branch 'master' into issue_88
Browse files Browse the repository at this point in the history
  • Loading branch information
dajtxx authored Jul 16, 2024
2 parents 0f840c8 + b8d8e7f commit 0c02c3a
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 3 deletions.
9 changes: 6 additions & 3 deletions src/python/pollers/axistech.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import pika.adapters.blocking_connection as pab
import pika.channel
import pika.spec
import pprint
import requests
from pika.exchange_type import ExchangeType

Expand Down Expand Up @@ -138,6 +139,8 @@ def get_messages(start: dt.datetime, end: dt.datetime) -> Optional[pd.DataFrame]
data = r.json()

if 'bb5d4f86-6eaa-494d-abcc-8f2e9b66b214' not in data['data']:
logging.warning('Did not find expected UUID in data object.')
logging.warning(pprint.pformat(data))
return None

frames = []
Expand Down Expand Up @@ -180,7 +183,7 @@ def get_messages(start: dt.datetime, end: dt.datetime) -> Optional[pd.DataFrame]
return df

except BaseException as e:
logging.error(e)
logging.exception(e)

return None

Expand Down Expand Up @@ -239,8 +242,8 @@ def main() -> None:

# Initialise the most recent message timestamp cache. This is used to control the time window
# used in the AxisTech API calls.
#for pdev in dao.get_physical_devices_from_source(BrokerConstants.AXISTECH):
# _recent_msg_times[pdev.source_ids['serial_no']] = pdev.last_seen
for pdev in dao.get_physical_devices_from_source(BrokerConstants.AXISTECH):
_recent_msg_times[pdev.source_ids['serial_no']] = pdev.last_seen

try:
logging.info('Opening connection')
Expand Down
103 changes: 103 additions & 0 deletions src/python/util/Extract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import argparse as ap
import datetime as dt
import logging
import os
from typing import Any, List, Optional
import pandas as pd
import psycopg2 as pg

import LoggingUtil

def get_data_for_logical_device(l_uid: int, start_ts: Optional[dt.datetime] = None, end_ts: Optional[dt.datetime] = None) -> pd.DataFrame:
qry = """
SELECT logical_uid, physical_uid, ts, received_at, ts_delta, json_msg->'timeseries' AS ts_array FROM physical_timeseries
WHERE logical_uid = %s
"""

qry_args: List[Any] = [l_uid]

if start_ts is not None:
qry += ' AND ts >= %s'
qry_args.append(start_ts)

if end_ts is not None:
qry += ' AND ts < %s'
qry_args.append(end_ts)

dataset = []
with pg.connect() as conn, conn.cursor() as curs:
curs.execute('SELECT name from logical_devices where uid = %s', (l_uid, ))
if curs.rowcount != 1: # If > 1, how?
logging.error(f'Did not find a logical device with id {l_uid}')
exit(1)

dev_name: str = str(curs.fetchone()[0])
dev_name = dev_name.replace(' ', '_')
logging.info(f'Fetching data for {l_uid} / {dev_name}')

logging.debug(qry)
logging.debug(curs.mogrify(qry, qry_args))
curs.execute(qry, qry_args)
if curs.rowcount < 1:
logging.info(f'No data for {l_uid} / {dev_name}')
exit(0)

while True:
rows = curs.fetchmany(size=2000)
print(f'fetched {len(rows)} rows')
if len(rows) < 1:
break

for row in rows:
dset_item = {'l_uid': row[0], 'p_uid': row[1], 'ts': row[2], 'received_at': row[3], 'ts_delta': row[4]}
for ts_obj in row[5]:
dset_item[ts_obj['name']] = ts_obj['value']
dataset.append(dset_item)

df = pd.DataFrame(dataset)
df.set_index(['l_uid', 'ts'], inplace=True)
df.sort_index(level=0, sort_remaining=True, inplace=True, ascending=True)
df.to_csv(f'{l_uid}_{dev_name}.csv')
return df


_default_host = 'localhost'
_default_port = '5432'
_default_dbname = 'broker' # This is an IoTa utility, so use the IoTa database name by default.
_default_user = 'postgres'

parser = ap.ArgumentParser(description='Extract data from the IoTa database')
parser.add_argument('-H', dest='host', help='host to connect to, default = localhost')
parser.add_argument('-p', dest='port', help='port number to connect to, default = 5432')
parser.add_argument('-d', dest='dbname', help='database name to connect to, default = broker')
parser.add_argument('-U', dest='user', help='User name to connect as, default = postgres')
parser.add_argument('-P', dest='password', help='password to connect with, prefer to set PGPASSWORD env var')
parser.add_argument('-l', dest='l_uid', type=int, help='logical device id')
parser.add_argument('-s', dest='start_time', type=dt.datetime.fromisoformat, help='earliest timestamp in ISO-8601 format (>=)')
parser.add_argument('-e', dest='end_time', type=dt.datetime.fromisoformat, help='latest timestamp in ISO-8601 format (<)')

args = parser.parse_args()

# Give precendence to command line args, fall back to env var value if it is set,
# finally, fall back to a default value.
_host = os.getenv('PGHOST', _default_host) if args.host is None else args.host
if _host is not None:
os.environ['PGHOST'] = _host

_port = os.getenv('PGPORT', _default_port) if args.port is None else args.port
if _port is not None:
os.environ['PGPORT'] = _port

_dbname = os.getenv('PGDATABASE', _default_dbname)if args.dbname is None else args.dbname
if _dbname is not None:
os.environ['PGDATABASE'] = _dbname

_user = os.getenv('PGUSER', _default_user) if args.user is None else args.user
if _user is not None:
os.environ['PGUSER'] = _user

_password = os.getenv('PGPASSWORD') if args.password is None else args.password
if _password is not None:
os.environ['PGPASSWORD'] = _password

get_data_for_logical_device(args.l_uid, args.start_time, args.end_time)

0 comments on commit 0c02c3a

Please sign in to comment.