JavaFX and RxJava

Netflix has released a Java port of .NET’s Reactive Extensions called RxJava. I’m new to RxJava although I’ve played around a little with its .NET counterpart.

In a nutshell, reactive programming is not unlike the traditional days of Unix shell scripting, where you’d pipe streams of text from one simple primitive to another. Here are a couple of examples from the Bash Manual.

ls -l | grep "\.txt$"
ls -l | sed -e "s/[aeio]/u/g" 

You can also combine two streams into one and then do processing on them.

# Look for ERROR string in both stdout and stderr.
foo 2>&1 | grep ERROR

Similarly, if you were doing GUI programming for selecting objects on a screen, you might wish to implement CTRL+Click as a way to add items to a selection. Doing this with event handlers can quickly lead to a tangle of logic, especially if the CTRL key is also used for other functionality.

Using reactive programming, you’d essentially express your logic this way: listen to the key down event and filter for the CTRL key, then launch a listener for the click event, and after that launch a listener for the key up event. This can all be done in one method, so that the logic stays together.

     cat keyDownEvents | grep -l CTRL | xargs 'cat clickEvent' | xargs 'cat keyUpEvents | grep -l CTRL'
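
To make the idea a little more concrete, here is a minimal sketch in plain Java 8. It does not use RxJava: `EventStream`, `InputEvent` and `selectWithCtrl` are hypothetical names invented for this illustration, and the sketch tracks the CTRL state with a flag instead of chaining listeners. The point is only that the whole CTRL+Click rule can sit in one method built from filtered streams.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class CtrlClickSketch {

    /** Hypothetical input event: a kind ("KEY_DOWN", "KEY_UP", "CLICK") plus a detail. */
    static final class InputEvent {
        final String kind;
        final String detail; // key name for key events, item name for clicks

        InputEvent(String kind, String detail) {
            this.kind = kind;
            this.detail = detail;
        }
    }

    /** A bare-bones push stream (a stand-in, not RxJava's Observable). */
    static final class EventStream<T> {
        private final List<Consumer<T>> subscribers = new ArrayList<>();

        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        void emit(T event) {
            // Iterate over a copy so subscribers may subscribe while we emit.
            for (Consumer<T> s : new ArrayList<>(subscribers)) {
                s.accept(event);
            }
        }

        /** Returns a derived stream carrying only the events that match the predicate. */
        EventStream<T> filter(Predicate<T> keep) {
            EventStream<T> out = new EventStream<>();
            subscribe(e -> { if (keep.test(e)) out.emit(e); });
            return out;
        }
    }

    /** Collects items clicked while CTRL is held; the whole rule lives in this method. */
    static List<String> selectWithCtrl(EventStream<InputEvent> input) {
        List<String> selection = new ArrayList<>();
        boolean[] ctrlHeld = {false};

        input.filter(e -> e.kind.equals("KEY_DOWN") && e.detail.equals("CTRL"))
             .subscribe(e -> ctrlHeld[0] = true);
        input.filter(e -> e.kind.equals("KEY_UP") && e.detail.equals("CTRL"))
             .subscribe(e -> ctrlHeld[0] = false);
        input.filter(e -> e.kind.equals("CLICK"))
             .filter(e -> ctrlHeld[0])
             .subscribe(e -> selection.add(e.detail));
        return selection;
    }

    public static void main(String[] args) {
        EventStream<InputEvent> input = new EventStream<>();
        List<String> selection = selectWithCtrl(input);

        input.emit(new InputEvent("CLICK", "a"));       // ignored: CTRL not held
        input.emit(new InputEvent("KEY_DOWN", "CTRL"));
        input.emit(new InputEvent("CLICK", "b"));
        input.emit(new InputEvent("CLICK", "c"));
        input.emit(new InputEvent("KEY_UP", "CTRL"));
        input.emit(new InputEvent("CLICK", "d"));       // ignored: CTRL released

        System.out.println(selection); // prints [b, c]
    }
}
```

A real reactive library would replace the flag with stream combinators, but the shape of the logic is the same.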

More examples will be forthcoming, but in the meantime, I’d like to introduce a way to transform an event handler into an observable stream.

/* The following defines an observable stream of click events on a button btn. */
/* The button's event handler is not hooked up until an Observer subscribes to */
/* this stream. (We pass in a routine that hooks up the event handler          */
/* just in time.)                                                              */
Observable<ActionEvent> clickEvents = OperationToObservableEvent.toObservable(new Action1<EventHandler<ActionEvent>>() {
  @Override
  public void call(EventHandler<ActionEvent> eventHandler) {
    btn.setOnAction(eventHandler);
  }
});

The syntax is rather ugly without type inference, but I’d expect Java 8’s lambdas to improve the readability significantly.
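
As a rough illustration of that expectation, the sketch below compares the anonymous-class form with a lambda and a method reference. So that it compiles without RxJava or JavaFX, it uses stand-in types: the local `Action1` and `EventHandler` interfaces only mirror the single-method shape of the real ones, and `FakeButton` is a hypothetical placeholder for a JavaFX button.

```java
public class LambdaSketch {

    // Stand-ins (assumptions) mirroring the single-method shapes of
    // rx.util.functions.Action1 and javafx.event.EventHandler.
    interface Action1<T> { void call(T value); }
    interface EventHandler<E> { void handle(E event); }

    // Hypothetical button with one settable handler, in the style of setOnAction.
    static final class FakeButton {
        private EventHandler<String> onAction;

        void setOnAction(EventHandler<String> handler) {
            this.onAction = handler;
        }

        void fire(String event) {
            if (onAction != null) {
                onAction.handle(event);
            }
        }
    }

    /** Hooks up the same handler three ways and records the events it receives. */
    static String demo() {
        final FakeButton btn = new FakeButton();

        // Pre-Java 8: anonymous inner class.
        Action1<EventHandler<String>> verbose = new Action1<EventHandler<String>>() {
            @Override
            public void call(EventHandler<String> handler) {
                btn.setOnAction(handler);
            }
        };

        // Java 8: the same hook as a lambda, and as a method reference.
        Action1<EventHandler<String>> viaLambda = handler -> btn.setOnAction(handler);
        Action1<EventHandler<String>> viaMethodRef = btn::setOnAction;

        StringBuilder log = new StringBuilder();
        EventHandler<String> handler = event -> log.append(event).append(';');

        verbose.call(handler);
        btn.fire("a");
        viaLambda.call(handler);
        btn.fire("b");
        viaMethodRef.call(handler);
        btn.fire("c");
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints a;b;c;
    }
}
```

With the real types, the call above would presumably shrink to something like `toObservable(handler -> btn.setOnAction(handler))`, though that form is untested here.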

/* We subscribe to the stream here. Make sure you call                       */
/* subscription.unsubscribe() when you are done, or the event handler will   */
/* cause a memory leak.                                                      */
Subscription subscription = clickEvents.subscribe(new Action1<ActionEvent>() {
        @Override
        public void call(ActionEvent t1) {
          System.out.println("Event handled: " + t1);
        }
      });

And here’s the supporting code:

package rx.operators;

import javafx.event.Event;
import javafx.event.EventHandler;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.util.functions.Action1;
import rx.util.functions.Func1;

/**
 *
 * @author Chui
 */
public class OperationToObservableEvent {

  private static class ToObservableEvent<T extends Event>
          implements Func1<Observer<T>, Subscription>,
          Subscription,
          EventHandler<T> {

    private final Action1<EventHandler<T>> addListener;
    private final Action1<EventHandler<T>> removeListener;
    private Observer<T> observer;

    public ToObservableEvent(
            Action1<EventHandler<T>> addListener,
            Action1<EventHandler<T>> removeListener) {
      this.addListener = addListener;
      this.removeListener = removeListener;
    }

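    @Override
    public Subscription call(final Observer<T> observer) {
      // Note: this one instance serves every subscription to the Observable,
      // so a later subscriber replaces the observer of an earlier one.
      this.observer = observer;
      this.addListener.call(this);
      return this;
    }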

    @Override
    public void handle(T event) {
      observer.onNext(event);
    }

    @Override
    public void unsubscribe() {
      if (this.removeListener == null)
      {
        // there is no removeEventListener function,
        // so we assume this is a setEventListener,
        // and set it to null
        this.addListener.call(null);
      }
      else
      {
        this.removeListener.call(this);
      }
    }
  }
  
  /**
   * Returns an observable stream of events. 
   * @param <T> the type of events in the stream
   * @param setListener a callback to hook up an event handler when an Observer subscribes to the stream.
   * @return an Observable stream of events of type T
   */
  public static <T extends Event> Observable<T> toObservable(
          Action1<EventHandler<T>> setListener)
  {
    return Observable.create(toObservableEvent(setListener, null));
  }
  
  /**
   * Returns an observable stream of events, using a separate callback to unhook the handler.
   * @param <T> the type of events in the stream
   * @param addListener a callback to hook up event handler when an Observer subscribes to the stream
   * @param removeListener a callback to unhook the event handler when the subscription is done
   * @return an Observable stream of events of type T
   */
  public static <T extends Event> Observable<T> toObservable(
          Action1<EventHandler<T>> addListener,
          Action1<EventHandler<T>> removeListener)
  {
    return Observable.create(toObservableEvent(addListener, removeListener));
  }
  
  /**
   * This is an internal routine.
   * @param <T>
   * @param addListener
   * @param removeListener
   * @return 
   */
  public static <T extends Event> Func1<Observer<T>, Subscription> toObservableEvent(
          Action1<EventHandler<T>> addListener,
          Action1<EventHandler<T>> removeListener)
  {
    return new ToObservableEvent<T>(addListener, removeListener);
  }
}
/*
 * CompositeDisposable is a port of the .NET equivalent.
 * Add any subscriptions to a CompositeDisposable, and when
 * unsubscribe() is called, all the subscriptions will
 * be unsubscribed at the same time.
 */
package rx.disposables;

import java.util.ArrayList;
import rx.Subscription;

public class CompositeDisposable extends ArrayList<Subscription> 
  implements Subscription, AutoCloseable
{

  @Override
  public void unsubscribe() {
    for(Subscription item: this)
    {
      item.unsubscribe();
    }
  }

  @Override
  public void close() {
    this.unsubscribe();
  }
  
}
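
Because CompositeDisposable implements AutoCloseable, it should work with try-with-resources. Here is a minimal sketch of that usage. To keep it compilable without RxJava, `Subscription` is a local stand-in for `rx.Subscription`, the class body is repeated from above, and `releaseAll` is a hypothetical helper that only counts how many subscriptions were released.

```java
import java.util.ArrayList;

public class CompositeUsageSketch {

    // Stand-in (assumption) for rx.Subscription, so no RxJava is needed here.
    interface Subscription { void unsubscribe(); }

    // Same shape as the CompositeDisposable listed above.
    static class CompositeDisposable extends ArrayList<Subscription>
            implements Subscription, AutoCloseable {

        @Override
        public void unsubscribe() {
            for (Subscription item : this) {
                item.unsubscribe();
            }
        }

        @Override
        public void close() {
            unsubscribe();
        }
    }

    /** Registers two dummy subscriptions and reports how many were released. */
    static int releaseAll() {
        int[] released = {0};
        try (CompositeDisposable subscriptions = new CompositeDisposable()) {
            subscriptions.add(() -> released[0]++);
            subscriptions.add(() -> released[0]++);
            // ... work with the event streams here ...
        } // close() runs here and unsubscribes both entries
        return released[0];
    }

    public static void main(String[] args) {
        System.out.println(releaseAll()); // prints 2
    }
}
```

This way the event handlers are unhooked even if the code inside the try block throws.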
