Skip to content

Post-join method execution

There is a method decorator for specifying that a node instance method can only be executed once other nodes have been joined to the node.

For example, using the cust.csv and orders.csv datasets from prior examples, let's say we want to compute the difference between an order date and the signup date of the customer. In this case we need the timestamp of the order and the timestamp of the customer.

# In the node definition of the customer
# specify this dependency
from graphreduce.node import GraphReduceNode
from graphreduce.context import method_requires

class CustomerNode(GraphReduceNode):
    def do_filters(self):
        pass
    ...
    ...

    @method_requires(nodes=[OrderNode])
    def do_post_join_annotate(self):
        self.df[self.colabbr('signup_order_diff_seconds')] = self.df.apply(
            lambda x: (x['ord_ts'] - x[self.colabbr('signup_date')]).total_seconds(),
            axis=1
        )

By using the method_requires decorator we've told graphreduce that the CustomerNode.do_post_join_annotate method can only be executed once the OrderNode has been joined.

There are cases where we need multiple child nodes' data merged to a parent node before certain operations can be executed, such as annotations and filters.

This would look like the following snippet:

class ParentNode(GraphReduceNode):
    ...

    @method_requies(nodes=[ChildNodeOne, ChildNodeThree])
    def do_post_join_filters(self):
        self.df[self.colabbr('newcol')] = self.df.apply(lambda x: f"{x['col1']}-{x['child_col']}-{x['child3_col']}", axis=1)


    @method_requires(nodes=[ChildNodeOne, ChildNodeTwo, ChildNodeThree])
    def do_post_join_filters(self):
        self.df = self.df[..]