Concurrent Building Blocks for Manycore Service Orchestration
SOX is a concurrent service orchestration library for .NET. The principle goal of SOX is to improve the programming productivity of r the service-as-component software, particularly on the multicore/manycore hardware platform. The service orchestration constructs in SOX is based on WS-BPEL, which is a standard Web Service workflow language. However SOX itself is not a BPEL engine or interpreter. Instead, SOX is a C# library, which contains numbers of concurrent programming patterns for the service orchestration. We prefer the fine-grained library over a monolithic workflow engine is because that the library can provide a more flexible and intuitive programming environment by leveraging the C# language and .NET libraries. While the verbose/awkward WS-BPEL language is daunting and unsafe, programming in C# enables us to take huge advantage from the .NET platform and language itself (such as type safety and numerous .NET libraries).
SOX is built upon Microsoft CCR (Concurrency and Coordination Runtime) of MS Robotic Studio, and it heavily relies on the concurrency abstractions (e.g., Port and Arbiters) of CCR.
The basic building block in SOX is class AsynActivity, which is an abstract class for all potential service-orchestration activities, either atomic or compound ones. An AsynActivity can be executed by invokes its Execute method, which will return an AsynActivityInstance object. Basically AsynActivityInstance is just a handle to the execution of the activity and it contains at least three CCR ports, via which users can tell if the execution is completed or faulted or can abort the execution at any moment.

Below code shows the general programming pattern of using AsynActivity.
// execute the activity asynchronously
AsynInstance instance = act.Execute(parentScope);
// register the callback for what to do when the exeuction is done
Arbiter.Activate(dq, Arbiter.Choice(
Arbiter.Receive(false,
instance.CompletionPort.Ports.P0,
delegate(bool
b)
{
//the exeuction of the instance is done;
}),
Arbiter.Receive(false,
instance.CompletionPort.Ports.P1,
delegate(Exception
exp)
{
//exception is thrown during the exeuction;
}),
Arbiter.Receive(false,
myAbortPort,
delegate(EmptyValue
abort)
{
// the execution is asked to be aborted
// then aborts the execution of the instance;
instance.AbortPort.Post(EmptyValue.SharedInstance);
})
));
However users usually don’t need to program in this way, all those detail will be hidden by SOX library. To simplify the programming, SOX provides a static class Orchestrate which contains a collection of static methods as the factory methods for the each AsynActivity class.
·
Orchestrate.FromHandler
TypedInvoke<RequestT, ResponseT> Orchestrate.Invoke<RequestT, ResponseT>(ServiceOperation<RequestT, ResponseT> op, RequestT request);
SOX assumes the to-be-invoked service already has its local proxy, which
provides the typed interface, and all the XML message types needed by the
service have been data-bound as the C# classes. The service can be any regular
web service or DSS
service, for which WCF can generate the proxy, for which the MSRS can
generate the proxy. ServiceOperation<RequestT, ResponseT> is a generic delegator, whose signature is :
delegate PortSet<ResT,
W3C.Soap.Fault> ServiceOperation<ReqT,
ResT>(ReqT req);
Most DSS proxy operations
conform to this signature.
Invoke(op, request) function
takes this delegator as the parameter as well as the typed request object and will
return the instance of the invocation activity. Once the invocation is done,
the result (typed) will be saved in the OutputVar member of this instance.
SOX represents the BPEL <Sequence> activity, in which each
enclosed sub activities will be executed sequentially, simplly as a C# iterators
function with the speicified signature:
delegate IEnumerator<AsyncActivity> ActivityIteratorHandler();
delegate IEnumerator<AsyncActivity>
ActivityIteratorHandler<T0>(T0 t0);
delegate IEnumerator<AsyncActivity> ActivityIteratorHandler<T0,
T1>(T0 t0, T1 t1);
delegate IEnumerator<AsyncActivity> ActivityIteratorHandler<T0,
T1, T2>(T0 t0, T1 t1, T2 t2);
The below example shows how
to sequentially invoke the Add, Minus and Multiply operations of a caculator
service (e.g., m_service) to calculate (x+y) * (x-y)
IEnumerator<AsyncActivity>
BasicSeqWorkflow(int x, int y, st1.Result result)
{
st1.AddRequest addRequest
= new st1.AddRequest();
addRequest.X = x; addRequest.Y = y;
TypedInvoke<st1.AddRequest,
st1.Result>
addInvoke = Orchestrate.Invoke<st1.AddRequest, st1.Result>(m_service.Add,
addRequest);
yield return
addInvoke;
st1.MinusRequest
minusRequest = new st1.MinusRequest();
minusRequest.X = x; minusRequest.Y = y;
TypedInvoke<st1.MinusRequest,
st1.Result>
minusInvoke = Orchestrate.Invoke<st1.MinusRequest, st1.Result>(m_service.Minus,
minusRequest);
yield return
minusInvoke;
st1.MultiplyRequest
mulRequest = new st1.MultiplyRequest();
mulRequest.X = minusInvoke.OutputVar.Value;
mulRequest.Y = addInvoke.OutputVar.Value;
TypedInvoke<st1.MultiplyRequest,
st1.Result>
mulInvoke = Orchestrate.Invoke<st1.MultiplyRequest, st1.Result>(m_service.Multiply,
mulRequest);
yield return
mulInvoke;
if (x * x - y * y != mulInvoke.OutputVar.Value)
{
Console.WriteLine("Error!");
yield break;
}
else
Console.WriteLine("[Seq
OK]");
result.Value = mulInvoke.OutputVar.Value;
yield break;
}
Notice that you doesn’t need to execute the
AsynActivity explicitly, you just need to yield return the
AsynActivity object, and then SOX library will execute the AsynActivity
appropraitely. When the execution of
the AsynActivity completes (e.g.,
getting response of the invokation), the C# streaming iteartor mechanism will automatically
continue the execution of the function , but right after the yield return.
Also notice that the BPEL <Assignment> is not necessary any
more. Since it is a regular C# function we can represent the sementic of <Assignment> with the regular C#
expression. Furthermore all the assignment expressions are type-safe since all the
message types are bound as C# objects. For example in order to assign the result of
Add operation to the input of the Multiply operation, BPEL script would need
the XPath expression to represent that.
<assign>
<copy>
<from>
$addResult.Result/ns1:Value
</from>
<to>
$multiplyMsg.RequestMessagePart/ns2:X
</to>
</copy>
</assign>
The above BPEL expression is
neither intuitive nor type safe. Instead, if the message is data bound we can
represent this assignment simply by the type safe language syntax:
mulRequest.X =
addResult.Value;
Last, not the least, is that
besides the assignments users are free to do any other operations in the
orcheistration as long as they are language-supported or library-provided. For example, BEPL defines numbers of control
blocks such as <If> <For>
<While> and <Switch>. But in SOX we don’t need to provide the extra
activities for those ordinary control sementics since C# already provides
language syntax for those semantics and we just need to program them as the
normal sequantial programming. The below example shows a while loop of invoking
the Add service.
/// <summary>
/// below sequential
workflow is equivelent with BPEL <while> </while>
/// </summary>
/// <returns></returns>
IEnumerator<AsyncActivity>
WhileWorkflowExample()
{
st1.AddRequest addRequest
= new st1.AddRequest();
addRequest.X = 2; addRequest.Y = 9;
int v = 0;
while (v < 100)
{
TypedInvoke<st1.AddRequest,
st1.Result>
addInvoke = Orchestrate.Invoke<st1.AddRequest, st1.Result>(m_service.Add,
addRequest);
yield return addInvoke;
v = addInvoke.OutputVar.Value;
addRequest.Y = v;
}
Console.WriteLine("[While
OK]");
yield break;
}
With the iterator function,
now we can create the Sequence activity simplly by calling:
st1.Result result = new st1.Result();
Orchestrate.Sequence(number_1, number_2, result, this.BasicSeqWorkflow);
The Sequence can also be
called in another ActivityIteratorHandler so that one ActivityIteartorHandler
can be nested in another one. The below example show the nested example.
IEnumerator<AsyncActivity>
NestedWorkflow(int x, int
y, st1.Result result)
{
//Calculate the value of x^2 – y^2
st1.Result result1 = new st1.Result();
result1.Value = 1;
yield return Orchestrate.Sequence(x, y, result1,
BasicSeqWorkflow);
//Calculate the value of y^2 – x^2
st1.Result result2 = new st1.Result();
result2.Value = 9;
yield return Orchestrate.Sequence(y, x, result2,
BasicSeqWorkflow);
//x^2 – y^2 + y^2 – x^2 = 0
if (result1.Value + result2.Value != 0)
{
Console.WriteLine("Error! {0}
- {1} ", result1.Value, result2.Value);
yield break;
}
Console.WriteLine("[Nested
OK]");
yield break;
}
You may notice that in the
above example the Add operation is independent with the Minus operation, so
they can be executed in parallel. To support the parallel execution of the
activities, SOX provides the Parallel activity which is equivelent with the
BPEL <Flow> activity without any <Link>.
IEnumerator<AsyncActivity>
BasicParWorkflow(int x, int y, st1.Result result)
{
st1.AddRequest addRequest
= new st1.AddRequest();
addRequest.X = x; addRequest.Y = y;
TypedInvoke<st1.AddRequest,
st1.Result>
addInvoke = Orchestrate.Invoke<st1.AddRequest, st1.Result>(m_service.Add,
addRequest);
st1.MinusRequest
minusRequest = new st1.MinusRequest();
minusRequest.X = x; minusRequest.Y = y;
TypedInvoke<st1.MinusRequest,
st1.Result>
minusInvoke = Orchestrate.Invoke<st1.MinusRequest, st1.Result>(m_service.Minus,
minusRequest);
// (x+y) (x-y)
yield return Orchestrate.Parallel(addInvoke, minusInvoke);
st1.MultiplyRequest
mulRequest = new st1.MultiplyRequest();
mulRequest.X = minusInvoke.OutputVar.Value;
mulRequest.Y = addInvoke.OutputVar.Value;
TypedInvoke<st1.MultiplyRequest,
st1.Result>
mulInvoke = Orchestrate.Invoke<st1.MultiplyRequest, st1.Result>(m_service.Multiply,
mulRequest);
yield return mulInvoke;
if (x * x - y * y != mulInvoke.OutputVar.Value)
{
Console.WriteLine("Error!");
yield break;
}
else
Console.WriteLine("[Parallel
OK]");
result.Value = mulInvoke.OutputVar.Value;
yield break;
}
As
you see, simplly yield return
Orchestrate.Parallel(addInvoke, minusInvoke)
will have the two Invoke activities execute in parrallel and only when both are
done (namly responses are received), the next line of code in the function will
be executed. As for the exception handeling for Orchestrate.Parallel, as long
as one activity throws exception the other one will be automatically aborted
and then the exception will be caught by the parent scope (For more detail, see
the Orchestrate.Scope).
Orchestrate.ParallelForEach
allows an actvity be executed in a for-loop manner, but each iteration is
executed in parallel. The whole activity
completes only after each iteration completes and as long as one iteration
throws the exception all others are aborted and the exception will be thrown to
the parent scope. The below example demostrates a ParallelForEach example, in
which the square value for the each value in the 0…up will be calculated and
saved into the array pForEachResult.
/// <summary>
/// a foreach iterator,
doing the sqaure (i * i) operation;
/// </summary>
/// <param
name="idx"></param>
/// <returns></returns>
IEnumerator<AsyncActivity>
Square(int idx)
{
st1.MultiplyRequest
mulRequest = new st1.MultiplyRequest();
mulRequest.X = idx; mulRequest.Y = idx;
TypedInvoke<st1.MultiplyRequest,
st1.Result>
mulInvoke = Orchestrate.Invoke<st1.MultiplyRequest, st1.Result>(m_service.Multiply,
mulRequest);
yield return mulInvoke;
pForEachResult[idx] = mulInvoke.OutputVar.Value;
//Console.WriteLine(mulInvoke.OutputVar.Value);
yield break;
}
/// <summary>
/// an exmpale for the
parallel forEach,
/// </summary>
/// <returns></returns>
IEnumerator<AsyncActivity>
ForEachExample()
{
int upper = 10;
pForEachResult = new int[upper];
yield return Orchestrate.ParallelForEach(0, upper, Square);
for (int i = 0; i < upper;
++i)
{
if (pForEachResult[i] != i * i)
{
Console.WriteLine("Error!");
yield break;
}
}
Console.WriteLine("[Parallel
ForEach OK]");
yield break;
}
Note that Orchestrate.ParallelForEach(0, upper, Square) will take the 2 integers to indicate the low bound and the upper bound of the for loop as well as an iterate function delegator, which will be invoke for each iteration with an integer parameter indicating the loop index.
Orchestrate.Flow allows to
build the control dependencies among the activities in the form of Directed
Acyclic Gragh. It bascially follows the semantic of BPEL <Flow>.
Essentially each edge in the graph is represented as a

IEnumerator<AsyncActivity>
FlowExample(int x, int
y)
{
// define variables
st1.AddRequest addRequest
= new st1.AddRequest();
addRequest.X = x; addRequest.Y = y;
st1.Result addResult = new st1.Result();
st1.Result squareResult = new st1.Result();
st1.Result multiplyResult
= new st1.Result();
st1.Result multiplyResult2
= new st1.Result();
AsyncActivity addAct = Orchestrate.Sequence(addRequest,
addResult, AddExample);
AsyncActivity squareAct = Orchestrate.Sequence(addResult,
squareResult, SquareExample);
AsyncActivity mulAct = Orchestrate.Sequence(addResult,
squareResult, multiplyResult, MultiplyExample);
AsyncActivity mul2Act = Orchestrate.Sequence(squareResult,
multiplyResult, multiplyResult2, MultiplyExample);
TypedFlow flow = Orchestrate.Flow();
flow.Connect(addAct, squareAct);
flow.Connect(addAct, mulAct);
flow.Connect(squareAct, mulAct);
flow.Connect(mulAct, mul2Act);
flow.Connect(squareAct, mul2Act);
yield return flow;
if (multiplyResult2.Value != (x + y) * (x + y) * (x + y) * (x +
y) * (x + y))
Console.WriteLine("Error!");
else
Console.WriteLine("[FLOW
OK]");
yield break;
IEnumerator<AsyncActivity>
AddExample(st1.AddRequest addRequest, st1.Result addResult)
{
TypedInvoke<st1.AddRequest,
st1.Result> addInvoke =
Orchestrate.Invoke<st1.AddRequest,
st1.Result>(m_service.Add, addRequest);
yield return addInvoke;
addResult.Value = addInvoke.OutputVar.Value;
}
IEnumerator<AsyncActivity>
MultiplyExample(st1.Result addResult, st1.Result squareResult, st1.Result
output)
{
st1.MultiplyRequest
request = new st1.MultiplyRequest();
request.X = addResult.Value; request.Y = squareResult.Value;
TypedInvoke<st1.MultiplyRequest,
st1.Result> invoke =
Orchestrate.Invoke<st1.MultiplyRequest,
st1.Result>(m_service.Multiply, request);
yield return invoke;
output.Value = invoke.OutputVar.Value;
}
IEnumerator<AsyncActivity>
SquareExample(st1.Result addResult, st1.Result squareResult)
{
st1.MultiplyRequest
request = new st1.MultiplyRequest();
request.X = addResult.Value; request.Y = addResult.Value;
TypedInvoke<st1.MultiplyRequest,
st1.Result> invoke =
Orchestrate.Invoke<st1.MultiplyRequest,
st1.Result>(m_service.Multiply, request);
yield return invoke;
squareResult.Value = invoke.OutputVar.Value;
}
<Scope> may be the most
sophiescated construct in WS-BPEL, it prvoides the isolation mechansim for the
BPEL activiteis. It consists of a root activity, a set of variables and a set
of handlers(e.g. event handler and fault handler). All the enclosed activities
share those resource.
<scope >
< v a r i a b l e s >?
. . .
< e v e n tHa n d l e r s >?
< f a u l tHa n d l e r s >?
< compens a t ionHandl e r >?
. . .
a c t i v i t y
</ scope >
The basic concurrent
semantics of <Scope> are:
SOX provides to construct Orchestrate.Scope to represent
the BPEL <Scope>. Orchestrate.Scope( ) takes 4 parameters
TypedScope Scope(AsyncActivity
root, FaultHandlerGroup fhg, EventHandlerGroup ehg, CompensationHandler
ch);
Every Faut Handler is just a
generic iterator function conforming this
signature: IEnumerator<AsyncActivity>
ActivityIteratorHandler<T0>(T0 t0) where
T : Exception
Below example demonstrates
two dummy fault handlers for handeling the NotSupportedException and
OutofMemoryException respectivly. In the fault handler function, besides the AsynActivity we have introduced
you also can yield return Orchestrate.Compensate(), which will
do the compensation action for every successfully completed sub-scope in this scope.
IEnumerator<AsyncActivity>
DummyFaultHandle1(NotSupportedException exp)
{
Console.WriteLine("Get the
NotsupportedException error \n" + exp);
//yield return Orchestrate.Compensate();
yield break;
}
IEnumerator<AsyncActivity>
DummyFaultHandle2(OutOfMemoryException exp)
{
Console.WriteLine("Get the
OutofMemoryException error \n" + exp);
yield return Orchestrate.Compensate();
}
Notice
that since we are building C# library to throw an exception you don’t need the
speical activity as BPEL does (e.g., the <Throw> activity). You can throw
the exception by using throw keyword as you does in the general C# programming.
SOX will somehow catch any exception you throws and try to find a matching
handlering to handle the exception.
A compensation handler is represented just as a normal ActivityIteratorHandler. Below example shows the usage of the compensation in the Scope. Assuming the Calculator service internal maintains a counter whose initial value is 0. The execution the below workflow follows the steps:

IEnumerator<AsyncActivity>
DecreaseByOne()
{
st1.IncreaseRequest
request = new st1.IncreaseRequest();
request.Val = -1;
yield return Orchestrate.Invoke<st1.IncreaseRequest,
DefaultUpdateResponseType>(m_service.Increase,
request);
}
IEnumerator<AsyncActivity>
Root()
{
st1.IncreaseRequest
request = new st1.IncreaseRequest();
request.Val = 1;
yield return
Orchestrate.Scope(
Orchestrate.Invoke<st1.IncreaseRequest,
DefaultUpdateResponseType>(m_service.Increase,
request),
new FaultHandlerGroup(),
new EventHandlerGroup(),
new CompensationHandler(
DecreaseByOne
)
);
Thread.Sleep(1000);
throw new NotSupportedException("test!");
}
IEnumerator<AsyncActivity>
ExceptionExample()
{
yield return
Orchestrate.Scope(
Orchestrate.Sequence(Root),
new FaultHandlerGroup(
new GenericFaultHandler<NotSupportedException>(DummyFaultHandle1),
new GenericFaultHandler<OutOfMemoryException>(DummyFaultHandle2)
),
new EventHandlerGroup()
);
yield break;
}
IEnumerator<AsyncActivity>
DummyFaultHandle1(NotSupportedException exp)
{
Console.WriteLine("Get the
NotsupportedException error \n" + exp);
yield return Orchestrate.Compensate();
}
IEnumerator<AsyncActivity>
DummyFaultHandle2(OutOfMemoryException exp)
{
Console.WriteLine("Get the
OutofMemoryException error \n" + exp);
yield break;
}
TODO
The Tutorial source
code can be downloaded from here
Wei Lu