Favorita Dataset Creation Example¶
This tutorial demonstrates how to create a dataset in a format that can be fed to the TemporalFusionTransformer. It does not demonstrate direct usage of the implementation provided in this package; it merely suggests a methodology for generating a suitable dataset.
Importing the required libraries¶
[ ]:
import os
import glob
import pickle
from datetime import datetime
from pathlib import Path
from tqdm import tqdm
import numpy as np
import pandas as pd
import pandas.api.types as ptypes
import matplotlib.pyplot as plt
from sklearn.preprocessing import QuantileTransformer, LabelEncoder, StandardScaler, MinMaxScaler
Configuration details¶
Set the path to the directory containing the CSV files, as well as the output path, where the processed dataset will be stored:
[ ]:
data_path = os.path.abspath('.../data/favorita/raw')
# set parent directory as the output path
output_path = Path(data_path).parent.absolute()
Set the time boundaries according to which the data will be generated:
[ ]:
# No records will be considered outside these bounds
start_date = datetime(2013, 7, 1)
end_date = datetime(2017, 4, 1)
# Where training period ends and the validation period begins
validation_bound = datetime(2016, 7, 1)
We also need to set the historical scope, in terms of time-steps, to consider for each observation, as well as the maximal horizon (in time-steps) for which predictions will be required. In our case, each observation corresponds to a time-series, and the time-steps correspond to days.
[ ]:
history_len = 90 # historical scope in time-steps
future_len = 30 # futuristic scope in time-steps
One more temporal configuration argument we need to set is the sampling interval. To ease the processing of the dataset, each new time-series starts samp_interval steps after the adjacent one, which imposes some overlap between adjacent observations.
[ ]:
samp_interval = 5 # time-steps
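To make the effect of these three settings concrete, the following minimal sketch lists the index ranges of the windows that would be cut from a single sequence. The helper `window_bounds` and the sequence length of 130 steps are hypothetical and used for illustration only; they are not part of the package.

```python
history_len = 90    # historical scope in time-steps
future_len = 30     # prediction horizon in time-steps
samp_interval = 5   # offset between the starts of adjacent windows


def window_bounds(num_steps, history_len, future_len, samp_interval):
    """Hypothetical helper: (start, split, end) indices of every full window."""
    window_len = history_len + future_len
    bounds = []
    # a window is kept only if all of its steps fit inside the sequence
    for start in range(0, num_steps - window_len + 1, samp_interval):
        bounds.append((start, start + history_len, start + window_len))
    return bounds


bounds = window_bounds(130, history_len, future_len, samp_interval)
for start, split, end in bounds:
    print(f'history: [{start}, {split})  future: [{split}, {end})')
```

With these values, adjacent windows share all but samp_interval of their history_len + future_len steps.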
Attributes configuration¶
For the multi-horizon forecasting scenario, we consider three primary channels of information flowing into the model as input data:
* past/historical temporal information, which is the observed time-series.
* static information, in the form of non-temporal attributes associated with the observation.
* futuristic temporal information, which is known in advance for each of the horizons we are about to predict.
Each of these channels can be composed of both numeric and categorical variables. The following cells specify the assignment of each attribute:
[ ]:
# These are the variables that are known in advance, and will compose the futuristic time-series
known_attrs = ['onpromotion',
'day_of_week',
'day_of_month',
'month',
'national_holiday',
'regional_holiday',
'local_holiday',
'open'
]
# The following set of variables will be considered as static, i.e. containing non-temporal information
# every attribute which is not listed here will be considered as temporal.
static_attrs = ['item_nbr',
'store_nbr',
'city',
'state',
'store_type',
'store_cluster',
'item_family',
'item_class',
'perishable',
]
# The following set of variables will be considered as categorical.
# The rest of the variables (which are not listed below) will be considered as numeric.
categorical_attrs = ['item_nbr',
'store_nbr',
'city',
'state',
'store_type',
'store_cluster',
'item_family',
'item_class',
'perishable',
'onpromotion',
'open',
'day_of_week',
'month',
'national_holiday',
'regional_holiday',
'local_holiday',
]
We also need to specify which of the attributes represents the signal we would like to predict into the future:
[ ]:
target_signal = 'log_sales'
and a list of variables which are not to be considered as actual features - these can be the time index associated with each record, the ID associated with the observation, or features that are already represented by some other variable:
[ ]:
# these will not be included as part of the input data which will end up feeding the model
meta_attrs = ['date', 'combination_id', 'temporal_id', 'unit_sales']
Data Loading¶
Listing the relevant files:
[ ]:
file_names = [os.path.basename(f) for f in glob.glob(os.path.join(data_path, '*.csv'))]
print(file_names)
Load the CSV files:
[ ]:
transactions_df = pd.read_csv(os.path.join(data_path, 'transactions.csv'), parse_dates=['date'],
infer_datetime_format=True)
items_df = pd.read_csv(os.path.join(data_path, 'items.csv'), index_col='item_nbr')
oil_df = pd.read_csv(os.path.join(data_path, 'oil.csv'), parse_dates=['date'], infer_datetime_format=True,
index_col='date')
holiday_df = pd.read_csv(os.path.join(data_path, 'holidays_events.csv'), parse_dates=['date'],
infer_datetime_format=True,
dtype={'transferred': bool})
stores_df = pd.read_csv(os.path.join(data_path, 'stores.csv'), index_col='store_nbr')
data_df = pd.read_csv(os.path.join(data_path, 'train.csv'),
dtype={'onpromotion': object},
index_col='id',
parse_dates=['date'], infer_datetime_format=True)
# we will not use the test data in this demonstration -
# the entire dataset will be created using the 'train.csv' file.
test_df = pd.read_csv(os.path.join(data_path, 'test.csv'),
index_col='id',
parse_dates=['date'], infer_datetime_format=True)
Then fix the nulls in the onpromotion indicator by transforming this attribute to bool type (null entries become False).
[ ]:
if ptypes.is_object_dtype(data_df['onpromotion']):
    data_df['onpromotion'] = data_df['onpromotion'] == 'True'
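As a quick illustration of why the comparison above also takes care of the nulls, here is a small sketch on a made-up column (the values are toy data, not taken from the Favorita files):

```python
import numpy as np
import pandas as pd

# toy column read as strings, with one missing entry
toy = pd.DataFrame({'onpromotion': pd.Series(['True', 'False', np.nan], dtype=object)})

# comparing against the string 'True' yields a boolean column;
# a missing value is not equal to 'True', so it ends up as False
toy['onpromotion'] = toy['onpromotion'] == 'True'
print(toy['onpromotion'].tolist())
```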
Some of the columns are renamed, so that they remain clear once the information is gathered from the various sources:
[ ]:
stores_df.rename(columns={'type': 'store_type', 'cluster': 'store_cluster'}, inplace=True)
items_df.rename(columns={'class': 'item_class', 'family': 'item_family'}, inplace=True)
oil_df.rename(columns={'dcoilwtico': 'oil_price'}, inplace=True)
holiday_df.rename(columns={'type': 'holiday_type'}, inplace=True)
The oil price is then forward-filled (method='ffill') onto a daily grid before we associate it with the other temporal records.
[ ]:
# Drop the null records from the raw dataframe representing oil prices
oil_df = oil_df.loc[~oil_df.oil_price.isna()]
# resample to a daily grid, carrying the last known price forward
oil_df = oil_df.resample('1D').ffill().reset_index()
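The same two steps can be tried on a tiny made-up price series. This is only a sketch; the dates and prices below are invented for illustration:

```python
import numpy as np
import pandas as pd

# toy oil prices: one null record and a gap of several days
toy_oil = pd.DataFrame(
    {'oil_price': [50.0, np.nan, 52.0]},
    index=pd.to_datetime(['2017-01-02', '2017-01-03', '2017-01-06']),
)
toy_oil.index.name = 'date'

# drop the null record, then fill every missing day with the last known price
toy_oil = toy_oil.loc[~toy_oil['oil_price'].isna()]
toy_oil = toy_oil.resample('1D').ffill().reset_index()
print(toy_oil)
```

Every calendar day between the first and the last record now has a price, so a later left-merge on date finds a match for each day in that range.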
Filter, Manipulate & Resample¶
Filter¶
Before merging the other sources of data into data_df, which represents the primary source of data associated with each observation, we filter it to keep only the records within the boundaries we set above.
[ ]:
data_df = data_df.loc[(data_df['date'] >= start_date) & (data_df['date'] <= end_date)]
Manipulate¶
In the dataset we’re dealing with, each time-series is associated with two primary entities:
* the selling store
* the sold product
Hence, the following snippet generates an ID, combination_id, which identifies the combination of a specific store and a specific product.
[ ]:
data_df = data_df.assign(combination_id=data_df['store_nbr'].apply(str) + '_' + data_df['item_nbr'].apply(str))
# another index can be used to identify the unique combination of (store,product,date)
data_df = data_df.assign(temporal_id=data_df['combination_id'] + '_' + data_df['date'].dt.strftime('%Y-%m-%d'))
In addition, we discard (store,item) combinations with negative sales observed:
[ ]:
# for each combination, we calculate the minimal unit_sales value
min_sales = data_df.groupby('combination_id', as_index=False)['unit_sales'].min()
# keep only combination with non-negative sales.
data_df = data_df.loc[data_df['combination_id'].isin(min_sales.loc[min_sales.unit_sales >= 0, 'combination_id'])]
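The filtering logic can be checked on a toy frame. The sketch below uses invented IDs and sales values; a combination is dropped entirely as soon as one of its records has negative sales:

```python
import pandas as pd

toy = pd.DataFrame({
    'combination_id': ['1_10', '1_10', '2_20', '2_20'],
    'unit_sales': [3.0, 5.0, 4.0, -1.0],
})

# minimal unit_sales value per combination
min_sales = toy.groupby('combination_id', as_index=False)['unit_sales'].min()
# IDs of the combinations that never had negative sales
valid_ids = min_sales.loc[min_sales['unit_sales'] >= 0, 'combination_id']
toy = toy.loc[toy['combination_id'].isin(valid_ids)]
print(toy)
```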
And mark all the existing records as days in which the relevant stores were open:
[ ]:
# mark all the existing records as days in which the relevant stores were open
data_df = data_df.assign(open=1)
Temporal resampling of each combination (1-day interval)¶
We generate a dense sequence of records for each combination by resampling, so that every step in the time-window covered by the sequence has a corresponding record. The records generated by this resampling procedure are considered as days on which the specific store was closed (open=0). Note: As part of the resampling, we also assign a new column, log_sales, which will be our target signal.
[ ]:
sequence_per_combination = []  # a list to contain all the resampled sequences
# for each combination
for comb_id, comb_df in tqdm(data_df.groupby('combination_id')):
    resamp_seq = comb_df.copy()
    resamp_seq = resamp_seq.set_index('date').resample('1D').last().reset_index()
    resamp_seq['log_sales'] = np.log10(1 + resamp_seq['unit_sales'])
    # newly generated records are assumed to be days in which the store was not open
    resamp_seq['open'] = resamp_seq['open'].fillna(0)
    # rows created by the resampling have a null combination_id;
    # restore it so that these rows are kept by later group-bys on this column
    resamp_seq['combination_id'] = comb_id
    # pad with the corresponding information according to the previously available record
    for col in ['store_nbr', 'item_nbr', 'onpromotion']:
        resamp_seq[col] = resamp_seq[col].ffill()
    sequence_per_combination.append(resamp_seq)
# combine all the resampled sequences
data_df = pd.concat(sequence_per_combination, axis=0)
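To see what the resampling does to a single combination, here is a minimal sketch on a toy sequence with a two-day gap. All values are invented for illustration:

```python
import numpy as np
import pandas as pd

# toy sequence for one (store, item) combination, missing 2017-01-02 and 2017-01-03
toy = pd.DataFrame({
    'date': pd.to_datetime(['2017-01-01', '2017-01-04']),
    'store_nbr': [1, 1],
    'item_nbr': [10, 10],
    'unit_sales': [9.0, 99.0],
    'onpromotion': [False, True],
    'open': [1, 1],
})

dense = toy.set_index('date').resample('1D').last().reset_index()
dense['log_sales'] = np.log10(1 + dense['unit_sales'])
# days created by the resampling are treated as days on which the store was closed
dense['open'] = dense['open'].fillna(0)
# carry the identifying attributes forward from the last available record
for col in ['store_nbr', 'item_nbr', 'onpromotion']:
    dense[col] = dense[col].ffill()
print(dense)
```

The two inserted days get open equal to 0 and a null log_sales, which is why the target signal is imputed later on.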
Gathering Information¶
Before merging the other sources, we can add some time-related information using the specified date associated with each record:
[ ]:
data_df['day_of_week'] = pd.to_datetime(data_df['date'].values).dayofweek
data_df['day_of_month'] = pd.to_datetime(data_df['date'].values).day
data_df['month'] = pd.to_datetime(data_df['date'].values).month
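For reference, the sketch below derives the same three calendar features for two toy dates. Note that pandas numbers the days of the week from Monday (0) to Sunday (6):

```python
import pandas as pd

features = pd.DataFrame({'date': pd.to_datetime(['2017-01-01', '2017-01-02'])})
features['day_of_week'] = features['date'].dt.dayofweek   # Monday=0 ... Sunday=6
features['day_of_month'] = features['date'].dt.day
features['month'] = features['date'].dt.month
print(features)
```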
Merging with other sources¶
Adding the metadata associated with each store and item:
[ ]:
data_df = data_df.merge(stores_df, how='left', on='store_nbr')
data_df = data_df.merge(items_df, how='left', on='item_nbr')
Adding the holiday-related information associated with each date:
[ ]:
# we'll ignore holidays that were "transferred"
holiday_df = holiday_df.loc[~holiday_df.transferred]
# National holidays will mark every relevant record (by date)
data_df = data_df.assign(national_holiday=data_df.merge(holiday_df.loc[holiday_df.locale == 'National'],
on='date', how='left')['description'].fillna('None')
)
# Regional holidays will mark every relevant record (by date and state)
data_df = data_df.assign(regional_holiday=data_df.merge(holiday_df.loc[holiday_df.locale == 'Regional'],
left_on=['date', 'state'],
right_on=['date', 'locale_name'],
how='left'
)['description'].fillna('None')
)
# Local holidays will mark every relevant record (by date and city)
data_df = data_df.assign(local_holiday=data_df.merge(holiday_df.loc[holiday_df.locale == 'Local'],
left_on=['date', 'city'],
right_on=['date', 'locale_name'],
how='left'
)['description'].fillna('None')
)
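One thing worth verifying with this pattern: a left-merge returns more rows than the left frame whenever the right frame holds several events for the same key, and the assigned column would then no longer line up with data_df row by row. The sketch below, on invented data, shows the effect and one possible guard, namely de-duplicating the holiday keys and passing validate='many_to_one' to the merge. Keeping only the first event per date is an assumption made for this illustration, not something the original notebook does.

```python
import pandas as pd

records = pd.DataFrame({
    'date': pd.to_datetime(['2017-01-01', '2017-01-02']),
    'store_nbr': [1, 1],
})
# toy holidays table with two events on the same date (hypothetical names)
holidays = pd.DataFrame({
    'date': pd.to_datetime(['2017-01-01', '2017-01-01']),
    'description': ['Holiday A', 'Holiday B'],
})

# the plain left-merge duplicates the first record
merged = records.merge(holidays, on='date', how='left')
print(len(records), len(merged))

# guard: keep one event per date and let pandas verify key uniqueness
deduped = holidays.drop_duplicates(subset='date', keep='first')
safe = records.merge(deduped, on='date', how='left', validate='many_to_one')
national_holiday = safe['description'].fillna('None')
print(national_holiday.tolist())
```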
Finally, we’re merging the transactions data, as well as the oil price data:
[ ]:
data_df = data_df.merge(transactions_df, how='left', on=['date', 'store_nbr'])
data_df['transactions'] = data_df['transactions'].fillna(-1)
data_df = data_df.merge(oil_df, on='date', how='left')
Inferring Composition¶
Now that the entire dataset is composed, we’ll use the attributes-related configuration we’ve set above. The complete feature set is retrieved by filtering out the meta_attrs from the columns list:
[ ]:
all_cols = list(data_df.columns)
feature_cols = [col for col in all_cols if col not in meta_attrs]
Then, we’ll create the list of attributes for each channel of input. We’ll need such a list for each combination of (static/historical/futuristic) and (numeric/categorical):
[ ]:
feature_map = {
'static_feats_numeric': [col for col in feature_cols if col in static_attrs and col not in categorical_attrs],
'static_feats_categorical': [col for col in feature_cols if col in static_attrs and col in categorical_attrs],
'historical_ts_numeric': [col for col in feature_cols if col not in static_attrs and col not in categorical_attrs],
'historical_ts_categorical': [col for col in feature_cols if col not in static_attrs and col in categorical_attrs],
'future_ts_numeric': [col for col in feature_cols if col in known_attrs and col not in categorical_attrs],
'future_ts_categorical': [col for col in feature_cols if col in known_attrs and col in categorical_attrs]
}
Data Scaling¶
We would like all of the input variables fed to the model to have similar scales. Hence, each variable will be scaled (if it is numeric) or encoded (in case it is categorical).
[ ]:
# allocate a dictionary to contain the scaler and encoder objects after fitting them
scalers = {'numeric': dict(), 'categorical': dict()}
# for the categorical variables we would like to keep the cardinalities (how many categories for each variable)
categorical_cardinalities = dict()
The scalers/encoders are fit according to the training set/period.
[ ]:
# take only the train time range
only_train = data_df.loc[data_df['date'] < validation_bound]
Note: The specific scaling method for each numeric variable was selected after examining its distribution.
Fitting the scalers/encoders¶
[ ]:
for col in tqdm(feature_cols):
    if col in categorical_attrs:
        scalers['categorical'][col] = LabelEncoder().fit(only_train[col].values)
        categorical_cardinalities[col] = only_train[col].nunique()
    else:
        if col in ['log_sales']:
            scalers['numeric'][col] = StandardScaler().fit(only_train[col].values.astype(float).reshape(-1, 1))
        elif col in ['day_of_month']:
            scalers['numeric'][col] = MinMaxScaler().fit(only_train[col].values.astype(float).reshape(-1, 1))
        else:
            scalers['numeric'][col] = QuantileTransformer(n_quantiles=256).fit(
                only_train[col].values.astype(float).reshape(-1, 1))
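The point of fitting on the training period only is that the statistics used for scaling never see later data. A minimal sketch with made-up numbers, using StandardScaler as in the log_sales branch above:

```python
import numpy as np
from sklearn.preprocessing import StandardScaler

# toy values: the scaler sees only the training period
train_values = np.array([1.0, 2.0, 3.0]).reshape(-1, 1)
later_values = np.array([2.0, 10.0]).reshape(-1, 1)

scaler = StandardScaler().fit(train_values)
# later values are transformed with the training mean and standard deviation
scaled = scaler.transform(later_values).squeeze()
print(scaler.mean_, scaled)
```

Values outside the range seen during training can therefore land far from the typical scaled range, which is expected with this setup.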
Transform by Applying Scalers¶
After fitting the scalers and the encoders, we apply them in order to transform the entire dataset. Note that some categories appearing in the complete dataset might not be “familiar” to the associated label encoder, because they never occur in the training period. Such keys will be mapped to a new ordinal label.
[ ]:
for col in tqdm(feature_cols):
    if col in categorical_attrs:
        le = scalers['categorical'][col]
        # handle cases with unseen keys
        le_dict = dict(zip(le.classes_, le.transform(le.classes_)))
        # unseen keys get the next ordinal label after the largest known one
        unseen_label = len(le.classes_)
        data_df[col] = data_df[col].apply(lambda x: le_dict.get(x, unseen_label))
        data_df[col] = data_df[col].astype(np.int32)
    else:
        data_df[col] = scalers['numeric'][col].transform(data_df[col].values.reshape(-1, 1)).squeeze()
        data_df[col] = data_df[col].astype(np.float32)
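The handling of unseen keys can be illustrated in isolation. In the sketch below the encoder is fit on three city names and then applied to a list containing a name it has never seen; the names are used as toy values only:

```python
import numpy as np
from sklearn.preprocessing import LabelEncoder

# the encoder only knows the categories present in the training period
le = LabelEncoder().fit(np.array(['Quito', 'Guayaquil', 'Cuenca']))
le_dict = dict(zip(le.classes_, le.transform(le.classes_)))

# any unseen key is mapped to one label past the known ones
unseen_label = len(le.classes_)
encoded = [int(le_dict.get(city, unseen_label)) for city in ['Quito', 'Ambato', 'Cuenca']]
print(encoded)
```

Because of this extra label, anything that consumes the encoded values (an embedding table, for instance) has to allow for one more category than the cardinality counted on the training set.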
After performing the transformations above, and in order to avoid null records in the target variable, we impute the target signal:
[ ]:
data_df['log_sales'] = data_df['log_sales'].fillna(0.0)
Splitting Data¶
This stage deals with generating distinct subsets of the data for training, validation and testing the model.
[ ]:
data_sets = {'train': dict(), 'validation': dict(), 'test': dict()}
For each combination of (store,item), we first slice the data into the training, validation and testing periods. This primary slicing is determined by the arguments we set in the beginning: validation_bound, history_len, and future_len.
Then we’ll slide over each slice, with offset steps dictated by samp_interval. For each window (provided it contains sufficient time steps: history_len + future_len), we split the feature set according to the data-related keys:
* static_feats_numeric
* static_feats_categorical
* historical_ts_numeric
* historical_ts_categorical
* future_ts_numeric
* future_ts_categorical
* target
where the temporal elements in this division, for each time-series, are represented as 2D arrays.
[ ]:
for combination_id, combination_seq in tqdm(data_df.groupby('combination_id')):
    # take the complete sequence associated with this combination and break it into the relevant periods
    train_subset = combination_seq.loc[combination_seq['date'] < validation_bound]
    num_train_records = len(train_subset)
    validation_subset_len = num_train_records + future_len
    validation_subset = combination_seq.iloc[num_train_records - history_len: validation_subset_len]
    test_subset = combination_seq.iloc[validation_subset_len - history_len:]
    subsets_dict = {'train': train_subset,
                    'validation': validation_subset,
                    'test': test_subset}
    # for the specific combination we're processing in the current iteration,
    # we'd like to go over each subset separately
    for subset_key, subset_data in subsets_dict.items():
        # sliding window, according to samp_interval skips between adjacent windows
        for i in range(0, len(subset_data), samp_interval):
            # slice includes history period and horizons period
            slc = subset_data.iloc[i: i + history_len + future_len]
            if len(slc) < (history_len + future_len):
                # skip edge cases, where not enough steps are included
                continue
            # meta
            data_sets[subset_key].setdefault('time_index', []).append(slc.iloc[history_len - 1]['date'])
            data_sets[subset_key].setdefault('combination_id', []).append(combination_id)
            # static attributes
            data_sets[subset_key].setdefault('static_feats_numeric', []).append(
                slc.iloc[0][feature_map['static_feats_numeric']].values.astype(np.float32))
            data_sets[subset_key].setdefault('static_feats_categorical', []).append(
                slc.iloc[0][feature_map['static_feats_categorical']].values.astype(np.int32))
            # historical
            data_sets[subset_key].setdefault('historical_ts_numeric', []).append(
                slc.iloc[:history_len][feature_map['historical_ts_numeric']].values.astype(np.float32).reshape(
                    history_len, -1))
            data_sets[subset_key].setdefault('historical_ts_categorical', []).append(
                slc.iloc[:history_len][feature_map['historical_ts_categorical']].values.astype(np.int32).reshape(
                    history_len, -1))
            # futuristic (known)
            data_sets[subset_key].setdefault('future_ts_numeric', []).append(
                slc.iloc[history_len:][feature_map['future_ts_numeric']].values.astype(np.float32).reshape(
                    future_len, -1))
            data_sets[subset_key].setdefault('future_ts_categorical', []).append(
                slc.iloc[history_len:][feature_map['future_ts_categorical']].values.astype(np.int32).reshape(
                    future_len, -1))
            # target
            data_sets[subset_key].setdefault('target', []).append(
                slc.iloc[history_len:]['log_sales'].values.astype(np.float32))
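The windowing above can be followed more easily on a scaled-down example. The sketch below uses a toy sequence of 10 days with a single numeric feature and much smaller settings (4 history steps, 2 future steps, an interval of 3); these numbers are chosen for illustration only:

```python
import numpy as np
import pandas as pd

history_len, future_len, samp_interval = 4, 2, 3   # toy values, not the tutorial's settings
window_len = history_len + future_len

seq = pd.DataFrame({
    'date': pd.date_range('2017-01-01', periods=10, freq='D'),
    'log_sales': np.arange(10, dtype=np.float32),
})

historical, target, time_index = [], [], []
for i in range(0, len(seq), samp_interval):
    slc = seq.iloc[i: i + window_len]
    if len(slc) < window_len:
        continue  # not enough steps left for a full window
    # the time index of a window is the date of its last historical step
    time_index.append(slc.iloc[history_len - 1]['date'])
    historical.append(slc.iloc[:history_len][['log_sales']].values.astype(np.float32))
    target.append(slc.iloc[history_len:]['log_sales'].values.astype(np.float32))

historical = np.stack(historical, axis=0)   # (windows, history_len, features)
target = np.stack(target, axis=0)           # (windows, future_len)
print(historical.shape, target.shape, time_index)
```

Only the windows starting at steps 0 and 3 are complete here; the ones starting at steps 6 and 9 are skipped because fewer than window_len steps remain.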
After generating the above mentioned sets, we’ll want to concatenate them into arrays for easier processing:
[ ]:
# for each set
for set_key in list(data_sets.keys()):
    # for each component in the set
    for arr_key in list(data_sets[set_key].keys()):
        # a list of arrays will be stacked along a new leading axis
        if isinstance(data_sets[set_key][arr_key][0], np.ndarray):
            data_sets[set_key][arr_key] = np.stack(data_sets[set_key][arr_key], axis=0)
        # lists of scalars will be transformed into arrays
        else:
            data_sets[set_key][arr_key] = np.array(data_sets[set_key][arr_key])
Export processed data¶
The last step is to save this processed data to disk, together with the relevant metadata we’ll need for building the model and analyzing its outputs:
[ ]:
with open(os.path.join(output_path, 'data.pickle'), 'wb') as f:
pickle.dump({
'data_sets': data_sets,
'feature_map': feature_map,
'scalers': scalers,
'categorical_cardinalities': categorical_cardinalities
}, f, pickle.HIGHEST_PROTOCOL)
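Reading the file back is symmetric. The following sketch writes a tiny stand-in payload to a temporary directory and loads it again, so that it runs on its own; the payload contents are placeholders, and with the real file one would open os.path.join(output_path, 'data.pickle') instead:

```python
import os
import pickle
import tempfile

import numpy as np

# placeholder payload with the same top-level layout as the exported dictionary
payload = {
    'data_sets': {'train': {'target': np.zeros((2, 30), dtype=np.float32)}},
    'feature_map': {'future_ts_numeric': ['day_of_month']},
}

with tempfile.TemporaryDirectory() as tmp_dir:
    file_path = os.path.join(tmp_dir, 'data.pickle')
    with open(file_path, 'wb') as f:
        pickle.dump(payload, f, pickle.HIGHEST_PROTOCOL)
    with open(file_path, 'rb') as f:
        restored = pickle.load(f)

# inspect the shape of every array in the training set
shapes = {key: arr.shape for key, arr in restored['data_sets']['train'].items()}
print(shapes)
```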
And that’s it! We’re done generating a suitable dataset.