Simulación y reporting¶
Jano puede describir una simulación temporal sobre un dataset concreto y exponerla de tres formas complementarias:
un
SimulationSummaryestructuradoun reporte HTML standalone con timeline
SimulationChartDatalisto para graficar en tus propias visualizaciones Python
El entry point principal es describe_simulation() sobre TemporalBacktestSplitter.
Si querés correr una simulación completa sin iterar folds manualmente, la interfaz recomendada es WalkForwardPolicy.
El workflow general está pensado por capas:
usar clases high-level cuando la pregunta ya está encapsulada
inspeccionar o recortar iteraciones con
plan()cuando haga faltay caer al modo manual de folds cuando querés componer todo por tu cuenta
La misma API acepta tres inputs tabulares:
pandas.DataFramenumpy.ndarrayusando referencias enteras comotime_col=0polars.DataFramecuando está instalado el extra opcional de Polars
Eso significa que la configuración temporal permanece igual aunque cambie la fuente upstream. Lo único que cambia es cómo referenciás las columnas:
por nombre para pandas y Polars
por posición entera para arrays NumPy
Ejemplo¶
pandas.DataFrame
import pandas as pd
from jano import TemporalPartitionSpec, WalkForwardPolicy
frame = pd.DataFrame(
{
"timestamp": pd.date_range("2024-01-01", periods=365, freq="D"),
"feature": range(365),
"target": range(100, 465),
}
)
policy = WalkForwardPolicy(
time_col="timestamp",
partition=TemporalPartitionSpec(
layout="train_test",
train_size="10D",
test_size="5D",
),
step="5D",
strategy="rolling",
)
result = policy.run(frame, title="Walk-forward simulation")
print(result.total_folds)
print(result.to_frame().head())
print(result.html[:120])
print(result.chart_data.segment_stats)
Si querés inspeccionar la simulación antes de materializar folds, usá plan():
Simulación planificada
plan = policy.plan(frame, title="Plan walk-forward")
print(plan.total_folds)
print(plan.to_frame().head())
filtered = plan.exclude_windows(
train=[("2025-12-20", "2026-01-05")],
).select_from_iteration(5)
result = filtered.materialize()
El frame del plan incluye el índice de iteración, boundaries por segmento y conteos de filas, de modo que podés inspeccionar la estructura primero y materializar después sólo los folds que te interesan.
Podés anclar la simulación a un punto de tiempo específico y limitar el número de folds:
Simulación anclada
policy = WalkForwardPolicy(
time_col="timestamp",
partition=TemporalPartitionSpec(
layout="train_test",
train_size="15D",
test_size="4D",
),
step="1D",
strategy="rolling",
start_at="2025-09-01",
max_folds=15,
)
result = policy.run(frame, title="15 iteraciones diarias de retraining")
WalkForwardPolicy también acepta end_at cuando querés restringir la simulación a una ventana temporal acotada.
Correr un modelo con policies de retraining¶
Cuando no querés escribir un loop manual tipo for train_idx, test_idx in splitter,
podés usar WalkForwardRunner por encima del workflow temporal. El runner mantiene
las responsabilidades separadas:
WalkForwardPolicysigue definiendo la geometría de foldsWalkForwardRunnerejecuta el estimador sobre esos foldsuna policy de retraining decide si el modelo debe refittearse antes de cada fold
import numpy as np
from jano import TemporalPartitionSpec, WalkForwardPolicy, WalkForwardRunner
def mae(y_true, y_pred):
return float(np.mean(np.abs(np.asarray(y_true) - np.asarray(y_pred))))
def rmse(y_true, y_pred):
return float(np.sqrt(np.mean((np.asarray(y_true) - np.asarray(y_pred)) ** 2)))
policy = WalkForwardPolicy(
time_col="timestamp",
partition=TemporalPartitionSpec(
layout="train_test",
train_size="30D",
test_size="7D",
),
step="7D",
strategy="rolling",
)
runner = WalkForwardRunner(
model=model,
target_col="target",
feature_cols=["feature_a", "feature_b"],
retrain="always",
metrics={"mae": mae, "rmse": rmse},
)
run = runner.run(policy, frame)
print(run.to_frame().head())
print(run.summary())
print(run.metric_trajectory().head())
print(run.retrain_events())
Los resultados del runner son data-first. Jano no necesita ser dueño de la capa final de dashboard; expone evidencia estructurada para que notebooks, agentes, herramientas de presentación o aplicaciones la visualicen con su propio estilo:
run.fold_summary()devuelve geometría temporal y metadata de retraining.run.metric_trajectory()devuelve métricas en formato long, listas para graficar.run.retrain_events()devuelve solo los folds donde el estimador se refitteó.run.predictions_frame()devuelve predicciones row-level sobre los tests.run.report_data()yrun.to_dict()devuelven diccionarios estructurados para capas externas de reporting.
Los modos shorthand de retraining son:
retrain="always"oretrain=Truepara refittear en cada foldretrain="never"oretrain=Falsepara entrenar una vez y reutilizar el mismo modeloretrain="periodic"másretrain_interval=Kpara refittear cadaKfolds
Perfiles de evaluación¶
EvaluationProfile separa cómo se mide una corrida temporal de cuándo el runner
debería reentrenar el estimador. Jano no implementa fórmulas de métricas; el
contrato principal es que el usuario pase la función de métrica o pérdida que
corresponde a su problema.
import numpy as np
from jano import EvaluationProfile, FunctionRetrainPolicy, WalkForwardRunner
def daily_cost(y_true, y_pred):
return float(np.mean(np.abs(y_true - y_pred)))
def retrain_rule(context):
if context.history.empty:
return True
latest = context.history["daily_cost"].iloc[-1]
limit = limit_for_date(context.split.boundaries["test"].end)
return latest > limit
runner = WalkForwardRunner(
model=model,
target_col="target",
feature_cols=["feature_a", "feature_b"],
evaluation=EvaluationProfile(
metrics={"daily_cost": daily_cost},
metric_directions={"daily_cost": "min"},
primary_metric="daily_cost",
),
retrain_policy=FunctionRetrainPolicy(retrain_rule),
)
El profile le dice a Jano qué métricas existen, si cada una debe minimizarse o
maximizarse y cuál es la señal operativa principal. FunctionRetrainPolicy le
da al usuario control total sobre la decisión de reentrenar, incluyendo thresholds
dinámicos, losses que cambian por fecha o reglas de negocio.
También hay perfiles convenientes cuando el tipo de problema ayuda a explicitar la intención. No agregan fórmulas de métricas; agrupan métricas provistas por el usuario según el estilo del problema:
RegressionProfileetiqueta pérdidas de regresión provistas por el usuario.ClassificationProfileetiqueta scores de clasificación provistos por el usuario.OrdinalClassificationProfileestá pensado para clases ordenadas con costos custom.RankingProfileestá pensado para métricas de ranking o retrieval provistas por el usuario.
También podés pasar una policy explícita:
from jano import DriftBasedRetrain, WalkForwardRunner
runner = WalkForwardRunner(
model=model,
target_col="target",
retrain_policy=DriftBasedRetrain(
metric="mae",
threshold=0.05,
baseline="last_retrain",
),
metrics={"mae": mae},
)
DriftBasedRetrain usa métricas observadas en folds previos para decidir si el fold
siguiente debería disparar un retraining. Eso lo vuelve útil como benchmark operativo
inicial, sin meter lógica de drift dentro del splitter.
Cuando DriftBasedRetrain se crea sin una métrica explícita, usa el
primary_metric del perfil de evaluación.
Alineación a días calendario¶
Por defecto, las ventanas por duración arrancan desde el primer timestamp observado. Si la
primera fila es 2024-01-01 05:21 y train_size="7D", la primera ventana de train
termina en 2024-01-08 05:21.
A veces eso no es lo buscado. En datasets operativos, podés querer días calendario completos: train hasta Jan 7 y test desde Jan 8.
Usá calendar_frequency="D" en TemporalPartitionSpec para eso:
simulation = WalkForwardPolicy(
time_col="timestamp",
partition=TemporalPartitionSpec(
layout="train_test",
train_size="7D",
test_size="1D",
calendar_frequency="D",
),
step="1D",
strategy="rolling",
)
Jano usa boundaries cerrados-abiertos: [start, end). Un train que termina en
2024-01-08 00:00:00 contiene filas anteriores a Jan 8, mientras que test puede empezar
exactamente en Jan 8.
Si el source data es un array NumPy, referenciá la columna temporal por posición entera:
Input NumPy
import numpy as np
values = np.array(
[
["2025-09-01", 0.2, 1],
["2025-09-02", 0.4, 0],
["2025-09-03", 0.1, 1],
["2025-09-04", 0.3, 0],
],
dtype=object,
)
simulation = TemporalSimulation(
time_col=0,
partition=TemporalPartitionSpec(
layout="train_test",
train_size="2D",
test_size="1D",
),
step="1D",
strategy="single",
)
Si el source data es un frame de Polars, la misma configuración funciona con columnas nombradas:
polars.DataFrame
import polars as pl
frame = pl.DataFrame(
{
"timestamp": ["2025-09-01", "2025-09-02", "2025-09-03", "2025-09-04"],
"feature": [0.2, 0.4, 0.1, 0.3],
"target": [1, 0, 1, 0],
}
).with_columns(pl.col("timestamp").str.strptime(pl.Datetime, "%Y-%m-%d"))
simulation = TemporalSimulation(
time_col="timestamp",
partition=TemporalPartitionSpec(
layout="train_test",
train_size="2D",
test_size="1D",
),
step="1D",
strategy="single",
)
result = simulation.run(frame)
Elegir el motor de particionado¶
Todas las APIs de simulación de alto nivel aceptan engine. El default,
engine="auto", elige la representación interna usada para calcular boundaries
temporales e índices de filas:
simulation = TemporalSimulation(
time_col="timestamp",
partition=TemporalPartitionSpec(
layout="train_test",
train_size="7D",
test_size="1D",
),
step="1D",
strategy="rolling",
engine="auto",
)
result = simulation.run(frame)
print(result.engine_metadata.to_dict())
engine="auto" mantiene inputs Polars y NumPy nativos para planning cuando es seguro.
Usá engine="pandas" para forzar el camino pandas estable, o engine="polars" /
engine="numpy" cuando quieras forzar un motor específico de particionado.
Control manual low-level¶
Cuando necesitás control directo de los folds o integrar con un training loop externo, usá TemporalBacktestSplitter directamente.
from jano import TemporalBacktestSplitter, TemporalPartitionSpec
splitter = TemporalBacktestSplitter(
time_col="timestamp",
partition=TemporalPartitionSpec(
layout="train_test",
train_size="10D",
test_size="5D",
),
step="5D",
strategy="rolling",
)
for split in splitter.iter_splits(frame):
print(split.summary())
El mismo splitter también puede precalcular la geometría completa de la partición:
plan = splitter.plan(frame)
print(plan.to_frame()[["iteration", "train_start", "train_end", "test_start", "test_end"]])
Este es el modo manual completo. Es el lugar correcto cuando querés componer por tu cuenta todo el proceso: layouts de partición, gaps temporales, exclusión de fechas especiales, lookbacks por grupo de features, training loops del modelo o cualquier lógica de evaluación que no convenga esconder detrás de un helper high-level.
Estudios con cutoff fijo¶
Estos son casos especiales encima del workflow básico de simulación.
Jano los expone como policies temporales dedicadas en lugar de dejarlos como recetas manuales.
Test fijo, train creciente
from jano import TrainHistoryPolicy
policy = TrainHistoryPolicy(
"timestamp",
cutoff="2025-09-15",
train_sizes=["7D", "14D", "21D", "28D"],
test_size="4D",
)
result = policy.evaluate(
frame,
model=model,
target_col="target",
feature_cols=["feature_1", "feature_2"],
metrics={"mae": mae, "rmse": rmse},
)
print(result.to_frame()[["train_size", "mae", "rmse"]])
print(result.find_optimal_train_size(metric="rmse", tolerance=0.01))
Esto mantiene fijo el mismo test mientras train se expande hacia el pasado. Es la forma correcta para preguntas sobre suficiencia de historia y eficiencia de datos.
El caso opuesto también es común: dejar train fijo y mover test día a día para medir cuánto tiempo un modelo o regla mantiene su performance sin retraining.
Train fijo, test móvil
from jano import DriftMonitoringPolicy
policy = DriftMonitoringPolicy(
"timestamp",
cutoff="2025-09-15",
train_size="30D",
test_size="3D",
step="1D",
max_windows=10,
)
result = policy.evaluate(
frame,
model=model,
target_col="target",
feature_cols=["feature_1", "feature_2"],
metrics={"mae": mae, "rmse": rmse},
)
print(result.to_frame()[["window", "test_start", "rmse"]])
print(result.find_drift_onset(metric="rmse", threshold=0.15, baseline="first"))
Policy compuesta: optimizar historia de train dentro de cada iteración walk-forward¶
Cuando la pregunta es más compleja, podés seguir dentro de la superficie recomendada.
RollingTrainHistoryPolicy ejecuta un loop walk-forward externo y, dentro de cada
iteración, elige la menor ventana de train que queda dentro de la tolerancia del mejor
score para el test fijo de esa iteración.
from jano import RollingTrainHistoryPolicy, TemporalPartitionSpec
policy = RollingTrainHistoryPolicy(
"timestamp",
partition=TemporalPartitionSpec(
layout="train_test",
train_size="30D",
test_size="1D",
),
step="1D",
strategy="rolling",
max_folds=10,
train_sizes=["5D", "10D", "15D", "30D"],
)
result = policy.evaluate(
frame,
model=model,
target_col="target",
feature_cols=["feature_1", "feature_2"],
metrics={"rmse": rmse},
metric="rmse",
tolerance=0.01,
)
print(result.to_frame().head())
print(result.summary())
Semántica temporal y control de leakage¶
Cuando una sola columna temporal no alcanza, podés pasar un TemporalSemanticsSpec en lugar de un simple string en time_col.
Esto permite separar:
la timeline usada para reporting y bounds globales
la columna de orden interno
la columna temporal usada para decidir la elegibilidad de cada segmento
Eso importa en datasets más parecidos a producción, donde tiempo de evento y tiempo de disponibilidad no son iguales.
from jano import TemporalBacktestSplitter, TemporalPartitionSpec, TemporalSemanticsSpec
splitter = TemporalBacktestSplitter(
time_col=TemporalSemanticsSpec(
timeline_col="departured_at",
segment_time_cols={
"train": "arrived_at",
"test": "departured_at",
},
),
partition=TemporalPartitionSpec(
layout="train_test",
train_size="14D",
test_size="3D",
gap_before_train="1D",
gap_before_test="1D",
gap_after_test="2D",
),
step="1D",
strategy="rolling",
)
Lookback por grupo de features¶
Algunos pipelines necesitan una capa adicional más allá del fold: distintos grupos de features pueden requerir diferente cantidad de historia, aun cuando el segmento supervisado de train sea fijo.
from jano import FeatureLookbackSpec
split = next(splitter.iter_splits(frame))
lookbacks = FeatureLookbackSpec(
default_lookback="15D",
group_lookbacks={"lag_features": "65D"},
feature_groups={"lag_features": ["lag_30", "lag_60"]},
)
history = split.slice_feature_history(
frame,
lookbacks,
time_col="timestamp",
segment_name="train",
)
recent_context = history["__default__"]
lag_context = history["lag_features"]
Preview simple de HTML¶
Debajo hay un mock compacto del tipo de timeline que muestra el reporte HTML generado.