Package org.jcsp.lang

Class Parallel

  • All Implemented Interfaces:
    CSProcess
    Direct Known Subclasses:
    PriParallel

    public class Parallel
    extends java.lang.Object
    implements CSProcess
    This process constructor takes an array of CSProcesses and returns a CSProcess that is the parallel composition of its process arguments.

    Description

    The Parallel constructor takes an array of CSProcesses and returns a CSProcess that is the parallel composition of its process arguments. A run of a Parallel process terminates when, and only when, all its component processes terminate.

    Note: for those familiar with the occam multiprocessing language, the Parallel class gives the semantics of the PAR construct. However, none of the parallel usage checks mandated by occam can be made by the Java compiler, so we need to exercise that care ourselves. For instance, do not try to run the same process instance more than once in parallel and, generally, watch out for accidentally shared objects! Running different instances of the same process in parallel is, of course, allowed.
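
    As a minimal sketch of the point about instances, the following runs two different instances of one process class in parallel. The Counter class and its fields are hypothetical, introduced only for illustration. Each instance owns its state, so nothing is shared; passing the same instance twice would instead have two threads updating a single count field.

```java
import org.jcsp.lang.CSProcess;
import org.jcsp.lang.Parallel;

public class DistinctInstancesExample {

  // Hypothetical process: counts up to a limit using only its own state.
  static final class Counter implements CSProcess {

    private final int limit;
    int count = 0;                       // read by the caller only after the run has ended

    Counter (final int limit) {
      this.limit = limit;
    }

    public void run () {
      for (int i = 0; i < limit; i++) {
        count++;
      }
    }

  }

  public static void main (String[] args) {

    final Counter first = new Counter (1000);
    final Counter second = new Counter (2000);     // a different instance of the same class

    // run () returns when, and only when, both counters have terminated
    new Parallel (new CSProcess[] {first, second}).run ();

    System.out.println (first.count + " " + second.count);
  }

}
```

    The results are read only after run () has returned, in the same way that the ParaplexInt example further down reads each ProcessReadInt value after parInput.run ().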

    CSProcesses can be added to a Parallel object either via the constructor or the addProcess methods. If a call to addProcess is made while the run method is executing, the extra process(es) will not be included in the network until the next time run is invoked.

    CSProcesses can be removed from a Parallel object via the removeProcess or removeAllProcesses method. If a call to removeProcess or removeAllProcesses is made while the run method is executing, the process(es) will not be removed from the network until the next time run is invoked.
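
    The add and remove calls described above might be used between runs as in the sketch below. The logger helper and the shared log are hypothetical; the log is deliberately shared and is a synchronised list for that reason. The order of entries within one run is not determined.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.jcsp.lang.CSProcess;
import org.jcsp.lang.Parallel;

public class AddRemoveExample {

  // Hypothetical process: records its name in a synchronised log, then terminates.
  static CSProcess logger (final String name, final List<String> log) {
    return new CSProcess () {
      public void run () {
        log.add (name);
      }
    };
  }

  public static void main (String[] args) {

    final List<String> log = Collections.synchronizedList (new ArrayList<String> ());

    final CSProcess a = logger ("a", log);
    final CSProcess b = logger ("b", log);
    final CSProcess c = logger ("c", log);

    final Parallel par = new Parallel (new CSProcess[] {a, b});
    par.run ();                                    // first run: a and b

    par.addProcess (c);                            // both changes apply to the next run
    par.removeProcess (a);
    System.out.println (par.getNumberProcesses () + " processes registered");

    par.run ();                                    // second run: b and c
    par.releaseAllThreads ();                      // par will not be run again

    System.out.println (log);
  }

}
```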

    Note: to add/remove a process to/from a network whilst it is running, see the ProcessManager class.

    Example

    The following examples demonstrate high and low level use of Parallel.

    High Level

    This high-level example sets up a communicating network of (in this case non-terminating) processes. Data-flow diagrams are a great help for designing, understanding and maintaining such parallel systems.

    Here is the JCSP code:
     import org.jcsp.lang.*;
     import org.jcsp.plugNplay.ints.*;
     
     class ParaplexIntExample {
     
       public static void main (String[] args) {
     
         final One2OneChannelInt[] a = Channel.one2oneIntArray (3);
         final One2OneChannel b = Channel.one2one ();
     
         new Parallel (
           new CSProcess[] {
             new NumbersInt (a[0].out ()),
             new SquaresInt (a[1].out ()),
             new FibonacciInt (a[2].out ()),
             new ParaplexInt (Channel.getInputArray (a), b.out ()),
             new CSProcess () {
               public void run () {
                 System.out.println ("\n\t\tNumbers\t\tSquares\t\tFibonacci\n");
                 while (true) {
                   int[] data = (int[]) b.in ().read ();
                   for (int i = 0; i < data.length; i++) {
                     System.out.print ("\t\t" + data[i]);
                   }
                   System.out.println ();
                 }
               }
             }
           }
         ).run ();
       }
     
     }
     
    This example tabulates columns of (respectively) natural numbers, perfect squares and the Fibonacci sequence. At this level, we are only aware of five communicating processes: three that generate the respective sequences of integers, one that multiplexes a single item from each sequence into a single packet and the in-lined process that receives this packet and tabulates its contents. And, at this level, that is all we need to think about.

    However, clicking on any of the generator processes reveals sub-networks (and, in the case of SquaresInt and FibonacciInt, sub-sub-networks). Altogether, the example contains 28 parallel processes -- 18 of them high-level (and non-terminating) and 10 low-level (and transient, but repeatedly re-invoked). One of the key benefits of CSP is that its semantics are compositional -- i.e. we do not have to reason about all those 28 processes at the same time to reason about how they behave in this application. We can build up the complexity in layers.

    Note: the above example is just to build fluency with the CSP/occam concept of parallel composition and to show how easy it is. The network decomposes into fine-grained stateless components that would be excellent if we were refining this application down to a silicon (e.g. FPGA) implementation -- but for software running on a uni-processor JVM, we would not suggest going quite so far!

    Note: the above layered network of communicating parallel processes is completely deterministic. It will produce the same results regardless of the scheduling characteristics of the underlying JVM and regardless of its physical distribution on to separate processors (and their relative speeds). This default determinism is one of the founding strengths of CSP concurrency that reinforces confidence in the systems we build with it.

    Non-determinism, of course, needs to be addressed for many applications and is catered for in JCSP by its Alternative construct (which corresponds to the CSP external choice operator and occam ALT), by its any-1, 1-any and any-any channels (e.g. Any2OneChannel) and by the overwriting semantics that can be defined for its channels (e.g. OverWriteOldestBuffer). The fact that non-determinism has to be explicitly introduced reduces the chance of overlooking race-hazards caused by that non-determinism.

    Low Level

    For a low-level application of Parallel, here is the implementation of the ParaplexInt process used in the high-level example above:
     package org.jcsp.plugNplay.ints;
     
     import org.jcsp.lang.*;
     
     public final class ParaplexInt implements CSProcess {
     
       private final ChannelInputInt[] in;
     
       private final ChannelOutput out;
     
       public ParaplexInt (final ChannelInputInt[] in, final ChannelOutput out) {
         this.in = in;
         this.out = out;
       }
     
       public void run () {
     
         final ProcessReadInt[] inputProcess = new ProcessReadInt[in.length];
         for (int i = 0; i < in.length; i++) {
           inputProcess[i] = new ProcessReadInt (in[i]);
         }
     
         Parallel parInput = new Parallel (inputProcess);
     
         int[][] data = new int[2][in.length];               // double-buffer
         int index = 0;                                      // initial buffer index
     
         while (true) {
           parInput.run ();
           int[] buffer = data[index];                       // grab a buffer
           for (int i = 0; i < in.length; i++) {
             buffer[i] = inputProcess[i].value;
           }
           out.write (buffer);
           index = 1 - index;                                // switch buffers
         }
     
       }
     
     }
     
    Note that the Parallel object (parInput) is constructed once and contains an array of processes (ProcessReadInt), each of which performs only a single channel input and, then, terminates. Each time it is run (parInput.run inside the loop), all those sub-processes run concurrently -- the parallel run terminating when, and only when, all those sub-processes have terminated. See the documentation of ParaplexInt for the motivation for this low-level concurrency (and for the double-buffering).

    Implementation Note

    The Parallel object creates new Threads to run the first (n - 1) of its processes, running the last one in its own thread of control. After each run of the Parallel CSProcess, all those threads are parked for reuse in the next run. Thus in the above low-level application, the overhead for Java thread creation for the internal concurrency is only incurred on its first cycle. All these implementation Threads are daemons and, so, will terminate if everything else terminates.

    If a Parallel process has finished its run() and is not going to be used again, its parked threads may be unparked and terminated by invoking its releaseAllThreads method. This will release the memory used by those threads.
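
    To make the thread reuse described in this note concrete, here is a minimal plain-Java sketch of the same idea. It is not the JCSP implementation: the class PooledParSketch and everything in it are hypothetical, and it uses java.util.concurrent queues and a latch where Parallel uses its own ParThread pool and Barrier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

public class PooledParSketch {

  private static final Runnable STOP = () -> { };   // sentinel: tells a worker to terminate

  // A pooled thread: parks on its inbox, runs whatever arrives, then parks again.
  private static final class Worker extends Thread {

    final BlockingQueue<Runnable> inbox = new LinkedBlockingQueue<> ();

    Worker () {
      setDaemon (true);                             // does not keep the JVM alive
    }

    @Override
    public void run () {
      try {
        for (Runnable task = inbox.take (); task != STOP; task = inbox.take ()) {
          task.run ();
        }
      } catch (InterruptedException e) {
        // interrupted while parked: terminate
      }
    }

  }

  private final Runnable[] processes;
  private final List<Worker> workers = new ArrayList<> ();

  public PooledParSketch (Runnable... processes) {
    this.processes = processes.clone ();
  }

  // Returns when, and only when, every process has terminated.
  public void run () throws InterruptedException {
    final int n = processes.length;
    if (n == 0) {
      return;
    }
    while (workers.size () < n - 1) {               // creates threads only when the pool is short
      final Worker w = new Worker ();
      w.start ();
      workers.add (w);
    }
    final CountDownLatch done = new CountDownLatch (n - 1);
    for (int i = 0; i < n - 1; i++) {
      final Runnable p = processes[i];
      workers.get (i).inbox.put (() -> {
        try {
          p.run ();
        } catch (RuntimeException e) {
          e.printStackTrace ();                     // keep the worker alive for later runs
        } finally {
          done.countDown ();
        }
      });
    }
    processes[n - 1].run ();                        // the last process uses the caller's thread
    done.await ();
  }

  // Terminates the parked threads; a later run () would create fresh ones.
  public void releaseAllThreads () throws InterruptedException {
    for (Worker w : workers) {
      w.inbox.put (STOP);
    }
    workers.clear ();
  }

  public static void main (String[] args) throws InterruptedException {
    // stateless, so handing the same Runnable to all three slots is harmless here
    final Runnable report =
      () -> System.out.println ("ran on " + Thread.currentThread ().getName ());
    final PooledParSketch par = new PooledParSketch (report, report, report);
    par.run ();                                     // creates two worker threads
    par.run ();                                     // reuses the same two threads
    par.releaseAllThreads ();
  }

}
```

    With three processes, two runs touch exactly three threads: the two pooled workers and the caller. Error handling is deliberately thin, and an exception thrown by the last process would skip the wait for the others.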

    See Also:
    CSProcess, ProcessManager, Sequence
    • Field Summary

      Fields 
      Modifier and Type Field Description
      private static java.util.Set allParThreads
      The threads created by all Parallel and ProcessManager objects.
      private Barrier barrier
      Used to synchronise the termination of processes in each run of Parallel
      private static boolean destroyCalled
      Indicates that the destroy() method has already been called.
      private static boolean displayErrors
      TRUE iff uncaught errors are to be displayed.
      private static boolean displayExceptions
      TRUE iff uncaught exceptions are to be displayed.
      private int nProcesses
      The number of processes in this Parallel
      private int nThreads
      The number of threads created so far by this Parallel
      private ParThread[] parThreads
      A pool of ParThreads
      private boolean priority  
      private CSProcess[] processes
      The processes to be executed in Parallel
      private boolean processesChanged  
      private java.lang.Object sync
      Monitor for internal synchronisation.
    • Constructor Summary

      Constructors 
      Constructor Description
      Parallel()
      Construct a new Parallel object initially without any processes.
      Parallel​(boolean priority)
      Construct a new Parallel object initially without any processes.
      Parallel​(CSProcess[] processes)
      Construct a new Parallel object with the processes specified.
      Parallel​(CSProcess[][] processes)
      Construct a new Parallel object with the processes specified.
      Parallel​(CSProcess[] processes, boolean priority)
      Construct a new Parallel object with the processes specified.
    • Method Summary

      All Methods Static Methods Instance Methods Concrete Methods 
      Modifier and Type Method Description
      void addProcess​(CSProcess process)
      Add the process to the Parallel object.
      void addProcess​(CSProcess[] newProcesses)
      Add the array of processes to the Parallel object.
      (package private) static void addToAllParThreads​(java.lang.Thread newThread)
      Adds the thread object to the allParThreads collection.
      static void destroy()
      Stops all threads created by all Parallel and ProcessManager objects.
      protected void finalize()
      System finalizer.
      int getNumberProcesses()  
      void insertProcessAt​(CSProcess process, int index)
      Insert another process to the pri-parallel object at the specified index.
      void releaseAllThreads()
      Release all threads saved by the Parallel object for future runs - the threads all terminate and release their associated workspaces.
      void removeAllProcesses()
      Remove all processes from the Parallel object.
      (package private) static void removeFromAllParThreads​(java.lang.Thread oldThread)
      Removes the thread object from the allParThreads collection.
      void removeProcess​(CSProcess process)
      Remove the process from the Parallel object.
      static void resetDestroy()
      Cancels a call to destroy allowing the JCSP system to be reused.
      void run()
      Run the parallel composition of the processes registered with this Parallel object.
      static void setUncaughtErrorDisplay​(boolean enable)
      Enables or disables the display of Errors uncaught by a CSProcess running within a Parallel or under a ProcessManager object.
      static void setUncaughtExceptionDisplay​(boolean enable)
      Enables or disables the display of Exceptions uncaught by a CSProcess running within a Parallel or under a ProcessManager object.
      (package private) static void uncaughtException​(java.lang.String caller, java.lang.Throwable t)  
      • Methods inherited from class java.lang.Object

        clone, equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
    • Field Detail

      • sync

        private final java.lang.Object sync
        Monitor for internal synchronisation.
      • processes

        private CSProcess[] processes
        The processes to be executed in Parallel
      • nProcesses

        private int nProcesses
        The number of processes in this Parallel
      • parThreads

        private ParThread[] parThreads
        A pool of ParThreads
      • nThreads

        private int nThreads
        The number of threads created so far by this Parallel
      • barrier

        private Barrier barrier
        Used to synchronise the termination of processes in each run of Parallel
      • priority

        private boolean priority
      • processesChanged

        private boolean processesChanged
      • allParThreads

        private static final java.util.Set allParThreads
        The threads created by all Parallel and ProcessManager objects.
      • destroyCalled

        private static boolean destroyCalled
        Indicates that the destroy() method has already been called.
      • displayExceptions

        private static boolean displayExceptions
        TRUE iff uncaught exceptions are to be displayed.
      • displayErrors

        private static boolean displayErrors
        TRUE iff uncaught errors are to be displayed.
    • Constructor Detail

      • Parallel

        public Parallel()
        Construct a new Parallel object initially without any processes.
      • Parallel

        Parallel​(boolean priority)
        Construct a new Parallel object initially without any processes. If the priority parameter has the value true, processes higher in the process list will be given a higher priority.
        Parameters:
        priority - indicates that different priorities should be given to processes.
      • Parallel

        public Parallel​(CSProcess[] processes)
        Construct a new Parallel object with the processes specified.
        Parameters:
        processes - The processes to be executed in parallel
      • Parallel

        public Parallel​(CSProcess[][] processes)
        Construct a new Parallel object with the processes specified.
        Parameters:
        processes - The processes to be executed in parallel
      • Parallel

        Parallel​(CSProcess[] processes,
                 boolean priority)
        Construct a new Parallel object with the processes specified. If the priority parameter has the value true, processes higher in the process list will be given a higher priority.
        Parameters:
        processes - the processes to be executed in parallel
        priority - indicates that different priorities should be given to processes.
    • Method Detail

      • addToAllParThreads

        static void addToAllParThreads​(java.lang.Thread newThread)
                                throws java.lang.InterruptedException
        Adds the thread object to the allParThreads collection. This should be called by any infrastructure threads when they start.
        Parameters:
        newThread - the thread to be added to the collection.
        Throws:
        java.lang.InterruptedException
      • removeFromAllParThreads

        static void removeFromAllParThreads​(java.lang.Thread oldThread)
        Removes the thread object from the allParThreads collection.
      • destroy

        public static void destroy()
        Stops all threads created by all Parallel and ProcessManager objects. No new threads can be created until the resetDestroy method gets called.
      • resetDestroy

        public static void resetDestroy()
        Cancels a call to destroy allowing the JCSP system to be reused. This is provided so that destroy can be called from an Applet's termination method, but the Applet can be restarted later.
      • addProcess

        public void addProcess​(CSProcess process)
        Add the process to the Parallel object. The extended network will be executed the next time run() is invoked.
        Parameters:
        process - the CSProcess to be added
      • addProcess

        public void addProcess​(CSProcess[] newProcesses)
        Add the array of processes to the Parallel object. The extended network will be executed the next time run() is invoked.
        Parameters:
        newProcesses - the CSProcesses to be added
      • insertProcessAt

        public void insertProcessAt​(CSProcess process,
                                    int index)
        Insert another process to the pri-parallel object at the specified index. The point of insertion is significant because the ordering of process components determines the priorities. The extended network will be executed the next time run() is invoked.

        Parameters:
        process - the process to be inserted
        index - the index at which to insert the process
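
        A possible use with the PriParallel subclass is sketched below. It assumes that PriParallel offers a constructor taking a CSProcess[] and that index 0 is the head of the process list; the named helper is hypothetical. The order of the printed lines is not guaranteed.

```java
import org.jcsp.lang.CSProcess;
import org.jcsp.lang.PriParallel;

public class InsertProcessExample {

  // Hypothetical process: announces itself and terminates.
  static CSProcess named (final String name) {
    return new CSProcess () {
      public void run () {
        System.out.println (name + " running");
      }
    };
  }

  public static void main (String[] args) {

    // assumption: PriParallel (CSProcess[]) exists, mirroring Parallel (CSProcess[])
    final PriParallel par = new PriParallel (
      new CSProcess[] {named ("middle"), named ("low")}
    );

    par.insertProcessAt (named ("high"), 0);       // placed ahead of "middle" in the list
    System.out.println (par.getNumberProcesses () + " processes registered");

    par.run ();                                    // runs all three processes
    par.releaseAllThreads ();                      // par will not be run again
  }

}
```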
      • removeProcess

        public void removeProcess​(CSProcess process)
        Remove the process from the Parallel object. The cut-down network will not be executed until the next time run() is invoked.
        Parameters:
        process - the CSProcess to be removed
      • removeAllProcesses

        public void removeAllProcesses()
        Remove all processes from the Parallel object. The cut-down network will not be executed until the next time run() is invoked.
      • finalize

        protected void finalize()
                         throws java.lang.Throwable
        System finalizer. When this object falls out of scope it will release all of the threads that it has allocated.
        Overrides:
        finalize in class java.lang.Object
        Throws:
        java.lang.Throwable
      • releaseAllThreads

        public void releaseAllThreads()
        Release all threads saved by the Parallel object for future runs - the threads all terminate and release their associated workspaces. This should only be executed when the Parallel object is not running. If this Parallel object is run again, the necessary threads will be recreated.
      • getNumberProcesses

        public int getNumberProcesses()
        Returns:
        the number of processes currently registered.
      • run

        public void run()
        Run the parallel composition of the processes registered with this Parallel object. It terminates when, and only when, all its component processes terminate.

        Implementation note: In its first run, only (numProcesses - 1) Threads are created to run the processes -- the last process is executed in the invoking Thread. Subsequent runs reuse these Threads (so the overhead of thread creation happens only once).

        Specified by:
        run in interface CSProcess
      • setUncaughtExceptionDisplay

        public static void setUncaughtExceptionDisplay​(boolean enable)
        Enables or disables the display of Exceptions uncaught by a CSProcess running within a Parallel or under a ProcessManager object.
      • setUncaughtErrorDisplay

        public static void setUncaughtErrorDisplay​(boolean enable)
        Enables or disables the display of Errors uncaught by a CSProcess running within a Parallel or under a ProcessManager object.
      • uncaughtException

        static void uncaughtException​(java.lang.String caller,
                                      java.lang.Throwable t)