Package org.jcsp.lang

Class Bucket

java.lang.Object
org.jcsp.lang.Bucket
All Implemented Interfaces:
Serializable

public class Bucket extends Object implements Serializable
This enables bucket synchronisation between a set of processes.


Description

A bucket is a non-deterministic cousin of a Barrier. A bucket is somewhere to fallInto when a process needs somewhere to park itself. There is no limit on the number of processes that can fallInto a bucket - and all are blocked when they do. Release happens when a process (and it will have to be another process) chooses to flush that bucket. When that happens, all processes in the bucket (which may be none) are rescheduled for execution.

Buckets are a non-deterministic primitive, since the decision to flush is a free (internal) choice of the process concerned and the scheduling of that flush impacts on the semantics. Usually, only one process is given responsibility for flushing a bucket (or set of buckets). Flushing a bucket does not block the flusher.

Note: this notion of bucket corresponds to the BUCKET synchronisation primitive added to the KRoC occam language system.

Implementation Note

The fallInto and flush methods of Bucket are just a re-badging of the wait and notifyAll methods of Object - but without the need to gain a monitor lock and without the need to look out for the wait being interrupted.

Currently, though, fallInto and flush are implemented using exactly that wait and notifyAll mechanism. Beware that a notifyAll carries an O(n) overhead (where n is the number of processes being notified), since each notified process must regain the monitor lock before it can exit the synchronized region. Future JCSP implementations of Bucket will look to follow occam kernels and reduce the overheads of both fallInto and flush to O(1).
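
The monitor-based scheme described in this note can be sketched in plain Java as follows. This is only a minimal sketch under stated assumptions, not the JCSP source: the class name SketchBucket is hypothetical, the field names are borrowed from the Field Details on this page, and the treatment of interruption (remember it and restore the flag on exit) is an assumption.

```java
// Hypothetical sketch of a monitor-based bucket; NOT the JCSP source.
class SketchBucket {

  private final Object bucketLock = new Object();  // monitor lock
  private int nHolding = 0;     // processes currently held
  private int bucketCycle = 0;  // incremented by every flush

  // Block the caller until the next flush.
  public void fallInto() {
    synchronized (bucketLock) {
      nHolding++;
      final int myCycle = bucketCycle;
      boolean interrupted = false;
      // The cycle count guards against spurious wake-ups.
      while (bucketCycle == myCycle) {
        try {
          bucketLock.wait();
        } catch (InterruptedException e) {
          interrupted = true;  // assumption: ignore until released
        }
      }
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  // Release every held process; never blocks the flusher.
  public int flush() {
    synchronized (bucketLock) {
      final int n = nHolding;
      nHolding = 0;
      bucketCycle++;
      bucketLock.notifyAll();  // O(n): each waiter must regain the lock
      return n;
    }
  }

  // Snapshot only: the value may be stale by the time it is used.
  public int holding() {
    synchronized (bucketLock) {
      return nHolding;
    }
  }
}
```

A count is used rather than a boolean flag so that a process that is slow to wake still sees that its own cycle has ended, even if further flushes have happened in the meantime.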

A Simple Example

This consists of 10 workers, one bucket and one flusher:

Here is the code for this network:
 import org.jcsp.lang.*;
 
 public class BucketExample1 {
 
   public static void main (String[] args) {
 
     final int nWorkers = 10;
 
     final int second = 1000;
     // JCSP timer units are milliseconds
     final int interval = 5*second;
     final int maxWork = 10*second;
 
     final long seed = new CSTimer ().read ();
     // for the random number generators
 
     final Bucket bucket = new Bucket ();
 
     final Flusher flusher = new Flusher (interval, bucket);
 
     final Worker[] workers = new Worker[nWorkers];
     for (int i = 0; i < workers.length; i++) {
       workers[i] = new Worker (i, i + seed, maxWork, bucket);
     }
 
     System.out.println ("*** Flusher: interval = " + interval
                         + " milliseconds");
 
     new Parallel (
       new CSProcess[] {
         flusher,
         new Parallel (workers)
       }
     ).run ();
 
   }
 
 }
 
A Worker cycle consists of one shift (which takes a random amount of time) and, then, falling into the bucket:
 import org.jcsp.lang.*;
 import java.util.*;
 
 public class Worker implements CSProcess {
 
   private final int id;
   private final long seed;
   private final int maxWork;
   private final Bucket bucket;
 
   public Worker (int id, long seed, int maxWork, Bucket bucket) {
     this.id = id;
     this.seed = seed;
     this.maxWork = maxWork;
     this.bucket = bucket;
   }
 
   public void run () {
 
     final Random random = new Random (seed);
     // each process gets a different seed
 
     final CSTimer tim = new CSTimer ();
     final int second = 1000;
     // JCSP timer units are milliseconds
 
     final String working = "\t... Worker " + id
                            + " working ...";
     final String falling = "\t\t\t     ... Worker " + id
                            + " falling ...";
     final String flushed = "\t\t\t\t\t\t  ... Worker "
                            + id + " flushed ...";
 
     while (true) {
       System.out.println (working);
       tim.sleep (random.nextInt (maxWork));
       //These lines represent one unit of work
 
       System.out.println (falling);
       bucket.fallInto ();
       System.out.println (flushed);
     }
   }
 
 }
 
The Flusher just flushes the bucket at preset time intervals:
 import org.jcsp.lang.*;
 
 public class Flusher implements CSProcess {
 
   private final int interval;
   private final Bucket bucket;
 
   public Flusher (int interval, Bucket bucket) {
     this.interval = interval;
     this.bucket = bucket;
   }
 
   public void run () {
 
     final CSTimer tim = new CSTimer ();
     long timeout = tim.read () + interval;
 
     while (true) {
       tim.after (timeout);
       System.out.println ("*** Flusher: about to flush ...");
       final int n = bucket.flush ();
       System.out.println ("*** Flusher: number flushed = " + n);
       timeout += interval;
     }
   }
 
 }
 

The Flying Dingbats

This consists of many buckets, a single bucket keeper (responsible for flushing the buckets) and a flock of Dingbats (who regularly fall into various buckets). Here is the system diagram:

And here is the network code:
 import org.jcsp.lang.*;
 
 public class BucketExample2 {
 
   public static void main (String[] args) {
 
     final int minDingbat = 2;
     final int maxDingbat = 10;
     final int nDingbats = (maxDingbat - minDingbat) + 1;
 
     final int nBuckets = 2*maxDingbat;
 
     final Bucket[] bucket = Bucket.create (nBuckets);
 
     final int second = 1000;
     // JCSP timer units are milliseconds
     final int tick = second;
     final BucketKeeper bucketKeeper = new BucketKeeper (tick, bucket);
 
     final Dingbat[] dingbats = new Dingbat[nDingbats];
     for (int i = 0; i < dingbats.length; i++) {
       dingbats[i] = new Dingbat (i + minDingbat, bucket);
     }
 
     new Parallel (
       new CSProcess[] {
         bucketKeeper,
         new Parallel (dingbats)
       }
     ).run ();
 
   }
 
 }
 
The BucketKeeper keeps time, flushing buckets in sequence at a steady rate. When the last one has been flushed, it starts again with the first:
 import org.jcsp.lang.*;
 
 class BucketKeeper implements CSProcess {
 
   private final long interval;
   private final Bucket[] bucket;
 
   public BucketKeeper (long interval, Bucket[] bucket) {
     this.interval = interval;
     this.bucket = bucket;
   }
 
   public void run () {
 
     String[] spacer = new String[bucket.length];
     spacer[0] = "";
     for (int i = 1; i < spacer.length; i++) {
       spacer[i] = spacer[i - 1] + "  ";
     }
 
     final CSTimer tim = new CSTimer ();
     long timeout = tim.read ();
     int index = 0;
 
     while (true) {
       final int n = bucket[index].flush ();
       if (n == 0) {
         System.out.println (spacer[index] + "*** bucket " +
                             index + " was empty ...");
       }
       index = (index + 1) % bucket.length;
       timeout += interval;
       tim.after (timeout);
     }
   }
 
 }
 
So the buckets represent time values. A process falling into one of them will sleep until the prescribed time when the BucketKeeper next flushes it.

Dingbats live the following cycle. First, they do some work (rather brief in the following code). Then, they work out which bucket to fall into and fall into it - that is all. In this case, each Dingbat simply flies on id buckets from the one from which it was just flushed (where id is the Dingbat number indicated in the above network diagram):

 import org.jcsp.lang.*;
 
 public class Dingbat implements CSProcess {
 
   private final int id;
   private final Bucket[] bucket;
 
   public Dingbat (int id, Bucket[] bucket) {
     this.id = id;
     this.bucket = bucket;
   }
 
   public void run () {
 
     int logicalTime = 0;
 
     String[] spacer = new String[bucket.length];
     spacer[0] = "";
     for (int i = 1; i < spacer.length; i++) {
       spacer[i] = spacer[i - 1] + "  ";
     }
 
     String message = "Hello world from " + id + " ==> time = ";
 
     while (true) {
       logicalTime += id;
       final int slot = logicalTime % bucket.length;
       // assume: id <= bucket.length
       bucket[slot].fallInto ();
       System.out.println (spacer[slot] + message + logicalTime);
       // one unit of work
     }
   }
 
 }
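
The bucket arithmetic used by a Dingbat can be checked in isolation. The following is a small plain-Java sketch (no JCSP needed); the class and method names, DingbatSlots and slots, are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: lists the buckets one Dingbat would choose.
class DingbatSlots {

  // Returns the first `steps` bucket indices chosen by a Dingbat
  // with the given id, assuming 0 < id <= nBuckets.
  static List<Integer> slots(int id, int nBuckets, int steps) {
    final List<Integer> result = new ArrayList<>();
    int logicalTime = 0;
    for (int s = 0; s < steps; s++) {
      logicalTime += id;                   // fly on id buckets
      result.add(logicalTime % nBuckets);  // wrap round the bucket array
    }
    return result;
  }
}
```

With the 20 buckets of BucketExample2, slots(3, 20, 8) yields 3, 6, 9, 12, 15, 18, 1, 4: after logical time 18, the Dingbat wraps round to bucket 1 at logical time 21.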
 

Danger - Race Hazard

This example contains a race hazard whose elimination is left as an exercise for the reader. The problem is to ensure that all flushed Dingbats have settled in their next chosen buckets before the BucketKeeper next flushes those buckets. If a Dingbat is a bit slow, it may fall into its chosen bucket too late - the BucketKeeper has already flushed it and the creature will have to remain there until the next cycle.

With the rate of flushing used in the above system, this is unlikely to happen. But it is just possible - if something suspended execution of the system for a few seconds immediately following a flush, then the BucketKeeper could be rescheduled before the flying Dingbats.

Acknowledgement

This example is from a discrete event modelling approach due to Jon Kerridge (Napier University, Scotland). The Dingbats could easily model their own timeouts for themselves. However, setting a timeout is an O(n) operation (where n is the number of processes setting them). Here, there is only one process setting timeouts (the BucketKeeper) and the bucket operations fallInto and flush have O(1) costs (at least, that is the case for occam).

Of course, removal of the above race hazard means that the timeout by the BucketKeeper can also be eliminated. The buckets can be flushed just as soon as it knows that the previously flushed Dingbats have settled. In this way, the event model can proceed at full speed, maintaining correct simulated time - each bucket representing one time unit - without needing any timeouts in the simulation itself.
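
One possible way to realise this timeout-free scheme is sketched below in plain Java threads and monitors rather than JCSP processes. It is an assumption-laden illustration, not the intended answer to the exercise: SettlingBuckets, SettlingDemo, leave and the unsettled counter are all hypothetical names. The keeper refuses to flush until every running Dingbat has parked itself again.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: buckets whose keeper waits for Dingbats to settle.
class SettlingBuckets {
  private final int[] holding;  // processes parked in each bucket
  private final long[] cycle;   // number of flushes of each bucket
  private int unsettled;        // running processes not yet parked
  private int now = -1;         // simulated time of the latest flush

  SettlingBuckets(int nBuckets, int nProcesses) {
    holding = new int[nBuckets];
    cycle = new long[nBuckets];
    unsettled = nProcesses;     // nobody has settled at start-up
  }

  // Park until the bucket for logicalTime is flushed; returns that time.
  synchronized int fallInto(int logicalTime) throws InterruptedException {
    final int slot = logicalTime % holding.length;
    final long myCycle = cycle[slot];
    holding[slot]++;
    unsettled--;
    notifyAll();                // the keeper may be waiting for us
    while (cycle[slot] == myCycle) wait();
    return now;                 // stable: keeper waits until we settle
  }

  // A process that stops must say so, or the keeper waits for ever.
  synchronized void leave() { unsettled--; notifyAll(); }

  // Flush the bucket for time t once all running processes have settled.
  synchronized int flush(int t) throws InterruptedException {
    while (unsettled > 0) wait();
    final int slot = t % holding.length;
    final int n = holding[slot];
    holding[slot] = 0;
    cycle[slot]++;
    unsettled = n;              // the released processes must settle again
    now = t;
    notifyAll();
    return n;
  }
}

class SettlingDemo {
  // Runs one Dingbat per id (0 < id <= nBuckets) for `steps` cycles each.
  // Log entries are "id@time"; a trailing "!" marks a wrong release time.
  static List<String> run(int nBuckets, int[] ids, int steps)
      throws InterruptedException {
    final SettlingBuckets buckets = new SettlingBuckets(nBuckets, ids.length);
    final List<String> log = Collections.synchronizedList(new ArrayList<>());
    final List<Thread> workers = new ArrayList<>();
    int maxId = 0;
    for (final int id : ids) {
      maxId = Math.max(maxId, id);
      Thread worker = new Thread(() -> {
        try {
          int logicalTime = 0;
          for (int s = 0; s < steps; s++) {
            logicalTime += id;
            int released = buckets.fallInto(logicalTime);
            log.add(id + "@" + released + (released == logicalTime ? "" : "!"));
          }
          buckets.leave();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      workers.add(worker);
      worker.start();
    }
    // The keeper: no timer, it simply flushes as fast as settling allows.
    for (int t = 0; t <= steps * maxId; t++) buckets.flush(t);
    for (Thread worker : workers) worker.join();
    return log;
  }
}
```

For instance, SettlingDemo.run(6, new int[] {2, 3}, 3) releases Dingbat 2 at simulated times 2, 4 and 6 and Dingbat 3 at times 3, 6 and 9, however the threads happen to be scheduled. A single shared monitor keeps the sketch short; it does nothing to reduce the O(n) cost of notifyAll.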

  • Field Summary

    Fields
    Modifier and Type
    Field
    Description
    private int
    bucketCycle
    Barrier uses an even/odd flag because the barrier cannot sync without every process. Bucket can happily keep working while old processes are waiting around, so a flag is not enough. Instead, a count must be used.
    private final Object
    bucketLock
    The monitor lock used for synchronization.
    private int
    nHolding
    The number of processes currently enrolled on this bucket.
  • Constructor Summary

    Constructors
    Constructor
    Description
    Bucket()
     
  • Method Summary

    Modifier and Type
    Method
    Description
    static Bucket[]
    create(int n)
    Creates an array of Buckets.
    void
    fallInto()
    Fall into the bucket.
    int
    flush()
    Flush the bucket.
    int
    holding()
    This returns the number of processes currently held in the bucket.

    Methods inherited from class java.lang.Object

    clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
  • Field Details

    • nHolding

      private int nHolding
      The number of processes currently enrolled on this bucket.
    • bucketLock

      private final Object bucketLock
      The monitor lock used for synchronization
    • bucketCycle

      private int bucketCycle
      Barrier uses an even/odd flag because the barrier cannot sync without every process. Bucket can happily keep working while old processes are waiting around, so a flag is not enough. Instead, a count must be used. Theoretically this is unsafe, but the likelihood of the bucket completing 4 *billion* cycles before the process wakes up is somewhat slim.
  • Constructor Details

    • Bucket

      public Bucket()
  • Method Details

    • fallInto

      public void fallInto()
      Fall into the bucket. The process doing this will be blocked until the next flush().
    • flush

      public int flush()
      Flush the bucket. All held processes will be released. It returns the number that were released.

      Returns:
      the number of processes flushed.
    • holding

      public int holding()
      This returns the number of processes currently held in the bucket. Note that this number is volatile - for information only! By the time the invoker of this method receives it, it might have changed (because of further processes falling into the bucket or someone flushing it).

      Returns:
      the number of processes currently held in the bucket.
    • create

      public static Bucket[] create(int n)
      Creates an array of Buckets.

      Parameters:
      n - the number of Buckets to create in the array
      Returns:
      the array of Buckets