Process J - A Concurrent Language

Building Processes as a Network of Processes



Let us start with a simple process that reads integers from an input channel in, adds each one to a running total, and writes the updated total to an output channel out.

Integrate (serial).
public proc void integrate(chan<int>.read in,
                           chan<int>.write out) {
  int total = 0;
  while (true) {
    int x;
    x = in.read();
    total = total + x;
    out.write(total);
  }
}

However, we could also have built it by combining smaller processes. Let us consider three simple processes, starting with delta, which reads a value from an input channel and writes that value to two output channels:

Delta (serial).
public proc void delta(chan<int>.read in,
                       chan<int>.write out1,
                       chan<int>.write out2) {
  while (true) {
    int x;
    x = in.read();
    out1.write(x);
    out2.write(x);
  }
}

Note that the two writes could be done in parallel, so a different but equally valid implementation is:

Delta (parallel).
public proc void delta(chan<int>.read in,
                       chan<int>.write out1,
                       chan<int>.write out2) {
  while (true) {
    int x;
    x = in.read();
    par {
      out1.write(x);
      out2.write(x);
    }
  }
}

We now need a plus process that reads input from two input channels, adds the read values together and writes the sum to the output channel:

Plus (parallel).
public proc void plus(chan<int>.read in1,
                      chan<int>.read in2,
                      chan<int>.write out) {
  while (true) {
    int x1, x2, sum;
    par {
      x1 = in1.read();
      x2 = in2.read();
    }
    sum = x1 + x2;
    out.write(sum);
  }
}

Again, we took advantage of parallelism by performing the reads of in1 and in2 concurrently. Finally, we need a prefix process, which takes as parameters an initial value, an input channel, and an output channel. The prefix process writes the initial value to the output channel and then behaves as an identity process, relaying every value from the input channel to the output channel.

Prefix.
public proc void prefix(int initVal,
                        chan<int>.read in,
                        chan<int>.write out) {
  out.write(initVal);
  while (true) {
    int x;
    x = in.read();
    out.write(x);
  }
}
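
The identity behavior that prefix settles into after its first write can be sketched on its own. The name identity below is our own label for illustration and does not appear elsewhere in this text:

```processj
// Hypothetical helper: relays every value from in to out unchanged.
public proc void identity(chan<int>.read in,
                          chan<int>.write out) {
  while (true) {
    int x;
    x = in.read();
    out.write(x);
  }
}
```

Seen this way, prefix is simply one extra write of initVal placed in front of that loop.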

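We can now combine these three processes into a network that forms the integrate process. The plus process adds each value arriving on in to the value arriving on c and writes the result to a; delta copies each value from a to both out and b; and prefix sends an initial 0 on c and thereafter relays each value from b to c, feeding the running total back into plus.

a, b, and c are channels internal to the integrateParallel process, while in and out are its input and output channels to the environment. We can now write the integrateParallel procedure: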

IntegrateParallel.
public proc void integrateParallel(chan<int>.read in,
                                   chan<int>.write out) {
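  chan<int> a, b, c;  // internal channels
  par {
    plus(in, c.read, a.write);    // a = next input + previous total (from c)
    prefix(0, b.read, c.write);   // seeds c with 0, then relays b to c
    delta(a.read, out, b.write);  // copies each new total to out and to b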
  }
}
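
To see that this network computes the same running total as the serial integrate, it helps to trace the values on each channel. The trace below is a hand-worked illustration that assumes the inputs 3, 5, and 2 arrive on in (these input values are ours, chosen only for the example):

```text
round   in   c (from prefix)   a = in + c   out    b
  1      3          0               3         3     3
  2      5          3               8         8     8
  3      2          8              10        10    10
```

The 0 in the first round is the initial value written by prefix. Without it, plus would wait forever for a value on c, and that value could only come from a result that plus itself has not yet produced.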

We can now wire everything up with a producer process that writes numbers to the in channel and a consumer process that reads values from the out channel:

Full Integrator with Consumer and Producer.
public proc void consume(chan<int>.read in) {
  while (true) {
    int x;
    x = in.read();
    println(x);
  }
}

public proc void produce(chan<int>.write out) {
  int x = 0;
  while (true) {
    out.write(x);
    x++;
  }
}

proc void main(string args[]) {
  chan<int> in, out;
  par {
    produce(in.write);
    consume(out.read);
    integrate(in.read, out.write);
  }
}
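
Because integrate and integrateParallel take the same channel parameters, the network version can be substituted without touching the producer or the consumer. A minimal sketch of such a variant of main, reusing produce and consume from above and intended as a replacement for the main shown there, not an addition to it:

```processj
proc void main(string args[]) {
  chan<int> in, out;
  par {
    produce(in.write);
    consume(out.read);
    // Same interface as integrate, so only this line changes.
    integrateParallel(in.read, out.write);
  }
}
```

With the producer above emitting 0, 1, 2, 3, 4 and so on, either version should make the consumer print the running sums 0, 1, 3, 6, 10 and so on.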

