-
Notifications
You must be signed in to change notification settings - Fork 0
/
data_io.py
338 lines (269 loc) · 14.2 KB
/
data_io.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
"""
Import and export files for the CDC Flu Forecast Challenge
"""
import datetime
import numpy as np
import os
import pandas as pd
import warnings
from collections import OrderedDict
from colorama import Fore, Style
from rtrend_tools.cdc_params import CDC_QUANTILES_SEQ, NUM_QUANTILES, NUM_STATES, WEEKDAY_TGT, \
NUM_OUTP_LINES, NUM_OUTP_LINES_WPOINTS, COV_NUM_STATES, WEEKDAY_FC_DAY, \
get_last_weekday_before_flu_deadline, get_last_weekday_before_cov_deadline
from rtrend_tools.forecast_structs import CDCDataBunch, ForecastPost
from rtrend_tools.interpolate import WEEKLEN
def load_cdc_truth_data(fname):
    """Read a CDC truth-data CSV file and wrap it into a ``CDCDataBunch``.

    The first two columns of the file become a two-level index, and the
    "date" column is parsed as dates. The file must contain a
    "location_name" column.

    Parameters
    ----------
    fname
        Path (or anything accepted by ``pandas.read_csv``) of the truth file.

    Returns
    -------
    CDCDataBunch
        Bunch with the following attributes set:
        ``df`` (the data frame, sorted by the "date" index level),
        ``loc_names`` (sorted unique location names), ``num_locs``,
        ``loc_ids`` (unique values of the second index level),
        ``to_loc_name`` / ``to_loc_id`` (id <-> name dictionaries) and
        ``data_time_labels`` (sorted unique values of the first index level).
    """
    bunch = CDCDataBunch()

    # --- Import the raw table
    bunch.df = pd.read_csv(fname, index_col=(0, 1), parse_dates=["date"])

    # --- Unique location names (alphabetic order) and location ids
    unique_names = bunch.df["location_name"].unique()
    bunch.loc_names = np.sort(unique_names)
    bunch.num_locs = bunch.loc_names.shape[0]
    bunch.loc_ids = bunch.df.index.levels[1].unique()

    # --- Two-way conversion between location id and location name
    bunch.to_loc_name = {}  # location id -> name
    bunch.to_loc_id = {}  # location name -> id
    for loc_id in bunch.loc_ids:
        # The name is taken from the first row in which this id occurs.
        first_row = bunch.df.xs(loc_id, level=1).iloc[0]
        loc_name = first_row["location_name"]
        bunch.to_loc_name[loc_id] = loc_name
        bunch.to_loc_id[loc_name] = loc_id

    # --- Chronological ordering and the list of time stamps
    bunch.df.sort_index(level="date", inplace=True)
    time_level = bunch.df.index.levels[0]
    bunch.data_time_labels = time_level.unique().sort_values()

    return bunch
def export_forecast_flu(fname, post_list, us, cdc, nweeks_fore, use_as_point=None,
                        add_week_to_labels=False, export_range=None):
    """
    Export weekly flu hospitalization forecast quantiles to a CSV file.

    Data is not assumed to strictly follow the CDC guidelines (all locations and quantiles),
    but warnings are thrown for irregular data.

    The output has one row per (location, exported week, quantile), with columns
    ``forecast_date, location, target, target_end_date, type, quantile, value``.
    The assembled data frame is also printed to stdout.

    Parameters
    ----------
    fname : str or path-like
        Output CSV path. Its parent directory, if any, is created when missing.
    post_list : iterable
        Forecast results, one per location. ``None`` entries are skipped. Each
        remaining item must provide ``weekly_quantiles`` (indexed as
        ``[quantile, week]``), ``num_quantiles``, ``quantile_seq``,
        ``fore_time_labels`` and ``state_name``.
    us : object
        Forecast exported under the location name "US". Must provide
        ``weekly_quantiles`` and ``fore_time_labels``; ``NUM_QUANTILES`` and
        ``CDC_QUANTILES_SEQ`` are used as its quantile definition.
    cdc : object
        Must expose ``to_loc_id``, a mapping from location name to location id.
    nweeks_fore : int
        Number of forecast weeks. Only used to build the default ``export_range``.
    use_as_point : optional
        NOT IMPLEMENTED. When not ``None``, a warning is issued and one extra
        placeholder row per exported week is reserved (empty target/date/type,
        NaN quantile and value).
    add_week_to_labels : bool
        If true, every target end date is shifted forward by 7 days.
    export_range : sequence of int, optional
        Indices of the forecast weeks to export. Defaults to
        ``np.arange(nweeks_fore)``.

    Returns
    -------
    None
    """
    # PREAMBLE
    # ------------------------------------------------------------------------------------------------------------------
    target_fmt = "{:d} wk ahead inc flu hosp"

    valid_post_list = [post for post in post_list if post is not None]
    num_valid = len(valid_post_list)

    # Default range of weeks to export
    if export_range is None:
        export_range = np.arange(nweeks_fore)
    nweeks_eff = len(export_range)

    # Check output size
    if num_valid != NUM_STATES:
        warnings.warn(f"\n[CDC-WARN] Hey, the number of calculated states/jurisdictions ({num_valid} "
                      f"+ US) does not match that required by CDC ({NUM_STATES} + US).\n")

    if use_as_point is not None:
        warnings.warn("HEY, use_as_point is not implemented in export_forecast_flu.")

    # --- Build list of data arrays to be concatenated in the end
    # The forecast date is the same for the entire document and is created later.
    location_list = list()  # Location (state) id
    target_list = list()  # String describing the week of forecast
    target_end_date_list = list()  # Date of the forecast report
    type_list = list()  # Type of output: "point" or "quantile"
    quantile_list = list()  # q-value of the quantile
    value_list = list()  # Value of the forecast data.

    actual_num_outp_lines = 0

    # DATA UNPACKING AND COLLECTION
    # ------------------------------------------------------------------------------------------------------------------

    # Definition of the processing routine
    # ------------------------------------
    def process_location(weekly_quantiles, num_q, quantile_seq, fore_time_labels, state_name):
        """Append the output arrays of one location to the lists; return its number of rows."""
        num_lines = nweeks_eff * (num_q + int(use_as_point is not None))  # Add point line, if requested

        # CDC format compliance check
        for i_week in export_range:
            date = fore_time_labels[i_week]
            if date.weekday() != WEEKDAY_TGT:
                warnings.warn(Fore.YELLOW + f"\n[CDC-WARN] Hey, wrong weekday ({date.weekday()}) was found in "
                                            f"forecast data labels!)\n" + Style.RESET_ALL)

        # Allocate arrays
        location_array = np.repeat(cdc.to_loc_id[state_name], num_lines)
        target_array = np.empty(num_lines, dtype=object)
        target_end_date_array = np.empty(num_lines, dtype=object)
        type_array = np.empty(num_lines, dtype=object)
        # Numeric columns start as NaN (not np.empty), so that rows which are reserved but
        # never filled (the unimplemented point rows) do not carry uninitialized memory.
        quantile_array = np.full(num_lines, np.nan, dtype=float)
        value_array = np.full(num_lines, np.nan, dtype=float)

        # ---
        i_line = 0
        for i_week in export_range:  # Loop over exported forecast weeks
            week = fore_time_labels[i_week] + int(add_week_to_labels) * datetime.timedelta(7)
            rows = slice(i_line, i_line + num_q)

            # Write data into arrays
            target_array[rows] = target_fmt.format(i_week + 1)
            target_end_date_array[rows] = week.date().isoformat()
            type_array[rows] = "quantile"
            quantile_array[rows] = quantile_seq[:]
            value_array[rows] = weekly_quantiles[:, i_week]

            i_line += num_q

            # Point
            if use_as_point is not None:
                # NOT IMPLEMENTED: the row is reserved and left as a placeholder.
                i_line += 1

        # Store arrays
        location_list.append(location_array)
        target_list.append(target_array)
        target_end_date_list.append(target_end_date_array)
        type_list.append(type_array)
        quantile_list.append(quantile_array)
        value_list.append(value_array)

        return num_lines

    # Application for all states and the US
    # -------------------------------------
    for post in valid_post_list:  # Loop over each location
        actual_num_outp_lines += process_location(post.weekly_quantiles, post.num_quantiles,
                                                  post.quantile_seq, post.fore_time_labels, post.state_name)

    # --- APPLY SEPARATELY FOR US
    actual_num_outp_lines += process_location(us.weekly_quantiles, NUM_QUANTILES, CDC_QUANTILES_SEQ,
                                              us.fore_time_labels, "US")

    # Concatenate all arrays in desired order
    fd = get_last_weekday_before_flu_deadline(WEEKDAY_FC_DAY)
    # fd = get_last_weekday_before_cov_deadline(WEEKDAY_FC_DAY, now=datetime.datetime.today() - datetime.timedelta(days=1))
    forecast_date = np.repeat(str(fd), actual_num_outp_lines)
    location = np.concatenate(location_list)
    target = np.concatenate(target_list)
    target_end_date = np.concatenate(target_end_date_list)
    type_ = np.concatenate(type_list)
    quantile = np.concatenate(quantile_list)
    value = np.concatenate(value_list)

    # Final check on the number of lines in output
    expec_num_outp_lines = NUM_OUTP_LINES if use_as_point is None else NUM_OUTP_LINES_WPOINTS
    if actual_num_outp_lines != expec_num_outp_lines:
        # Report the value that was actually compared against.
        warnings.warn("\n[CDC-WARN] Hey, total number of lines in file doesn't match the expected "
                      "value.\n"
                      f"Total = {actual_num_outp_lines} | Expected = {expec_num_outp_lines}")

    # DF CONSTRUCTION AND EXPORT
    # ------------------------------------------------------------------------------------------------------------------
    out_df = pd.DataFrame(OrderedDict(
        forecast_date=forecast_date,
        location=location,
        target=target,
        target_end_date=target_end_date,
        type=type_,
        quantile=quantile,
        value=value,
    ))

    print(out_df)

    # A bare file name has an empty dirname, and os.makedirs("") raises FileNotFoundError.
    out_dir = os.path.dirname(fname)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    out_df.to_csv(fname, index=False)
def export_forecast_cov_hosp(fname, post_list, us, cdc, nweeks_fore, use_as_point=None, add_days=0, export_range=None):
    """
    Export daily hospitalization forecast quantiles to a CSV file.

    Data is not assumed to strictly follow the CDC guidelines (all locations and quantiles),
    but warnings are thrown for irregular data.

    The output has one row per (location, exported day, quantile), with columns
    ``forecast_date, location, target, target_end_date, type, quantile, value``.
    Forecast values are rounded to the nearest integer. The assembled data frame
    is also printed to stdout.

    Parameters
    ----------
    fname : str or path-like
        Output CSV path. Its parent directory, if any, is created when missing.
    post_list : iterable
        Forecast results, one per location. ``None`` entries are skipped. Each
        remaining item must provide ``daily_quantiles`` (indexed as
        ``[quantile, day]``), ``num_quantiles``, ``quantile_seq``,
        ``fore_daily_tlabels`` and ``state_name``.
    us : object
        Forecast exported under the location name "United States". Must provide
        ``daily_quantiles`` and ``fore_daily_tlabels``; ``NUM_QUANTILES`` and
        ``CDC_QUANTILES_SEQ`` are used as its quantile definition.
    cdc : object
        Must expose ``to_loc_id``, a mapping from location name to location id.
    nweeks_fore : int
        Number of forecast weeks. Only used when ``export_range`` is ``None``, in
        which case ``WEEKLEN * nweeks_fore`` days are exported.
    use_as_point : optional
        NOT IMPLEMENTED. When not ``None``, a warning is issued and one extra
        placeholder row per exported day is reserved (empty target/date/type,
        NaN quantile, zero value).
    add_days : int
        Number of days added to every target end date.
    export_range : sequence of int, optional
        Indices of the forecast days to export. The "day ahead" number in the
        target string counts from 0 at the first element of this range.

    Returns
    -------
    None
    """
    # PREAMBLE
    # ------------------------------------------------------------------------------------------------------------------
    target_fmt = "{:d} day ahead inc hosp"

    if export_range is None:
        ndays_fore = WEEKLEN * nweeks_fore
        export_range = np.arange(ndays_fore)  # Standard range, given nweeks_fore
    else:
        ndays_fore = len(export_range)  # Overrides the number of forecast days by export_range

    valid_post_list = [post for post in post_list if post is not None]
    num_valid = len(valid_post_list)

    # Check output size
    if num_valid != COV_NUM_STATES:
        warnings.warn(f"\n[CDC-WARN] Hey, the number of calculated states/jurisdictions ({num_valid} "
                      f"+ US) does not match that required by CDC ({COV_NUM_STATES} + US).\n")

    if use_as_point is not None:
        warnings.warn("HEY, use_as_point is not implemented in export_forecast_cov_hosp.")

    # --- Build list of data arrays to be concatenated in the end
    # The forecast date is the same for the entire document and is created later.
    location_list = list()  # Location (state) id
    target_list = list()  # String describing the day of forecast
    target_end_date_list = list()  # Date of the forecast report
    type_list = list()  # Type of output: "point" or "quantile"
    quantile_list = list()  # q-value of the quantile
    value_list = list()  # Value of the forecast data.

    actual_num_outp_lines = 0

    # DATA UNPACKING AND COLLECTION
    # ------------------------------------------------------------------------------------------------------------------

    # Definition of the processing routine
    # ------------------------------------
    def process_location(daily_quantiles, num_q, quantile_seq, fore_time_labels, state_name):
        """Append the output arrays of one location to the lists; return its number of rows."""
        num_lines = ndays_fore * (num_q + int(use_as_point is not None))  # Add point line, if requested

        # Allocate arrays
        location_array = np.repeat(cdc.to_loc_id[state_name], num_lines)
        target_array = np.empty(num_lines, dtype=object)
        target_end_date_array = np.empty(num_lines, dtype=object)
        type_array = np.empty(num_lines, dtype=object)
        # Numeric columns are initialized (not np.empty), so that rows which are reserved but
        # never filled (the unimplemented point rows) do not carry uninitialized memory.
        quantile_array = np.full(num_lines, np.nan, dtype=float)
        value_array = np.zeros(num_lines, dtype=int)

        # ---
        i_line = 0
        for iiday, i_day in enumerate(export_range):  # Loop over forecast days
            day = fore_time_labels[i_day] + pd.Timedelta(add_days, "d")
            rows = slice(i_line, i_line + num_q)

            # Write data into arrays
            target_array[rows] = target_fmt.format(iiday)  # Considers as 0 the first day of export range
            target_end_date_array[rows] = day.date().isoformat()
            type_array[rows] = "quantile"
            quantile_array[rows] = quantile_seq[:]
            # Values are exported as rounded integers rather than floats.
            value_array[rows] = np.round(daily_quantiles[:, i_day]).astype(int)

            i_line += num_q

            # Point
            if use_as_point is not None:
                # NOT IMPLEMENTED: the row is reserved and left as a placeholder.
                i_line += 1

        # Store arrays
        location_list.append(location_array)
        target_list.append(target_array)
        target_end_date_list.append(target_end_date_array)
        type_list.append(type_array)
        quantile_list.append(quantile_array)
        value_list.append(value_array)

        return num_lines

    # Application for all states and the US
    # -------------------------------------
    for post in valid_post_list:  # Loop over each location
        post: ForecastPost
        actual_num_outp_lines += process_location(post.daily_quantiles, post.num_quantiles,
                                                  post.quantile_seq, post.fore_daily_tlabels, post.state_name)

    # --- APPLY SEPARATELY FOR US
    actual_num_outp_lines += process_location(us.daily_quantiles, NUM_QUANTILES, CDC_QUANTILES_SEQ,
                                              us.fore_daily_tlabels, "United States")

    # Concatenate all arrays in desired order
    fd = get_last_weekday_before_cov_deadline(WEEKDAY_FC_DAY)
    # fd = get_last_weekday_before_cov_deadline(WEEKDAY_FC_DAY, now=datetime.datetime.today() - datetime.timedelta(days=1))  # FOR LATE SUBMISSION
    forecast_date = np.repeat(str(fd), actual_num_outp_lines)
    location = np.concatenate(location_list)
    target = np.concatenate(target_list)
    target_end_date = np.concatenate(target_end_date_list)
    type_ = np.concatenate(type_list)
    quantile = np.concatenate(quantile_list)
    value = np.concatenate(value_list)

    # NOTE: the total number of output lines is not validated in this function.

    # DF CONSTRUCTION AND EXPORT
    # ------------------------------------------------------------------------------------------------------------------
    out_df = pd.DataFrame(OrderedDict(
        forecast_date=forecast_date,
        location=location,
        target=target,
        target_end_date=target_end_date,
        type=type_,
        quantile=quantile,
        value=value,
    ))

    print(out_df)

    # A bare file name has an empty dirname, and os.makedirs("") raises FileNotFoundError.
    out_dir = os.path.dirname(fname)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    out_df.to_csv(fname, index=False)
def load_forecast_cdc(fname):
    """Read a forecast CSV file in the CDC layout into a data frame.

    The first line of the file is taken as the header, and the columns
    "forecast_date" and "target_end_date" are parsed as dates.

    Parameters
    ----------
    fname
        Path (or anything accepted by ``pandas.read_csv``) of the forecast file.

    Returns
    -------
    pandas.DataFrame
        The file contents, with a default integer index.
    """
    date_columns = ["forecast_date", "target_end_date"]
    return pd.read_csv(fname, header=0, parse_dates=date_columns)
def make_state_arrays(weekly_quantiles, state_id, num_quantiles, nweeks_fore):
    """Placeholder: not implemented. Ignores all arguments and returns ``None``."""
    return None