Monday, November 17, 2014

Using a Complex Structure as a Spark Accumulator

In two earlier blogs I discussed the uses of Accumulators in Spark. I will continue this discussion by describing how Accumulators may be used for more complex structures. The structure I will describe keeps statistics on a double variable tracking mean, standard deviation, minimum and maximum values.
While the code was written in part to test the capabilities of accumulators, there were real problems motivating it. Accumulators are an excellent way to pull secondary, summary results from processing an RDD without interfering with the main result.
The Statistics class can accumulate statistics on data. The object is immutable: addition returns a new object combining the original with one or more numbers or with another Statistics instance. The code is shown below.
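The original post embedded the class as a gist which is not reproduced here. The following is a minimal, hypothetical sketch of the idea; the field names and most method names are my own (only getNumber and getAverage appear in the original text):

```java
// Hypothetical sketch of an immutable Statistics value for an accumulator.
// Field names and most method names are illustrative, not the original API.
final class Statistics {
    public static final Statistics ZERO = new Statistics(0, 0, 0,
            Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY);

    private final long count;
    private final double sum;
    private final double sumOfSquares;
    private final double min;
    private final double max;

    private Statistics(long count, double sum, double sumOfSquares,
                       double min, double max) {
        this.count = count;
        this.sum = sum;
        this.sumOfSquares = sumOfSquares;
        this.min = min;
        this.max = max;
    }

    // add a single observation, returning a new immutable object
    public Statistics add(double x) {
        return new Statistics(count + 1, sum + x, sumOfSquares + x * x,
                Math.min(min, x), Math.max(max, x));
    }

    // combine with another Statistics, returning a new immutable object
    public Statistics add(Statistics o) {
        return new Statistics(count + o.count, sum + o.sum,
                sumOfSquares + o.sumOfSquares,
                Math.min(min, o.min), Math.max(max, o.max));
    }

    public long getNumber()    { return count; }
    public double getAverage() { return count == 0 ? 0 : sum / count; }
    public double getMin()     { return min; }
    public double getMax()     { return max; }

    // population standard deviation from the running sums
    public double getStandardDeviation() {
        if (count < 2) return 0;
        double mean = getAverage();
        return Math.sqrt(sumOfSquares / count - mean * mean);
    }
}
```

Because both add methods return a new object rather than mutating, two partial results built on different workers can be merged in any order, which is exactly what an accumulator merge requires.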

The code may be used in two ways - to create an accumulator, or directly, as in the following sample using combineByKey.

To create an accumulator say

    // Make an accumulator using Statistics
    final Accumulator<Statistics> totalLetters =
            ctx.accumulator(Statistics.ZERO, "Total Letters", Statistics.PARAM_INSTANCE);
    // lines from word count
    JavaRDD<String> lines = ctx.textFile(args[0], 4);

    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        public Iterable<String> call(final String s) throws Exception {
            // Handle the accumulator here - the added value must be a Statistics
            totalLetters.add(Statistics.ZERO.add(s.length())); // count all letters
            ... // other stuff
        }
    });
    // more code, then read the values back in the driver
    Statistics letterStatistics = totalLetters.value();
    long numberLetters = letterStatistics.getNumber();
    double averageLineLength = letterStatistics.getAverage();

When multiple keys are involved, the same structure may be used with combineByKey to generate separate statistics for each key.
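The combineByKey gist is not shown here, but the per-key merge it performs can be sketched in plain Java with Map.merge. The {count, sum} arrays below are a hypothetical stand-in for full Statistics objects:

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-in for the per-key combination combineByKey performs:
// fold (key, value) pairs into per-key {count, sum} partial results.
final class PerKeyStats {
    static Map<String, double[]> combineByKey(String[] keys, double[] values) {
        Map<String, double[]> result = new HashMap<>();
        for (int i = 0; i < keys.length; i++) {
            // if the key is absent, start a new partial result;
            // otherwise merge the old and new partials, as Spark would
            // merge combiners from different partitions
            result.merge(keys[i], new double[]{1, values[i]},
                    (a, b) -> new double[]{a[0] + b[0], a[1] + b[1]});
        }
        return result;
    }
}
```

On a cluster the same three ingredients appear as the createCombiner, mergeValue, and mergeCombiners functions passed to JavaPairRDD.combineByKey.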

All code for this article is available here.

Friday, November 14, 2014

More on Spark Accumulators

The Power of Spark Accumulators

Spark Accumulators (discussed in an earlier blog) are massively more powerful than Hadoop counters because they support multiple types of data. I have not seen discussions of using accumulators holding large sets of data, something that some of the classes discussed here could certainly do. The code discussed here is available here.

The only thing required for an accumulator is an AccumulatorParam instance defining how to construct a zero element and how to combine multiple instances.

AccumulatorParam using a Long as a counter (accumulator)
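The gist for this example is not reproduced; below is a sketch of the shape such a class takes. In Spark 1.x it would implement org.apache.spark.AccumulatorParam&lt;Long&gt;; it is shown here as a plain class so the combine logic stands on its own:

```java
import java.io.Serializable;

// Sketch of an AccumulatorParam using Long as a counter.
// In real code this would implement org.apache.spark.AccumulatorParam<Long>.
final class LongAccumulableParam implements Serializable {
    // the zero element - counting starts from 0
    public Long zero(Long initialValue) {
        return 0L;
    }

    // combine two partial counts produced by different tasks
    public Long addInPlace(Long r1, Long r2) {
        return r1 + r2;
    }
}
```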

AccumulatorParam to accumulate a single string by concatenation
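A sketch of the String version, again as a plain class standing in for an implementation of AccumulatorParam&lt;String&gt;:

```java
import java.io.Serializable;

// Sketch of an AccumulatorParam accumulating a single String by
// concatenation. In real code this would implement
// org.apache.spark.AccumulatorParam<String>.
final class StringAccumulableParam implements Serializable {
    public String zero(String initialValue) {
        return "";
    }

    // note: the order in which partial results arrive is not guaranteed
    public String addInPlace(String r1, String r2) {
        return r1 + r2;
    }
}
```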

AccumulatorParam using a Set of Strings as an accumulator
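The Set version shows that an accumulator can carry a whole collection. A plain-class sketch of the logic an AccumulatorParam&lt;Set&lt;String&gt;&gt; would implement:

```java
import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;

// Sketch of an AccumulatorParam holding a Set of Strings - accumulators
// can carry collections, not just numbers. In real code this would
// implement org.apache.spark.AccumulatorParam<Set<String>>.
final class StringSetAccumulableParam implements Serializable {
    public Set<String> zero(Set<String> initialValue) {
        return new HashSet<String>();
    }

    // union of the partial sets from different tasks
    public Set<String> addInPlace(Set<String> r1, Set<String> r2) {
        Set<String> combined = new HashSet<String>(r1);
        combined.addAll(r2);
        return combined;
    }
}
```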

How to use accumulators

Accumulators may be used in two ways. First, the accumulator may be created in code as a final variable in the scope of the function - this is especially useful for lambdas, functions created inline.
The following is an illustration of this

Using an accumulator as a final local variable
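The gist is not reproduced here; the capture pattern can be sketched in plain Java, with a java.util.concurrent.atomic.AtomicLong standing in for the Spark Accumulator (on a real cluster only an Accumulator works, because the function is serialized to remote workers):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

final class FinalVariableCapture {
    // Count total letters while mapping lines to upper case. The counter
    // is captured as a final local variable, the same way a Spark
    // Accumulator is captured by an anonymous Function or lambda.
    static long countLetters(List<String> lines) {
        final AtomicLong totalLetters = new AtomicLong(); // stand-in for Accumulator
        List<String> upper = lines.stream()
                .map(s -> {
                    totalLetters.addAndGet(s.length()); // like accumulator.add
                    return s.toUpperCase();
                })
                .collect(Collectors.toList());
        return totalLetters.get();
    }
}
```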

Alternatively, a function may be defined with an accumulator as a member variable. Here the function is defined as a class and later used. I prefer this approach to lambdas, especially if significant work is done in the function.
In a later blog I will discuss using a base class for more sophisticated logging.

Using an accumulator as a member variable

Wednesday, November 12, 2014

Managing Spark Accumulators

Managing Accumulators

As I move from relatively simple Spark problems to significant ones such as proteomic search - especially with enough processing to raise performance issues - it is important to be able to measure and track performance.
The first issue I want to track is where (on which slave process) and how often a function is called. In a later post I will discuss using subclassing to instrument Functions to log, centralizing the logging code in a common base class. In this section I look at how global counts of function use can be kept inside a Spark project. All code is in the Google Code distributed-tools project.
Spark supplies a class called Accumulator. In Hadoop, Counters are very useful in tracking performance: a counter is an object that any portion of the code can get and increment. The values of a counter are eventually consistent, and even current values (if available) are useful in tracking the progress of a job.
In a Spark job with a number of  steps, Accumulators may be used to track the number of operations in each step.

Getting an Accumulator

JavaSparkContext has a method accumulator which returns an accumulator with a given name.
       JavaSparkContext currentContext = SparkUtilities.getCurrentContext();
       String accName = "My Accumulator Name";
       Accumulator<Integer> accumulator = currentContext.accumulator(0, accName );

Accumulators should be created in the driver code and can be serialized into functions.

Using an Accumulator

Accumulators may be of any type supporting an add operation, although only Integer and Double are supported out of the box; other types require an AccumulatorParam. The accumulator has an add method taking the proper type - say an Integer - which adds the argument to the current value.

       Accumulator<Integer> numberCalls = ...
      numberCalls.add(1); // increment

Reading an Accumulator

Accumulators may be read only in the driver program. The accumulator has a value method which returns the current value of the accumulator. After RDDs are collected this should be accurate.

       Accumulator<Integer> numberCalls = ...
      int myCallCount = numberCalls.value( ); 

My Library Code

The class com.lordjoe.distributed.spark.SparkAccumulators is designed to make handling Accumulators simpler. The class is a singleton containing a Map from accumulator names to Accumulators. A function with access to the singleton - say through a final local variable or, as in my code, a member variable set in the constructor - can use the instance to look up existing accumulators.

An initialization call needs to be made once; it is normally made in a function which initializes the library. The call is required to make sure one object is constructed per Executor.
Once initialized, the instance can be gotten and used to create an accumulator. These are stored in the internal map.
SparkAccumulators instance = SparkAccumulators.getInstance();
instance.createAccumulator("MyAccumulator");

Accumulators may be incremented using the incrementAccumulator function. There is an alternative version taking an amount to increment by - the default is 1.
instance.incrementAccumulator("MyAccumulator");
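A hypothetical plain-Java sketch of such a registry follows. The method names come from the text above, simple longs stand in for real Accumulator objects, and the real class must also handle serialization of the instance to the workers:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a singleton registry mapping accumulator names to counters.
// Longs stand in for real Spark Accumulator objects.
final class SparkAccumulatorsSketch {
    private static SparkAccumulatorsSketch instance;
    private final Map<String, Long> accumulators = new HashMap<>();

    public static synchronized SparkAccumulatorsSketch getInstance() {
        if (instance == null)
            instance = new SparkAccumulatorsSketch();
        return instance;
    }

    public void createAccumulator(String name) {
        accumulators.putIfAbsent(name, 0L);
    }

    public void incrementAccumulator(String name) {
        incrementAccumulator(name, 1L); // default increment is 1
    }

    public void incrementAccumulator(String name, long amount) {
        accumulators.merge(name, amount, Long::sum);
    }

    public long getValue(String name) {
        return accumulators.getOrDefault(name, 0L);
    }
}
```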

Use in a function

public class MyFunction<T, R> implements Function<T, R>, Serializable {
    private SparkAccumulators accumulators;

    public MyFunction() {
        if (accumulators == null)
            accumulators = SparkAccumulators.getInstance(); // happens in Executor
    }

    public R call(T input) {
        // doStuff
        accumulators.incrementAccumulator("MyAccumulator"); // keep count
        ...
    }
}

Better code using inheritance to put all logging in one place will be discussed in a later blog.

Wednesday, November 5, 2014

Spark Utilities

Spark Utilities

All code described here is in the distributed-tools project; the functions described are in the class com.lordjoe.distributed.SparkUtilities in the subproject spark-implementation.

In working with Spark I find there is a need for a library of commonly used functions. I put these in a class called SparkUtilities. My general convention is that classes named Utilities are collections of static functions.

A major function is getCurrentContext(). Because JavaSparkContexts cannot be serialized, it is not possible to pass a context into a function. The body of a function executing on a slave process will need to find a local copy of the context; if no local copy exists, one will need to be constructed. All of this is handled by getCurrentContext(), which caches a constructed context in a transient field - the transient keyword causes the field not to be serialized. The code below will cause one JavaSparkContext to be constructed per slave VM.
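A sketch of the caching pattern, with a placeholder Context class standing in for JavaSparkContext (the real method would also build a SparkConf):

```java
// Sketch of the one-context-per-VM caching pattern behind
// getCurrentContext(). Context is a placeholder for JavaSparkContext,
// which cannot be serialized and so must be built lazily on each VM.
final class ContextCache {
    static final class Context { // stand-in for JavaSparkContext
        final String master;
        Context(String master) { this.master = master; }
    }

    // transient in the real code so the field is never serialized
    private static transient Context currentContext;

    static synchronized Context getCurrentContext() {
        if (currentContext == null)
            currentContext = new Context("local[*]"); // construct once per VM
        return currentContext;
    }
}
```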

One important function is guaranteeSparkMaster. When running on a machine without a cluster, the Spark master will be undefined; calling sparkConf.setMaster("local[*]") causes the job to run with a local master (with the proper number of threads for the processor). This is good for debugging, and it means there is no need to change code or the command line to run locally. If there is no cluster available, getCurrentContext defaults to a local master.
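A sketch of the idea behind guaranteeSparkMaster, with a Map standing in for SparkConf (the real method would test the configuration and call sparkConf.setMaster):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of guaranteeSparkMaster: if no master is configured, default to
// a local master so the job can run and be debugged without a cluster.
final class MasterGuard {
    static void guaranteeSparkMaster(Map<String, String> conf) {
        if (!conf.containsKey("spark.master"))
            conf.put("spark.master", "local[*]"); // local master, all cores
    }
}
```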

Realize and Return - debugging Spark

Realize and Return - debugging Spark

All code described here is in the distributed-tools project; the functions described are in the class com.lordjoe.distributed.SparkUtilities in the subproject spark-implementation.

Spark and Java 8 streams use lazy evaluation to manage operations on collections. When a transformation is applied, the operations are saved but not executed until evaluation is required. While this makes operations efficient, it makes them difficult to debug. During development of small samples on a single machine running in local mode, it is frequently useful to stop and look at the results before passing them to the next step.

These functions require that all data be held in memory in a List - not a good idea for Big Data sets but fine for debugging. The code does two things.
First, it forces all code to execute. This allows debugging of all the steps up to the realization and can isolate errors.
Second, all results are held in a list. Placing a break point allows the list to be examined to see if the values are reasonable.

The code below shows how realizeAndReturn can be used. Note that for any JavaRDD or JavaPairRDD the return is of the same type as the original and can serve in the code as a new value.
My general strategy is to follow each operation with a line of realizeAndReturn and comment them out as things succeed.
When problems arise, the lines can be uncommented, forcing more frequent evaluation and allowing a peek at intermediate results.
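The realizeAndReturn gist is not reproduced here; the same idea can be sketched against Java 8 streams, which are also lazily evaluated. The Spark version would collect() a JavaRDD and return ctx.parallelize(collected):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Java 8 stream sketch of realizeAndReturn: force evaluation, hold the
// results in a List where a breakpoint can inspect them, then hand back
// an equivalent lazy object of the same type as the original.
final class Realizer {
    static <T> Stream<T> realizeAndReturn(Stream<T> input) {
        List<T> collected = input.collect(Collectors.toList());
        // place a breakpoint here to examine 'collected'
        return collected.stream();
    }
}
```

Because the return type matches the input type, a call can be spliced between any two pipeline steps and later commented out without other changes.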