HPC++ Synchronization

There are two kinds of synchronization mechanisms in this HPC++ implementation: primitive synchronization objects and collective operation objects. The primitive synchronization objects are used for thread synchronization. The collective operations are based on the HPCxx_Group class, which plays a role in HPC++ similar to that of the communicator in MPI, and can be applied to a set of threads and/or contexts.
  • Primitive Synchronization Classes
      • Sync
      • Sync Queue
      • Semaphore
      • Mutex
      • Condition Variables
  • Collective Operations
      • The HPCxx_Group Class
      • Collective Operations for Threads
          • Barrier Synchronization
          • Reduction
          • Gather
      • Collective Operations for Contexts

  • Primitive Synchronization Classes

    There are five basic synchronization classes in the library:

    1. Sync Class

    An HPCxx_Sync<T> object is a variable that can be written once and read as many times as you want. If a read is attempted before the write, the reading thread blocks. Many readers can be waiting on a single HPCxx_Sync<T> object, and when a value is written all the readers are released. Readers that arrive after the initial write see the value as a const. The CC++ language provides this capability through the sync modifier.

    The standard methods for HPCxx_Sync<T> are

         
    template<class T>
    class HPCxx_Sync{
        public:
           operator T();    // read a value
           operator =(T &); // copy value from var or reference
           void read(T &);  // another form of read
           void write(T &); // another form of writing
           bool peek(T &);  // TRUE if the value is there, 
                            // returns FALSE otherwise.
    };
    
    2. Sync Queue Class

    Another template, HPCxx_SyncQ<T>, provides a dual "queue" of values of type T. Any attempt to read from the queue before a value is written causes the reading thread to suspend until a value has been assigned. The waiting thread "takes the value" from the queue and continues executing. Waiting threads are also queued: the ith thread in the queue receives the ith value written to the sync variable.

    There are several other standard methods for HPCxx_SyncQ<T>.

         
    template<class T>
    class HPCxx_SyncQ{
        public:
           operator T();    // read a value
           operator =(T &); // copy value from var or reference
           void read(T &);  // another form of read
           void write(T &); // another form of writing
           int length();    // the number of values in the queue
    
           // wait until the value is there and then
           // read the value but do not remove it from
           // the queue. The next waiting thread is signaled.
           void waitAndCopy(T& data); 
           bool peek(T &);  // same as for HPCxx_Sync<>
           
    };
    
    For example, threads that synchronize around a producer-consumer interaction can be easily built with this mechanism.
           
    class Producer: public HPCxx_Thread{
          HPCxx_SyncQ<int> &x;
      public:
          Producer( HPCxx_SyncQ<int> &y): x(y){}
          void run(){
               printf("hi there\n");
               x = 1;  // produce a value for x
          }
    };
              
    int main(int argc, char *argv[]){
       HPCxx_Group *g;
       hpcxx_init(&argc, &argv, g);
       HPCxx_SyncQ<int> a;
       Producer *t = new Producer(a);
       printf("start then wait for a value to be assigned\n");
       t->start();
       int x = a; // consume a value here.
       hpcxx_exit(g);
       return x;
    }
    

    3. Semaphore Class

    HPCxx_CSem provides a way for a group of threads to synchronize on the termination of a number of tasks. When constructed, a limit value is supplied and a counter is set to zero. A thread executing wait() will suspend until the counter reaches the limit value; the counter is then reset to zero. The incr() method increments the counter by one.

    class HPCxx_CSem{
     public:
       HPCxx_CSem(int limit);
       void incr();       // increment the counter
       void operator++(); // another way to increment the counter
       void wait();       // wait until the counter reaches the limit value
                          // then reset the counter to 0 and exit.
       int getCount();    // returns the limit value
       int setCount(int val); // sets the limit value
       int getStatus();   // returns the counter value
    };
    
    By passing a reference to a HPCxx_CSem to a group of threads each of which does a "++" prior to exit, you can build a multi-threaded "join" operation.

    class Worker: public HPCxx_Thread{
        HPCxx_CSem &c;
       public:
        Worker(HPCxx_CSem &c_): c(c_){}
        void run(){
             // work
             c.incr();
           }
    };
    int main(int argc, char *argv[]){
       HPCxx_Group *g;
       hpcxx_init(&argc, &argv, g);
       HPCxx_CSem cs(NUMWORKERS);
       for(int i = 0; i < NUMWORKERS; i++){
            Worker *w = new Worker(cs);
            w->start();
       }
       cs.wait(); //wait here for all workers to finish. 
       hpcxx_exit(g);
       return 0;
    }
    
    
    4. The Mutex Class

    Unlike Java's synchronized methods or CC++'s atomic members, a library cannot enforce mutual exclusion automatically, but a simple mutex object with two functions, lock and unlock, provides the basic capability.
    class HPCxx_Mutex{
       public:
         int lock();    // Returns an implementation-dependent error code
         int unlock();  // Returns an implementation-dependent error code
    };
    
    To provide a synchronized method, i.e., one that grants execution authority to only one thread at a time, one can introduce a private mutex member and protect the critical section with locks as follows.
    class Myclass: public HPCxx_Runnable{
       HPCxx_Mutex l;
      public:
          void synchronized(){
               l.lock();
               // critical section
               l.unlock();
             }
    };


    5. Condition Variables

    Whereas the HPCxx_Mutex class allows threads to synchronize by controlling their access to data via a locking mechanism, the HPCxx_Cond class allows threads to synchronize on the value of the data. Cooperating threads wait until data reaches some particular state or until some particular event occurs. A rule of thumb: use mutex locks to synchronize access to shared data and use condition variables to synchronize threads on events. Before waiting on an event, a mutex must be obtained and passed as an argument to the wait method. For example, the following code segment causes a thread to wait for a counter to reach some threshold value:

     countMutex.lock();
     while (count < threshold) {
       countCondition.wait(countMutex);
     }
     // do something now that the condition has been met
     countMutex.unlock();
    
    The condition variable must be signaled to release the waiting thread. For example
      countMutex.lock();
      count = threshold;
      countCondition.signal();
      countMutex.unlock();
    
    Notice that both the waiting thread and the signalling thread lock the same mutex. This may seem strange, but it is the proper usage: the wait() method unlocks the mutex when it is called and re-acquires it before returning.

    Collective Operations

    Recall that an HPC++ computation consists of a set of nodes, each of which contains one or more contexts. Each context runs one or more threads.

    The HPCxx_Group Class

    At computation startup, the HPC++Lib initialization creates an object called a group. The HPCxx_Group class has the following public interface.

    class HPCxx_Group{
      public:
      // Create a new group for the current context.
      HPCxx_Group(hpcxx_id_t &groupID = HPCXX_GEN_LOCAL_GROUPID,
                  const char *name = NULL);
    
      // Create a group whose membership is this context 
      //and those in the list
      HPCxx_Group(const HPCxx_ContextID *&id, 
                  int count,
                  hpcxx_id_t &groupID = HPCXX_GEN_LOCAL_GROUPID,
                  const char *name = NULL);
    
      ~HPCxx_Group();
      hpcxx_id_t &getGroupID();
      static HPCxx_Group *getGroup(hpcxx_id_t groupID);
      // Get the number of contexts that are participating
      // in this group
      int getNumContexts();
      // Return an ordered array of context IDs in
      // this group.  This array is identical for every member
      // of the group.
      HPCxx_ContextID *getContextIDs();
      // Return the context id for zero-based context <n> where
      // <n> is less than the current number of contexts
      HPCxx_ContextID getContextID(int context);
      // Set the number of threads for this group in *this*
      // context.
      void setNumThreads(int count);
      int getNumThreads();
      void setName(const char *name);
      const char *getName();
    };
    
    Groups are used to identify sets of threads and sets of contexts that participate in collective operations like barriers. In the next section, we only describe how a set of threads on a single context can use collective operations. Multi-context operations will be described in greater detail in the multi-context programming section.

    Collective Operations for Threads

    Barrier Synchronization

    The basic operation is barrier synchronization. It is accomplished in the following steps.

    We first allocate an object of type HPCxx_Group and set the number of threads to the maximum number that will participate in the operation. For example, to set the thread count on the main group to be 13 we can write the following.

     
    int main(int argc, char *argv[]){
       HPCxx_Group *g;
       hpcxx_init(&argc, &argv, g);
       g->setNumThreads(13);
       HPCxx_Barrier barrier(*g);
    
    As shown above, an HPCxx_Barrier object must be allocated for the group; the barrier constructor takes a reference to the group object.

    Each thread that will participate in the barrier operation must then acquire a key from the barrier object with the getKey() function. (Threads do not have a natural integer index, so the key serves to enumerate threads associated with a group. While order is not essential for the operation of a barrier, it is important for reductions and scan operations as described below.)

    The barrier can be invoked by means of the overloaded () operator as shown in the example below.

    class Worker: public HPCxx_Thread{
       int my_key;
       HPCxx_Barrier &barrier;
      public:
         Worker(HPCxx_Barrier & b): barrier(b){
           my_key = barrier.getKey();
         }
         void run(){
             while( notdone ){
                  // work
                  barrier(my_key);
              }
         }
    };
    int main(int argc, char *argv[]){
       HPCxx_Group *g;
       hpcxx_init(&argc, &argv, g);
       
       g->setNumThreads(14); // 13 threads plus main
       HPCxx_Barrier barrier(*g);
       int key0=barrier.getKey();
       
       for(int i = 0; i < 13; i++){
             Worker *w = new Worker(barrier);
             w->start();
       }
            
       barrier(key0);
       
       hpcxx_exit(g);
    }
    
    A thread can participate in more than one barrier group, and a barrier can be deallocated. The thread count of a group may be changed, a new barrier may be allocated, and threads can request new keys.

    Reduction

    Let intAdd be the class:
    class intAdd{
       public:
         int & operator()(int &x, int &y) { x += y; return x;}
    };
    

    To create an object that can be used to form the sum-reduction of one integer from each thread, the declaration takes the form

    HPCxx_Reduct<int, intAdd> r(group, intAdd());
    
    and it can be used in the threads as follows:

    class Worker: public HPCxx_Thread{
       int my_key;
       HPCxx_Reduct<int, intAdd> &add;
      public:
         Worker(HPCxx_Reduct<int, intAdd>  & a): add(a){
           my_key = add.acquireKey();
         }
          void run(){
                  int x = 3 * my_key;   // some per-thread value
                  // now compute the sum of all x values
                  int t = add(my_key, x);
          }
    };
    
    The public definition of the reduction class for a single, multi-threaded context is given by

    template <class T, class Oper>
    class HPCxx_Reduct{
        public:
            HPCxx_Reduct(HPCxx_Group &, Oper op);
            T operator()(int key, T &x);
            T operator()(int key, T *buffer, int size);
            ...
     };
    
    The operation can be invoked with the overloaded () operator as in the example above, or with a version that takes a buffer of arguments of a given size. In the buffer case the binary operator is applied pointwise, and the operation is destructive (i.e., the buffers may be modified): the caller supplies a buffer to hold the arguments and receives a pointer to the buffer that holds the result. To avoid making copies, all of the buffers are modified during the computation. The operation is designed to be as efficient as possible, so it is implemented as a tree reduction; hence the binary operator is required to be associative, i.e.

       op(x, op(y, z)) == op( op(x, y), z)
    
    The destructive form of the operator is much faster if the size of the data type T is large.

    Gather

    Often threads need to coordinate so that one thread may share a value with the other threads, or so that a number of threads can concatenate subvectors into one vector. Both of these operations are instances of a function we call gather, which is a parallel-prefix concatenation operation and is a member of the HPCxx_Collective class.

    template <class T>
    class HPCxx_Collective{
     public:
       HPCxx_Collective(HPCxx_Group *g);
       int acquireKey();
       T * gather(int threadKey, T* thread_data, int &size);
       ...
    };
    
    The gather operation allows each thread to contribute a vector of data of arbitrary size and returns the vector of concatenated subvectors, whose length is returned in the reference parameter size when the call completes. (A broadcast is simply a special case of the gather operation, where all but one thread contributes a vector of length 0.)

    Collective Operations for Contexts

    In SPMD execution mode, the runtime system provides the same collective operations as were provided earlier for multi-threaded computation. The only semantic difference is that the collective operations apply across every context and every thread of the group.

    The only syntactic difference is that we allow a special form of the overloaded () operator that does not require a thread "key". For example, to do a barrier between contexts all we need is the HPCxx_Group object.

    
    int main(int argc, char *argv[]){
       HPCxx_Group *context_set;
    
       hpcxx_init(&argc, &argv, context_set);
    
       HPCxx_Barrier barrier(*context_set);
       HPCxx_Reduct<float, floatAdd>
                      float_reduct(*context_set, floatAdd());
       ...
       barrier();  
       float z = 3.14;
       z = float_reduct(z);
    }
    Note that the thread key can be used if there are multiple threads in a context that want to synchronize with the other contexts.


    hpc++@extreme.indiana.edu

    Last modified: Wed Apr 21 19:06:55 EST 1999