Wednesday, November 12, 2014

Managing Spark Accumulators

Managing Accumulators

As I move from relatively simple Spark problems to significant ones such as proteomic search, where there is enough processing to raise performance issues, it is important to be able to measure and track performance.
The first thing I want to track is where (on which worker process) and how often a function is called. In a later post I will discuss subclassing to instrument Functions and centralizing the logging code in a common base class. This post covers how to keep global counts of function use inside a Spark project. All code is in the Google Code distributed-tools project.
Spark provides a class called Accumulator for this purpose.
In Hadoop, Counters are very useful for tracking performance. A counter is an object that any portion of the code can get and increment. The values of a counter are eventually consistent, and even current values (if available) are useful in tracking the progress of a job.
In a Spark job with a number of steps, Accumulators may be used to track the number of operations in each step.

Getting an Accumulator

JavaSparkContext has a method accumulator which returns an accumulator with a given name.
       JavaSparkContext currentContext = SparkUtilities.getCurrentContext();
       String accName = "My Accumulator Name";
       Accumulator<Integer> accumulator = currentContext.accumulator(0, accName );

Accumulators should be created in the driver code and can then be serialized into functions.

Using an Accumulator

Accumulators may be of any type supporting an add operation, although only Integer and Double are hard coded. The accumulator has an add method that takes a value of the proper type, say an Integer, and adds it to the current value.

       Accumulator<Integer> numberCalls = ...
       numberCalls.add(1); // increment
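
What "any type supporting an add operation" means can be sketched without Spark. The block below is a plain Java model, not Spark code: AddOperation, SketchAccumulator and AddOperationDemo are hypothetical names invented for this illustration. In Spark 1.x the corresponding role is played by the AccumulatorParam interface, which this sketch deliberately does not use.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for the add operation an accumulator type must supply. */
interface AddOperation<T> {
    T zero();                // identity value the accumulator starts from
    T add(T left, T right);  // should be associative and commutative
}

/** Hypothetical accumulator model; this is NOT Spark's Accumulator class. */
final class SketchAccumulator<T> {
    private final AddOperation<T> op;
    private T value;

    SketchAccumulator(AddOperation<T> op) {
        this.op = op;
        this.value = op.zero();
    }

    /** Folds the amount into the current value using the supplied operation. */
    void add(T amount) {
        value = op.add(value, amount);
    }

    T value() {
        return value;
    }
}

final class AddOperationDemo {
    /** Sums a list of longs through the sketch accumulator. */
    static long sumLongs(List<Long> amounts) {
        SketchAccumulator<Long> acc = new SketchAccumulator<Long>(new AddOperation<Long>() {
            @Override public Long zero() { return 0L; }
            @Override public Long add(Long left, Long right) { return left + right; }
        });
        for (Long amount : amounts) {
            acc.add(amount);
        }
        return acc.value();
    }

    public static void main(String[] args) {
        System.out.println(sumLongs(Arrays.asList(1L, 2L, 3L)));
    }
}
```

The point of the sketch is only that a zero value plus an add operation is enough to accumulate a type; a Long counter here is just the smallest case.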

Reading an Accumulator

Accumulators may be read only in the driver program. The accumulator has a value method which returns the current value of the accumulator. After the RDDs are collected, this value should be accurate.

       Accumulator<Integer> numberCalls = ...
       int myCallCount = numberCalls.value();
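
Why the value is only trustworthy once the work has finished can be illustrated with a simplified model, assuming that each task counts locally and its updates reach the reading side only when the task completes. This is plain Java and not Spark code; TaskLocalCounter, DriverSideTotal and MergeDemo are hypothetical names made up for the sketch.

```java
/** Hypothetical task-local counter: each task adds to its own private copy. */
final class TaskLocalCounter {
    private long count;

    void add(long amount) {
        count += amount;
    }

    long count() {
        return count;
    }
}

/** Hypothetical total on the reading side; it only sees tasks that have been merged. */
final class DriverSideTotal {
    private long total;

    /** Called when a task finishes; only then do its updates become visible. */
    void merge(TaskLocalCounter finishedTask) {
        total += finishedTask.count();
    }

    long value() {
        return total;
    }
}

final class MergeDemo {
    /** Returns the total as read before any merge, after one merge, and after both. */
    static long[] run() {
        DriverSideTotal total = new DriverSideTotal();
        TaskLocalCounter taskA = new TaskLocalCounter();
        TaskLocalCounter taskB = new TaskLocalCounter();

        taskA.add(1);
        taskA.add(1);
        taskB.add(1);

        long beforeMerge = total.value();   // neither task has reported yet
        total.merge(taskA);
        long afterFirst = total.value();    // only task A is visible
        total.merge(taskB);
        long afterBoth = total.value();     // all updates are visible

        return new long[] {beforeMerge, afterFirst, afterBoth};
    }

    public static void main(String[] args) {
        long[] readings = run();
        System.out.println(readings[0] + " " + readings[1] + " " + readings[2]);
    }
}
```

Reading too early gives a partial count rather than a wrong one, which matches the "eventually consistent" behaviour described for Hadoop counters.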

My Library Code

The class com.lordjoe.distributed.spark.SparkAccumulators is designed to make handling Accumulators simpler. The class is a singleton containing a Map from accumulator names to Accumulators. A function with access to the singleton, say through a final local variable or, as in my code, a member variable set in the constructor, can use the instance to look up an existing accumulator.

The singleton must be initialized once in the driver, normally in a function which initializes the library. This ensures that a single object is constructed in the driver.
Once initialized, the instance can be retrieved and used to create an accumulator. Created accumulators are stored in the internal map.
SparkAccumulators instance = SparkAccumulators.getInstance();
instance.createAccumulator("MyAccumulator");

Accumulators may be incremented using the incrementAccumulator function. There is an alternative version taking an amount to increment by; the default is 1.
instance.incrementAccumulator("MyAccumulator");
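
The general shape of a singleton that maps names to counters can be sketched in plain Java. This is not the SparkAccumulators source: CounterRegistry and its methods are hypothetical, and an AtomicLong stands in for a Spark Accumulator so the example runs without Spark.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical name-to-counter registry; AtomicLong stands in for a Spark Accumulator. */
final class CounterRegistry {
    private static final CounterRegistry INSTANCE = new CounterRegistry();

    private final Map<String, AtomicLong> counters = new ConcurrentHashMap<String, AtomicLong>();

    private CounterRegistry() { }

    static CounterRegistry getInstance() {
        return INSTANCE;
    }

    /** Registers a counter under the name; an existing counter is left untouched. */
    void createCounter(String name) {
        counters.putIfAbsent(name, new AtomicLong());
    }

    /** Increments by the default amount of 1. */
    void increment(String name) {
        increment(name, 1L);
    }

    /** Increments by an explicit amount. */
    void increment(String name, long amount) {
        lookup(name).addAndGet(amount);
    }

    long value(String name) {
        return lookup(name).get();
    }

    /** Fails loudly when a name was never registered, so typos do not go unnoticed. */
    private AtomicLong lookup(String name) {
        AtomicLong counter = counters.get(name);
        if (counter == null) {
            throw new IllegalStateException("No counter named " + name);
        }
        return counter;
    }
}
```

Usage mirrors the calls above: obtain the instance, create a counter by name once, then increment it by name wherever the count is needed.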

Use in a function

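import java.io.Serializable;
import org.apache.spark.api.java.function.Function;
import com.lordjoe.distributed.spark.SparkAccumulators;

public class MyFunction<T, R> implements Function<T, R>, Serializable {
    private final SparkAccumulators accumulators;
    public MyFunction() {
        accumulators = SparkAccumulators.getInstance(); // runs in the driver, where the function is constructed
    }
    public R call(T input) {
        R result = null; // doStuff: compute the real result from input
        accumulators.incrementAccumulator("MyAccumulator"); // keep count
        return result;
    }
}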

Better code, using inheritance to put all logging in one place, will be discussed in a later post.
