next up previous
Next: Salient Features Up: XEVENTS/XMESSAGES: Application Events and Previous: Events and Messaging: Desiderata

Subsections

Architecture and Implementation

We have followed a layered approach while developing the XEvents/XMessages framework [#!fig:framework_diagram!#]. The application program lies at the top of the layer and uses the event listener and publisher interfaces. The event listener and publisher can interact with each other directly, through an event channel (as in the case of SoapRMI-Events) or use the XEvents agent middleware comprising of the event listener and publisher agents. They ensure exception handling and provide a wrapper for the lower level messaging layer. The XMessages agent middleware enhances reliability by maintaining a log of messages that are to be sent so that none of them are lost if they cannot be sent immediately. It also provides automated lease renewal with the channel on behalf of higher level layers. The message channel has sink and source interfaces for accepting and forwarding messages, in addition to storing the messages in a persistent media for querying. Messages may be defined as any XML content that needs to be transmitted between source and sink while events are also XML strings but have the typed fields we discussed previously in SoapRMI-Events. Overall, we can discern the XMessages API providing dependable messaging service and the XEvents system that uses this for transmitting events.

Figure 1: Architecture of XMessages/XEvents framework
\begin{figure}\begin{center}
\epsfig{file=framework_diagram.eps, height=7cm}\end{center}\end{figure}

XEvents

XEvents provides a simple API for publishing and listening for events. It defines interfaces for event publishers and event listeners which grid applications use for exchanging events. A publisher is the source for an event and a listener is interested in consuming these events. A publisher and a listener can either interact directly with each other, exchanging events or use an XMessage channel that allows multiple publishers and listeners to communicate asynchronously. The interaction with the channel is achieved through agents which provide value added services.

As in SoapRMI-Events, we continue to use SOAP calls for method invocation during messaging and XML is used to describe the events. We have an implementation of SOAP, know as XSoap, which is used. We also retain the structure of the events fields as is used in SoapRMI events since they continue to serve the purpose adequately.

Publishing Events

Publishing an event can be as simple as getting the reference to a listener and sending preformatted XML to it by opening a socket connection to it. The listener defines a handleEvent method which is invoked using a SOAP call. But, as we saw in SoapRMI-Events, such an invocation has the disadvantage of making the application programmer take care of exceptions that may occur if the listener cannot be contacted or if it is not available while sending the event.

EventListener listener = 
  xevents.xsoap_impl.lookupEventListener(url, context);
...
try{
  listener.handleEvent(myEvent);
} catch(Exception ex){
  ...
}

So we introduce a publisher agent that acts on the behest of the event publisher. The publisher agent implements the event listener interface, the difference being that it does not throw an exception and neither does it block on the handleEvent call. The publisher creates an XEvents publisher agent and associates it with the channel to which it is sending events. The XEvents publisher agent is just a wrapper for the XMessages publisher agent and does not provide any value added features. It is present to retain the symmetry of the layers.

// Create a publisher agent from the 
//  (default) XSoap implemetation
EventPublisherAgent pAgent = 
  XEventsImpl.getDefault().createPublisherAgentFor(location);
...
// Send the event to the publisher agent
pAgent.handleEvent(myEvent);

The XMessages publisher agent has two main functions. One is to log the event and the other to enable non-blocking sending of the event. The logging interface allows for storing the XML content of the handleEvent SOAP call invoked on the channel by the agent. It also keeps track of the status of the event as pending, sent successfully or error in the send (caused by the event being rejected by the channel). The default log is maintained in the file system by using a message file that has the SOAP RPC's content and an index file with the location of each event in the message file, its size and status. The events that have been sent successfully are removed while garbage collecting. It is possible to inspect the store to check the status of each message at a later stage. Events from the log can be delivered to their destination even after the application has finished executing. Such postponed sending of events can be accomplished with a simple utility that reads the event log and sends unsent events.

The second functionality is one of 'fire-and-forget' non-blocking call of the handleEvent method by the application. This is achieved by using a separate thread for sending the events as they are received from the application and have been stored in the log. This uses a mechanism similar to the sliding window protocol in TCP. The events to be sent are stored in a queue with two pointers denoting the last event sent (LES) and the last acknowledged event (LAE). The LES is incremented as events are sent and the LAE incremented as the acknowledgments for the receipt of the event is received from the message sink. In case an exception occurs, the LES is reset to LAE+1 so that all events since the last event acknowledged is resent. This causes duplicate events to be transmitted. So the message sink has to have the ability to detect and remove duplicates.

Since the publisher agent uses additional threads for sending of events, the handleEvent call is non-blocking. Therefore, the events are sent in the background and the overhead associated with the application waiting for the events to be sent it minimized. Moreover, there is no exception to check for - the publisher agent handles event sending and will log errors while keeping unsent events in a persistent event log.

Other than the Java implementation, we provide an implementation of the event publisher and agent libraries in C++ too. The C++ agent can be created as shown below. This enables applications written in C or C++ to publish events to an XMessage channel.

/* Return the XSoap (default) implementation */
XEVENTS_XEventsImpl* eventImpl =
  XEVENTS_XEventsImpl_getDefault();
/* "NOWAIT" is default */
eventImpl->setTimeoutAtExit(XEVENTS_WAITFORTHREADS);
/* Get the non-blocking, persistent agent */
pAgent 
  = eventImpl->createPublisherAgentFor(eventImpl, locationUrl);
...
/* Send the event to the publisher agent */
pAgent->handleEvent(pAgent, myEvent);

Retrieving Events

Applications can retrieve events by implementing the event listener interface. The retrieval can be through ``pull'' or ``push'' or a combination of the two. In a ``pull'', the events are fetched from the publisher upon initiation by the listener. In a ``push'', the listener subscribes to the publisher for events matching some filter and the publisher sends events to the listener as and when events of the requested type are generated. The push model is well suited for delivering ``live events'', as and when they happen. But they have the drawback of the publisher having to keep the state information and listener location with it. So, if the publisher goes down and is brought up again, this information is lost and any events that took place in between are also missed. Also, we have the problem of the listener possibly being behind firewalls. In such a case, the pull model is the preferred alternative. But the pull model necessitates the application to periodically contact the publisher, possibly in a loop, for retrieving successive batches of events. In either case, the publisher will have to have a persistent store of the events in order to enable the listener to receive all events in the face of protracted network failures.

As we saw in SoapRMI-Events, the key problems are caused due to network failures while subscription is to be renewed and the requirement of the publisher having to maintain state information about the listener. These are addressed by using the listener agent middleware. The listener agent duplicates the interface exposed by the event publisher so that the listener can invoke methods just as it would on a publisher. The XEvents listener agent is essentially an adapter between events and messages and acts in tandem with the XMessages listener agent that provides the prime functionality.

// declare listener for events
public class MyListener 
  implements EventListener 
{
  public handleEvent(Event ev) throws Exception {
    System.out.println("received even from "+ev.getSource());
  }
}
EventListener eListener = new MyListener();
...
// get handle to event publisher
EventListenerAgent lAgent = 
  XEventsImpl.getDefault.createListenerAgentFor(location);
EventSubscriptionLease lease = lAgent.subscribeLease(eListener);
try {
  // Sleep for 10 seconds while events are pushed
  Thread.currentThread().sleep(10 * 1000);

  // Switch from push to pull for next 10 seconds
  agent.renewSubscription(lease, 10 * 1000 /* duration */ );
  Event[] events = 
    lAgent.pullEvents(lease, 
                      10, /* how many events to pull */
                      10 * 1000 /* timeout */ );
  ...
} finally {
  lAgent.cancelLease(lease);
}

The application initially creates an XEvents agent that internally starts an XMessage to XEvents adapter for the events retrieved and an XMessage listener agent. The client application starts a lease with the publisher. The agent in turn negotiates a remote lease with the publisher for the requested or maximum allowed duration and returns a local lease for the requested duration to the client. The agent then makes sure that the lease with the publisher is kept valid by resubscription as long as the applications maintains the local lease. It also begins to cache events by starting a thread that pulls events at regular intervals from the publisher. If the client has requested for a push subscription, then the agent sends events that it pulls from the publisher to the listener. If the client has a pull subscription and invokes the pull method, the agent returns the matching events from the cache. As an additional feature, it is also possible to dynamically switch between a push model and a pull model seamlessly without loosing any events in between.

In the example given above, a push lease is initially subscribed to for the first 10 seconds. Events that take pace during this time are sent to the listener by invoking the handleEvent callback on the listener with the events. Then the subscription is changed to a pull model by renewing the lease to reflect this. The pullEvents method is called with the maximum number of events to return and the timeout for which to wait for events. This allows the listener to throttle the number of events it pulls from the publisher at one given time. Successive calls to the method will return the next set of events. Once the application is finished with retrieving the events, it cancels the subscription so that the agent may free up resources it has allocated for the listener and release the lease it has with the publisher.

Thus, in both pull and push subscription mechanisms, the agent uses the pull model for retrieving events from the publisher. This overcomes the problem of dealing with firewalls. The caching also improves performance for pulls but when push subscription is used, the events are sent to the listener in pseudo-realtime ie. not as they are generated but when the agent does a periodic pull on the publisher. In addition, the listener is also able to get a persistent subscription with the agent for as long as it requires since the agent manages the subscription to the publisher to match the needs. If the publisher cannot be contacted, the agent retries connecting to it and retrieves events when the subscription is reestablished. Hence, the entire process of establishing and maintaining a connection with the publisher in the face of network failure is kept transparent to the listener and managed by the agent.

Figure 2: API of XMessages/XEvents framework
\begin{figure}\begin{center}
\epsfig{file=framework_api.eps, height=7cm}\end{center}\end{figure}

Generic XMessages API

The XMessages API provides a generic messaging platform with WSDL support, on top of which XEvents is built. Other than the XMessages listener and publisher agents which have been discussed above, the API defines the XMessageSource and XMessageSink interfaces and two structures, XMessageRequest and XMessageBatch, for requesting and returning messages. The WSDL port types for the interfaces and XML schema for the structures are listed in the XMessages/XEvents whitepaper. These can be used for accessing the messaging framework as a web-service. The WSDL files describing port types and XML schemas for message containers are available at http://www.extreme.indiana.edu/xgws/xevents/v10/xml/

Structure: XMessageBatch

XMessageBatch is the basic unit of data exchange in XMessages. We wrap a set of messages into this auxiliary structure, in addition to fields that describe the messages. We could leverage SOAP to include this metadata in the SOAP header but having one object encapsulate both data(messages) and metadata makes it easier to transport these entities through multiple protocols.

XMessageBatch contains the following fields:

Structure: XMessageRequest

XMessageRequest is used to encapsulate the parameters required by the message source for message retrieval. XMessageBatch contains the following fields:

Interface: XMessageSink

This interface defines how messages can be consumed. It contains a single method, handleMessages, that must be called in order for the messageBatch to be consumed. The way in which XMessageSink processes the messages in the messageBatch depends on the implementation; it could handle the messages and forget them, it could forward it to one or more XMessage sinks or store them in a database for future retrieval. If an error occurs while handling the message batch, an exception is returned.

public void handleMessages (XMessageBatch messages) throws Exception;

Interface: XMessageSource

The XMessageSource interface either produces messages itself or has messages available through other means for retrieval. Messages can be retrieved through the requestMessages method that initiates, continues and terminates the process of requesting the messages. The XMessageRequest parameter is used to determine the messages that will be selected for returning in the message batch. The query field in the message request argument is interpreted either as a token, in which case the messages corresponding to the token are chosen, or as a generic query, which is processed as defined in the implementation of the message source. In both cases, the message batch is populated with the messages matching the query, tokens identifying current and next set of messages, and other fields present in the message batch object. Further messages can be requested using the nextToken field for the message batch as the query string in the message request parameter. The session for this request operation can be cancelled by passing a lease request with zero duration.

public XMessageBatch requestMessages ( XMessageRequest req ) throws Exception;

Message Channel

The XMessageSource and XMessageSink can be grouped together to form an intermediary object known as the message channel. This channel can provide a host of value added services like multiplexing and demultiplexing messages between multiple listeners and publishers, maintain messages in a storage media to enable querying and analysis of past messages, provide ubiquitous messaging service to a cluster of related grid applications or connect a number of message channels to form a large event system. Such an intervening object makes it possible to extend the features of the messaging system to meet the characteristic needs of diverse grid applications.

In the current implementation, we have used the channel to save messages in a database to enable simple querying of past messages and to detect duplicate messages that may be sent by the publisher agents. The current and next tokens are used to ensure the same message is not processed again even if it is sent again. Other useful additions that could be made are providing hooks for triggers that match a certain criteria of messages, do accounting and gathering statistics about messages that pass through and act as bridges to other messaging systems.


next up previous
Next: Salient Features Up: XEVENTS/XMESSAGES: Application Events and Previous: Events and Messaging: Desiderata
Aleksander Andrzej Slominski 2002-09-20