如何设置增大hadoop里hadoop和mapreducee任务的并发数

大数据之Hadoop(MapReduce)
一 MapReduce入门1.1 MapReduce定义Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架.Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上。1.2 MapReduce优缺点1.2.1 优点1)MapReduce 易于编程。它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得MapReduce编程变得非常流行。2)良好的扩展性。当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。3)高容错性。MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由 Hadoop内部完成的。4)适合PB级以上海量数据的离线处理。这里加红字体离线处理,说明它适合离线处理而不适合在线处理。比如像毫秒级别的返回一个结果,MapReduce很难做到。1.2.2 缺点MapReduce不擅长做实时计算、流式计算、DAG(有向图)计算。1)实时计算。MapReduce无法像Mysql一样,在毫秒或者秒级内返回结果。2)流式计算。流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的。3)DAG(有向图)计算。多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。1)分布式的运算程序往往需要分成至少2个阶段。2)第一个阶段的maptask并发实例,完全并行运行,互不相干。3)第二个阶段的reduce task并发实例互不相干,但是他们的数据依赖于上一个阶段的所有maptask并发实例的输出。4)MapReduce编程模型只能包含一个map阶段和一个reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个mapreduce程序,串行运行。1.4 MapReduce进程一个完整的mapreduce程序在分布式运行时有三类实例进程:1)MrAppMaster:负责整个程序的过程调度及状态协调。2)MapTask:负责map阶段的整个数据处理流程。3)ReduceTask:负责reduce阶段的整个数据处理流程。1.5 MapReduce编程规范(八股文)用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行mr程序的客户端)1)Mapper阶段
(1)用户自定义的Mapper要继承自己的父类
(2)Mapper的输入数据是KV对的形式(KV的类型可自定义)
(3)Mapper中的业务逻辑写在map()方法中
(4)Mapper的输出数据是KV对的形式(KV的类型可自定义)
(5)map()方法(maptask进程)对每一个&K,V&调用一次2)Reducer阶段
(1)用户自定义的Reducer要继承自己的父类
(2)Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
(3)Reducer的业务逻辑写在reduce()方法中
(4)Reducetask进程对每一组相同k的&k,v&组调用一次reduce()方法3)Driver阶段整个程序需要一个Drvier来进行提交,提交的是一个描述了各种必要信息的job对象4)案例实操
详见7.1.1统计一堆文件中单词出现的个数(WordCount案例)。1.6 MapReduce程序运行流程分析1)在MapReduce程序读取文件的输入目录上存放相应的文件。2)客户端程序在submit()方法执行前,获取待处理的数据信息,然后根据集群中参数的配置形成一个任务分配规划。3)客户端提交job.split、jar包、job.xml等文件给yarn,yarn中的resourcemanager启动MRAppMaster。4)MRAppMaster启动后根据本次job的描述信息,计算出需要的maptask实例数量,然后向集群申请机器启动相应数量的maptask进程。5)maptask利用客户指定的inputformat来读取数据,形成输入KV对。6)maptask将输入KV对传递给客户定义的map()方法,做逻辑运算7)map()运算完毕后将KV对收集到maptask缓存。8)maptask缓存中的KV对按照K分区排序后不断写到磁盘文件9)MRAppMaster监控到所有maptask进程任务完成之后,会根据客户指定的参数启动相应数量的reducetask进程,并告知reducetask进程要处理的数据分区。10)Reducetask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台maptask运行所在机器上获取到若干个maptask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算。11)Reducetask运算完毕后,调用客户指定的outputformat将结果数据输出到外部存储。二 Hadoop序列化2.1 为什么要序列化?
一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。2.2 什么是序列化?序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储(持久化)和网络传输。 反序列化就是将收到字节序列(或其他数据传输协议)或者是硬盘的持久化数据,转换成内存中的对象。2.3 为什么不用Java的序列化?
Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系等),不便于在网络中高效传输。所以,hadoop自己开发了一套序列化机制(Writable),精简、高效。2.4 为什么序列化对Hadoop很重要?
因为Hadoop在集群之间进行通讯或者RPC调用的时候,需要序列化,而且要求序列化要快,且体积要小,占用带宽要小。所以必须理解Hadoop的序列化机制。
序列化和反序列化在分布式数据处理领域经常出现:进程通信和永久存储。然而Hadoop中各个节点的通信是通过远程调用(RPC)实现的,那么RPC序列化要求具有以下特点:1)紧凑:紧凑的格式能让我们充分利用网络带宽,而带宽是数据中心最稀缺的资2)快速:进程通信形成了分布式系统的骨架,所以需要尽量减少序列化和反序列化的性能开销,这是基本的;3)可扩展:协议为了满足新的需求变化,所以控制客户端和服务器过程中,需要直接引进相应的协议,这些是新协议,原序列化方式能支持新的协议报文;4)互操作:能支持不同语言写的客户端和服务端进行交互; 2.5 常用数据序列化类型常用的数据类型对应的hadoop数据序列化类型Java类型
Writable类型 boolean
BooleanWritable byte
ByteWritable int
IntWritable float
FloatWritable long
LongWritable double
DoubleWritable string
MapWritable array
ArrayWritable 自定义bean对象实现序列化接口(Writable)1)自定义bean对象要想序列化传输,必须实现序列化接口,需要注意以下7项。(1)必须实现Writable接口(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造(3)重写序列化方法(4)重写反序列化方法(5)注意反序列化的顺序和序列化的顺序完全一致(6)要想把结果显示在文件中,需要重写toString(),且用”\t”分开,方便后续用(7)如果需要将自定义的bean放在key中传输,则还需要实现comparable接口,因为mapreduce框中的shuffle过程一定会对key进行排序// 1 必须实现Writable接口
public class FlowBean implements
Writable {
private long upF
private long downF
private long sumF
//2 反序列化时,需要反射调用空参构造函数,所以必须有
public FlowBean() {
* 3重写序列化方法
* @param out
* @throws IOException
public void
write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
重写反序列化方法
5 注意反序列化的顺序和序列化的顺序完全一致
* @param in
* @throws IOException
public void
readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
// 6要想把结果显示在文件中,需要重写toString(),且用”\t”分开,方便后续用
public String toString() {
return upFlow +
"\t" + downFlow + "\t" + sumF
//7 如果需要将自定义的bean放在key中传输,则还需要实现comparable接口,因为mapreduce框中的shuffle过程一定会对key进行排序
public int
compareTo(FlowBean o) {
// 倒序排列,从大到小
return this.sumFlow
& o.getSumFlow() ? -1 : 1;
} 2)案例实操
详见7.2.1统计每一个手机号耗费的总上行流量、下行流量、总流量(序列化)。三 MapReduce框架原理MapReduce工作流程1)流程示意图2)流程详解上面的流程是整个mapreduce最全工作流程,但是shuffle过程只是从第7步开始到第16步结束,具体shuffle过程详解,如下:1)maptask收集我们的map()方法输出的kv对,放到内存缓冲区中2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件3)多个溢出文件会被合并成大的溢出文件4)在溢出过程中,及合并的过程中,都要调用partitioner进行分区和针对key进行排序5)reducetask根据自己的分区号,去各个maptask机器上取相应的结果分区数据6)reducetask会取到同一个分区的来自不同maptask的结果文件,reducetask会将这些文件再进行合并(归并排序)7)合并成大文件后,shuffle的过程也就结束了,后面进入reducetask的逻辑运算过程(从文件中取出一个一个的键值对group,调用用户自定义的reduce()方法)3)注意Shuffle中的缓冲区大小会影响到mapreduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。缓冲区的大小可以通过参数调整,参数:io.sort.mb
默认100M3.2 InputFormat数据输入MapReduce任务的输入文件一般是存储在HDFS里面。输入的文件格式包括:基于行的日志文件、二进制格式文件等。这些文件一般会很大,达到数十GB,甚至更大。那么MapReduce是如何读取这些数据的呢?下面我们首先学习InputFormat接口。3.2.1 InputFormat接口实现类InputFormat常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定义InputFormat等。1)TextInputFormatTextInputFormat是默认的InputFormat。每条记录是一行输入。键是LongWritable类型,存储该行在整个文件中的字节偏移量。值是这行的内容,不包括任何行终止符(换行符和回车符)。以下是一个示例,比如,一个分片包含了如下4条文本记录。Rich learning form
Intelligent learning engine
Learning more convenient
From the real demand for more close to the
enterprise 每条记录表示为以下键/值对:(0,Rich learning form)
(19,Intelligent learning engine)
(47,Learning more convenient)
(72,From the real demand for more close to the
enterprise) 很明显,键并不是行号。一般情况下,很难取得行号,因为文件按字节而不是按行切分为分片。2)KeyValueTextInputFormat每一行均为一条记录,被分隔符(缺省是tab(\t))分割为key(Text),value(Text)。可以通过mapreduce.input.keyvaluelinerecordreader.key.value,separator属性来设定分隔符。它的默认值是一个制表符。以下是一个示例,输入是一个包含4条记录的分片。其中——&表示一个(水平方向的)制表符。line1 ——&Rich learning form
line2 ——&Intelligent learning engine
line3 ——&Learning more convenient
line4 ——&From the real demand for more
close to the enterprise 每条记录表示为以下键/值对:(line1,Rich learning form)
(line2,Intelligent learning engine)
(line3,Learning more convenient)
(line4,From the real demand for more close to
the enterprise)
此时的键是每行排在制表符之前的Text序列。 3)NLineInputFormat默认情况下在对输入文件进行拆分时,会按block块的大小分成多个InputSplit,InputSplit的数量取决于block的大小。每个map进程处理一个InputSplit,InputSplit中有多少行记录就会调用多少次map函数。如果使用NlineInputFormat,代表每个map进程处理的InputSplit不再按block块去划分,而是按NlineInputFormat指定的行数N来划分。即,每个InputSplit中只有N行记录数。同样InputSplit中有多少行记录就会调用多少次map函数。以下是一个示例,仍然以上面的4行输入为例。Rich learning form
Intelligent learning engine
Learning more convenient
From the real demand for more close to the
enterprise
例如,如果N是2,则每个输入分片包含两行。一个mapper收到前两行键值对:(0,Rich learning form)
(19,Intelligent learning engine) 另一个mapper 则收到后两行:(47,Learning more convenient)
(72,From the real demand for more close to the
enterprise)
这里的键和值与TextInputFormat生成的一样。3.2.2 自定义InputFormat1)概述(1)自定义一个类继承FileInputFormat(2)改写RecordReader,实现一次读取一个完整文件封装为KV(3)在输出时使用SequenceFileOutPutFormat输出合并文件2)案例实操
详见7.5小文件处理(自定义InputFormat)。切片机制1)job提交流程源码详解waitForCompletion()submit();// 1建立连接
connect();
// 1)创建提交job的代理
new Cluster(getConfiguration());
// (1)判断是本地yarn还是远程
initialize(jobTrackAddr,conf);
// 2 提交jobsubmitter.submitJobInternal(Job.this, cluster)
// 1)创建给集群提交数据的Stag路径
Path jobStagingArea =JobSubmissionFiles.getStagingDir(cluster, conf);
// 2)获取jobid ,并创建job路径
JobID jobId =submitClient.getNewJobID();
// 3)拷贝jar包到集群copyAndConfigureFiles(job, submitJobDir);
rUploader.uploadFiles(job,jobSubmitDir);// 4)计算切片,生成切片规划文件writeSplits(job, submitJobDir);
maps = writeNewSplits(job, jobSubmitDir);
input.getSplits(job);// 5)向Stag路径写xml配置文件writeConf(conf, submitJobFile);
conf.writeXml(out);// 6)提交job,返回提交状态status = submitClient.submitJob(jobId,submitJobDir.toString(), job.getCredentials());2)FileInputFormat源码解析(input.getSplits(job))(1)找到你数据存储的目录。
(2)开始遍历处理(规划切片)目录下的每一个文件
(3)遍历第一个文件ss.txt
a)获取文件大小fs.sizeOf(ss.txt);
b)计算切片大小computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128Mc)默认情况下,切片大小=blocksize
d)开始切,形成第1个切片:ss.txt—0:128M第2个切片ss.txt—128:256M 第3个切片ss.txt—256M:300M(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片)
e)将切片信息写到一个切片规划文件中
f)整个切片的核心过程在getSplit()方法中完成。g)数据切片只是在逻辑上对输入数据进行分片,并不会再磁盘上将其切分成分片进行存储。InputSplit只记录了分片的元数据信息,比如起始位置、长度以及所在的节点列表等。h)注意:block是HDFS上物理上存储的存储的数据,切片是对数据逻辑上的划分。
(4)提交切片规划文件到yarn上,yarn上的MrAppMaster就可以根据切片规划文件计算开启maptask个数。3)FileInputFormat中默认的切片机制:(1)简单地按照文件的内容长度进行切片(2)切片大小,默认等于block大小(3)切片时不考虑数据集整体,而是逐个针对每一个文件单独切片比如待处理数据有两个文件:file1.txt
10M 经过FileInputFormat的切片机制运算后,形成的切片信息如下:
file1.txt.split1--
file1.txt.split2--
file1.txt.split3--
file2.txt.split1--
0~10M 4)FileInputFormat切片大小的参数配置(1)通过分析源码,在FileInputFormat中,计算切片大小的逻辑:Math.max(minSize,Math.min(maxSize, blockSize));
切片主要由这几个值来运算决定mapreduce.input.fileinputformat.split.minsize=1默认值为1mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue 默认值Long.MAXValue因此,默认情况下,切片大小=blocksize。maxsize(切片最大值):参数如果调得比blocksize小,则会让切片变小,而且就等于配置的这个参数的值。minsize(切片最小值):参数调的比blockSize大,则可以让切片变得比blocksize还大。5)获取切片信息API// 根据文件类型获取切片信息
FileSplit inputSplit = (FileSplit)
context.getInputSplit();
// 获取切片的文件名称
String name = inputSplit.getPath().getName(); 3.2.4 CombineTextInputFormat切片机制关于大量小文件的优化策略1)默认情况下TextInputformat对任务的切片机制是按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个maptask,这样如果有大量小文件,就会产生大量的maptask,处理效率极其低下。2)优化策略
(1)最好的办法,在数据处理系统的最前端(预处理/采集),将小文件先合并成大文件,再上传到HDFS做后续分析。
(2)补救措施:如果已经是大量小文件在HDFS中了,可以使用另一种InputFormat来做切片(CombineTextInputFormat),它的切片逻辑跟TextFileInputFormat不同:它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个maptask。
(3)优先满足最小切片大小,不超过最大切片大小
CombineTextInputFormat.setMaxInputSplitSize(job,4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job,2097152);// 2m
举例:0.5m+1m+0.3m+5m=2m + 4.8m=2m + 4m + 0.8m3)具体实现步骤// 9 如果不设置InputFormat,它默认用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class)
CombineTextInputFormat.setMaxInputSplitSize(job,
4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job,
2097152);// 2m 4)案例实操大量小文件的切片优化(CombineTextInputFormat)。
没有更多推荐了,【hadoop】 3001-mapreduce并发框架思想
简述:MapReduce是什么?
1、MapReduce工作机制
它可以将计算任务拆分成大量可以独立运行的子任务,接着并行运算,另外会有一个系统调度的架构负责收集和汇总每个子任务的分析结果。其中 包含映射算法与规约算法。
MapReduce的主体是两个函数Map()和Reduce(),Map负责清洗数据,Reduce负责数据分析并输出最终结果,而且这两个功能之间并非一对一的关系,可以根据具体业务选择匹配关系。
如图是MapReduce的内部计算步骤
2、Map函数
输入:键值关系的数据队列,键是每段内容开头的偏移量。
处理:从输入中抽取出自定义的关键字段。这个处理过程可以很简单,也可以很复杂。
输出:键值关系的数据队列,通常是保存在硬盘上,而不是HDFS中。因为这个数据集只是个中间过程,计算结束时需要被删除。
3、Reduce函数
输入:Map的输出结果经过MapReduce框架处理之后分发给Reduce函数,因为通常一个Reduce函数需要拿到完整的数据集之后才能开始分析。
处理:这一步的分析处理将是最为艰难和富有价值的环节。根据不同业务指标定义处理函数。
输出:输出自定义的格式文件,并且保存在HDFS上。
3、Combiner函数
输入:Map的输出结果未经过MapReduce框架处理之后直接传送给Combiner函数。
处理:Combiner函数着手做合并归类和排序等处理,经过处理之后,数据集大大缩小。避免在shuffle阶段进行数据的Group和Partition。
输出:这时的输出结果才传送给MapReduce架构处理中心。
解决问题:减少带宽传输压力!
一、基本概念
进程:同一数据在不同进程之间并发进行,并发执行可以加快整体速度
线程:可以解决同步和异步的问题,但不能增加处理的速度
二、普通JAVA程序统计文本数据
读取每行数据,采用HashMap存放每个单词对应的个数
=&新建文本wordcount.txt
hello world
=&程序处理结果
=&程序处理伪代码
v = get(k) ; if(v!=null){ v++;}; &k,v)
三、海量数据可以采用MapReduce并发执行,用于提高速度
1、分两步处理数据
Map: 不同进程之间并发执行
Reduce: 对Map的数据进行汇总
2、Mapreduce概述
2.1 Mapreduce是一种分布式计算模型,google提出,主要用于搜索领域,解决海量数据的计算问题
2.2 MR由两个阶段组成:Map和Reduce,用户只要实现map()和reduce()两个函数
2.3 Mapreduce原理
&k1,v1&---&Map --&&k2,v2& ---&Group[K2,{v21,v22,v23},K1,{v11,v12,v13}], Partition--&Reduce--&k3,v3&
其中: Group为分组,Partition按照k进行排序
3、统一文本文件中每个单词出现的个数
需求内容:
新建一个文件命名wordcount.txt,其中的内容为
hello world
how are you
my name is shenfl
I am a new student this term
伪代码统计程序:
* @param key : 偏移量
* @param value: 每一行数据
map(key,value,context){
String[] vs = value.split(",");
for(String v:vs){
context.write(v,1)
* @param key: 经过k分组后内容
reduce(key,valueList,context){
int count = 0;
for(int i: valueList){
context.write(key,count);
示例图方式模拟MR的map和reduce两个阶段,体现MR的并发框架的计算模型
没有更多推荐了,精于此道、乐于此道
Hadoop之MapReduce的两种任务模式
MapReduce按照任务大小和设置的不同,提供了两种任务模式:
客户端通过org.apache.hadoop.mapreduce.protocol.ClientProtocol与服务端通信,ClientProtocol的继承关系:
老一些的版本还有一个JobTracker的实现类,即:classic。用于和MapReduce1.X兼容用的,高一些的版本已经没有这个实现类了。
一,本地模式(LocalJobRunner实现)
mapreduce.framework.name设置为local,则不会使用YARN集群来分配资源,在本地节点执行。在本地模式运行的任务,无法发挥集群的优势。注:在web UI是查看不到本地模式运行的任务。
二,Yarn模式(YARNRunner实现)
mapreduce.framework.name设置为yarn,当客户端配置mapreduce.framework.name为yarn时, 客户端会使用YARNRunner与服务端通信, 而YARNRunner真正的实现是通过ClientRMProtocol与RM交互, 包括提交Application, 查询状态等功能。但是根据任务的特性,分为两种方式执行任务:
1,uber mode:
Uber模式是Hadoop2.0针对MR小作业的优化机制。通过mapreduce.job.ubertask.enable来设置是否开启小作业优化,默认为false。
如果用Job足够小,则串行在的一个JVM完成该JOB,即MRAppMaster进程中,这样比为每一个任务分配Container性能更好。
那么什么才是足够小的Job呢?下面我们看看一些的参数(mapred-site.xml):
mapreduce.job.ubertask.maxmaps 最大的map数。默认值9mapreduce.job.ubertask.maxreduces 最大的reduce数,默认为1mapreduce.job.ubertask.maxbytes 最大的字节数,如果没有指定,默认和dfs.block.size一样。
应用程序的其他配置也会影响到对“小”的定义,yarn.app.mapreduce.am.resource.mb必须大于mapreduce.map.memory.mb和mapreduce.reduce.memory.mb,还有yarn.app.mapreduce.am.resource.cpu-vcores必须大于mapreduce.map.cpu.vcores 和 mapreduce.reduce.cpu.vcores,以下是这个配置的说明:
yarn.app.mapreduce.am.resource.mb
MR AppMaster需要的内存数,默认为1536mapreduce.map.memory.mb
从调度器(scheduler)为每个Map Task请求的内存数,默认1024mapreduce.reduce.memory.mb
从调度器(scheduler)为每个Reduce Task请求的内存数,默认1024yarn.app.mapreduce.am.resource.cpu-vcores MR AppMaster需要的虚拟CPU核数,默认为1536mapreduce.map.cpu.vcores 从调度器(scheduler)为每个Map Task请求的虚拟CPU核数,默认1mapreduce.reduce.cpu.vcores
为每个Map Reduce请求的虚拟CPU核数,默认1
链式Job也不能使用Uber模式执行,即使满足了上面的情况也不能。因为链式作业会并发执行不同资源需求的map task和reduce task。链式Job是指集成了org.apache.hadoop.mapreduce.lib.chain.ChainReducer和org.apache.hadoop.mapreduce.lib.chain.ChainMapper类的用户Map或Reduce程序。
yarn.app.mapreduce.am.resource.mb和yarn.app.mapreduce.am.resource.cpu-vcores是在yarn框架的级别,其他四个关于内存和CPU的配置是和具体每个Mapreduce任务有关,如果Mapreduce所需的资源大于Yarn框架定义的资源数量,则不能当成“小”Job使用uber mode执行了。
2,Non-Uber mode:
Uber只能执行一小部门的任务,在大数据环境下,大部分任务仍然运行在Non-Uber模式下,MRAppMaster将一个作业的map task和reduce task分为四种状态:
pending:刚启动但尚未向ResourceManager发送资源请求
scheduled:已经向ResourceManager发送资源请求,但尚未分配到资源
assigned:已经分配到了资源且正在运行
completed:已经运行完成。
MRAppMaster初始化之后,会产生一系列的Map Task和Reduce Task。
Map Task的生命周期是:
scheduled-&assigned-&completed
Reduce Task的生命周期是:pending-&scheduled-&assigned-&completed
上面我们可以看到,Reduce Task比Map Task多一个pending的状态,主要是因为Reduce Task需要依赖Map Task的输出,为了防止Reduce Task启动过早造成资源浪费,MRAppMaster让刚启动的Reduce Task处于pending状态,这样可以根据Map Task的运行情况和具体的配置来调整Reduce Task状态(pengding到scheduled中相互转移),以下几个参数是有来配置Reduce Task的启动时机的:
mapreduce.job.reduce.slowstart.completedmaps
map task完整了多少比率才开始为reduce task生成资源yarn.app.mapreduce.am.job.reduce.rampup.limit
在maps task已经完成,启动reduce task的比率。默认为0.5
org.apache.hadoop.mapreduce.MRJobConfig:
* Limit reduces starting until a certain percentage of maps have finished.
Percentage between 0.0 and 1.0
public static final String MR_AM_JOB_REDUCE_RAMPUP_UP_LIMIT =
MR_AM_PREFIX
+ "job.reduce.rampup.limit";
public static final float DEFAULT_MR_AM_JOB_REDUCE_RAMP_UP_LIMIT = 0.5f;
yarn.app.mapreduce.am.job.reduce.preemption.limit
当map task不能申请资源时,map task最多可以抢占reduce task资源的比率。默认为0.5
org.apache.hadoop.mapreduce.MRJobConfig:
* Limit on the number of reducers that can be preempted to ensure that at
least one map task can run if it needs to. Percentage between 0.0 and 1.0
public static final String MR_AM_JOB_REDUCE_PREEMPTION_LIMIT =
MR_AM_PREFIX
+ "job.reduce.preemption.limit";
public static final float DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT = 0.5f;
没有更多推荐了,

我要回帖

更多关于 hadoop的mapreduce 的文章

 

随机推荐