Saturday, November 13, 2010

The Instability of Hadoop 0.21

The bottom line of this post is that Hadoop 0.21 broke the usual rules for upgrading an existing library. Because the implementation of major classes changed, there is no guarantee that a well written program built against version 0.20.2 will execute properly in a Hadoop 0.21 environment, or, of course, vice versa.

When Hadoop transitioned from 0.18 to 0.20 there were radical changes in the implementation of major classes. Classes like Mapper and Reducer, together with their associated contexts, became subclasses of well known base classes rather than merely interfaces with appropriate sample implementations. This was, in my opinion, a poor decision, a point discussed in more detail below. When this change was made, all of the new code was placed in a new and separate package: older code stayed in org.apache.hadoop.mapred while new code went into org.apache.hadoop.mapreduce. The net effect of this design, which extended the libraries with new packages and classes rather than changing the functionality of older code, was that any properly written application built against the 0.18 libraries would continue to run in the same manner under an 0.20.1, an 0.20.2 or even (I presume) an 0.21 system.

It is a general and good principle of library design that upgrades should not break older code. A few principles are involved. First, never change public interfaces. Any change to an interface, including adding new methods, will break existing implementations. If new methods must be added to an interface, the usual pattern looks like this.

interface MyInterface2 extends MyInterface {

      // add new methods here

}

Public classes may occasionally add methods and data (never abstract methods), and may do so cautiously, since there is reasonable assurance that older code will not call those new methods.

The general rule is to never, never make significant changes to public classes and interfaces since it is highly likely that those changes will break existing code.

These rules were violated when Hadoop moved to version 0.21. In Hadoop 0.20 many interfaces were replaced with concrete classes. This was a poor decision, since it makes overriding a class to add new functionality quite difficult, whereas with an interface the construction of a simple proxy is extremely straightforward. In Hadoop 0.21 this decision was reversed, and many major classes, including Mapper.Context, Reducer.Context and their base classes TaskAttemptContext and JobContext, have been transmogrified from concrete classes back into interfaces.
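To see why interfaces are friendlier to extension, consider a delegating proxy. The sketch below uses a made-up Context interface, not the real Hadoop classes: when the type is an interface you can wrap an existing instance and change one method, which is much harder when the type is a concrete class whose construction you do not control.

// Hypothetical interface, used only to illustrate the proxy pattern;
// this is not the real Hadoop Context.
interface Context {
    void write(String key, String value);
    String getStatus();
}

// A proxy that adds behavior to one method and delegates the rest unchanged.
class LoggingContext implements Context {
    private final Context delegate;

    LoggingContext(Context delegate) {
        this.delegate = delegate;
    }

    public void write(String key, String value) {
        System.out.println("writing " + key);  // added behavior
        delegate.write(key, value);            // original behavior
    }

    public String getStatus() {
        return delegate.getStatus();           // pure delegation
    }
}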

What this means is that any code which subclasses any of these classes in 0.20 Hadoop is guaranteed to break when run with the Hadoop 0.21 libraries. The converse also applies: any code implementing these interfaces in 0.21 will not run with the 0.20 libraries (not so unexpected).

In fifteen years of programming Java and watching libraries evolve I have never seen such a violation of the principle that earlier versions of a library should remain compatible with later versions. Never has there been such a major change in the functioning of well known and universally used classes. The fact that any 0.20 code works under 0.21 at all is a coincidence of the structure of a class file, which allows a class with the same name and methods to work properly when changed from a concrete class to an interface.

It is also the case that clusters configured with 0.21 must be rebuilt, with the removal of all content in HDFS, in order to convert back to 0.20.

The conclusion is

1) Consider carefully a move to 0.21, since there is no guarantee that a well written class built against 0.20 will run in a cluster using the 0.21 libraries. Curiously, a well written class using the 0.18 libraries has a much better chance of working.

2) Avoid subclassing Context or its superclasses as none of this code will port to 0.21.

Friday, October 29, 2010

Handling Statistics

A common problem we find in dealing with many kinds of data is that information which is accumulated across all of the mappers needs to be used in the reduce step. Let us consider a couple of examples of this problem. In the classic word count example the mapper emits words as keys and counts as values, and the reducer sums the counts and returns the word followed by its count. A variation on this problem would have the reducer emit the word, its count and the fraction of all words of that length represented by this particular word. So, for example, we might see the output for the word ‘a’ being

A    4581    0.62702

indicating that A occurred 4581 times and that this represents roughly 63% of all words of length 1.

While the example might seem somewhat trivial, the problem is very common. In processing biological data we frequently want the probability that an observation is greater than a particular value, which is basically the number of observations greater than or equal to that value divided by the total number of observations.

In the word count/word frequency sample, it is straightforward for each mapper to keep a table tracking the number of words of each length it has seen, which means that at the end of the map step the system as a whole knows the counts for each word length.

While a second stage of map-reduce might allow the totals to be computed before further processing, this stage is unnecessary.

A solution needs to deal with two issues. First, every reducer needs access to information which is accumulated in each mapper. This requires that every mapper send a copy of what it knows to every reducer. The second problem is that the reducers need to acquire this information before processing any data. This means that not only must the data be sent to each reducer, it must be sent with a key which is not only distinct but sorts ahead of every other key.

I modified the WordCount sample in the following ways.

1) In the mapper, for words of length less than 100, the number of words of each length is counted.

2) In the cleanup method the counts are emitted from each mapper N times with keys which 1) are designed to sort ahead of all other keys (starting with “    ”) and 2) are numbered so that one copy from every mapper goes to every reducer.

3) In the reducer the broadcast keys are separated and handled by a special processor which accumulates a grand total in each reducer.

4) The reducer divides the count by the total number of words of that length to get a frequency, which is added to the output.

The reducer's output looks like

ABATED    3    0.00018
ABHORRENT    1    0.00015
ABODE    1    0.00004
ABOUTHOWEVER    1    0.00078
ABSTRACTED    1    0.00026

The code is
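(The original listing is not reproduced in this excerpt. What follows is only a rough sketch of the approach described above, not the original code: the class names, the four-space key prefix, the key layout and the custom partitioner are all assumptions, and the driver, which must also call job.setPartitionerClass, is omitted.)

// Sketch only: word count plus per-length word frequencies. Mapper-side
// totals are broadcast to every reducer under keys that sort first.
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;

public class WordFrequencySketch {
    public static final int MAX_LENGTH = 100;

    public static class FrequencyMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final int[] lengthCounts = new int[MAX_LENGTH];  // occurrences seen per word length

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            for (String word : value.toString().split("\\s+")) {
                if (word.length() == 0 || word.length() >= MAX_LENGTH)
                    continue;
                lengthCounts[word.length()]++;
                context.write(new Text(word.toUpperCase()), new IntWritable(1));
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Broadcast this mapper's per-length totals once per reducer. The
            // leading spaces make these keys sort ahead of every real word.
            int numReducers = context.getNumReduceTasks();
            for (int r = 0; r < numReducers; r++) {
                for (int len = 0; len < MAX_LENGTH; len++) {
                    if (lengthCounts[len] > 0)
                        context.write(new Text("    " + r + ":" + len),
                                new IntWritable(lengthCounts[len]));
                }
            }
        }
    }

    // Routes broadcast key number r to reducer r; the driver must register
    // this class with job.setPartitionerClass(BroadcastPartitioner.class).
    public static class BroadcastPartitioner extends Partitioner<Text, IntWritable> {
        @Override
        public int getPartition(Text key, IntWritable value, int numPartitions) {
            String s = key.toString();
            if (s.startsWith("    "))
                return Integer.parseInt(s.trim().split(":")[0]) % numPartitions;
            return (s.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    public static class FrequencyReducer extends Reducer<Text, IntWritable, Text, Text> {
        private final long[] totalByLength = new long[MAX_LENGTH];  // grand totals, filled first

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (IntWritable v : values)
                sum += v.get();
            String s = key.toString();
            if (s.startsWith("    ")) {
                // Broadcast key: sorts ahead of all words, so the grand total
                // for this length is complete before any word is processed.
                int len = Integer.parseInt(s.trim().split(":")[1]);
                totalByLength[len] += sum;
            }
            else {
                double frequency = (double) sum / totalByLength[s.length()];
                context.write(key, new Text(sum + "\t" + String.format("%.5f", frequency)));
            }
        }
    }
}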

==========================================

Wednesday, August 11, 2010

Building a Hadoop Jar File

Hadoop jobs work by sending a jar file to each machine in a Hadoop cluster. The jar file contains all of the code, the resources and the libraries required by the Hadoop process. Normally the way I develop is to get my job running using a local Hadoop instance with a single processor and, when the code runs properly on a single instance, deploy it to a cluster.

Usually it requires a significant amount of testing and debugging on the target cluster before a job runs successfully. This means that it is necessary to rapidly create and deploy new jar files during the testing and debugging phase. Eventually, the last successful jar file may be deployed to a cluster for production work. In my development process I want building and deploying a jar file to be a rapid and mindless operation.

A Hadoop jar file is a standard Java jar with one exception: all of the libraries used by the code are in a top level directory called lib. Any class files in the classpath outside of jars can simply be copied into the jar file with the appropriate directory structure.

The easiest way to create such a jar is to run a Java application using the same classpath that was used when the application was tested in a single machine cluster. Any directories found in the classpath can simply have their contents copied into the jar file. Any jars in the classpath may be added in a directory called lib. The only problem with this approach is that jars which are not needed by the application, or which are already present on the target system, such as the standard Hadoop jars, will be needlessly copied. What the code listed below does is maintain a blacklist of jars that are not required in a deployed application. Usually this list is well known to a developer and rarely changes.

The following code will generate a deployable Hadoop jar, assuming it is run with the same classpath used when testing the job locally.

package org.systemsbiology.aws;

import java.io.*;
import java.util.*;
import java.util.zip.*;

/**
* org.systemsbiology.aws.HadoopDeployer
* build a hadoop jar file
*
* @author Steve Lewis
* @date Apr 3, 2007
*/
public class HadoopDeployer {
    public static HadoopDeployer[] EMPTY_ARRAY = {};
    public static Class THIS_CLASS = HadoopDeployer.class;

    public static final String[] EXCLUDED_JARS_LIST = {
            "hadoop-0.20.2-core.jar",
            "slf4j-log4j12-1.4.3.jar",
            "log4j-1.2.15.jar",
            "junit-4.8.1.jar"
            // … add other jars to exclude
    };

    public static final Set<String> EXCLUDED_JARS = new HashSet<String>(Arrays.asList(EXCLUDED_JARS_LIST));

    /**
     * Turn the classpath into a list of files to add to the jar
     * @param pathItems !null list of classpath items - files and directories
     * @param javaHome   !null java directory - things in javaHome are automatically excluded
     * @return !null list of jar files to add to the lib directory
     */
    public static File[] filterClassPath(String[] pathItems, String javaHome) {
        List<File> holder = new ArrayList<File>();

        for (int i = 0; i < pathItems.length; i++) {
            String item = pathItems[i];
            if (".".equals(item))
                continue; // ignore
            if (inExcludedJars(item))
                continue;  // ignore  excluded jars
            if (item.indexOf(javaHome) > -1)
                continue;  // ignore java jars
            File itemFile = new File(item);
            if (!itemFile.exists())
                continue; // ignore non-existent jars
            if (itemFile.isFile()) {
                holder.add(itemFile);
                continue;
            }
            if (itemFile.isDirectory()) {
                continue; // ignore directories
            }

        }
        File[] ret = new File[holder.size()];
        holder.toArray(ret);
        return ret;
    }

    /**
     * get a list of directories in the classpath
     * @param pathItems !null list of items
     * @param javaHome   !null java home
     * @return !null list of existing directories
     */
    public static File[] filterClassPathDirectories(String[] pathItems, String javaHome) {
        List<File> holder = new ArrayList<File>();
        for (int i = 0; i < pathItems.length; i++) {
            String item = pathItems[i];
            if (".".equals(item))
                continue;    // ignore  .
            if (EXCLUDED_JARS.contains(item))
                continue;    // ignore  excluded jars
            if (item.indexOf(javaHome) > -1)
                continue;   // ignore java jars
            File itemFile = new File(item);
            if (!itemFile.exists())
                continue;   // ignore non-existent jars
            if (itemFile.isFile()) {
                continue;   // ignore files
            }
            if (itemFile.isDirectory())
                holder.add(itemFile);
        }

        File[] ret = new File[holder.size()];
        holder.toArray(ret);
        return ret;
    }

    /**
     * true if s is the name of an excluded jar
     * @param s !null name
     * @return  as above
     */
    protected static boolean inExcludedJars(String s) {
        for (int i = 0; i < EXCLUDED_JARS_LIST.length; i++) {
            String test = EXCLUDED_JARS_LIST[i];
            if (s.endsWith(test))
                return true;
        }
        return false;
    }

    /**
     * copy jars to a lib directory
     * @param out !null open output stream
     * @param libs   !null file list - should be jar files
     * @throws IOException on error
     */
    public static void copyLibraries(ZipOutputStream out, File[] libs) throws IOException {
        for (int i = 0; i < libs.length; i++) {
            File lib = libs[i];
            final String name = "lib/" + lib.getName();
            System.out.println(name);
            ZipEntry ze = new ZipEntry(name);
            out.putNextEntry(ze);
            copyFile(lib, out);
            out.closeEntry();
        }
    }

    /**
     * copy the contents of a file into an open zip stream
     * @param src !null source file
     * @param dst  !null open output stream
     * @return  true if no problem
     */
    public static boolean copyFile(File src, ZipOutputStream dst) {
        int bufsize = 1024;
        try {
            RandomAccessFile srcFile = new RandomAccessFile(src, "r");
            long len = srcFile.length();
            if (len > 0x7fffffff) {
                return (false); // too large
            }
            int l = (int) len;
            if (l == 0) {
                return (false); // failure - no data
            }

            int bytesRead = 0;
            byte[] buffer = new byte[bufsize];
            while ((bytesRead = srcFile.read(buffer, 0, bufsize)) != -1) {
                dst.write(buffer, 0, bytesRead);
            }
            srcFile.close();
            return true;
        }
        catch (IOException ex) {
            return (false);
        }
    }

    /**
     * Create a deployable Jar as jarFile
     * @param jarFile !null creatable file for the jar
     */
    public static void deployLibrariesToJar(File jarFile) {
        try {
            ZipOutputStream out = new ZipOutputStream(new FileOutputStream(jarFile));

            String javaHome = System.getProperty("java.home");
            String classpath = System.getProperty("java.class.path");
            String[] pathItems = null;
            if (classpath.contains(";")) {
                pathItems = classpath.split(";");  // windows
            }
            else {
                if (classpath.contains(":")) { // linux
                    pathItems = classpath.split(":");   // Linux stlye
                }
                else {
                    String[] items = {classpath};
                    pathItems = items; // only 1 I guess
                }
            }
            File[] pathLibs = filterClassPath(pathItems, javaHome);
            copyLibraries(out, pathLibs);
            File[] pathDirectories = filterClassPathDirectories(pathItems, javaHome);
            for (int i = 0; i < pathDirectories.length; i++) {
                File pathDirectory = pathDirectories[i];
                copyLibraryDirectory("", pathDirectory, out);
            }
            out.flush();
            out.close();

        }
        catch (IOException e) {
            throw new RuntimeException(e);

        }
    }

    /**
     * make a path string by adding name to the current path
     * @param path !null current path
     * @param name  !null name - usually a subdirectory
     * @return the extended path
     */
    public static String nextPath(String path, String name) {
        if (path == null || path.length() == 0)
            return name;
        return path + "/" + name;
    }

    /**
     * Copy a directory into the jar - files are added, subdirectories are recursed
     * @param s path - really used only when creating a directory path
     * @param dir !null existing file or directory
     * @param pOut  !null open output stream
     * @throws IOException on error
     */
    private static void copyLibraryDirectory(final String s, final File dir, final ZipOutputStream pOut) throws IOException {
        File[] list = dir.listFiles();
        if (list == null) return;
        for (int i = 0; i < list.length; i++) {
            File file = list[i];
            if (file.isDirectory()) {
                final String np = nextPath(s, file.getName());
                copyLibraryDirectory(np, file, pOut);
            }
            else {
                final String np = nextPath(s, file.getName());
                ZipEntry ze = new ZipEntry(np);
                pOut.putNextEntry(ze);
                copyFile(file, pOut);
                pOut.closeEntry();
            }
        }
    }

    /**
     * create a deployable Hadoop jar using the existing classpath
     * @param pJarName !null name of a file that is creatable
     */
    public static void makeHadoopJar(final String pJarName) {
        File deployDir = new File(pJarName);
        deployLibrariesToJar(deployDir);
    }

    /**
     * Sample use
     * @param args    optional - args[0] is the name of the jar to create
     */
    public static void main(String[] args) {
        String jarName = "FooBar.jar";
        if(args.length > 0)
            jarName = args[0];
        makeHadoopJar(jarName);

    }
}

Accessing Local Files

In most examples of Hadoop code there is no reason to access a local file system. All of the data is passed using the standard map and reduce methods. Indeed it is usually a bad idea to access a local filesystem on a slave processor because that data will not be persisted from one processing step to the next. Sometimes, however, these rules have to change.

One case where it is particularly necessary to access a local filesystem is where a critical step in either the mapper or the reducer involves launching a separate process, and that application assumes the existence of certain local files. Normally when Hadoop uses an external process it uses Hadoop streaming, which assumes that the external process takes all of its input from standard in and sends all of its output to standard out. These assumptions may fail under several conditions. First, the external process may require more than one input; for example, one or more configuration files may be required. In addition, streaming assumes that the developer has sufficient control over the external process and the way it functions to make it compatible with Hadoop streaming.

In many cases these assumptions are unrealistic. Nothing prevents a custom mapper or reducer from writing appropriate files on the local filesystem for an external program and, after running that program, reading any output files that it has written.

There are two ways to get files onto the local filesystem of a slave processor. One is to use Hadoop's distributed cache, which will send files specified in the job configuration to each slave's local filesystem. The distributed cache will be the topic of another blog entry; this entry concentrates on reading and writing local files directly. The alternative is to have the slave processes write the files directly to the filesystem. Files which will be required during all steps of processing may be written to the local filesystem during the setup phase. Files required only for a single step of processing may be written during that step and, if no longer required, deleted at the end of that step.

Hadoop supplies a LocalFileSystem object which manages the relationship to the local filesystem. The code below shows how to get a LocalFileSystem given a Hadoop context.

Configuration configuration = context.getConfiguration();
LocalFileSystem localFs = FileSystem.getLocal(configuration);

The LocalFileSystem has methods to create, delete, open and append to files on the local filesystem. Each file is designated by a Path. In my work I have made these Paths relative since I am uncertain about where a program is running or what permissions are available on a slave processor.
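As a small illustration of the read side, the following sketch opens a local file left behind by an external program and deletes it when it is no longer needed; the file name and the surrounding class are hypothetical.

import java.io.*;
import org.apache.hadoop.fs.*;

// Sketch: read a local result file written by an external program, then
// delete it. "result.txt" is a hypothetical name used for illustration.
public class LocalResultReader {
    public static void readAndDelete(LocalFileSystem localFs) throws IOException {
        Path resultPath = new Path("result.txt");
        FSDataInputStream in = localFs.open(resultPath);
        BufferedReader rdr = new BufferedReader(new InputStreamReader(in));
        String line;
        while ((line = rdr.readLine()) != null) {
            // handle each line of the external program's output here
        }
        rdr.close();
        localFs.delete(resultPath, false);  // no longer needed by later steps
    }
}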

 

The following code is a set of static utility routines that write to the local filesystem. I consider three cases. In the first the data is a string, possibly the contents of a Text object passed in. In the second, the contents are a resource packaged in a custom jar file. Resources are very convenient when data must be passed to every instance and where the data is not large relative to the size of the jar file. Both of these end up calling a routine which writes the contents of an InputStream to the local file system. This allows a third possibility where the data source is anything that can supply an input stream, most obviously web services and other remote data sources.

/**
* write a resource to a  LocalFileSystem
* @param cls - class holding the resource
* @param resourceName - !null name of the resource
* @param localFs - !null file system
* @param dstFile - !null local file name - this will become a path
*/
public static void writeResourceAsFile(Class cls,String resourceName, LocalFileSystem localFs, String dstFile) {
     InputStream inp = cls.getResourceAsStream(resourceName);
     writeStreamAsFile(localFs, dstFile, inp);
}

/**
* Write the contents of a stream to the local file system
* @param localFs  - !null file system
* @param dstFile - !null local file name - this will become a path
* @param pInp - !null open Stream
*/
public static void writeStreamAsFile(final LocalFileSystem localFs, final String dstFile, final InputStream pInp) {
    Path path = new Path(dstFile);
    try {
        FSDataOutputStream outStream = localFs.create(path);
        copyFile(pInp, outStream);
    }
    catch (IOException e) {
        throw new RuntimeException(e);

    }
}

/**
* Write the contents of a String to the local file system
* @param localFs  - !null file system
* @param dstFile - !null local file name - this will become a path
  * @param s  !null String
*/
public static void writeStringAsFile(final LocalFileSystem localFs, final String dstFile, final String s) {
    ByteArrayInputStream inp = new ByteArrayInputStream(s.getBytes());
     writeStreamAsFile(localFs,dstFile, inp);
}

/**
* copy an  InputStream to an outStream
* @param inp - !null open Stream it will be closed at the end
* @param outStream !null open Stream it will be closed at the end
* @return  true on success
*/
public static boolean copyFile(InputStream inp, FSDataOutputStream outStream) {
     int bufsize = 1024;
     try {
         // failure - no data

         int bytesRead = 0;
         byte[] buffer = new byte[bufsize];
         while ((bytesRead = inp.read(buffer, 0, bufsize)) != -1) {
             outStream.write(buffer, 0, bytesRead);
         }
         inp.close();
         outStream.close();
         return true;
     }
     catch (IOException ex) {
         return (false);
     }
}
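For example, these routines might be called from a mapper's setup method to drop a bundled configuration file onto the slave's local disk before an external program is launched. In the sketch below the resource name, the local file name and the LocalFileUtilities class assumed to hold the routines above are all hypothetical.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch: write a jar resource to the local filesystem before any records
// are processed. "/params.cfg", "params.cfg" and LocalFileUtilities are
// hypothetical names used for illustration.
public class ExternalToolMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration configuration = context.getConfiguration();
        LocalFileSystem localFs = FileSystem.getLocal(configuration);
        LocalFileUtilities.writeResourceAsFile(getClass(), "/params.cfg", localFs, "params.cfg");
    }
}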

Customized Splitters and Readers

A basic problem with Hadoop systems is acquiring input. The input to a Hadoop job is usually one large file, many small files or multiple larger files. The first question to ask in designing a Hadoop job is how the input is to be split. The mapper receives a split of the input, which is to say the input is broken into smaller chunks for processing by the mapper. The size and meaning of these chunks is determined by the character of the problem that you are trying to solve. There are two common cases that are handled by splitters in the Hadoop distribution. The first case is a text file in which a record is a single line of text. This case is appropriate for parsing log files, much webpage content and other text based systems. The second case is a sequence file, a standard Hadoop format which consists of a collection of key value pairs. In this case a single record is a key value pair.

The major issue in splitting input records is to determine an appropriate chunk of data for a mapper to handle. There are two natural concerns. A split must be parallelizable, that is, a sufficiently large number of splits must be generated to keep a number of mappers busy. Splits are generated in parallel in two ways. First, when a number of files is passed to the job, typically by passing a directory full of files, each mapper may pull records from a separate file and continue switching files until the job is complete. The other option is to split an input file.

When an input file is split, a reader starts in the middle of the file and reads until it hits the beginning of a record. After that it reads data one record at a time until a specified segment of the file has been read. It is up to the FileInputFormat class to specify whether an input file may be split and up to the associated reader to determine exactly how. There is some clever code in the Hadoop infrastructure to make sure that as a file is split all of the data will end up in one split or another. One very common way of splitting files is to split a text file into lines. In this case, a record starts immediately after a line break and all the reader has to do to determine the start of a record is to read until a line break is detected.

Splitting compressed files is more difficult and can be a major concern in the design of a job where the input is a single compressed file. Most common compression formats such as zip and gzip cannot be split. Hadoop has code for splitting a compression format called lzo; unfortunately, pure Java lzo codecs are not available. When your data consists of a small number of unsplittable files, in our case only one, it is necessary to change the data format to make it splittable in some way. One good solution is to break a single compressed file into a large number, say 1000 or so, of compressed files. While each individual file cannot be split, the large number of files supplies sufficient parallelism.
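A rough sketch of that repackaging step, assuming the compressed data is line oriented text and using plain java.util.zip, is shown below; the file names and the piece count are arbitrary.

import java.io.*;
import java.util.zip.*;

// Sketch: split one large gzip file of text lines into many smaller gzip
// files so a Hadoop job can process them in parallel. Names are arbitrary.
public class GzipRepackager {
    public static void repackage(File bigGzip, File outDir, int pieces) throws IOException {
        BufferedReader rdr = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new FileInputStream(bigGzip))));
        PrintWriter[] writers = new PrintWriter[pieces];
        for (int i = 0; i < pieces; i++) {
            File piece = new File(outDir, "part-" + i + ".gz");
            writers[i] = new PrintWriter(new OutputStreamWriter(
                    new GZIPOutputStream(new FileOutputStream(piece))));
        }
        String line;
        int index = 0;
        while ((line = rdr.readLine()) != null) {
            writers[index++ % pieces].println(line);  // round robin across the pieces
        }
        rdr.close();
        for (int i = 0; i < pieces; i++)
            writers[i].close();   // closing finishes each gzip stream
    }

    public static void main(String[] args) throws IOException {
        repackage(new File(args[0]), new File(args[1]), 1000);
    }
}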

It is possible to write your own splitter to solve a specific problem. I will present a simple case of a splitter and a reader to illustrate how this can work. The problem that I was trying to solve involved an input consisting of a large number of text files. Unlike the word count case, each text file was a complete record. This case is very common, a text file might represent an XML document or some other file describing an interesting object. What I wanted to do was to send all of the text from each file as a single record to a mapper which would process it appropriately.

 

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "WholeFiles");
        conf = job.getConfiguration();   // NOTE the Job copies the configuration
        job.setInputFormatClass(WholeFileInputFormat.class);

The InputFormat is a subclass of FileInputFormat<KeyType, ValueType>. In the standard line reader the key is a long representing the position in the file and the value is a line of text. For the WholeFileInputFormat the key is the name of the file as Text and the value is the text of the file read as a series of lines. The subclass overrides two methods. isSplitable simply returns true if the reader is capable of splitting the file and false otherwise. In this example we return false, indicating that the value will be the entire text of the file.

 

   protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

 

The createRecordReader method simply creates an instance of the appropriate reader and returns it. All of the interesting work is done in the reader class itself.

 

    public RecordReader<Text, Text> createRecordReader(InputSplit split,
                                                       TaskAttemptContext context) {
        return new MyWholeFileReader();
    }

 

 

The reader is copied from LineRecordReader, the standard line reader for text files, and modified at three points. The method nextKeyValue() reads the entire file and sets it as the current value. It also sets the current key to the filename, which is assumed to be unique.

 

The entire code for my whole file input format is shown below.

/**
 * written by Steve Lewis
 * lordjoe2000@gmail.com
 * See http://lordjoesoftware.blogspot.com/
 */
package org.systemsbiology.hadoop;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.util.*;

import java.io.*;

/**
 * Splitter that reads a whole file as a single record
 * This is useful when you have a large number of files
 * each of which is a complete unit - for example XML Documents
 */
public class WholeFileInputFormat extends FileInputFormat<Text, Text> {

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
                                                       TaskAttemptContext context) {
        return new MyWholeFileReader();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    /**
     * Custom RecordReader which returns the entire file as a
     * single value with the name as a key
     * Value is the entire file
     * Key is the file name
     */
    public static class MyWholeFileReader extends RecordReader<Text, Text> {

        private CompressionCodecFactory compressionCodecs = null;
        private long start;
        private LineReader in;
        private Text key = null;
        private Text value = null;
        private Text buffer = new Text();

        public void initialize(InputSplit genericSplit,
                               TaskAttemptContext context) throws IOException {
            FileSplit split = (FileSplit) genericSplit;
            Configuration job = context.getConfiguration();
            start = split.getStart();
            final Path file = split.getPath();
            compressionCodecs = new CompressionCodecFactory(job);
            final CompressionCodec codec = compressionCodecs.getCodec(file);

            // open the file and seek to the start of the split
            FileSystem fs = file.getFileSystem(job);
            FSDataInputStream fileIn = fs.open(split.getPath());
            if (codec != null) {
                in = new LineReader(codec.createInputStream(fileIn), job);
            }
            else {
                in = new LineReader(fileIn, job);
            }
            if (key == null) {
                key = new Text();
            }
            key.set(split.getPath().getName());
            if (value == null) {
                value = new Text();
            }
        }

        public boolean nextKeyValue() throws IOException {
            int newSize = 0;
            StringBuilder sb = new StringBuilder();
            newSize = in.readLine(buffer);
            while (newSize > 0) {
                String str = buffer.toString();
                sb.append(str);
                sb.append("\n");
                newSize = in.readLine(buffer);
            }

            String s = sb.toString();
            value.set(s);

            if (sb.length() == 0) {
                key = null;
                value = null;
                return false;
            }
            else {
                return true;
            }
        }

        @Override
        public Text getCurrentKey() {
            return key;
        }

        @Override
        public Text getCurrentValue() {
            return value;
        }

        /**
         * Get the progress within the split
         */
        public float getProgress() {
            return 0.0f;
        }

        public synchronized void close() throws IOException {
            if (in != null) {
                in.close();
            }
        }
    }
}