This is a follow up of my previous article about reactive programming. That article describes a step by step approach of converting a static binary tree in a stream of events. However, only API had reactive nature. This article illustrates one of the ways how can we refactor imperative implementation into a reactive one.

The ways how to refactor code

In my opinion, there are at least two possible approaches how can one refactors any code. Both of them require suite of tests though :trollface:. The first one is a radical way, with xUnit framework facility ignore all tests except one, remove or mock old functionality, and then one by one unignore each test implementing the functionality in a new way. The second one is slowly change old code piece by piece in teeny tiny steps. They are both valid and have their own advantages and disadvantages. The first one is faster but you might be less confident in you changes, the second one is the opposite. Here, I choose to move slowly because I assume the readers aren’t familiar with reactor and that helps me to introduce it in small pieces as possible, however I assume basic knowledge of Java 8 Stream API.

Emitting events

First of all, let’s revisit the findPaths method

  public Flux<List<Integer>> findPaths(Tree tree, int sum) {
    List<List<Integer>> paths = new ArrayList<>();
    addPathRecursively(tree, new ArrayList<>(), paths, sum);
    return Flux.fromIterable(paths);
  }

The only line that has some “reactivity” is the last one. It contains a call to Flux.fromIterable(Iterable<E>) method. As our goal is to make our implementation more reactive we need to change the way how we create a Flux object. Reactor has lots of factory methods to create Flux and we need Flux.create(Consumer<? super FluxSink<E>> emitter). FluxSink is an interface that has next, complete and error methods. Thanks to Java 8 we can implement a lambda method that handles each item of paths list and pass it into Flux.create method.

Flux is an abstract class from the reactor-core artifact that represents a stream with one or more values.

  public Flux<List<Integer>> findPaths(Tree tree, int sum) {
    List<List<Integer>> paths = new ArrayList<>();
    addPathRecursively(tree, new ArrayList<>(), paths, sum);
    return Flux.create(
        emitter -> {
          paths.forEach(p -> emitter.next(p));
          emitter.complete();
        }
    );
  }
examples.ReactiveTreeTest > streamIsEmpty_whenTreeHasOnlyRoot_andRootValueNotEqualsToTargetSum PASSED

examples.ReactiveTreeTest > emptyStream_whenGivenEmptyTree PASSED

examples.ReactiveTreeTest > streamHasTwoEvents_whenTreeHasRoot_andBothLeaves_onPaths PASSED

examples.ReactiveTreeTest > streamHasSingleEvent_whenTreeHasOnlyRoot_andRootValueEqualsToTargetSum PASSED

examples.ReactiveTreeTest > streamHasSingleEvent_whenTreeHasRoot_andLeftLeaf_andValuesSumEqualsToTargetSum PASSED

examples.ReactiveTreeTest > streamHasEventsOf_rootToLeafPaths PASSED

Tests passed; we haven’t screwed anything up; let’s go further.

A List as a Stream

Next step is to get rid of the list of lists and use FluxSink instead. Rather than make one big step I will make lots of small ones. The first is moving invocation of addPathRecursively method into the lambda method.

  public Flux<List<Integer>> findPaths(Tree tree, int sum) {
    List<List<Integer>> paths = new ArrayList<>();
    // addPathRecursively(tree, new ArrayList<>(), paths, sum);
    return Flux.<List<Integer>>create(
        emitter -> {
          addPathRecursively(tree, new ArrayList<>(), paths, sum);
          paths.forEach(p -> emitter.next(p));
          emitter.complete();
        }
    );
  }

It is a good habit to rerun tests after each step if you want to be confident in your changes.

examples.ReactiveTreeTest > streamIsEmpty_whenTreeHasOnlyRoot_andRootValueNotEqualsToTargetSum PASSED

examples.ReactiveTreeTest > emptyStream_whenGivenEmptyTree PASSED

examples.ReactiveTreeTest > streamHasTwoEvents_whenTreeHasRoot_andBothLeaves_onPaths PASSED

examples.ReactiveTreeTest > streamHasSingleEvent_whenTreeHasOnlyRoot_andRootValueEqualsToTargetSum PASSED

examples.ReactiveTreeTest > streamHasSingleEvent_whenTreeHasRoot_andLeftLeaf_andValuesSumEqualsToTargetSum PASSED

examples.ReactiveTreeTest > streamHasEventsOf_rootToLeafPaths PASSED

The second one is adding FluxSink interface as an argument to addPathRecursively method and making the code compile.

  public Flux<List<Integer>> findPaths(Tree tree, int sum) {
    List<List<Integer>> paths = new ArrayList<>();
    return Flux.<List<Integer>>create(
        emitter -> {
          addPathRecursively(tree, new ArrayList<>(), paths, sum, emitter);
          paths.forEach(p -> emitter.next(p));
          emitter.complete();
        }
    );
  }

  private void addPathRecursively(Tree node, List<Integer> path, List<List<Integer>> paths, int sum, FluxSink<List<Integer>> emitter) {
    if (node != null) {
      path.add(node.value);
      if (node.left == null && node.right == null && sum(path) == sum) {
        paths.add(path);
      } else {
        addPathRecursively(node.left, new ArrayList<>(path), paths, sum, emitter);
        addPathRecursively(node.right, new ArrayList<>(path), paths, sum, emitter);
      }
    }
  }

The third one is passing path variable into emitter.next instead of inserting into paths inside the addPathRecursively method.

  private void addPathRecursively(Tree node, List<Integer> path, List<List<Integer>> paths, int sum, FluxSink<List<Integer>> emitter) {
    if (node != null) {
      path.add(node.value);
      if (node.left == null && node.right == null && sum(path) == sum) {
        // paths.add(path);
        emitter.next(path);
      } else {
        addPathRecursively(node.left, new ArrayList<>(path), paths, sum, emitter);
        addPathRecursively(node.right, new ArrayList<>(path), paths, sum, emitter);
      }
    }
  }
examples.ReactiveTreeTest > streamIsEmpty_whenTreeHasOnlyRoot_andRootValueNotEqualsToTargetSum PASSED

examples.ReactiveTreeTest > emptyStream_whenGivenEmptyTree PASSED

examples.ReactiveTreeTest > streamHasTwoEvents_whenTreeHasRoot_andBothLeaves_onPaths PASSED

examples.ReactiveTreeTest > streamHasSingleEvent_whenTreeHasOnlyRoot_andRootValueEqualsToTargetSum PASSED

examples.ReactiveTreeTest > streamHasSingleEvent_whenTreeHasRoot_andLeftLeaf_andValuesSumEqualsToTargetSum PASSED

examples.ReactiveTreeTest > streamHasEventsOf_rootToLeafPaths PASSED

Now we can remove paths from the arguments list of addPathRecursively method.

  public Flux<List<Integer>> findPaths(Tree tree, int sum) {
    List<List<Integer>> paths = new ArrayList<>();
    return Flux.<List<Integer>>create(
        emitter -> {
          //addPathRecursively(tree, new ArrayList<>(), sum, paths, emitter);
          addPathRecursively(tree, new ArrayList<>(), sum, emitter);
          paths.forEach(p -> emitter.next(p));
          emitter.complete();
        }
    );
  }

  private void addPathRecursively(Tree node, List<Integer> path, int sum,/* List<List<Integer>> paths,*/ FluxSink<List<Integer>> emitter) {
    if (node != null) {
      path.add(node.value);
      if (node.left == null && node.right == null && sum(path) == sum) {
        emitter.next(path);
      } else {
        addPathRecursively(node.left, new ArrayList<>(path), sum, emitter);
        addPathRecursively(node.right, new ArrayList<>(path), sum, emitter);
      }
    }
  }

And finally remove paths.forEach(p -> emitter.next(p)) and paths variable all together.

  public Flux<List<Integer>> findPaths(Tree tree, int sum) {
    // List<List<Integer>> paths = new ArrayList<>();
    return Flux.<List<Integer>>create(
        emitter -> {
          addPathRecursively(tree, new ArrayList<>(), sum, emitter);
          // paths.forEach(p -> emitter.next(p));
          emitter.complete();
        }
    );
  }

  private void addPathRecursively(Tree node, List<Integer> path, int sum, FluxSink<List<Integer>> emitter) {
    if (node != null) {
      path.add(node.value);
      if (node.left == null && node.right == null && sum(path) == sum) {
        emitter.next(path);
      } else {
        addPathRecursively(node.left, new ArrayList<>(path), sum, emitter);
        addPathRecursively(node.right, new ArrayList<>(path), sum, emitter);
      }
    }
  }
examples.ReactiveTreeTest > streamIsEmpty_whenTreeHasOnlyRoot_andRootValueNotEqualsToTargetSum PASSED

examples.ReactiveTreeTest > emptyStream_whenGivenEmptyTree PASSED

examples.ReactiveTreeTest > streamHasTwoEvents_whenTreeHasRoot_andBothLeaves_onPaths PASSED

examples.ReactiveTreeTest > streamHasSingleEvent_whenTreeHasOnlyRoot_andRootValueEqualsToTargetSum PASSED

examples.ReactiveTreeTest > streamHasSingleEvent_whenTreeHasRoot_andLeftLeaf_andValuesSumEqualsToTargetSum PASSED

examples.ReactiveTreeTest > streamHasEventsOf_rootToLeafPaths PASSED

Source code

The source code for this article you can find here

Wrap up

By doing this small refactoring we reach our goal of having more reactive style in our implementation. Also, we got another benefit, instead of accumulating events in a list we are emitting them immediately.