Flow Graph Basics: Reservation

The Intel® Threading Building Blocks (Intel® TBB) flow graph join_node has four possible policies: queueing, reserving, key_matching and tag_matching. join_nodes need messages at every input before they can create an output message. The reserving join_node does not have internal buffering, and it does not pull messages from its inputs until it has a message at each input. To create an output message it temporarily reserves a message at each input port, and only if all input ports succeed reserving messages will an output message be created. If any input port fails to reserve a message, no message will be pulled by the join_node.

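To support the reserving join_node, some nodes support reservation of their outputs. Reservation works as follows. When a predecessor in push state tries to push a message to a reserving join_node, the join_node always rejects the push, and the edge between the two nodes switches to pull mode. Each input port then calls try_reserve() on its edges in pull state; a reserved message stays in the predecessor, and no other node can retrieve it while it is reserved. If every input port reserves a message, the join_node builds an output message from the reserved values and tries to push it to its successors. If the push succeeds, the join_node signals the predecessors that the reserved messages were used, and the predecessors discard them. If the push fails, the reserved messages are released and become available again.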

Because the reserving join_node will only attempt to push when each input port has at least one edge in a pull state, and will only attempt to create and push a message if all input ports succeed reserving messages, at least one of the predecessors to each of the reserving join_node input ports must be reservable.
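The reservation handshake can be modeled in a few lines of standalone C++: a predecessor lends its message (reserve), and the join either confirms that the message was used (consume) or hands it back (release). This is a toy model for illustration only, not the library's implementation. The class ReservableBuffer and the function try_join are hypothetical names, the FIFO order is an assumption of the model, and the successor is represented by a callable that returns whether it accepted the message.

```cpp
#include <cstddef>
#include <deque>
#include <utility>

// Hypothetical stand-in for a reservable predecessor such as a buffer_node.
class ReservableBuffer {
public:
    void put(int v) { items_.push_back(v); }

    // Lend the front item without removing it.
    // Fails if the buffer is empty or an item is already reserved.
    bool try_reserve(int& out) {
        if (reserved_ || items_.empty()) return false;
        reserved_ = true;
        out = items_.front();
        return true;
    }

    // The reserved item was used downstream: discard it.
    void try_consume() {
        if (reserved_) { items_.pop_front(); reserved_ = false; }
    }

    // The reserved item was not used: make it available again.
    void try_release() { reserved_ = false; }

    std::size_t size() const { return items_.size(); }

private:
    std::deque<int> items_;
    bool reserved_ = false;
};

// One join attempt over two input ports. `push` models the successor and
// returns true if it accepted the joined message.
template <typename Push>
bool try_join(ReservableBuffer& p0, ReservableBuffer& p1, Push push) {
    int a = 0, b = 0;
    if (!p0.try_reserve(a)) return false;            // nothing taken from p1
    if (!p1.try_reserve(b)) { p0.try_release(); return false; }
    if (push(std::make_pair(a, b))) {                // successor accepted
        p0.try_consume();
        p1.try_consume();
        return true;
    }
    p0.try_release();                                // successor rejected
    p1.try_release();
    return false;
}
```

In this model, a failed attempt never removes anything from either buffer, which is the property that distinguishes reserving from simply pulling each input in turn.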

The following example demonstrates a reserving join_node's behavior. buffer_nodes buffer their output, so they accept a switch of their output edge from push to pull mode. broadcast_nodes do not buffer messages and do not support try_get() or try_reserve().

#include <cstdio>
#include <tuple>
#include "tbb/flow_graph.h"

using namespace tbb::flow;

void run_example2() {
    graph g;
    broadcast_node<int> bn(g);
    buffer_node<int> buf1(g);
    buffer_node<int> buf2(g);
    typedef join_node<std::tuple<int,int>, reserving> join_type;
    join_type jn(g);
    buffer_node<join_type::output_type> buf_out(g);
    join_type::output_type tuple_out;
    int icnt;

    // buf1 and buf2 are reservable predecessors of the join_node
    make_edge(buf1, input_port<0>(jn));
    make_edge(bn, input_port<0>(jn));      // also attach a broadcast_node (not reservable)
    make_edge(buf2, input_port<1>(jn));
    make_edge(jn, buf_out);
    bn.try_put(2);
    buf1.try_put(3);
    buf2.try_put(4);
    buf2.try_put(7);
    g.wait_for_all();
    // drain every tuple the join_node produced
    while (buf_out.try_get(tuple_out)) {
        std::printf("join_node output == (%d,%d)\n", std::get<0>(tuple_out), std::get<1>(tuple_out));
    }
    // report what is left in the input buffers
    if (buf1.try_get(icnt)) std::printf("buf1 had %d\n", icnt);
    else std::printf("buf1 was empty\n");
    if (buf2.try_get(icnt)) std::printf("buf2 had %d\n", icnt);
    else std::printf("buf2 was empty\n");
}

int main() {
    run_example2();
    return 0;
}

In the example above, port 0 of the reserving join_node jn has two predecessors: a buffer_node buf1 and a broadcast_node bn. Port 1 of the join_node has one predecessor, buffer_node buf2.


We will discuss one possible execution sequence (the scheduling of tasks may differ slightly, but the end result will be the same).

    bn.try_put(2);

bn attempts to forward 2 to jn. jn does not accept the value and the arc from bn to jn reverses. Because neither bn nor jn buffers messages, the message is dropped. Because not all the inputs to jn have available predecessors, jn does nothing further.

CAUTION

Any node which does not support reservation will not work correctly when attached to a reserving join_node. This program demonstrates why this occurs; connecting non-reserving nodes to nodes requiring support for reservation is not recommended practice.


Figure: graph state after bn.try_put(2)

    buf1.try_put(3);

buf1 attempts to forward 3 to jn. jn does not accept the value and the arc from buf1 to jn reverses. Because not all the inputs to jn have available predecessors, jn does nothing further.


Figure: graph state after buf1.try_put(3)

    buf2.try_put(4);

buf2 attempts to forward 4 to jn. jn does not accept the value and the arc from buf2 to jn reverses. Now that both inputs of jn have available predecessors, a task to build and forward a message from jn will be spawned. We assume that task is not yet executing.


Figure: graph state after buf2.try_put(4)

    buf2.try_put(7);

buf2 has no successor (because the arc to jn is reversed), so it stores the value 7.


Figure: graph state after buf2.try_put(7)

Now the task spawned to run jn runs.


Figure: jn reserves buffers, constructs message.

Now jn pushes its message to buf_out, which accepts it. Because the push succeeded, jn signals buf1 and buf2 that the reserved values were used, and the buffers discard those values. Now jn attempts to reserve again.


Figure: jn fails to reserve buf1, reverses arc.

No further activity occurs in the graph, and the call to wait_for_all() completes. The output of this code is:

join_node output == (3,4)
buf1 was empty
buf2 had 7