
Abstractions

Core abstractions

  • GraphReduce - the top-level graph orchestration class that manages node/edge relationships and executes feature and label computation across the full table graph.
  • GraphReduceNode - the base table/file node abstraction for dataframe-style compute, where you define filtering, annotation, normalization, reduction, and labeling behavior.
  • SQLNode - a SQL-oriented node abstraction that enables query-based feature engineering on SQL dialects/backends instead of dataframe-only APIs.

Node abstraction

We represent files and tables as nodes. A node could be a csv file on your laptop, a parquet file in s3, or a Snowflake table in the cloud.

To instantiate a node we parameterize the data location, a string prefix (so we know where each column originates after joins), a primary key, a date key (if any), a compute layer (e.g., pandas, dask), and a handful of other optional parameters.
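As a rough sketch of what these parameters control, the prefixing behavior can be illustrated in plain pandas. The parameter names below mirror the prose above; the renaming logic is a simplified illustration, not graphreduce's internal implementation:

```python
import pandas as pd

# Hypothetical node parameters (names mirror the description above).
fpath = "orders.csv"        # data location
prefix = "ord"              # column namespace for this node
pk = "id"                   # primary key
date_key = "created_at"     # date column, if any

df = pd.DataFrame({
    "id": [1, 2],
    "customer_id": [10, 10],
    "created_at": ["2024-01-05", "2024-02-01"],
})

# Prefixing keeps column names unambiguous once many tables are joined.
df = df.rename(columns={c: f"{prefix}_{c}" for c in df.columns})
print(list(df.columns))
```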

GraphReduceNode

The base GraphReduceNode requires that the following abstract methods be defined:

  • do_filters - all filter operations for this node go here (e.g., df.filter...)

  • do_annotate - all annotations go here (e.g., df['zip'] = df.zipfull.apply(lambda x: x.split('-')[0]))

  • do_post_join_annotate - annotations that require data from a child be joined in (e.g., need a delta between dates from two tables)

  • do_normalize - all anomaly filtering, data normalization, etc. go here (e.g., df['val_norm'] = df['val'].apply(lambda x: x/df['val'].max()))

  • do_post_join_filters - all filters requiring data from more than 1 table go here

  • do_reduce - all aggregation operations for features (e.g., df.groupby(key).agg(...))

  • do_labels - any label-specific aggregation operations (e.g., df.groupby(key).agg(had_order=('order_id', 'count')))

DynamicNode

A dynamic node is any node instantiated without user-defined methods. These are typically used for automated feature engineering.
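The kind of automated feature engineering a dynamic node enables can be sketched in plain pandas: for each numeric column in a child table, generate standard aggregations keyed on the foreign key. This is an illustration of the idea, not graphreduce's implementation:

```python
import pandas as pd

orders = pd.DataFrame({
    "customer_id": [1, 1, 2],
    "amount": [10.0, 20.0, 5.0],
})

# Automatically derive min/max/sum/count features for a numeric column,
# grouped by the foreign key -- the style of aggregation a dynamic node
# generates without hand-written do_* methods.
aggs = {
    f"amount_{fn}": ("amount", fn)
    for fn in ("min", "max", "sum", "count")
}
features = orders.groupby("customer_id").agg(**aggs).reset_index()
print(features)
```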

SQLNode

A SQL node is an abstraction for SQL dialects and backends. This allows us to go beyond the dataframe API that a typical GraphReduceNode or DynamicNode is built for and leverage a number of SQL backends. There is more detail about how to use these in the SQL backends tutorial.

Edge

An edge is a relationship between two nodes, typically a foreign key. For example, if we had a customers table and an orders table, we would add an edge between the customers node and the orders node:

gr.add_entity_edge(
    parent_node=customer_node,
    relation_node=orders_node,
    parent_key='id',
    relation_key='customer_id',
    reduce=True
)

The reduce parameter tells graphreduce whether to execute this node's aggregation operations when running the graph. In some cases a user may want to keep an aggregation defined but skip executing it for a particular compute graph.
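The difference the flag makes can be illustrated in pandas (a conceptual sketch, not the library's join code): with reduce=True the child table is aggregated to one row per parent key before joining, while without reduction the join stays row-level and parent rows are duplicated:

```python
import pandas as pd

customers = pd.DataFrame({"id": [1, 2]})
orders = pd.DataFrame({"customer_id": [1, 1, 2], "amount": [10, 20, 5]})

# reduce=True: aggregate child rows to one row per parent key, then join.
reduced = orders.groupby("customer_id").agg(order_total=("amount", "sum")).reset_index()
joined_reduced = customers.merge(reduced, left_on="id", right_on="customer_id")

# No reduction: join the raw child rows; parent rows are duplicated.
joined_raw = customers.merge(orders, left_on="id", right_on="customer_id")

print(len(joined_reduced), len(joined_raw))
```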

GraphReduce Graph

The top-level GraphReduce class inherits directly from networkx.DiGraph to take advantage of the many graph algorithms implemented in networkx. An instance houses the shared parameters for the entire graph of computation across all nodes and edges.

These include the node to which data is aggregated, the cut date for splitting the data, the compute layer (e.g., pandas, dask), the amount of history to include (e.g., 365 days), the label period, whether to automate feature engineering, the label/target node and label/target column, and so on. All of these parameters are pushed down through the graph, which enables things like point-in-time correctness.
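Point-in-time correctness from a cut date can be sketched in pandas (an illustration of the concept, not the library's code): features draw only on data before the cut date within the compute period, and labels only on data after it within the label period:

```python
import datetime
import pandas as pd

cut_date = datetime.datetime(2024, 7, 1)
compute_period = datetime.timedelta(days=365)
label_period = datetime.timedelta(days=60)

events = pd.DataFrame({
    "customer_id": [1, 1, 1],
    "ts": pd.to_datetime(["2023-08-15", "2024-06-20", "2024-07-10"]),
    "amount": [10.0, 20.0, 99.0],
})

# Feature window: [cut_date - compute_period, cut_date)
feature_rows = events[(events.ts >= cut_date - compute_period) & (events.ts < cut_date)]

# Label window: [cut_date, cut_date + label_period)
label_rows = events[(events.ts >= cut_date) & (events.ts < cut_date + label_period)]

print(len(feature_rows), len(label_rows))
```

Keeping the two windows disjoint is what prevents label leakage into the features.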

Since we inherit from networkx, the API for adding nodes is unchanged:

import datetime
from graphreduce.graph_reduce import GraphReduce
from graphreduce.enum import PeriodUnit, ComputeLayerEnum

gr = GraphReduce(
    name='test',
    parent_node=customer_node,
    fmt='parquet',
    compute_layer=ComputeLayerEnum.spark,
    cut_date=datetime.datetime(2024, 7, 1),
    compute_period_val=365,
    compute_period_unit=PeriodUnit.day,
    auto_features=False,
    label_node=order_node,
    label_operation='sum',
    label_field='order_total',
    label_period_val=60,
    label_period_unit=PeriodUnit.day,
    spark_sqlctx=sqlCtx
)

gr.add_node(customer_node)
gr.add_node(order_node)
gr.add_node(notification_node)
gr.add_node(...)
...

Examples

GraphReduceNode example

Use a custom node when you want explicit control over filtering, feature reduction, and labels:

from graphreduce.node import GraphReduceNode

class OrderNode(GraphReduceNode):
    def do_filters(self, df): return df
    def do_annotate(self, df): return df
    def do_post_join_annotate(self, df): return df
    def do_normalize(self, df): return df
    def do_post_join_filters(self, df): return df
    def do_reduce(self, df): return df.groupby('customer_id').agg(total=('amount', 'sum')).reset_index()
    def do_labels(self, df): return df.groupby('customer_id').agg(had_order=('id', 'count')).reset_index()

SQLNode example

Use SQLNode when you want SQL-native execution against a backend/dialect:

from graphreduce.node import SQLNode
from graphreduce.enum import SQLOpType

orders = SQLNode(
    fpath='orders',
    prefix='ord',
    pk='id',
    dialect='postgresql'
)

orders.build_query(
    optype=SQLOpType.select,
    cols=['customer_id', 'amount']
)

GraphReduce example

Use GraphReduce to connect nodes and run the graph-wide feature computation:

from graphreduce.graph_reduce import GraphReduce

gr = GraphReduce(
    name='customer_features',
    parent_node=customer_node,
    compute_layer=ComputeLayerEnum.pandas,
    auto_features=True
)

gr.add_node(customer_node)
gr.add_node(orders_node)
gr.add_entity_edge(
    parent_node=customer_node,
    relation_node=orders_node,
    parent_key='id',
    relation_key='customer_id',
    reduce=True
)
