Wednesday, August 11, 2010

Building a Hadoop Jar File

Hadoop processes work by sending a jar file to each machine in a Hadoop cluster. The jar file contains all of the code, resources and libraries required by the Hadoop process. Normally the way I develop is to get my job running using a local, single-processor Hadoop instance and, once the code runs properly on that single instance, deploy it to a cluster.

Usually a significant amount of testing and debugging on the target cluster is required before a job runs successfully. This means that it is necessary to rapidly create and deploy new jar files during the testing and debugging phase. Eventually, the last successful jar file may be deployed to a cluster for production work. In my development process I want building and deploying a jar file to be a rapid and mindless operation.

A Hadoop jar file is a standard Java jar with one exception: all of the libraries used by the code are placed in a top-level directory called lib. Any class files in the classpath outside of jars can simply be copied into the jar file with the appropriate directory structure.

The easiest way to create such a jar is to run a Java application using the same classpath that was used when the application was tested in a single-machine cluster. Any directories found in the classpath can simply have their contents copied into the jar file, and any jars in the classpath can be added under a directory called lib. The only problem with this approach is that jars which may not be needed by the application, or which are already present on the target system (such as the standard Hadoop jars), will be needlessly copied. What the code listed below does is maintain a blacklist of jars that are not required in a deployed application. Usually this list is well known to a developer and rarely changes.

The following code will generate a deployable Hadoop jar, assuming it is run with the same classpath that was used when testing the job on a single-machine instance.

package org.systemsbiology.aws;

import java.io.*;
import java.util.*;
import java.util.zip.*;

/**
* org.systemsbiology.aws.HadoopDeployer
* build a hadoop jar file
*
* @author Steve Lewis
* @date Apr 3, 2007
*/
public class HadoopDeployer {
    public static HadoopDeployer[] EMPTY_ARRAY = {};
    public static Class THIS_CLASS = HadoopDeployer.class;

    public static final String[] EXCLUDED_JARS_LIST = {
            "hadoop-0.20.2-core.jar",
            "slf4j-log4j12-1.4.3.jar",
            "log4j-1.2.15.jar",
            "junit-4.8.1.jar"

            // … add other jars to exclude
    };

    public static final Set<String> EXCLUDED_JARS = new HashSet<String>(Arrays.asList(EXCLUDED_JARS_LIST));

    /**
     * Turn the classpath into a list of jar files to add to the jar
     * @param pathItems !null list of classpath items - files and directories
     * @param javaHome   !null java directory - things in javaHome are automatically excluded
     * @return !null list of existing jar files
     */
    public static File[] filterClassPath(String[] pathItems, String javaHome) {
        List<File> holder = new ArrayList<File>();

        for (int i = 0; i < pathItems.length; i++) {
            String item = pathItems[i];
            if (".".equals(item))
                continue; // ignore
            if (inExcludedJars(item))
                continue;  // ignore  excluded jars
            if (item.indexOf(javaHome) > -1)
                continue;  // ignore java jars
            File itemFile = new File(item);
            if (!itemFile.exists())
                continue; // ignore non-existent jars
            if (itemFile.isFile()) {
                holder.add(itemFile);
                continue;
            }
            if (itemFile.isDirectory()) {
                continue; // ignore directories
            }

        }
        File[] ret = new File[holder.size()];
        holder.toArray(ret);
        return ret;
    }

    /**
     * get a list of directories in the classpath
     * @param pathItems !null list of items
     * @param javaHome   !null java home
     * @return !null list of existing directories
     */
    public static File[] filterClassPathDirectories(String[] pathItems, String javaHome) {
        List<File> holder = new ArrayList<File>();
        for (int i = 0; i < pathItems.length; i++) {
            String item = pathItems[i];
            if (".".equals(item))
                continue;    // ignore  .
            if (EXCLUDED_JARS.contains(item))
                continue;    // ignore  excluded jars
            if (item.indexOf(javaHome) > -1)
                continue;   // ignore java jars
            File itemFile = new File(item);
            if (!itemFile.exists())
                continue;   // ignore non-existent jars
            if (itemFile.isFile()) {
                continue;   // ignore files
            }
            if (itemFile.isDirectory())
                holder.add(itemFile);
        }

        File[] ret = new File[holder.size()];
        holder.toArray(ret);
        return ret;
    }

    /**
     * true if s is the name of an excluded jar
     * @param s !null name
     * @return  as above
     */
    protected static boolean inExcludedJars(String s) {
        for (int i = 0; i < EXCLUDED_JARS_LIST.length; i++) {
            String test = EXCLUDED_JARS_LIST[i];
            if (s.endsWith(test))
                return true;
        }
        return false;
    }

    /**
     * copy jars to a lib directory
     * @param out !null open output stream
     * @param libs   !null file list - should be jar files
     * @throws IOException on error
     */
    public static void copyLibraries(ZipOutputStream out, File[] libs) throws IOException {
        for (int i = 0; i < libs.length; i++) {
            File lib = libs[i];
            final String name = "lib/" + lib.getName();
            System.out.println(name);
            ZipEntry ze = new ZipEntry(name);
            out.putNextEntry(ze);
            copyFile(lib, out);
            out.closeEntry();
        }
    }

    /**
     * copy the contents of a file into an open zip stream
     * @param src !null source file
     * @param dst !null open output stream
     * @return  true if no problem
     */
    public static boolean copyFile(File src, ZipOutputStream dst) {
        int bufsize = 1024;
        try {
            RandomAccessFile srcFile = new RandomAccessFile(src, "r");
            long len = srcFile.length();
            if (len > 0x7fffffff) {
                return (false);   // too large
            }
            int l = (int) len;
            if (l == 0) {
                return (false);   // failure - no data
            }

            int bytesRead = 0;
            byte[] buffer = new byte[bufsize];
            while ((bytesRead = srcFile.read(buffer, 0, bufsize)) != -1) {
                dst.write(buffer, 0, bytesRead);
            }
            srcFile.close();
            return true;
        }
        catch (IOException ex) {
            return (false);
        }
    }

    /**
     * Create a deployable Jar as jarFile
     * @param jarFile !null creatable file for the jar
     */
    public static void deployLibrariesToJar(File jarFile) {
        try {
            ZipOutputStream out = new ZipOutputStream(new FileOutputStream(jarFile));

            String javaHome = System.getProperty("java.home");
            String classpath = System.getProperty("java.class.path");
            String[] pathItems = null;
            if (classpath.contains(";")) {
                pathItems = classpath.split(";");  // windows
            }
            else {
                if (classpath.contains(":")) { // linux
                    pathItems = classpath.split(":");   // Linux stlye
                }
                else {
                    String[] items = {classpath};
                    pathItems = items; // only 1 I guess
                }
            }
            File[] pathLibs = filterClassPath(pathItems, javaHome);
            copyLibraries(out, pathLibs);
            File[] pathDirectories = filterClassPathDirectories(pathItems, javaHome);
            for (int i = 0; i < pathDirectories.length; i++) {
                File pathDirectory = pathDirectories[i];
                copyLibraryDirectory("", pathDirectory, out);
            }
            out.flush();
            out.close();

        }
        catch (IOException e) {
            throw new RuntimeException(e);

        }
    }

    /**
     * make a path string by appending name to the current path
     * @param path !null current path
     * @param name  !null name - usually a subdirectory or file name
     * @return !null combined path
     */
    public static String nextPath(String path, String name) {
        if (path == null || path.length() == 0)
            return name;
        return path + "/" + name;
    }

    /**
     * Copy the contents of a directory into the jar, recursing into subdirectories
     * @param s path - really used only when creating a directory path
     * @param dir !null existing file or directory
     * @param pOut  !null open output stream
     * @throws IOException
     */
    private static void copyLibraryDirectory(final String s, final File dir, final ZipOutputStream pOut) throws IOException {
        File[] list = dir.listFiles();
        if (list == null) return;
        for (int i = 0; i < list.length; i++) {
            File file = list[i];
            if (file.isDirectory()) {
                final String np = nextPath(s, file.getName());
                copyLibraryDirectory(np, file, pOut);
            }
            else {
                final String np = nextPath(s, file.getName());
                ZipEntry ze = new ZipEntry(np);
                pOut.putNextEntry(ze);
                copyFile(file, pOut);
                pOut.closeEntry();
            }
        }
    }

    /**
     * create a deployable Hadoop jar using the existing classpath
     * @param pJarName !null name of a file that is creatable
     */
    public static void makeHadoopJar(final String pJarName) {
        File deployDir = new File(pJarName);
        deployLibrariesToJar(deployDir);
    }

    /**
     * Sample use
     * @param args    ignored
     */
    public static void main(String[] args) {
        String jarName = "FooBar.jar";
        if(args.length > 0)
            jarName = args[0];
        makeHadoopJar(jarName);

    }
}
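Once the jar builder exists it is easy to wire it into a job launcher. The fragment below is only a sketch, not part of the class above, and the jar and job names are placeholders; it builds the jar from the current classpath and then sets the mapred.jar property (the key that JobConf.setJar writes in the 0.20 API) so that Hadoop ships the freshly built jar with the job.

// hypothetical launcher sketch - build the deployable jar from the current
// classpath, then tell Hadoop to ship it with the job
String jarName = "FooBar.jar";                    // illustrative name
HadoopDeployer.makeHadoopJar(jarName);

Configuration conf = new Configuration();
conf.set("mapred.jar", jarName);                  // property written by JobConf.setJar
Job job = new Job(conf, "WholeFiles");            // job name is illustrative
// ... configure mapper, reducer, input and output paths as usual, then submit:
// job.waitForCompletion(true);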

Accessing Local Files

In most examples of Hadoop code there is no reason to access a local file system. All of the data is passed using the standard map and reduce methods. Indeed it is usually a bad idea to access a local filesystem on a slave processor because that data will not be persisted from one processing step to the next. Sometimes, however, these rules have to change.

One case where it is particularly necessary to access a local filesystem is where a critical step in either the mapper or the reducer involves launching a separate process, and that application assumes the existence of certain local files. Normally when Hadoop uses an external process it uses Hadoop streaming, which assumes that the external process takes all of its data from standard in and sends all of its output to standard out. These assumptions may fail for several reasons. First, the external process may require more than one input; for example, one or more configuration files may be needed. Second, streaming assumes that the developer has sufficient control over the external process and the way it functions to make it compatible with Hadoop streaming.

In many cases these assumptions are unrealistic. Nothing prevents a custom mapper or reducer from writing appropriate files on the local filesystem for an external program and, after running that program, reading any output files that it has written.
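As a sketch of that pattern (the tool name, flags and file names below are purely illustrative), a mapper or reducer can write the tool's input files locally, launch the tool with ProcessBuilder, wait for it to finish and then read back whatever it wrote:

import java.io.*;

/**
 * Sketch of running an external tool against locally written files.
 * The program name "externalTool" and the file names are illustrative only.
 */
public class ExternalToolRunner {
    public static String runTool(String inputFile, String configFile, String outputFile)
            throws IOException, InterruptedException {
        ProcessBuilder builder = new ProcessBuilder(
                "./externalTool", "-c", configFile, "-i", inputFile, "-o", outputFile);
        builder.redirectErrorStream(true);      // fold stderr into stdout
        Process process = builder.start();

        // drain the tool's console output so it cannot block on a full pipe
        BufferedReader console = new BufferedReader(
                new InputStreamReader(process.getInputStream()));
        while (console.readLine() != null) {
            // discard, or route to the task's own logger
        }
        int exitCode = process.waitFor();
        if (exitCode != 0)
            throw new IOException("external tool failed with exit code " + exitCode);

        // read back whatever the tool wrote to its output file
        StringBuilder sb = new StringBuilder();
        BufferedReader out = new BufferedReader(new FileReader(outputFile));
        String line;
        while ((line = out.readLine()) != null) {
            sb.append(line).append("\n");
        }
        out.close();
        return sb.toString();
    }
}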

There are two ways to get files to the local filesystem of a slave processor. One is to use Hadoop's distributed cache, which will send files specified in the job configuration to each slave's local file system. The distributed cache will be the topic of another blog entry; this entry concentrates on reading and writing local files. The alternative is to have the slave processes write the files directly to the file system. Files which will be required during all steps of processing may be written to a local filesystem during the setup phase. Files required only for a single step of processing may be written during that step and, if no longer required, deleted at the end of that step.

Hadoop supplies a LocalFileSystem object which manages the relationship to the local file system. The code below shows how to get a LocalFileSystem given a Hadoop context.

Configuration configuration = context.getConfiguration();
LocalFileSystem localFs = FileSystem.getLocal(configuration);

The LocalFileSystem has methods to create, delete, open and append to files on the local filesystem. Each file is designated by a Path. In my work I have made these Paths relative, since I am uncertain about where a program is running or what permissions are available on a slave processor.

 

The following code is a set of static utility routines that write to the local filesystem. I consider three cases. In the first, the data is a string, possibly the contents of a Text object passed in. In the second, the contents are a resource packaged in a custom jar file; resources are very convenient when data must be passed to every instance and the data is not large relative to the size of the jar file. Both of these end up calling a routine which writes the contents of an InputStream to the local file system. This allows a third possibility, where the data source is anything that can supply an input stream, specifically web services and other data sources.

/**
* write a resource to a  LocalFileSystem
* @param cls - class holding the resource
* @param resourceName - !null name of the resource
* @param localFs - !null file system
* @param dstFile - !null local file name - this will become a path
*/
public static void writeResourceAsFile(Class cls, String resourceName, LocalFileSystem localFs, String dstFile) {
     InputStream inp = cls.getResourceAsStream(resourceName);
     writeStreamAsFile(localFs, dstFile, inp);
}

/**
* Write the contents of a stream to the local file system
* @param localFs  - !null file system
* @param dstFile - !null local file name - this will become a path
* @param pInp - !null open Stream
*/
public static void writeStreamAsFile(final LocalFileSystem localFs, final String dstFile, final InputStream pInp) {
    Path path = new Path(dstFile);
    try {
        FSDataOutputStream outStream = localFs.create(path);
        copyFile(pInp, outStream);
    }
    catch (IOException e) {
        throw new RuntimeException(e);

    }
}

/**
* Write the contents of a String to the local file system
* @param localFs  - !null file system
* @param dstFile - !null local file name - this will become a path
  * @param s  !null String
*/
public static void writeStringAsFile(final LocalFileSystem localFs, final String dstFile, final String s) {
    ByteArrayInputStream inp = new ByteArrayInputStream(s.getBytes());
     writeStreamAsFile(localFs,dstFile, inp);
}

/**
* copy an  InputStream to an outStream
* @param inp - !null open Stream it will be closed at the end
* @param outStream !null open Stream it will be closed at the end
* @return  true on success
*/
public static boolean copyFile(InputStream inp, FSDataOutputStream outStream) {
     int bufsize = 1024;
     try {
         int bytesRead = 0;
         byte[] buffer = new byte[bufsize];
         while ((bytesRead = inp.read(buffer, 0, bufsize)) != -1) {
             outStream.write(buffer, 0, bytesRead);
         }
         inp.close();
         outStream.close();
         return true;
     }
     catch (IOException ex) {
         return (false);
     }
}
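Reading a file back is symmetrical. The following routine is a sketch in the same style, not part of the original utilities; it opens a relative Path with LocalFileSystem.open and returns the contents as a String.

/**
 * Read the contents of a local file as a String - a sketch only
 * @param localFs  - !null file system
 * @param srcFile - !null local file name - this will become a path
 * @return !null contents of the file
 */
public static String readFileAsString(final LocalFileSystem localFs, final String srcFile) {
    Path path = new Path(srcFile);
    try {
        FSDataInputStream inStream = localFs.open(path);
        ByteArrayOutputStream holder = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        int bytesRead;
        while ((bytesRead = inStream.read(buffer, 0, buffer.length)) != -1) {
            holder.write(buffer, 0, bytesRead);
        }
        inStream.close();
        return holder.toString();
    }
    catch (IOException e) {
        throw new RuntimeException(e);
    }
}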

Customized Splitters and Readers

A basic problem with Hadoop systems is acquiring input. The input to a Hadoop job is usually one large file, many small files or multiple larger files. The first question to ask in designing a Hadoop job is how the input is to be split. The mapper receives a split of the input file, which is to say the input is broken into smaller chunks for processing by the mapper. The size and meaning of these chunks is determined by the character of the problem that you are trying to solve. There are two common cases that are handled by splitters in the Hadoop distribution. The first case is a text file in which a record is a single line of text; this case is appropriate for parsing log files, much web page content and other text-based systems. The second case is a sequence file, a standard Hadoop format, which consists of a collection of key-value pairs; in this case a single record is a key-value pair.
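For reference, choosing between these two stock cases is a one-line job setting. This is only a fragment, assuming a new-API Job named job and the classes in org.apache.hadoop.mapreduce.lib.input:

// choose the stock splitter/reader pair for the two common cases above
job.setInputFormatClass(TextInputFormat.class);            // one record per line of text
// job.setInputFormatClass(SequenceFileInputFormat.class); // one record per key/value pair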

The major issue in splitting input records is to determine an appropriate chunk of data for a mapper to handle. There are two natural concerns. A split must be parallelizable; that is, a sufficiently large number of records must be generated to keep a number of mappers busy. Records are generated in parallel in two ways. First, when a number of files is passed to the job, typically a directory full of files, each mapper may pull records from a separate file and continue switching files until the job is complete. The other option is to split an input file.

When an input file is split, a reader starts in the middle of the file and reads until it hits the beginning of a record. After that it reads data one record at a time until a specified segment of the file has been read. It is up to the FileInputFormat class to specify whether an input file may be split, and up to the associated reader to determine exactly how. There is some clever code in the Hadoop infrastructure to make sure that as a file is split, all of the data ends up in one split or another. One very common way of splitting files is to split a text file into lines. In this case a record starts immediately after a line break, and all the reader has to do to determine the start of a record is to read until a line break is detected.

Splitting compressed files is more difficult and can be a major concern in the design of a job whose input is a single compressed file. Most common compression formats, such as zip and gzip, cannot be split. Hadoop has code for splitting a compression format called lzo, but unfortunately pure Java lzo codecs are not available. When your data consists of a small number of unsplittable files, in our case only one, it is necessary to change the data format to make it splittable in some way. One good solution is to break a single compressed file into a large number, say 1000 or so, of compressed files. While each individual file cannot be split, the large number of files supplies sufficient parallelism.
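That pre-processing step can be done outside Hadoop with plain java.util.zip. The class below is a sketch only; the file names and the lines-per-chunk value are illustrative and should be tuned to the data.

import java.io.*;
import java.util.zip.*;

/**
 * Sketch: split one large gzipped text file into many smaller gzipped files
 * so that a Hadoop job gets enough parallelism.
 */
public class GzipSplitter {
    public static void main(String[] args) throws IOException {
        String src = args.length > 0 ? args[0] : "bigfile.txt.gz"; // hypothetical input
        int linesPerChunk = 100000;                                // tune to the data

        BufferedReader in = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new FileInputStream(src))));
        int chunk = 0;
        int linesInChunk = 0;
        PrintWriter out = openChunk(chunk);
        String line;
        while ((line = in.readLine()) != null) {
            out.println(line);
            if (++linesInChunk >= linesPerChunk) {
                out.close();
                out = openChunk(++chunk);
                linesInChunk = 0;
            }
        }
        out.close();
        in.close();
    }

    private static PrintWriter openChunk(int index) throws IOException {
        String name = String.format("part-%04d.txt.gz", index);   // illustrative naming
        return new PrintWriter(new OutputStreamWriter(
                new GZIPOutputStream(new FileOutputStream(name))));
    }
}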

It is possible to write your own splitter to solve a specific problem. I will present a simple case of a splitter and a reader to illustrate how this can work. The problem that I was trying to solve involved an input consisting of a large number of text files. Unlike the word count case, each text file was a complete record. This case is very common; a text file might represent an XML document or some other file describing an interesting object. What I wanted to do was to send all of the text from each file as a single record to a mapper, which would process it appropriately.

 

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "WholeFiles");
        conf = job.getConfiguration();
        // NOTE the Job copies the configuration
        job.setInputFormatClass(WholeFileInputFormat.class);

The InputFormat is a subclass of FileInputFormat<KeyType, ValueType>. In the standard line reader the key type is a long representing the position in the file and the value is a line of text. For the WholeFileInputFormat the key is the name of the file as a Text and the value is the entire text of the file, read as a series of lines. A FileInputFormat subclass may override two methods. isSplitable simply returns true if the reader is capable of splitting the file and false otherwise; in this example we return false, indicating that the value will be the entire text of the file.

 

   protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

 

The createRecordReader method simply creates an instance of the appropriate reader and returns it. All of the interesting work is done in the reader class itself.

 

    public RecordReader<Text, Text> createRecordReader(InputSplit split,
                       TaskAttemptContext context) {
        return new MyWholeFileReader();
    }

 

 

The reader is copied from LineRecordReader, the standard line reader for text files, and modified at three points. The method nextKeyValue() reads the entire file and sets it as the current value. It also sets the current key to the file name, which is assumed to be unique.

 

The entire code for my whole file input format is shown below.

/**
 * written by Steve Lewis
 * lordjoe2000@gmail.com
 * See http://lordjoesoftware.blogspot.com/
 */
package org.systemsbiology.hadoop;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.util.*;

import java.io.*;

/**
 * Splitter that reads a whole file as a single record
 * This is useful when you have a large number of files
 * each of which is a complete unit - for example XML Documents
 */
public class WholeFileInputFormat extends FileInputFormat<Text, Text> {

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
                       TaskAttemptContext context) {
        return new MyWholeFileReader();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    /**
     * Custom RecordReader which returns the entire file as a
     * single value with the name as a key
     * Value is the entire file
     * Key is the file name
     */
    public static class MyWholeFileReader extends RecordReader<Text, Text> {

        private CompressionCodecFactory compressionCodecs = null;
        private long start;
        private LineReader in;
        private Text key = null;
        private Text value = null;
        private Text buffer = new Text();

        public void initialize(InputSplit genericSplit,
                               TaskAttemptContext context) throws IOException {
            FileSplit split = (FileSplit) genericSplit;
            Configuration job = context.getConfiguration();
            start = split.getStart();
            final Path file = split.getPath();
            compressionCodecs = new CompressionCodecFactory(job);
            final CompressionCodec codec = compressionCodecs.getCodec(file);

            // open the file and seek to the start of the split
            FileSystem fs = file.getFileSystem(job);
            FSDataInputStream fileIn = fs.open(split.getPath());
            if (codec != null) {
                in = new LineReader(codec.createInputStream(fileIn), job);
            }
            else {
                in = new LineReader(fileIn, job);
            }
            if (key == null) {
                key = new Text();
            }
            key.set(split.getPath().getName());
            if (value == null) {
                value = new Text();
            }
        }

        public boolean nextKeyValue() throws IOException {
            int newSize = 0;
            StringBuilder sb = new StringBuilder();
            newSize = in.readLine(buffer);
            while (newSize > 0) {
                String str = buffer.toString();
                sb.append(str);
                sb.append("\n");
                newSize = in.readLine(buffer);
            }

            String s = sb.toString();
            value.set(s);

            if (sb.length() == 0) {
                key = null;
                value = null;
                return false;
            }
            else {
                return true;
            }
        }

        @Override
        public Text getCurrentKey() {
            return key;
        }

        @Override
        public Text getCurrentValue() {
            return value;
        }

        /**
         * Get the progress within the split
         */
        public float getProgress() {
            return 0.0f;
        }

        public synchronized void close() throws IOException {
            if (in != null) {
                in.close();
            }
        }
    }

}
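To show how these records arrive downstream, here is a sketch of a mapper that consumes WholeFileInputFormat records; it is hypothetical and not part of the class above. The key is the file name and the value is the whole file, which the map method could hand to an XML parser; as a placeholder it simply emits each file's length.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Hypothetical mapper consuming WholeFileInputFormat records -
 * key is the file name, value is the complete file contents
 */
public class WholeFileMapper extends Mapper<Text, Text, Text, IntWritable> {
    private final IntWritable length = new IntWritable();

    @Override
    protected void map(Text fileName, Text fileContents, Context context)
            throws IOException, InterruptedException {
        // the whole file is available here - hand it to an XML parser or
        // other processor; as a placeholder we emit the file's length
        length.set(fileContents.getLength());
        context.write(fileName, length);
    }
}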