`

google mapreduce杂谈

 
阅读更多

 

 

0 摘要

 

mapreduce是一个编程模型也是一个算法模型的相关实现。

这个系统运行时只关心:

如何分割数据,在大量计算机组成的集群上调度,  集群在计算机中的错误处理,  管理集群中计算机之间的必要通讯。

 

google集群中每天都有1000多个mapreduce程序在执行。

 

 

1 介绍

 

 处理大量原始数据:  文档抓取, web请求日志,计算各种类型的衍生数据(倒排索引 web文档的图结构各种表示形式 每台主机网爬数据抓取页面数量汇总 每天被请求最多的查询集合) , 基于运算数据量很大,

并在成百上千台机器上执行, 并行 分发 错误处理, 造成了简单的运算变得非常复---- mr出现前的背景

 

提出一个模型, 只要表述我们想要执行的简单运算即可, 而并行计算,容错,数据分布,负载均衡等复杂细节都封装在一个库里,最终设计出来的mapreduce来灵感自于Lisp和其他的函数式语言Map和Reduce原语。

 

 

 

 

2 编程模型   没啥可写的  可以下载附件JAR查看

2.1 例子

2.2 类型

2.3 更多例子

 

 

3 实现

 

 环境包括:

a x86架构 linux系统 双处理器 2-4G内存

b 普通网络硬件设备,每个机器带宽为百兆/千兆,但远小于网络平均带宽的一半

c 集群有成百上千台机器

d 存储为廉价IDE硬盘

e 用户提交工作(job)给调度系统,每个工作(job)包含一系列任务(task),调度系统将这些任务调度到集群中

多台可用机器上(

即: 比如你写好一个单词计数案例 main函数中org.apache.hadoop.mapreduce.Job.job.waitForCompletion(true);提交这个job给 mapreduce集群,这个集群会分配单词计数一个 jobid _1072,_1072在执行时候会拆分成12个maps task和3个reduces,task)

 

3.1 执行概况

 通过将Map调用输入数据a自动分割为M个数据片段的集合,Map调度被分布到多台机器上执行,

输入的数据a能够在不同机器上并行处理,

使用分区函数将Map调用产生的中间key值分成R个不同分区(eg hash(key) mod R), 这样不同key下的map中间值会被分配到不同reduce中,reduce调用也被分配到多台机器上执行(分区数量R和分区函数由用户来指定)

 

如下图:



 

 

具体解释如下,  下面序号和上图标号一一对应:

1 用户程序首先调用MapReduce 库将文件分成M个数据片,每个数据片段大小 一般从 16M到64M(可以通过可选参数来控制每个数据片段大小),然后用户程序在集群中创建大量程序副本

 

所谓的worker其实就是获取了 程序员写好的job 代码程序的节点服务器

 

2 这些程序副本有一个特殊的程序-master,  副本中其他程序都是worker程序,由master分配任务,有M个map任务和R个Reduce任务将被分配, master将一个map任务或reduce任务分配给一个空闲worker,

如果当前空闲worker没有要处理的数据片,将会从别的副本中拷贝或者读取过来执行(是拷贝还是读取还不清楚,个人理解)

 

3 被分配了map任务的worker程序读取相关的输入数据片段,从输入的数据片段中解析出 key/value pair,

然后key/value pair 传递给用户自定义的map函数, 由map函数生成并输出中间 key/value pair,并缓存到内存

 

4 缓存中的 key/value pair 通过分区函数分成R个分区,之后周期性的写入到本地磁盘上(shffle过程),

缓存的 key/value pair在本地磁盘的存储位置将被回传给master, 由master负责把这些存储位置在传送给 reducer worker.

 

5 当reduce worker程序接收到master程序发来的数据存储位置信息后,使用RPC从map worker所在主机

磁盘上读取这些缓存数据,当reduce worker读取了所有中间数据后,通过对key进行排序 使得具有相同key

值的数据聚合在一起。

 

6 Reduce worker程序遍历排序后的中间数据,对于每一个唯一的中间key值,Reduce worker程序将这个key值和它相关的中间value值的集合传递给用户定义的Reduce函数,Reduce函数的输出被追加到所属分区的输出文件。

 

7 当所有的map reduce任务完成后,master唤醒用户程序,此时,用户程序对MapReduce调用才返回。

 

成功完成任务后,MapReduce的输出存放在R个输出文件中,输出文件名可以由用户指定,

一般情况下,用户不需要将这R个输出文件合并成一个文件,他们经常把这些文件作为另一个MapRedece的输入,或者在另一个可以处理多个分割文件的分布式应用中使用。

 

3.2 Master数据结构 (Master节点不仅存储hdfs名字空间 对应关系 副本 同时也存储mr任务下worker机器状态)

 

Master持有一些数据结构,它存储每一个Map和Reduce任务的状态(空闲|工作中|完成),以及worker机器(非空闲任务的机器)的标识。

 

Master更像一个数据管道,map任务处理的中间文件存储区域的位置信息通过这个管道从Map传递到Reduce, 对于每个已经完成的Map任务, master存储了Map任务产生的R个中间文件存储区域的大小和位置,当Map任务完成时, Master接收到位置和大小更新信息,这些信息被逐步递增的推送给那些正在工作的Reduce任务。

 

3.3 容错

 

3.3.1 worker故障:

 

 master周期性的ping每个worker, 如果在一个约定时间范围内没有收到worker返回信息,master将把这个worker标记为失效,所有由这个失效的worker完成的Map任务被重设为初始的空闲状态,之后这些被设置的任务被安排给其他worker。

 

当worker故障时,由于已经完成的Map任务的输出存储在这台机器上,Map任务的输出已不可访问了,

因此必须重新执行, 而已经完成的Reduce任务的输出存储在全局文件系统上,因此不需要再次执行

 

当一个map任务首先被work A执行,之后由于work A失效了又被调度到work B执行,重新执行的动作会被

通知给所有执行Reduce任务的worker, 任何还没有从work A读取数据的Reduce任务将从work B读取数据。

 

在大规模worker失效情况下,在一个job任务执行期间,在正在运行的集群上进行网络维护引起80台机器在几分钟内不可访问了,Mapreduce master只需要简单的再次执行那些不可访问的worker完成的工作,之后继续执行未完成的任务,直到这个mapreduce操作完成。

 

3.3.2 master失效:

 

让master周期性的将上面数据结构写入磁盘(checkpoint),如果这个机器master任务失效了,可以从最后一个checkpoint开始启动另一个master进程。 而在之前master执行的Mapreduce运算,则终止掉,客户在新的master节点上从新执行。

 

 

 3.3.3 在失效方面的处理机制:  没看出啥道道来

 

 

3.4 存储位置

 

在计算机运行环境中,网络带宽是一个相当匮乏的资源,我们通过尽量把输入数据存储在集群中机器本地磁盘上来节省网络带宽。 mapreduce的master在调度map任务时会考虑输入文件的位置,尽量将一个map任务

调度在包含相关输入输入拷贝的机器上执行如果上述努力失败了,master将尝试在保存有输入数据拷贝的机器附近的机器上执行map任务, 当一个足够大的cluster集群上运行大型mapreduce操作时,大部分的输入数据都能从本地寄去读取,因此消耗非常少的网络带宽。

 

  1. map-reduce框架上,并非每个数据都是在本地处理的,如果不是本地处理,那么数据复制是  
  2. 不可避免的, 没有任何理想的局面,数据分割也必然不是理想的平均切分

这个具体看ma-大数据mapreduce思想和数据切割 数据和代码的协同工作(data/code co-location)这段介绍

 

 

3.5 任务粒度

 

一个job产生的 map task, reduce task的个数取值都有一定的客观限制,master必须执行 o(M+R)次调度,

并且在内存中保存 O(M*R)个状态, 大概每对Map任务/Reduce任务1个字节就可以了,对内存空间影响还是比较小的。

 

 

 

3.6 备用任务

 

影响整体MapReduce总执行时间最通常因素是 落伍者, 出现落伍者的原因很多,比如一台机器硬盘出现问题,读取时要经常进行读取纠错操作,导致读取数据速度从 30M/s 降低到 1M/s,  如果cluster集群的调度系统在这台机器上又调度了其他的任务, 由于cpu,内存,本地硬盘和网络带宽等竞争因素的存在,导致mapreduce代码执行效率更加缓慢,或者比如机器初始化代码出现问题bug, 导致关闭了处理器的缓存,那么这些机器上执行任务的性能和正常情况相差上百倍。 亦或者出现了数据倾斜

 

下面这一段没看懂

我们通过通用机制来减少落伍者出现的情况,当一个mapreduce操作接近完成的时候,master调度备用任务进程来执行剩下的,处于处理中状态的任务,无论是最初的执行进程,还是备用任务进程完成了任务,

我们都把这个任务标记为已经完成的,通过调优这个机制,通常只会占用比正常操作多几个百分点的计算资源,这种机制能大大减少Mapreduce操作总处理时间,再关掉备用任务情况下要多花44%的时间完成排序任务。

 

 

 

4 技巧

 

4.1 分区函数

 

一个缺省的分区函数使用hash方法, 比如  hash(key) mod R 进行分区,hash方法能产生非常平衡的分区,

当然可以重改这个方法让用户定制分区以达到业务目的。

 

4.2 顺序保证

 

在分区中,中间 key/value  pair数据处理是按照key增量顺序处理的,这样保证每个key下的数据都处理好并输出到对应文件。

 

4.3 combiner函数

 

小reduce,在map节点就对当前map下相同key的数值做一次聚合(以单词计数为例), 然后在shuffle到reduce时节省带宽,一般情况下map中的combine写法和当前reduce写法一样。

 

4.4 输入和输出类型

 

4.5 副作用

 

4.6 跳过损坏的记录

 

有时候用户程序的bug会导致Map或者Reduce函数在处理某些记录时crash(崩溃),导致MapReduce操作无法顺利进行, 一般做法是找到这个BUG修复后在继续执行,但是有时候找到这个BUG并修复并不是一件容易的事,尤其是在引用了第三方包并找不到这些源码情况下,而在很多时候,忽略一些有问题的数据也是可以接受的,比如在一个巨大数据集上进行统计分析,我们提供一种执行模式,在这个执行模式下,MapReduce会检测哪些记录导致确定性的crash,并跳过这些记录不处理。

 

每个worker进程都设置了信号处理函数捕获内存段异常和总线错误,

在执行map或者reduce操作前, mapreduce库通过全局变量保存记录序号,如果用户程序触发了一个系统信号,消息处理函数将用最后一口气通过UDP包向master发送处理的最后一条记录的序号,当master看到在处理某条特定记录不止失败一次时,master就标记这条记录被跳过。

 

4.7 本地执行

 

集群执行mapreduce时,通常是几百几千台机器上,具体执行位置是由master进行动态调用,这样增加了调试难度,为了简化调试,我们开发了一套mapreduce库本地实现版,通过使用本地版本的mapreduce库,mapreduce操作在本地顺序执行,用户可以控制把操作限制到特定map任务上。

 

4.8 状态信息

 

master使用嵌入式的http服务器来显示一组状态信息页面,可以监控各种执行状态,比如计算执行速度,已经完成多少任务,有多少任务正在处理,输入字节数,中间数据字节数,输出字节数,处理百分比,以及哪些worker失效,以及失效时正在运行的map和reduce任务。

 

4.9 计数器

 

mapreduce库使用计数器统计不同事件发生次数,可以在map和reduce函数中增加计数器的值,

这些计数器的值周期性从各个单独worker机器上传递给master(附加在ping的应答包中传递)

master把执行成功的map和reduce任务的计数器值进行累计,当mapreduce操作完成后,返回给用户代码。

当累计计数器的值时,master要检查重复运行的map或者reduce任务,避免重复累加(比如备用任务和失效后重新执行任务这两个情况会导致相同的任务被多次执行)。

 

 

计数器机制对于Mapreduce操作完整性检查非常有用,比如,用户要求确保输出的 key value pair精确的等于输入的key value pair或者处理的文档数量在整个文档数量中属于合理范围。

 

5 性能

 

生活中的典型应用:

1TB数据进行特定模式匹配 (对数据格式进行转换)

1TB数据进行排序(海量数据中抽取少部分用户感兴趣的)

 

 

5.1 集群配置

 

1800个2G主频, 4G内存, 两个160G的IDE硬盘和一个千兆以太网卡,部署在一个两层的树形交换网络中,root节点大概有100-200GBPS传输带宽,所有机器采用相同部署对等部署,任意两点之间网络来回小于1ms。

 

每台机器中4G内存,大概有1-1.5G用于运行在集群上其他任务。

 

5.2 GREP

 

集群大约1分钟的初始启动阶段: 

a) 把程序传送给各个work机器上的时间 

b) 等待GFS文件系统打开XX个输入文件集合时间 

c) 获取相关文件本地位置优化信息时间。

 

而整体处理时间为150S。

 

5.3 排序

 

 

5.4 高效的backup任务

 

 

5.5 失效的机器

 

6 经验

 

 
 
 

6.1 大规模索引

 

mapreduce在google的典型应用:

重写了google网络搜索服务所使用的index系统,将索引系统的输入数据(网爬文档 大小超过20TB)存储在hdfs,后通过一系列mapreduce操作(大约5-10次)来建立索引。 替代后带来的好处:

 

a) 实现索引代码简单小巧容易理解,而容错分布式并行计算都交给了hadoop库,实现索引代码从3800行C++减少到700行

b) 将概念不相关的计算步骤分开处理,以前是混在一起以达到减少数据传递的额外消耗。

   概念不相关计算步骤分开也能让我们更容易改变索引处理方式,比如之前索引系统一个小更改要耗费几个月时间,但使用mapreduce系统这样的更改只要花费几天即可

c) 基于mapreduce库已经解决了机器失效,机器处理速度慢,网络瞬间阻塞等大部分问题,索引系统的管理更容易,不需要人员介入,并能不断增加机器提高集群整体处理性能。

 

 

 

7 相关工作

 

针对并行计算很多系统都提供了严格编程模型,并通过对编程进行严格限制来实现,并且大部分并发系统都只是在小规模集群上实现,并把容错处理交给了程序员。

 

而mapreduce是结合真实环境下处理海量数据并对并行经典模型进行的简化和萃取,并实现了基于成千处理器集群的容错处理,提供了透明的容错处理。

 

 

 

8 结束语

 

mapreduce封装了并行处理,容错处理,数据本地化优化,负载均衡等等技术难点细节,使得mapreduce库易于使用。

 

我们也从mapreduce开发过程中学到了很多东西:

a) 约束编程模式使得并行和分布式计算非常容易,也易于构造容错的计算环境

b) 网络带宽是稀有资源,大量的系统优化是针对减少网络传输量为目的的

   本地优化策略让大量数据从本地读取,

   中间文件写入磁盘,并且只写一份中间文件也节约了网络带宽

c) 多次执行相同任务可以减少性能缓慢的机器带来的负面影响,同时也解决了由于机器失效造成的数据丢失问题

 

 

  • 大小: 106.3 KB
  • 大小: 106.3 KB
  • 大小: 107.5 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics