JavaFX and RxJava
Netflix has released a Java port of .NET’s Reactive Extensions called RxJava. I’m new to RxJava although I’ve played around a little with its .NET counterpart.
In a nutshell, reactive programming is a not unlike the traditional days of unix shell scripting where you’d pipe streams of text from simple primitives to one another. Here are a couple of examples from the Bash Manual.
ls -l | grep "\.txt$" ls -l | sed -e "s/[aeio]/u/g"
You can also combine two streams into one and then do processing on them.
# Look for ERROR string in both stdout and stderr. foo 2>&1 | grep ERROR
Similarly, if you were doing GUI programming for select objects on a screen, you might wish to implement CTRL+Click as a way to add items to a selection. Doing this using event handlers can lead to a tangle of logic very quickly, especially if CTRL key is also used for other functionality.
Using Reactive programming, you’d essentially express your logic this way: listen to key down event and filter for CTRL key, then launch a listener for the click event, and after that launch listener for KeyUpEvent. This can be all done in one method, so that the logic stays together.
cat keyDownEvents | grep -l CTRL | xargs 'cat clickEvent' | xargs 'cat keyUpEvents | grep -l CTRL'
More examples would be forthcoming, but in the meantime, I’d like to introduce a way to transform an event handler into an observable stream.
/* the following defines an observable stream of click events on a button btn */
/* the button's event handler is not hooked up until this observable stream is */
/* subscribed by an Observer. (We pass in a routine to hook up the event */
/* handler just-in-time. */
Observable<ActionEvent> clickEvents = OperationToObservableEvent.toObservable(new Action1<EventHandler<ActionEvent>>() {
@Override
public void call(EventHandler<ActionEvent> eventHandler) {
btn.setOnAction(eventHandler);
}
});
The syntax is rather ugly without type inference, but I’d expect Java 8′s lambdas to improve the readability significantly.
/* We subscribe to the stream here. Make sure you call subscription.close() */
/* when you are done, or the event handler will cause a memory leak. */
Subscription subscription = clickEvents.subscribe(new Action1<ActionEvent>() {
@Override
public void call(ActionEvent t1) {
System.out.println("Event Handled" + t1);
}
});
And here’s the supporting code:
/*
* To change this template, choose Tools | Templates
* and open the template in the editor.
*/
package rx.operators;
import javafx.event.Event;
import javafx.event.EventHandler;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.util.functions.Action1;
import rx.util.functions.Func1;
/**
*
* @author Chui
*/
public class OperationToObservableEvent {
private static class ToObservableEvent<T extends Event>
implements Func1<Observer<T>, Subscription>,
Subscription,
EventHandler<T> {
private final Action1<EventHandler<T>> addListener;
private final Action1<EventHandler<T>> removeListener;
private Observer<T> observer;
public ToObservableEvent(
Action1<EventHandler<T>> addListener,
Action1<EventHandler<T>> removeListener) {
this.addListener = addListener;
this.removeListener = removeListener;
}
@Override
public Subscription call(final Observer<T> observer) {
this.observer = observer;
this.addListener.call(this);
return this;
}
@Override
public void handle(T event) {
observer.onNext(event);
}
@Override
public void unsubscribe() {
if (this.removeListener == null)
{
// there is no removeEventListener function,
// so we assume this is a setEventListener,
// and set it to null
this.addListener.call(null);
}
else
{
this.removeListener.call(this);
}
}
}
/**
* Returns an observable stream of events.
* @param <T> the type of events in the stream
* @param setListener a callback to hook up an event handler when an Observer subscribes to the stream.
* @return an Observable stream of events of type T
*/
public static <T extends Event> Observable<T> toObservable(
Action1<EventHandler<T>> setListener)
{
return Observable.create(toObservableEvent(setListener, null));
}
/**
*
* @param <T> the type of events in the stream
* @param addListener a callback to hook up event handler when an Observer subscribes to the stream
* @param removeListener a callback to unhook the event handler when the subscription is done
* @return an Observable stream of events of type T
*/
public static <T extends Event> Observable<T> toObservable(
Action1<EventHandler<T>> addListener,
Action1<EventHandler<T>> removeListener)
{
return Observable.create(toObservableEvent(addListener, removeListener));
}
/**
* This is an internal routine.
* @param <T>
* @param addListener
* @param removeListener
* @return
*/
public static <T extends Event> Func1<Observer<T>, Subscription> toObservableEvent(
Action1<EventHandler<T>> addListener,
Action1<EventHandler<T>> removeListener)
{
return new ToObservableEvent(addListener, removeListener);
}
}
/*
* CompositeDisposable is a port of the .NET equivalent
* Add any subscriptions to a CompositeDisposable, and when
* the unsubscribe() is called, all the subscriptions will
* be unsubscribed at the same time.
*/
package rx.disposables;
import java.util.ArrayList;
import rx.Subscription;
public class CompositeDisposable extends ArrayList<Subscription>
implements Subscription, AutoCloseable
{
@Override
public void unsubscribe() {
for(Subscription item: this)
{
item.unsubscribe();
}
}
@Override
public void close() {
this.unsubscribe();
}
}
No comments
Jump to comment form | comments rss [?] | trackback uri [?]