Splitting up “Streams” using jOOλ, Kotlin, and ReactiveX

The Java 8 Stream API is pretty cool, as you can see in my last post, BFS with Streams. But there is a catch: you cannot reuse streams. Consider the following Java code:

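import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Stream;

final List<String> helloList =
    Arrays.asList("H", "e", "l", "l", "o", ", ", "W", "o", "r", "l", "d", "!");

final Stream<String> helloStream = helloList.stream();
// true if toUpperCase() changes the first character (i.e., it is lowercase)
final Predicate<String> checkUpper = s -> !s.isEmpty()
    && !s.substring(0, 1).toUpperCase().equals(s.substring(0, 1));
helloStream.filter(checkUpper);                 // fine: first operation on helloStream
helloStream.filter(s -> !checkUpper.test(s));   // throws IllegalStateException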

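This results in an IllegalStateException: “stream has already been operated upon or closed”. It is thrown by the second call to filter, because helloStream has already been operated upon by the first one.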
The problem is that Java 8 streams are designed with parallel execution in mind, and this decision introduced some constraints. Unfortunately, there is no sequential-only switch.

So what can you do? You can collect the results into a map with Collectors.groupingBy (or partitioningBy) and then create two new streams from that map. But collecting is a terminal operation, not lazy, and therefore inefficient (especially in combination with early-exit operations like limit). You can also try to apply the first filter, chain it with a peek, and finally apply the second filter. But since only elements matching the first filter reach the second filter (i.e., a && !a, which is always false), no element will make it past the second filter. If you have a so-called cold source (e.g., a collection), you can simply create two different streams, which results in two iterations. For hot sources (like a network or file I/O stream), this is not that easy. A possible solution is to cache the input in a collection, i.e., to cool it down, but this comes with a space and performance penalty. So let’s see what our options are…
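For the collect-based workaround, a minimal sketch could look like the following. It uses Collectors.partitioningBy, a standard JDK collector that always produces both a true and a false entry; the uppercase test via Character.isUpperCase and the names PartitionCollect and splitByCase are assumptions made for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PartitionCollect {

    // Eagerly splits the input into two lists in a single pass:
    // key true -> first character is uppercase, key false -> everything else.
    static Map<Boolean, List<String>> splitByCase(List<String> input) {
        return input.stream()
                .collect(Collectors.partitioningBy(
                        s -> !s.isEmpty() && Character.isUpperCase(s.charAt(0))));
    }

    public static void main(String[] args) {
        List<String> hello =
                Arrays.asList("H", "e", "l", "l", "o", ", ", "W", "o", "r", "l", "d", "!");
        Map<Boolean, List<String>> parts = splitByCase(hello);
        // Both halves are materialized lists, so they can be streamed as often as needed.
        System.out.println(parts.get(true)); // [H, W]
        System.out.println(parts.get(false).stream()
                .map(String::toUpperCase)
                .collect(Collectors.joining())); // ELLO, ORLD!
    }
}
```

Note that the whole source is consumed before either half is available: even if a later pipeline only takes the first element with limit(1), every element has already been collected.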

I couldn’t resist and tried the same thing in Kotlin 1.0 with its sequences:

val helloArray = arrayOf("H", "e", "l", "l", "o", ", ", "W", "o", "r", "l", "d", "!")

val helloSeq = helloArray.asSequence()
val checkUpper: (String) -> Boolean = { it.firstOrNull()?.isUpperCase() ?: false }
val upperSeq = helloSeq.filter(checkUpper)
val lowerSeq = helloSeq.filter { !checkUpper(it) }.map(String::toUpperCase)

println(upperSeq.joinToString())
println(lowerSeq.joinToString())

And hey, it works. Output:

H, W
E, L, L, O, , , O, R, L, D, !
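Why does the Kotlin version work? A sequence created with asSequence() on an array is not single-use: each terminal operation (here each joinToString call) asks the array for a fresh iterator, so the array is simply traversed twice. A rough Java counterpart, sketched below under the assumption of a cold source, is to pass around a Supplier<Stream<String>> instead of a Stream; ReusableSource, IS_UPPER, and HELLO_STREAM are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ReusableSource {

    static final List<String> HELLO =
            Arrays.asList("H", "e", "l", "l", "o", ", ", "W", "o", "r", "l", "d", "!");

    // true if the first character is an uppercase letter
    static final Predicate<String> IS_UPPER =
            s -> !s.isEmpty() && Character.isUpperCase(s.charAt(0));

    // Each get() creates a brand-new stream over the same (cold) list,
    // so the list is traversed once per terminal operation.
    static final Supplier<Stream<String>> HELLO_STREAM = HELLO::stream;

    static String upper() {
        return HELLO_STREAM.get()
                .filter(IS_UPPER)
                .collect(Collectors.joining(", "));
    }

    static String lower() {
        return HELLO_STREAM.get()
                .filter(IS_UPPER.negate())
                .map(String::toUpperCase)
                .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        System.out.println(upper()); // H, W
        System.out.println(lower()); // E, L, L, O, , , O, R, L, D, !
    }
}
```

This is the “two iterations” option for cold sources; it does not help with a hot source that can only be read once.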

But for Java there is a solution, too: jOOλ, which provides extensions to the Stream API, in particular for sequential streams, via its Seq interface. Reusing a Seq does not work either, but we get nice additional (lazy) operations like partition:

import org.jooq.lambda.Seq;
import org.jooq.lambda.tuple.Tuple2;
final Tuple2<Seq<String>, Seq<String>> partition =
    Seq.of("H", "e", "l", "l", "o", ", ", "W", "o", "r", "l", "d", "!").partition(checkUpper); // v1: matching, v2: rest
System.out.println(partition.v1().map(String::toUpperCase).toString(" "));
System.out.println(partition.v2().map(String::toLowerCase).toString(" "));

This works and results in the following output:

E L L O O R L D
h ,  w !
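How can something like partition stay lazy when the source can only be traversed once? The general trick is buffering: both halves pull from one shared iterator, and an element that belongs to the other half is queued until that half asks for it. The sketch below illustrates this idea in plain Java 8. It is not jOOλ’s actual implementation; the names LazySplit and split are hypothetical, the Iterator merely stands in for a hot source, null elements are not supported (ArrayDeque rejects them), and the code is not thread-safe.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class LazySplit {

    // Splits a single-pass iterator into two lazy, sequential streams:
    // index 0 = elements matching the predicate, index 1 = all other elements.
    // Sketch only: sequential use, no null elements.
    static <T> List<Stream<T>> split(Iterator<T> source, Predicate<? super T> predicate) {
        Deque<T> matching = new ArrayDeque<>();
        Deque<T> rest = new ArrayDeque<>();
        List<Stream<T>> result = new ArrayList<>(2);
        result.add(side(source, predicate, matching, rest, matching));
        result.add(side(source, predicate, matching, rest, rest));
        return result;
    }

    private static <T> Stream<T> side(Iterator<T> source, Predicate<? super T> predicate,
                                      Deque<T> matching, Deque<T> rest, Deque<T> own) {
        Iterator<T> it = new Iterator<T>() {
            @Override
            public boolean hasNext() {
                // Pull from the shared source until this side has an element
                // or the source is exhausted; route each pulled element to its queue.
                while (own.isEmpty() && source.hasNext()) {
                    T next = source.next();
                    (predicate.test(next) ? matching : rest).addLast(next);
                }
                return !own.isEmpty();
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return own.removeFirst();
            }
        };
        // Wrap the iterator in a lazy, ordered, sequential stream.
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
    }

    public static void main(String[] args) {
        Iterator<String> hot = Arrays.asList(
                "H", "e", "l", "l", "o", ", ", "W", "o", "r", "l", "d", "!").iterator();
        List<Stream<String>> parts =
                split(hot, s -> !s.isEmpty() && Character.isLowerCase(s.charAt(0)));
        System.out.println(parts.get(0).map(String::toUpperCase).collect(Collectors.joining(" ")));
        System.out.println(parts.get(1).map(String::toLowerCase).collect(Collectors.joining(" ")));
    }
}
```

If one half is drained completely before the other is touched (as in main), every element of the other half ends up in its queue, so in the worst case this is no better than caching the whole input; consuming both halves roughly in lockstep keeps the buffers small.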

Of course, we can reach even deeper into the toolbox and use the groupBy operator of RxJava, or more precisely RxKotlin in this case (a Java implementation is quite similar but has a much uglier syntax):

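import rx.Observable

val helloArray = arrayOf("H", "e", "l", "l", "o", ", ", "W", "o", "r", "l", "d", "!")
Observable.from(helloArray)
        // key: true if the element starts with an uppercase letter
        .groupBy { it.firstOrNull()?.isUpperCase() ?: false }
        .subscribe { group ->
            if (group.key) {
                // uppercase group: print lowercased
                group.map { it.toLowerCase() }.subscribe { print(it) }
            } else {
                // all other elements: print uppercased
                group.map { it.toUpperCase() }.subscribe { print(it) }
            }
        }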

The output is:

hELLO, wORLD!

Using ReactiveX might be overkill in many situations, especially if you just want to collect the results into some kind of collection. ReactiveX is more of a (data-driven) programming paradigm in which observers react to items emitted by observables. So check first what you want to accomplish before you switch to ReactiveX: once you start using it, it will eventually change the entire control flow of your application. This might be good or not :).
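The core of the push-based idea can also be sketched without any library: walk the source exactly once and hand each element to one of two consumers. This plain-Java sketch is not how RxJava works internally and lacks schedulers, backpressure, and error handling; PushSplit, splitTo, and swapCase are hypothetical names. It prints hELLO, wORLD!, the output shown for the RxKotlin example.

```java
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;

public class PushSplit {

    // Single pass over the source: every element is pushed to exactly one consumer.
    // Nothing is buffered, so the source is read only once.
    static <T> void splitTo(Stream<T> source, Predicate<? super T> predicate,
                            Consumer<? super T> onMatch, Consumer<? super T> onRest) {
        source.forEachOrdered(t -> {
            if (predicate.test(t)) {
                onMatch.accept(t);
            } else {
                onRest.accept(t);
            }
        });
    }

    // Lowercases elements starting with an uppercase letter and uppercases all others.
    static String swapCase(Stream<String> source) {
        StringBuilder out = new StringBuilder();
        splitTo(source,
                s -> !s.isEmpty() && Character.isUpperCase(s.charAt(0)),
                s -> out.append(s.toLowerCase()),  // uppercase elements
                s -> out.append(s.toUpperCase())); // everything else
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(swapCase(
                Stream.of("H", "e", "l", "l", "o", ", ", "W", "o", "r", "l", "d", "!")));
    }
}
```

Because nothing is buffered, this also works for a hot source, but the consumers have to handle each element as it arrives; neither half can be pulled at its own pace.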

Exciting.
