Skip to content

Commit

Permalink
Merge pull request #62 from prio-data/drift_detection
Browse files Browse the repository at this point in the history
Drift detection
  • Loading branch information
jimdale authored Apr 29, 2024
2 parents d1daaff + f0dcc20 commit 3149910
Show file tree
Hide file tree
Showing 7 changed files with 799 additions and 8 deletions.
86 changes: 81 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,23 @@ Show docstring for a particular transform:

`viewser transforms show <transform-name>`

List querysets stored in the queryset database:

`viewser querysets list`

Produce code required to generate a queryset

`viewser querysets show <queryset-name>`

## Via API

The full functionality of viewser is exposed via its API for use in scripts and notebooks

The two fundamental objects used to define what data is fetched by the client are the *Queryset* and the *Column*, where a
Queryset consists of one or more Columns.

### Defining a new queryset from scratch

To define a queryset, one first imports the Queryset and Column classes

`from viewser import Queryset, Column`
Expand Down Expand Up @@ -124,7 +134,7 @@ If the wrong loa is specified, the queryset will be rejected by the server and a

The final argument to the Column instance is the name of the raw column to be fetched from the database. If a non-existant column is requested, the queryset will be rejected by the server and an error message detailing which columns are unavailable will be returned.

## Aggregation/disaggregation
#### Aggregation/disaggregation

The definition of a queryset must include the *target* level of analysis, at which the resulting data will be presented to the user.

Expand Down Expand Up @@ -153,7 +163,7 @@ It is up to users to ensure that they select the correct aggregation functions.

If a non-existent aggregation function is specified, the queryset will be rejected by the server and an error message detailing which columns have incorrect aggregation functions will be returned.

## Transforms
#### Transforms

Any queryset column may specify an arbitrary number of transforms to be done to the raw data *after* any necessary aggregation/disaggregation has been done.

Expand All @@ -174,13 +184,29 @@ A list of available transforms can be obtained using the viewser CLI. A notebook

Note that not all transforms are available at all levels of analysis. If a transform is requested at an innapproprite loa, the queryset will be rejected by the server and an error message detailing which columns have requested incompatible transforms and loas will be returned.

## Publising a queryset
### Making a new queryset by merging two or more existing querysets

It is sometimes desirable to make a larger queryset by merging several existing querysets. This can be done with the from_merger method. The method requires at mininum a list of querysets to be merged and a name for the merged queryset. Optionally, a theme and description can also be passed. There is also a boolean verbose flag, described below.
For example

querysets_to_merge = ['queryset1','querysets2','queryset3']
merged_queryset = Queryset.from_merger(querysets_to_merge,'my_merged_queryset',theme='my_theme',description='description')

Before merging, some checks are performed. The querysets to be merged must all have the same target LOA. If the querysets to be merged contain two or more columns with the same name, the method checks that all the definitions of that column are exactly the same (same raw data, same transforms with same parameters). If this is the case, one copy of this column is included in the merged queryset (if the verbose flag is True, the method reports that this has been done). If there are multiple definitions of the columns with the same column name, the attempt at merging is aborted.

### Recreating a queryset from storage

If a queryset has already been published to the queryset store (see below), the queryset object can be regenerated by doing

queryset = Queryset.from_storage(queryset_name)

### Publising a queryset

Before a queryset can be fetched, it must be published to a permanent database on the server. This is done using the `publish()` method:
Before a new queryset (written from scratch or created by merging existing querysets) can be fetched, it must be published to a permanent database on the server. This is done using the `publish()` method:

data = new_queryset.publish()

## Fetching a queryset
### Fetching a queryset

A published queryset can be fetched using the `.fetch()` method

Expand Down Expand Up @@ -269,6 +295,56 @@ This message indicates only that the queryset is waiting in the transform queue.

When all transforms have completed, downloading of the completed dataframe begins. The status message at this point will cease updating, which can make it appear that the client is hung in the case of large querysets. Users are asked to be patient :) .

## Input drift detection

The viewser package is able to perform a series of tests on the data fetched from the remote service which are designed to detect, and issue warnings about, anomalies in the data.

Two broad types of anomaly can be monitored

### Global anomalies

These examine the whole dataset, whatever its dimensions (thought on terms of time_units x space_units x features). The available anomaly detectors are

- global_missingness: simply reports if the total fraction of missing (i.e. NaN) values across the whole dataset exceeds a threshold. Threshold should be a small number between 0 and 1, e.g. 0.05.


- global zeros: reports if the total fraction of zero values across the whole dataset exceeds a threshold. Threshold should be a small number between 0 and 1, e.g. 0.05.


- time_missingness: reports if the fraction of missingness across any (space_units x features) slices exceeds a threshold. Threshold should be a small number between 0 and 1, e.g. 0.05.


- space_missingness: reports if the fraction of missingness across any (time_units x features) slices exceeds a threshold. Threshold should be a small number between 0 and 1, e.g. 0.05.


- feature_missingness: reports if the fraction of missingness for any feature (over all time and space units) exceeds a threshold. Threshold should be a small number between 0 and 1, e.g. 0.05.


- time_zeros: reports if the fraction of zeros across any (space_units x features) slices exceeds a threshold. Threshold should be a number between 0 and 1 and close to 1, e.g. 0.95.


- space_zeros: reports if the fraction of zeros across any (time_units x features) slices exceeds a threshold. Threshold should be a number between 0 and 1 close to 1, e.g. 0.95.


- feature_zeros: reports if the fraction of zeros for any feature (over all time and space units) exceeds a threshold. Threshold should be a number between 0 and 1 close to 1, e.g. 0.95.


### Recent data anomalies
These partition the dataset into three partitions, defined by two integers n and m. If the most recent time unit in the dateset is k: the test partition consists of the most recent n time units, i.e. k-n+1 to k inclusive (usually n would be 1 so the test parition simply consists of the most recent time unit k), the standard partition consists of the most recent k-m-n to k-n time units. The time units before k-m-n are discarded. The available anomaly detectors are
- delta_completeness: reports, for each feature, if the ratio of missingness fractions in the test and standard partitions is greater than a threshold. Threshold should be a number between 0 and 1, e.g. 0.25.


- delta_zeros: reports, for each feature, if the ratio of the fraction of zeros in the test and standard partitions is greater than a threshold. Threshold should be a number between 0 and 1, e.g. 0.25.


- extreme_values: reports, for each feature, if the most extreme value in the test partition is more than (threshold) standard deviations from the mean of the data in the test partition. Threshold should be a number in the range 2-7, e.g. 5.


- ks_drift: for each feature, performs a two-sample Kolmogorov-Smirnoff test (https://en.wikipedia.org/wiki/Kolmogorov–Smirnov_test#Two-sample_Kolmogorov–Smirnov_test) between the data in the test and standard partitions and reports if (1/the returned p-value) exceeds a threshold. Threshold should be a large number, e.g. 100.


- ecod_drift: for all features simultaneously, reports if the fraction of data-points considered outliers in the test partition exceeds that in the standard partition, according to an ECOD model (https://pyod.readthedocs.io/en/latest/_modules/pyod/models/ecod.html#ECOD) trained on the standard partition, exceeds a threshold. Threshold should be a number between 0 and 1, e.g. 0.25.

## Funding

The contents of this repository is the outcome of projects that have received funding from the European Research Council (ERC) under the European Union’s Horizon 2020 research and innovation programme (Grant agreement No. 694640, *ViEWS*) and Horizon Europe (Grant agreement No. 101055176, *ANTICIPATE*; and No. 101069312, *ViEWS* (ERC-2022-POC1)), Riksbankens Jubileumsfond (Grant agreement No. M21-0002, *Societies at Risk*), Uppsala University, Peace Research Institute Oslo, the United Nations Economic and Social Commission for Western Asia (*ViEWS-ESCWA*), the United Kingdom Foreign, Commonwealth & Development Office (GSRA – *Forecasting Fatalities in Armed Conflict*), the Swedish Research Council (*DEMSCORE*), the Swedish Foundation for Strategic Environmental Research (*MISTRA Geopolitics*), the Norwegian MFA (*Conflict Trends* QZA-18/0227), and the United Nations High Commissioner for Refugees (*the Sahel Predictive Analytics project*).
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "viewser"
version = "6.4.2"
version = "6.5.0"
description = "The Views 3 CLI tool"
authors = ["peder2911 <[email protected]>"]
readme = "README.md"
Expand Down
65 changes: 65 additions & 0 deletions viewser/commands/queryset/config_drift.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import numpy as np
from . import integrity_checks as ic

default_dne = -np.inf
default_missing = np.nan


default_config_dict = {

'global_missingness': {'threshold': 0.05,
'test_function': ic.get_global_nan_fracs,
'message': 'dataset missingness'},

'global_zeros': {'threshold': 0.95,
'test_function': ic.get_global_zero_fracs,
'message': 'dataset zero'},

'time_missingness': {'threshold': 0.01,
'test_function': ic.get_time_nan_fracs,
'message': 'time-unit missingness'},

'space_missingness': {'threshold': 0.03,
'test_function': ic.get_space_nan_fracs,
'message': 'space-unit missingness'},

'feature_missingness': {'threshold': 0.01,
'test_function': ic.get_feature_nan_fracs,
'message': 'feature missingness'},

'time_zeros': {'threshold': 0.95,
'test_function': ic.get_time_zero_fracs,
'message': 'time-unit zero'},

'space_zeros': {'threshold': 0.95,
'test_function': ic.get_space_zero_fracs,
'message': 'space-unit zero'},

'feature_zeros': {'threshold': 0.95,
'test_function': ic.get_feature_zero_fracs,
'message': 'feature zero'},

'delta_completeness': {'threshold': 1.25,
'test_function': ic.get_delta_completeness,
'message': 'feature delta_completeness'},

'delta_zeroes': {'threshold': 1.25,
'test_function': ic.get_delta_zeroes,
'message': 'feature delta_zeroes'},

'extreme_values': {'threshold': 4.0,
'test_function': ic.get_extreme_values,
'message': 'feature extreme values'},

'ks_drift': {'threshold': 100.,
'test_function': ic.get_ks_drift,
'message': 'feature KS drift'},

'ecod_drift': {'threshold': 0.05,
'test_function': ic.get_ecod_drift,
'message': 'dataset ECOD drift'},

'standard_partition_length': 10,
'test_partition_length': 1

}
167 changes: 167 additions & 0 deletions viewser/commands/queryset/drift_detection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import numpy as np
import scipy
from views_tensor_utilities import objects, mappings
from . import config_drift as config
import datetime


class InputAlarm:
def __repr__(self):
return f"Input alarm: {self.message} Severity: {self.severity} Timestamp: {self.timestamp}\n"

def __str__(self):
return f"Input alarm: {self.message} Severity: {self.severity} Timestamp: {self.timestamp}\n"

def __init__(self, message, severity=1):
self.message = message
self.severity = severity
self.timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")


class Tester:

"""
Tester
Class that mediates between the InputGate class and the testing functions. Brings together the relevant
test function, the partitions, the threshold, the message describing the test in the alarms, and the input data
- tensor, index, and feature names (required for useful reporting in the alarms)
"""

def __init__(self, test_function=None,
test_partition_length=1,
standard_partition_length=1,
threshold=0,
message='',
data=None,
index=None,
features=None,
):

self.test_function = test_function
self.test_partition_length = test_partition_length
self.standard_partition_length = standard_partition_length
self.threshold = threshold
self.message = message
self.data = data
self.index = index
self.features = features

def generate_alarms(self):

"""
generate alarms
Calls the object's assigned testing function with a kwarg dict that the function picks and chooses from
as needed.
The function returns an array of results and a dictionary translating the indexes of the results in the
array into units of analysis or features for reporting in the alarms.
"""

results, translation_dict = self.test_function(
tensor=self.data,
index=self.index,
features=self.features,
test_partition_length=self.test_partition_length,
standard_partition_length=self.standard_partition_length)

results /= self.threshold

try:
offenders = np.where(results > 1)[0]
severities = results[offenders]
except:

return None

if len(offenders) > 0:
alarms = []
for offender, severity in zip(offenders, severities):
if translation_dict is not None:
offender_id = translation_dict[offender]
else:
offender_id = offender

al = InputAlarm(
f"{self.message}; offender: {offender_id}, "
f"threshold: {self.threshold}",
int(1+severity))
alarms.append(al)

return alarms
else:
return f"{self.message} passed"


class InputGate:

"""
InputGate
Class which superintends the input warning machinery. Accepts a dataframe containing the data to be examined
and a configuration dictionary which users can use to override the default settings in config_drift.
The df is converted to a tensor container and non-numeric parts of the data are stripped out.
"""

def __init__(self, df, drift_config_dict=None):
self.config_dict = drift_config_dict
self.tensor_container = objects.ViewsDataframe(df).to_numpy_time_space()
self.numeric_part = self.tensor_container.get_numeric_part()
self.tensor = self.numeric_part.tensor
self.index = self.tensor_container.index
self.columns = self.numeric_part.columns

self.default_config_dict = config.default_config_dict
self.testers = []

def assemble_alerts(self):
"""
assemble_alerts
Method which compares the default configuration dictionary with that supplied when the InputGate object is
instantiated, removes test functions that are not required and updates thresholds of those requested.
The resulting configuration dictionary is then used to generate a list of Tester objects, whose
generate_alarm methods are then called in sequence.
"""

# override defaults
if self.config_dict is None:
self.config_dict = self.default_config_dict
else:
for key in self.default_config_dict.keys():
if key in self.config_dict.keys():
try:
detector_dict = self.default_config_dict[key]
detector_dict['threshold'] = self.config_dict[key]['threshold']
self.config_dict[key] = detector_dict
except:
pass
else:
try:
_ = self.default_config_dict[key]['threshold']
except:
self.config_dict[key] = self.default_config_dict[key]

testers = []
for key in self.config_dict.keys():
try:
tester_dict = self.config_dict[key]
testers.append(Tester(test_function=tester_dict['test_function'],
test_partition_length=self.config_dict['test_partition_length'],
standard_partition_length=self.config_dict['standard_partition_length'],
threshold=tester_dict['threshold'],
message=tester_dict['message'],
data=self.tensor,
index=self.index,
features=self.columns,
))
except:
pass

return [tester.generate_alarms() for tester in testers]
Loading

0 comments on commit 3149910

Please sign in to comment.