Ever tried , Ever failed !
No matter ,Try again ! Fail again , Fail better !

MapReduce详解及WordCount程序演示

重温Hadoop2.X 重要组件

HDFS:分布式文件系统

MapReduce:分布式运算编程框架

YARN: Hadoop 的资源调度系统(Hadoop 2.x出现,且为核心)

Common:以上三大组件底层支撑,提供基础工具包和RPC框架

一、什么是 MapReduce?

1.MapReduce

是一个分布式运算程序的编程框架,是用户开发“基于 Hadoop 的数据分析应用”的核心框架

2.MapReduce 核心功能

是一个分布式运算程序的编程框架,是用户开发“基于 Hadoop 的数据分析应用”的核心框架

3.MapReduce 核心功能

  • 自带默认组件
  • 用户编写的业务逻辑代码
  • 整合成一个完整的分布式运算程序,并发运行在Hadoop集群上

二、为什么需要 MapReduce?

  • 海量数据单机处理,硬件资源受限(硬盘读写速度、CPU处理速度等),所以无法胜任
  • 单机版扩成分布式版,增加程序复杂度和开发难度
  • 为了提高开发效率,将分布式程序中的公共功能封装成框架,让开发人员可以将精力集中于业务逻辑。
  • MapReduce就是这样一个分布式运算框架

三、MapReduce 程序运行演示

1.pi 程序

启动 HDFS 和 YARN 集群

start-dfs.sh
strat-yarn.sh

集群任意节点执行程序

hadoop jar hadoop-mapreduce-examples-2.6.5.jar pi 10 10 

此 jar 包存放在 Hadoop 安装目录下的/share/hadoop/mapreduce/目录里

2.wordcount 程序

启动 HDFS 和 YARN 集群

start-dfs.sh
strat-yarn.sh

源数据上传至HDFS

集群任意节点执行程序

hadoop jar hadoop-mapreduce-examples-2.6.5.jar wordcount 输入路径 输出路径

四、MapReduce 示例编写及编程规范

1.编码规范

(1)Mapper类

  • 继承 Mapper 类
  • 输入数据是<K,V>对形式(KV 的类型可自定义)
  • 输出数据是<K,V>对形式(KV 的类型可自定义)
  • 业务逻辑写在map()方法中
  • MapTask 进程对每一个<K,V>调用一次map()方法

(2)Reducer

  • 继承Reducer类
  • 输入数据类型对应Mapper的输出数据类型,也是<K,V>对形式
  • 输出数据是<K,V>对形式(KV 的类型可自定义)
  • 业务逻辑写在reduce()方法中
  • ReduceTask 进程对每一组相同K的<K,V>组调用一次reduce()方法

(3)Driver(提交运行MR程序的客户端)

配置程序的运行信息,提交 job 对象

2.WordCount 示例程序编写

( 1 )WordCount 的业务逻辑:

MapTask 阶段:

处理每个数据分块的单词统计分析,思路是将每一行文本拆分成一个个的单词,每遇到一个单词则把其转换成一个 key-value 对,比如单词 hello,就转换成<’hello’,1>发送给 ReduceTask 去汇总。

ReduceTask 阶段:

将接收 MapTask 的结果,按照 key 对 value 做汇总计数

流程图:

下面将演示Wordcount程序的代码示例

需要处理的数据:word.txt

shiny angel mark cendy
cendy shiny mark
mark judy
tina shiny
shiny

需要导入的包:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

首先是编写Map端的代码

public class WordCountDemo {
    /**
     * KEYIN:是指框架读取到的数据的key类型,在默认的InputFormat中,读取到的key是一行文本的起始偏移量,所以应该是Long类型(LongWritable)
     * VALUEIN:是指框架读取到的数据的value类型,在默认的InputFormat中,读取到的value是一行文本的内容,所以应该是String类型(Text)
     * KEYOUT:是指用户自定义业务逻辑方法中输出数据的key的类型,由用户自定义业务逻辑决定,在此WordCount代码中,输出的key是每个单词,所以是String类型(Text)
     * VALUEOUT:是指用户自定义业务逻辑方法中输出数据的value的类型,由用户自定义业务逻辑决定,在此WordCount代码中,输出的value是单词出现的次数1,所以是Integer(IntWritable)
     * 
     * Long、Integer、String都是jdk自带的数据类型,在序列化的时候效率比较慢,为了提高序列化的效率,Hadoop自定义了一套独有的序列化框架
     * 所以,在写Hadoop程序的时候,若是数据需要实现序列化(写磁盘、网络传输等),必须实现Hadoop自定义序列化框架中的数据类型
     * Long------>LongWritable
     * Integer---->IntWritable
     * Null------>NullWritable
     * String---->Text
     * Map的个数默认等于切片的个数,默认按照blocksize(128M)进行切片
     */

    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
        /*
         * Map阶段用户自定义业务逻辑代码需要写在map()方法中
         * MapTask对每一行输入的数据调用一次map()方法 
         * Context对象的作用是用来传输数据的
         */
        //将Java的数据类型转换成Hadoop的数据类型
        //第一种
        Text key_word=new Text();
        //第二种(将Java数据类型的整型1转换成Hadoop数据类型的IntWritable)
        IntWritable value_count=new IntWritable(1);
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            //第一种:将Hadoop数据类型转换成Java的数据类型,获取一行文本的内容
            String line=value.toString();
            //将每行文本按照分隔符进行分隔
            String[] words=line.split(" ");
            //对数组进行遍历
            for(String word:words){
                //使用set()方法将String类型的数据转换成Hadoop的Text类型
                key_word.set(word);
                //将单词作为key,将次数1作为value,分发给Reduce端
                context.write(key_word, value_count);
            }
        }
    }

进入Reducer之前在Map端对每个MapTask的数据按照key进行局部排序(字典顺序),有助于相同key的key-value值分发到相同的reduce中
默认的ReduceTask的个数为1,可以使用代码显示的设定ReduceTask的个数

Reduce端的输入 KEYIN, VALUEIN 对应Map端的输出 KEYOUT, VALUEOU

KEYOUT:用户自定义reduce端的处理结果的输出key的值,也就是整个程序最终输出的key的值,即每个单词,也就是String类型

VALUEOUT:用户自定义reduce端的处理结果的输出value的值,也就是整个程序最终输出的value的值,即每个单词的总次数,也就是Integer类型

编写reduce代码:

public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
        /*
         * Reduce端的用户自定义业务逻辑写在reduce()方法中
         * 
         * ReduceTask调用reduce()方法之前:
         * (1)对传入的kv对按照key进行分组排序(全局排序)
         * <hdfs,1><hdfs,1><hdfs,1>
         * <mapreduce,1><mapreduce,1>
         * 
         * (2)将某一组kv中的第一个kv对中的key传给reduce()方法的变量key,将相同key的value放入迭代器中传给reducde()方法的变量values
         * <hdfs,list(1,1,1)>
         * <mapreduce,list(1,1)>
         * ReduceTask对每一组kv对启动一个reduce()方法
         */

        IntWritable value_sum=new IntWritable();
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            //对总次数进行统计
            int sum=0;
            //对values进行遍历
            for(IntWritable num:values){
                //第二种:将Hadoop数据类型转换成Java的数据类型(对于除了String类型的其他类型)
                sum+=num.get();//sum=sum+num.get();
            }
            //将Java数据类型转换成Hadoop数据类型
            value_sum.set(sum);
            //将最终结果输出到hdfs文件系统中
            context.write(key, value_sum);
        }
    }

Main 方法:

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //创建配置信息类
        Configuration conf=new Configuration();
        //新建一个Job任务
        Job job=Job.getInstance(conf);
        //指定Mapper类和Reducer类
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        //指定maptask的key-value对的输出类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //指定reducetask的key-value对的输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //指定mr程序输入输出数据路径
        Path inpath=new Path("D:\\wordcount\\input");
        Path outpath=new Path("D:\\wordcount\\output");
        FileInputFormat.setInputPaths(job, inpath);
        FileOutputFormat.setOutputPath(job, outpath);

        //提交任务job,等待mr程序运行完成打印反馈信息
        boolean waitForCompletion = job.waitForCompletion(true);
        // 退出程序
        System.exit(waitForCompletion?0:1);

    }

以上代码可以在windows本机上运行,如果要在yarn集群上运行,在main()方法中加入以下代码,打包上传到集群上即可

        //yarn集群上运行
        conf.set("fs.defaultFS","hdfs://hadoop01:9000");
        conf.set("mapreduce.framework.name","yarn");
        conf.set("yarn.resourcemanager.hostname","hadoop01");
        System.setProperty("HADOOP_USER_NAME","lemenness");
        //新建一个Job任务
        Job job = Job.getInstance(conf);
        //指定jar包的路径
        job.setJarByClass(wordcountDemo.class);
        //指定combiner
        job.setCombinerClass(WordCountReducer.class);

运行结果:

angel   1
cendy   2
judy    1
mark    3
shiny   4
tina    1

完整代码:


import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class wordcountDemo { public static class WordCountMapper extends Mapper<LongWritable ,Text,Text,IntWritable> { //第一种 Text key_word=new Text(); //第二种(将java数据类型的整型1转换成hadoop数据类型的IntWritable) IntWritable value_count=new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //将hadoop数据类型转换成java的数据类型,获取一行文本的内容 String line=value.toString(); //将每行文本按照分隔符进行分割 String[] words = line.split(" "); //对数组进行遍历 for (String word:words){ //使用set方法将String类型转换成hadoop的Text类型 key_word.set(word); //将单词作为key,将次数1作为value,分发给Reduce context.write(key_word,value_count); } } } public static class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{ IntWritable value_sum=new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //对总次数进行统计 int sum=0; //对values进行遍历 for (IntWritable num:values){ //将hadoop数据类型转换成java类型(对于除了String类型的其他类型) sum+=num.get(); } //将java数据类型转换成Hadoop数据类型 value_sum.set(sum); //将最终结果输出到hdfs系统中 context.write(key,value_sum); } } public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException { //创建配置信息类 Configuration conf =new Configuration(); //yarn集群上运行 conf.set("fs.defaultFS","hdfs://hadoop01:9000"); conf.set("mapreduce.framework.name","yarn"); conf.set("yarn.resourcemanager.hostname","hadoop01"); System.setProperty("HADOOP_USER_NAME","lemenness"); //新建一个Job任务 Job job = Job.getInstance(conf); //指定jar包的路径 job.setJarByClass(wordcountDemo.class); //指定combiner job.setCombinerClass(WordCountReducer.class); //指定Mapper类和Reducer类 job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); //指定MapTask的key-value对的输出类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //指定ReduceTask的key-value对的输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //指定mr程序输入输出数据路径 Path inpath=new Path("/wordcount/input"); Path outpath=new Path("/wordcount/output"); //获取fs对象 FileSystem fs=FileSystem.get(conf); //判断是否存在output if (fs.exists(outpath)){ //删除hdfs路径 fs.delete(outpath,true); } FileInputFormat.setInputPaths(job, inpath); FileOutputFormat.setOutputPath(job, outpath); //提交任务job,等待mr程序运行完成打印反馈信息 boolean waitForCompletion = job.waitForCompletion(true); //退出程序 System.exit(waitForCompletion?0:1); } }

3.MapReduce 程序运行模式

( 1 )本地运行模式

输入输出在windows本地

  • 相关配置参数设置(默认)
conf.set("mapreduce.framework.name", "local");
conf.set("fs.defaultFS", "file:///");
  • 输入输出路径设置
//windows路径
Path inPath=new Path("F:/input");
Path outpath=new Path("F:/output");

输入输出在HDFS

  • 相关配置参数设置
conf.set("fs.defaultFS", "hdfs://master:9000");
conf.set("mapreduce.framework.name", "local");
  • 输入输出路径设置
//HDFS路径
Path inPath=new Path("/wordcount/input");
Path outpath=new Path("/wordcount/output");

( 2 )集群运行模式

打jar包,提交job到集群

  • 相关配置参数设置
//搭建集群时,配置文件中都已设置,故代码中可不书写
conf.set("mapreduce.framework.name", "yarn");
conf.set("yarn.resourcemanager.hostname", "resourcemanager节点");
conf.set("fs.defaultFS", "hdfs://namenode节点:9000");
  • Linux中提交job步骤

    1.打成jar包上传至Linux服务器

    2.使用hadoop jar 命令将代码提交到yarn集群运行

    3.输入输出路径都应位于 HDFS 上

windows中的eclipse可以直接提交job到集群,但需要修改源码,较复杂,不建议使用

赞(1) 打赏
未经允许不得转载:Mr. Almost的个人博客 » MapReduce详解及WordCount程序演示

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

谢谢老板~

支付宝扫一扫打赏

微信扫一扫打赏