Friday, February 13, 2015

Using KryoSerializer in SPARK


There is a general consensus that Kryo is a faster serializer than standard Java serialization. In my first months of using Spark I avoided Kryo serialization because Kryo requires all classes that will be serialized to be registered before use. When this is not done properly, the default errors happen at read time with little indication of which class is the problem.
My initial approach was to use Java serialization. This is set by executing:
SparkConf sparkConf = ...
sparkConf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer");

When you attempt to run the code (I recommend running a small data set locally until all issues are resolved), all classes used by the JavaSerializer must implement the tagging interface java.io.Serializable. My experience is that many classes will not, and this capability must be added to your code. Remember which classes were changed, because if and when the code is switched to use Kryo these classes will need to be registered.

Eventually the Spark code will work using Java serialization. I do not recommend optimizations like using Kryo until there is confidence that the code will run under Spark, as there are many other issues to resolve and optimization should not be a major early concern.

Once the code is running well using the JavaSerializer, the developer can switch to Kryo.
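The switch itself is a small configuration change. A sketch of what it might look like; the registrator class name here is a placeholder for whatever class you write in the next section:

SparkConf sparkConf = new SparkConf().setAppName("MyJob");
// use Kryo instead of Java serialization
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
// the class implementing KryoRegistrator - the name is a placeholder
sparkConf.set("spark.kryo.registrator", "com.lordjoe.distributed.MyKryoRegistrator");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);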



KryoRegistrator Class

Before Kryo can be used there needs to be a class implementing KryoRegistrator, registered as the spark.kryo.registrator. The class I use is listed below, omitting most of a long list of registered classes. The critical method is registerClasses. I added helper methods allowing classes to be registered by Class or by full name, and registering arrays of registered classes, since Kryo seems to need the array classes registered as well.
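The full listing is long and did not survive in this copy; the sketch below shows the shape of such a registrator with the helper methods described. The class name and the registered classes are placeholders, not the original list.

import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.serializer.KryoRegistrator;

public class MyKryoRegistrator implements KryoRegistrator {

    @Override
    public void registerClasses(Kryo kryo) {
        doRegistration(kryo, java.util.ArrayList.class);                     // register by Class
        doRegistration(kryo, "com.lordjoe.distributed.SomeSerializedClass"); // register by full name (placeholder)
        // ... many more registrations
    }

    /** register a class and the corresponding array class - Kryo seems to need both */
    protected void doRegistration(Kryo kryo, Class pClazz) {
        kryo.register(pClazz);
        kryo.register(java.lang.reflect.Array.newInstance(pClazz, 0).getClass());
    }

    /** register a class given its full name */
    protected void doRegistration(Kryo kryo, String className) {
        try {
            doRegistration(kryo, Class.forName(className));
        }
        catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}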
Once a registrator is created, add lines of code to register all the classes the JavaSerializer complained about (you did save these, didn't you?). Next run the program. It will crash with a KryoException, but the message will name the unregistered class and show code to register it. Using a small sample in local mode, continue running, crashing and registering the offending class until Kryo is happy and the sample runs. At this point the code should run properly on the cluster. My experience with a reasonably large and complex program is that an hour of running in a good development environment (I use IntelliJ) should suffice to find and register all needed classes.



My preliminary data suggest that using Kryo reduces the run time of one of my larger samples from about 60 to around 40 minutes on a 16 node cluster. Your results may vary.

Monday, November 17, 2014

Using a Complex Structure as a Spark Accumulator

In two earlier blogs I discussed the uses of Accumulators in Spark. I will continue this discussion by describing how Accumulators may be used for more complex structures. The structure I will describe keeps statistics on a double variable tracking mean, standard deviation, minimum and maximum values.
While the code was written in part to test the capabilities of accumulators, there were real problems motivating it. Accumulators are an excellent way to pull secondary, summary results from processing an RDD without interfering with the main result.
The Statistics class can accumulate statistics on data. The object is immutable; addition results in a new object combining one or more numbers or one other Statistics. The code is shown below.
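The original listing did not survive in this copy of the post; the following is a minimal sketch that is consistent with the way the class is used below (Statistics.ZERO, PARAM_INSTANCE, getNumber(), getAverage()). The field layout and the single-value constructor are my assumptions, not the original code.

import java.io.Serializable;
import org.apache.spark.AccumulatorParam;

/** immutable accumulator of count, mean, standard deviation, minimum and maximum */
public class Statistics implements Serializable {
    public static final Statistics ZERO = new Statistics(0, 0, 0, Double.MAX_VALUE, -Double.MAX_VALUE);

    /** AccumulatorParam allowing a Statistics to be used as a Spark accumulator */
    public static final AccumulatorParam<Statistics> PARAM_INSTANCE = new AccumulatorParam<Statistics>() {
        public Statistics zero(Statistics initialValue) { return ZERO; }
        public Statistics addInPlace(Statistics r1, Statistics r2) { return r1.add(r2); }
        public Statistics addAccumulator(Statistics r1, Statistics r2) { return r1.add(r2); }
    };

    private final int number;        // number of observations
    private final double sum;        // sum of observations
    private final double sumSquares; // sum of squared observations
    private final double min;
    private final double max;

    private Statistics(int number, double sum, double sumSquares, double min, double max) {
        this.number = number;
        this.sum = sum;
        this.sumSquares = sumSquares;
        this.min = min;
        this.max = max;
    }

    /** statistics for a single observation */
    public Statistics(double value) {
        this(1, value, value * value, value, value);
    }

    /** add a single observation, returning a new object */
    public Statistics add(double value) {
        return add(new Statistics(value));
    }

    /** combine with another Statistics, returning a new object */
    public Statistics add(Statistics other) {
        return new Statistics(number + other.number, sum + other.sum, sumSquares + other.sumSquares,
                Math.min(min, other.min), Math.max(max, other.max));
    }

    public int getNumber() { return number; }

    public double getAverage() { return number == 0 ? 0 : sum / number; }

    public double getStandardDeviation() {
        if (number < 2)
            return 0;
        double mean = getAverage();
        return Math.sqrt((sumSquares - number * mean * mean) / (number - 1));
    }

    public double getMin() { return min; }

    public double getMax() { return max; }
}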





The code may be used in two ways - to create an accumulator, or directly, as shown in the following sample using combineByKey.

To create an accumulator say
    // Make an accumulator using Statistics
        final Accumulator<Statistics> totalLetters = ctx.accumulator(Statistics.ZERO, "Total Letters ", Statistics.PARAM_INSTANCE);
         // lines from word count
          JavaRDD<String> lines = ctx.textFile(args[0], 4);

        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(final String s) throws Exception {
                // Handle accumulator here
                totalLetters.add(new Statistics(s.length())); // accumulate the line length (assumes the single-value Statistics constructor)
              ... // other stuff
            }
        });
     // more code
    Statistics letterStatistics = totalLetters.value();
    int  numberLetters =  letterStatistics.getNumber();
    double averageLineLength = letterStatistics.getAverage();

When multiple keys are involved, the same structure may be used with combineByKey to generate separate statistics for each key, as sketched below.
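The combineByKey listing is also missing from this copy; a sketch of what it might look like, assuming a JavaPairRDD<String, Double> of keyed observations, the Statistics API sketched above, and the Function/Function2 classes from org.apache.spark.api.java.function:

public static JavaPairRDD<String, Statistics> statisticsByKey(JavaPairRDD<String, Double> keyedValues) {
    return keyedValues.combineByKey(
            new Function<Double, Statistics>() {              // create a combiner from the first value
                public Statistics call(Double value) {
                    return new Statistics(value);
                }
            },
            new Function2<Statistics, Double, Statistics>() { // merge another value into a combiner
                public Statistics call(Statistics stats, Double value) {
                    return stats.add(value);
                }
            },
            new Function2<Statistics, Statistics, Statistics>() { // merge two combiners
                public Statistics call(Statistics s1, Statistics s2) {
                    return s1.add(s2);
                }
            });
}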





All code for this article is available here.

Friday, November 14, 2014

More on Spark Accumulators

The Power of Spark Accumulators

Spark Accumulators, discussed in an earlier blog, are massively more powerful than Hadoop counters because they support multiple types of data. I have not seen discussions of using accumulators to hold large sets of data, something that some of the classes discussed here could certainly do. The code discussed here is available here.

The only things required for an accumulator are an AccumulatorParam instance defining how to construct a zero element and how to combine multiple instances.


AccumulatorParam using a Long as a counter (accumulator)
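The listing did not survive in this copy; a minimal version might look like the following (the class name is mine, and in Java all three AccumulableParam methods need to be supplied):

import org.apache.spark.AccumulatorParam;

/** AccumulatorParam treating a Long as a simple counter */
public class LongAccumulatorParam implements AccumulatorParam<Long> {
    public Long zero(Long initialValue) { return 0L; }
    public Long addInPlace(Long r1, Long r2) { return r1 + r2; }
    public Long addAccumulator(Long r1, Long r2) { return r1 + r2; }
}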



AccumulatorParam to accumulate a single string by concatenation
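Again the original listing is missing; a minimal sketch (class name is mine):

import org.apache.spark.AccumulatorParam;

/** AccumulatorParam accumulating a single String by concatenation */
public class StringConcatenationParam implements AccumulatorParam<String> {
    public String zero(String initialValue) { return ""; }
    public String addInPlace(String r1, String r2) { return r1 + r2; }
    public String addAccumulator(String r1, String r2) { return r1 + r2; }
}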



AccumulatorParam using a Set of Strings as an accumulator
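A sketch of the set version - this is the kind of accumulator that can hold a large amount of data (again the class name is mine):

import java.util.HashSet;
import java.util.Set;
import org.apache.spark.AccumulatorParam;

/** AccumulatorParam using a Set of Strings as an accumulator */
public class StringSetParam implements AccumulatorParam<Set<String>> {
    public Set<String> zero(Set<String> initialValue) { return new HashSet<String>(); }
    public Set<String> addInPlace(Set<String> r1, Set<String> r2) {
        Set<String> combined = new HashSet<String>(r1);
        combined.addAll(r2);
        return combined;
    }
    public Set<String> addAccumulator(Set<String> r1, Set<String> r2) {
        return addInPlace(r1, r2);
    }
}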



How to use accumulators

Accumulators may be used in two ways. First, the accumulator may be created in code as a final variable in the scope of the function - this is especially useful for lambdas, functions created inline.
The following is an illustration of this

Using an accumulator as a final local variable
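The illustration did not survive in this copy; a sketch using the Long counter param above (the names and the input file are placeholders):

SparkConf sparkConf = new SparkConf().setAppName("AccumulatorDemo");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);

final Accumulator<Long> linesSeen = ctx.accumulator(0L, "Lines Seen", new LongAccumulatorParam());

JavaRDD<String> lines = ctx.textFile("input.txt", 4);
JavaRDD<String> upperCased = lines.map(new Function<String, String>() {
    @Override
    public String call(String line) {
        linesSeen.add(1L);           // the final local variable is captured by the anonymous function
        return line.toUpperCase();
    }
});
upperCased.count();                  // force evaluation
long totalLines = linesSeen.value(); // read back in the driver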




Alternatively a function may be defined with an accumulator as a member variable. Here the function is defined as a class and later used. I prefer this approach to lambdas, especially if significant work is done in the function.
In a later blog I will discuss using a base class for more sophisticated logging.

Using an accumulator as a member variable
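The listing is missing here; a sketch in which the accumulator is a member variable set in the constructor (the class is mine, reusing lines and linesSeen from the previous sketch):

public class LineCountingFunction implements Function<String, String> {
    private final Accumulator<Long> counter; // serialized along with the function

    public LineCountingFunction(Accumulator<Long> counter) {
        this.counter = counter;
    }

    @Override
    public String call(String line) {
        counter.add(1L); // count every call
        return line.toUpperCase();
    }
}

// later
JavaRDD<String> upperCased = lines.map(new LineCountingFunction(linesSeen));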



Wednesday, November 12, 2014

Managing Spark Accumulators

Managing Accumulators

As I move from relatively simple Spark problems to significant issues such as Proteomic search - especially with sufficient processing to raise performance issues, it is important to be able to measure and track performance. 
The first issue I want to track is where (on which slave process) and how often a function is called. In a later post I will discuss using subclassing to instrument Functions to log, centralizing the logging code in a common base class. In this section I discuss how global counts of function use can be kept inside a Spark project. All code is in the Google Code distributed-tools project.
Spark supplies a class called Accumulator. In Hadoop, Counters are very useful in tracking performance: a counter is an object that any portion of the code can get and increment, and the values of a counter are eventually consistent. Even current values (if available) are useful in tracking the progress of a job.
In a Spark job with a number of steps, Accumulators may be used to track the number of operations in each step.

Getting an Accumulator

JavaSparkContext has a method accumulator which returns an accumulator with a given name.
       JavaSparkContext currentContext = SparkUtilities.getCurrentContext();
       String accName = "My Accumulator Name";
       Accumulator<Integer> accumulator = currentContext.accumulator(0, accName );

Accumulators should be created in the driver code and can be serialized into functions.

Using an Accumulator

Accumulators may be of any type supporting an add operation, although only the types Integer and Double are supported out of the box. The accumulator has an add method taking the proper type - say an Integer - which adds its argument to the current value.

       Accumulator<Integer> numberCalls = ...
      ...
      numberCalls.add(1); // increment

Reading an Accumulator

Accumulators may be read only in the driver program. The accumulator has a value method which returns the current value of the accumulator. After RDDs are collected this should be accurate.

       Accumulator<Integer> numberCalls = ...
      ...
      int myCallCount = numberCalls.value( ); 

My Library Code

The class com.lordjoe.distributed.spark.SparkAccumulators is designed to make handling Accumulators simpler. The class is a singleton containing a Map from accumulator names to Accumulators. A function with access to the singleton - say through a final local variable or, as in my code, a member variable set in the constructor - can use the instance to look up an existing accumulator.

This needs to be called once in the driver to initialize. It is normally called in a function which initializes the library; the call is required to make sure exactly one object is constructed.
SparkAccumulators.createInstance();
Once initialized, the instance can be gotten and used to create an accumulator. These are stored in the internal map.
SparkAccumulators instance = SparkAccumulators.getInstance();
instance.createAccumulator("MyAccumulator");

Accumulators may be incremented using the incrementAccumulator function. There is an alternative version taking an amount to increment - the default is 1.
instance.incrementAccumulator("MyAccumulator");


Use in a function

public class MyFunction<T, R> implements Function<T, R>, Serializable {
    private SparkAccumulators accumulators;

    public MyFunction() {
        if (accumulators == null)
            accumulators = SparkAccumulators.getInstance(); // look up the singleton
    }

    public R call(T input) {
        // doStuff
        accumulators.incrementAccumulator("MyAccumulator"); // keep count
        return null; // return the real result here
    }
}

Better code using inheritance to put all logging in one place will be discussed in a later blog.





Wednesday, November 5, 2014

Spark Utilities


All code described here is in the projects at distributed-tools on code.google.com. The code described is in the class com.lordjoe.distributed.SparkUtilities in the subproject spark-implementation.

In working with Spark I find there is a need for a library of commonly used functions. I put these in a class called SparkUtilities. My general convention is that classes named Utilities are collections of static functions.

A major function is getCurrentContext(). Because a JavaSparkContext cannot be serialized, it is not possible to pass a context into a function. The body of a function executing on a slave process will need to find a local copy of the context; if no local copy exists then one will need to be constructed. All of this is handled by getCurrentContext(), which caches a constructed context in a transient field. The transient keyword causes the field not to be serialized. The code below will cause one JavaSparkContext to be constructed per slave VM.
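The listing is not in this copy of the post; a sketch of what getCurrentContext might look like (the application name and the details are my assumptions):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkUtilities {
    // transient so the cached context is never serialized; one context is built per slave VM
    private transient static JavaSparkContext threadContext;

    public static synchronized JavaSparkContext getCurrentContext() {
        if (threadContext != null)
            return threadContext;
        SparkConf sparkConf = new SparkConf().setAppName("Anonymous");
        guaranteeSparkMaster(sparkConf);   // defined below
        threadContext = new JavaSparkContext(sparkConf);
        return threadContext;
    }
}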

One important function is guaranteeSparkMaster. When running on a machine without a cluster, the Spark master will be undefined. Calling sparkConf.setMaster("local[*]") causes the job to run with a local master (with the proper number of threads for the processor). This is good for debugging, and the fact that the code does this means there is no need to set up code or a command line to run locally. If there is no cluster available, getCurrentContext defaults to a local master.
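A sketch of guaranteeSparkMaster under the same assumptions:

    /** if no master is configured (no cluster available), run locally with one thread per core */
    public static void guaranteeSparkMaster(SparkConf sparkConf) {
        if (!sparkConf.contains("spark.master"))
            sparkConf.setMaster("local[*]");
    }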

Realize and Return - debugging Spark


All code described here is in the projects at distributed-tools on code.google.com. The code described is in the class com.lordjoe.distributed.SparkUtilities in the subproject spark-implementation.

Spark and Java 8 streams use lazy evaluation to manage operations on collections. This means that when a line like the one below is executed:
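(The original example line is missing from this copy; a typical lazy transformation, borrowed from word count, might be.)

JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String line) {
        return Arrays.asList(line.split(" "));
    }
});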


The operations are saved but not executed until evaluation is required. While this makes operations efficient, it makes them difficult to debug. During development of small samples on a single machine running in local mode it is frequently useful to stop and look at the results before passing them to the next step.


These functions require that all data be held in memory in a List - not a good idea for Big Data sets but fine for debugging. The code does two things.
First, it forces all code to execute. This allows debugging of all the steps up to the realization and can isolate errors.
Second, all results are held in a list. Placing a break point allows the list to be examined to see if the values are reasonable.
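The listing of these functions is missing here; a sketch of what realizeAndReturn might look like for a JavaRDD and a JavaPairRDD:

/** force evaluation, hold the results in a List for inspection, and return an equivalent RDD */
public static <T> JavaRDD<T> realizeAndReturn(JavaRDD<T> inp) {
    JavaSparkContext ctx = SparkUtilities.getCurrentContext();
    List<T> collected = inp.collect();      // forces all pending operations to run
    // put a breakpoint here to inspect the collected values
    return ctx.parallelize(collected);      // hand back an RDD of the same type
}

/** JavaPairRDD version */
public static <K, V> JavaPairRDD<K, V> realizeAndReturn(JavaPairRDD<K, V> inp) {
    JavaSparkContext ctx = SparkUtilities.getCurrentContext();
    List<Tuple2<K, V>> collected = inp.collect();
    return ctx.parallelizePairs(collected);
}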

The code below shows how realizeAndReturn can be used. Note that for any JavaRDD or JavaPairRDD the return is of the same type as the original and can serve in the code as a new value.
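A sketch of that usage, with word count again as the example:

JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String line) {
        return Arrays.asList(line.split(" "));
    }
});
words = SparkUtilities.realizeAndReturn(words);    // inspect, then comment out once this step works

JavaPairRDD<String, Integer> counts = words.mapToPair(new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String word) {
        return new Tuple2<String, Integer>(word, 1);
    }
});
counts = SparkUtilities.realizeAndReturn(counts);  // inspect, then comment out once this step works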
My general strategy is to follow each operation with a line of realizeAndReturn and comment these lines out as things succeed.
When problems arise the lines can be uncommented, forcing more frequent evaluation and allowing a peek at intermediate results.


Sunday, May 19, 2013

Programmatically finding the Hadoop version the code is using


In porting a program from Hadoop 0.2 to 1.0 I had the problem that the code might care about the current Hadoop version, and there was minimal documentation on how this might be accomplished. The package org.apache.hadoop has an annotation showing the version, but I had never worked with package annotations before.
The following code creates an enum with the Hadoop major versions and determines which one is in use.

================================================================

package org.systemsbiology.hadoop;

import org.apache.hadoop.*;

/**
 * org.systemsbiology.hadoop.HadoopMajorVersion
 *   enumeration of major Hadoop versions
 * @author Steve Lewis
 * @date 19/05/13
 */
public enum HadoopMajorVersion {
    Version0("0.2"), Version1("1."), Version2("2.");

    private final String m_StartText;

    private HadoopMajorVersion(String startText) {
        m_StartText = startText;
    }

    public String getStartText() {
        return m_StartText;
    }

    /**
     * figure out the major Hadoop version by looking at the HadoopVersionAnnotation in the package
     * @return !null major version
     * @throws  IllegalStateException if presented with a version it does not understand
     */
    public static HadoopMajorVersion getHadoopVersion() {
        // force the class loader to load a class in the package so we can read the package
        Class hadoopVersionAnnotation = HadoopVersionAnnotation.class; // make sure the class loader knows about the package
        Package aPackage = Package.getPackage("org.apache.hadoop");
        HadoopVersionAnnotation annotation = (HadoopVersionAnnotation) aPackage.getAnnotation(HadoopVersionAnnotation.class);
        String version = annotation.version();
        HadoopMajorVersion[] versions = values();
        for (HadoopMajorVersion v : versions) {
             if(version.startsWith(v.getStartText()))
                 return v;
        }
        throw new IllegalStateException("Unknown Hadoop version " + version);
    }
}
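A use of the enum might look like this (a hypothetical caller, not part of the class above):

HadoopMajorVersion version = HadoopMajorVersion.getHadoopVersion();
if (version == HadoopMajorVersion.Version0) {
    // fall back to behavior appropriate for the older 0.2x libraries
}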

Thursday, January 17, 2013

Windows 8 Metro DOES Suck


In my last post I described how I had installed Windows 8 on several of my systems and how I really liked the way the desktop behaved. I had heard a lot of bad things about this operating system, and I realized after playing with it that all the bad things had to do with the Metro interface. Technically, since Microsoft has decided that Metro is not the real name for this interface and has yet to supply an alternative, we should call this "the interface formerly known as Metro", but I will simply call it Metro.

Metro is version 1 from Microsoft and, as with all version 1 products, particularly from Microsoft, it is very bad. Many of the bad behaviors have been described in great detail by others so I will not go into them. There are many little things: when you scroll the mouse you expect things to scroll vertically, but in Metro they scroll horizontally. Why? Who can say, but presumably you are supposed to use a touch interface, which most existing computers don't have. Metro has no concept of a right-click, and in many places where you would reasonably expect a menu to pop up where you click, the menu in fact pops up somewhere else on the screen, forcing you to click and then move to deal with it. The icons on the Metro interface come in only two sizes: square and roughly twice the width of a square at the same height. The number of rows of icons seems to be independent of screen size. Having a fixed size is only one of many limitations of Metro apps which end up making them essentially toys on a system like my desktop.

Metro is a response to a problem that Microsoft has. Microsoft has no presence in the huge and rapidly growing phone and tablet market. Virtually all of the phones and tablets being sold in this country run either Apple's operating system or Google's. Microsoft is coming to the party about five years too late and is desperately playing catch-up. The only way that Microsoft can establish a credible presence in the phone and tablet market is by exploiting its virtual monopoly on the desktop in order to cause developers to write applications that will work well on phones and tablets. If developers see a huge Windows market and develop cool applications that will also run on Windows tablets and phones, then Microsoft can establish itself as a major presence in this market. The conclusion is that it is a good thing for Microsoft to have your computer behave like a tablet.

The problem is that consumers do not want their computer to behave like a tablet; they want their tablet to behave like a computer. I recently took a five-week trip to Europe in which my only computer was a very sophisticated Android tablet, the Asus Transformer. The Transformer looks a lot like a laptop, with an attached keyboard loaded with batteries. What I discovered in using this as my only system for five weeks is that a tablet in many cases thinks more like a phone than a computer. Selecting text, whether you are typing or simply want to copy a string in Polish from a webpage into a search engine, is extremely difficult with touch but totally trivial with a mouse. My tablet would not let me select multiple files to include in an email message with a single multiple selection, nor would it let me send an email to a group of users. There is no good reason why either of these operations is not supported except that the tablet software was originally written for a phone.

Many websites treat a tablet as if it were a phone rather than a laptop even though the space available is similar, and the experience can be quite disappointing. My conclusion at the end of this was that I wanted my tablet to behave more like a computer and not the other way around. Metro's view of the world is very, very different.
Others have described Metro as a train wreck.

Windows 8 Does NOT Suck


This is going to be a very short blog and I am going to continue it later on when I have more time. My project over the past week was to install and play with Windows 8. I had expected to be very disappointed because I have heard many stories about how confusing Windows 8 was and indeed had participated in some of the events which led me to have low expectations for this product.

I distrusted Windows 8 so much that I did not even attempt to install it on one of my real machines. I created a virtual machine on my laptop and installed Windows 8 there. This turned out to be a huge mistake. My laptop is old; it has only 3 GB of memory and two processors. This is enough to run Windows 8 but not enough to run Windows 7 together with a virtual machine running Windows 8. The performance was dreadful but it did give me a chance to play with the product. I learned enough during a couple of days to decide, especially since the performance was so terrible, that I ought to give it a chance on my desktop machine.

Before I came to this decision I had learned enough about the product to be comfortable using it. I installed a product that I had been using in the past on Windows 7 called Classic Shell, which restores a start menu similar to that found in XP. I have no idea why Microsoft decided to remove the start menu, and I have never been happy with the changes that they made since Windows 2000, so the classic start menu fit my needs perfectly. As a nice addition there is an option to cause Windows 8 to boot directly to the desktop without stopping at the Metro interface. This meant that I never had to deal with Metro, and in my opinion that was a good thing since everything that people complain about with Windows 8 is really a complaint about Metro. I will say a lot more about Metro in my next blog posting, but this posting is really about how Windows 8 does not suck.

I gave the system a real chance. I installed it on my desktop machine, a system that I have been using with various modifications for the past 10 years. In its latest incarnation the machine has three screens, six processors and 16 GB of memory. This is not a small system and there are literally hundreds of applications that have been installed over the years. I had no intention of reinstalling them.

I gave Windows 8 a partition on a drive different from the one that I had been using. I made a copy of my Windows 7 partition on the new drive. I then unplugged all other drives so that the Windows 8 installation would not have any opportunity to make changes on my production system. I made sure that the drive booted and ran properly and then I installed Windows 8.

I then installed Classic Shell and set it so that on boot you went straight to the desktop. I was very pleasantly surprised. Windows 8 recognized my three screens and used them immediately when the installation booted up. All my applications were there and my desktop looked just like it did before the installation. The classic start menu was even smart enough to copy everything that I was doing in Windows 7. Every single application that I have used in Windows 8 on the desktop has run properly. This includes Dragon NaturallySpeaking, Chaneru (a video file server), VLC (a good open source video player), of course Visual Studio and Microsoft Office, and IntelliJ, a good Java development environment - all worked properly, the first time and right out of the box.

My original plan was to install Windows 8, play with it and go back to Windows 7. After playing with it for a couple of days I decided that it was worth keeping and would become my main production environment. I was surprised by this decision because I have heard a huge number of complaints about Windows 8, but my conclusion was that these were not really complaints about the operating system itself and its desktop but rather complaints about the Metro interface and a poorly conceived attempt to make your PC behave like a tablet. I will cover this more in the next posting.

Friday, April 20, 2012

Talking to Elastic Map Reduce Jobs

Clusters on EC2 are like clusters on a local set of machines with one important exception - the ports for accessing the Hadoop UI are normally closed. To open them, open the AWS Management Console, select Security Groups and click ElasticMapReduce-master.
Now create a custom TCP rule - the port range is 9000-9103. I would set the source to 0.0.0.0/0 unless you know your IP address and know it will not change. Hit the Add Rule button and you will see the rule (shown as the last visible rule in the lower right hand panel).

Once the security group is set up you can talk to the Hadoop UI.
You will find the public DNS by selecting the job and looking at its description page.
Then talk to the job tracker on port 9100 as shown below.
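For example, if the master's public DNS were ec2-xx-xx-xx-xx.compute-1.amazonaws.com (a made-up name), the job tracker UI would be at

http://ec2-xx-xx-xx-xx.compute-1.amazonaws.com:9100/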


Saturday, November 13, 2010

The Instability of Hadoop 0.21

The bottom line of this post is that Hadoop 0.21 broke the usual rules for upgrading an existing library. Because of changes in the basic implementation of major classes in the system there is no guarantee that a well written program in version 0.20.2 will execute properly in a Hadoop 0.21 environment or, of course vice versa.

When Hadoop transitioned from 0.18 to 0.20 there were radical changes in the implementation of major classes. Classes like Mapper, Reducer and associated contexts were required to be subclasses of well known base classes rather than merely interfaces with appropriate sample implementations. This was, in my opinion, a poor decision, a point discussed in more detail below. When this change was made, all of the new code was placed in a new and separate package. Older code was in the package org.apache.hadoop.mapred while new code was in the package org.apache.hadoop.mapreduce. The net effect of this design - not changing the functionality of older code but extending the libraries with new packages and classes - was that any properly written application written using the 0.18 libraries would continue to run in the same manner when run under an 0.20.1 or an 0.20.2 or even (I presume) an 0.21 system.

It is a general and good principle of library design to design upgrades to not break older code. A few principles are involved. First, never change public interfaces. Any change to an interface, including adding new methods will break existing implementations. If new methods must be added to an interface, the usual pattern looks like this.

interface MyInterface2 extends MyInterface {

      <add new methods>

}

Public classes may occasionally add methods and data (never abstract methods), and this should be done cautiously, since there is reasonable assurance that older code will not call those methods.

The general rule is to never, never make significant changes to public classes and interfaces since it is highly likely that those changes will break existing code.

These rules were violated when Hadoop moved to version 0.21. In Hadoop 0.20 there was a decision that many interfaces would be replaced with concrete classes. This was a poor decision since it makes overriding a class to add new functionality quite difficult, whereas with an interface construction of a simple proxy is extremely straightforward. In Hadoop 0.21 this decision was altered and many major classes including Mapper.Context, Reducer.Context and their base classes TaskAttemptContext and JobContext have been transmogrified from concrete classes into interfaces.

What this means is that any code which subclasses any of these classes in 0.20 Hadoop is guaranteed to break when run with the Hadoop 0.21 libraries. The converse also applies, any code implementing these interfaces in 0.21 will not run with the 0.20 libraries (not so unexpected) .

In fifteen years of programming Java and seeing libraries evolve I have never seen such a violation of the desirability of making earlier versions of the libraries compatible with later versions. Never has there been such a major change in the functioning of well known and universally used classes. The fact that any 0.20 code works under 0.21 at all is a coincidence of the structure of a class file, which allows a class with the same name and methods to work properly when changed from a concrete class to an interface.

It is also the case that clusters configured with 0.21 must be rebuilt to convert to 0.20 with the removal of all content in HDFS.

The conclusion is

1) Consider carefully a move to 0.21 since there is no guarantee that a well written class using 0.20 will run in a cluster using the 0.21 libraries. Curiously, a well written class using the 0.18 libraries has a much better chance of working well.

2) Avoid subclassing Context or its superclasses as none of this code will port to 0.21.

Friday, October 29, 2010

Handling Statistics

A common problem we find in dealing with many kinds of data is that information which is accumulated across all of the mappers needs to be used in the reduce step. Let us consider a couple of examples of this problem. In the classic word count example the mapper emits words as keys and counts as values, and the reducer sums the counts and returns the word followed by its count. A variation on this problem would have the reducer emit the word, its count and the fraction of all words of that length represented by this particular word. So, for example, we might see the output for the word ‘a’ being

A    4581    0.62702

indicating that A occurred 4581 times and that this represents 62% of all words of length 1.

While the example might seem somewhat trivial, the problem is very common. In processing biological data we frequently want the probability that an observation is greater than a particular value, which is basically the number of observations greater than or equal to that value divided by the total number of observations.

In the word count/word frequency sample, it is straightforward for each mapper to keep a table tracking the number of words of each length seen, which means that at the end of the map step the system as a whole knows the counts for each word length.

While a second stage of map-reduce might allow the totals to be computed before further processing, this stage is unnecessary.

A solution needs to deal with two issues. First, every reducer needs access to information which is accumulated in each mapper. This requires that every mapper send a copy of what it knows to every reducer. The second problem is that the reducers need to acquire this information before processing any data. This means that not only must the data be sent to each reducer, but it must be sent with a key which is not only distinct but sorts ahead of any other key.

I modified the WordCount sample in the following ways.

1) In the mapper for words of length less than 100, the number of words of each length is counted.

2) In the cleanup method the counts are emitted from each mapper N times with keys which are 1) designed to sort ahead of all other keys (starting with “    ”) and 2) numbered so one copy from every mapper goes to every reducer.

3) In the reducer the broadcast keys are separated and handled by a special processor which accumulates a grand total in each reducer.

4) The reducer divides the count by the total words of that length to get a frequency which is added to the output -

The reducers output looks like

ABATED    3    0.00018
ABHORRENT    1    0.00015
ABODE    1    0.00004
ABOUTHOWEVER    1    0.00078
ABSTRACTED    1    0.00026

The code is

==========================================
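The original listing did not survive in this copy. The sketch below illustrates the key ideas - counting word lengths in the mapper, broadcasting those counts to every reducer from cleanup with keys that sort first, and separating the broadcast keys in the reducer. The names and details are my reconstruction, not the original code, and a custom Partitioner (not shown) is needed to route broadcast key number i to reducer i.

import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;

public class WordFrequency {

    public static final String BROADCAST_PREFIX = "    ";    // sorts ahead of any word

    public static class FrequencyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final int[] lengthCounts = new int[100];      // counts of words by length

        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            for (String word : value.toString().split("\\s+")) {
                if (word.length() == 0 || word.length() >= 100)
                    continue;
                lengthCounts[word.length()]++;
                context.write(new Text(word.toUpperCase()), new IntWritable(1));
            }
        }

        /** send every reducer one copy of the per-length counts with keys that sort first */
        protected void cleanup(Context context) throws IOException, InterruptedException {
            int numberReducers = context.getNumReduceTasks();
            for (int reducer = 0; reducer < numberReducers; reducer++) {
                for (int length = 0; length < lengthCounts.length; length++) {
                    if (lengthCounts[length] == 0)
                        continue;
                    Text broadcastKey = new Text(BROADCAST_PREFIX + reducer + ":" + length);
                    context.write(broadcastKey, new IntWritable(lengthCounts[length]));
                }
            }
        }
    }

    public static class FrequencyReducer extends Reducer<Text, IntWritable, Text, Text> {
        private final int[] totalsByLength = new int[100];    // grand totals built from broadcast keys

        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            String keyString = key.toString();
            int sum = 0;
            for (IntWritable value : values)
                sum += value.get();
            if (keyString.startsWith(BROADCAST_PREFIX)) {
                // broadcast key: accumulate the grand total for this word length
                int length = Integer.parseInt(keyString.substring(keyString.indexOf(':') + 1));
                totalsByLength[length] += sum;
            }
            else {
                // ordinary word: broadcast keys sort first, so the totals are already known
                int total = totalsByLength[keyString.length()];
                double frequency = total == 0 ? 0 : (double) sum / total;
                context.write(key, new Text(sum + "\t" + String.format("%.5f", frequency)));
            }
        }
    }
}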

Wednesday, August 11, 2010

Building a Hadoop Jar File

Hadoop processes work by sending a jar file to each machine in a Hadoop cluster. The jar file contains all of the code, the resources and the libraries required by the Hadoop process. Normally the way I develop is to get my job running using a local Hadoop instance with a single processor and, when the code runs properly on a single instance, deploy it to a cluster.

Usually it requires a significant amount of testing and debugging on the target cluster before a job runs successfully. This means that it is necessary to rapidly create and deploy new jar files during the testing and debugging phase. Eventually, the last successful jar file may be deployed to a cluster for production work. In my development process you want building and deploying a jar file to be a rapid and mindless operation.

A Hadoop jar file is a standard Java jar with one exception. All of the libraries used by the code are in a top level directory called lib. Any class files in the class path outside of jars can simply be copied into the jar file with the appropriate directory structure.

The easiest way to create a jar is to run a Java application using the same classpath that was used when the application was tested in a single machine cluster. Any directories found in the classpath can simply have their contents copied into the jar file. Any jars in the class path may be added in a directory called lib. The only problem with this approach is that jars which may not be needed by the application, or which are already present on the target system such as the standard Hadoop jars, will be needlessly copied. What the code listed below does is maintain a blacklist of jars that are not required in a deployed application. Usually this list is well known to a developer and rarely changes.

The following code will generate a deployable Hadoop jar, assuming it is run with the same classpath used when the application was tested.

package org.systemsbiology.aws;

import java.io.*;
import java.util.*;
import java.util.zip.*;

/**
* org.systemsbiology.aws.HadoopDeployer
* build a hadoop jar file
*
* @author Steve Lewis
* @date Apr 3, 2007
*/
public class HadoopDeployer {
    public static HadoopDeployer[] EMPTY_ARRAY = {};
    public static Class THIS_CLASS = HadoopDeployer.class;

    public static final String[] EEXCLUDED_JARS_LIST = {
             "hadoop-0.20.2-core.jar",
            "slf4j-log4j12-1.4.3.jar",
            "log4j-1.2.15.jar",
             "junit-4.8.1.jar"

              // … add other jars to exclude
    };

    public static final Set<String> EXCLUDED_JARS = new HashSet(Arrays.asList(EEXCLUDED_JARS_LIST));

    /**
     * Turn the classpath into a list of files to add to the jar
     * @param pathItems !null list is classpath items - files and directories
     * @param javaHome   !null java directory - things in javaHome are automatically excluded
     * @return
     */
    public static File[] filterClassPath(String[] pathItems, String javaHome) {
        List holder = new ArrayList();

        for (int i = 0; i < pathItems.length; i++) {
            String item = pathItems[i];
            if (".".equals(item))
                continue; // ignore
            if (inExcludedJars(item))
                continue;  // ignore  excluded jars
            if (item.indexOf(javaHome) > -1)
                continue;  // ignore java jars
            File itemFile = new File(item);
            if (!itemFile.exists())
                continue; // ignore non-existant jars
            if (itemFile.isFile()) {
                holder.add(itemFile);
                continue;
            }
            if (itemFile.isDirectory()) {
                continue; // ignore directories
            }

        }
        File[] ret = new File[holder.size()];
        holder.toArray(ret);
        return ret;
    }

    /**
     * get a list of directories in the classpath
     * @param pathItems !null list of items
     * @param javaHome   !null java home
     * @return !Null list of existing files
     */
    public static File[] filterClassPathDirectories(String[] pathItems, String javaHome) {
        List holder = new ArrayList();
        for (int i = 0; i < pathItems.length; i++) {
            String item = pathItems[i];
            if (".".equals(item))
                continue;    // ignore  .
            if (EXCLUDED_JARS.contains(item))
                continue;    // ignore  excluded jars
            if (item.indexOf(javaHome) > -1)
                continue;   // ignore java jars
            File itemFile = new File(item);
            if (!itemFile.exists())
                continue;   // ignore non-existant jars
            if (itemFile.isFile()) {
                continue;   // ignore files
            }
            if (itemFile.isDirectory())
                holder.add(itemFile);
        }

        File[] ret = new File[holder.size()];
        holder.toArray(ret);
        return ret;
    }

    /**
     * true if s is the name of an excluded jar
     * @param s !null name
     * @return  as above
     */
    protected static boolean inExcludedJars(String s) {
        for (int i = 0; i <  EEXCLUDED_JARS_LIST.length; i++) {
            String test =  EEXCLUDED_JARS_LIST[i];
            if (s.endsWith(test))
                return true;
        }
        return false;
    }

    /**
     * copy jars to a lib directory
     * @param out !null open output stream
     * @param libs   !null file list - should be jar files
     * @throws IOException on error
     */
    public static void copyLibraries(ZipOutputStream out, File[] libs) throws IOException {
        for (int i = 0; i < libs.length; i++) {
            File lib = libs[i];
            final String name = "lib/" + lib.getName();
            System.out.println(name);
            ZipEntry ze = new ZipEntry(name);
            out.putNextEntry(ze);
            copyFile(lib, out);
            out.closeEntry();
        }
    }

    /**
     *
     * @param src !null source file
     * @param dst  !null open output stream
     * @return  true if no problem
     */
    public static boolean copyFile(File src, ZipOutputStream dst) {
        int bufsize = 1024;
        try {
            RandomAccessFile srcFile = new RandomAccessFile(src, "r");
            long len = srcFile.length();
            if (len > 0x7fffffff) {
                return (false);   // too large
            }
            int l = (int) len;
            if (l == 0) {
                return (false);   // failure - no data
            }

            int bytesRead = 0;
            byte[] buffer = new byte[bufsize];
            while ((bytesRead = srcFile.read(buffer, 0, bufsize)) != -1) {
                dst.write(buffer, 0, bytesRead);
            }
            srcFile.close();
            return true;
        }
        catch (IOException ex) {
            return (false);
        }
    }

    /**
     * Create a deployable Jar as jarFile
     * @param jarFile !null creatable file for the jar
     */
    public static void deployLibrariesToJar(File jarFile) {
        try {
            ZipOutputStream out = new ZipOutputStream(new FileOutputStream(jarFile));

            String javaHome = System.getProperty("java.home");
            String classpath = System.getProperty("java.class.path");
            String[] pathItems = null;
            if (classpath.contains(";")) {
                pathItems = classpath.split(";");  // windows
            }
            else {
                if (classpath.contains(":")) { // linux
                    pathItems = classpath.split(":");   // Linux stlye
                }
                else {
                    String[] items = {classpath};
                    pathItems = items; // only 1 I guess
                }
            }
            File[] pathLibs = filterClassPath(pathItems, javaHome);
            copyLibraries(out, pathLibs);
            File[] pathDirectories = filterClassPathDirectories(pathItems, javaHome);
            for (int i = 0; i < pathDirectories.length; i++) {
                File pathDirectory = pathDirectories[i];
                copyLibraryDirectory("", pathDirectory, out);
            }
            out.flush();
            out.close();

        }
        catch (IOException e) {
            throw new RuntimeException(e);

        }
    }

    /**
     * make a path string adding name to
     * @param path !null current path
     * @param name  !null name - usually a subdirectory
     * @return
     */
    public static String nextPath(String path, String name) {
        if (path == null || path.length() == 0)
            return name;
        return path + "/" + name;
    }

    /**
     * Copy a library directory recursively
     * @param s path - really used only when creating a directory path
     * @param dir !null existing file or directory
     * @param pOut  !null open output stream
     * @throws IOException
     */
    private static void copyLibraryDirectory(final String s, final File dir, final ZipOutputStream pOut) throws IOException {
        File[] list = dir.listFiles();
        if (list == null) return;
        for (int i = 0; i < list.length; i++) {
            File file = list[i];
            if (file.isDirectory()) {
                final String np = nextPath(s, file.getName());
                copyLibraryDirectory(np, file, pOut);
            }
            else {
                final String np = nextPath(s, file.getName());
                ZipEntry ze = new ZipEntry(np);
                pOut.putNextEntry(ze);
                copyFile(file, pOut);
                pOut.closeEntry();
            }
        }
    }

    /**
     * create a deployable Hadoop jar using the existing classpath
     * @param pJarName !null name of a file that is creatable
     */
    public static void makeHadoopJar(final String pJarName) {
        File deployDir = new File(pJarName);
        deployLibrariesToJar(deployDir);
    }

    /**
     * Sample use
     * @param args    ignored
     */
    public static void main(String[] args) {
        String jarName = "FooBar.jar";
        if(args.length > 0)
            jarName = args[0];
        makeHadoopJar(jarName);

    }
}

Accessing Local Files

In most examples of Hadoop code there is no reason to access a local file system. All of the data is passed using the standard map and reduce methods. Indeed it is usually a bad idea to access a local filesystem on a slave processor because that data will not be persisted from one processing step to the next. Sometimes, however, these rules have to change.

One case where it is particularly necessary to access a local filesystem is where a critical step in either the mapper or the reducer involves launching a separate process where the application assumes the existence of certain local files. Normally when Hadoop uses an external process it uses Hadoop streaming which assumes that the external process takes all of its data from standard in and sends all of its output to standard out. These assumptions may fail under several conditions. First, the external process may require more than one input. For example, one or more configuration files may be required. In addition, it assumes that the developer has sufficient control over the external process and the way it functions to make it compatible with Hadoop streaming.

In many cases these assumptions may be unrealistic. Nothing prevents a custom mapper or reducer from writing appropriate files on the local filesystem for an external program and, after launching that program, reading any output files that have been written.

There are two ways to get files to the local filesystem of a slave processor. One is to use Hadoop's distributed cache, which will send files specified in the job configuration to each slave's local file system. The distributed cache will be a topic of another blog entry. This entry will concentrate on reading and writing local files. The alternative is to have the slave process write the files directly to the file system. Files which will be required during all steps of processing may be written to a local filesystem during the setup phase. Files required only for a single step of processing may be written during that step and, if no longer required, deleted at the end of that step.

Hadoop supplies a LocalFileSystem object which manages the relationship to the local file system. The code below shows how to get a LocalFileSystem given a Hadoop context.

Configuration configuration = context.getConfiguration();
LocalFileSystem localFs = FileSystem.getLocal(configuration);

The LocalFileSystem has methods to create, delete, open and append to files on the local filesystem. Each file is designated by a Path. In my work I have made these Paths relative since I am uncertain about where a program is running or what permissions are available on a slave processor.

 

The following code is a set of static utility routines that write to the local filesystem. I consider three cases: in the first the data is a string, possibly the contents of a Text object passed in. In the second, the contents are a resource passed in with a custom jar file. Resources are very convenient when data must be passed to every instance and when the data is not large relative to the size of the jar file. Both of these end up calling a routine which writes the contents of an InputStream to the local file system. This allows a third possibility where the data source is anything that can supply an input stream, very specifically Web services and other data sources.

/**
* write a resource to a  LocalFileSystem
* @param cls - class holding the resource
* @param resourceName - !null name of the resource
* @param localFs - !null file system
* @param dstFile - !null local file name - this will become a path
*/
public static void writeResourceAsFile(Class cls,String resourceName, LocalFileSystem localFs, String dstFile) {
     InputStream inp = cls.getResourceAsStream(resourceName);
     writeStreamAsFile(localFs, dstFile, inp);
}

/**
* Write the contents of a stream to the local file system
* @param localFs  - !null file system
* @param dstFile - !null local file name - this will become a path
* @param pInp - !null open Stream
*/
public static void writeStreamAsFile(final LocalFileSystem localFs, final String dstFile, final InputStream pInp) {
    Path path = new Path(dstFile);
    try {
        FSDataOutputStream outStream = localFs.create(path);
        copyFile(pInp, outStream);
    }
    catch (IOException e) {
        throw new RuntimeException(e);

    }
}

/**
* Write the contents of a String to the local file system
* @param localFs  - !null file system
* @param dstFile - !null local file name - this will become a path
  * @param s  !null String
*/
public static void writeStringAsFile(final LocalFileSystem localFs, final String dstFile, final String s) {
    ByteArrayInputStream inp = new ByteArrayInputStream(s.getBytes());
     writeStreamAsFile(localFs,dstFile, inp);
}

/**
* copy an  InputStream to an outStream
* @param inp - !null open Stream it will be closed at the end
* @param outStream !null open Stream it will be closed at the end
* @return  true on success
*/
public static boolean copyFile(InputStream inp, FSDataOutputStream outStream) {
     int bufsize = 1024;
     try {
         // failure - no data

         int bytesRead = 0;
         byte[] buffer = new byte[bufsize];
         while ((bytesRead = inp.read(buffer, 0, bufsize)) != -1) {
             outStream.write(buffer, 0, bytesRead);
         }
         inp.close();
         outStream.close();
         return true;
     }
     catch (IOException ex) {
         return (false);
     }
}
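Reading an output file written by the external program back into the job is the mirror image; a sketch (the method name is mine):

/**
* read the contents of a local file as a String
* @param localFs  - !null file system
* @param srcFile - !null local file name
* @return the file contents
*/
public static String readFileAsString(final LocalFileSystem localFs, final String srcFile) {
    Path path = new Path(srcFile);
    try {
        FSDataInputStream inStream = localFs.open(path);
        ByteArrayOutputStream holder = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        int bytesRead;
        while ((bytesRead = inStream.read(buffer, 0, buffer.length)) != -1) {
            holder.write(buffer, 0, bytesRead);
        }
        inStream.close();
        return new String(holder.toByteArray());
    }
    catch (IOException e) {
        throw new RuntimeException(e);
    }
}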

Customized Splitters and Readers

A basic problem with Hadoop systems is acquiring input. The input to a Hadoop job is usually one large file, many small files or multiple larger files. The first question to ask in designing a Hadoop job is how the input is to be split. The mapper receives a split of the input file, which is to say the input is broken into smaller chunks for processing by the mapper. The size and meaning of these chunks is determined by the character of the problem that you are trying to solve. There are two common cases that are handled by splitters in the Hadoop distribution. The first case is a text file in which a record is a single line of text. This case is appropriate for parsing log files, much webpage content and other text based systems. The second case is a sequence file, a standard Hadoop form, which consists of a collection of key value pairs. In this case a single record is a key value pair.

The major issue in splitting input records is to determine an appropriate chunk of data for a mapper to handle. There are two natural concerns. A split must be parallelizable, that is, a sufficiently large number of records must be generated to keep a number of mappers busy. Records are generated in parallel in two ways. First, when a number of files is passed to the job - typically this will involve passing a directory full of files - each mapper may pull records from a separate file and continue switching files until the job is complete. Another option is to split an input file.

When an input file is split, a reader starts in the middle of a file and reads until it hits the beginning of a record. After that it reads data one record at a time until a specified segment of the file has been read. It is up to the FileInputFormat class to specify whether an input file may be split and up to the associated reader to determine exactly how. There is some clever code in the Hadoop infrastructure to make sure that as a file is split all of the data will end up in one or another split. One very common way of splitting files is to split a text file into lines. In this case, a record starts immediately after a carriage return and all the reader has to do to determine the start of a record is to read until a carriage return is detected.

Splitting compressed files is more difficult and can be a major concern in the design of a job where the input is a single compressed file. Most common compression formats such as zip and gzip cannot be split. Hadoop has code for splitting a compression format called lzo; unfortunately lzo codecs in Java do not exist. When your data consists of a small number of unsplittable files, in our case only one, it is necessary to change the data format to make it splittable in some way. One good solution is to break a single compressed file into a large number, say 1000 or so, of compressed files. While each individual file cannot be split, the large number of files supplies sufficient parallelism.

It is possible to write your own splitter to solve a specific problem. I will present a simple case of a splitter and a reader to illustrate how this can work. The problem that I was trying to solve involved an input consisting of a large number of text files. Unlike the word count case, each text file was a complete record. This case is very common, a text file might represent an XML document or some other file describing an interesting object. What I wanted to do was to send all of the text from each file as a single record to a mapper which would process it appropriately.

 

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "WholeFiles");
        conf = job.getConfiguration();
        // NOTE the Job copies the configuration
        job.setInputFormatClass(WholeFileInputFormat.class);

The InputFormat is a subclass of FileInputFormat<KeyType, ValueType>. In the standard line reader the key type is a long representing the position in the file plus some random information and the value is a line of text. For the WholeFileInputFormat we use the name of the file as the key (a Text) and the value is the text in the file read as a series of lines. A FileInputFormat may implement two methods. isSplitable simply returns true if the reader is capable of splitting the file and false otherwise. In this example we return false, indicating that the value will be the entire text of the file.

 

    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

 

The createRecordReader method can simply create a copy of the appropriate reader and return it. All of the interesting work is done in the reader class itself.

 

    public RecordReader<Text, Text> createRecordReader(InputSplit split,
                                                       TaskAttemptContext context) {
        return new MyWholeFileReader();
    }

 

 

The reader is copied from LineRecordReader, the standard line reader for text files, and modified at three points. The method boolean nextKeyValue() reads the entire file and sets it as the current value. It also sets the current key to the file name, which is assumed to be unique.

 

The entire code for my whole file input format is shown below.

/**
* written by Steve Lewis
* lordjoe2000@gmail.com
* See http://lordjoesoftware.blogspot.com/
*/
package org.systemsbiology.hadoop;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.util.*;

import java.io.*;

/**
* Splitter that reads a whole file as a single record
* This is useful when you have a large number of files
* each of which is a complete unit - for example XML Documents
*/
public class WholeFileInputFormat extends FileInputFormat<Text, Text> {

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
                                                       TaskAttemptContext context) {
        return new MyWholeFileReader();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    /**
     * Custom RecordReader which returns the entire file as a
     * single value with the name as a key
     * Value is the entire file
     * Key is the file name
     */
    public static class MyWholeFileReader extends RecordReader<Text, Text> {

        private CompressionCodecFactory compressionCodecs = null;
        private long start;
        private LineReader in;
        private Text key = null;
        private Text value = null;
        private Text buffer = new Text();

        public void initialize(InputSplit genericSplit,
                               TaskAttemptContext context) throws IOException {
            FileSplit split = (FileSplit) genericSplit;
            Configuration job = context.getConfiguration();
            start = split.getStart();
            final Path file = split.getPath();
            compressionCodecs = new CompressionCodecFactory(job);
            final CompressionCodec codec = compressionCodecs.getCodec(file);

            // open the file and seek to the start of the split
            FileSystem fs = file.getFileSystem(job);
            FSDataInputStream fileIn = fs.open(split.getPath());
            if (codec != null) {
                in = new LineReader(codec.createInputStream(fileIn), job);
            }
            else {
                in = new LineReader(fileIn, job);
            }
            if (key == null) {
                key = new Text();
            }
            key.set(split.getPath().getName());
            if (value == null) {
                value = new Text();
            }
        }

        public boolean nextKeyValue() throws IOException {
            int newSize = 0;
            StringBuilder sb = new StringBuilder();
            newSize = in.readLine(buffer);
            while (newSize > 0) {
                String str = buffer.toString();
                sb.append(str);
                sb.append("\n");
                newSize = in.readLine(buffer);
            }

            String s = sb.toString();
            value.set(s);

            if (sb.length() == 0) {
                key = null;
                value = null;
                return false;
            }
            else {
                return true;
            }
        }

        @Override
        public Text getCurrentKey() {
            return key;
        }

        @Override
        public Text getCurrentValue() {
            return value;
        }

        /**
         * Get the progress within the split
         */
        public float getProgress() {
            return 0.0f;
        }

        public synchronized void close() throws IOException {
            if (in != null) {
                in.close();
            }
        }
    }
}