# Freezing and Replaying SQL Execution Plans
This page shows how to:

- run a SQL-backed `GraphReduce` graph with `train=True` and `auto_features=True`
- freeze the exact executed `sqlop` instances
- replay those same ops into a fresh `train=True` graph
- replay those same ops into a fresh `train=False` graph
The important point is that `GraphReduce.freeze_execution_plan()` freezes the
executed SQL operation program, not just the final dataframe. That means a later
graph can reuse the same per-method `sqlop` instances instead of dynamically
recomputing feature SQL.
## What Gets Frozen
After `do_transformations_sql()` runs, GraphReduce stores structured execution
records for methods such as:

- `do_data`
- `do_annotate`
- `do_filters`
- `do_normalize`
- `do_reduce`
- `do_labels`
- `do_post_join_annotate`
- `do_post_join_filters`
`freeze_execution_plan()` returns those executed ops grouped by method and also
preserves the execution order by node and edge. `apply_execution_plan(plan)`
loads that frozen plan into a fresh graph so `do_transformations_sql()` can
replay the same method outputs.
When replaying into `train=False`:

- label ops are not applied
- label-dependent post-join ops are not applied
- date-node joins are not attempted
- date literals in frozen SQL ops are rebound to the receiving graph's current
  `cut_date`

If you do not pass `cut_date` to the scoring graph, that graph keeps its normal
default behavior, which is `datetime.datetime.now()`.
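The fallback can be pictured as a plain default argument. This is a minimal sketch of the behavior described above, not GraphReduce's actual internals; `resolve_cut_date` is a hypothetical helper:

```python
import datetime


def resolve_cut_date(cut_date=None):
    # Hypothetical helper: an omitted cut_date falls back to the current
    # wall-clock time, mirroring the scoring graph's default behavior.
    return cut_date if cut_date is not None else datetime.datetime.now()


fixed = resolve_cut_date(datetime.datetime(2023, 6, 30))  # explicit cutoff
default = resolve_cut_date()                              # falls back to now()
```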
## Simple Dataset
This example uses the same SQLite dataset across all three runs:

- `cust`
- `orders`
- `notifications`
The customer node also has a post-join annotation that depends on the label
column created from the orders node during training.
```python
import datetime
import sqlite3

import pandas as pd

from graphreduce.graph_reduce import GraphReduce
from graphreduce.node import SQLNode
from graphreduce.enum import ComputeLayerEnum, PeriodUnit, SQLOpType
from graphreduce.models import sqlop


def setup_sqlite():
    conn = sqlite3.connect(":memory:")
    pd.DataFrame(
        [
            {"id": 1, "name": "alex"},
            {"id": 2, "name": "blair"},
            {"id": 3, "name": "casey"},
            {"id": 4, "name": "devon"},
        ]
    ).to_sql("cust", conn, index=False, if_exists="replace")
    pd.DataFrame(
        [
            {"id": 1, "customer_id": 1, "ts": "2023-01-05", "amount": 10.0},
            {"id": 2, "customer_id": 1, "ts": "2023-06-01", "amount": 25.0},
            {"id": 3, "customer_id": 2, "ts": "2023-04-15", "amount": 5.0},
            {"id": 4, "customer_id": 4, "ts": "2023-05-20", "amount": 99.0},
        ]
    ).to_sql("orders", conn, index=False, if_exists="replace")
    pd.DataFrame(
        [
            {"id": 1, "customer_id": 1, "ts": "2023-06-10"},
            {"id": 2, "customer_id": 1, "ts": "2023-06-11"},
            {"id": 3, "customer_id": 3, "ts": "2023-03-01"},
        ]
    ).to_sql("notifications", conn, index=False, if_exists="replace")
    return conn


def build_graph(conn, train, cut_date=None):
    orders = SQLNode(
        fpath="orders",
        pk="id",
        prefix="ord",
        client=conn,
        compute_layer=ComputeLayerEnum.sqlite,
        columns=["id", "customer_id", "ts", "amount"],
        date_key="ts",
    )
    cust = SQLNode(
        fpath="cust",
        pk="id",
        prefix="cust",
        client=conn,
        compute_layer=ComputeLayerEnum.sqlite,
        columns=["id", "name"],
        do_post_join_annotate_ops=[
            sqlop(optype=SQLOpType.select, opval="*"),
            sqlop(
                optype=SQLOpType.select,
                opval="case when ord_id_label > 0 then 1 else 0 end as has_order_label",
            ),
        ] if train else None,
        do_post_join_annotate_requires=[orders],
    )
    notifications = SQLNode(
        fpath="notifications",
        pk="id",
        prefix="not",
        client=conn,
        compute_layer=ComputeLayerEnum.sqlite,
        columns=["id", "customer_id", "ts"],
        date_key="ts",
    )
    kwargs = dict(
        name=f"cust_sql_plan_train_{train}",
        parent_node=cust,
        compute_period_unit=PeriodUnit.day,
        compute_period_val=730,
        label_node=orders,
        label_field="id",
        label_operation="bool",
        label_period_unit=PeriodUnit.day,
        label_period_val=90,
        compute_layer=ComputeLayerEnum.sqlite,
        use_temp_tables=True,
        lazy_execution=False,
        auto_features=True,
        auto_feature_hops_back=3,
        auto_feature_hops_front=0,
        sql_client=conn,
        date_filters_on_agg=True,
        train=train,
    )
    if cut_date is not None:
        kwargs["cut_date"] = cut_date
    gr = GraphReduce(**kwargs)
    gr.add_node(cust)
    gr.add_node(orders)
    gr.add_node(notifications)
    gr.add_entity_edge(cust, notifications, parent_key="id", relation_key="customer_id", reduce=True)
    gr.add_entity_edge(cust, orders, parent_key="id", relation_key="customer_id", reduce=True)
    return gr
```
## Example 1: `train=True` with Dynamic Auto-Generated Features
This is the normal training path. GraphReduce inspects the graph, auto-generates SQL feature ops where needed, executes labels, and records the executed SQL ops.
```python
train_cut_date = datetime.datetime(2023, 6, 30)
train_conn = setup_sqlite()

train_gr = build_graph(train_conn, train=True, cut_date=train_cut_date)
train_gr.do_transformations_sql()

train_df = pd.read_sql_query(
    f"select * from {train_gr.parent_node._cur_data_ref}",
    train_conn,
).sort_values("cust_id")

frozen_plan = train_gr.freeze_execution_plan()
sql_ops_by_method = train_gr.get_executed_sqlops_by_method(exclude_date_filters=True)

print(sorted(sql_ops_by_method))
print(train_df.columns.tolist())
```
What happened in this run:
- `do_reduce` may include auto-generated SQL ops from `sql_auto_features`
- `do_labels` executes because this is a training graph
- `do_post_join_annotate` executes after the label join because it depends on `ord_id_label`
- `freeze_execution_plan()` captures the exact executed `sqlop` sequences
## Example 2: `train=True` Replaying a Previously Frozen Plan
Now create a brand-new graph against the same dataset and same training cut date. Instead of dynamically generating features again, load the frozen plan first.
```python
replay_train_conn = setup_sqlite()
replay_train_gr = build_graph(replay_train_conn, train=True, cut_date=train_cut_date)

replay_train_gr.apply_execution_plan(frozen_plan)
replay_train_gr.do_transformations_sql()

replay_train_df = pd.read_sql_query(
    f"select * from {replay_train_gr.parent_node._cur_data_ref}",
    replay_train_conn,
).sort_values("cust_id")

replay_train_sql_ops = replay_train_gr.get_executed_sqlops_by_method(
    exclude_date_filters=True
)

print("do_labels" in replay_train_sql_ops)
print(train_df.equals(replay_train_df))
```
What changed in this run:
- the graph shape is the same
- the executed SQL ops are loaded from `frozen_plan`
- `do_transformations_sql()` replays the frozen `sqlop` outputs by node, method, and edge
- `do_labels` still executes because this graph is still `train=True`
- the output should match the original training run
This is useful when you want reproducible SQL feature generation across repeated training jobs.
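A natural extension is persisting the frozen plan between jobs. The sketch below uses Python's standard `pickle` with a hypothetical stand-in dict in place of the real plan object; it assumes the object returned by `freeze_execution_plan()` (and the `sqlop` instances it contains) serializes cleanly, which you should verify against your graphreduce version:

```python
import os
import pickle
import tempfile

# Hypothetical stand-in for a frozen plan: the real object groups executed
# sqlop instances by node and method. A plain dict stands in for it here.
plan = {"cust": {"do_reduce": ["select customer_id, count(id) as ord_count"]}}

# Persist at the end of the training job...
path = os.path.join(tempfile.mkdtemp(), "plan.pkl")
with open(path, "wb") as f:
    pickle.dump(plan, f)

# ...and reload in a later job, before calling apply_execution_plan().
with open(path, "rb") as f:
    loaded = pickle.load(f)
```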
## Example 3: `train=False` Replaying Frozen Features on the Same Dataset
Now build the same graph again, but this time for scoring.
Do not pass a scoring `cut_date` here if you want the graph to keep its normal
`train=False` behavior. In that case the graph uses its current default
`cut_date`, which is `datetime.datetime.now()`.
```python
score_conn = setup_sqlite()
score_gr = build_graph(score_conn, train=False)

score_gr.apply_execution_plan(frozen_plan)
score_gr.do_transformations_sql()

score_df = pd.read_sql_query(
    f"select * from {score_gr.parent_node._cur_data_ref}",
    score_conn,
).sort_values("cust_id")

score_sql_ops = score_gr.get_executed_sqlops_by_method(exclude_date_filters=True)

print("do_labels" in score_sql_ops)
print("do_post_join_annotate" in score_sql_ops)
print(score_df.columns.tolist())
```
What changed in this run:
- the frozen feature ops are replayed
- date literals from the frozen training plan are rebound to the scoring graph's
  current `cut_date`
- `do_labels` is omitted
- label-dependent `do_post_join_annotate` and `do_post_join_filters` are omitted
- date-node joins are skipped
This lets you keep feature logic fixed between training and scoring while still allowing the scoring graph to use its own current cutoff date.
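To picture the date rebinding, the sketch below swaps ISO date literals in a frozen SQL string with a regex. This illustrates the idea only; `rebind_date_literals` is a hypothetical helper, not how GraphReduce actually performs the rebinding:

```python
import re


def rebind_date_literals(sql: str, new_cut_date: str) -> str:
    # Illustration only: replace any 'YYYY-MM-DD' literal with the receiving
    # graph's cut date. The real mechanism inside GraphReduce may differ.
    return re.sub(r"\d{4}-\d{2}-\d{2}", new_cut_date, sql)


frozen_sql = "select * from orders where ts < '2023-06-30'"
rebound = rebind_date_literals(frozen_sql, "2024-01-15")
# rebound == "select * from orders where ts < '2024-01-15'"
```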
## Mental Model
The three runs are doing different things with the same graph structure:
- `train=True` dynamic run: generate SQL ops and execute them
- `train=True` frozen replay: reuse the exact executed SQL ops from step 1
- `train=False` frozen replay: reuse the frozen feature ops but omit labels and label-dependent post-join logic
The core API is:
```python
plan = train_gr.freeze_execution_plan()
new_gr.apply_execution_plan(plan)
new_gr.do_transformations_sql()
```
Use `get_executed_sqlops_by_method()` or `get_executed_op_records()` if you want
to inspect exactly what ran after execution.
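For example, comparing the executed methods of two runs is a quick reproducibility check. The dicts below are hypothetical stand-ins for `get_executed_sqlops_by_method()` output, assuming it maps method names to lists of executed ops (plain strings stand in for `sqlop` instances here):

```python
# Stand-ins for two runs' executed ops: a training run and a train=False replay.
train_run_ops = {
    "do_reduce": ["agg amount by customer_id"],
    "do_labels": ["bool label on ord id"],
}
score_run_ops = {
    "do_reduce": ["agg amount by customer_id"],
}

# Methods that ran during training but were omitted at scoring time.
omitted = sorted(set(train_run_ops) - set(score_run_ops))
# omitted == ["do_labels"]

# Methods shared by both runs should have executed identical ops.
shared_match = all(
    train_run_ops[m] == score_run_ops[m] for m in train_run_ops if m in score_run_ops
)
```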