Class Bucket
java.lang.Object
    org.jcsp.lang.Bucket
All Implemented Interfaces: java.io.Serializable
public class Bucket extends java.lang.Object implements java.io.Serializable
This enables bucket synchronisation between a set of processes.
Description
A bucket is a non-deterministic cousin of a Barrier. A bucket is somewhere to fallInto when a process needs somewhere to park itself. There is no limit on the number of processes that can fallInto a bucket, and all are blocked when they do. Release happens when a process (and it will have to be another process) chooses to flush that bucket. When that happens, all processes in the bucket (which may be none) are rescheduled for execution.
Buckets are a non-deterministic primitive, since the decision to flush is a free (internal) choice of the process concerned and the scheduling of that flush impacts on the semantics. Usually, only one process is given responsibility for flushing a bucket (or set of buckets). Flushing a bucket does not block the flusher.
Note: this notion of bucket corresponds to the BUCKET synchronisation primitive added to the KRoC occam language system.
Implementation Note
The fallInto and flush methods of Bucket are just a re-badging of the wait and notifyAll methods of Object, but without the need to gain a monitor lock and without the need to look out for the wait being interrupted.
Currently, though, this is how they are implemented. Beware that a notifyAll carries an O(n) overhead (where n is the number of processes being notified), since each notified process must regain the monitor lock before it can exit the synchronized region. Future JCSP implementations of Bucket will look to follow occam kernels and reduce the overheads of both fallInto and flush to O(1).
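The note above can be illustrated with a minimal sketch of a bucket built directly on wait and notifyAll. This is not the JCSP source: the class name SketchBucket and the demo method are hypothetical, and the field names are borrowed from the Field Summary of this page purely for readability. It assumes that an interrupt should be remembered and the wait resumed, which is one way of sparing callers from handling InterruptedException.

```java
// Hypothetical illustration only: SketchBucket is NOT the JCSP implementation.
class SketchBucket {
    private final Object bucketLock = new Object(); // monitor guarding the state below
    private int nHolding = 0;     // processes currently parked in the bucket
    private int bucketCycle = 0;  // incremented on every flush

    /** Blocks the caller until the next flush. */
    public void fallInto() {
        synchronized (bucketLock) {
            nHolding++;
            final int myCycle = bucketCycle;
            boolean interrupted = false;
            // The loop guards against spurious wake-ups and interrupts.
            while (myCycle == bucketCycle) {
                try {
                    bucketLock.wait();
                } catch (InterruptedException e) {
                    interrupted = true; // remember it and keep waiting for the flush
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt(); // restore the interrupt status
            }
        }
    }

    /** Releases every parked caller and returns how many were released. */
    public int flush() {
        synchronized (bucketLock) {
            final int released = nHolding;
            nHolding = 0;
            bucketCycle++;
            bucketLock.notifyAll(); // each woken thread must regain the lock: O(n)
            return released;
        }
    }

    public int holding() {
        synchronized (bucketLock) {
            return nHolding;
        }
    }

    /** Demo helper: parks nWorkers threads, flushes once, returns the count released. */
    static int demo(int nWorkers) {
        final SketchBucket bucket = new SketchBucket();
        final Thread[] workers = new Thread[nWorkers];
        for (int i = 0; i < nWorkers; i++) {
            workers[i] = new Thread(bucket::fallInto);
            workers[i].start();
        }
        // Spin until every worker has fallen in, so the flush cannot come too early.
        while (bucket.holding() < nWorkers) {
            Thread.yield();
        }
        final int released = bucket.flush();
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return released;
    }

    public static void main(String[] args) {
        System.out.println("released = " + demo(3)); // prints "released = 3"
    }
}
```

The demo waits until all workers are held before flushing. Without that wait, a late worker would miss the flush and stay parked, which is the same kind of hazard discussed for the Flying Dingbats example.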
A Simple Example
This consists of 10 workers, one bucket and one flusher:
Here is the code for this network:

    import org.jcsp.lang.*;

    public class BucketExample1 {

      public static void main (String[] args) {

        final int nWorkers = 10;

        final int second = 1000;                   // JCSP timer units are milliseconds
        final int interval = 5*second;
        final int maxWork = 10*second;

        final long seed = new CSTimer ().read ();  // for the random number generators

        final Bucket bucket = new Bucket ();

        final Flusher flusher = new Flusher (interval, bucket);

        final Worker[] workers = new Worker[nWorkers];
        for (int i = 0; i < workers.length; i++) {
          workers[i] = new Worker (i, i + seed, maxWork, bucket);
        }

        System.out.println ("*** Flusher: interval = " + interval + " milliseconds");

        new Parallel (
          new CSProcess[] {
            flusher,
            new Parallel (workers)
          }
        ).run ();

      }

    }
A Worker cycle consists of one shift (which takes a random amount of time) and, then, falling into the bucket:

    import org.jcsp.lang.*;
    import java.util.*;

    public class Worker implements CSProcess {

      private final int id;
      private final long seed;
      private final int maxWork;
      private final Bucket bucket;

      public Worker (int id, long seed, int maxWork, Bucket bucket) {
        this.id = id;
        this.seed = seed;
        this.maxWork = maxWork;
        this.bucket = bucket;
      }

      public void run () {

        final Random random = new Random (seed);   // each process gets a different seed

        final CSTimer tim = new CSTimer ();
        final int second = 1000;                   // JCSP timer units are milliseconds

        final String working = "\t... Worker " + id + " working ...";
        final String falling = "\t\t\t ... Worker " + id + " falling ...";
        final String flushed = "\t\t\t\t\t\t ... Worker " + id + " flushed ...";

        while (true) {
          // These two lines represent one unit of work
          System.out.println (working);
          tim.sleep (random.nextInt (maxWork));
          System.out.println (falling);
          bucket.fallInto ();
          System.out.println (flushed);
        }

      }

    }
The Flusher just flushes the bucket at preset time intervals:

    import org.jcsp.lang.*;
    import java.util.*;

    public class Flusher implements CSProcess {

      private final int interval;
      private final Bucket bucket;

      public Flusher (int interval, Bucket bucket) {
        this.interval = interval;
        this.bucket = bucket;
      }

      public void run () {

        final CSTimer tim = new CSTimer ();
        long timeout = tim.read () + interval;

        while (true) {
          tim.after (timeout);
          System.out.println ("*** Flusher: about to flush ...");
          final int n = bucket.flush ();
          System.out.println ("*** Flusher: number flushed = " + n);
          timeout += interval;
        }

      }

    }
The Flying Dingbats
This consists of many buckets, a single bucket keeper (responsible for flushing the buckets) and a flock of Dingbats (who regularly fall into various buckets). Here is the system diagram:
And here is the network code:

    import org.jcsp.lang.*;

    public class BucketExample2 {

      public static void main (String[] args) {

        final int minDingbat = 2;
        final int maxDingbat = 10;
        final int nDingbats = (maxDingbat - minDingbat) + 1;

        final int nBuckets = 2*maxDingbat;

        final Bucket[] bucket = Bucket.create (nBuckets);

        final int second = 1000;     // JCSP timer units are milliseconds
        final int tick = second;
        final BucketKeeper bucketKeeper = new BucketKeeper (tick, bucket);

        final Dingbat[] dingbats = new Dingbat[nDingbats];
        for (int i = 0; i < dingbats.length; i++) {
          dingbats[i] = new Dingbat (i + minDingbat, bucket);
        }

        new Parallel (
          new CSProcess[] {
            bucketKeeper,
            new Parallel (dingbats)
          }
        ).run ();

      }

    }
The BucketKeeper keeps time, flushing buckets in sequence at a steady rate. When the last one has been flushed, it starts again with the first:

    import org.jcsp.lang.*;

    class BucketKeeper implements CSProcess {

      private final long interval;
      private final Bucket[] bucket;

      public BucketKeeper (long interval, Bucket[] bucket) {
        this.interval = interval;
        this.bucket = bucket;
      }

      public void run () {

        // spacer[i] indents the output for bucket i
        String[] spacer = new String[bucket.length];
        spacer[0] = "";
        for (int i = 1; i < spacer.length; i++) {
          spacer[i] = spacer[i - 1] + " ";
        }

        final CSTimer tim = new CSTimer ();
        long timeout = tim.read ();
        int index = 0;

        while (true) {
          final int n = bucket[index].flush ();
          if (n == 0) {
            System.out.println (spacer[index] + "*** bucket " + index + " was empty ...");
          }
          index = (index + 1) % bucket.length;
          timeout += interval;
          tim.after (timeout);
        }

      }

    }
So the buckets represent time values. A process falling into one of them will sleep until the prescribed time when the BucketKeeper next flushes it.
Dingbats live the following cycle. First, they do some work (rather brief in the following code). Then, they work out which bucket to fall into and fall into it - that is all. In this case, Dingbats just fly on id buckets from whence they were just flushed (where id is the Dingbat number indicated in the above network diagram):

    import org.jcsp.lang.*;

    public class Dingbat implements CSProcess {

      private final int id;
      private final Bucket[] bucket;

      public Dingbat (int id, Bucket[] bucket) {
        this.id = id;
        this.bucket = bucket;
      }

      public void run () {

        int logicalTime = 0;

        // spacer[i] indents the output for bucket i
        String[] spacer = new String[bucket.length];
        spacer[0] = "";
        for (int i = 1; i < spacer.length; i++) {
          spacer[i] = spacer[i - 1] + " ";
        }

        String message = "Hello world from " + id + " ==> time = ";

        while (true) {
          logicalTime += id;
          final int slot = logicalTime % bucket.length;               // assume: id <= bucket.length
          bucket[slot].fallInto ();
          System.out.println (spacer[slot] + message + logicalTime);  // one unit of work
        }

      }

    }
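The slot arithmetic in the Dingbat loop can be checked in isolation. The sketch below is a stand-alone helper (the class DingbatSlots and the method slots are hypothetical names, not part of JCSP) that reproduces only the two lines computing logicalTime and slot, using the 20 buckets that BucketExample2 creates for maxDingbat = 10.

```java
import java.util.Arrays;

// Hypothetical helper: reproduces only the slot arithmetic of the Dingbat loop.
class DingbatSlots {

    /** Returns the first `steps` bucket indices chosen by a Dingbat with the given id. */
    static int[] slots(int id, int nBuckets, int steps) {
        final int[] result = new int[steps];
        int logicalTime = 0;
        for (int i = 0; i < steps; i++) {
            logicalTime += id;                      // fly on by id time units
            result[i] = logicalTime % nBuckets;     // wrap around the ring of buckets
        }
        return result;
    }

    public static void main(String[] args) {
        // Dingbat 7 with 20 buckets
        System.out.println(Arrays.toString(slots(7, 20, 5))); // prints [7, 14, 1, 8, 15]
    }
}
```

Because the buckets form a ring, a Dingbat's logical time keeps growing while its slot index wraps round; the modulus is what lets a fixed number of buckets represent an unbounded timeline.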
Danger - Race Hazard
This example contains a race hazard whose elimination is left as an exercise for the reader. The problem is to ensure that all flushed Dingbats have settled in their next chosen buckets before the BucketKeeper next flushes those buckets. If a Dingbat is a bit slow, it may fall into its chosen bucket too late: the BucketKeeper has already flushed it and the creature will have to remain there until the next cycle.
With the rate of flushing used in the above system, this is unlikely to happen. But it is just possible: if something suspended execution of the system for a few seconds immediately following a flush, then the BucketKeeper could be rescheduled before the flying Dingbats.
Acknowledgement
This example is from a discrete event modelling approach due to Jon Kerridge (Napier University, Scotland). The Dingbats could easily model their own timeouts for themselves. However, setting a timeout is an O(n) operation (where n is the number of processes setting them). Here, there is only one process setting timeouts (the BucketKeeper) and the bucket operations fallInto and flush have O(1) costs (at least, that is the case for occam).
Of course, removal of the above race hazard means that the timeout by the BucketKeeper can also be eliminated. The buckets can be flushed just as soon as it knows that the previously flushed Dingbats have settled. In this way, the event model can proceed at full speed, maintaining correct simulated time (each bucket representing one time unit) without needing any timeouts in the simulation itself.
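To make the "one bucket per time unit" idea concrete, here is a single-threaded sketch of the intended event order. It is not a solution to the race hazard exercise and uses no JCSP classes: BucketTimeModel and simulate are hypothetical names, each bucket is modelled as a plain list of Dingbat ids, and a flushed Dingbat is assumed to settle in its next bucket before the next flush happens.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sequential model of the Flying Dingbats timeline (no concurrency).
class BucketTimeModel {

    /** Runs `ticks` time units and returns the "time:id" wake-up events in order. */
    static List<String> simulate(int minId, int maxId, int nBuckets, int ticks) {
        final List<List<Integer>> buckets = new ArrayList<>();
        for (int i = 0; i < nBuckets; i++) {
            buckets.add(new ArrayList<>());
        }
        // Every Dingbat starts at logical time 0 and first falls into bucket `id`.
        for (int id = minId; id <= maxId; id++) {
            buckets.get(id % nBuckets).add(id);
        }
        final List<String> log = new ArrayList<>();
        for (int time = 0; time < ticks; time++) {
            final int index = time % nBuckets;
            // "Flush": take a snapshot of the bucket and empty it.
            final List<Integer> flushed = new ArrayList<>(buckets.get(index));
            buckets.get(index).clear();
            for (int id : flushed) {
                log.add(time + ":" + id);                     // the Dingbat's unit of work
                buckets.get((time + id) % nBuckets).add(id);  // settle in the next bucket
            }
        }
        return log;
    }

    public static void main(String[] args) {
        // Dingbats 2 and 3, six buckets, seven time units
        System.out.println(simulate(2, 3, 6, 7)); // prints [2:2, 3:3, 4:2, 6:3, 6:2]
    }
}
```

No timer appears anywhere in the loop: simulated time advances simply because the buckets are visited in order, which is the property the concurrent version would keep once the BucketKeeper can tell that all flushed Dingbats have settled.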
See Also: Barrier, Serialized Form
Field Summary
Modifier and Type           Field         Description
private int                 bucketCycle   Barrier uses an even/odd flag because the barrier cannot sync without every process. Bucket can happily keep working while old processes are waiting around, so a flag is not enough. Instead, a count must be used.
private java.lang.Object    bucketLock    The monitor lock used for synchronization.
private int                 nHolding      The number of processes currently enrolled on this bucket.
-
Constructor Summary
Constructor    Description
Bucket()
-
Method Summary
Modifier and Type    Method           Description
static Bucket[]      create(int n)    Creates an array of Buckets.
void                 fallInto()       Fall into the bucket.
int                  flush()          Flush the bucket.
int                  holding()        This returns the number of processes currently held in the bucket.
Field Detail
-
nHolding
private int nHolding
The number of processes currently enrolled on this bucket.
-
bucketLock
private final java.lang.Object bucketLock
The monitor lock used for synchronization
-
bucketCycle
private int bucketCycle
Barrier uses an even/odd flag because the barrier cannot sync without every process. Bucket can happily keep working while old processes are waiting around, so a flag is not enough. Instead, a count must be used. Theoretically this is unsafe, but the likelihood of the bucket completing 4 *billion* cycles before the process wakes up is somewhat slim.
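The difference between a flag and a count can be shown with plain integer arithmetic. The sketch below is illustrative only: CycleCounterDemo and its two methods are hypothetical names, and it assumes a waiter decides whether a flush has happened by comparing the value it recorded on entry with the current one.

```java
// Hypothetical demonstration of flag versus counter comparisons; not JCSP code.
class CycleCounterDemo {

    /** With an even/odd flag, a waiter misses the flush whenever the parity is unchanged. */
    static boolean flagMissesFlush(int cycleAtEntry, long flushesSince) {
        return (cycleAtEntry & 1) == ((cycleAtEntry + flushesSince) & 1);
    }

    /** With an int counter, a waiter misses the flush only if the counter wrapped right round. */
    static boolean counterMissesFlush(int cycleAtEntry, long flushesSince) {
        final int now = (int) (cycleAtEntry + flushesSince); // int arithmetic wraps modulo 2^32
        return cycleAtEntry == now;
    }

    public static void main(String[] args) {
        System.out.println(flagMissesFlush(0, 2));                     // true: two flushes restore the parity
        System.out.println(counterMissesFlush(0, 2));                  // false: the count has moved on
        System.out.println(counterMissesFlush(Integer.MAX_VALUE, 1));  // false: overflow still changes the value
        System.out.println(counterMissesFlush(0, 1L << 32));           // true: exactly 2^32 flushes look like none
    }
}
```

The last line is the theoretical hole the description mentions: a waiter would have to sleep through exactly 2^32 (about 4 billion) flushes, or a multiple of that, for the counter comparison to be fooled.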
Method Detail
-
fallInto
public void fallInto()
Fall into the bucket. The process doing this will be blocked until the next flush().
-
flush
public int flush()
Flush the bucket. All held processes will be released. It returns the number that were released.
Returns: the number of processes flushed.
-
holding
public int holding()
This returns the number of processes currently held in the bucket. Note that this number is volatile - for information only! By the time the invoker of this method receives it, it might have changed (because of further processes falling into the bucket or someone flushing it).
Returns: the number of processes currently held in the bucket.
-
create
public static Bucket[] create(int n)
Creates an array of Buckets.
Parameters: n - the number of Buckets to create in the array
Returns: the array of Buckets