ReactiveX/RxJava Tutorial: Compute the Fibonacci Numbers using RxJava.Flowable Asynchronously


RxJava (https://github.com/ReactiveX/RxJava) is the Reactive Extensions library for the Java Virtual Machine (JVM). It lets you write asynchronous code in a reactive style. If you come from an imperative or OOP background, it may take a while to get used to the ReactiveX terminology, but once you do, you will see how easy and powerful RxJava is.

This tutorial presents a simple RxJava program that computes the Fibonacci numbers with Flowable, RxJava's stream type that emits 0 to N items and supports Reactive Streams and backpressure.

Static Java Function to Compute the Fibonacci Numbers

We can compute the Fibonacci numbers iteratively:

private static int getFib(int n) {
    int a = 0, b = 1;
    while (n-- > 0) {
        int t = a + b;
        a = b;
        b = t;
    }
    return a;
}
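
One caveat if you really ask for large indices: the int result wraps around after the 46th Fibonacci number, since F(47) = 2971215073 no longer fits in 32 bits. Below is a minimal sketch of the same loop using java.math.BigInteger, assuming arbitrary precision is what you want. The class name BigFib is made up for this illustration.

```java
import java.math.BigInteger;

class BigFib {
    // Same iteration as getFib above, but with arbitrary-precision integers.
    static BigInteger getFib(int n) {
        BigInteger a = BigInteger.ZERO, b = BigInteger.ONE;
        while (n-- > 0) {
            BigInteger t = a.add(b);
            a = b;
            b = t;
        }
        return a;
    }

    public static void main(String[] args) {
        System.out.println(getFib(10));   // 55
        System.out.println(getFib(47));   // 2971215073, too large for int
        System.out.println(getFib(1000).toString().length() + " digits");
    }
}
```

The loop still runs n times per call, so this only fixes the range of the result, not the running time.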

The iterative version is nothing fancy, but suppose we want both the 1000th and the 100000th Fibonacci numbers: the loop runs 1000 times for the first and 100000 times for the second. In other words, each call starts from scratch and shares no work with the others. We can instead use a recursive implementation with a hash map that caches the results already computed, a technique known as memoization.

// memo table, shared by every caller
private static final ConcurrentHashMap<Integer, Integer> memo = new ConcurrentHashMap<>();

private static int getFib(int n) {
    // base cases; n <= 0 is guarded so the recursion always terminates
    if (n <= 0) return 0;
    if (n <= 2) return 1;
    // computed already, return cached result
    Integer cached = memo.get(n);
    if (cached != null) {
        return cached;
    }
    int fib = getFib(n - 1) + getFib(n - 2);
    // put result in cache
    memo.put(n, fib);
    return fib;
}

Here we use ConcurrentHashMap instead of the regular HashMap because we want to fetch and store the values concurrently from the Rx threads.
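
To make the thread-safety point concrete, here is a small sketch that calls the memoized function from several threads at once. It uses a plain ExecutorService from java.util.concurrent rather than RxJava schedulers. The class name SharedMemoDemo, the helper computeAll, and the pool size of 4 are arbitrary choices for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SharedMemoDemo {
    // One cache shared by all worker threads.
    static final ConcurrentHashMap<Integer, Integer> memo = new ConcurrentHashMap<>();

    static int getFib(int n) {
        if (n <= 0) return 0;
        if (n <= 2) return 1;
        Integer cached = memo.get(n);
        if (cached != null) return cached;
        int fib = getFib(n - 1) + getFib(n - 2);
        memo.put(n, fib);
        return fib;
    }

    // Computes getFib(1..count) on a small thread pool; results come back in index order.
    static List<Integer> computeAll(int count) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 1; i <= count; i++) {
                final int n = i;
                futures.add(pool.submit(() -> getFib(n)));
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> f : futures) {
                try {
                    results.add(f.get()); // blocks until that task is done
                } catch (InterruptedException | ExecutionException e) {
                    throw new IllegalStateException(e);
                }
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(computeAll(10)); // [1, 1, 2, 3, 5, 8, 13, 21, 34, 55]
    }
}
```

Two threads may occasionally compute the same entry and both store it. That is harmless here because they store the same value, and ConcurrentHashMap keeps the map itself consistent.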

Using Flowable to Do the Simple Background Processing

Flowable is one of the producer types in the RxJava world. A Flowable emits 0 to N items. We chain operators onto the stream and finally subscribe a consumer, which consumes the items in the flow. For example:

package com.helloacm;

import io.reactivex.Flowable;
import java.util.concurrent.ConcurrentHashMap;

public class Main {
    // memo table shared by every call to getFib
    private static final ConcurrentHashMap<Integer, Integer> memo = new ConcurrentHashMap<>();

    private static int getFib(int n) {
        // base cases; n <= 0 is guarded so the recursion always terminates
        if (n <= 0) return 0;
        if (n <= 2) return 1;
        Integer cached = memo.get(n);
        if (cached != null) {
            return cached;
        }
        int fib = getFib(n - 1) + getFib(n - 2);
        memo.put(n, fib);
        return fib;
    }

    public static void main(String[] args) {
        // emit 1..10, map each index to its Fibonacci number, print each result
        Flowable.range(1, 10)
                .map(Main::getFib)
                .blockingSubscribe(System.out::println);
    }
}

Flowable.range(1, 10) emits 10 integers starting at 1, that is 1, 2, ..., 10. The map operator (similar to map in JavaScript or Python) then applies getFib to each item as it flows past. Note that without a scheduler operator such as subscribeOn, the whole chain runs sequentially on the calling thread, so nothing happens in the background yet. Finally, blockingSubscribe passes each result to System.out::println and returns only when the flow has completed.

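We can use subscribe instead of blockingSubscribe. Once the flow runs on another thread (for example via subscribeOn with one of the standard schedulers), subscribe returns immediately and does not block the main thread. In that case we have to wait for the Flowable to finish ourselves: the standard scheduler threads are daemon threads, so when the main thread terminates, the JVM exits and the unfinished flow is killed with it.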
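
To move the computation off the main thread, the flow needs a scheduler. The sketch below is one way to do it, assuming RxJava 2 (the io.reactivex package used above) is on the classpath. subscribeOn(Schedulers.computation()) runs the source and the map on a computation thread, subscribe returns immediately, and a CountDownLatch keeps the caller waiting until the flow completes. The class name AsyncFib, the helper computeAsync, and the latch are choices made for this illustration and are not part of the original program. getFib is the iterative version from the first section, to keep the sketch short.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

class AsyncFib {
    static int getFib(int n) {
        int a = 0, b = 1;
        while (n-- > 0) {
            int t = a + b;
            a = b;
            b = t;
        }
        return a;
    }

    // Computes getFib(1..count) on an RxJava computation thread and waits for the result.
    static List<Integer> computeAsync(int count) {
        List<Integer> results = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Flowable.range(1, count)
                .subscribeOn(Schedulers.computation()) // run upstream work off the calling thread
                .map(AsyncFib::getFib)
                .subscribe(
                        results::add,                  // onNext: collect each value
                        error -> {                     // onError: report and release the waiter
                            error.printStackTrace();
                            done.countDown();
                        },
                        done::countDown);              // onComplete: release the waiter

        // subscribe() has already returned here; block until the flow signals completion.
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(computeAsync(10));
    }
}
```

If you only need to wait and consume, keeping blockingSubscribe and adding subscribeOn in front of it achieves the same effect with less code. The latch version is useful when the main thread has other work to do before it waits.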

–EOF (The Ultimate Computing & Technology Blog) —
