Abstractions

Node abstraction

We represent files and tables as nodes. A node could be a CSV file on your laptop, a Parquet file in S3, or a Snowflake table in the cloud.

To instantiate a node we parameterize the data location, a string prefix identifying where the data originates, a primary key, a date key (if any), a compute layer (e.g., pandas, dask), and a few other optional parameters.
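
For example, a node backed by a local CSV file might be instantiated as in the following sketch. It uses the DynamicNode described below; the parameter names shown are the common ones, but check your version of graphreduce for the exact signature.

from graphreduce.node import DynamicNode
from graphreduce.enum import ComputeLayerEnum

# An orders node backed by a local CSV file.
order_node = DynamicNode(
    fpath='data/orders.csv',                # data location
    fmt='csv',                              # file format
    prefix='ord',                           # prefix identifying where the data originates
    pk='id',                                # primary key
    date_key='created_at',                  # date key, if any
    compute_layer=ComputeLayerEnum.pandas   # compute layer
)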

GraphReduceNode

The base GraphReduceNode requires that the following abstract methods be defined (a sketch of a subclass implementing them follows the list):

  • do_filters - all filter operations for this node go here (e.g., df.filter...)

  • do_annotate - all annotations go here (e.g., df['zip'] = df.zipfull.apply(lambda x: x.split('-')[0]))

  • do_post_join_annotate - annotations that require data from a child node to be joined in first (e.g., computing a delta between dates from two tables)

  • do_normalize - all anomaly filtering, data normalization, etc. go here (e.g., df['val_norm'] = df['val'].apply(lambda x: x/df['val'].max()))

  • do_post_join_filters - all filters requiring data from more than 1 table go here

  • do_reduce - all aggregation operations for features (e.g., df.groupby(key).agg(...))

  • do_labels - any label-specific aggregation operations (e.g., df.groupby(key).agg(had_order=('order_id', 'count')))
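
Below is a minimal sketch of a custom node implementing these methods. It assumes the node exposes its dataframe as self.df, its primary key as self.pk, and a colabbr helper that applies the node's column prefix, and that do_reduce and do_labels receive the key to group by; exact method signatures may differ between versions.

import pandas as pd
from graphreduce.node import GraphReduceNode

class OrderNode(GraphReduceNode):
    def do_filters(self):
        # Drop cancelled orders before any joins or aggregations.
        self.df = self.df[self.df[self.colabbr('status')] != 'cancelled']

    def do_annotate(self):
        # Row-level feature engineering.
        self.df[self.colabbr('order_month')] = pd.to_datetime(
            self.df[self.colabbr('created_at')]
        ).dt.month

    def do_post_join_annotate(self):
        # Annotations that need columns joined in from a child node.
        pass

    def do_normalize(self):
        # Anomaly filtering and normalization.
        pass

    def do_post_join_filters(self):
        # Filters that need columns from more than one table.
        pass

    def do_reduce(self, reduce_key):
        # Aggregations keyed by the parent's join key.
        return self.df.groupby(self.colabbr(reduce_key)).agg(
            **{
                self.colabbr('num_orders'): pd.NamedAgg(column=self.colabbr(self.pk), aggfunc='count'),
                self.colabbr('total_spend'): pd.NamedAgg(column=self.colabbr('order_total'), aggfunc='sum'),
            }
        ).reset_index()

    def do_labels(self, reduce_key):
        # Label-specific aggregations.
        return self.df.groupby(self.colabbr(reduce_key)).agg(
            **{self.colabbr('had_order'): pd.NamedAgg(column=self.colabbr(self.pk), aggfunc='count')}
        ).reset_index()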

DynamicNode

A dynamic node is any node instantiated without user-defined methods, as in the instantiation example above. These are typically used for automated feature engineering.

SQLNode

A SQL node is an abstraction for SQL dialects and backends. This allows us to go beyond the dataframe APIs that a typical GraphReduceNode or DynamicNode is built on and leverage a number of SQL backends. There is more detail about how to use these in the SQL backends tutorial.

Edge

An edge is a relationship between two nodes, typically a foreign key. For example, if we had a customers table and an orders table, we would add an edge between the customer node and the orders node:

gr.add_entity_edge(
    parent_node=customer_node,
    relation_node=order_node,
    parent_key='id',
    relation_key='customer_id',
    reduce=True
)

The reduce parameter tells graphreduce whether or not to execute the child node's aggregation operations along this edge. In some cases a user may want to keep an aggregation defined but avoid executing it for a particular compute graph.
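
For example, to keep a relationship defined but skip its aggregations for this particular graph, pass reduce=False. This mirrors the edge above and assumes the notifications table also carries a customer_id foreign key.

gr.add_entity_edge(
    parent_node=customer_node,
    relation_node=notification_node,
    parent_key='id',
    relation_key='customer_id',
    # Keep the join defined but do not execute do_reduce for this graph.
    reduce=False
)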

GraphReduce Graph

The top-level GraphReduce class inherits directly from networkx.DiGraph to take advantage of the many graph algorithms implemented in networkx. Instances house the shared parameters for the entire graph of computation across all nodes and edges.

These include the node to which the data is aggregated, the cut date for splitting the data, the compute layer (e.g., pandas, dask), the amount of history to include (e.g., 365 days), the label period, whether or not to automate feature engineering, the label/target node and label/target column, and so on. All of these parameters get pushed down through the graph so we can enforce things like point-in-time correctness.

Since we inherit from networkx, the API for adding nodes is unchanged:

import datetime
from graphreduce.graph_reduce import GraphReduce
from graphreduce.enum import PeriodUnit, ComputeLayerEnum

gr = GraphReduce(
    name='test',
    parent_node=customer_node,
    fmt='parquet',
    compute_layer=ComputeLayerEnum.spark,
    # Date for splitting the data (point-in-time correctness).
    cut_date=datetime.datetime(2024, 7, 1),
    # Amount of history to include for features.
    compute_period_val=365,
    compute_period_unit=PeriodUnit.day,
    auto_features=False,
    # Label/target node, operation, and column.
    label_node=order_node,
    label_operation='sum',
    label_field='order_total',
    # Label period.
    label_period_val=60,
    label_period_unit=PeriodUnit.day,
    spark_sqlctx=sqlCtx
)

gr.add_node(customer_node)
gr.add_node(order_node)
gr.add_node(notification_node)
gr.add_node(...)
...
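
Once the nodes are added, edges are defined with add_entity_edge as shown above and the graph can be executed. The following is a minimal sketch; it assumes the do_transformations entry point and that the joined, aggregated result lands on the parent node's dataframe, both of which may differ slightly by version.

gr.add_entity_edge(
    parent_node=customer_node,
    relation_node=order_node,
    parent_key='id',
    relation_key='customer_id',
    reduce=True
)

# Run filters, annotations, aggregations, and joins across the whole graph.
gr.do_transformations()

# The fully joined and aggregated result is on the parent node.
result_df = gr.parent_node.df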