Skip to main content

nixtla_statsforecast

dataflow

To get started:

Dynamically pull and run

from hamilton import dataflows, driver
# downloads into ~/.hamilton/dataflows and loads the module -- WARNING: ensure you know what code you're importing!
nixtla_statsforecast = dataflows.import_module("nixtla_statsforecast", "zilto")
dr = (
driver.Builder()
.with_config({}) # replace with configuration as appropriate
.with_modules(nixtla_statsforecast)
.build()
)
# If you have sf-hamilton[visualization] installed, you can see the dataflow graph
# In a notebook this will show an image, else pass in arguments to save to a file
# dr.display_all_functions()
# Execute the dataflow, specifying what you want back. Will return a dictionary.
result = dr.execute(
[nixtla_statsforecast.CHANGE_ME, ...], # this specifies what you want back
inputs={...} # pass in inputs as appropriate
)

Use published library version

pip install sf-hamilton-contrib --upgrade  # make sure you have the latest
from hamilton import dataflows, driver
# Make sure you've done - `pip install sf-hamilton-contrib --upgrade`
from hamilton.contrib.user.zilto import nixtla_statsforecast
dr = (
driver.Builder()
.with_config({}) # replace with configuration as appropriate
.with_modules(nixtla_statsforecast)
.build()
)
# If you have sf-hamilton[visualization] installed, you can see the dataflow graph
# In a notebook this will show an image, else pass in arguments to save to a file
# dr.display_all_functions()
# Execute the dataflow, specifying what you want back. Will return a dictionary.
result = dr.execute(
[nixtla_statsforecast.CHANGE_ME, ...], # this specifies what you want back
inputs={...} # pass in inputs as appropriate
)

Modify for your needs

Now if you want to modify the dataflow, you can copy it to a new folder (renaming is possible), and modify it there.

dataflows.copy(nixtla_statsforecast, "path/to/save/to")

Purpose of this module

This module implements forecasting using statistical methods with statsforecast by Nixtla.

Fit and evaluate a list of models on a time series dataset. Obtain a cross-validation benchmark dataframe and prediction plots.

Your dataset needs to have columns unique_id to identify each series, ds to identify the time step, and y to specify the value of series unique_id at time ds.

Configuration Options

Config.when

This module doesn't receive configurations.

Inputs

  • freq: Adjust to meet the sampling rate of your time series
  • cv_forecast_steps, cv_window_size, n_cv_windows: Change these values to define your cross-validation strategy.
  • n_jobs: Set the number of cores to use for compute. The default value -1 will use all available cores and might slowdown the machine in the meantime.

Overrides

  • base_models: Set the list of Nixtla models to fit and evaluate (docs)
  • evaluation_metrics: Set the list of Nixtla-compatible metrics to use during cross-validation (examples)

Limitations

  • This flow doesn't include dataset preprocessing steps.

Source code

__init__.py
import logging
from typing import Callable, Optional

logger = logging.getLogger(__name__)

from hamilton import contrib

with contrib.catch_import_errors(__name__, __file__, logger):
import matplotlib
import pandas as pd
from statsforecast import StatsForecast
from statsforecast.models import (
_TS,
AutoARIMA,
CrostonClassic,
DynamicOptimizedTheta,
HistoricAverage,
HoltWinters,
SeasonalNaive,
)
from utilsforecast.evaluation import evaluate
from utilsforecast.losses import mse


def base_models() -> list[_TS]:
"""Nixtla stats models to fit and evaluate
Override with your own models at the Hamilton Driver level.
"""
season_length = 24
return [
AutoARIMA(season_length=season_length),
HoltWinters(),
CrostonClassic(),
SeasonalNaive(season_length=season_length),
HistoricAverage(),
DynamicOptimizedTheta(season_length=season_length),
]


def fallback_model() -> _TS:
"""Model to be used if a model fails"""
return SeasonalNaive(season_length=7)


def forecaster(
dataset: pd.DataFrame,
base_models: list[_TS],
fallback_model: _TS,
freq: str = "M",
n_jobs: int = -1,
verbose: bool = False,
) -> StatsForecast:
"""Create the forecasting harness with data and models

:param freq: frequency of the data, see pandas ref: https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases
:param n_jobs: number of cores to use; -1 will use all available cores
"""
return StatsForecast(
df=dataset,
models=base_models,
fallback_model=fallback_model,
freq=freq,
n_jobs=n_jobs,
verbose=verbose,
)


def cross_validation_predictions(
dataset: pd.DataFrame,
forecaster: StatsForecast,
cv_forecast_steps: int = 24,
cv_window_size: int = 24,
n_cv_windows: int = 2,
) -> pd.DataFrame:
"""Fit models and predict over `n_cv_windows` time windows"""
return forecaster.cross_validation(
df=dataset,
h=cv_forecast_steps,
step_size=cv_window_size,
n_windows=n_cv_windows,
)


def evaluation_metrics() -> list[Callable]:
"""List of metrics to use with `utilsforecast.evaluation.evaluate` in `cross_validation_evaluation`
The metric function should receive as arguments at least:
df: pd.DataFrame
models: list[str]
id_col: str = "unique_id"
target_col: str = "y"

See examples: https://github.com/Nixtla/utilsforecast/blob/main/utilsforecast/losses.py
"""
return [mse]


def cross_validation_evaluation(
cross_validation_predictions: pd.DataFrame,
evaluation_metrics: list[Callable],
) -> pd.DataFrame:
"""Evaluate the crossvalidation predictions and aggregate the performances by series"""
df = cross_validation_predictions.reset_index()

models = [m for m in df.columns if m not in ["unique_id", "ds", "cutoff", "y"]]

evals = []
for cutoff in df.cutoff.unique():
eval_ = evaluate(
df=df.loc[df.cutoff == cutoff],
metrics=evaluation_metrics,
models=models,
)
evals.append(eval_)

evaluation_df = pd.concat(evals)
evaluation_df = evaluation_df.groupby("unique_id").mean(numeric_only=True)
return evaluation_df


def best_model_per_series(cross_validation_evaluation: pd.DataFrame) -> pd.Series:
"""Return the best model for each series"""
return cross_validation_evaluation.idxmin(axis=1)


def inference_predictions(
forecaster: StatsForecast,
inference_forecast_steps: int = 12,
inference_confidence_percentile: list[float] = [90.0], # noqa: B006
) -> pd.DataFrame:
"""Infer values using the training harness. Fitted models aren't stored

:param inference_forecast_steps: number of steps in the future to forecast
"""
return forecaster.forecast(
h=inference_forecast_steps,
level=inference_confidence_percentile,
)


def plotting_config(
plot_uids: Optional[list[str]] = None,
plot_models: Optional[list[str]] = None,
plot_anomalies: bool = False,
plot_confidence_percentile: list[float] = [90.0], # noqa: B006
plot_engine: str = "matplotlib",
) -> dict:
"""Configuration for plotting functions"""
return dict(
unique_ids=plot_uids,
models=plot_models,
plot_anomalies=plot_anomalies,
level=plot_confidence_percentile,
engine=plot_engine,
)


def dataset_plot(dataset: pd.DataFrame, plotting_config: dict) -> matplotlib.figure.Figure:
"""Plot series from the dataset"""
return StatsForecast.plot(dataset, **plotting_config)


def inference_plot(
dataset: pd.DataFrame,
inference_predictions: pd.DataFrame,
plotting_config: dict,
) -> matplotlib.figure.Figure:
"""Plot forecast values"""
return StatsForecast.plot(dataset, inference_predictions, **plotting_config)


if __name__ == "__main__":
# run as a script to test Hamilton's execution
import __init__ as nixtla_statsforecast

from hamilton import driver

dr = driver.Driver(
{},
nixtla_statsforecast,
)
# create the DAG image
dr.display_all_functions("dag", {"format": "png", "view": False})

Requirements

matplotlib
numpy
pandas
statsforecast
utilsforecast