
Multi-file output with Hadoop's MultipleOutputs

Tech library: tec.5lulu.com

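Sometimes, when processing data with Hadoop, we want the Reduce phase to write each output key to its own directory or file. This makes later analysis easier, for example when sorting log records into files by time period. The MultipleOutputs class handles exactly this.
First, here is my test data, one key;value record per line: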
  中国;我们
  美国;他们
  中国;123
  中国人;善良
  美国;USA
  美国;在北美洲

Expected output: three groups, one for 中国 (China), one for 美国 (USA) and one for 中国人 (Chinese people).
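To make the expected grouping concrete, here is a small plain-Java sketch (no Hadoop needed) that does in memory roughly what the map and shuffle phases do for this data: split each line on ';' and collect the values per key. ExpectedGroups is a hypothetical name. One caveat: real Hadoop sorts reducer keys by their bytes and does not guarantee the order of values within a key, whereas this sketch simply keeps insertion order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: in-memory stand-in for map + shuffle grouping.
final class ExpectedGroups {

    private ExpectedGroups() {
    }

    /** Groups "key;value" lines by key, keeping first-seen key order and value order. */
    static Map<String, List<String>> group(List<String> lines) {
        Map<String, List<String>> groups = new LinkedHashMap<String, List<String>>();
        for (String line : lines) {
            String[] parts = line.split(";");
            if (parts.length < 2) {
                continue; // skip lines without a ';'
            }
            List<String> values = groups.get(parts[0]);
            if (values == null) {
                values = new ArrayList<String>();
                groups.put(parts[0], values);
            }
            values.add(parts[1]);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "中国;我们", "美国;他们", "中国;123", "中国人;善良", "美国;USA", "美国;在北美洲");
        // prints {中国=[我们, 123], 美国=[他们, USA, 在北美洲], 中国人=[善良]}
        System.out.println(group(lines));
    }
}
```

Each of the three groups is what one named output file should end up containing.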
package com.partition.test;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * @author qindongliang
 *
 * Big data discussion group (QQ): 324714439
 */
public class TestMultiOutput {

    /**
     * Map task: splits each "key;value" line and emits (key, value).
     */
    public static class PMapper extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] ss = value.toString().split(";");
            if (ss.length < 2) {
                return; // skip blank or malformed lines that have no ';'
            }
            context.write(new Text(ss[0]), new Text(ss[1]));
        }
    }

    /**
     * Reduce task: sends each key's values to its own named output.
     */
    public static class PReduce extends Reducer<Text, Text, Text, Text> {

        /** Writer for the named outputs registered in main(). */
        private MultipleOutputs<Text, Text> mos;

        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            mos = new MultipleOutputs<Text, Text>(context); // initialize mos
        }

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String k = key.toString();
            for (Text t : values) {
                // write(namedOutput, key, value): the first argument must be a
                // name registered with MultipleOutputs.addNamedOutput() in main()
                if (k.equals("中国")) {
                    mos.write("china", key, t);
                } else if (k.equals("美国")) {
                    mos.write("USA", key, t);
                } else if (k.equals("中国人")) {
                    mos.write("cperson", key, t);
                }
            }
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            mos.close(); // flush and close all named-output writers
        }
    }

    public static void main(String[] args) throws Exception {
        // JobConf(Class) also locates the jar that contains this class
        JobConf conf = new JobConf(TestMultiOutput.class);
        // To submit to a remote cluster, set these on conf before the Job is
        // created below, because the Job works on its own copy of conf:
        // conf.set("mapred.job.tracker", "192.168.75.130:9001");
        // conf.setJar("tt.jar");

        /** Job **/
        Job job = new Job(conf, "testpartion");
        job.setJarByClass(TestMultiOutput.class);
        // "模式" = "mode": prints the JobTracker address, or "local" for the local runner
        System.out.println("模式:  " + conf.get("mapred.job.tracker"));
        job.setMapperClass(PMapper.class);

        /**
         * Register every named output while configuring the job.
         * Names may contain only ASCII letters and digits (A-Z, a-z, 0-9);
         * Chinese names are rejected.
         */
        MultipleOutputs.addNamedOutput(job, "china", TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "USA", TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "cperson", TextOutputFormat.class, Text.class, Text.class);
        job.setReducerClass(PReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        String path = "hdfs://192.168.75.130:9000/root/outputdb";
        Path p = new Path(path);
        // Resolve the file system from the path itself so the hdfs:// URI
        // works even when the default file system is local
        FileSystem fs = p.getFileSystem(conf);
        if (fs.exists(p)) {
            fs.delete(p, true);
            System.out.println("输出路径存在,已删除!"); // "output path existed and was deleted"
        }
        FileInputFormat.setInputPaths(job, "hdfs://192.168.75.130:9000/root/input");
        FileOutputFormat.setOutputPath(job, p);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
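The if/else chain in PReduce has to be kept in sync by hand with the addNamedOutput() calls in main(). One alternative, sketched here in plain Java with a hypothetical NamedOutputRouter class (not part of Hadoop), is a single key-to-name table that both places read:

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical routing table: reducer key -> named output name (ASCII only).
final class NamedOutputRouter {

    private static final Map<String, String> ROUTES = new LinkedHashMap<String, String>();

    static {
        ROUTES.put("中国", "china");
        ROUTES.put("美国", "USA");
        ROUTES.put("中国人", "cperson");
    }

    private NamedOutputRouter() {
    }

    /** Named output for a reducer key, or null when the key has no route. */
    static String namedOutputFor(String key) {
        return ROUTES.get(key);
    }

    /** Distinct named outputs, in table order, for registering with addNamedOutput(). */
    static Set<String> namedOutputs() {
        return Collections.unmodifiableSet(new LinkedHashSet<String>(ROUTES.values()));
    }

    public static void main(String[] args) {
        for (String key : new String[] {"中国", "美国", "中国人", "日本"}) {
            // 日本 has no route, so namedOutputFor returns null for it
            System.out.println(key + " -> " + namedOutputFor(key));
        }
        System.out.println(namedOutputs()); // [china, USA, cperson]
    }
}
```

In reduce(), the chain would then become one lookup per key, calling mos.write(name, key, t) only when the name is non-null; in main(), looping over namedOutputs() and calling MultipleOutputs.addNamedOutput(job, name, TextOutputFormat.class, Text.class, Text.class) registers exactly the names the reducer can produce. Returning a set also avoids registering the same name twice if two keys share an output.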
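If a named output (the output path name) contains Chinese characters, the job fails with an exception like the following (local run):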
模式:  local  
输出路径存在,已删除!  
WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable  
WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.  
WARN - JobClient.copyAndConfigureFiles(870) | No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).  
INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1  
WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded  
INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_local1533332464_0001  
INFO - LocalJobRunner$Job.run(340) | Waiting for map tasks  
INFO - LocalJobRunner$Job$MapTaskRunnable.run(204) | Starting task: attempt_local1533332464_0001_m_000000_0  
INFO - Task.initialize(534) |  Using ResourceCalculatorPlugin : null  
INFO - MapTask.runNewMapper(729) | Processing split: hdfs://192.168.75.130:9000/root/input/group.txt:0+91  
INFO - MapTask$MapOutputBuffer.<init>(949) | io.sort.mb = 100  
INFO - MapTask$MapOutputBuffer.<init>(961) | data buffer = 79691776/99614720  
INFO - MapTask$MapOutputBuffer.<init>(962) | record buffer = 262144/327680  
INFO - MapTask$MapOutputBuffer.flush(1289) | Starting flush of map output  
INFO - MapTask$MapOutputBuffer.sortAndSpill(1471) | Finished spill 0  
INFO - Task.done(858) | Task:attempt_local1533332464_0001_m_000000_0 is done. And is in the process of commiting  
INFO - LocalJobRunner$Job.statusUpdate(466) |   
INFO - Task.sendDone(970) | Task 'attempt_local1533332464_0001_m_000000_0' done.  
INFO - LocalJobRunner$Job$MapTaskRunnable.run(229) | Finishing task: attempt_local1533332464_0001_m_000000_0  
INFO - LocalJobRunner$Job.run(348) | Map task executor complete.  
INFO - Task.initialize(534) |  Using ResourceCalculatorPlugin : null  
INFO - LocalJobRunner$Job.statusUpdate(466) |   
INFO - Merger$MergeQueue.merge(408) | Merging 1 sorted segments  
INFO - Merger$MergeQueue.merge(491) | Down to the last merge-pass, with 1 segments left of total size: 101 bytes  
INFO - LocalJobRunner$Job.statusUpdate(466) |   
WARN - LocalJobRunner$Job.run(435) | job_local1533332464_0001  
java.lang.IllegalArgumentException: Name cannot be have a '一' char  
    at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.checkTokenName(MultipleOutputs.java:160)  
    at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.checkNamedOutputName(MultipleOutputs.java:186)  
    at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.write(MultipleOutputs.java:363)  
    at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.write(MultipleOutputs.java:348)  
    at com.partition.test.TestMultiOutput$PReduce.reduce(TestMultiOutput.java:74)  
    at com.partition.test.TestMultiOutput$PReduce.reduce(TestMultiOutput.java:1)  
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177)  
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)  
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)  
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:398)  
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 0%  
INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_local1533332464_0001  
INFO - Counters.log(585) | Counters: 17  
INFO - Counters.log(587) |   File Input Format Counters   
INFO - Counters.log(589) |     Bytes Read=91  
INFO - Counters.log(587) |   FileSystemCounters  
INFO - Counters.log(589) |     FILE_BYTES_READ=177  
INFO - Counters.log(589) |     HDFS_BYTES_READ=91  
INFO - Counters.log(589) |     FILE_BYTES_WRITTEN=71111  
INFO - Counters.log(587) |   Map-Reduce Framework  
INFO - Counters.log(589) |     Map output materialized bytes=105  
INFO - Counters.log(589) |     Map input records=6  
INFO - Counters.log(589) |     Reduce shuffle bytes=0  
INFO - Counters.log(589) |     Spilled Records=6  
INFO - Counters.log(589) |     Map output bytes=87  
INFO - Counters.log(589) |     Total committed heap usage (bytes)=227737600  
INFO - Counters.log(589) |     Combine input records=0  
INFO - Counters.log(589) |     SPLIT_RAW_BYTES=112  
INFO - Counters.log(589) |     Reduce input records=0  
INFO - Counters.log(589) |     Reduce input groups=0  
INFO - Counters.log(589) |     Combine output records=0  
INFO - Counters.log(589) |     Reduce output records=0  
INFO - Counters.log(589) |     Map output records=6  
The name check in the Hadoop source (MultipleOutputs.checkTokenName) is as follows:
/** 
  * Checks if a named output name is valid token. 
  * 
  * @param namedOutput named output Name 
  * @throws IllegalArgumentException if the output name is not valid. 
  */  
 private static void checkTokenName(String namedOutput) {  
   if (namedOutput == null || namedOutput.length() == 0) {  
     throw new IllegalArgumentException(  
       "Name cannot be NULL or emtpy");  
   }  
   for (char ch : namedOutput.toCharArray()) {  
     if ((ch >= 'A') && (ch <= 'Z')) {  
       continue;  
     }  
     if ((ch >= 'a') && (ch <= 'z')) {  
       continue;  
     }  
     if ((ch >= '0') && (ch <= '9')) {  
       continue;  
     }  
     throw new IllegalArgumentException(  
       "Name cannot be have a '" + ch + "' char");  
   }  
 } 
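checkTokenName accepts only A-Z, a-z and 0-9, so even underscores and hyphens are rejected. The sketch below mirrors that rule in plain Java so names can be checked before the job is submitted, and adds a hypothetical toSafeName() that derives a valid name from any key by hex-encoding its UTF-8 bytes. This workaround is an assumption, not part of Hadoop: named outputs still have to be registered with addNamedOutput() before the job runs, so it only helps when the set of keys is known in advance.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helpers; the character rule is copied from checkTokenName above.
final class OutputNames {

    private OutputNames() {
    }

    /** True when the name is non-empty and contains only [A-Za-z0-9]. */
    static boolean isValidName(String name) {
        if (name == null || name.isEmpty()) {
            return false;
        }
        for (char ch : name.toCharArray()) {
            boolean ok = (ch >= 'A' && ch <= 'Z')
                    || (ch >= 'a' && ch <= 'z')
                    || (ch >= '0' && ch <= '9');
            if (!ok) {
                return false;
            }
        }
        return true;
    }

    /**
     * Hex-encodes the key's UTF-8 bytes, giving a name made only of [0-9a-f].
     * Distinct ordinary strings get distinct names, but the result is not
     * human-readable, and an empty key still yields an invalid (empty) name.
     */
    static String toSafeName(String key) {
        StringBuilder sb = new StringBuilder();
        for (byte b : key.getBytes(StandardCharsets.UTF_8)) {
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(isValidName("china")); // true
        System.out.println(isValidName("中国"));   // false
        System.out.println(toSafeName("中国"));    // e4b8ade59bbd
    }
}
```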
Output of the successful run (submitted to the JobTracker at 192.168.75.130:9001):
模式:  192.168.75.130:9001
输出路径存在,已删除!
WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1
WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded
INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404101853_0006
INFO - JobClient.monitorAndPrintJob(1393) |  map 0% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 33%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 100%
INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404101853_0006
INFO - Counters.log(585) | Counters: 29
INFO - Counters.log(587) |   Job Counters
INFO - Counters.log(589) |     Launched reduce tasks=1
INFO - Counters.log(589) |     SLOTS_MILLIS_MAPS=9289
INFO - Counters.log(589) |     Total time spent by all reduces waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Total time spent by all maps waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Launched map tasks=1
INFO - Counters.log(589) |     Data-local map tasks=1
INFO - Counters.log(589) |     SLOTS_MILLIS_REDUCES=13645
INFO - Counters.log(587) |   File Output Format Counters
INFO - Counters.log(589) |     Bytes Written=0
INFO - Counters.log(587) |   FileSystemCounters
INFO - Counters.log(589) |     FILE_BYTES_READ=105
INFO - Counters.log(589) |     HDFS_BYTES_READ=203
INFO - Counters.log(589) |     FILE_BYTES_WRITTEN=113616
INFO - Counters.log(589) |     HDFS_BYTES_WRITTEN=87
INFO - Counters.log(587) |   File Input Format Counters
INFO - Counters.log(589) |     Bytes Read=91
INFO - Counters.log(587) |   Map-Reduce Framework
INFO - Counters.log(589) |     Map output materialized bytes=105
INFO - Counters.log(589) |     Map input records=6
INFO - Counters.log(589) |     Reduce shuffle bytes=105
INFO - Counters.log(589) |     Spilled Records=12
INFO - Counters.log(589) |     Map output bytes=87
INFO - Counters.log(589) |     Total committed heap usage (bytes)=176033792
INFO - Counters.log(589) |     CPU time spent (ms)=1880
INFO - Counters.log(589) |     Combine input records=0
INFO - Counters.log(589) |     SPLIT_RAW_BYTES=112
INFO - Counters.log(589) |     Reduce input records=6
INFO - Counters.log(589) |     Reduce input groups=3
INFO - Counters.log(589) |     Combine output records=0
INFO - Counters.log(589) |     Physical memory (bytes) snapshot=278876160
INFO - Counters.log(589) |     Reduce output records=0
INFO - Counters.log(589) |     Virtual memory (bytes) snapshot=1460908032
INFO - Counters.log(589) |     Map output records=6
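In these counters, Reduce output records=0 and Bytes Written=0 even though data was written. Those two presumably count only the reducer's default output, since MultipleOutputs writes through its own record writers, while HDFS_BYTES_WRITTEN=87 covers the named-output files. As a cross-check, assuming UTF-8 text and TextOutputFormat's default tab separator, the six records add up to exactly 87 bytes. OutputBytes is a hypothetical name:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: expected size of TextOutputFormat lines, assuming UTF-8
// and the default tab separator between key and value.
final class OutputBytes {

    private OutputBytes() {
    }

    /** UTF-8 size of one output line: key, tab, value, newline. */
    static int recordBytes(String key, String value) {
        return (key + "\t" + value + "\n").getBytes(StandardCharsets.UTF_8).length;
    }

    /** Sum of recordBytes over {key, value} pairs. */
    static int totalBytes(String[][] records) {
        int total = 0;
        for (String[] r : records) {
            total += recordBytes(r[0], r[1]);
        }
        return total;
    }

    public static void main(String[] args) {
        String[][] records = {
            {"中国", "我们"}, {"中国", "123"},                      // china-r-00000
            {"美国", "他们"}, {"美国", "USA"}, {"美国", "在北美洲"}, // USA-r-00000
            {"中国人", "善良"}                                      // cperson-r-00000
        };
        System.out.println(totalBytes(records)); // 87
    }
}
```

Each Chinese character takes 3 bytes in UTF-8, so for example 中国, tab, 我们, newline is 6 + 1 + 6 + 1 = 14 bytes.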

After a successful run, the following files are generated:

china-r-00000 contains:
  中国  我们
  中国  123
USA-r-00000 contains:
  美国  他们
  美国  USA
  美国  在北美洲
cperson-r-00000 contains:
  中国人  善良
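The reducer's own default output file (part-r-00000) is still created, but it contains no data. With that, multi-file output using the new API has been tested successfully on Hadoop 1.2.0.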
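The file names above follow Hadoop's part-file pattern: the named output (or "part" for the reducer's default output), then "-r-" for a reduce task ("-m-" when written from a map task), then the task's partition number padded to at least five digits. The hypothetical helper below reproduces that pattern, which can help predict the file names when a job runs more than one reducer:

```java
import java.util.Locale;

// Hypothetical helper reproducing the "<name>-r-<5-digit partition>" naming pattern.
final class PartFileNames {

    private PartFileNames() {
    }

    /** Base name, "-m-" or "-r-", then the partition padded to at least five digits. */
    static String uniqueFile(String baseName, boolean isMapTask, int partition) {
        // Locale.ROOT keeps the digits ASCII regardless of the JVM's default locale
        return baseName + (isMapTask ? "-m-" : "-r-") + String.format(Locale.ROOT, "%05d", partition);
    }

    public static void main(String[] args) {
        int reducers = 2; // hypothetical: job.setNumReduceTasks(2)
        for (String name : new String[] {"part", "china", "USA", "cperson"}) {
            for (int i = 0; i < reducers; i++) {
                System.out.println(uniqueFile(name, false, i));
            }
        }
    }
}
```

Since MultipleOutputs opens a writer only when a name is first written to, a reducer that never receives the 中国 key should leave no china-r-NNNNN file, while every reducer still creates its (possibly empty) part-r-NNNNN file. In Hadoop 2.x, LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class) is the usual way to suppress those empty part files; whether the 1.2.0 new-API package offers it should be checked before relying on it.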



Article link: http://tec.5lulu.com/detail/105dsn2i5p6w68s69.html


When reposting, please credit the 5lulu tech library (5lulu技术库).

This site is licensed under Creative Commons Attribution-NonCommercial-NoDerivs 3.0.
