RxJava 2 migration tips

Most projects that use RxJava should have been migrated to RxJava 2 or RxJava 3 by now as RxJava 1 is officially announced to be end-of-life since March 31st 2018. Nevertheless some projects are still using RxJava 1 and I’m currently involved in a project with a pretty big codebase that needs to be migrated from RxJava 1 to RxJava 2. It's worth noting that support for RxJava 2 ends on February 28th 2021.

So why not migrate directly to RxJava 3? Well this could be a plan, but our application depends heavily on Vert.x 3.9 which has a dependency on RxJava 2. So in order to migrate to RxJava 3 we should first upgrade to Vert.x 4.1.0 which has support for RxJava 3. We decided to upgrade in small chunks and first tackle the upgrade with the biggest impact: RxJava 1 to RxJava 2.

In this blog post I’ll provide you with tips that can smoothen your migration path to RxJava 2. All code examples used in this blog can be found in my GitHub repository.


Context

Let's sketch some context first. The application in question has an Angular frontend that communicates via REST APIs to multiple Vert.x services backed by a MongoDB cluster. The whole runs in the AWS cloud. The backend is fully reactive, while the application as a whole is not, because Angular communicates via synchronous HTTP 1.x requests to the Vert.x services in the backend.

RxJava migration stack

In this blog post we’ll be focusing on the reactive code in the backend. Vert.x is event-driven and non-blocking and supports several asynchronous programming models. In our project the choice was made (before I joined the project) to use the advanced reactive programming model with RxJava. We use the Rx-ified APIs that Vert.x offers, at that time this was Vert.x API for RxJava 1. An alternative asynchronous programming model that could be used nowadays is Kotlin coroutines or Project Loom.

The communication to MongoDB was implemented with the MongoDB RxJava Driver which is also end of life, because it depends on RxJava 1.x. It’s successor is the MongoDB Reactive Streams Java Driver which is based on the Reactive Streams API.

The goal is to migrate our codebase to the following:


RxJava 2 ≠ RxJava 1

The differences between RxJava 2 and RxJava 1 are explained very well in the official documentation. Important to highlight is that RxJava 2 has been completely rewritten from scratch and is now based on the Reactive Streams specification.

Tip 1: Don't underestimate your RxJava 2 upgrade

RxJava 2 contains several new reactive base types, many API changes and several behavioural changes that impacts an existing RxJava 1 codebase significantly.


Coexistence

RxJava 2 and RxJava 1 can be used side by side. This can be useful when gradually migrating to RxJava 2.

The artifacts for RxJava 1 and RxJava 2 are specified as follows in Maven:

<!-- RxJava 1 -->
<dependency>
   <groupId>io.reactivex</groupId>
   <artifactId>rxjava</artifactId>
   <version>1.3.8</version>
</dependency>

<!-- RxJava 2 -->
<dependency>
   <groupId>io.reactivex.rxjava2</groupId>
   <artifactId>rxjava</artifactId>
   <version>2.20.0</version>
</dependency>
Tip 2: Mind your package

When using RxJava 1 and RxJava 2 in the same project, be careful not to mix up the package names of both libraries. E.g the RxJava 2 functional interface:

io.reactivex.functions.Function

is easily confused with

java.util.function.Function

or

rx.functions.Function

Typically these functional interfaces are used as lambdas inside e.g flatMap operators and the correct types are inferred. But in some cases we found code such as the following:

import io.reactivex.functions.Function;
import io.reactive.Observable;

Function<String, String> rxFunction = String::toUpperCase;

Observable.just("something")
       .map(rxFunction)
       .subscribe(log::info);

In this simple example it would be easy to spot why the code doesn’t compile when the wrong Function is used, but in a situation with multiple chained (and potentially nested) RxJava streams the compiler error may be less intuitive.

RxJava 2 requires its own functional interfaces due to the required Java 6 support. Despite that RxJava 3 is based on Java 8, it still has its own functional interfaces, due to the inability of the standard Java 8 functional interfaces to throw checked exceptions.


Clean code

In RxJava 1 most of the reactive code is created by using one reactive base type: rx.Observable. This is the main cause that RxJava 1 code is often hard to read. Take as an example the following code:

import rx.Observable;

public interface RxJava1ProfileService {

   Observable<Profile> findById(String id);

   Observable<Profile> findByName(String name);

   Observable<Profile> findAll();

   Observable<String> createAccessToken();

   Observable<Void> insert(Profile profile);
}

From reading this code it’s hard to tell how many items will be emitted. E.g. the findById method will probably emit 1 Profile, or maybe no Profile at all? The findByName method may emit more than 1 Profile, or should it emit at most 1 Profile? The createAccessToken implies that exactly 1 access token is emitted, but based on the Observable type this can be multiple. Then there is the notorious Observable<Void> which tells us that no item will be emitted at all when the insert method is subscribed to. I’ll explain more about Observable<Void> and the peculiarities that comes with its use later.

Tip 3: Use the new Reactive base types

RxJava 2 allows us to write clean code due to its improved and extended set of reactive base types: Completable, Single, Maybe, Observable and Flowable. Using these reactive types allows us to write more readable code, but it also enforces us to think about the semantics of stream processing. Let’s have a look at the RxJava 2 version of the previous code example:

import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Single;

public interface RxJava2ProfileService {

   Maybe<Profile> findById(String id);

   Maybe<Profile> findByName(String name);

   Flowable<Profile> findAll();

   Single<String> createAccessToken();

   Completable insert(Profile profile);
}

So which one of the code examples do you prefer to read...?

In the RxJava 2 code example it’s clear now that the findById method may or may not emit a Profile item. The Maybe means that either no item or a single item is emitted. It’s also clear now that the findByName may emit 0 or 1 Profile item, whereas in the RxJava 1 code there could have been multiple as this cannot be clearly deduced from the method name alone. The findAll method now returns a Flowable which is the backpressure-aware version of an Observable, but more about these differences later. The Single return type of the createAccessToken method now clearly tells us that exactly 1 access token emission can be expected. And finally the notorious Observable<Void> is replaced by the Completable type which tells us that no item is emitted.

Tip 4: Eliminate all null events

RxJava 2 no longer accepts null values. This leads to much cleaner code of course, but this does mean that we need to get rid of many situations where RxJava 1 code (mis)uses these null values. Mainly there are two types of situations where this occurs a lot. The first is related to Observable<Void>, where stream values are being mapped to null. The second is related to the mixing and misuse of null values and empty streams.

The following code shows an example of the first situation which is valid in RxJava 1:

import rx.Observable;

public Observable<Void> insert(Profile profile) {
   return repository.insert(profile)
           .map(success -> null);
}

However in RxJava 2 this will yield an infamous NullPointerException:

import io.reactivex.Observable;

public Observable<Void> insertRxJava1Way(Profile profile) {
   return repository.insert(profile)
           .toObservable()
           .map(success -> null);
}

The correct way to code this in RxJava 2 is by using a Completable as follows:

import io.reactivex.Completable;

public Completable insert(Profile profile) {
   return repository.insert(profile)
           .ignoreElement();
}

Note that the ignoreElement() is needed to ignore the success value of the Single that is returned by repository.insert().

Our RxJava 1 codebase was filled with reactive code that emits nulls similar to this simplified example:

import rx.Observable;

Observable<Void> dinnerTime() {
   return cookMeal()
           .flatMap(x -> setTheTable())
           .flatMap(x -> eat())
           .flatMap(x -> clearTheTable())
           .map(x -> null);
}

In RxJava 2 this looks much better and more intuitive:

import io.reactivex.Completable;

Completable dinnerTime() {
   return cookMeal()
           .andThen(setTheTable())
           .andThen(eat())
           .andThen(clearTheTable());
}

Another nasty situation that we found several times in our codebase was the (mis)use of the singleOrDefault() operator. This is a good example that demonstrates the aforementioned second situation where null values and empty streams are mixed and misused:

rx.Observable.empty()
         .singleOrDefault(null)
         .subscribe(customResponse -> System.out.println("onNext: " + customResponse)
               , throwable -> System.out.println("onError: " + throwable.getMessage())
               , () -> System.out.println("onCompleted"));

Suppose that an Observable stream completes by emitting zero elements, then this RxJava 1 example passes the null value through the singleOrDefault operator which triggers an onNext event and finally the onComplete event. The operator differences section in the official documentation tells us that the singleOrDefault operator is replaced by the single operator in RxJava 2. However the latter will return a Single now and is not allowed to emit null as a default value. So the following code will yield a NullPointerException:

io.reactivex.Observable.empty()
         .single(null)
         .subscribe(customResponse -> System.out.println("onNext: " + customResponse)
               , throwable -> System.out.println("onError: " + throwable.getMessage()));

Looking at the initial RxJava 1 code's behaviour, apparently the source stream is expected to emit a single item or no item at all (due to the singleOrDefault operator). This is exactly what a Maybe does in RxJava 2. But what about the null value that gets emitted as a default item? Well, this is not possible in RxJava 2 and cannot be substituted in a generic manner as it depends on the logic based on the null value that is implemented further downstream. The following RxJava 2 code may therefore not work for your situation, but it shows one of the possible solutions where a Maybe is converted into a Single with a default value in case the source Maybe is empty.

io.reactivex.Maybe.empty()
                .toSingle("success")
                .subscribe(customResponse -> System.out.println("onNext: " + customResponse)
                        , throwable -> System.out.println("onError: " + throwable.getMessage()));

And yes, you are lucky when the situations in your code base are similar to the ones above...


Stream behaviour

The most tricky part of upgrading to RxJava 2 is definitely the potential changed behaviour of the reactive streams.

Tip 5: Verify reactive stream behaviour!

Let’s demonstrate this by a simple example!

import rx.Observable;

Observable<Void> dinnerTimeByMood(String mood) {
   return cookMealByMood(mood)
           .flatMap(x -> setTheTable())
           .flatMap(x -> eat())
           .flatMap(x -> clearTheTable())
           .map(x -> null);
}

Observable<String> cookMealByMood(String mood) {
   return Observable.fromCallable(() -> mood.equals("good") ? "lasagne" : null);
}

In the RxJava 1 example above, the cookMealByMood method emits either a single item (the String 'lasagne') or null. The point is that this Observable stream will always emit an onNext event (albeit by pushing null through the stream) and therefore it will flow through all of the flatMap operators. Emitting null values is of course a bad coding practice, but reality shows that you’ll be confronted with multiple of these situations in a RxJava 1 codebase.

Now suppose that the cookMealByMood method returns an empty Observable (instead of a null element) as is shown in the following code example:

Observable<String> cookMealByMood(String mood) {
   return Observable.defer(() -> mood.equals("good") ? Observable.just("lasagne") : Observable.empty());
}

In a 'bad' mood situation, the onNext event will not be emitted, but instead an onComplete event is sent immediately and all of the succeeding flatMap operators are ignored. So effectively the behaviour of this reactive stream has changed! And this is exactly the changed behaviour you will be confronted with when upgrading your codebase from RxJava 1 to RxJava 2.

Let’s examine the RxJava 2 code for this example:

import io.reactivex.Completable;

Completable dinnerTimeByMood(String mood) {
   return cookMealByMood(mood)
           .flatMapCompletable(x -> setTheTable())
           .andThen(eat())
           .andThen(clearTheTable());
}

Maybe<String> cookMealByMood(String mood) {
   return Maybe.fromCallable(() -> mood.equals("good") ? "lasagne" : null);
}

Here we’ll use the Maybe type which tells us to emit either 0 or 1 String item (or an error). Note that the null handling of the Maybe.fromCallable() indicates that the returned Maybe is empty, which differs from the similar source operators in the other base reactive classes that would signal a NullPointerException. Hence when mode equals 'bad' an onComplete event is emitted, which is similar to the Observable.empty() situation from the RxJava 1 example. However in this example the setTheTable() Completable inside the flatMapCompletable operator will not be subscribed to because there is no item emitted by the Maybe return value of cookMealByMood and therefore the onComplete event is emitted immediately. The defaultIfEmpty or switchIfEmpty operators should be used when you want to deal correctly with these empty situations. Now suppose that the setTheTable() Completable has some side effects (e.g. store something in a database), then this code would not be executed in the new situation while it is executed in the initial RxJava 1 situation using the null value events. So now we've (unintentionally) changed the behaviour of this stream. Again, adding a side effect like this would violate good coding practices, but unfortunately we’ve seen a lot of these imperative code style examples in our codebase.

All of the examples above are extremely simplified versions compared to the actual production code that we’re confronted with, but the message should be clear: verify the behaviour of your stream!

Tip 6: Verify error handling behaviour!

Another important area where the behaviour of your streams may have changed is error handling. The following example throws an IllegalStateException in the function that processes the onNext event. In this situation the stream is not yet in a terminal state and therefore this exception triggers the onError event and the stream terminates.

import rx.Observable;

Observable.just(CustomResponse.ENDED)
                .subscribe(customResponse -> {
                    System.out.println("onNext");
                    throw new IllegalStateException("Response should not have been ended");
                }, throwable -> {
                    System.out.println("onError: " + throwable.getMessage());
                }, () -> System.out.println("onCompleted"));

In RxJava 2 this behaviour is the same for an io.reactivex.Observable, but it changes when an io.reactivex.Single or io.reactivex.Maybe is used:

import io.reactivex.Single;

Single.just(CustomResponse.ENDED)
                .subscribe(customResponse -> {
                    System.out.println("onSuccess");
                    throw new IllegalStateException("Response should not have been ended");
                }, throwable -> {
                    System.out.println("onError: " + throwable.getMessage());
                });

Now the stream reaches a terminal state (onSuccess) before the IllegalStateException is thrown. So in this situation the onError event is not triggered, but instead the current thread’s uncaught exception handler is called. The error handling section in the official documentation provides more information about the changes in RxJava 2 regarding error handling.

In our codebase we had many of these situations, mainly because rx.Observable types are often replaced by io.reactive.Single, io.reactivex.Completable or io.reactivex.Maybe types and that’s when this may become an issue.


Backpressure

RxJava 2 introduces a new reactive base type called Flowable which has support for backpressure. The io.reactivex.Observable is non-backpressured.

Tip 7: Decide if you need Flowable or Observable

When upgrading to RxJava 2, most likely you’ll want to replace rx.Observable with io.reactivex.Flowable unless your stream doesn’t support backpressure at all. The official documentation explains in more detail when to use which type.

The following RxJava 1 code provides an example where an Observable is overproducing the consumer with numbers. This code will fail fast with a MissingBackpressureException, because the RxJava 1 Observable is backpressure-aware.

import rx.Observable;
import rx.schedulers.Schedulers;

Observable.interval(5, MILLISECONDS)
       .doOnNext(number -> System.out.println("i: " + number))
       .observeOn(Schedulers.computation())
       .subscribe(number -> sleep(1000));

The same code in RxJava 2 however will fail slowly, because io.reactivex.Observable has no support for backpressure and will therefore not signal a MissingBackpressureException.

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

Observable.interval(1, MILLISECONDS)
       .doOnNext(number -> System.out.println("i: " + number))
       .observeOn(Schedulers.computation())
       .subscribe(number -> sleep(1000));

This means that blindly replacing rx.Observable with io.reactivex.Observable does not guarantee the same behaviour. So make sure to use Flowable whenever you need support for backpressure:

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

Flowable.interval(1, MILLISECONDS)
       .doOnNext(number -> System.out.println("i: " + number))
       .observeOn(Schedulers.computation())
       .subscribe(number -> sleep(1000));

In production code, these types of back pressure problems are often difficult to spot and usually do not occur until a system is under (high) load.


Interoperability

In some projects it may be useful to keep (for the time being) some parts of the codebase in RxJava 1 and migrate gradually to RxJava 2.

Tip 8: Use RxJavaInterop to convert RxJava 1 <-> RxJava 2

We’ve used RxJavaInterop mainly to convert between RxJava 1 and RxJava 2 reactive base types and vice versa. This can be useful when you have to deal with (legacy) third party libraries that still use RxJava 1 (although it’s better not to use these at all of course). This library can also be useful at the bounded contexts of your own system’s code base.

An example that we’ve seen a lot in RxJava 1 code is an Observable that emits a single List of Strings. In RxJava 2 this should become a Single instead and that can be easily done with RxJavaInterop:

import hu.akarnokd.rxjava.interop.RxJavaInterop;
import io.reactivex.Single;
import rx.Observable;

Observable<List<String>> sportsObservable = Observable.just("soccer", "tennis", "padel").toList();

// converts RxJava1 Observable to RxJava1 Single and then convert to RxJava2 Single
RxJavaInterop.toV2Single(sportsObservable.toSingle());
// converts RxJava1 Observable to RxJava2 Observable from which RxJava2 Single is created
Single.fromObservable(RxJavaInterop.toV2Observable(sportsObservable));

It’s also possible to convert from RxJava 2 to RxJava 1. Suppose that for some reason (consider this bad practice though) you’d need to convert from io.reactivex.Completable to rx.Observable<Void> by emitting a null event, then this can be done as follows:

import hu.akarnokd.rxjava.interop.RxJavaInterop;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Completable;

RxJavaInterop.toV1Observable(
        doSomethingAndComplete().toSingle(() -> "").toObservable(), 
        BackpressureStrategy.ERROR
    ).map(s -> null);

private Completable doSomethingAndComplete() {
   return Completable.create(emitter -> {
       System.out.println("doSomething");
       emitter.onComplete();
   });
}

What happens is that the io.reactivex.Completable returned from doSomethingAndComplete() is converted into an io.reactivex.Single by emitting a String element, which is converted to an io.reactivex.Observable. This RxJava 2 Observable is converted via RxJavaInterop to an RxJava 1 Observable (note that RxJava 1 Observable is backpressure-aware hence the BackpressureStrategy.ERROR) and maps the emitted String element to the so called null event.


RxJava 3

Upgrading your codebase from RxJava 1 to RxJava 2 or RxJava 3 can be a huge operation, but the upgrade from RxJava 2 to RxJava 3 is a much smaller beast. The RxJava 3 documentation explains thoroughly what the differences are compared to RxJava 2. The RxJavaBridge helps you to bridge between RxJava 2 and RxJava 3. The RxJavaInterop 3.x library can be used to convert between RxJava 1 and RxJava 3 and vice versa, similar to the RxJavaInterop library we’ve used in this blog.


Conclusion

The transition to RxJava 2 is a long, error-prone road. This blogpost outlines several tips that can make the road to RxJava 2 less bumpy for your project. The most important lesson we have learned is to not underestimate this transition. This is due to the new reactive base types, many API changes but mainly due to the tricky situations where the behaviour of streams is not guaranteed to be the same! All code examples can be found in my GitHub repository.