Gabi und Sascha
Tags - Kategorien : Alle | Berlin | Bücher | Fotografie | Java | Linkhalde | Weichware | Verfassung

Ich habe heute in eine Hadoop MapReduce-Applikation mit einem DistributedCache geschrieben. Grund: ich wollte eine 100 MB grosse Datei nicht in die uberjar einbauen. Hadoops DistributedCache ist für solche nur-lese Dateien bestens geeignet. Für die Benutzung ist auf der Hadoop Seite im WordCount v2.0 Beispiel auch eine ausreichende Anleitung. Nach ein paar anfänglichen Schwierigkeiten (wer lesen kann ist klar im Vorteil, siehe vorhergehenden Satz) klappte es auch ganz ordentlich. Leider nicht mit dem HadoopTestCase.

Ich habe den Pfade zur Datei im MapReduce-Driver der Konfiguration übergeben:


    static final String TRIEX_DAT = "simple.triex";
    public static final String LOCAL_PATH;

    static {
        final Thread currentThread = Thread.currentThread();
        final ClassLoader loader = currentThread.getContextClassLoader();
        final URL resource = loader.getResource(PARAGRAPHS_DAT);

        final String path = resource.getPath();
        final int lastIndexOf = path.lastIndexOf("/");
        LOCAL_PATH = path.substring(0, lastIndexOf);
    }

    DistributedCache.addCacheFile(new Path(LOCAL_PATH, TRIEX_DAT).toUri(),
                                  jobConf);

Im Mapper wurde der Pfad dann wie geholt und weiter verarbeitet:


    final Path[] triexFile = DistributedCache.getLocalCacheFiles(this.jobConf);
    final InputStream in = new FileInputStream(triexFile[0].toString());

Ergebnis:


Caused by: java.lang.NullPointerException
	at org.apache.hadoop.util.StringUtils.stringToPath(StringUtils.java:197)
	at org.apache.hadoop.filecache.DistributedCache.getLocalCacheFiles(DistributedCache.java:515)
	at example.DocumentProducerMapper.initializeConceptMapper(DocumentProducerMapper.java:365)

Scheinbar wird der Pfad nicht richtig übergeben, weil es sich nicht um ein fully-distributed System handelt. Die Lösung sieht wie folgt aus:


    final URI[] triexFile =
        DistributedCache.getCacheFiles(this.jobConf);
    final Path p = new Path(triexFile[0].toASCIIString());
    final FileSystem fs = p.getFileSystem(this.jobConf);
    final Path qualified = p.makeQualified(fs);
    final InputStream in = qualified.toUri().toURL().openStream();

Damit die Tests durchlaufen habe ich das Strategie-Pattern angewendet. Es gibt eine default-Strategie für den Cluster-Betrieb. Diese wird verwendet, wenn keine andere Strategie verwendet wird, um die DistributedCache Pfade aufzulösen. Die Strategie-Klasse wird in der Job-Konfiguration hinterlegt. Ist der Inhalt nicht null, wird daraus ein neues Exemplar gebaut und verwendet. Nicht ganz elegant - für die Experimente allerdings ausreichend.


Strategie-Interface und -Klasse:


    package example;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    import java.io.IOException;

    public interface DistributedCacheStrategy {
        String KEY = "example.DistributedCacheStrategy.classname";
        Path[] distributedCachePathes(final JobConf conf) throws IOException;
    }


    package example;

    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    import java.io.IOException;

    public class FullyDistributedCacheStrategy implements DistributedCacheStrategy {
        public Path[] distributedCachePathes(final JobConf conf) throws IOException
        {
            final List pathes = new ArrayList();
            final Path[] pathList = DistributedCache.getLocalCacheFiles(conf);

            for (final Path path : pathList) {
                final Path qualified = new Path("file", "", path.toString());
                pathes.add(qualified);
            }

            return pathes.toArray(new Path[pathList.length]);
        }
    }


    package example;

    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    import java.io.IOException;
    import java.net.URI;
    import java.util.ArrayList;
    import java.util.List;

    public class TestDistributedCacheStrategy implements DistributedCacheStrategy {

        public Path[] distributedCachePathes(final JobConf conf) throws IOException
        {

          final URI[] uris = DistributedCache.getCacheFiles(conf);
          final List pathes = new ArrayList();

          for (final URI uri : uris) {
              final Path p = new Path(uri.toASCIIString());
              final FileSystem fs = p.getFileSystem(conf);
              final Path qualified = p.makeQualified(fs);
              pathes.add(qualified);
          }

          return pathes.toArray(new Path[uris.length]);
        }
    }