Syntax of GoFBP (Go Implementation of FBP)
and Component API


Contents

General

In my book, "Flow-Based Programming", I describe the syntax of the network specifications of various FBP dialects that were in existence when the book was written. GoFBP, the Go implementation of the FBP concepts, did not exist at that time, so this web page has been added describing the syntax of GoFBP network definitions.

As described in my book, many non-trivial applications can be built using only the network definition language, so this web page starts with a description of the network definition language.  A running program can be built up in layers, using GoFBP's subnet notation(not yet implemented).  There is also a diagramming tool (DrawFBP), which can be used to define the network graphically, and which can actually generate the network definitions (not yet implemented). 

If new components have to be coded, they will be coded in Go, using the Go component API. 

The source code for the various constituents of GoFBP is now being held on a public GitHub project. There is also a README file on the GitHub web site.

Network Definitions

Since the way the syntax relates to the underlying diagram may not be all that clear, a brief description is in order.  At the end of this page, I have given an extremely simple GoFBP component.

Any GoFBP network definition starts as follows (preceded by any necessary imports, including "github.com/jpaulm/gofbp/core"):

      func main() { {  
	params, err := core.LoadXMLParams("params.xml")  // [optional]
	if err != nil {                                  //  do.   
		panic(err)                               //  do.
	}                                                // do.
	net := core.NewNetwork("Merge")
	net.SetParams(params)      // [ optional]
	

where "Merge" is the FBP network string name.

The network definition is terminated with:

     }
net.Run()
}
}

In between the beginning and the ending statements defining the network, you specify a list of connections, using the following methods, which I will refer to as "clauses":

  • NewProc - define an FBP "process"
  • Connect - define a connection
  • Initialize - define a connection including an Initial Information Packet (IIP)

Every process must have a unique character string identifying it, which allows other processes or initial information packets (IIPs) to be attached to it via a connection.

The following method call: 

NewProc

returns a reference to a process. This clause also specifies the name of the process code .go program to be executed. Thus:

     
proc1 := net.NewProc("xxxxxx", &testrtn.yyyyyy{})

where xxxxxx is a process name, and yyyyyy is the name of the .go program - the former has to be unique, but the latter doesn't have to be.

A Connect or Initialize clause will contain a reference to two or one process variables, as e.g.

               net.Connect(proc1, "OUT", proc3, "IN", 6)  
and
	       net.Initialize("15", proc1, "COUNT")
	

It should be pointed out that the "value" of an Initialize clause may be any type - examples usually show strings, for compatibility with common DrawFBP usage.

The 5th parameter of Connect is the capacity of the connection.

If an asterisk (*) is specified for the "from" port, this is called an "automatic output port", and indicates a signal generated when the "from" component instance terminates (actually the port is just closed, so no packet has to be disposed of). 

If an asterisk (*) is specified for the "to" port, this is called an "automatic input port", and indicates a delay - the "to" component instance does not start until a signal or a close is received at this port.

One last point: any number of "from" ports can be connected to a single "to" port; only one "to" port can ever be connected to a given "from" port.

Sample Network

Here is a simple network (main.go) illustrating indexed ports:

 
package main import ( "github.com/jpaulm/gofbp/components/testrtn" "github.com/jpaulm/gofbp/core" ) func main() { net := core.NewNetwork("RRDist") proc1 := net.NewProc("Sender", &testrtn.Sender{}) proc2 := net.NewProc("RoundRobinSender", &testrtn.RoundRobinSender{}) proc3a := net.NewProc("WriteToConsole", &testrtn.WriteToConsole{}) proc3b := net.NewProc("Receiver1", &testrtn.Receiver{}) proc3c := net.NewProc("Receiver2", &testrtn.Receiver{}) net.Initialize("15", proc1, "COUNT") net.Connect(proc1, "OUT", proc2, "IN", 6) net.Connect(proc2, "OUT[0]", proc3a, "IN", 6) net.Connect(proc2, "OUT[1]", proc3b, "IN", 6) net.Connect(proc2, "OUT[2]", proc3c, "IN", 6) net.Run() }

Note that array-type input and output ports are indicated in the Connect statements using square brackets.

Network and Subnet Methods

Subnets.

Network and subnet methods (except for core.LoadXMLParams) will be prefixed by the name of the Network variable obtained by the NewNetwork method call. Here is the list of the methods:

  • func core.LoadXMLParams(s string) *Params, error (Params does not have to be declared)
  • func core.NewNetwork(s string) *Network
  • func (n *Network) SetParams(p *Params)
  • func (n *Network) NewProc(s string, c &Component) *Process
  • func (n *Network) Connect (p1 *Process, outport string, p2 *Process, inport string, capacity int)
  • func (n *Network) Initialize (initValue interface {}, p2 *Process, inport string)
  • func (n *Network) Run ()

Component Coding

Sample Component

Here is an example of a GoFBP component - note that there are two main sections: Setup, which initializes the input and/or output ports, and Execute, both of which must always be present in a component definition. In addition there will be the imports, and the input and/or output port definitions themselves.

 
package testrtn import ( "fmt" "github.com/jpaulm/gofbp/core" ) type RoundRobinSender struct { ipt core.InputConn out core.OutputArrayConn } func (rrsender *RoundRobinSender) Setup(p *core.Process) { rrsender.ipt = p.OpenInPort("IN") rrsender.out = p.OpenOutArrayPort("OUT") } func (rrsender *RoundRobinSender) Execute(p *core.Process) { var i = 0 j := rrsender.out.ArrayLength() for { var pkt = p.Receive(rrsender.ipt) if pkt == nil { break } fmt.Println("Output: ", pkt.Contents) opt := rrsender.out.GetArrayItem(i) p.Send(opt, pkt) i = (i + 1) % j } }

By now, you should be familiar with the concept of "MustRun": normally processes only run if they have (non-IIP, i.e. dynamic) data to process, but in the case of a file writer or a counter component, you want at least an empty file or a zero, respectively, to be written/emitted. In GoFBP, this is achieved by including a line like func (ComponentName) MustRun() {} in the component code.

Here is the WriteFile.go component:

 
package io import ( "fmt" "os" "github.com/jpaulm/gofbp/core" ) // WriteFile type defines iptIP, ipt, and opt type WriteFile struct { iptIP core.InputConn ipt core.InputConn opt core.OutputConn } //Setup method initializes Process func (writeFile *WriteFile) Setup(p *core.Process) { writeFile.iptIP = p.OpenInPort("FILENAME") writeFile.ipt = p.OpenInPort("IN") writeFile.opt = p.OpenOutPortOptional("OUT") } //MustRun method func (WriteFile) MustRun() {} //Execute method starts Process func (writeFile *WriteFile) Execute(p *core.Process) { icpkt := p.Receive(writeFile.iptIP) fname, ok := icpkt.Contents.(string) if !ok { panic("Parameter (file name) not a string") } p.Discard(icpkt) p.Close(writeFile.iptIP) f, err := os.Create(fname) if err != nil { panic("Unable to open file: " + fname) } defer f.Close() for { var pkt = p.Receive(writeFile.ipt) if pkt == nil { break } data := []byte(pkt.Contents.(string) + "\n") _, err2 := f.Write(data) if err2 != nil { panic("Unable to write file: " + fname) } if !writeFile.opt.IsConnected() { p.Discard(pkt) } else { p.Send(writeFile.opt, pkt) } } fmt.Println(p.Name+": File", fname, "written") }

Note that output port OUT is optional in this component. This means that the component has to test whether the port is connected, and do a "send" if it is, or do a "discard" otherwise.

The scheduling rules for most FBP implementations are described in the chapter of my book called Scheduling Rules.

As good programming practice, we now feel that IIP ports should be closed after a receive has been executed, in case it is attached to an upstream component (rather than an IIP), and that component mistakenly sends more than one IP. (I may not have done that consistently for my GoFBP components!)

Component API

All Process API calls will be prefixed by the name of the Process variable defined in the Setup or Execute function header, as appropriate, e.g. p.Discard(pkt).

In the following list, InputConn and OutputConn are interfaces, so do not need the leading asterisk.

Here is the list of the Process API calls:

  • func (p *Process) OpenInPort(s string) InputConn
  • func (p *Process) OpenInArrayPort(s string) *InArrayPort
  • func (p *Process) OpenOutPort(s string) OutputConn
  • func (p *Process) OpenOutPortOptional(s string) OutputConn
  • func (p *Process) OpenOutArrayPort(s string) *OutArrayPort
  • func (p *Process) Send(o OutputConn, pkt *Packet) bool (returns false if Send fails)
  • func (p *Process) Receive(c InputConn) *Packet (returns nil, when there's no more data)
  • func (p *Process) Close(c InputConn)

Information Packet services (under Process)

  • func (p *Process) Create(x interface{}) *Packet
  • func (p *Process) CreateBracket(pktType int32, s string) *Packet
    First parameter can be core.OpenBracket or core.CloseBracket; second parameter is required, but can be a zero-length string, i.e. ""
  • func (p *Process) CreateSignal(s string) *Packet
  • func ((p *Process) Discard(pkt *Packet)

The InputArrayConn API call:

  • GetArray() []*InPort (returns input array - this can be used with "range")
  • GetArrayItem(i int) *InPort (returns input array item number i)
  • ArrayLength() int

The OutputArrayConn API calls:

  • GetArray() []*OutPort (returns output array - this can be used with "range")
  • GetItemWithFewestIPs() int (returns output array item number witth fewest IPs stacked up in it)
  • GetArrayItem(i int) *OutPort (returns array item number i)
  • ArrayLength() int

Packet chaining calls (coded under Process):

  • (p *Process) NewChain(pkt *Packet, name string) (*Chain, bool)
  • (p *Process) GetChain(pkt *Packet, name string) (*Chain, bool)
  • (p *Process) Attach(chn *Chain, subpkt *Packet)
  • (p *Process) Detach(chn *Chain, subpkt *Packet)

Chain has public attributes First and Last, referring to Packets, and Packet has a public attribute Next, all of which can be used when "walking through" a chain.

Network level calls (used when defining a network or subnet):

  • net := NewNetwork(netName ...string) *Network (if netName is omitted, NewNetwork will use the func name for the network)
  • (n *Network)NewProc(procName string, code Component) *Process (NewProc procNames must be unique)
  • (n *Network)Connect(p1 *Process, outPort string, p2 *Process, inPort string, cap int) *InPort
    May return an InPort reference if this is needed, e.g. for the SetDropOldest call - for an example, see DropOldest Test
  • (n *Network)Initialize(i interface{}, p1 *Process, inPort string)
  • (n *Network)Run()
  • (n *Network)SetDropOldest(i *InPort)
  • (n *Network) SetParams(p *Params)

Simple Subnet

A subnet is a network with "sticky" connections, but it also acts as a component, so it has both network API calls as well as component attributes. This is why I have placed this at the end of this web page.

Here is a simple subnet (subnets/subnet1.go). Note that the only external differences from regular networks are the presence of type Subnet1 struct{}, together with Setup and Execute methods, the use of newSubnet in place of newNetwork (and the presence of SubIn and/or SubOut components). The Setup method will be empty, as its ports are "external".

 
package subnets import ( "github.com/jpaulm/gofbp/components/testrtn" "github.com/jpaulm/gofbp/core" ) type Subnet1 struct{} func (subnet *Subnet1) Setup(p *core.Process) {} func (subnet *Subnet1) Execute(p *core.Process) { net := core.NewSubnet("Subnet1", p) proc1 := net.NewProc("SubIn", &core.SubIn{}) proc2 := net.NewProc("WriteToConsole1", &testrtn.WriteToConsole{}) proc3 := net.NewProc("SubOut", &core.SubOut{}) net.Initialize("IN", proc1, "NAME") net.Connect(proc1, "OUT", proc2, "IN", 6) net.Connect(proc2, "OUT", proc3, "IN", 6) net.Initialize("OUT", proc3, "NAME") net.Run() }