Abstractions
Core abstractions
- `GraphReduce` - the top-level graph orchestration class that manages node/edge relationships and executes feature and label computation across the full table graph.
- `GraphReduceNode` - the base table/file node abstraction for dataframe-style compute, where you define filtering, annotation, normalization, reduction, and labeling behavior.
- `SQLNode` - a SQL-oriented node abstraction that enables query-based feature engineering on SQL dialects/backends instead of dataframe-only APIs.
Node abstraction
We represent files and tables as nodes. A node could be a CSV file on your laptop, a Parquet file in S3, or a Snowflake table in the cloud.
Each node is parameterized with the data location, a string prefix so we know where the data originates,
a primary key, a date key (if any), a compute layer (e.g., pandas, dask), and
some other optional parameters used to instantiate it.
GraphReduceNode
The base GraphReduceNode requires the following abstract methods be defined:

- `do_filters` - all filter operations for this node go here (e.g., `df.filter(...)`)
- `do_annotate` - all annotations go here (e.g., `df['zip'] = df.zipfull.apply(lambda x: x.split('-')[0])`)
- `do_post_join_annotate` - annotations that require data from a child to be joined in (e.g., computing a delta between dates from two tables)
- `do_normalize` - all anomaly filtering, data normalization, etc. go here (e.g., `df['val_norm'] = df['val'].apply(lambda x: x / df['val'].max())`)
- `do_post_join_filters` - all filters requiring data from more than one table go here
- `do_reduce` - all aggregation operations for features (e.g., `df.groupby(key).agg(...)`)
- `do_labels` - any label-specific aggregation operations (e.g., `df.groupby(key).agg(had_order=('id', 'count'))`)
DynamicNode
A dynamic node is any node instantiated without user-defined methods. These are typically used for automated feature engineering.
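As a loose sketch of what automated feature engineering over such a node can look like (illustrative only; the function name and dtype rules here are assumptions, not graphreduce's implementation), aggregation functions can be chosen per column based on dtype:

```python
import pandas as pd

def auto_aggregate(df: pd.DataFrame, key: str) -> pd.DataFrame:
    """Sketch of a dynamic-node-style reduction: pick aggregation
    functions for each column based on its dtype."""
    aggs = {}
    for col in df.columns:
        if col == key:
            continue
        if pd.api.types.is_numeric_dtype(df[col]):
            # numeric columns: sum and max
            aggs[f"{col}_sum"] = (col, "sum")
            aggs[f"{col}_max"] = (col, "max")
        else:
            # categorical/object columns: distinct count
            aggs[f"{col}_nunique"] = (col, "nunique")
    return df.groupby(key).agg(**aggs).reset_index()

orders = pd.DataFrame({
    "customer_id": [1, 1, 2],
    "amount": [10.0, 5.0, 7.5],
    "status": ["paid", "refunded", "paid"],
})
feats = auto_aggregate(orders, "customer_id")
```

A real dynamic node would typically also handle date columns and configurable aggregation sets, but the dtype-driven dispatch above is the core idea.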
SQLNode
A SQL node is an abstraction for SQL dialects and backends. This allows us to
go beyond the dataframe API that a typical GraphReduceNode or DynamicNode
is built for and leverage a number of SQL backends. There is more detail
about how to use these in the SQL backends tutorial.
Edge
An edge is a relationship between two nodes, typically a foreign key. For
example, if we had a customers table and an orders table, we would add an edge between the
customers node and the orders node:
gr.add_entity_edge(
parent_node=customer_node,
relation_node=orders_node,
parent_key='id',
relation_key='customer_id',
reduce=True
)
The reduce parameter tells graphreduce whether or not to execute
aggregation operations along this edge. In some cases a user may want to keep
an aggregation defined but avoid executing it for a particular compute graph.
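A hedged sketch of what this flag controls, using plain pandas rather than graphreduce internals: with reduction, the relation is aggregated to the parent's grain before joining, so the result has one row per parent key; without it, the join keeps the child's grain.

```python
import pandas as pd

customers = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})
orders = pd.DataFrame({"customer_id": [1, 1, 2], "amount": [10, 5, 7]})

# reduce=True (conceptually): aggregate the relation to the parent's
# grain, then join -- one row per customer.
reduced = (
    orders.groupby("customer_id")
    .agg(order_total=("amount", "sum"))
    .reset_index()
)
joined_reduced = customers.merge(
    reduced, left_on="id", right_on="customer_id", how="left"
)

# reduce=False (conceptually): join without aggregating -- the result
# stays at the orders grain, one row per order.
joined_raw = customers.merge(
    orders, left_on="id", right_on="customer_id", how="left"
)
```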
GraphReduce Graph
The top-level GraphReduce class inherits directly from networkx.DiGraph
to take advantage of the many graph algorithms implemented in networkx. Instances
house the shared parameters for the entire graph of computation across all nodes and edges:
the node to which the data is aggregated, the date for splitting the data, the compute layer (e.g., pandas, dask), the amount of history to include (e.g., 365 days), the label period, whether or not to automate feature engineering, the label/target node and label/target column, and so on. All of these parameters
get pushed down through the graph so we can do things like enforce point-in-time correctness.
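As an illustration of the point-in-time windows these parameters imply (a sketch of the semantics, not the library's internals), a cut date with a 365-day compute period and a 60-day label period carves out a feature window ending at the cut date and a label window starting there:

```python
import datetime

cut_date = datetime.datetime(2024, 7, 1)
compute_period = datetime.timedelta(days=365)  # history used for features
label_period = datetime.timedelta(days=60)     # future window used for labels

# Features may only use data strictly before the cut date...
feature_window = (cut_date - compute_period, cut_date)
# ...while labels are computed from data after it, avoiding leakage.
label_window = (cut_date, cut_date + label_period)
```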
Since we inherit from networkx the API for adding nodes is unchanged:
import datetime
from graphreduce.graph_reduce import GraphReduce
from graphreduce.enum import PeriodUnit, ComputeLayerEnum
gr = GraphReduce(
name='test',
parent_node=customer_node,
fmt='parquet',
compute_layer=ComputeLayerEnum.spark,
cut_date=datetime.datetime(2024, 7, 1),
compute_period_val=365,
compute_period_unit=PeriodUnit.day,
auto_features=False,
label_node=order_node,
label_operation='sum',
label_field='order_total',
label_period_val=60,
label_period_unit=PeriodUnit.day,
spark_sqlctx=sqlCtx
)
gr.add_node(customer_node)
gr.add_node(order_node)
gr.add_node(notification_node)
gr.add_node(...)
...
Examples
GraphReduceNode example
Use a custom node when you want explicit control over filtering, feature reduction, and labels:
from graphreduce.node import GraphReduceNode
class OrderNode(GraphReduceNode):
def do_filters(self, df): return df
def do_annotate(self, df): return df
def do_post_join_annotate(self, df): return df
def do_normalize(self, df): return df
def do_post_join_filters(self, df): return df
def do_reduce(self, df): return df.groupby('customer_id').agg(total=('amount', 'sum')).reset_index()
def do_labels(self, df): return df.groupby('customer_id').agg(had_order=('id', 'count')).reset_index()
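For illustration, here is what the do_reduce and do_labels bodies above produce on a toy orders frame (column names come from the example; the data is made up). Both reductions land at the customer grain, so they can be joined back to the parent on customer_id:

```python
import pandas as pd

orders = pd.DataFrame({
    "id": [101, 102, 103],
    "customer_id": [1, 1, 2],
    "amount": [20.0, 5.0, 12.5],
})

# do_reduce: one feature row per customer
features = (
    orders.groupby("customer_id")
    .agg(total=("amount", "sum"))
    .reset_index()
)

# do_labels: one label row per customer
labels = (
    orders.groupby("customer_id")
    .agg(had_order=("id", "count"))
    .reset_index()
)
```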
SQLNode example
Use SQLNode when you want SQL-native execution against a backend/dialect:
from graphreduce.node import SQLNode
from graphreduce.enum import SQLOpType
orders = SQLNode(
fpath='orders',
prefix='ord',
pk='id',
dialect='postgresql'
)
orders.build_query(
optype=SQLOpType.select,
cols=['customer_id', 'amount']
)
GraphReduce example
Use GraphReduce to connect nodes and run the graph-wide feature computation:
from graphreduce.graph_reduce import GraphReduce
from graphreduce.enum import ComputeLayerEnum
gr = GraphReduce(
name='customer_features',
parent_node=customer_node,
compute_layer=ComputeLayerEnum.pandas,
auto_features=True
)
gr.add_node(customer_node)
gr.add_node(orders_node)
gr.add_entity_edge(
parent_node=customer_node,
relation_node=orders_node,
parent_key='id',
relation_key='customer_id',
reduce=True
)