Radical Java

Introducing Crunch Lambda - the Java 8 way to do Big Data

This post is going to deviate from the usual format of talking about the general Java world and instead focus on a specific tool. The reason is that I contributed a lot to its development, so I feel I should probably write something about it.

If you’ve been working in data processing over the last 10 years, you’ve probably at some point come across Google’s MapReduce paper and Apache Hadoop, the open-source implementation of it. The big data world has made all kinds of technological progress since then, but a huge amount of processing work is still well suited to, and continues to be performed as, MapReduce workloads on Hadoop. Whilst more complex frameworks like Spark might get all the attention for trendy stuff like machine learning and graph algorithms, for ETL, aggregation and data-cleaning workloads Hadoop MapReduce is still a good fit, especially when reliability at scale is important.

The trouble with Hadoop MapReduce, however, is that the API is awful.

I’ve been working with Hadoop for around 5 years now, and I’ve seen a whole range of abstractions come in and out of fashion in that time. Only one of them has managed to make me consistently more productive at building data workflows of all kinds, from one-off experiments to robust, regularly scheduled jobs, whilst also being both typesafe and general-purpose, and that’s Apache Crunch.

Crunch

Crunch was originally written mostly by Josh Wills after he left Google, where a similar tool based on the FlumeJava paper was in use internally (that tool later became Google Cloud Dataflow, part of GCP, but that’s a story for another day). Crunch is a Java API for MapReduce which lets you construct your data pipelines from meaningful primitives with an absolute minimum of boilerplate, whilst leveraging Java’s type system to provide safety and to make it easy to develop generic higher-level operations. I started using it back in 2013 when I was working at Spotify, and it has remained my big data tool of choice right up to the present day.

Here, for example, is some Crunch code to count words in a text file and output the results into another text file:

import org.apache.crunch.*;
import org.apache.crunch.io.*;
import org.apache.crunch.fn.Aggregators;
import org.apache.crunch.impl.mr.MRPipeline;

import java.io.Serializable;

import static org.apache.crunch.types.writable.Writables.*;

public class WordCountJob implements Serializable {
    public static void main(String[] args) {
        Pipeline crunch = new MRPipeline(WordCountJob.class);
        crunch.read(From.textFile("/path/on/hdfs"))
                // Tokenize each line and emit a (word, 1) pair per word
                .parallelDo(new DoFn<String, Pair<String, Long>>() {
                    public void process(String s, Emitter<Pair<String, Long>> emitter) {
                        for (String word : s.split(" ")) {
                            emitter.emit(Pair.of(word, 1L));
                        }
                    }
                }, tableOf(strings(), longs()))
                // Group by word and sum the 1s to count each unique word
                .groupByKey()
                .combineValues(Aggregators.SUM_LONGS())
                // Format each (word, count) pair back into a line of text
                .parallelDo(new MapFn<Pair<String, Long>, String>() {
                    public String map(Pair<String, Long> wordCount) {
                        return wordCount.first() + ":" + wordCount.second();
                    }
                }, strings())
                .write(To.textFile("/path/to/output"));
        crunch.done();
    }
}

Reading from top to bottom: we read a collection of Strings from a text file, then apply a DoFn which tokenizes each line into words and emits the pair ([word], 1L) for each word. We then group by key (the word) and sum the values (the 1s) to yield pairs of unique words and their counts, apply a MapFn which formats those pairs back into String lines, and write them to a text file. Execution is lazy, so crunch.done() is what tells Crunch to actually run everything.
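That laziness has a practical upside: done() is also the point at which you find out whether the run worked, because it returns a PipelineResult. A minimal sketch of how the end of the example could use it (the error handling here is my own addition, not part of the example above):

// Nothing has executed yet: the read/parallelDo/write calls above only built a plan.
// done() plans and runs the underlying MapReduce job(s), then reports back.
PipelineResult result = crunch.done();
if (!result.succeeded()) {
    // Fail loudly so whatever scheduled this job notices the broken run
    throw new RuntimeException("Crunch pipeline failed");
}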

Unfortunately, there’s a lot of noise in this code snippet obscuring what it actually does. The anonymous inner classes required to express the custom operations are ugly and verbose, and we end up expressing the input and output types of each operation at least 3 times per step, which makes the actual intent of the code a lot harder to follow. This ugliness is part of the reason many data developers have been driven away from Java and towards Scala APIs such as Scalding, which can offer much more attractive, readable code, even though it comes with many problems of its own (beyond the scope of this post).

Java 8 to the rescue

Luckily, Java 8 gave us the gift of lambda expressions, method references and Streams, all of which are intended specifically to make functional programming in Java more user-friendly. With the rest of my code starting to look more functional and concise, the Crunch parts started to look even more archaic than they did before, so I set about building a thin Java-8-friendly wrapper around the Crunch API to improve the situation. That wrapper became Crunch Lambda, which as of version 0.14.0 is part of the main Crunch project (in the crunch-lambda submodule, artifact org.apache.crunch:crunch-lambda:0.14.0).

Mini side-note about Crunch versions: people are sometimes wary of using Crunch because it hasn’t reached version “1” yet, but Crunch version numbers are incredibly conservative. I have been using the “zero” series in production for all kinds of uses for over 3 years, and I wouldn’t consider it unstable in any way (unlike many big data products with version numbers greater than 1, cough Spark cough ;-) ).

Crunch Lambda

Let’s revisit the above example with Crunch Lambda instead:

import org.apache.crunch.*;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.io.To;
import org.apache.crunch.lambda.Lambda;

import java.util.Arrays;

import static org.apache.crunch.types.writable.Writables.*;

public class WordCountJobLambda {
    public static void main(String[] args) {
        Pipeline crunch = new MRPipeline(WordCountJobLambda.class);
        Lambda.wrap(
          crunch.read        (From.textFile("/path/on/hdfs")))
                .flatMap     (line -> Arrays.stream(line.split(" ")), strings())
                .map         (word -> Pair.of(word, 1L), tableOf(strings(), longs()))
                .groupByKey  ()
                .reduceValues((a, b) -> a + b)
                .map         (wc -> wc.first() + ":" + wc.second(), strings())
                .write       (To.textFile("/path/to/output"));
        crunch.done();
    }
}

See how the type information for each transformation is now required only once (via the tableOf, strings and longs invocations statically imported from Writables), and there’s no distracting, superfluous information.
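Method references work here too, because the functional interfaces Crunch Lambda accepts are ordinary Java 8 functional interfaces. As a sketch, the tokenizing step could be pulled out into a named method (Tokenizer and tokenize are hypothetical names of mine, not anything from the library):

import java.util.Arrays;
import java.util.stream.Stream;

public class Tokenizer {
    // Hypothetical helper: any method with a matching shape can stand in
    // for a lambda via a method reference
    static Stream<String> tokenize(String line) {
        return Arrays.stream(line.split(" "));
    }
}

The flatMap step in the pipeline above would then read .flatMap(Tokenizer::tokenize, strings()), with everything else unchanged.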

In the library there’s a wrapping API exposing sensible functional abstractions for the PCollection, PTable and PGroupedTable interfaces. I’m still working on the in-depth user guide, but for now there’s at least complete Javadocs for it, and you can start using it today. Just wrap your PCollection or PTable objects with Lambda.wrap(...) and use your IDE’s code completion to see what you can do.
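Because wrapping is cheap and you can unwrap again, it mixes easily with existing Crunch code. Here’s a small sketch of that round trip (formatRepeatedWords is an illustrative name of mine, and the filter, map and underlying calls are as I read them from the Javadocs):

import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.lambda.Lambda;

import static org.apache.crunch.types.writable.Writables.strings;

public class LambdaWrapExample {
    // Take a PTable produced by existing (non-lambda) Crunch code, use the
    // functional API on it, then hand back a plain PCollection at the end
    static PCollection<String> formatRepeatedWords(PTable<String, Long> counts) {
        return Lambda.wrap(counts)
                .filter(wc -> wc.second() > 1L)                       // keep words seen more than once
                .map(wc -> wc.first() + "=" + wc.second(), strings()) // format as text lines
                .underlying();                                        // unwrap back to core Crunch
    }
}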

Final note: I’ve got a brand new album out this week, so if you like chiptune music you should go listen to it here: Demoscene Time Machine - Gravity