XEvents provides a simple API for publishing and listening for events. It defines interfaces for event publishers and event listeners which grid applications use for exchanging events. A publisher is the source for an event and a listener is interested in consuming these events. A publisher and a listener can either interact directly with each other, exchanging events or use an XMessage channel that allows multiple publishers and listeners to communicate asynchronously. The interaction with the channel is achieved through agents which provide value added services.
As in SoapRMI-Events, we continue to use SOAP calls for method invocation during messaging and XML is used to describe the events. We have an implementation of SOAP, know as XSoap, which is used. We also retain the structure of the events fields as is used in SoapRMI events since they continue to serve the purpose adequately.
EventListener listener =
xevents.xsoap_impl.lookupEventListener(url, context);
...
try{
listener.handleEvent(myEvent);
} catch(Exception ex){
...
}
So we introduce a publisher agent that acts on the behest of the event publisher. The publisher agent implements the event listener interface, the difference being that it does not throw an exception and neither does it block on the handleEvent call. The publisher creates an XEvents publisher agent and associates it with the channel to which it is sending events. The XEvents publisher agent is just a wrapper for the XMessages publisher agent and does not provide any value added features. It is present to retain the symmetry of the layers.
// Create a publisher agent from the // (default) XSoap implemetation EventPublisherAgent pAgent = XEventsImpl.getDefault().createPublisherAgentFor(location); ... // Send the event to the publisher agent pAgent.handleEvent(myEvent);
The XMessages publisher agent has two main functions. One is to log the event and the other to enable non-blocking sending of the event. The logging interface allows for storing the XML content of the handleEvent SOAP call invoked on the channel by the agent. It also keeps track of the status of the event as pending, sent successfully or error in the send (caused by the event being rejected by the channel). The default log is maintained in the file system by using a message file that has the SOAP RPC's content and an index file with the location of each event in the message file, its size and status. The events that have been sent successfully are removed while garbage collecting. It is possible to inspect the store to check the status of each message at a later stage. Events from the log can be delivered to their destination even after the application has finished executing. Such postponed sending of events can be accomplished with a simple utility that reads the event log and sends unsent events.
The second functionality is one of 'fire-and-forget' non-blocking call of the handleEvent method by the application. This is achieved by using a separate thread for sending the events as they are received from the application and have been stored in the log. This uses a mechanism similar to the sliding window protocol in TCP. The events to be sent are stored in a queue with two pointers denoting the last event sent (LES) and the last acknowledged event (LAE). The LES is incremented as events are sent and the LAE incremented as the acknowledgments for the receipt of the event is received from the message sink. In case an exception occurs, the LES is reset to LAE+1 so that all events since the last event acknowledged is resent. This causes duplicate events to be transmitted. So the message sink has to have the ability to detect and remove duplicates.
Since the publisher agent uses additional threads for sending of events, the handleEvent call is non-blocking. Therefore, the events are sent in the background and the overhead associated with the application waiting for the events to be sent it minimized. Moreover, there is no exception to check for - the publisher agent handles event sending and will log errors while keeping unsent events in a persistent event log.
Other than the Java implementation, we provide an implementation of the event publisher and agent libraries in C++ too. The C++ agent can be created as shown below. This enables applications written in C or C++ to publish events to an XMessage channel.
/* Return the XSoap (default) implementation */ XEVENTS_XEventsImpl* eventImpl = XEVENTS_XEventsImpl_getDefault(); /* "NOWAIT" is default */ eventImpl->setTimeoutAtExit(XEVENTS_WAITFORTHREADS); /* Get the non-blocking, persistent agent */ pAgent = eventImpl->createPublisherAgentFor(eventImpl, locationUrl); ... /* Send the event to the publisher agent */ pAgent->handleEvent(pAgent, myEvent);
As we saw in SoapRMI-Events, the key problems are caused due to network failures while subscription is to be renewed and the requirement of the publisher having to maintain state information about the listener. These are addressed by using the listener agent middleware. The listener agent duplicates the interface exposed by the event publisher so that the listener can invoke methods just as it would on a publisher. The XEvents listener agent is essentially an adapter between events and messages and acts in tandem with the XMessages listener agent that provides the prime functionality.
// declare listener for events
public class MyListener
implements EventListener
{
public handleEvent(Event ev) throws Exception {
System.out.println("received even from "+ev.getSource());
}
}
EventListener eListener = new MyListener();
...
// get handle to event publisher
EventListenerAgent lAgent =
XEventsImpl.getDefault.createListenerAgentFor(location);
EventSubscriptionLease lease = lAgent.subscribeLease(eListener);
try {
// Sleep for 10 seconds while events are pushed
Thread.currentThread().sleep(10 * 1000);
// Switch from push to pull for next 10 seconds
agent.renewSubscription(lease, 10 * 1000 /* duration */ );
Event[] events =
lAgent.pullEvents(lease,
10, /* how many events to pull */
10 * 1000 /* timeout */ );
...
} finally {
lAgent.cancelLease(lease);
}
The application initially creates an XEvents agent that internally starts an XMessage to XEvents adapter for the events retrieved and an XMessage listener agent. The client application starts a lease with the publisher. The agent in turn negotiates a remote lease with the publisher for the requested or maximum allowed duration and returns a local lease for the requested duration to the client. The agent then makes sure that the lease with the publisher is kept valid by resubscription as long as the applications maintains the local lease. It also begins to cache events by starting a thread that pulls events at regular intervals from the publisher. If the client has requested for a push subscription, then the agent sends events that it pulls from the publisher to the listener. If the client has a pull subscription and invokes the pull method, the agent returns the matching events from the cache. As an additional feature, it is also possible to dynamically switch between a push model and a pull model seamlessly without loosing any events in between.
In the example given above, a push lease is initially subscribed to for the first 10 seconds. Events that take pace during this time are sent to the listener by invoking the handleEvent callback on the listener with the events. Then the subscription is changed to a pull model by renewing the lease to reflect this. The pullEvents method is called with the maximum number of events to return and the timeout for which to wait for events. This allows the listener to throttle the number of events it pulls from the publisher at one given time. Successive calls to the method will return the next set of events. Once the application is finished with retrieving the events, it cancels the subscription so that the agent may free up resources it has allocated for the listener and release the lease it has with the publisher.
Thus, in both pull and push subscription mechanisms, the agent uses the pull model for retrieving events from the publisher. This overcomes the problem of dealing with firewalls. The caching also improves performance for pulls but when push subscription is used, the events are sent to the listener in pseudo-realtime ie. not as they are generated but when the agent does a periodic pull on the publisher. In addition, the listener is also able to get a persistent subscription with the agent for as long as it requires since the agent manages the subscription to the publisher to match the needs. If the publisher cannot be contacted, the agent retries connecting to it and retrieves events when the subscription is reestablished. Hence, the entire process of establishing and maintaining a connection with the publisher in the face of network failure is kept transparent to the listener and managed by the agent.
The XMessages API provides a generic messaging platform with WSDL support, on top of which XEvents is built. Other than the XMessages listener and publisher agents which have been discussed above, the API defines the XMessageSource and XMessageSink interfaces and two structures, XMessageRequest and XMessageBatch, for requesting and returning messages. The WSDL port types for the interfaces and XML schema for the structures are listed in the XMessages/XEvents whitepaper. These can be used for accessing the messaging framework as a web-service. The WSDL files describing port types and XML schemas for message containers are available at http://www.extreme.indiana.edu/xgws/xevents/v10/xml/
XMessageBatch contains the following fields:
While monitoring live messages i.e. messages that are being retrieved as they are generated, if the request does not return any messages, then the same token needs to be sent. Otherwise, the nextToken from the XMessageBatch should be used to get the subsequent messages batch.
This interface defines how messages can be consumed. It contains a single method, handleMessages, that must be called in order for the messageBatch to be consumed. The way in which XMessageSink processes the messages in the messageBatch depends on the implementation; it could handle the messages and forget them, it could forward it to one or more XMessage sinks or store them in a database for future retrieval. If an error occurs while handling the message batch, an exception is returned.
public void handleMessages (XMessageBatch messages) throws Exception;
public XMessageBatch requestMessages ( XMessageRequest req ) throws Exception;
In the current implementation, we have used the channel to save messages in a database to enable simple querying of past messages and to detect duplicate messages that may be sent by the publisher agents. The current and next tokens are used to ensure the same message is not processed again even if it is sent again. Other useful additions that could be made are providing hooks for triggers that match a certain criteria of messages, do accounting and gathering statistics about messages that pass through and act as bridges to other messaging systems.