There are five basic synchronization classes in the library:
1. Sync Class
An HPCxx_Sync<T> object is a variable that can be written to
once and read as many times as you want. However, if a read is attempted
prior to a write, the reading thread will be blocked. Many readers can be
waiting for a single HPCxx_Sync<T> object, and when a value is written to it all
the readers are released. Readers that come after the initial write see this
as a const value. The CC++ language provides this capability
through the sync modifier.
The standard methods for HPCxx_Sync<T> are:
template<class T>
class HPCxx_Sync{
public:
  operator T();        // read a value (blocks until a value has been written)
  void operator=(T &); // write a value (copy from a variable or reference)
  void read(T &);      // another form of read
  void write(T &);     // another form of write
  bool peek(T &);      // returns true if a value is present,
                       // false otherwise
};
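For example, a minimal sketch of the write-once/read-many behavior, assuming the HPCxx_Thread, hpcxx_init, and hpcxx_exit facilities used in the examples below (the Setter class and the value 42 are illustrative):
class Setter: public HPCxx_Thread{
  HPCxx_Sync<int> &s;
public:
  Setter(HPCxx_Sync<int> &s_): s(s_){}
  void run(){
    int value = 42;
    s.write(value);     // the single write releases every waiting reader
  }
};
int main(int argc, char *argv[]){
  HPCxx_Group *g;
  hpcxx_init(&argc, &argv, g);
  HPCxx_Sync<int> s;
  Setter *t = new Setter(s);
  t->start();
  int v = s;            // blocks until Setter writes, then reads 42
  int w = s;            // later reads return the same value without blocking
  printf("%d %d\n", v, w);
  hpcxx_exit(g);
  return 0;
}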
2. Sync Queue Class
An HPCxx_SyncQ<T> object holds a queue of values: each write appends a value to the queue, and each read blocks until a value is available and then removes it. The standard methods for HPCxx_SyncQ<T> are:
template<class T>
class HPCxx_SyncQ{
public:
  operator T();        // read a value (removes it from the queue)
  void operator=(T &); // copy value from a variable or reference
  void read(T &);      // another form of read
  void write(T &);     // another form of write
  int length();        // the number of values in the queue
  // wait until a value is there, then read the value but do not
  // remove it from the queue; the next waiting thread is signaled
  void waitAndCopy(T& data);
  bool peek(T &);      // same as for HPCxx_Sync<>
};
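A minimal sketch of the queue semantics (the values are illustrative):
HPCxx_SyncQ<int> q;
int a = 1, b = 2;
q.write(a);            // queue now holds 1
q.write(b);            // queue now holds 1, 2
int n = q.length();    // n == 2
int first;
q.waitAndCopy(first);  // first == 1; the value stays in the queue
int x = q;             // dequeues 1
int y = q;             // dequeues 2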
For example, threads that synchronize around a producer-consumer
interaction can easily be built with this mechanism.
class Producer: public HPCxx_Thread{
  HPCxx_SyncQ<int> &x;
public:
  Producer( HPCxx_SyncQ<int> &y): x(y){}
  void run(){
    printf("hi there\n");
    x = 1;              // produce a value for x
  }
};
int main(int argc, char *argv[]){
  HPCxx_Group *g;
  hpcxx_init(&argc, &argv, g);
  HPCxx_SyncQ<int> a;
  Producer *t = new Producer(a);
  printf("start then wait for a value to be assigned\n");
  t->start();
  int x = a;            // consume a value here
  hpcxx_exit(g);
  return x;
}
3. Semaphore Class
HPCxx_CSem provides a way for a group of threads to synchronize on
the termination of a number of tasks. When constructed, a limit
value is supplied and a counter is set to zero. A thread executing
wait() will suspend until the counter reaches the "limit" value;
the counter is then reset to zero. The incr() method (or, equivalently,
operator++) increments the counter by one.
class HPCxx_CSem{
public:
  HPCxx_CSem(int limit);
  void incr();            // increment the counter
  void operator++();      // another way to increment the counter
  void wait();            // wait until the counter reaches the limit value,
                          // then reset the counter to 0 and return
  int getCount();         // returns the limit value
  int setCount(int val);  // sets the limit value
  int getStatus();        // returns the counter value
};
By passing a reference to an HPCxx_CSem to a group of threads,
each of which does a "++" prior to exit, you can build a
multi-threaded "join" operation.
class Worker: public HPCxx_Thread{
  HPCxx_CSem &c;
public:
  Worker(HPCxx_CSem &c_): c(c_){}
  void run(){
    // work
    c.incr();
  }
};
int main(int argc, char *argv[]){
  HPCxx_Group *g;
  hpcxx_init(&argc, &argv, g);
  HPCxx_CSem cs(NUMWORKERS);
  for(int i = 0; i < NUMWORKERS; i++){
    Worker *w = new Worker(cs);
    w->start();
  }
  cs.wait();   // wait here for all workers to finish
  hpcxx_exit(g);
  return 0;
}
4. The Mutex Class
class HPCxx_Mutex{
public:
  int lock();   // Returns an implementation-dependent error code
  int unlock(); // Returns an implementation-dependent error code
};
To provide a synchronized method, i.e., one that only one thread at a
time is allowed to execute, one can introduce a private mutex
variable and protect the critical section with locks as follows.
class Myclass: public HPCxx_Runnable{
  HPCxx_Mutex l;
public:
  void synchronized(){
    l.lock();
    // ... critical section ...
    l.unlock();
  }
};
5. Condition Variables
Whereas the HPCxx_Mutex
class allows threads to synchronize by controlling their access to data via
a locking mechanism, the HPCxx_Cond class allows threads to
synchronize on the value of the data. Cooperating threads wait until data
reaches some particular state or until some particular event occurs. A rule
of thumb: use mutex locks to synchronize access to shared data and use
condition variables to synchronize threads on events.
Before waiting on an event, a mutex must be obtained and passed as an
argument to the wait method. For example, the following code segment causes
a thread to wait for a counter to reach some threshold value:
countMutex.lock();
while (count < threshold) {
  countCondition.wait(countMutex);
}
// do something now that the condition has been met
countMutex.unlock();
The condition variable must be signaled to release the waiting thread.
For example:
countMutex.lock();
count = threshold;
countCondition.signal();
countMutex.unlock();
Notice that both the waiting thread and the signalling thread lock the same mutex. This may seem strange, but it is the proper usage: the wait() method unlocks the mutex when it is called.
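Putting the two fragments together, a minimal sketch might look as follows; the shared counter, the threshold value, and the function names are illustrative assumptions.
HPCxx_Mutex countMutex;
HPCxx_Cond  countCondition;
int count = 0;
const int threshold = 10;

void waiter(){                  // blocks until count reaches threshold
  countMutex.lock();
  while (count < threshold)
    countCondition.wait(countMutex);  // releases the mutex while waiting
  // the condition has been met
  countMutex.unlock();
}

void updater(){                 // advances the counter and wakes the waiter
  countMutex.lock();
  count = threshold;
  countCondition.signal();
  countMutex.unlock();
}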
Recall that an HPC++ computation consists of a set of nodes, each of which contains one or more contexts. Each context runs one or more threads.
The HPCxx_Group Class
When a computation starts, the HPC++Lib initialization creates an
object called a group. The HPCxx_Group class has the following public
interface.
class HPCxx_Group{
public:
  // Create a new group for the current context.
  HPCxx_Group(hpcxx_id_t &groupID = HPCXX_GEN_LOCAL_GROUPID,
              const char *name = NULL);
  // Create a group whose membership is this context
  // and those in the list.
  HPCxx_Group(const HPCxx_ContextID *&id,
              int count,
              hpcxx_id_t &groupID = HPCXX_GEN_LOCAL_GROUPID,
              const char *name = NULL);
  ~HPCxx_Group();
  hpcxx_id_t &getGroupID();
  static HPCxx_Group *getGroup(hpcxx_id_t groupID);
  // Get the number of contexts that are participating
  // in this group.
  int getNumContexts();
  // Return an ordered array of context IDs in
  // this group. This array is identical for every member
  // of the group.
  HPCxx_ContextID *getContextIDs();
  // Return the context ID for zero-based context <n> where
  // <n> is less than the current number of contexts.
  HPCxx_ContextID getContextID(int context);
  // Set the number of threads for this group in *this*
  // context.
  void setNumThreads(int count);
  int getNumThreads();
  void setName(const char *name);
  const char *getName();
};
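As a minimal sketch of how a program might query its group (the thread count of 4 and the printed message are illustrative):
int main(int argc, char *argv[]){
  HPCxx_Group *g;
  hpcxx_init(&argc, &argv, g);
  int nc = g->getNumContexts();                // contexts in this group
  HPCxx_ContextID *ids = g->getContextIDs();   // same ordering in every member
  g->setNumThreads(4);                         // threads for this group in this context
  printf("%d contexts, %d local threads\n", nc, g->getNumThreads());
  hpcxx_exit(g);
  return 0;
}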
Groups are used to identify sets of threads and sets of contexts that
participate in collective operations like barriers. In the next section, we
only describe how a set of threads on a single context can use collective
operations. Multi-context operations will be described in greater detail in
the multi-context programming section.
Collective Operations for Threads
Barrier Synchronization
The basic operation is barrier synchronization. This is
accomplished in the following steps:
We first allocate an object of type HPCxx_Group and set the number of threads to the maximum number that will participate in the operation. For example, to set the thread count on the main group to be 13 we can write the following.
int main(int argc, char *argv[]){
  HPCxx_Group *g;
  hpcxx_init(&argc, &argv, g);
  g->setNumThreads(13);
  HPCxx_Barrier barrier(*g);
As shown above, an HPCxx_Barrier object must be allocated
for the group; the simplest way is to pass a reference to the Group object
to the barrier constructor.
Each thread that will participate in the barrier operation must then acquire a key from the barrier object with the getKey() function. (Threads do not have a natural integer index, so the key serves to enumerate threads associated with a group. While order is not essential for the operation of a barrier, it is important for reductions and scan operations as described below.)
The barrier can be invoked by means of the overloaded () operator as shown in the example below.
class Worker: public HPCxx_Thread{
  int my_key;
  HPCxx_Barrier &barrier;
public:
  Worker(HPCxx_Barrier &b): barrier(b){
    my_key = barrier.getKey();
  }
  void run(){
    while( notdone ){   // notdone: application-specific termination test
      // work
      barrier(my_key);
    }
  }
};
int main(int argc, char *argv[]){
  HPCxx_Group *g;
  hpcxx_init(&argc, &argv, g);
  g->setNumThreads(14);        // 13 worker threads plus main
  HPCxx_Barrier barrier(*g);
  int key0 = barrier.getKey();
  for(int i = 0; i < 13; i++){
    Worker *w = new Worker(barrier);
    w->start();
  }
  barrier(key0);
  hpcxx_exit(g);
  return 0;
}
A thread can participate in more than one barrier group, and a
barrier can be deallocated. The thread count of a Group
may be changed, a new barrier may be allocated, and threads can
request new keys, as sketched below.
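A hedged sketch of such a resize, reusing the group g and worker threads of the example above (the new thread count of 21 is illustrative):
g->setNumThreads(21);                        // e.g. 20 workers plus main
HPCxx_Barrier *barrier2 = new HPCxx_Barrier(*g);
int new_key = barrier2->getKey();            // each participant re-acquires a key
(*barrier2)(new_key);                        // synchronize on the new barrier
delete barrier2;                             // a barrier can be deallocated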
Reduction
Let intAdd be the class:
class intAdd{
public:
  int &operator()(int &x, int &y) { x += y; return x; }
};
To create an object that can be used to form the sum-reduction of one integer from each thread, the declaration takes the form
HPCxx_Reduct<int, intAdd> r(group, intAdd());
and it can be used in the threads as follows:
class Worker: public HPCxx_Thread{
  int my_key;
  HPCxx_Reduct<int, intAdd> &add;
public:
  Worker(HPCxx_Reduct<int, intAdd> &a): add(a){
    my_key = add.acquireKey();
  }
  void run(){
    int x = 3.14 * my_key;
    // now compute the sum of all x values
    int t = add(my_key, x);
  }
};
The public definition of the reduction class for a single,
multi-threaded context is given by
template <class T, class Oper>
class HPCxx_Reduct{
public:
  HPCxx_Reduct(HPCxx_Group &, Oper op);
  T operator()(int key, T &x);
  T *operator()(int key, T *buffer, int size);
  ...
};
The operation can be invoked with the overloaded () operation as in
the example above, or with a version that takes a buffer of arguments of a
given size. In this case the binary operator is applied pointwise and the
operation is destructive (i.e. the buffer may be modified). It requires a
user supplied buffer to hold the arguments and returns a pointer to the
buffer that holds the result. To avoid making copies, all of the buffers are
modified in the computation. This operation is designed to be as efficient
as possible, so it is implemented as a tree reduction. Hence the binary
operator is required to be associative, i.e.
op(x, op(y, z)) == op(op(x, y), z)
The destructive form of the operator is much faster if the size of the data type T is large.
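A minimal sketch of the buffer form, reusing the group, the intAdd operator, and the per-thread key from the example above (the buffer size and contents are illustrative):
const int N = 1024;
int *buf = new int[N];
for(int i = 0; i < N; i++)
  buf[i] = my_key;                 // this thread's contribution
// The operation is destructive: buf may be modified during the reduction,
// and the result is delivered in the buffer returned by the call.
int *sum = add(my_key, buf, N);    // pointwise sum across all threads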
Gather
Often threads need to coordinate so that one thread may share a
value with the other threads, or so that a number of threads can
concatenate subvectors into one vector. Both of these
operations are instances of a function we call gather, which is a
parallel-prefix concatenation operation and a member of the
HPCxx_Collective class.
template <class T>
class HPCxx_Collective{
public:
  HPCxx_Collective(HPCxx_Group *g);
  int acquireKey();
  T *gather(int threadKey, T *thread_data, int &size);
  ...
};
The gather operation allows each thread to contribute a vector of data of
arbitrary size and returns the vector of concatenated subvectors (whose size
is given by the value of the reference parameter size when the call completes).
(A broadcast is simply a special case of the gather operation, where all but
one thread contributes a vector of length 0.)
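A minimal sketch of gather within a single context, assuming the group pointer g obtained from hpcxx_init as in the earlier examples (the per-thread data is illustrative):
HPCxx_Collective<int> coll(g);       // g is the HPCxx_Group* for this group
int key = coll.acquireKey();
int mine[2] = { key, key + 100 };    // this thread's sub-vector
int size = 2;                        // in: local length; out: total length
int *all = coll.gather(key, mine, size);
// 'all' now holds the concatenated contributions of all threads;
// for a broadcast, every thread but one would pass size == 0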
Collective Operations for Contexts
In SPMD execution mode, the runtime system provides the same collective operations as were provided earlier for multi-threaded computation. The only semantic difference is that the collective operations apply across every context and every thread of the group.
The only syntactic difference is that we allow a special form of the overloaded () operator that does not require a thread "key". For example, to do a barrier between contexts all we need is the HPCxx_Group object.
int main(int argc, char *argv[]){
  HPCxx_Group *context_set;
  hpcxx_init(&argc, &argv, context_set);
  HPCxx_Barrier barrier(*context_set);
  HPCxx_Reduct<float, floatAdd>
      float_reduct(*context_set, floatAdd());
  ...
  barrier();
  float z = 3.14;
  z = float_reduct(z);
  ...
}
Note that the thread key can be used if there are multiple threads in a
context that want to synchronize with the other contexts.
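For instance, a hedged sketch in which several threads in each context take part (the thread count of 4 is illustrative):
context_set->setNumThreads(4);       // e.g. four threads in this context participate
HPCxx_Barrier barrier(*context_set);
// executed by each participating thread in this context:
int key = barrier.getKey();
barrier(key);   // returns when every thread in every context of the group arrives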