I have been banging my head on this one for a while, and now that I finally get it I thought I might write this down.
When using RxJava it is generally not a great idea to use
Observable.create() because, on its own, it doesn’t handle backpressure.
But sometimes I find myself in need of a custom
Observable which I can’t create using
fromCallable() or other methods. For example, I might
have a process that emits multiple events (something that
fromCallable() can’t handle). Or I might have something that produces the event in an asynchronous fashion, such as when
wrapping an existing API that takes a callback.
So the trick is to
create() an observable that does support backpressure in a proper way. It turns out that there’s a helper class called
SyncOnSubscribe that does this.
But how do you use it? After looking at it a couple of times over the past months, it finally clicked (with a little help from Paul, cheers for that!).
SyncOnSubscribe is what you should pass to
Observable.create() instead of your own
OnSubscribe<T>. So far, so good. Now how do we implement it?
The javadoc is quite daunting, at least to me.
There are a bunch of static methods to create an
SyncOnSubscribe instance, with various overloads (createStateful(), createSingleState() and createStateless()).
Let’s ignore all of the static helper methods and create a subclass that implements the required methods.
generateState(). There’s that state thing again. The javadoc says:
Executed once when subscribed to by a subscriber … to produce a state value.
The important thing here is the executed once part. This method allows you to initialize or start whatever you need to do for the
Observable to start producing
items. Secondly, you can return anything here, and it will serve as the state. This comes into play in the next() method.
next(state, observer) is called every time an item is requested from the
Observable. When this method is called, your
SyncOnSubscribe should produce exactly
one item (potentially blocking) and call
observer.onNext() once, or call
observer.onCompleted() or observer.onError() to indicate completion or error.
The next() method receives the state that was created in
generateState() (if any) and returns the new state value. So although this variable is called state it can be any object that you would
require in your logic to produce the next value when
next() is called. For example, state could be an
InputStream you are reading items from.
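To make the two required methods concrete, here is a minimal sketch of such a subclass, written against RxJava 1.x. The Countdown class and its countdown() helper are made up for illustration; the state is simply the next number to emit.

```java
import java.util.List;

import rx.Observable;
import rx.Observer;
import rx.observables.SyncOnSubscribe;

// Hypothetical example: emits start, start - 1, ..., 1 and then completes.
class Countdown extends SyncOnSubscribe<Integer, Integer> {
    private final int start;

    Countdown(int start) {
        this.start = start;
    }

    @Override
    protected Integer generateState() {
        // Called once per subscriber: the initial state is the first value to emit.
        return start;
    }

    @Override
    protected Integer next(Integer state, Observer<? super Integer> observer) {
        if (state <= 0) {
            // Nothing left to emit: signal completion instead of an item.
            observer.onCompleted();
            return state;
        }
        // Emit exactly one item, then hand back the state for the next call.
        observer.onNext(state);
        return state - 1;
    }

    // Illustrative helper: subscribes and collects everything into a list.
    static List<Integer> countdown(int start) {
        return Observable.create(new Countdown(start))
                .toList()
                .toBlocking()
                .single();
    }
}
```

With this sketch, Countdown.countdown(3) should give back the list 3, 2, 1: each call to next() emits one value and returns the state for the following call.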
onUnsubscribe(state) is a method you can optionally implement to clean up when the
Observable is unsubscribed, and also receives the state again. This is where you could close the
InputStream in the previous example.
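As a rough sketch of that stream example, the class below uses a BufferedReader as the state and closes it in onUnsubscribe(). LineSource, its closed flag and readAll() are hypothetical names, and the reader wraps an in-memory string only to keep the example self-contained.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import rx.Observable;
import rx.Observer;
import rx.observables.SyncOnSubscribe;

// Hypothetical example: emits the lines of a text, one per next() call.
class LineSource extends SyncOnSubscribe<BufferedReader, String> {
    private final String text;
    // Only here so the clean-up can be observed from outside.
    final AtomicBoolean closed = new AtomicBoolean(false);

    LineSource(String text) {
        this.text = text;
    }

    @Override
    protected BufferedReader generateState() {
        // Opened once per subscriber; a real source might open a file or socket.
        return new BufferedReader(new StringReader(text));
    }

    @Override
    protected BufferedReader next(BufferedReader reader, Observer<? super String> observer) {
        try {
            String line = reader.readLine();
            if (line == null) {
                observer.onCompleted();
            } else {
                observer.onNext(line);
            }
        } catch (IOException e) {
            observer.onError(e);
        }
        // The same reader is the state for the next call.
        return reader;
    }

    @Override
    protected void onUnsubscribe(BufferedReader reader) {
        // Receives the state again, so this is the place to release it.
        try {
            reader.close();
        } catch (IOException ignored) {
            // Nothing sensible to do if closing fails in this sketch.
        }
        closed.set(true);
    }

    // Illustrative helper: subscribes and collects all lines into a list.
    static List<String> readAll(LineSource source) {
        return Observable.create(source).toList().toBlocking().single();
    }
}
```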
What about AsyncOnSubscribe?
There is also AsyncOnSubscribe, which is basically the same thing, but has a slightly different method signature for next():
next(S state, long requested, Observer<Observable<? extends T>> observer)
This variant takes a count of
requested items. Note that
Long.MAX_VALUE means that anything goes: there is no upper bound, so all available items may be emitted. The observer expects an Observable that produces the requested number of items, which is what makes it async.
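A small sketch of how that signature might be used, again against RxJava 1.x (where AsyncOnSubscribe was still marked as a beta/experimental API, so details may differ per release). ChunkedRange and collect() are invented for this example; the inner Observable is a plain synchronous range, so nothing here is actually asynchronous, it only shows how the requested count is handled.

```java
import java.util.List;

import rx.Observable;
import rx.Observer;
import rx.observables.AsyncOnSubscribe;

// Hypothetical example: emits 0 .. limit - 1 in chunks sized by the request.
class ChunkedRange extends AsyncOnSubscribe<Integer, Integer> {
    private final int limit;

    ChunkedRange(int limit) {
        this.limit = limit;
    }

    @Override
    protected Integer generateState() {
        // The state is the next value to emit.
        return 0;
    }

    @Override
    protected Integer next(Integer state, long requested,
                           Observer<Observable<? extends Integer>> observer) {
        int remaining = limit - state;
        if (remaining <= 0) {
            observer.onCompleted();
            return state;
        }
        // requested may be Long.MAX_VALUE, so cap it by what is actually left.
        int count = (int) Math.min(requested, (long) remaining);
        // Emit one Observable that produces up to the requested number of items.
        observer.onNext(Observable.range(state, count));
        return state + count;
    }

    // Illustrative helper: subscribes and collects everything into a list.
    static List<Integer> collect(int limit) {
        return Observable.create(new ChunkedRange(limit)).toList().toBlocking().single();
    }
}
```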
Note that creating an observable like that can still be tricky and thus does not completely solve the case where you are wrapping some kind of async API that works with callbacks of some kind.
Fortunately the new
fromAsync() operator in the 1.1.7 release helps with that and that’s probably what you’d generally use.
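For the callback case, a sketch along these lines is probably the starting point. It assumes the fromAsync() and AsyncEmitter API as shipped in 1.1.7 (later 1.x releases renamed this entry point, so the exact names are tied to that release). Listener and LegacyApi are hypothetical stand-ins for the API being wrapped, and the fake API calls back synchronously to keep things short.

```java
import java.util.List;

import rx.AsyncEmitter;
import rx.Observable;

class CallbackBridge {
    // Hypothetical callback interface of the API being wrapped.
    interface Listener {
        void onValue(String value);
        void onDone();
    }

    // Hypothetical callback-based API; a real one would call back later, from another thread.
    static class LegacyApi {
        void start(Listener listener) {
            listener.onValue("a");
            listener.onValue("b");
            listener.onDone();
        }
    }

    static Observable<String> wrap(final LegacyApi api) {
        // BUFFER keeps items the subscriber has not requested yet, instead of dropping them.
        return Observable.<String>fromAsync(emitter -> api.start(new Listener() {
            @Override
            public void onValue(String value) {
                emitter.onNext(value);
            }

            @Override
            public void onDone() {
                emitter.onCompleted();
            }
        }), AsyncEmitter.BackpressureMode.BUFFER);
    }

    // Illustrative helper: subscribes and collects everything into a list.
    static List<String> collect() {
        return wrap(new LegacyApi()).toList().toBlocking().single();
    }
}
```

The backpressure mode is the design choice here: the callback API cannot be told to slow down, so you pick up front what happens to items that arrive faster than they are requested.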
As I’m writing this down now, I realise that all of the above isn’t that complicated once you figure it out, but it took quite some time for me to wrap my head around it anyway, so let’s recap:
generateState() allows you to initialise whatever you call your state, and serves as a callback telling you that your Observable is being subscribed to.
SyncOnSubscribe generates one item in its next() method; AsyncOnSubscribe emits an Observable of the requested items that you create.
onUnsubscribe() is called when the Observable is unsubscribed, and allows for clean up.
Observables wrapping a callback use