Tuesday, June 5, 2018

OSGi R7 Highlights: Push Streams and Promises 1.1

The OSGi Compendium Release 7 specification contains new and updated specifications from the OSGi Alliance. Today I have the good fortune to be writing about one of each! The OSGi Push Streams specification is a brand new reactive data processing API in R7, and is closely related to the newly updated OSGi Promises 1.1 specification.

OSGi Utility Specifications

Before we dive into the new features provided by both of these specifications I will highlight an important, and often overlooked, fact about the OSGi specifications. You may have noticed some large gaps in the OSGi specification chapter numbering, in particular the really big gap between chapters 151 and 702.

This numbering isn't an accident. Chapters numbered in the range 700-799 are "utility" packages. These differ from other OSGi compendium specifications in that they don't define OSGi service interfaces that you look up from the service registry. Instead, they define classes that you use directly. This means that the OSGi utility specification packages are self-implementing: when you get the specification jar you get the reference implementation at the same time!

Push Streams and Promises are both utility specifications because the types and behaviors they define don't fit a service model. Instead, they provide abstractions for asynchronous behaviors. There is, however, another important feature of both specifications which makes them particularly special: neither specification has any dependency on the OSGi Core specification! This means that not only does downloading the specification API give you an implementation, but you can also use that specification outside of an OSGi framework. Push Streams and Promises are therefore great technologies to use in any Java application that you write.

What are Push Streams?

The OSGi Push Streams specification is completely new for OSGi R7, and it defines a model for processing asynchronous streams of data using Reactive Programming techniques. Reactive Programming simply refers to a program which responds to the arrival of data or events, rather than attempting to pull data or events from the source. Reactive systems are also expected to run asynchronously and to process long running (or even infinite) streams of data.

Streaming data with Java Streams

You're probably familiar with the Java Stream API which was added in Java 8. The Stream API provides a rich functional mechanism to process streams of data (typically Java Collections).
List<Person> guests = getWeddingGuestList();

// How much wine will we need to buy?
long adults = guests.stream()
    .map(Person::getAge)
    .filter(age -> age > 21)
    .count();

Processing data using the Stream API allows you to write simple, effective code. However, it does not work well for data that cannot be generated on demand. If the list of wedding guests in the above example were based on email responses then the processing thread would spend a huge amount of time blocked polling the email server!

Streaming event-based data using Push Streams

Event-based data is data that occurs when a specific action happens. This may be a clock tick, a user clicking on a web-page, or it may indicate a train passing a signal. The important thing about event-based data is that it is generated based on an external stimulus, and not because a consumer asked for the data.

Push Streams differ from the Java Stream API because they expect data to be generated and processed asynchronously. The API, however, remains very similar:
PushStream<Person> guests = getWeddingGuestList();

// How much wine will we need to buy?
Promise<Long> adults = guests
    .map(Person::getAge)
    .filter(age -> age > 21)
    .count();

The most important difference is that the return value of a Push Stream's terminal operation is a Promise. This allows the Stream of asynchronously arriving data to be processed in a non-blocking way.

Creating your own Push Streams

Push Streams can be created easily using a Push Stream Provider. A Push Stream Provider can be configured to use specific thread pools, queueing policies, back-pressure, and circuit breaker behaviors.
PushEventSource<Email> emails = getWeddingEmailResponses();

PushStreamProvider psp = new PushStreamProvider();

// How much wine will we need to buy?
Promise<Long> adults = psp.createStream(emails)
    .map(Email::getFromAddress)
    .map(this::lookupPersonByEmail)
    .map(Person::getAge)
    .filter(age -> age > 21)
    .count();

Once you have a Push Stream Provider you can use (and re-use) it to create a Push Stream from any Push Event Source. While it is possible (and sometimes desirable) to create your own Push Event Source implementation, in most cases you can create a Simple Push Event Source from the Push Stream Provider. Events can then be pushed into the Simple Push Event Source as they occur, and they will be passed into any connected streams.
PushStreamProvider psp = new PushStreamProvider();

SimplePushEventSource<Email> spes = 
        psp.createSimpleEventSource(Email.class);

// When an "email" event occurs
spes.publish(email);

// When an error occurs
spes.error(anException);

// If/When the data stream finishes
spes.endOfStream();
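
To make the push model concrete without needing the OSGi jars on the classpath, here is a minimal plain-JDK sketch of the same idea. It is not the OSGi implementation: MiniPushCounter is a hypothetical class, and CompletableFuture stands in for the Promise. It shows that a terminal result such as a count can only resolve once the source signals the end of the stream, and that the consumer reacts through a callback rather than by blocking.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;

// Hypothetical miniature of the push model; NOT part of the OSGi API.
final class MiniPushCounter<T> {
    private final Predicate<T> filter;
    private final CompletableFuture<Long> result = new CompletableFuture<>();
    private long count;

    MiniPushCounter(Predicate<T> filter) {
        this.filter = filter;
    }

    // Called by the producer whenever an event occurs.
    synchronized void publish(T event) {
        if (!result.isDone() && filter.test(event)) {
            count++;
        }
    }

    // Called by the producer when something goes wrong: the result fails.
    synchronized void error(Throwable failure) {
        result.completeExceptionally(failure);
    }

    // Called by the producer when no more events will arrive.
    synchronized void endOfStream() {
        result.complete(count);
    }

    // Consumers attach callbacks to this future instead of blocking.
    CompletableFuture<Long> count() {
        return result;
    }

    public static void main(String[] args) {
        MiniPushCounter<Integer> adults = new MiniPushCounter<>(age -> age > 21);
        adults.count().thenAccept(n -> System.out.println("Adults: " + n));

        adults.publish(34);
        adults.publish(12);
        adults.publish(58);
        adults.endOfStream(); // prints "Adults: 2"
    }
}
```

The real Push Stream implementation adds threading, buffering, and back pressure on top of this basic shape.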

Buffering and Back Pressure

In the Java Stream API the rate of data processing is determined by how fast the consumer can pull data from the source. This places a natural "brake" on the system, where the consumer cannot be overwhelmed by incoming data. In an event-based system this is no longer the case! Data events may occur far more rapidly than they can be consumed.

You can always attempt to consume events using more threads, but sooner or later your system will run out of capacity, and events will have to be queued until they can be processed. A queue of events is usually called a Buffer, and buffering is natively supported by the Push Stream specification.

Buffers are useful for dealing with short-term spikes in the flow of events, but they cannot help if the long-term event arrival rate is higher than the long-term consumption rate. In this case the only options are to:
  • Discard some of the events
  • Fail the stream
  • Communicate to the event source that it should slow down
Deciding whether to discard events or fail the stream is the job of the buffer's Queue Policy. There are several built-in Queue Policy Options which provide basic behaviors, but you can implement your own behaviors if desired. Telling the producer to slow down, however, is the job of back pressure.
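
The first two options can be illustrated with a small plain-Java sketch. BoundedEventBuffer and its Policy enum are hypothetical names loosely modeled on the idea of a queue policy; they are not the OSGi types. The sketch shows the decision a policy has to make at the moment an event arrives and the buffer is already full.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of a bounded buffer with a queue policy; NOT the OSGi API.
final class BoundedEventBuffer<T> {
    enum Policy { DISCARD_OLDEST, FAIL }

    private final Deque<T> queue = new ArrayDeque<>();
    private final int capacity;
    private final Policy policy;

    BoundedEventBuffer(int capacity, Policy policy) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be at least 1");
        }
        this.capacity = capacity;
        this.policy = policy;
    }

    // Queues an event, applying the policy when the buffer is already full.
    void offer(T event) {
        if (queue.size() == capacity) {
            if (policy == Policy.FAIL) {
                throw new IllegalStateException("Buffer full: failing the stream");
            }
            queue.removeFirst(); // DISCARD_OLDEST: drop the stalest event
        }
        queue.addLast(event);
    }

    // Takes the next event for processing, or null if the buffer is empty.
    T poll() {
        return queue.pollFirst();
    }

    public static void main(String[] args) {
        BoundedEventBuffer<String> buffer = new BoundedEventBuffer<>(2, Policy.DISCARD_OLDEST);
        buffer.offer("a");
        buffer.offer("b");
        buffer.offer("c"); // buffer is full, so "a" is discarded
        System.out.println(buffer.poll()); // prints "b"
    }
}
```

Discarding keeps the stream alive at the cost of losing data, while failing makes the overload visible immediately; which is right depends on whether stale events still have value.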

Back pressure in a Push Stream is a long indicating the number of milliseconds for which the event source should stop sending events in order to give the consumer time to catch up. Back pressure can be provided in a number of ways. However it is produced, this back pressure is then sent back to the event producer, which may (or may not) slow down as a result.
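
As a rough illustration of millisecond back pressure, the following plain-Java sketch has a consumer return a long after each event and a cooperative source honor it. BackPressureDemo, pushAll, and the pause callback are hypothetical names used only for this sketch, not part of the Push Stream API, and the sketch only models positive values and zero.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.LongConsumer;
import java.util.function.ToLongFunction;

// Hypothetical sketch of millisecond back pressure; NOT the OSGi API.
final class BackPressureDemo {

    // Pushes every event to the consumer. The consumer's return value is read as
    // "wait this many milliseconds before sending the next event"; zero means
    // "carry on". Returns the total back pressure that the source honored.
    static <T> long pushAll(List<T> events, ToLongFunction<T> consumer, LongConsumer pause) {
        long total = 0;
        for (T event : events) {
            long backPressure = consumer.applyAsLong(event);
            if (backPressure > 0) {
                pause.accept(backPressure); // a cooperative source slows down here
                total += backPressure;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // A consumer that asks for 50 ms of breathing room after each long event.
        ToLongFunction<String> consumer = event -> event.length() > 3 ? 50 : 0;

        long waited = pushAll(Arrays.asList("a", "large", "b"), consumer, millis -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        System.out.println("Total back pressure honored: " + waited + " ms");
    }
}
```

Passing the pause behavior in as a callback keeps the sketch easy to test; a source that ignores the returned value is exactly the "may not slow down" case described above.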

Using Push Streams with Real Data

The UK's rail network produces data events every time a train passes a monitoring point. This information is used to help manage signalling, report delays, and for all sorts of other reasons. This data is also part of the UK government's open data program, and has a feed streaming data to anyone who signs up for an account!

There is a public example on GitHub using Push Streams to consume this data. The data can be consumed live. However, to avoid users having to sign up for an account, this example replays events recorded from the real stream.

The incoming events are JSON arrays containing batches of data events. The main stream pipeline can be seen here. It uses Jackson to read the JSON, filters the data for interesting event types, maps the data into Java types, and then turns those data events into train reporting locations.

What's New in Promises 1.1?

The 1.1 update to Promises addresses a number of different areas.

Time-based behaviors

The first version of the OSGi Promise API gave you no way to deal with the passage of time. For example, what should happen if a Promise isn't resolved for a very long time, or if it is resolved much faster than we expect?

The timeout and delay methods have been added to the Promise API to allow more sophisticated time-based behaviors to be built using Promises.

  • The timeout method returns a Promise which fails with a TimeoutException if the original Promise doesn't resolve before the supplied timeout elapses.
  • The delay method returns a Promise which does not resolve until the supplied delay has elapsed after the original promise resolves.
Using a combination of timeouts and delays can throttle a busy system, or ensure a live feel in a system that is receiving little input.
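
For readers who want to see how such behaviors can be built, here is a plain-JDK analogy using CompletableFuture and a scheduler. This is only a sketch of the semantics described in the two bullets; TimedFutures and its methods are hypothetical helpers, and with OSGi Promises 1.1 you would simply call timeout and delay on the Promise itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Plain-JDK analogy with hypothetical helper names; NOT the OSGi Promise API.
final class TimedFutures {
    // A daemon scheduler thread so that the sketch never keeps the JVM alive.
    private static final ScheduledExecutorService SCHEDULER =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "timed-futures");
                thread.setDaemon(true);
                return thread;
            });

    // Fails with a TimeoutException if 'original' has not resolved within 'millis'.
    static <T> CompletableFuture<T> timeout(CompletableFuture<T> original, long millis) {
        CompletableFuture<T> result = new CompletableFuture<>();
        ScheduledFuture<?> timer = SCHEDULER.schedule(() -> {
            result.completeExceptionally(new TimeoutException());
        }, millis, TimeUnit.MILLISECONDS);
        original.whenComplete((value, failure) -> {
            timer.cancel(false); // resolved in time, so the timer is no longer needed
            if (failure != null) {
                result.completeExceptionally(failure);
            } else {
                result.complete(value);
            }
        });
        return result;
    }

    // Resolves 'millis' after 'original' resolves, with the same outcome.
    static <T> CompletableFuture<T> delay(CompletableFuture<T> original, long millis) {
        CompletableFuture<T> result = new CompletableFuture<>();
        original.whenComplete((value, failure) -> SCHEDULER.schedule(() -> {
            if (failure != null) {
                result.completeExceptionally(failure);
            } else {
                result.complete(value);
            }
        }, millis, TimeUnit.MILLISECONDS));
        return result;
    }
}
```

Chaining the two helpers, for example delaying a fast result while also bounding how long a slow one may take, mirrors the throttling and "live feel" uses mentioned above.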

Thread management

Probably the largest change in the Promises specification is the introduction of the PromiseFactory. The Promise Factory is a powerful type which can be used as a factory for Deferred instances and resolved Promises. In addition, the Promise Factory can also be used to define the threads that should be used to execute callbacks, and to manage the time-based behaviors of the Promises. By changing the properties of the thread pool used by the Promise Factory you can ensure your callbacks are executed serially, or in parallel, or on the same thread that initiated the callback.
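
The effect of the callback executor can be sketched with plain JDK types. In this analogy CompletableFuture stands in for the Promise, and the hypothetical CallbackThreads helper reports which thread ran a callback for a given executor; it is not the Promise Factory API, only an illustration of why the choice of executor matters.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-JDK analogy with hypothetical names; NOT the OSGi PromiseFactory API.
final class CallbackThreads {

    // Returns the name of the thread that ran the callback when callbacks
    // are dispatched through the given executor.
    static String callbackThread(Executor callbackExecutor) {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<String> seenOn = source.thenApplyAsync(
                value -> Thread.currentThread().getName(), callbackExecutor);
        source.complete("value"); // resolving the source triggers the callback
        return seenOn.join();
    }

    public static void main(String[] args) {
        // An executor that runs tasks inline: the callback runs on the resolving thread.
        System.out.println("inline: " + callbackThread(Runnable::run));

        // A single worker thread: callbacks run one at a time on that thread.
        ExecutorService single = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, "callbacks");
            thread.setDaemon(true);
            return thread;
        });
        System.out.println("serial: " + callbackThread(single));
        single.shutdown();
    }
}
```

Swapping the single-threaded executor for a larger pool would allow callbacks to run in parallel, which is the third option the paragraph mentions.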

Usability improvements

Some of the less obvious improvements to Promises are the small usability enhancements that have been made to the API.
  • The OSGi callback functions now declare throws Exception so that your promise usage is more lambda friendly!
Promise<String> pString = getStringPromise();

// No need to worry about the URISyntaxException
Promise<URI> pURI = pString.map(URI::new);
  • New onSuccess and onFailure callbacks for when you only care about one outcome
Promise<String> pString = getStringPromise();

// Doing work onResolve used to be a bit messy
pString.onResolve(() -> {
    try {
      Throwable t = pString.getFailure();
      if (t == null) {
        // No failure, so the value is safe to use
        finished(pString.getValue());
      } else {
        logFailure(t);
      }
    } catch (Exception e) {
      // Interrupted, or finished(...) threw
    }
  });

// It is much simpler now!
pString.onSuccess(this::finished)
       .onFailure(this::logFailure);
  • A new thenAccept method for when you don't need the full power of then
Promise<String> pString = getStringPromise();

// This behavior
pString.then(p -> {
    storeValue(p.getValue());
    return null;
  });

// becomes
pString.thenAccept(this::storeValue);

In Summary

OSGi Push Streams and Promises are powerful tools for asynchronous programming, even if you're not an OSGi user! If you're interested in using Push Streams or Promises then the implementations are freely available for you to start using now.



Want to find out more about OSGi R7?

This is one post in a series of 12 focused on OSGi R7 Highlights. Previous posts are:
  1. Proposed Final Draft Now Available
  2. Java 9 Support
  3. Declarative Services
  4. The JAX-RS Whiteboard
  5. The Converter
  6. Cluster Information
  7. Transaction Control
  8. The Http Whiteboard Service
Be sure to follow us on Twitter or LinkedIn or subscribe to our Blog feed to find out when the next post is available.