Syntax of C#FBP (C# Implementation of FBP)
and Component API


Warning!

Checking has been added (Aug., 2014) to make sure that all mandatory input connections have been provided. Since this was not being checked before, it may be necessary to add the new optional parameter to the InPort metadata for some components. If this causes any problems for current users, please let us know.

Contents

General

In my book, "Flow-Based Programming", I describe the syntax of the network specifications of various FBP dialects that were in existence when the book was written. C#FBP, the C# implementation of the FBP concepts, did not exist at that time, so this web page has been added describing the syntax of C#FBP network definitions.

The source code for the various constituents of C#FBP is now being held on a public GitHub project. There is also a README file on the GitHub web site.

One advantage of defining the network as executable code, as compared with other approaches that merely list connections in a language-independent way, is that the network can contain additional logic. This logic then controls the way the network is defined, rather than the way it runs. Some may regard this as a defect, rather than as an asset, and both views can certainly be defended, but one of the neat things it enables us to do is to adjust multiplexing levels of various structures in the diagram using a table of values (remember the multiplexing example in Sample DrawFlow Diagram). One merely retrieves a value from a table for the degree of multiplexing in a particular structure in the diagram, and this value is then used both as the index of a loop invoking the connect statement, and as the index for the elements of an array-type port (see below for both of these terms).

Since the way the syntax relates to the underlying diagram may not be all that clear, a brief description is in order.  At the end of this page, I have given an extremely simple C#FBP component.

Network Definitions

Any C#FBP network definition starts as follows:

using System;
using FBPComponents;
using FBPLib;


namespace nnnnnnnnn
{
public class xxxxxx : Network
{
public override void Define() {

where xxxxxx is the Network name, including the usual usings, copyright statements, namespace specification, etc. 

The network definition is terminated with:

}
internal static void Main(String[] argv)
{
new xxxxx().Go();
}
}

In between the beginning and the ending statements defining the network, you specify a list of connections, using the following methods, which I will refer to as "clauses":

  • Component - define an instance of a component (an FBP "process")

  • Connect - define a connection

  • Initialize - define a connection including an Initial Information Packet (IIP)

  • Port - define a port on a process

Every component instance must have a unique character string identifying it, which allows other component instances or initial information packets (IIPs) to be attached to it via a connection.

The following method call: 

Component("xxxx")

returns a reference to a component instance. The first reference to this particular component instance must specify the component class to be executed by that occurrence. This is done by coding

Component("xxxx", typeof(cccc))

where cccc  is the name of the C# module to be executed.

Similarly, a port is identified by a Port clause, e.g. Port("xxxx").

A port may be an array-type port, in which case the port clauses referring to its elements have index values, as follows: 

Port("xxxx",n)

where "n" runs up monotonically from 0. Each element of the port array will be connected to a different component occurrence or IIP.

A Connect or Initialize clause may contain the relevant Component clauses, together with their corresponding Port clauses, embedded within it, as e.g.

     Connect(Component("Read", typeof(ReadText)),
         Port("OUT"),
         Component("Splitter1", typeof(Splitter1)),
         Port("IN"));

or the Connect and Component portions may be in separate statements, provided Component precedes any Connects that reference it.

A Connect contains:

  • "from" Component clause

  • "from" Port clause

  • "to" Component clause

  • "to" Port clause

Optionally a Connect may have a fifth parameter: the connection capacity, specified as an int.  If this is omitted, the default value is used: 1 for testing, or 10 for production (this currently has to be changed by hand in the Network.class). 

If an asterisk (*) is specified for the "from" port, this is called an "automatic output port", and indicates a signal generated when the "from" component instance terminates (actually the port is just closed, so no packet has to be disposed of). 

If an asterisk is specified for the "to" port, this is called an "automatic input port", and indicates a delay - the "to" component instance does not start until a signal or a close is received at this port.

If *SUBEND is specified as a port name on a subnet, a packet containing null is emitted at this port every time the subnet deactivates, i.e. all the contained components terminate.  It doesn't have to be named in the port metadata.  This null packet is emitted for all activations, including the last one.

An Initialize clause contains:

  • a reference to any object

  • a Component clause

  • a Port clause

as e.g.

     Initialize(new StreamReader(
@"c:\com\jpmorrsn\eb2engine\test\data\myXML3.txt"),
       Component("Read"),
           Port("SOURCE"));

However, it has been recommended that IIPs should be strings, rather than arbitrary objects, to facilitate future graphical management of networks.

One last point: any number of "from" ports can be connected to a single "to" port; only one "to" port can ever be connected to a given "from" port.

Sample Network

Let us code up a network implementing the following picture:

 

First list the Component clauses, together with the component classes they are to execute (assuming that component classes have been written to execute the various nodes of the diagram), e.g.:

 
Component("Read Masters",typeof(Read))
Component("Read Details",typeof(Read))
Component("Collate",typeof(Collate))
Component("Process Merged Stream",typeof(Proc))
Component("Write New Masters",typeof(Write))
Component("Summary & Errors",typeof(Report))

Now these Component clauses may either be made into separate statements or they can be imbedded into the Connect statements that follow.  Here are the connections in the diagram, without imbedded Component definitions:

  Connect(Component("Read Masters"),Port("OUT"),Component("Collate"),
Port("IN",0));    // array port
Connect(component("Read Details"),Port("OUT"),Component("Collate"),
Port("IN",1));      // array port
Connect(component("Collate"),Port("OUT"),Component("Process Merged Stream"),
Port("IN"));
Connect(Component("Process Merged Stream"),Port("OUTM"),
Component("Write New Masters"),Port("IN"));
Connect(Component("Process Merged Stream"),Port("OUTSE"),
Component("Summary & Errors"),Port("IN"));

Each item in this list is a separate C# statement.

We can now add the class designation to the first Component clause referencing a particular component occurrence, giving the following:

  
Connect(Component("Read Masters",typeof(Read)),Port("OUT"),
    Component("Collate",typeof(Collate)), Port("IN",0)); // array port
  Connect(Component("Read Details",typeof(Read)),Port("OUT"),
     Component("Collate"),Port("IN",1)); // array port
  Connect(Component("Collate"),Port("OUT"),
Component("Process Merged Stream",typeof(Proc)),Port("IN"));
Connect(Component("Process Merged Stream"),Port("OUTM"),
Component("Write New Masters",typeof(Write)),Port("IN"));
Connect(Component("Process Merged Stream"),Port("OUTSE"),
Component("Summary & Errors",typeof(Report)),Port("IN"));

Now "Read Masters" and "Read Details" use the same C# class, so we need some way to indicate the name of the file that each is going to read. This is done using Initial Information Packets (IIPs). In this case they might usefully specify StreamReader objects, so we need to add two Initialize clauses, as follows:

  
Initialize(new StreamReader(@"c:\mastfile"),
Component("Read Masters"),
Port("SOURCE"));
Initialize(new StreamReader(@"c:\detlfile"),
Component("Read Details"),
Port("SOURCE"));

Note that, since both "Read" component occurrences use the same class code, they naturally have the same port names - of course, the ports are attached to different IIPs.

Remember that back-slashes have to be doubled in C# character strings, unless you precede the string with an @-sign. Process names can contain any character - but double quotes and back-slashes in the name must be "escaped" using a back-slash

"Write New Masters" will have to have an IIP to specify the output destination - perhaps:

  
Initialize(new StreamWriter(@"c:\newmast"),
Component("Write New Masters"),
Port("DESTINATION"));

Note also that this IIP is not a destination for the Writer - it is an object used by this component occurrence so that the latter can figure out where to send data to.

Add the beginning and ending statements, and you're done!   The actual sequence of Connect and Initialize statements is irrelevant.

Here is the final result:

using System;
using FBPComponents;
using FBPLib;


namespace nnnnnnnnn
{
public class xxxxxx : Network
{
public override void Define() {
Connect(Component("Read Masters",typeof(Read)),Port("OUT"),
    Component("Collate",typeof(Collate)), Port("IN",0)); // array port
  Connect(Component("Read Details",typeof(Read)),Port("OUT"),
     Component("Collate"),Port("IN",1)); // array port
  Connect(Component("Collate"),Port("OUT"),
Component("Process Merged Stream",typeof(Proc)),Port("IN"));
Connect(Component("Process Merged Stream"),Port("OUTM"),
Component("Write New Masters",typeof(Write)),Port("IN"));
Connect(Component("Process Merged Stream"),Port("OUTSE"),
Component("Summary & Errors",typeof(Report)),Port("IN"));
Initialize(new StreamReader(@"c:\mastfile"),
Component("Read Masters"),
Port("SOURCE"));
Initialize(new StreamReader(@"c:\detlfile"),
Component("Read Details"),
Port("SOURCE"));
Initialize(new StreamWriter(@"c:\newmast"),
Component("Write New Masters"),
Port("DESTINATION"));

}
internal static void Main(String[] argv)
{
new xxxxx().Go();
}
}



Alternative (Simplified) Notation

In C#FBP there is a simplified notation, in addition to that shown above.  In this notation Connect specifies two character strings, and Initialize specifies an object and a character string.   In both cases, the second character string specifies a combination of component and port, with the two parts separated by a period.  Array port indices, if required, are specified using square brackets, e.g.

"component.port[3]"

The old port notation will still be supported, but is only really needed when the port index is a variable.  When debugging, it will be noted that the square bracket notation is used in trace lines, even when it was not used in the network definition.

Component names must of course not include periods or most special characters, but they may include blanks, numerals, hyphens and underscores, and they must be associated with their implementing class using a (preceding) Component statement.

Here is the above network using the new notation:

 using System;
using FBPComponents;
using FBPLib;


namespace nnnnnnnnn
{
public class xxxxxx : Network
{
public override void Define() {
Component("Read Masters",typeof(Read));
Component("Read Details",typeof(Read));
Component("Collate",typeof(Collate));
Component("Process Merged Stream",typeof(Proc));
Component("Write New Masters",typeof(Write));
Component("Summary & Errors",typeof(Report));
Connect("Read Masters.OUT", "Collate.IN[0]");
Connect("Read Details.OUT", "Collate.IN[1]");
Connect("Collate.OUT"), "Process Merged Stream.IN");
Connect("Process Merged Stream.OUTM", "Write New Masters.IN");
Connect("Process Merged Stream.OUTSE", "Summary & Errors.IN");
Initialize(new FileReader("c:\\mastfile"), "Read Masters.SOURCE");
Initialize(new FileReader("c:\\detlfile"), "Read Details.SOURCE");
Initialize(new FileWriter("c:\\newmast"), "Write New Masters.DESTINATION");
}

internal static void Main(String[] argv)
{
new xxxxx().Go();
}
}

}

Here is a network example showing how variable port numbers can be used with the LoadBalance function to define an (admittedly fairly trivial) self-balancing network. 


using System;
using FBPComponents;
using FBPLib;


namespace nnnnnnnnn
{
public class TestLoadBalancer : Network
{
public override void Define() {
int multiplex_factor = 10;
Component("generate", typeof(Generate));
Component("display", typeof(WriteToConsole));
Component("lbal", typeof(LoadBalance));
Connect("generate.OUT", "lbal.IN");
Initialize("100 ", Component("generate"), Port("COUNT"));
for (int i = 0; i < multiplex_factor; i++) {
Connect(Component("lbal"), Port("OUT", i),
Component("passthru" + i, typeof(Passthru)), Port("IN"));
Connect(Component("passthru" + i), Port("OUT"), "display.IN");
}
}

internal static void Main(String[] argv)
{
new
TestLoadBalancer().Go();
}

}
}

Simple Subnet

As described in the book, networks can be built up level by level, using what we call "subnets" - they may be thought of as networks with "sticky" connections.  Here is a very simple subnet.

Note the metadata - a subnet can act as a component, so metadata is required.


using System; using System.Collections.Generic; using System.Text; using Components; using FBPLib; namespace TestNetworks.Networks { [OutPort("OUT")] [InPort("IN")] class SubnetX : SubNet { public override void Define() { Component("SUBIN", typeof(SubInSS)); Component("SUBOUT", typeof(SubOutSS)); Component("Pass", typeof(Passthru)); Initialize("IN", Component("SUBIN"), Port("NAME")); Connect(Component("SUBIN"), Port("OUT"), Component("Pass"), Port("IN")); Connect(Component("Pass"), Port("OUT"), Component("SUBOUT"), Port("IN")); Initialize("OUT", Component("SUBOUT"), Port("NAME")); } } }



Sample C#FBP Component

A more complete description of the API is given in the next section.

This component generates a stream of 'n' IPs, where 'n' is specified in an InitializationConnection (specified by an Initialize clause in the foregoing). Each IP just contains an arbitrary string of characters, in order to illustrate the concept.  Of course any copyright information included is up to the developer.

using System;
using FBPLib;

namespace FBPComponents
{

/** Component to generate a stream of 'n' packets, where 'n' is
* specified in an InitializationConnection.
*/
[InPort("COUNT", description="Number of packets", type=typeof(System.String))]
[OutPort("OUT")]
[ComponentDescription("Generate stream of packets based on count")]

public class Generate : Component
{

internal static string _copyright =
"Copyright ....";

OutputPort _outport;
IInputPort _count;


public override void Execute() /* throws Throwable */ {
Packet ctp = _count.Receive();

string param = ctp.Content.ToString();
Int32 ct = Int32.Parse(param);
Drop(ctp);
_count.Close();

for (int i = ct; i > 0; i--)
{
string s = String.Format("{0:d6}", i) + new String('a', 1000);

Packet p = Create(s);
_outport.Send(p);


}

// output.close();
// terminate();
}
/*
As of C# 2.0, Introspect has been replaced by the metadata

public override System.Object[] Introspect()
{

return new Object[] {
"generates a set of Packets under control of a counter" ,
"OUT", "output", Type.GetType("System.String"),
"lines generated",
"COUNT", "parameter", Type.GetType("System.String"),
"Count of number of entities to be generated"};
}
*/
public override void OpenPorts()
{

_outport = OpenOutput("OUT");

_count = OpenInput("COUNT");

}
}
}

The scheduling rules for most FBP implementations are described in the chapter of my book called Scheduling Rules.

The previous C# implementation of FBP (C#FBP-1.5.3) presents an IIP to a component once per activation. This has been changed in the latest implementation (C#FBP-2.0) to once per invocation.  In practice this will only affect "non-loopers" (components that get reactivated multiple times).  

There are a few minor changes to component code for C#FBP-2.0:

  • As good programming practice, we now feel that IIP ports should be closed after a Receive has been executed, in case it is attached to an upstream component (rather than an IIP), and that component mistakenly sends more than one IP - this statement has accordingly been added to the above example.
  • The Drop statement now takes the packet as a parameter, rather than being a method of Packet.
  • The Send is now unconditional - it either works or crashes.
  • We are adding a "long wait" state to components, specifying a timeout value in seconds. This is coded as follows:
  •       double _timeout = 2;   // 2 secs

    ....

    LongWaitStart(_timeout);

    // activity taking time goes here

    LongWaitEnd();
  • Typically, the timeout value is given a default value in the code, and overridden (if desired) by an IIP.
  • While the component in question is executing the activity taking time, its state will be set to "long wait". If one or more components are in "long wait" state while all other components are suspended or not started, this situation is not treated as a deadlock. However, if one of the components exceeds its timeout value, an error will be reported (Complain).

Component Metadata:

  • Input and output port names will be coded on components using C# 5.0 "attribute" notation. This metadata can be used to do analysis of networks without having to actually execute the components involved. Here is an example of the attributes for the "Collate" component:
  •     [InPort("CTLFIELDS")]
    [InPort("IN", arrayPort = true)]
    [OutPort("OUT")]
    [ComponentDescription("Collates input streams at array
    port IN and sends them to port OUT")]

    public class Collate : Component {

  • Input ports do not necessarily have to be connected, even though attributes are specified for them; output ports, however, must be.
  • MustRun is also specified as metadata, rather than as an interface, as it was in version 1.5.3,  i.e.

[MustRun]

A new service has been added for component code for C#FBP-2.2:

  • <output port name>.IsConnected returns bool

To support IsConnected, a new metadata attribute called optional has been added to OutPort, e.g.

  • [OutPort("OUT",optional=true)]

C#FBP Component API

Component Metadata:


[ComponentDescription]
parameters:
- string

[InPort]
parameters:
- string
- arrayPort (bool)
- description (string)
- type (typeof(...))

[OutPort]
parameters:
- string
- arrayPort (bool)
- description (string)
- type (typeof(...))
 - optional (bool)


[MustRun]

[Priority(ThreadPriority.Highest)] // default is ThreadPriority.Lowest


Packet class:

/**
* A Packet may either contain an Object, when type is NORMAL,
* or a String, when type is not NORMAL. The latter case
* is used for things like open and close brackets (where the
* String will be the name of a group. e.g. accounts)
**/

Dictionary Attributes; /* returns attributes */
Object Content; /* returns null if type <> Normal */


Component class:

/**
* All verbs must extend this class, defining its two abstract methods:
* openPorts, and execute.
**/


Packet p = Create(Object o);
Packet p = Create(Packet.Types type, string s);

Drop(Packet p); // Note this change!

LongWaitStart(double interval); // in seconds
LongWaitEnd();



/** 3 stack methods - as of JavaFBP-2.3
**/

Push (Packet p);

Packet p = Pop(); // return null if empty

int StackSize();


IInputPort interface:

Packet = Receive();

void Close();


OutputPort class:

void Send(Packet packet);

bool IsConnected(); // as of 2.2

void Close();