diff --git a/neuralprophet/time_dataset.py b/neuralprophet/time_dataset.py index c8fb0769e..33684f596 100644 --- a/neuralprophet/time_dataset.py +++ b/neuralprophet/time_dataset.py @@ -86,13 +86,13 @@ def __init__( self.df, self.additive_event_and_holiday_names, self.multiplicative_event_and_holiday_names, - ) = add_event_features_to_df( + ) = self.add_event_features_to_df( self.df, self.config_events, self.config_country_holidays, ) # pre-sort additive/multiplicative regressors - self.additive_regressors_names, self.multiplicative_regressors_names = sort_regressor_names( + self.additive_regressors_names, self.multiplicative_regressors_names = self.sort_regressor_names( self.config_regressors ) @@ -130,6 +130,44 @@ def __init__( # Construct index map self.sample2index_map, self.length = self.create_sample2index_map(self.df, self.df_tensors) + if self.config_seasonality is not None and hasattr(self.config_seasonality, "periods"): + self.calculate_seasonalities() + + def calculate_seasonalities(self): + self.seasonalities = OrderedDict({}) + dates = self.df_tensors["ds"] + t = (dates - torch.tensor(datetime(1900, 1, 1).timestamp())).float() / (3600 * 24.0) + + def compute_fourier_features(t, period): + factor = 2.0 * np.pi / period.period + sin_terms = torch.sin(factor * t[:, None] * torch.arange(1, period.resolution + 1)) + cos_terms = torch.cos(factor * t[:, None] * torch.arange(1, period.resolution + 1)) + return torch.cat((sin_terms, cos_terms), dim=1) + + for name, period in self.config_seasonality.periods.items(): + if period.resolution > 0: + features = compute_fourier_features(t, period) + + if period.condition_name is not None: + condition_values = self.df_tensors[period.condition_name].unsqueeze(1) + features *= condition_values + self.seasonalities[name] = features + + def get_sample_seasonalities(self, df_tensors, origin_index, n_forecasts, max_lags, n_lags, config_seasonality): + seasonalities = OrderedDict({}) + + # Determine the range of indices based on whether lags are used + if max_lags == 0: + indices = [origin_index] + else: + indices = list(range(origin_index - n_lags + 1, origin_index + n_forecasts + 1)) + + # Extract the precomputed seasonalities from self.seasonalities + for name, features in self.seasonalities.items(): + seasonalities[name] = features[indices, :] + + return seasonalities + def __getitem__(self, index): """Overrides parent class method to get an item at index. Parameters @@ -165,7 +203,7 @@ def __getitem__(self, index): df_index = self.sample_index_to_df_index(index) # Tabularize - extract features from dataframe at given target index position - inputs, target = tabularize_univariate_datetime_single_index( + inputs, target = self.tabularize_univariate_datetime_single_index( df_tensors=self.df_tensors, origin_index=df_index, predict_mode=self.predict_mode, @@ -197,19 +235,21 @@ def create_sample2index_map(self, df, df_tensors): # Limit target range due to input lags and number of forecasts df_length = len(df_tensors["ds"]) - origin_start_end_mask = create_origin_start_end_mask( + origin_start_end_mask = self.create_origin_start_end_mask( df_length=df_length, max_lags=self.max_lags, n_forecasts=self.n_forecasts ) # Prediction Frequency # Filter missing samples and prediction frequency (does not actually drop, but creates indexmapping) - prediction_frequency_mask = create_prediction_frequency_filter_mask(df_tensors["ds"], self.prediction_frequency) + prediction_frequency_mask = self.create_prediction_frequency_filter_mask( + df_tensors["ds"], self.prediction_frequency + ) # Combine prediction origin masks valid_prediction_mask = prediction_frequency_mask & origin_start_end_mask # Create NAN-free index mapping of sample index to df index - nan_mask = create_nan_mask( + nan_mask = self.create_nan_mask( df_tensors=df_tensors, predict_mode=self.predict_mode, max_lags=self.max_lags, @@ -239,6 +279,515 @@ def create_sample2index_map(self, df, df_tensors): return sample_index_2_df_origin_index, num_samples + def log_input_shapes(self, inputs): + tabularized_input_shapes_str = "" + for key, value in inputs.items(): + if key in [ + "seasonalities", + "covariates", + "events", + "regressors", + ]: + for name, period_features in value.items(): + tabularized_input_shapes_str += f" {name} {key} {period_features.shape}\n" + else: + tabularized_input_shapes_str += f" {key} {value.shape} \n" + log.debug(f"Tabularized inputs shapes: \n{tabularized_input_shapes_str}") + + def tabularize_univariate_datetime_single_index( + self, + df_tensors: dict, + origin_index: int, + predict_mode: bool = False, + n_lags: int = 0, + max_lags: int = 0, + n_forecasts: int = 1, + config_seasonality: Optional[configure.ConfigSeasonality] = None, + config_lagged_regressors: Optional[configure.ConfigLaggedRegressors] = None, + additive_event_and_holiday_names: List[str] = [], + multiplicative_event_and_holiday_names: List[str] = [], + additive_regressors_names: List[str] = [], + multiplicative_regressors_names: List[str] = [], + ): + """Create a tabular data sample from timeseries dataframe, used for mini-batch creation. + Note + ---- + Data must have no gaps for sample extracted at given index position. + ---------- + df : pd.DataFrame + Sequence of observations with original ``ds``, ``y`` and normalized ``t``, ``y_scaled`` columns + origin_index: int: + dataframe index position of last observed lag before forecast starts. + n_forecasts : int + Number of steps to forecast into future + n_lags : int + Number of lagged values of series to include as model inputs (aka AR-order) + config_seasonality : configure.ConfigSeasonality + Configuration for seasonalities + config_lagged_regressors : configure.ConfigLaggedRegressors + Configurations for lagged regressors + config_events : configure.ConfigEvents + User specified events, each with their upper, lower windows (int) and regularization + config_country_holidays : configure.ConfigCountryHolidays + Configurations (holiday_names, upper, lower windows, regularization) for country specific holidays + config_regressors : configure.ConfigFutureRegressors + Configuration for regressors + predict_mode : bool + Chooses the prediction mode + Options + * (default) ``False``: Includes target values + * ``True``: Does not include targets but includes entire dataset as input + Returns + ------- + OrderedDict + Model inputs, each of len(df) but with varying dimensions + Note + ---- + Contains the following data: + Model Inputs + * ``time`` (np.array, float), dims: (num_samples, 1) + * ``seasonalities`` (OrderedDict), named seasonalities + each with features (np.array, float) - dims: (num_samples, n_features[name]) + * ``lags`` (np.array, float), dims: (num_samples, n_lags) + * ``covariates`` (OrderedDict), named covariates, + each with features (np.array, float) of dims: (num_samples, n_lags) + * ``events`` (OrderedDict), events, + each with features (np.array, float) of dims: (num_samples, n_lags) + * ``regressors`` (OrderedDict), regressors, + each with features (np.array, float) of dims: (num_samples, n_lags) + np.array, float + Targets to be predicted of same length as each of the model inputs, dims: (n_forecasts, 1) + """ + # TODO: pre-process all type conversions (e.g. torch.float32) in __init__ + # Note: if max_lags == 0, then n_forecasts == 1 + + # sample features are stored and returned in OrderedDict + inputs = OrderedDict({}) + + targets = self.get_sample_targets( + df_tensors=df_tensors, + origin_index=origin_index, + n_forecasts=n_forecasts, + max_lags=max_lags, + predict_mode=predict_mode, + ) + + # TIME: the time at each sample's lags and forecasts + if max_lags == 0: + t = df_tensors["t"][origin_index] + inputs["time"] = t.unsqueeze(0) + else: + # extract time value of n_lags steps before and icluding origin_index and n_forecasts steps after origin_index + # Note: df.loc is inclusive of slice end, while df.iloc is not. + t = df_tensors["t"][origin_index - n_lags + 1 : origin_index + n_forecasts + 1] + inputs["time"] = t + + # LAGS: From y-series, extract preceeding n_lags steps up to and including origin_index + if n_lags >= 1 and "y_scaled" in df_tensors: + # Note: df.loc is inclusive of slice end, while df.iloc is not. + lags = df_tensors["y_scaled"][origin_index - n_lags + 1 : origin_index + 1] + inputs["lags"] = lags + + # COVARIATES / LAGGED REGRESSORS: Lagged regressor inputs: analogous to LAGS + if config_lagged_regressors is not None: # and max_lags > 0: + inputs["covariates"] = self.get_sample_lagged_regressors( + df_tensors=df_tensors, origin_index=origin_index, config_lagged_regressors=config_lagged_regressors + ) + + # SEASONALITIES_ + if config_seasonality is not None: + inputs["seasonalities"] = self.get_sample_seasonalities( + df_tensors=df_tensors, + origin_index=origin_index, + n_forecasts=n_forecasts, + max_lags=max_lags, + n_lags=n_lags, + config_seasonality=config_seasonality, + ) + + # FUTURE REGRESSORS: get the future regressors features + # create numpy array of values of additive and multiplicative regressors, at correct indexes + # features dims: (n_forecasts, n_features) + any_future_regressors = 0 < len(additive_regressors_names + multiplicative_regressors_names) + if any_future_regressors: # if config_regressors.regressors is not None: + inputs["regressors"] = self.get_sample_future_regressors( + df_tensors=df_tensors, + origin_index=origin_index, + n_forecasts=n_forecasts, + max_lags=max_lags, + n_lags=n_lags, + additive_regressors_names=additive_regressors_names, + multiplicative_regressors_names=multiplicative_regressors_names, + ) + + # FUTURE EVENTS: get the events features + # create numpy array of values of additive and multiplicative events, at correct indexes + # features dims: (n_forecasts, n_features) + any_events = 0 < len(additive_event_and_holiday_names + multiplicative_event_and_holiday_names) + if any_events: + inputs["events"] = self.get_sample_future_events( + df_tensors=df_tensors, + origin_index=origin_index, + n_forecasts=n_forecasts, + max_lags=max_lags, + n_lags=n_lags, + additive_event_and_holiday_names=additive_event_and_holiday_names, + multiplicative_event_and_holiday_names=multiplicative_event_and_holiday_names, + ) + + # ONLY FOR DEBUGGING + # if log.level == 0: + # log_input_shapes(inputs) + return inputs, targets + + def get_event_offset_features(self, event, config, feature): + """ + Create event offset features for the given event, config and feature + Parameters + ---------- + event : str + Name of the event + config : configure.ConfigEvents + User specified events, holidays, and country specific holidays + feature : pd.Series + Feature for the event + Returns + ------- + tuple + Tuple of additive_events and multiplicative_events + """ + offsets = range(config.lower_window, config.upper_window + 1) + offset_features = pd.concat( + { + utils.create_event_names_for_offsets(event, offset): feature.shift(periods=offset, fill_value=0.0) + for offset in offsets + }, + axis=1, + ) + return offset_features + + def add_event_features_to_df( + self, + df, + config_events: Optional[configure.ConfigEvents] = None, + config_country_holidays: Optional[configure.ConfigCountryHolidays] = None, + ): + """ + Construct columns containing the features of each event, added to df. + Parameters + ---------- + df : pd.DataFrame + Dataframe with all values including the user specified events (provided by user) + config_events : configure.ConfigEvents + User specified events, each with their upper, lower windows (int), regularization + config_country_holidays : configure.ConfigCountryHolidays + Configurations (holiday_names, upper, lower windows, regularization) for country specific holidays + Returns + ------- + np.array + All additive event features (both user specified and country specific) + np.array + All multiplicative event features (both user specified and country specific) + """ + + def normalize_holiday_name(name): + # Handle cases like "Independence Day (observed)" -> "Independence Day" + return name.replace(" (observed)", "") if "(observed)" in name else name + + def add_offset_features(feature, event_name, config): + additive_names = [] + multiplicative_names = [] + for offset in range(config.lower_window, config.upper_window + 1): + event_offset_name = utils.create_event_names_for_offsets(event_name, offset) + df[event_offset_name] = feature.shift(periods=offset, fill_value=0.0) + if config.mode == "additive": + additive_names.append(event_offset_name) + else: + multiplicative_names.append(event_offset_name) + return additive_names, multiplicative_names + + # Create all additional user-specified offset events + additive_events_names = [] + multiplicative_events_names = [] + + if config_events is not None: + for event in sorted(config_events.keys()): + feature = df[event] + config = config_events[event] + additive_names, multiplicative_names = add_offset_features(feature, event, config) + additive_events_names.extend(additive_names) + multiplicative_events_names.extend(multiplicative_names) + + # Create all country-specific holidays and their offsets + additive_holiday_names = [] + multiplicative_holiday_names = [] + + if config_country_holidays is not None: + year_list = df["ds"].dt.year.unique() + country_holidays_dict = get_all_holidays(year_list, config_country_holidays.country) + config = config_country_holidays + + for holiday in config_country_holidays.holiday_names: + feature = pd.Series(np.zeros(len(df)), index=df.index, dtype=np.float32) + normalized_holiday = normalize_holiday_name(holiday) + + if normalized_holiday in country_holidays_dict: + dates = country_holidays_dict[normalized_holiday] + feature.loc[df["ds"].isin(dates)] = 1.0 + else: + raise ValueError(f"Holiday {holiday} not found in {config_country_holidays.country} holidays") + + additive_names, multiplicative_names = add_offset_features(feature, normalized_holiday, config) + additive_holiday_names.extend(additive_names) + multiplicative_holiday_names.extend(multiplicative_names) + + additive_event_and_holiday_names = sorted(additive_events_names + additive_holiday_names) + multiplicative_event_and_holiday_names = sorted(multiplicative_events_names + multiplicative_holiday_names) + + return df, additive_event_and_holiday_names, multiplicative_event_and_holiday_names + + def create_origin_start_end_mask(self, df_length, max_lags, n_forecasts): + """Creates a boolean mask for valid prediction origin positions. + (based on limiting input lags and forecast targets at start and end of df)""" + if max_lags >= 1: + start_pad = torch.zeros(max_lags - 1, dtype=torch.bool) + valid_targets = torch.ones(df_length - max_lags - n_forecasts + 1, dtype=torch.bool) + end_pad = torch.zeros(n_forecasts, dtype=torch.bool) + target_start_end_mask = torch.cat((start_pad, valid_targets, end_pad), dim=0) + elif max_lags == 0 and n_forecasts == 1: + # without lags, forecast targets and origins are identical + target_start_end_mask = torch.ones(df_length, dtype=torch.bool) + else: + raise ValueError(f"max_lags value of {max_lags} not supported for n_forecasts {n_forecasts}.") + return target_start_end_mask + + def create_prediction_frequency_filter_mask(self, timestamps, prediction_frequency=None): + """Filters prediction origin index from df based on the forecast frequency setting. + + Filter based on timestamp last lag before targets start + + Parameters + ---------- + timestamps : torch.Tensor + Tensor of timestamps in Unix epoch format + prediction_frequency : dict + periodic interval in which forecasts should be made. + Note + ---- + E.g. if prediction_frequency=7, forecasts are only made on every 7th step (once in a week in case of daily + resolution). + + Returns boolean mask where prediction origin indexes to be included are True, and the rest False. + """ + if prediction_frequency is None: + return torch.ones(len(timestamps), dtype=torch.bool) + + timestamps = pd.to_datetime(timestamps.numpy(), unit="s") + mask = torch.ones(len(timestamps), dtype=torch.bool) + + filters = { + "hourly-minute": timestamps.minute, + "daily-hour": timestamps.hour, + "weekly-day": timestamps.dayofweek, + "monthly-day": timestamps.day, + "yearly-month": timestamps.month, + } + + for key, value in prediction_frequency.items(): + if key not in filters: + raise ValueError(f"Invalid prediction frequency: {key}") + mask &= filters[key] == value + + return torch.tensor(mask, dtype=torch.bool) + + def create_nan_mask( + self, + df_tensors, + predict_mode, + max_lags, + n_lags, + n_forecasts, + config_lagged_regressors, + future_regressor_names, + event_names, + ): + """Creates mask for each prediction origin, + accounting for corresponding input lags / forecast targets containing any NaN values. + """ + tensor_length = len(df_tensors["ds"]) + valid_origins = torch.ones(tensor_length, dtype=torch.bool) + tensor_isna = {k: torch.isnan(v) for k, v in df_tensors.items()} + + # TARGETS + if predict_mode: + # Targets not needed + targets_valid = torch.ones(tensor_length, dtype=torch.bool) + else: + if max_lags == 0: # y-series and origin index match + targets_valid = ~tensor_isna["y_scaled"] + else: + if n_forecasts == 1: + targets_nan = tensor_isna["y_scaled"][1:] + targets_nan = torch.cat([targets_nan, torch.tensor([True], dtype=torch.bool)]) + targets_valid = ~targets_nan + else: # This is also correct for n_forecasts == 1, but slower. + targets_nan = sliding_window_view(tensor_isna["y_scaled"], window_shape=n_forecasts).any(axis=-1) + # first entry corresponds to origin_index -1, drop this. + targets_nan = torch.tensor(targets_nan[1:]) + # pad last n_forecasts as missing, as forecast origins will have missing forecast-targets there. + targets_nan = torch.cat([targets_nan, torch.ones(n_forecasts, dtype=torch.bool)]) + targets_valid = ~targets_nan + + valid_origins &= targets_valid + + # AR LAGS + if n_lags > 0: + # boolean vector, starting at origin_index = n_lags -1 + y_lags_nan = torch.tensor(sliding_window_view(tensor_isna["y_scaled"], window_shape=n_lags).any(axis=-1)) + # fill first n_lags -1 positions with True + # as there are missing lags for the corresponding origin_indexes + y_lags_nan = torch.cat([torch.ones(n_lags - 1, dtype=torch.bool), y_lags_nan]) + y_lags_valid = ~y_lags_nan + valid_origins &= y_lags_valid + + # LAGGED REGRESSORS + if config_lagged_regressors is not None: # and max_lags > 0: + reg_lags_valid = torch.ones(tensor_length, dtype=torch.bool) + for name, lagged_regressor in config_lagged_regressors.items(): + n_reg_lags = lagged_regressor.n_lags + if n_reg_lags > 0: + # boolean vector, starting at origin_index = n_lags -1 + reg_lags_nan = torch.tensor( + sliding_window_view(tensor_isna[name].numpy(), window_shape=n_reg_lags).any(axis=-1) + ) + # fill first n_reg_lags -1 positions with True, + # as there are missing lags for the corresponding origin_indexes + reg_lags_nan = torch.cat([torch.ones(n_reg_lags - 1, dtype=torch.bool), reg_lags_nan]) + reg_lags_valid &= ~reg_lags_nan + valid_origins &= reg_lags_valid + + # TIME: TREND & SEASONALITY: the time at each sample's lags and forecasts + # FUTURE REGRESSORS + # EVENTS + names = ["t"] + future_regressor_names + event_names + valid_columns = self.mask_origin_without_nan_for_columns(tensor_isna, names, max_lags, n_lags, n_forecasts) + valid_origins &= valid_columns + + return valid_origins + + def mask_origin_without_nan_for_columns(self, tensor_isna, names, max_lags, n_lags, n_forecasts): + contains_nan = torch.stack([tensor_isna[name] for name in names], dim=1).any(dim=1) + if max_lags > 0: + if n_lags == 0 and n_forecasts == 1: + contains_nan = contains_nan[1:] + contains_nan = torch.cat([contains_nan, torch.tensor([True], dtype=torch.bool)]) + else: + contains_nan = sliding_window_view(contains_nan.numpy(), window_shape=n_lags + n_forecasts).any(axis=-1) + # first sample is at origin_index = n_lags -1, + if n_lags == 0: # first sample origin index is at -1 + contains_nan = contains_nan[1:] + else: + contains_nan = torch.cat([torch.ones(n_lags - 1, dtype=torch.bool), torch.tensor(contains_nan)]) + # there are n_forecasts origin_indexes missing at end + contains_nan = torch.cat([torch.tensor(contains_nan), torch.ones(n_forecasts, dtype=torch.bool)]) + valid_origins = ~contains_nan + return valid_origins + + def sort_regressor_names(self, config): + additive_regressors_names = [] + multiplicative_regressors_names = [] + if config is not None and config.regressors is not None: + # sort and divide regressors into multiplicative and additive + for reg in sorted(list(config.regressors.keys())): + mode = config.regressors[reg].mode + if mode == "additive": + additive_regressors_names.append(reg) + else: + multiplicative_regressors_names.append(reg) + return additive_regressors_names, multiplicative_regressors_names + + def get_sample_targets(self, df_tensors, origin_index, n_forecasts, max_lags, predict_mode): + if predict_mode: + return torch.zeros((n_forecasts, 1), dtype=torch.float32) + else: + if n_forecasts == 1: + if max_lags == 0: + targets = df_tensors["y_scaled"][origin_index] + if max_lags > 0: + targets = df_tensors["y_scaled"][origin_index + 1] + targets = targets.unsqueeze(0).unsqueeze(1) + else: + targets = df_tensors["y_scaled"][origin_index + 1 : origin_index + n_forecasts + 1] + targets = targets.unsqueeze(1) + return targets + + def get_sample_lagged_regressors(self, df_tensors, origin_index, config_lagged_regressors): + lagged_regressors = OrderedDict({}) + # Future TODO: optimize this computation for many lagged_regressors + for name, lagged_regressor in config_lagged_regressors.items(): + covar_lags = lagged_regressor.n_lags + assert covar_lags > 0 + # Indexing tensors instead of DataFrame + lagged_regressors[name] = df_tensors[name][origin_index - covar_lags + 1 : origin_index + 1] + return lagged_regressors + + def get_sample_future_regressors( + self, + df_tensors, + origin_index, + n_forecasts, + max_lags, + n_lags, + additive_regressors_names, + multiplicative_regressors_names, + ): + regressors = OrderedDict({}) + if max_lags == 0: + if additive_regressors_names: + regressors["additive"] = df_tensors["additive_regressors"][origin_index, :].unsqueeze(0) + + if multiplicative_regressors_names: + regressors["multiplicative"] = df_tensors["multiplicative_regressors"][origin_index, :].unsqueeze(0) + + else: + if additive_regressors_names: + regressors["additive"] = df_tensors["additive_regressors"][ + origin_index + 1 - n_lags : origin_index + n_forecasts + 1, : + ] + if multiplicative_regressors_names: + regressors["multiplicative"] = df_tensors["multiplicative_regressors"][ + origin_index + 1 - n_lags : origin_index + n_forecasts + 1, : + ] + + return regressors + + def get_sample_future_events( + self, + df_tensors, + origin_index, + n_forecasts, + max_lags, + n_lags, + additive_event_and_holiday_names, + multiplicative_event_and_holiday_names, + ): + events = OrderedDict({}) + if max_lags == 0: + if additive_event_and_holiday_names: + events["additive"] = df_tensors["additive_event_and_holiday"][origin_index, :].unsqueeze(0) + if multiplicative_event_and_holiday_names: + events["multiplicative"] = df_tensors["multiplicative_event_and_holiday"][origin_index, :].unsqueeze(0) + else: + if additive_event_and_holiday_names: + events["additive"] = df_tensors["additive_event_and_holiday"][ + origin_index + 1 - n_lags : origin_index + n_forecasts + 1, : + ] + if multiplicative_event_and_holiday_names: + events["multiplicative"] = df_tensors["multiplicative_event_and_holiday"][ + origin_index + 1 - n_lags : origin_index + n_forecasts + 1, : + ] + return events + class GlobalTimeDataset(TimeDataset): def __init__( @@ -305,278 +854,6 @@ def __getitem__(self, idx): return self.datasets[df_name].__getitem__(local_pos) -def get_sample_targets(df_tensors, origin_index, n_forecasts, max_lags, predict_mode): - if predict_mode: - return torch.zeros((n_forecasts, 1), dtype=torch.float32) - else: - if n_forecasts == 1: - if max_lags == 0: - targets = df_tensors["y_scaled"][origin_index] - if max_lags > 0: - targets = df_tensors["y_scaled"][origin_index + 1] - targets = targets.unsqueeze(0).unsqueeze(1) - else: - targets = df_tensors["y_scaled"][origin_index + 1 : origin_index + n_forecasts + 1] - targets = targets.unsqueeze(1) - return targets - - -def get_sample_lagged_regressors(df_tensors, origin_index, config_lagged_regressors): - lagged_regressors = OrderedDict({}) - # Future TODO: optimize this computation for many lagged_regressors - for name, lagged_regressor in config_lagged_regressors.items(): - covar_lags = lagged_regressor.n_lags - assert covar_lags > 0 - # Indexing tensors instead of DataFrame - lagged_regressors[name] = df_tensors[name][origin_index - covar_lags + 1 : origin_index + 1] - return lagged_regressors - - -def get_sample_seasonalities(df_tensors, origin_index, n_forecasts, max_lags, n_lags, config_seasonality): - - seasonalities = OrderedDict({}) - if max_lags == 0: - dates = df_tensors["ds"][origin_index].unsqueeze(0) - else: - dates = df_tensors["ds"][origin_index - n_lags + 1 : origin_index + n_forecasts + 1] - - t = (dates - torch.tensor(datetime(1900, 1, 1).timestamp())).float() / (3600 * 24.0) - - for name, period in config_seasonality.periods.items(): - if period.resolution > 0: - if config_seasonality.computation == "fourier": - factor = 2.0 * np.pi * t[:, None] / period.period - sin_terms = torch.sin(factor * torch.arange(1, period.resolution + 1)) - cos_terms = torch.cos(factor * torch.arange(1, period.resolution + 1)) - features = torch.cat((sin_terms, cos_terms), dim=1) - else: - raise NotImplementedError - - if period.condition_name is not None: - if max_lags == 0: - condition_values = df_tensors[period.condition_name][origin_index].unsqueeze(0).unsqueeze(1) - else: - condition_values = df_tensors[period.condition_name][ - origin_index - n_lags + 1 : origin_index + n_forecasts + 1 - ].unsqueeze(1) - features = features * condition_values - seasonalities[name] = features - return seasonalities - - -def get_sample_future_regressors( - df_tensors, origin_index, n_forecasts, max_lags, n_lags, additive_regressors_names, multiplicative_regressors_names -): - regressors = OrderedDict({}) - if max_lags == 0: - if additive_regressors_names: - regressors["additive"] = df_tensors["additive_regressors"][origin_index, :].unsqueeze(0) - - if multiplicative_regressors_names: - regressors["multiplicative"] = df_tensors["multiplicative_regressors"][origin_index, :].unsqueeze(0) - - else: - if additive_regressors_names: - regressors["additive"] = df_tensors["additive_regressors"][ - origin_index + 1 - n_lags : origin_index + n_forecasts + 1, : - ] - if multiplicative_regressors_names: - regressors["multiplicative"] = df_tensors["multiplicative_regressors"][ - origin_index + 1 - n_lags : origin_index + n_forecasts + 1, : - ] - - return regressors - - -def get_sample_future_events( - df_tensors, - origin_index, - n_forecasts, - max_lags, - n_lags, - additive_event_and_holiday_names, - multiplicative_event_and_holiday_names, -): - events = OrderedDict({}) - if max_lags == 0: - if additive_event_and_holiday_names: - events["additive"] = df_tensors["additive_event_and_holiday"][origin_index, :].unsqueeze(0) - if multiplicative_event_and_holiday_names: - events["multiplicative"] = df_tensors["multiplicative_event_and_holiday"][origin_index, :].unsqueeze(0) - else: - if additive_event_and_holiday_names: - events["additive"] = df_tensors["additive_event_and_holiday"][ - origin_index + 1 - n_lags : origin_index + n_forecasts + 1, : - ] - if multiplicative_event_and_holiday_names: - events["multiplicative"] = df_tensors["multiplicative_event_and_holiday"][ - origin_index + 1 - n_lags : origin_index + n_forecasts + 1, : - ] - return events - - -def log_input_shapes(inputs): - tabularized_input_shapes_str = "" - for key, value in inputs.items(): - if key in [ - "seasonalities", - "covariates", - "events", - "regressors", - ]: - for name, period_features in value.items(): - tabularized_input_shapes_str += f" {name} {key} {period_features.shape}\n" - else: - tabularized_input_shapes_str += f" {key} {value.shape} \n" - log.debug(f"Tabularized inputs shapes: \n{tabularized_input_shapes_str}") - - -def tabularize_univariate_datetime_single_index( - df_tensors: dict, - origin_index: int, - predict_mode: bool = False, - n_lags: int = 0, - max_lags: int = 0, - n_forecasts: int = 1, - config_seasonality: Optional[configure.ConfigSeasonality] = None, - config_lagged_regressors: Optional[configure.ConfigLaggedRegressors] = None, - additive_event_and_holiday_names: List[str] = [], - multiplicative_event_and_holiday_names: List[str] = [], - additive_regressors_names: List[str] = [], - multiplicative_regressors_names: List[str] = [], -): - """Create a tabular data sample from timeseries dataframe, used for mini-batch creation. - Note - ---- - Data must have no gaps for sample extracted at given index position. - ---------- - df : pd.DataFrame - Sequence of observations with original ``ds``, ``y`` and normalized ``t``, ``y_scaled`` columns - origin_index: int: - dataframe index position of last observed lag before forecast starts. - n_forecasts : int - Number of steps to forecast into future - n_lags : int - Number of lagged values of series to include as model inputs (aka AR-order) - config_seasonality : configure.ConfigSeasonality - Configuration for seasonalities - config_lagged_regressors : configure.ConfigLaggedRegressors - Configurations for lagged regressors - config_events : configure.ConfigEvents - User specified events, each with their upper, lower windows (int) and regularization - config_country_holidays : configure.ConfigCountryHolidays - Configurations (holiday_names, upper, lower windows, regularization) for country specific holidays - config_regressors : configure.ConfigFutureRegressors - Configuration for regressors - predict_mode : bool - Chooses the prediction mode - Options - * (default) ``False``: Includes target values - * ``True``: Does not include targets but includes entire dataset as input - Returns - ------- - OrderedDict - Model inputs, each of len(df) but with varying dimensions - Note - ---- - Contains the following data: - Model Inputs - * ``time`` (np.array, float), dims: (num_samples, 1) - * ``seasonalities`` (OrderedDict), named seasonalities - each with features (np.array, float) - dims: (num_samples, n_features[name]) - * ``lags`` (np.array, float), dims: (num_samples, n_lags) - * ``covariates`` (OrderedDict), named covariates, - each with features (np.array, float) of dims: (num_samples, n_lags) - * ``events`` (OrderedDict), events, - each with features (np.array, float) of dims: (num_samples, n_lags) - * ``regressors`` (OrderedDict), regressors, - each with features (np.array, float) of dims: (num_samples, n_lags) - np.array, float - Targets to be predicted of same length as each of the model inputs, dims: (n_forecasts, 1) - """ - # TODO: pre-process all type conversions (e.g. torch.float32) in __init__ - # Note: if max_lags == 0, then n_forecasts == 1 - - # sample features are stored and returned in OrderedDict - inputs = OrderedDict({}) - - targets = get_sample_targets( - df_tensors=df_tensors, - origin_index=origin_index, - n_forecasts=n_forecasts, - max_lags=max_lags, - predict_mode=predict_mode, - ) - - # TIME: the time at each sample's lags and forecasts - if max_lags == 0: - t = df_tensors["t"][origin_index] - inputs["time"] = t.unsqueeze(0) - else: - # extract time value of n_lags steps before and icluding origin_index and n_forecasts steps after origin_index - # Note: df.loc is inclusive of slice end, while df.iloc is not. - t = df_tensors["t"][origin_index - n_lags + 1 : origin_index + n_forecasts + 1] - inputs["time"] = t - - # LAGS: From y-series, extract preceeding n_lags steps up to and including origin_index - if n_lags >= 1 and "y_scaled" in df_tensors: - # Note: df.loc is inclusive of slice end, while df.iloc is not. - lags = df_tensors["y_scaled"][origin_index - n_lags + 1 : origin_index + 1] - inputs["lags"] = lags - - # COVARIATES / LAGGED REGRESSORS: Lagged regressor inputs: analogous to LAGS - if config_lagged_regressors is not None: # and max_lags > 0: - inputs["covariates"] = get_sample_lagged_regressors( - df_tensors=df_tensors, origin_index=origin_index, config_lagged_regressors=config_lagged_regressors - ) - - # SEASONALITIES_ - if config_seasonality is not None: - inputs["seasonalities"] = get_sample_seasonalities( - df_tensors=df_tensors, - origin_index=origin_index, - n_forecasts=n_forecasts, - max_lags=max_lags, - n_lags=n_lags, - config_seasonality=config_seasonality, - ) - - # FUTURE REGRESSORS: get the future regressors features - # create numpy array of values of additive and multiplicative regressors, at correct indexes - # features dims: (n_forecasts, n_features) - any_future_regressors = 0 < len(additive_regressors_names + multiplicative_regressors_names) - if any_future_regressors: # if config_regressors.regressors is not None: - inputs["regressors"] = get_sample_future_regressors( - df_tensors=df_tensors, - origin_index=origin_index, - n_forecasts=n_forecasts, - max_lags=max_lags, - n_lags=n_lags, - additive_regressors_names=additive_regressors_names, - multiplicative_regressors_names=multiplicative_regressors_names, - ) - - # FUTURE EVENTS: get the events features - # create numpy array of values of additive and multiplicative events, at correct indexes - # features dims: (n_forecasts, n_features) - any_events = 0 < len(additive_event_and_holiday_names + multiplicative_event_and_holiday_names) - if any_events: - inputs["events"] = get_sample_future_events( - df_tensors=df_tensors, - origin_index=origin_index, - n_forecasts=n_forecasts, - max_lags=max_lags, - n_lags=n_lags, - additive_event_and_holiday_names=additive_event_and_holiday_names, - multiplicative_event_and_holiday_names=multiplicative_event_and_holiday_names, - ) - - # ONLY FOR DEBUGGING - # if log.level == 0: - # log_input_shapes(inputs) - return inputs, targets - - def fourier_series(dates, period, series_order): """Provides Fourier series components with the specified frequency and order. Note @@ -622,274 +899,3 @@ def fourier_series_t(t, period, series_order): [fun((2.0 * (i + 1) * np.pi * t / period)) for i in range(series_order) for fun in (np.sin, np.cos)] ) return features - - -def get_event_offset_features(event, config, feature): - """ - Create event offset features for the given event, config and feature - Parameters - ---------- - event : str - Name of the event - config : configure.ConfigEvents - User specified events, holidays, and country specific holidays - feature : pd.Series - Feature for the event - Returns - ------- - tuple - Tuple of additive_events and multiplicative_events - """ - offsets = range(config.lower_window, config.upper_window + 1) - offset_features = pd.concat( - { - utils.create_event_names_for_offsets(event, offset): feature.shift(periods=offset, fill_value=0.0) - for offset in offsets - }, - axis=1, - ) - return offset_features - - -def add_event_features_to_df( - df, - config_events: Optional[configure.ConfigEvents] = None, - config_country_holidays: Optional[configure.ConfigCountryHolidays] = None, -): - """ - Construct columns containing the features of each event, added to df. - Parameters - ---------- - df : pd.DataFrame - Dataframe with all values including the user specified events (provided by user) - config_events : configure.ConfigEvents - User specified events, each with their upper, lower windows (int), regularization - config_country_holidays : configure.ConfigCountryHolidays - Configurations (holiday_names, upper, lower windows, regularization) for country specific holidays - Returns - ------- - np.array - All additive event features (both user specified and country specific) - np.array - All multiplicative event features (both user specified and country specific) - """ - - def normalize_holiday_name(name): - # Handle cases like "Independence Day (observed)" -> "Independence Day" - return name.replace(" (observed)", "") if "(observed)" in name else name - - def add_offset_features(feature, event_name, config): - additive_names = [] - multiplicative_names = [] - for offset in range(config.lower_window, config.upper_window + 1): - event_offset_name = utils.create_event_names_for_offsets(event_name, offset) - df[event_offset_name] = feature.shift(periods=offset, fill_value=0.0) - if config.mode == "additive": - additive_names.append(event_offset_name) - else: - multiplicative_names.append(event_offset_name) - return additive_names, multiplicative_names - - # Create all additional user-specified offset events - additive_events_names = [] - multiplicative_events_names = [] - - if config_events is not None: - for event in sorted(config_events.keys()): - feature = df[event] - config = config_events[event] - additive_names, multiplicative_names = add_offset_features(feature, event, config) - additive_events_names.extend(additive_names) - multiplicative_events_names.extend(multiplicative_names) - - # Create all country-specific holidays and their offsets - additive_holiday_names = [] - multiplicative_holiday_names = [] - - if config_country_holidays is not None: - year_list = df["ds"].dt.year.unique() - country_holidays_dict = get_all_holidays(year_list, config_country_holidays.country) - config = config_country_holidays - - for holiday in config_country_holidays.holiday_names: - feature = pd.Series(np.zeros(len(df)), index=df.index, dtype=np.float32) - normalized_holiday = normalize_holiday_name(holiday) - - if normalized_holiday in country_holidays_dict: - dates = country_holidays_dict[normalized_holiday] - feature.loc[df["ds"].isin(dates)] = 1.0 - else: - raise ValueError(f"Holiday {holiday} not found in {config_country_holidays.country} holidays") - - additive_names, multiplicative_names = add_offset_features(feature, normalized_holiday, config) - additive_holiday_names.extend(additive_names) - multiplicative_holiday_names.extend(multiplicative_names) - - additive_event_and_holiday_names = sorted(additive_events_names + additive_holiday_names) - multiplicative_event_and_holiday_names = sorted(multiplicative_events_names + multiplicative_holiday_names) - - return df, additive_event_and_holiday_names, multiplicative_event_and_holiday_names - - -def create_origin_start_end_mask(df_length, max_lags, n_forecasts): - """Creates a boolean mask for valid prediction origin positions. - (based on limiting input lags and forecast targets at start and end of df)""" - if max_lags >= 1: - start_pad = torch.zeros(max_lags - 1, dtype=torch.bool) - valid_targets = torch.ones(df_length - max_lags - n_forecasts + 1, dtype=torch.bool) - end_pad = torch.zeros(n_forecasts, dtype=torch.bool) - target_start_end_mask = torch.cat((start_pad, valid_targets, end_pad), dim=0) - elif max_lags == 0 and n_forecasts == 1: - # without lags, forecast targets and origins are identical - target_start_end_mask = torch.ones(df_length, dtype=torch.bool) - else: - raise ValueError(f"max_lags value of {max_lags} not supported for n_forecasts {n_forecasts}.") - return target_start_end_mask - - -def create_prediction_frequency_filter_mask(timestamps, prediction_frequency=None): - """Filters prediction origin index from df based on the forecast frequency setting. - - Filter based on timestamp last lag before targets start - - Parameters - ---------- - timestamps : torch.Tensor - Tensor of timestamps in Unix epoch format - prediction_frequency : dict - periodic interval in which forecasts should be made. - Note - ---- - E.g. if prediction_frequency=7, forecasts are only made on every 7th step (once in a week in case of daily - resolution). - - Returns boolean mask where prediction origin indexes to be included are True, and the rest False. - """ - if prediction_frequency is None: - return torch.ones(len(timestamps), dtype=torch.bool) - - timestamps = pd.to_datetime(timestamps.numpy(), unit="s") - mask = torch.ones(len(timestamps), dtype=torch.bool) - - filters = { - "hourly-minute": timestamps.minute, - "daily-hour": timestamps.hour, - "weekly-day": timestamps.dayofweek, - "monthly-day": timestamps.day, - "yearly-month": timestamps.month, - } - - for key, value in prediction_frequency.items(): - if key not in filters: - raise ValueError(f"Invalid prediction frequency: {key}") - mask &= filters[key] == value - - return torch.tensor(mask, dtype=torch.bool) - - -def create_nan_mask( - df_tensors, - predict_mode, - max_lags, - n_lags, - n_forecasts, - config_lagged_regressors, - future_regressor_names, - event_names, -): - """Creates mask for each prediction origin, - accounting for corresponding input lags / forecast targets containing any NaN values. - """ - tensor_length = len(df_tensors["ds"]) - valid_origins = torch.ones(tensor_length, dtype=torch.bool) - tensor_isna = {k: torch.isnan(v) for k, v in df_tensors.items()} - - # TARGETS - if predict_mode: - # Targets not needed - targets_valid = torch.ones(tensor_length, dtype=torch.bool) - else: - if max_lags == 0: # y-series and origin index match - targets_valid = ~tensor_isna["y_scaled"] - else: - if n_forecasts == 1: - targets_nan = tensor_isna["y_scaled"][1:] - targets_nan = torch.cat([targets_nan, torch.tensor([True], dtype=torch.bool)]) - targets_valid = ~targets_nan - else: # This is also correct for n_forecasts == 1, but slower. - targets_nan = sliding_window_view(tensor_isna["y_scaled"], window_shape=n_forecasts).any(axis=-1) - # first entry corresponds to origin_index -1, drop this. - targets_nan = torch.tensor(targets_nan[1:]) - # pad last n_forecasts as missing, as forecast origins will have missing forecast-targets there. - targets_nan = torch.cat([targets_nan, torch.ones(n_forecasts, dtype=torch.bool)]) - targets_valid = ~targets_nan - - valid_origins &= targets_valid - - # AR LAGS - if n_lags > 0: - # boolean vector, starting at origin_index = n_lags -1 - y_lags_nan = torch.tensor(sliding_window_view(tensor_isna["y_scaled"], window_shape=n_lags).any(axis=-1)) - # fill first n_lags -1 positions with True - # as there are missing lags for the corresponding origin_indexes - y_lags_nan = torch.cat([torch.ones(n_lags - 1, dtype=torch.bool), y_lags_nan]) - y_lags_valid = ~y_lags_nan - valid_origins &= y_lags_valid - - # LAGGED REGRESSORS - if config_lagged_regressors is not None: # and max_lags > 0: - reg_lags_valid = torch.ones(tensor_length, dtype=torch.bool) - for name, lagged_regressor in config_lagged_regressors.items(): - n_reg_lags = lagged_regressor.n_lags - if n_reg_lags > 0: - # boolean vector, starting at origin_index = n_lags -1 - reg_lags_nan = torch.tensor( - sliding_window_view(tensor_isna[name].numpy(), window_shape=n_reg_lags).any(axis=-1) - ) - # fill first n_reg_lags -1 positions with True, - # as there are missing lags for the corresponding origin_indexes - reg_lags_nan = torch.cat([torch.ones(n_reg_lags - 1, dtype=torch.bool), reg_lags_nan]) - reg_lags_valid &= ~reg_lags_nan - valid_origins &= reg_lags_valid - - # TIME: TREND & SEASONALITY: the time at each sample's lags and forecasts - # FUTURE REGRESSORS - # EVENTS - names = ["t"] + future_regressor_names + event_names - valid_columns = mask_origin_without_nan_for_columns(tensor_isna, names, max_lags, n_lags, n_forecasts) - valid_origins &= valid_columns - - return valid_origins - - -def mask_origin_without_nan_for_columns(tensor_isna, names, max_lags, n_lags, n_forecasts): - contains_nan = torch.stack([tensor_isna[name] for name in names], dim=1).any(dim=1) - if max_lags > 0: - if n_lags == 0 and n_forecasts == 1: - contains_nan = contains_nan[1:] - contains_nan = torch.cat([contains_nan, torch.tensor([True], dtype=torch.bool)]) - else: - contains_nan = sliding_window_view(contains_nan.numpy(), window_shape=n_lags + n_forecasts).any(axis=-1) - # first sample is at origin_index = n_lags -1, - if n_lags == 0: # first sample origin index is at -1 - contains_nan = contains_nan[1:] - else: - contains_nan = torch.cat([torch.ones(n_lags - 1, dtype=torch.bool), torch.tensor(contains_nan)]) - # there are n_forecasts origin_indexes missing at end - contains_nan = torch.cat([torch.tensor(contains_nan), torch.ones(n_forecasts, dtype=torch.bool)]) - valid_origins = ~contains_nan - return valid_origins - - -def sort_regressor_names(config): - additive_regressors_names = [] - multiplicative_regressors_names = [] - if config is not None and config.regressors is not None: - # sort and divide regressors into multiplicative and additive - for reg in sorted(list(config.regressors.keys())): - mode = config.regressors[reg].mode - if mode == "additive": - additive_regressors_names.append(reg) - else: - multiplicative_regressors_names.append(reg) - return additive_regressors_names, multiplicative_regressors_names