SOX (Service Orchestration eXtension for .NET)

Concurrent Building Blocks for Manycore Service Orchestration

Introduction

    SOX is a concurrent service orchestration library for .NET.  The principle goal of SOX is to improve the programming productivity of r the service-as-component software, particularly on the multicore/manycore hardware platform. The service orchestration constructs in SOX is based on WS-BPEL, which is a standard Web Service workflow language. However SOX itself is not a BPEL engine or interpreter. Instead, SOX is a C# library, which contains numbers of concurrent programming patterns for the service orchestration. We prefer the fine-grained library over a monolithic workflow engine is because that the library can provide a more flexible and intuitive programming environment by leveraging the C# language and .NET libraries. While the verbose/awkward WS-BPEL language is daunting and unsafe, programming in C# enables us to take huge advantage from the .NET platform and language itself (such as type safety and numerous .NET libraries).

SOX is built upon Microsoft CCR (Concurrency and Coordination Runtime) of MS Robotic Studio, and it heavily relies on the concurrency abstractions (e.g., Port and Arbiters) of CCR.

AsynActivity

    The basic building block in SOX is class AsynActivity, which is an abstract class for all potential service-orchestration activities, either atomic or compound ones. An AsynActivity can be executed by invokes its Execute method, which will return an AsynActivityInstance object.  Basically AsynActivityInstance is just a handle to the execution of the activity and it contains at least three CCR ports, via which users can tell if the execution is completed or faulted or can abort the execution at any moment.

Below code shows the general programming pattern of using AsynActivity.

// execute the activity asynchronously

AsynInstance instance = act.Execute(parentScope);

 

// register the callback for what to do when the exeuction is done

Arbiter.Activate(dq, Arbiter.Choice(

Arbiter.Receive(false, instance.CompletionPort.Ports.P0,

delegate(bool b)

{

//the exeuction of the instance is done;

}),

Arbiter.Receive(false, instance.CompletionPort.Ports.P1,

delegate(Exception exp)

{

//exception is thrown during the exeuction;

}),

Arbiter.Receive(false, myAbortPort,

delegate(EmptyValue abort)

{

// the execution is asked to be aborted

// then aborts the execution of the instance;

instance.AbortPort.Post(EmptyValue.SharedInstance);

})

));

 

However users usually don’t need to program in this way, all those detail will be hidden by SOX library. To simplify the programming, SOX provides a static class Orchestrate which contains a collection of static methods as the factory methods for the each AsynActivity class.

 

·         Orchestrate.Invoke

·         Orchestrate.Sequence

·         Orchestrate.Parallel

·         Orchestrate.ParallelForEach

·         Orchestrate.Flow

·         Orchestrate.Compensate

·         Orchestrate.Scope

·         Orchestrate.FromHandler

 

Orchestrate. Invoke

TypedInvoke<RequestT, ResponseT>  Orchestrate.Invoke<RequestT, ResponseT>(ServiceOperation<RequestT, ResponseT> op, RequestT request);

SOX assumes the to-be-invoked service already has its local proxy, which provides the typed interface, and all the XML message types needed by the service have been data-bound as the C# classes. The service can be any regular web service or DSS service, for which WCF can generate the proxy, for which the MSRS can generate the proxy.  ServiceOperation<RequestT, ResponseT>  is a generic delegator, whose signature is :

delegate PortSet<ResT, W3C.Soap.Fault> ServiceOperation<ReqT, ResT>(ReqT req);

Most DSS proxy operations conform to this signature.

Invoke(op, request) function takes this delegator as the parameter as well as the typed request object and will return the instance of the invocation activity. Once the invocation is done, the result (typed) will be saved in the OutputVar member of this instance.

 

Orchestrate.Sequence

SOX represents the  BPEL <Sequence> activity, in which each enclosed sub activities will be executed sequentially, simplly as a C# iterators function with the speicified signature:

 

delegate IEnumerator<AsyncActivity> ActivityIteratorHandler();

delegate IEnumerator<AsyncActivity> ActivityIteratorHandler<T0>(T0 t0);

delegate IEnumerator<AsyncActivity> ActivityIteratorHandler<T0, T1>(T0 t0, T1 t1);

delegate IEnumerator<AsyncActivity> ActivityIteratorHandler<T0, T1, T2>(T0 t0, T1 t1, T2 t2);

 

The below example shows how to sequentially invoke the Add, Minus and Multiply operations of a caculator service (e.g., m_service) to calculate (x+y) * (x-y)

 

IEnumerator<AsyncActivity> BasicSeqWorkflow(int x, int y, st1.Result result)

{

st1.AddRequest addRequest = new st1.AddRequest();

addRequest.X = x; addRequest.Y = y;

 

TypedInvoke<st1.AddRequest, st1.Result>

addInvoke = Orchestrate.Invoke<st1.AddRequest, st1.Result>(m_service.Add, addRequest);

yield return addInvoke;

 

st1.MinusRequest minusRequest = new st1.MinusRequest();

minusRequest.X = x; minusRequest.Y = y;

 

TypedInvoke<st1.MinusRequest, st1.Result>

minusInvoke = Orchestrate.Invoke<st1.MinusRequest, st1.Result>(m_service.Minus, minusRequest);

yield return minusInvoke;

 

st1.MultiplyRequest mulRequest = new st1.MultiplyRequest();

mulRequest.X = minusInvoke.OutputVar.Value;

mulRequest.Y = addInvoke.OutputVar.Value;

TypedInvoke<st1.MultiplyRequest, st1.Result>

mulInvoke = Orchestrate.Invoke<st1.MultiplyRequest, st1.Result>(m_service.Multiply, mulRequest);

yield return mulInvoke;

 

if (x * x - y * y != mulInvoke.OutputVar.Value)

{

Console.WriteLine("Error!");

yield break;

}

else

Console.WriteLine("[Seq OK]");

 

result.Value = mulInvoke.OutputVar.Value;

yield break;

}

 

Notice that  you doesn’t need to execute the AsynActivity  explicitly,  you just need to yield return the AsynActivity object, and then SOX library will execute the AsynActivity appropraitely. When  the execution of the  AsynActivity completes (e.g., getting response of the invokation), the C# streaming iteartor mechanism will automatically continue the execution of the function , but right after the yield return.

Also notice that the BPEL <Assignment> is not necessary any more. Since it is a regular C# function we can represent the sementic of <Assignment> with the regular C# expression. Furthermore all the assignment expressions are type-safe since all the message types are bound as C# objects.  For example in order to assign the result of Add operation to the input of the Multiply operation, BPEL script would need the XPath expression to represent that.

<assign>

         <copy>

            <from>

               $addResult.Result/ns1:Value

            </from>

            <to>

               $multiplyMsg.RequestMessagePart/ns2:X

            </to>

         </copy>

</assign>

The above BPEL expression is neither intuitive nor type safe. Instead, if the message is data bound we can represent this assignment simply by the type safe language syntax:

 

mulRequest.X = addResult.Value;

 

Last, not the least, is that besides the assignments users are free to do any other operations in the orcheistration as long as they are language-supported or library-provided.  For example, BEPL defines numbers of control blocks such as <If> <For> <While> and <Switch>.  But in SOX we don’t need to provide the extra activities for those ordinary control sementics since C# already provides language syntax for those semantics and we just need to program them as the normal sequantial programming. The below example shows a while loop of invoking the Add service.

 

/// <summary>

/// below sequential workflow is equivelent with BPEL <while> </while>

/// </summary>

/// <returns></returns>

IEnumerator<AsyncActivity> WhileWorkflowExample()

{

st1.AddRequest addRequest = new st1.AddRequest();

addRequest.X = 2; addRequest.Y = 9;

int v = 0;

while (v < 100)

{

TypedInvoke<st1.AddRequest, st1.Result>

addInvoke = Orchestrate.Invoke<st1.AddRequest, st1.Result>(m_service.Add, addRequest);

yield return addInvoke;

 

v = addInvoke.OutputVar.Value;

addRequest.Y = v;

}

Console.WriteLine("[While OK]");

yield break;

}

 

With the iterator function, now we can create the Sequence activity simplly by calling:

 

 st1.Result result = new st1.Result();

Orchestrate.Sequence(number_1, number_2, result, this.BasicSeqWorkflow);

 

The Sequence can also be called in another ActivityIteratorHandler so that one ActivityIteartorHandler can be nested in another one. The below example show the nested example.

 

IEnumerator<AsyncActivity> NestedWorkflow(int x, int y, st1.Result result)

{

//Calculate the value of x^2 – y^2

st1.Result result1 = new st1.Result(); result1.Value = 1;

yield return Orchestrate.Sequence(x, y, result1, BasicSeqWorkflow);

 

//Calculate the value of y^2 – x^2

st1.Result result2 = new st1.Result(); result2.Value = 9;

yield return Orchestrate.Sequence(y, x, result2, BasicSeqWorkflow);

 

//x^2 – y^2 + y^2 – x^2 = 0

if (result1.Value + result2.Value != 0)

{

Console.WriteLine("Error! {0} - {1} ", result1.Value, result2.Value);

yield break;

}

Console.WriteLine("[Nested OK]");

yield break;

}

Orchestrate.Parallel

You may notice that in the above example the Add operation is independent with the Minus operation, so they can be executed in parallel. To support the parallel execution of the activities, SOX provides the Parallel activity which is equivelent with the BPEL <Flow> activity without any <Link>.

 

IEnumerator<AsyncActivity> BasicParWorkflow(int x, int y, st1.Result result)

{

st1.AddRequest addRequest = new st1.AddRequest();

addRequest.X = x; addRequest.Y = y;

 

TypedInvoke<st1.AddRequest, st1.Result>

addInvoke = Orchestrate.Invoke<st1.AddRequest, st1.Result>(m_service.Add, addRequest);

 

st1.MinusRequest minusRequest = new st1.MinusRequest();

minusRequest.X = x; minusRequest.Y = y;

 

TypedInvoke<st1.MinusRequest, st1.Result>

minusInvoke = Orchestrate.Invoke<st1.MinusRequest, st1.Result>(m_service.Minus, minusRequest);

 

// (x+y) (x-y)

yield return Orchestrate.Parallel(addInvoke, minusInvoke);

 

 

st1.MultiplyRequest mulRequest = new st1.MultiplyRequest();

mulRequest.X = minusInvoke.OutputVar.Value;

mulRequest.Y = addInvoke.OutputVar.Value;

TypedInvoke<st1.MultiplyRequest, st1.Result>

mulInvoke = Orchestrate.Invoke<st1.MultiplyRequest, st1.Result>(m_service.Multiply, mulRequest);

yield return mulInvoke;

 

if (x * x - y * y != mulInvoke.OutputVar.Value)

{

Console.WriteLine("Error!");

yield break;

}

else

Console.WriteLine("[Parallel OK]");

 

result.Value = mulInvoke.OutputVar.Value;

yield break;

}

 

As you see, simplly yield return Orchestrate.Parallel(addInvoke, minusInvoke) will have the two Invoke activities execute in parrallel and only when both are done (namly responses are received), the next line of code in the function will be executed. As for the exception handeling for Orchestrate.Parallel, as long as one activity throws exception the other one will be automatically aborted and then the exception will be caught by the parent scope (For more detail, see the Orchestrate.Scope).

 

 

Orchestrate.ParallelForEach

Orchestrate.ParallelForEach allows an actvity be executed in a for-loop manner, but each iteration is executed in parallel.  The whole activity completes only after each iteration completes and as long as one iteration throws the exception all others are aborted and the exception will be thrown to the parent scope. The below example demostrates a ParallelForEach example, in which the square value for the each value in the 0…up will be calculated and saved into the array pForEachResult.

 

/// <summary>

/// a foreach iterator, doing the sqaure (i * i) operation;

/// </summary>

/// <param name="idx"></param>

/// <returns></returns>

IEnumerator<AsyncActivity> Square(int idx)

{

st1.MultiplyRequest mulRequest = new st1.MultiplyRequest();

mulRequest.X = idx; mulRequest.Y = idx;

TypedInvoke<st1.MultiplyRequest, st1.Result>

mulInvoke = Orchestrate.Invoke<st1.MultiplyRequest, st1.Result>(m_service.Multiply, mulRequest);

yield return mulInvoke;

 

pForEachResult[idx] = mulInvoke.OutputVar.Value;

//Console.WriteLine(mulInvoke.OutputVar.Value);

yield break;

}

 

/// <summary>

/// an exmpale for the parallel forEach,

/// </summary>

/// <returns></returns>

IEnumerator<AsyncActivity> ForEachExample()

{

int upper = 10;

pForEachResult = new int[upper];

 

yield return Orchestrate.ParallelForEach(0, upper, Square);

 

for (int i = 0; i < upper; ++i)

{

if (pForEachResult[i] != i * i)

{

Console.WriteLine("Error!");

yield break;

}

}

 

Console.WriteLine("[Parallel ForEach OK]");

yield break;

}

 

Note that Orchestrate.ParallelForEach(0, upper, Square) will take the 2 integers to indicate the low bound and the upper bound of the for loop as well as an iterate function delegator, which will be invoke for each iteration with an integer parameter indicating the loop index.

 

 

Orchestrate.Flow

Orchestrate.Flow allows to build the control dependencies among the activities in the form of Directed Acyclic Gragh. It bascially follows the semantic of BPEL <Flow>. Essentially each edge in the graph is represented as a CCR Port and each activity in the Flow will be executed concurrently as long as it gets the signals from each of its incoming edge (i.e., the Port ). The whole Flow activity completes only when all the enclosed activites are done, and whenever one activiy throws exception all the others will be aborted and the exception will be thrown to the parent scope. The below example demostrates how we orchestrate the operations of the calculator service to caculate the value based on the graph.

 

 

IEnumerator<AsyncActivity> FlowExample(int x, int y)

{

// define variables

st1.AddRequest addRequest = new st1.AddRequest();

addRequest.X = x; addRequest.Y = y;

st1.Result addResult = new st1.Result();

st1.Result squareResult = new st1.Result();

st1.Result multiplyResult = new st1.Result();

st1.Result multiplyResult2 = new st1.Result();

 

AsyncActivity addAct = Orchestrate.Sequence(addRequest, addResult, AddExample);

AsyncActivity squareAct = Orchestrate.Sequence(addResult, squareResult, SquareExample);

AsyncActivity mulAct = Orchestrate.Sequence(addResult, squareResult, multiplyResult, MultiplyExample);

AsyncActivity mul2Act = Orchestrate.Sequence(squareResult, multiplyResult, multiplyResult2, MultiplyExample);

 

TypedFlow flow = Orchestrate.Flow();

flow.Connect(addAct, squareAct);

flow.Connect(addAct, mulAct);

flow.Connect(squareAct, mulAct);

flow.Connect(mulAct, mul2Act);

flow.Connect(squareAct, mul2Act);

 

yield return flow;

 

if (multiplyResult2.Value != (x + y) * (x + y) * (x + y) * (x + y) * (x + y))

Console.WriteLine("Error!");

else

Console.WriteLine("[FLOW OK]");

yield break;

}

IEnumerator<AsyncActivity> AddExample(st1.AddRequest addRequest, st1.Result addResult)

{

TypedInvoke<st1.AddRequest, st1.Result> addInvoke =

Orchestrate.Invoke<st1.AddRequest, st1.Result>(m_service.Add, addRequest);

yield return addInvoke;

 

addResult.Value = addInvoke.OutputVar.Value;

}

 

IEnumerator<AsyncActivity> MultiplyExample(st1.Result addResult, st1.Result squareResult, st1.Result output)

{

st1.MultiplyRequest request = new st1.MultiplyRequest();

request.X = addResult.Value; request.Y = squareResult.Value;

 

TypedInvoke<st1.MultiplyRequest, st1.Result> invoke =

Orchestrate.Invoke<st1.MultiplyRequest, st1.Result>(m_service.Multiply, request);

yield return invoke;

 

output.Value = invoke.OutputVar.Value;

}

 

IEnumerator<AsyncActivity> SquareExample(st1.Result addResult, st1.Result squareResult)

{

st1.MultiplyRequest request = new st1.MultiplyRequest();

request.X = addResult.Value; request.Y = addResult.Value;

 

TypedInvoke<st1.MultiplyRequest, st1.Result> invoke =

Orchestrate.Invoke<st1.MultiplyRequest, st1.Result>(m_service.Multiply, request);

yield return invoke;

 

squareResult.Value = invoke.OutputVar.Value;

}

 

Orchestrate.Scope

<Scope> may be the most sophiescated construct in WS-BPEL, it prvoides the isolation mechansim for the BPEL activiteis. It consists of a root activity, a set of variables and a set of handlers(e.g. event handler and fault handler). All the enclosed activities share those resource.

 

<scope >

< v a r i a b l e s >?

. . .

< e v e n tHa n d l e r s >?

< f a u l tHa n d l e r s >?

< compens a t ionHandl e r >?

. . .

a c t i v i t y

</ scope >

 

The basic concurrent semantics of <Scope> are:

  • A scope can have multiple event handlers, each of which waits the specified incoming events and then runs the handler activity concurrently agaits the execution of the root activity.
  • A scope can have mulitple fault handlers, each of which is designed to catch a special fault type. Whenever a fault is thrown from the execution of the root activity and any of its sub-activities,  all of enclosed activites should be aborted and the event handlers should be disabled. Then the fault handler, if there is a matched one, will begin the execution.  The default fault handler will compensate each successfully completed sub activtiy in the scope amd rethrow the exception.        
  • A scope can have a compensation handler, which basically reverse the effect of the successful excetion of this scope. When this scope completes the execution successfully, it compensation handler will be installed for the potential request for the “undo”.

 

SOX provides to construct Orchestrate.Scope  to represent the BPEL <Scope>.   Orchestrate.Scope( ) takes 4 parameters

 

TypedScope Scope(AsyncActivity root, FaultHandlerGroup fhg, EventHandlerGroup ehg, CompensationHandler ch);

 

  1. root refers the root activity enclosed in the scope
  2. FaultHandlerGroup basically is just a collection of FaultHandlers
  3. EventHandlerGroup is a collection of the EventHandlers
  4. CompensationHandler refers to the compensation activity for this scope;

 

Fault Handling

Every Faut Handler is just a generic iterator function conforming this  signature:  IEnumerator<AsyncActivity> ActivityIteratorHandler<T0>(T0 t0) where T : Exception

Below example demonstrates two dummy fault handlers for handeling the NotSupportedException and OutofMemoryException respectivly. In the fault handler function,  besides the AsynActivity we have introduced you also can yield return Orchestrate.Compensate(), which will do the compensation action for every successfully completed sub-scope in this scope.

 

IEnumerator<AsyncActivity> DummyFaultHandle1(NotSupportedException exp)

{

Console.WriteLine("Get the NotsupportedException error \n" + exp);

//yield return Orchestrate.Compensate();

yield break;

}

 

IEnumerator<AsyncActivity> DummyFaultHandle2(OutOfMemoryException exp)

{

Console.WriteLine("Get the OutofMemoryException error \n" + exp);

yield return Orchestrate.Compensate();

}

 

Notice that since we are building C# library to throw an exception you don’t need the speical activity as BPEL does (e.g., the <Throw> activity). You can throw the exception by using throw keyword as you does in the general C# programming. SOX will somehow catch any exception you throws and try to find a matching handlering to handle the exception.

 

Compensation Handling

A compensation handler is represented just as a normal ActivityIteratorHandler. Below example shows the usage of the compensation in the Scope.  Assuming the Calculator service internal maintains a counter whose initial value is 0.  The execution the below workflow follows the steps:

  1. The outer scope in the ExceptionExample executes its root activity, which is Root()  function.
  2. The inner scope in the Root ( ) just invokes the Increment operation of the Calculator service, while its compensation handler just invokes the Decrease operation to undo the incensement.
  3. When the invocation to Increment operation is done, its compensation handler (i..e., DecreseByOne() )  is installed within the outer scope.
  4. Then the inner scope throws the exception, which will be caught by the outer scope in ExceptionExample().
  5. If the exception type is   NotSupoortedException, then DummyFaultHanle1( ) will be invoked, and it in turn yield return the Orchestrate.Compensate() , which will invoke the DecreaseByOne() so the counter of the Calculator service back to 0;
  6. However if the exception type is OutofMemoryException, then DummyFaultHanle2( ) will be invoked. Since no Compensate is yield returned, the counter of Calculator will keep as 1.

 

IEnumerator<AsyncActivity> DecreaseByOne()

{

st1.IncreaseRequest request = new st1.IncreaseRequest();

request.Val = -1;

yield return Orchestrate.Invoke<st1.IncreaseRequest, DefaultUpdateResponseType>(m_service.Increase, request);

}

 

 

IEnumerator<AsyncActivity> Root()

{

st1.IncreaseRequest request = new st1.IncreaseRequest();

request.Val = 1;

 

yield return

Orchestrate.Scope(

Orchestrate.Invoke<st1.IncreaseRequest, DefaultUpdateResponseType>(m_service.Increase, request),

new FaultHandlerGroup(),

new EventHandlerGroup(),

new CompensationHandler(

DecreaseByOne

)

);

 

Thread.Sleep(1000);

throw new NotSupportedException("test!");

}

 

 

IEnumerator<AsyncActivity> ExceptionExample()

{

yield return

Orchestrate.Scope(

Orchestrate.Sequence(Root),

new FaultHandlerGroup(

new GenericFaultHandler<NotSupportedException>(DummyFaultHandle1),

new GenericFaultHandler<OutOfMemoryException>(DummyFaultHandle2)

),

new EventHandlerGroup()

);

 

yield break;

}

 

IEnumerator<AsyncActivity> DummyFaultHandle1(NotSupportedException exp)

{

Console.WriteLine("Get the NotsupportedException error \n" + exp);

yield return Orchestrate.Compensate();

}

 

IEnumerator<AsyncActivity> DummyFaultHandle2(OutOfMemoryException exp)

{

Console.WriteLine("Get the OutofMemoryException error \n" + exp);

yield break;

}

 

Event handling

TODO

 

The Tutorial source code can be downloaded from here

 

Wei Lu 04/16/2008