SOX (Service Orchestration eXtension for .NET)

Concurrent Building Blocks for Manycore Service Orchestration

Introduction

    SOX is a concurrent service orchestration library for .NET.  The principle goal of SOX is to improve the programming productivity of r the service-as-component software, particularly on the multicore/manycore hardware platform. The service orchestration constructs in SOX is based on WS-BPEL, which is a standard Web Service workflow language. However SOX itself is not a BPEL engine or interpreter. Instead, SOX is a C# library, which contains numbers of concurrent programming patterns for the service orchestration. We prefer the fine-grained library over a monolithic workflow engine is because that the library can provide a more flexible and intuitive programming environment by leveraging the C# language and .NET libraries. While the verbose/awkward WS-BPEL language is daunting and unsafe, programming in C# enables us to take huge advantage from the .NET platform and language itself (such as type safety and numerous .NET libraries).

SOX is built upon Microsoft CCR (Concurrency and Coordination Runtime) of MS Robotic Studio, and it heavily relies on the concurrency abstractions (e.g., Port and Arbiters) of CCR.

AsynActivity

    The basic building block in SOX is class AsynActivity, which is an abstract class for all potential service-orchestration activities, either atomic or compound ones. An AsynActivity can be executed by invokes its Execute method, which will return an AsynActivityInstance object.  Basically AsynActivityInstance is just a handle to the execution of the activity and it contains at least three CCR ports, via which users can tell if the execution is completed or faulted or can abort the execution at any moment.

Below code shows the general programming pattern of using AsynActivity.

// execute the activity asynchronously

AsynInstance instance = act.Execute(parentScope);

 

// register the callback for what to do when the exeuction is done

Arbiter.Activate(dq, Arbiter.Choice(

Arbiter.Receive(false, instance.CompletionPort.Ports.P0,

delegate(bool b)

{

//the exeuction of the instance is done;

}),

Arbiter.Receive(false, instance.CompletionPort.Ports.P1,

delegate(Exception exp)

{

//exception is thrown during the exeuction;

}),

Arbiter.Receive(false, myAbortPort,

delegate(EmptyValue abort)

{

// the execution is asked to be aborted

// then aborts the execution of the instance;

instance.AbortPort.Post(EmptyValue.SharedInstance);

})

));

 

However users usually don’t need to program in this way, all those detail will be hidden by SOX library. To simplify the programming, SOX provides a static class Orchestrate which contains a collection of static methods as the factory methods for the each AsynActivity class.

 

·         Orchestrate.Invoke

·         Orchestrate.Sequence

·         Orchestrate.Parallel

·         Orchestrate.ParallelForEach

·         Orchestrate.Flow

·         Orchestrate.Compensate

·         Orchestrate.Scope

·         Orchestrate.FromHandler

 

Orchestrate. Invoke

TypedInvoke<RequestT, ResponseT>  Orchestrate.Invoke<RequestT, ResponseT>(ServiceOperation<RequestT, ResponseT> op, RequestT request);

SOX assumes the to-be-invoked service already has its local proxy, which provides the typed interface, and all the XML message types needed by the service have been data-bound as the C# classes. The service can be any regular web service or DSS service, for which WCF can generate the proxy, for which the MSRS can generate the proxy.  ServiceOperation<RequestT, ResponseT>  is a generic delegator, whose signature is :

delegate PortSet<ResT, W3C.Soap.Fault> ServiceOperation<ReqT, ResT>(ReqT req);

Most DSS proxy operations conform to this signature.

Invoke(op, request) function takes this delegator as the parameter as well as the typed request object and will return the instance of the invocation activity. Once the invocation is done, the result (typed) will be saved in the OutputVar member of this instance.

 

Orchestrate.Sequence

SOX represents the  BPEL <Sequence> activity, in which each enclosed sub activities will be executed sequentially, simplly as a C# iterators function with the speicified signature:

 

delegate IEnumerator<AsyncActivity> ActivityIteratorHandler();

delegate IEnumerator<AsyncActivity> ActivityIteratorHandler<T0>(T0 t0);

delegate IEnumerator<AsyncActivity> ActivityIteratorHandler<T0, T1>(T0 t0, T1 t1);

delegate IEnumerator<AsyncActivity> ActivityIteratorHandler<T0, T1, T2>(T0 t0, T1 t1, T2 t2);

 

The below example shows how to sequentially invoke the Add, Minus and Multiply operations of a caculator service (e.g., m_service) to calculate (x+y) * (x-y)

 

IEnumerator<AsyncActivity> BasicSeqWorkflow(int x, int y, st1.Result result)

{

st1.AddRequest addRequest = new st1.AddRequest();

addRequest.X = x; addRequest.Y = y;

 

TypedInvoke<st1.AddRequest, st1.Result>

addInvoke = Orchestrate.Invoke<st1.AddRequest, st1.Result>(m_service.Add, addRequest);

yield return addInvoke;

 

st1.MinusRequest minusRequest = new st1.MinusRequest();

minusRequest.X = x; minusRequest.Y = y;

 

TypedInvoke<st1.MinusRequest, st1.Result>

minusInvoke = Orchestrate.Invoke<st1.MinusRequest, st1.Result>(m_service.Minus, minusRequest);

yield return minusInvoke;

 

st1.MultiplyRequest mulRequest = new st1.MultiplyRequest();

mulRequest.X = minusInvoke.OutputVar.Value;

mulRequest.Y = addInvoke.OutputVar.Value;

TypedInvoke<st1.MultiplyRequest, st1.Result>

mulInvoke = Orchestrate.Invoke<st1.MultiplyRequest, st1.Result>(m_service.Multiply, mulRequest);

yield return mulInvoke;

 

if (x * x - y * y != mulInvoke.OutputVar.Value)

{

Console.WriteLine("Error!");

yield break;

}

else

Console.WriteLine("[Seq OK]");

 

result.Value = mulInvoke.OutputVar.Value;

yield break;

}

 

Notice that  you doesn’t need to execute the AsynActivity  explicitly,  you just need to yield return the AsynActivity object, and then SOX library will execute the AsynActivity appropraitely. When  the execution of the  AsynActivity completes (e.g., getting response of the invokation), the C# streaming iteartor mechanism will automatically continue the execution of the function , but right after the yield return.

Also notice that the BPEL <Assignment> is not necessary any more. Since it is a regular C# function we can represent the sementic of <Assignment> with the regular C# expression. Furthermore all the assignment expressions are type-safe since all the message types are bound as C# objects.  For example in order to assign the result of Add operation to the input of the Multiply operation, BPEL script would need the XPath expression to represent that.

<assign>

         <copy>

            <from>

               $addResult.Result/ns1:Value

            </from>

            <to>

               $multiplyMsg.RequestMessagePart/ns2:X

            </to>

         </copy>

</assign>

The above BPEL expression is neither intuitive nor type safe. Instead, if the message is data bound we can represent this assignment simply by the type safe language syntax:

 

mulRequest.X = addResult.Value;

 

Last, not the least, is that besides the assignments users are free to do any other operations in the orcheistration as long as they are language-supported or library-provided.  For example, BEPL defines numbers of control blocks such as <If> <For> <While> and <Switch>.  But in SOX we don’t need to provide the extra activities for those ordinary control sementics since C# already provides language syntax for those semantics and we just need to program them as the normal sequantial programming. The below example shows a while loop of invoking the Add service.

 

/// <summary>

/// below sequential workflow is equivelent with BPEL <while> </while>

/// </summary>

/// <returns></returns>

IEnumerator<AsyncActivity> WhileWorkflowExample()

{

st1.AddRequest addRequest = new st1.AddRequest();

addRequest.X = 2; addRequest.Y = 9;

int v = 0;

while (v < 100)

{

TypedInvoke<st1.AddRequest, st1.Result>

addInvoke = Orchestrate.Invoke<st1.AddRequest, st1.Result>(m_service.Add, addRequest);

yield return addInvoke;

 

v = addInvoke.OutputVar.Value;

addRequest.Y = v;

}

Console.WriteLine("[While OK]");

yield break;

}

 

With the iterator function, now we can create the Sequence activity simplly by calling:

 

 st1.Result result = new st1.Result();

Orchestrate.Sequence(number_1, number_2, result, this.BasicSeqWorkflow);

 

The Sequence can also be called in another ActivityIteratorHandler so that one ActivityIteartorHandler can be nested in another one. The below example show the nested example.

 

IEnumerator<AsyncActivity> NestedWorkflow(int x, int y, st1.Result result)

{

//Calculate the value of x^2 – y^2

st1.Result result1 = new st1.Result(); result1.Value = 1;

yield return Orchestrate.Sequence(x, y, result1, BasicSeqWorkflow);

 

//Calculate the value of y^2 – x^2

st1.Result result2 = new st1.Result(); result2.Value = 9;

yield return Orchestrate.Sequence(y, x, result2, BasicSeqWorkflow);

 

//x^2 – y^2 + y^2 – x^2 = 0

if (result1.Value + result2.Value != 0)

{

Console.WriteLine("Error! {0} - {1} ", result1.Value, result2.Value);

yield break;

}

Console.WriteLine("[Nested OK]");

yield break;

}

Orchestrate.Parallel

You may notice that in the above example the Add operation is independent with the Minus operation, so they can be executed in parallel. To support the parallel execution of the activities, SOX provides the Parallel activity which is equivelent with the BPEL <Flow> activity without any <Link>.

 

IEnumerator<AsyncActivity> BasicParWorkflow(int x, int y, st1.Result result)

{

st1.AddRequest addRequest = new st1.AddRequest();

addRequest.X = x; addRequest.Y = y;

 

TypedInvoke<st1.AddRequest, st1.Result>

addInvoke = Orchestrate.Invoke<st1.AddRequest, st1.Result>(m_service.Add, addRequest);

 

st1.MinusRequest minusRequest = new st1.MinusRequest();

minusRequest.X = x; minusRequest.Y = y;

 

TypedInvoke<st1.MinusRequest, st1.Result>

minusInvoke = Orchestrate.Invoke<st1.MinusRequest, st1.Result>(m_service.Minus, minusRequest);

 

// (x+y) (x-y)

yield return Orchestrate.Parallel(addInvoke, minusInvoke);

 

 

st1.MultiplyRequest mulRequest = new st1.MultiplyRequest();

mulRequest.X = minusInvoke.OutputVar.Value;

mulRequest.Y = addInvoke.OutputVar.Value;

TypedInvoke<st1.MultiplyRequest, st1.Result>

mulInvoke = Orchestrate.Invoke<st1.MultiplyRequest, st1.Result>(m_service.Multiply, mulRequest);

yield return mulInvoke;

 

if (x * x - y * y != mulInvoke.OutputVar.Value)

{

Console.WriteLine("Error!");

yield break;

}

else

Console.WriteLine("[Parallel OK]");

 

result.Value = mulInvoke.OutputVar.Value;

yield break;

}

 

As you see, simplly yield return Orchestrate.Parallel(addInvoke, minusInvoke) will have the two Invoke activities execute in parrallel and only when both are done (namly responses are received), the next line of code in the function will be executed. As for the exception handeling for Orchestrate.Parallel, as long as one activity throws exception the other one will be automatically aborted and then the exception will be caught by the parent scope (For more detail, see the Orchestrate.Scope).

 

 

Orchestrate.ParallelForEach

Orchestrate.ParallelForEach allows an actvity be executed in a for-loop manner, but each iteration is executed in parallel.  The whole activity completes only after each iteration completes and as long as one iteration throws the exception all others are aborted and the exception will be thrown to the parent scope. The below example demostrates a ParallelForEach example, in which the square value for the each value in the 0…up will be calculated and saved into the array pForEachResult.

 

/// <summary>

/// a foreach iterator, doing the sqaure (i * i) operation;

/// </summary>

/// <param name="idx"></param>

/// <returns></returns>

IEnumerator<AsyncActivity> Square(int idx)

{

st1.MultiplyRequest mulRequest = new st1.MultiplyRequest();

mulRequest.X = idx; mulRequest.Y = idx;

TypedInvoke<st1.MultiplyRequest, st1.Result>

mulInvoke = Orchestrate.Invoke<st1.MultiplyRequest, st1.Result>(m_service.Multiply, mulRequest);

yield return mulInvoke;

 

pForEachResult[idx] = mulInvoke.OutputVar.Value;

//Console.WriteLine(mulInvoke.OutputVar.Value);

yield break;

}

 

/// <summary>

/// an exmpale for the parallel forEach,

/// </summary>

/// <returns></returns>

IEnumerator<AsyncActivity> ForEachExample()

{

int upper = 10;

pForEachResult = new int[upper];

 

yield return Orchestrate.ParallelForEach(0, upper, Square);

 

for (int i = 0; i < upper; ++i)

{

if (pForEachResult[i] != i * i)

{

Console.WriteLine("Error!");

yield break;

}

}

 

Console.WriteLine("[Parallel ForEach OK]");

yield break;

}

 

Note that Orchestrate.ParallelForEach(0, upper, Square) will take the 2 integers to indicate the low bound and the upper bound of the for loop as well as an iterate function delegator, which will be invoke for each iteration with an integer parameter indicating the loop index.

 

 

Orchestrate.Flow

Orchestrate.Flow allows to build the control dependencies among the activities in the form of Directed Acyclic Gragh. It bascially follows the semantic of BPEL <Flow>. Essentially each edge in the graph is represented as a CCR Port and each activity in the Flow will be executed concurrently as long as it gets the signals from each of its incoming edge (i.e., the Port ). The whole Flow activity completes only when all the enclosed activites are done, and whenever one activiy throws exception all the others will be aborted and the exception will be thrown to the parent scope. The below example demostrates how we orchestrate the operations of the calculator service to caculate the value based on the graph.

 

 

IEnumerator<AsyncActivity> FlowExample(int x, int y)

{

// define variables

st1.AddRequest addRequest = new st1.AddRequest();

addRequest.X = x; addRequest.Y = y;

st1.Result addResult = new st1.Result();

st1.Result squareResult = new st1.Result();

st1.Result multiplyResult = new st1.Result();

st1.Result multiplyResult2 = new st1.Result();

 

AsyncActivity addAct = Orchestrate.Sequence(addRequest, addResult, AddExample);

AsyncActivity squareAct = Orchestrate.Sequence(addResult, squareResult, SquareExample);

AsyncActivity mulAct = Orchestrate.Sequence(addResult, squareResult, multiplyResult, MultiplyExample);

AsyncActivity mul2Act = Orchestrate.Sequence(squareResult, multiplyResult, multiplyResult2, MultiplyExample);

 

TypedFlow flow = Orchestrate.Flow();

flow.Connect(addAct, squareAct);

flow.Connect(addAct, mulAct);

flow.Connect(squareAct, mulAct);

flow.Connect(mulAct, mul2Act);

flow.Connect(squareAct, mul2Act);

 

yield return flow;

 

if (multiplyResult2.Value != (x + y) * (x + y) * (x + y) * (x + y) * (x + y))

Console.WriteLine("Error!");

else

Console.WriteLine("[FLOW OK]");

yield break;

}

IEnumerator<AsyncActivity> AddEx