Monday, November 17, 2014

Using a Complex Structure as a Spark Accumulator

In two earlier blogs I discussed the uses of Accumulators in Spark. I will continue that discussion by describing how Accumulators may be used with more complex structures. The structure described here keeps statistics on a double variable, tracking the mean, standard deviation, minimum, and maximum values.
While the code was written in part to test the capabilities of accumulators, there were real problems motivating it. Accumulators are an excellent way to pull secondary, summary results from processing an RDD without interfering with the main result.
The Statistics class accumulates statistics on data. The object is immutable; addition returns a new object combining either one more number or one other Statistics. The code is shown below.
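The class itself was embedded from an external source and does not render here, so the listing below is a minimal sketch rather than the original. It assumes the class tracks a count, a sum, and a sum of squares (plus min and max), and that Statistics.ZERO, PARAM_INSTANCE, getNumber(), and getAverage() behave as the usage later in this post implies; the field names and the sample standard deviation formula are illustrative.

    import java.io.Serializable;
    import org.apache.spark.AccumulatorParam;

    public class Statistics implements Serializable {
        // Identity element: no values seen yet
        public static final Statistics ZERO =
                new Statistics(0, 0, 0, Double.MAX_VALUE, -Double.MAX_VALUE);

        // Tells Spark how to zero and merge Statistics objects (Spark 1.x API)
        public static final AccumulatorParam<Statistics> PARAM_INSTANCE =
                new AccumulatorParam<Statistics>() {
                    @Override public Statistics zero(final Statistics initial) { return ZERO; }
                    @Override public Statistics addInPlace(final Statistics a, final Statistics b) { return a.add(b); }
                    @Override public Statistics addAccumulator(final Statistics a, final Statistics b) { return a.add(b); }
                };

        private final long count;
        private final double sum;
        private final double sumSquares;
        private final double min;
        private final double max;

        public Statistics(final double value) {
            this(1, value, value * value, value, value);
        }

        private Statistics(final long count, final double sum, final double sumSquares,
                           final double min, final double max) {
            this.count = count;
            this.sum = sum;
            this.sumSquares = sumSquares;
            this.min = min;
            this.max = max;
        }

        // All addition returns a new immutable object
        public Statistics add(final double value) {
            return add(new Statistics(value));
        }

        public Statistics add(final Statistics other) {
            return new Statistics(
                    count + other.count,
                    sum + other.sum,
                    sumSquares + other.sumSquares,
                    Math.min(min, other.min),
                    Math.max(max, other.max));
        }

        public long getNumber() { return count; }
        public double getAverage() { return count == 0 ? 0 : sum / count; }
        public double getMin() { return min; }  // Double.MAX_VALUE if empty
        public double getMax() { return max; }

        // Sample standard deviation recovered from the sum and sum of squares
        public double getStandardDeviation() {
            if (count < 2) return 0;
            double variance = (sumSquares - sum * sum / count) / (count - 1);
            return Math.sqrt(Math.max(0, variance));
        }
    }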

The code may be used in two ways: to create an accumulator, or directly, as in the combineByKey sample shown later.

To create an accumulator, write:

    // Make an accumulator backed by Statistics
    final Accumulator<Statistics> totalLetters =
            ctx.accumulator(Statistics.ZERO, "Total Letters", Statistics.PARAM_INSTANCE);

    // lines from word count
    JavaRDD<String> lines = ctx.textFile(args[0], 4);

    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(final String s) throws Exception {
            // Handle accumulator here - record the length of every line
            totalLetters.add(new Statistics(s.length()));
            ... // other stuff
        }
    });
    // more code
    Statistics letterStatistics = totalLetters.value();
    long numberLetters = letterStatistics.getNumber();
    double averageLineLength = letterStatistics.getAverage();

When multiple keys are involved, the same structure may be used with combineByKey to generate separate statistics for each key, as in the sketch below.
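The embedded combineByKey sample is also missing from this page; the following is a minimal sketch, assuming an input JavaPairRDD<String, Double> named valuesByKey (a hypothetical name) and the Statistics sketch above:

    // valuesByKey: a JavaPairRDD<String, Double> of key/value pairs (assumed)
    JavaPairRDD<String, Statistics> statsByKey = valuesByKey.combineByKey(
            // createCombiner: start statistics from the first value seen for a key
            new Function<Double, Statistics>() {
                @Override
                public Statistics call(final Double x) {
                    return new Statistics(x);
                }
            },
            // mergeValue: fold another value into a partition's running statistics
            new Function2<Statistics, Double, Statistics>() {
                @Override
                public Statistics call(final Statistics stats, final Double x) {
                    return stats.add(x);
                }
            },
            // mergeCombiners: combine statistics built on different partitions
            new Function2<Statistics, Statistics, Statistics>() {
                @Override
                public Statistics call(final Statistics a, final Statistics b) {
                    return a.add(b);
                }
            });

Because Statistics.add is associative and commutative, the same merge logic serves both the accumulator's addInPlace and the mergeCombiners step of combineByKey.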

All code for this article is available here.
