Custom Pandas Graph: All cust_data Nodes
This example runs the same custom all-nodes graph, using pandas, across these files in `tests/data/cust_data`:

- `cust.csv`
- `orders.csv`
- `order_products.csv`
- `notifications.csv`
- `notification_interactions.csv`
- `notification_interaction_types.csv`
Complete Example
#!/usr/bin/env python
"""Custom pandas GraphReduce example using all cust_data nodes."""
from __future__ import annotations
import datetime
from pathlib import Path
import pandas as pd
from graphreduce.enum import ComputeLayerEnum
from graphreduce.graph_reduce import GraphReduce
from graphreduce.node import GraphReduceNode
# Directory holding the sample cust_data CSVs. This is a relative path, so it
# resolves against the current working directory (presumably the repository
# root when running the example script).
DATA_PATH = Path("tests") / "data" / "cust_data"
class CustNode(GraphReduceNode):
    """Customer (parent) node: cleans names and derives activity features.

    Works on the ``id`` and ``name`` columns of ``cust.csv`` (addressed via
    ``colabbr``). After child aggregates are joined in, it derives a total
    event count and a coarse activity tier.
    """

    def do_annotate(self):
        """Add the character length of the customer name (NaN counts as 0)."""
        # NOTE(review): if the framework calls do_annotate before do_normalize,
        # this length includes surrounding whitespace and original casing.
        self.df[self.colabbr("name_length")] = self.df[self.colabbr("name")].fillna("").astype(str).str.len()
        return self.df

    def do_filters(self):
        """Drop customers whose id is missing."""
        # .copy() detaches the filtered frame from the unfiltered one so the
        # column assignment in do_normalize does not raise SettingWithCopyWarning.
        self.df = self.df[self.df[self.colabbr("id")].notna()].copy()
        return self.df

    def do_normalize(self):
        """Normalize names: NaN -> "", then strip whitespace and lowercase."""
        self.df[self.colabbr("name")] = (
            self.df[self.colabbr("name")]
            .fillna("")
            .astype(str)
            .str.strip()
            .str.lower()
        )
        return self.df

    def do_reduce(self, reduce_key):
        """No aggregation for this node; return the frame unchanged."""
        return self.df

    def do_labels(self, reduce_key):
        """No label logic for this node; return the frame unchanged."""
        return self.df

    def _joined_count(self, column):
        """Return ``column`` with NaN filled to 0, or scalar 0 if the column is absent.

        Child aggregates arrive via joins, so customers without child rows carry
        NaN, and a column may be missing entirely if that edge produced nothing.
        """
        if column in self.df.columns:
            return self.df[column].fillna(0)
        return 0

    def do_post_join_annotate(self):
        """Derive ``cust_total_events`` and ``cust_activity_tier`` from joined child features.

        Total = orders + notifications + engaged notification interactions, each
        treated as 0 when missing. Tiers use pandas' right-closed bins:
        (-1, 1] -> "low", (1, 4] -> "medium", (4, inf) -> "high".
        """
        self.df["cust_total_events"] = (
            self._joined_count("ord_num_orders")
            + self._joined_count("not_num_notifications")
            + self._joined_count("not_num_engaged_interactions")
        )
        self.df["cust_activity_tier"] = pd.cut(
            self.df["cust_total_events"],
            bins=[-1, 1, 4, float("inf")],
            labels=["low", "medium", "high"],
        ).astype(str)
        return self.df

    def do_post_join_filters(self):
        """Keep customers with a non-negative total event count.

        Requires ``cust_total_events`` from do_post_join_annotate. Since the
        summed inputs are presumably non-negative counts with NaN filled to 0,
        this acts as a safeguard rather than an active filter.
        """
        self.df = self.df[self.df["cust_total_events"] >= 0]
        return self.df
class OrderNode(GraphReduceNode):
    """Order node: parses amounts, filters by date, aggregates per customer."""

    def do_annotate(self):
        """Parse ``amount`` into float ``amount_dbl``; unparseable values become NaN."""
        self.df[self.colabbr("amount_dbl")] = pd.to_numeric(self.df[self.colabbr("amount")], errors="coerce")
        return self.df

    def do_filters(self):
        """Keep orders whose ``ts`` is on or after 2022-01-01."""
        # Compared against a string literal; assumes ts is ISO-8601 (either as
        # text or parsed datetimes) — TODO confirm. Missing ts compares False.
        # .copy() detaches the filtered frame so the column assignment in
        # do_normalize does not raise SettingWithCopyWarning.
        self.df = self.df[self.df[self.colabbr("ts")] >= "2022-01-01"].copy()
        return self.df

    def do_normalize(self):
        """Treat unparseable/missing amounts as 0.0."""
        self.df[self.colabbr("amount_dbl")] = self.df[self.colabbr("amount_dbl")].fillna(0.0)
        return self.df

    def do_reduce(self, reduce_key):
        """Aggregate orders per ``reduce_key`` value.

        Produces the order count, total amount, and mean amount. If the
        order_products child aggregate ``op_num_order_products`` has been joined
        into this frame, it is also summed into ``num_order_products``. Without
        this step, that child feature would be dropped here, because only the
        columns returned by this reduction propagate further up the graph.
        """
        aggs = {
            self.colabbr("num_orders"): pd.NamedAgg(column=self.colabbr(self.pk), aggfunc="nunique"),
            self.colabbr("sum_amount"): pd.NamedAgg(column=self.colabbr("amount_dbl"), aggfunc="sum"),
            self.colabbr("avg_amount"): pd.NamedAgg(column=self.colabbr("amount_dbl"), aggfunc="mean"),
        }
        features = self.prep_for_features()
        # Child column is referenced by its literal "op_" name (prefix of the
        # order_products node). Guarded so the node still works without that edge.
        child_col = "op_num_order_products"
        if child_col in features.columns:
            aggs[self.colabbr("num_order_products")] = pd.NamedAgg(column=child_col, aggfunc="sum")
        return features.groupby(self.colabbr(reduce_key)).agg(**aggs).reset_index()

    def do_labels(self, reduce_key):
        """No label logic for this node; return the frame unchanged."""
        return self.df

    def do_post_join_annotate(self):
        """No post-join annotation; return the frame unchanged."""
        return self.df

    def do_post_join_filters(self):
        """No post-join filtering; return the frame unchanged."""
        return self.df
class OrderProductsNode(GraphReduceNode):
    """Order line items: numeric product ids and per-order product counts."""

    def do_annotate(self):
        """Coerce ``product_id`` to a number; unparseable ids become NaN."""
        raw_ids = self.df[self.colabbr("product_id")]
        self.df[self.colabbr("product_id_int")] = pd.to_numeric(raw_ids, errors="coerce")
        return self.df

    def do_filters(self):
        """Drop line items whose product id could not be parsed."""
        has_product = self.df[self.colabbr("product_id_int")].notna()
        self.df = self.df[has_product]
        return self.df

    def do_normalize(self):
        """No normalization; return the frame unchanged."""
        return self.df

    def do_reduce(self, reduce_key):
        """Summarize line items per ``reduce_key`` value.

        Emits the number of line-item rows (non-null pk count) and the number
        of distinct parsed product ids.
        """
        line_count_col = self.colabbr("num_order_products")
        distinct_col = self.colabbr("num_distinct_products")
        features = self.prep_for_features()
        grouped = features.groupby(self.colabbr(reduce_key))
        summary = grouped.agg(
            **{
                line_count_col: pd.NamedAgg(column=self.colabbr(self.pk), aggfunc="count"),
                distinct_col: pd.NamedAgg(column=self.colabbr("product_id_int"), aggfunc="nunique"),
            }
        )
        return summary.reset_index()

    def do_labels(self, reduce_key):
        """No label logic for this node; return the frame unchanged."""
        return self.df

    def do_post_join_annotate(self):
        """No post-join annotation; return the frame unchanged."""
        return self.df

    def do_post_join_filters(self):
        """No post-join filtering; return the frame unchanged."""
        return self.df
class NotificationNode(GraphReduceNode):
    """Notification node: month annotation, date filter, and per-key rollups.

    The rollup also sums the ``ni_``-prefixed interaction aggregates, which are
    presumably joined in from the notification_interactions child before this
    node is reduced. Summing them keeps those features in this node's output.
    """

    def do_annotate(self):
        """Add ``ts_month``: characters 5-7 of the stringified ``ts``."""
        # For ISO "YYYY-MM-DD..." timestamps this is the two-digit month
        # (assumes ISO formatting — TODO confirm).
        ts_text = self.df[self.colabbr("ts")].astype(str)
        self.df[self.colabbr("ts_month")] = ts_text.str.slice(start=5, stop=7)
        return self.df

    def do_filters(self):
        """Keep notifications whose ``ts`` is on or after 2022-01-01."""
        on_or_after = self.df[self.colabbr("ts")] >= "2022-01-01"
        self.df = self.df[on_or_after]
        return self.df

    def do_normalize(self):
        """No normalization; return the frame unchanged."""
        return self.df

    def do_reduce(self, reduce_key):
        """Roll notifications up to one row per ``reduce_key`` value."""
        named_aggs = {
            self.colabbr("num_notifications"): pd.NamedAgg(column=self.colabbr(self.pk), aggfunc="nunique"),
            self.colabbr("max_notification_ts"): pd.NamedAgg(column=self.colabbr("ts"), aggfunc="max"),
            # Child-level aggregates, referenced by their literal "ni_" names.
            self.colabbr("num_interactions"): pd.NamedAgg(column="ni_num_interactions", aggfunc="sum"),
            self.colabbr("num_engaged_interactions"): pd.NamedAgg(
                column="ni_num_engaged_interactions", aggfunc="sum"
            ),
        }
        features = self.prep_for_features()
        rolled = features.groupby(self.colabbr(reduce_key)).agg(**named_aggs)
        return rolled.reset_index()

    def do_labels(self, reduce_key):
        """No label logic for this node; return the frame unchanged."""
        return self.df

    def do_post_join_annotate(self):
        """No post-join annotation; return the frame unchanged."""
        return self.df

    def do_post_join_filters(self):
        """No post-join filtering; return the frame unchanged."""
        return self.df
class NotificationInteractionsNode(GraphReduceNode):
    """Notification interactions: day annotation, date filter, per-notification rollups.

    The rollup sums ``nit_is_engagement_type``, presumably joined in from the
    interaction-types child before this node is reduced.
    """

    def do_annotate(self):
        """Add ``ts_day``: the first 10 characters of the stringified ``ts``."""
        # For ISO timestamps this is "YYYY-MM-DD" (assumes ISO formatting — TODO confirm).
        ts_text = self.df[self.colabbr("ts")].astype(str)
        self.df[self.colabbr("ts_day")] = ts_text.str.slice(start=0, stop=10)
        return self.df

    def do_filters(self):
        """Keep interactions whose ``ts`` is on or after 2022-01-01."""
        on_or_after = self.df[self.colabbr("ts")] >= "2022-01-01"
        self.df = self.df[on_or_after]
        return self.df

    def do_normalize(self):
        """No normalization; return the frame unchanged."""
        return self.df

    def do_reduce(self, reduce_key):
        """Roll interactions up to one row per ``reduce_key`` value.

        Emits the interaction row count, the number of distinct interaction
        types, and the summed engagement flag.
        """
        named_aggs = {
            self.colabbr("num_interactions"): pd.NamedAgg(column=self.colabbr(self.pk), aggfunc="count"),
            self.colabbr("num_interaction_types"): pd.NamedAgg(
                column=self.colabbr("interaction_type_id"), aggfunc="nunique"
            ),
            # Child-level flag, referenced by its literal "nit_" name.
            self.colabbr("num_engaged_interactions"): pd.NamedAgg(
                column="nit_is_engagement_type", aggfunc="sum"
            ),
        }
        features = self.prep_for_features()
        rolled = features.groupby(self.colabbr(reduce_key)).agg(**named_aggs)
        return rolled.reset_index()

    def do_labels(self, reduce_key):
        """No label logic for this node; return the frame unchanged."""
        return self.df

    def do_post_join_annotate(self):
        """No post-join annotation; return the frame unchanged."""
        return self.df

    def do_post_join_filters(self):
        """No post-join filtering; return the frame unchanged."""
        return self.df
class NotificationInteractionTypeNode(GraphReduceNode):
    """Interaction-type lookup: normalized names plus engagement/view flags."""

    def do_annotate(self):
        """Normalize ``name`` and derive integer flags from it.

        - ``name``: stringified, stripped, lowercased (NaN becomes "nan").
        - ``is_engagement_type``: 1 if the name is "clicked" or "dismissed".
        - ``is_view_event``: 1 if the name contains "view".
        """
        name_col = self.colabbr("name")
        cleaned = self.df[name_col].astype(str).str.strip().str.lower()
        self.df[name_col] = cleaned
        self.df[self.colabbr("is_engagement_type")] = cleaned.isin(["clicked", "dismissed"]).astype(int)
        self.df[self.colabbr("is_view_event")] = cleaned.str.contains("view", na=False).astype(int)
        return self.df

    def do_filters(self):
        """Drop interaction types whose id is missing."""
        has_id = self.df[self.colabbr("id")].notna()
        self.df = self.df[has_id]
        return self.df

    def do_normalize(self):
        """No normalization; return the frame unchanged."""
        return self.df

    def do_reduce(self, reduce_key):
        """Collapse to one row per ``reduce_key``: first name, max of each flag."""
        name_col = self.colabbr("name")
        engage_col = self.colabbr("is_engagement_type")
        view_col = self.colabbr("is_view_event")
        named_aggs = {
            name_col: pd.NamedAgg(column=name_col, aggfunc="first"),
            engage_col: pd.NamedAgg(column=engage_col, aggfunc="max"),
            view_col: pd.NamedAgg(column=view_col, aggfunc="max"),
        }
        features = self.prep_for_features()
        rolled = features.groupby(self.colabbr(reduce_key)).agg(**named_aggs)
        return rolled.reset_index()

    def do_labels(self, reduce_key):
        """No label logic for this node; return the frame unchanged."""
        return self.df

    def do_post_join_annotate(self):
        """No post-join annotation; return the frame unchanged."""
        return self.df

    def do_post_join_filters(self):
        """No post-join filtering; return the frame unchanged."""
        return self.df
def build_custom_pandas_graph() -> GraphReduce:
    """Assemble the pandas GraphReduce graph over all six cust_data tables.

    The graph is rooted at the customer node, uses a cut date of 2023-06-30,
    and wires five reducing edges:
    cust -> orders -> order_products and
    cust -> notifications -> notification_interactions -> interaction types.

    Returns:
        The configured ``GraphReduce``; call ``do_transformations()`` to run it.
    """
    pandas_layer = ComputeLayerEnum.pandas

    def csv_node(node_cls, filename, prefix, columns, **extra):
        # Every table here is a CSV keyed by "id" and computed with pandas;
        # ``extra`` carries per-node options such as ``date_key``.
        return node_cls(
            fpath=str(DATA_PATH / filename),
            fmt="csv",
            prefix=prefix,
            pk="id",
            compute_layer=pandas_layer,
            columns=columns,
            **extra,
        )

    cust = csv_node(CustNode, "cust.csv", "cust", ["id", "name"])
    orders = csv_node(
        OrderNode, "orders.csv", "ord", ["id", "customer_id", "ts", "amount"], date_key="ts"
    )
    order_products = csv_node(
        OrderProductsNode, "order_products.csv", "op", ["id", "order_id", "product_id"]
    )
    notifications = csv_node(
        NotificationNode, "notifications.csv", "not", ["id", "customer_id", "ts"], date_key="ts"
    )
    notification_interactions = csv_node(
        NotificationInteractionsNode,
        "notification_interactions.csv",
        "ni",
        ["id", "notification_id", "interaction_type_id", "ts"],
        date_key="ts",
    )
    interaction_types = csv_node(
        NotificationInteractionTypeNode, "notification_interaction_types.csv", "nit", ["id", "name"]
    )

    graph = GraphReduce(
        name="custom_pandas_all_nodes",
        parent_node=cust,
        compute_layer=pandas_layer,
        cut_date=datetime.datetime(2023, 6, 30),
    )

    all_nodes = (cust, orders, order_products, notifications, notification_interactions, interaction_types)
    for member in all_nodes:
        graph.add_node(member)

    # (parent, child, parent_key, relation_key); every edge reduces the child.
    edge_specs = (
        (cust, orders, "id", "customer_id"),
        (orders, order_products, "id", "order_id"),
        (cust, notifications, "id", "customer_id"),
        (notifications, notification_interactions, "id", "notification_id"),
        (notification_interactions, interaction_types, "interaction_type_id", "id"),
    )
    for parent, child, parent_key, relation_key in edge_specs:
        graph.add_entity_edge(parent, child, parent_key=parent_key, relation_key=relation_key, reduce=True)

    return graph
def run_custom_pandas_all_nodes() -> pd.DataFrame:
    """Build the graph, run all transformations, and return the parent node's frame."""
    graph = build_custom_pandas_graph()
    graph.do_transformations()
    result = graph.parent_node.df
    return result
def main() -> None:
    """Run the example and print the row count, column count, and column names."""
    frame = run_custom_pandas_all_nodes()
    summary = (
        ("rows:", len(frame)),
        ("columns:", len(frame.columns)),
        ("column_names:", list(frame.columns)),
    )
    for label, value in summary:
        print(label, value, flush=True)


if __name__ == "__main__":
    main()
Full runnable script: `examples/custom_pandas_all_nodes.py`
Run Interactive
Idle