MapReduce

Hadoop MapReduce — это фреймворк для создания приложений, обрабатывающих большие объемы данных в парадигме MapReduce. В данном разделе вы узнаете, как запускать MapReduce-задачу на кластере Hadoop. Мы будем работать с приложением WordCount, которое подсчитывает количество вхождений каждого слова в заданный набор строковых данных.

Подключитесь по SSH к головному узлу и сохраните код программы в файл WordCount.java. Для этого выполните в консоли cat WordCount.java, чтобы создать соответствующий файл, и скопируйте в него следующие строчки:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}


Нажмите Ctrl+C для сохранения изменений в файле.

Задайте необходимые переменные среды:

export JAVA_HOME=/usr/lib/jvm/java-openjdk
export PATH=${JAVA_HOME}/bin:${PATH}
export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar

Скомпилируйте программу и создайте Java-архив:

hadoop com.sun.tools.javac.Main WordCount.java
jar cf wc.jar WordCount*.class

Создайте файл с исходными данными:

hdfs dfs -mkdir /user/$USER/wordcount
hdfs dfs -mkdir /user/$USER/wordcount/input
echo -e "Hello\nWorld" | hdfs dfs -put - /user/$USER/wordcount/input/1.txt
echo -e "Hello\nHadoop" | hdfs dfs -put - /user/$USER/wordcount/input/2.txt
echo -e "Hello\nMapReduce" | hdfs dfs -put - /user/$USER/wordcount/input/3.txt

Данный код создает папку wordcount/input и помещает в нее три текстовых файла, содержащих каждый по два слова.

Запустите MapReduce-задачу:

yarn jar wc.jar WordCount /user/$USER/wordcount/input /user/$USER/wordcount/output

Эта команда запускает файл wc.jar, который обрабатывает текстовые файлы, хранящиеся в папке wordcount/input, и сохраняет выходные данные в папку wordcount/output.

В консоль будет выводиться следующая информация о ходе обработки:


18/05/15 06:25:58 INFO client.RMProxy: Connecting to ResourceManager at test-hadoop-head-0.novalocal/10.0.1.25:8050
18/05/15 06:25:58 INFO client.AHSProxy: Connecting to Application History server at test-hadoop-head-0.novalocal/10.0.1.25:10200
18/05/15 06:25:58 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/05/15 06:25:59 INFO input.FileInputFormat: Total input paths to process : 3
18/05/15 06:25:59 INFO mapreduce.JobSubmitter: number of splits:3
18/05/15 06:25:59 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1526294000135_0001
18/05/15 06:26:00 INFO impl.YarnClientImpl: Submitted application application_1526294000135_0001
18/05/15 06:26:00 INFO mapreduce.Job: The url to track the job: http://test-hadoop-head-0.novalocal:8088/proxy/application_1526294000135_0001/
18/05/15 06:26:00 INFO mapreduce.Job: Running job: job_1526294000135_0001
18/05/15 06:26:11 INFO mapreduce.Job: Job job_1526294000135_0001 running in uber mode : false
18/05/15 06:26:11 INFO mapreduce.Job:  map 0% reduce 0%
18/05/15 06:26:22 INFO mapreduce.Job:  map 67% reduce 0%
18/05/15 06:26:24 INFO mapreduce.Job:  map 100% reduce 0%
18/05/15 06:26:28 INFO mapreduce.Job:  map 100% reduce 100%

По завершении задачи возвращается информация, аналогичная приведенной ниже:


18/05/15 06:26:30 INFO mapreduce.Job: Job job_1526294000135_0001 completed successfully
18/05/15 06:26:31 INFO mapreduce.Job: Counters: 50
     File System Counters
             FILE: Number of bytes read=83
             FILE: Number of bytes written=604291
             FILE: Number of read operations=0
             FILE: Number of large read operations=0
             FILE: Number of write operations=0
             HDFS: Number of bytes read=425
             HDFS: Number of bytes written=37
             HDFS: Number of read operations=12
             HDFS: Number of large read operations=0
             HDFS: Number of write operations=2
     Job Counters
             Launched map tasks=3
             Launched reduce tasks=1
             Data-local map tasks=2
             Rack-local map tasks=1
             Total time spent by all maps in occupied slots (ms)=29484
             Total time spent by all reduces in occupied slots (ms)=6880
             Total time spent by all map tasks (ms)=29484
             Total time spent by all reduce tasks (ms)=3440
             Total vcore-milliseconds taken by all map tasks=29484
             Total vcore-milliseconds taken by all reduce tasks=3440
             Total megabyte-milliseconds taken by all map tasks=15095808
             Total megabyte-milliseconds taken by all reduce tasks=3522560
     Map-Reduce Framework
             Map input records=6
             Map output records=6
             Map output bytes=65
             Map output materialized bytes=95
             Input split bytes=384
             Combine input records=6
             Combine output records=6
             Reduce input groups=4
             Reduce shuffle bytes=95
             Reduce input records=6
             Reduce output records=4
             Spilled Records=12
             Shuffled Maps =3
             Failed Shuffles=0
             Merged Map outputs=3
             GC time elapsed (ms)=619
             CPU time spent (ms)=3810
             Physical memory (bytes) snapshot=1784860672
             Virtual memory (bytes) snapshot=9878134784
             Total committed heap usage (bytes)=1381498880
     Shuffle Errors
             BAD_ID=0
             CONNECTION=0
             IO_ERROR=0
             WRONG_LENGTH=0
             WRONG_MAP=0
             WRONG_REDUCE=0
     File Input Format Counters
             Bytes Read=41
     File Output Format Counters
             Bytes Written=37

Теперь в папке wordcount/output находятся два файла, _SUCCESS и part-r-00000. Чтобы посмотреть результат работы программы WordCount, выполните следующую команду, выводящую в консоль содержимое файла part-r-00000:

hdfs dfs -cat /user/$USER/wordcount/output/part-r-00000

В консоли появится отсортированный в алфавитном порядке список слов и число вхождений каждого из них:

Hadoop  1
Hello   3
MapReduce    1
World   1