【译文】MapReduce:大型集群上的简化数据处理

作者:Jeffrey Dean 和 Sanjay Ghemawat

摘要:

MapReduce 是一个编程模型,以及处理和生成大型数据集的一个相关实现,它适合各种各样的现实任务。用户指定计算的 map 和 reduce 函数。底层运行系统自动地将大规模集群机器间的计算并行化,处理机器故障,以及调度机器间通信以充分利用网络和磁盘。程序员会发现这个系统很好使用:在过去的去年中,超过一万个不同的 MapReduce 程序已经在 Google 内部实现,平均每天有十万个 MapReuce 作业在 Google 集群上被执行,每天总共处理 20PB 以上的数据。

1 简介

在 MapReduce 开发之前,作者和其他许多的 Google 员工实现了数以百计的处理大量原始数据(如抓取到的文档、Web 请求日志等等)的专用计算方法,以计算各种导出的数据,如倒排索引、Web文档图结构的各种表示、每个 host 抓取到的页面数的总结、某一天最频繁的一组查询。大多数这样的计算在概念上是非常简单的,然而它们的输入数据量通常非常大。为了在合理的时间内完成这些计算,它们必须分布到成百上千的机器上。如何并行化计算,分发数据,以及处理故障,这些问题结合起来,往往会让程序员使用大量复杂代码来处理,而掩盖了原本简单的计算。

为了应对这一复杂性,我们设计了一个新的抽象,它允许我们表达试图执行的简单计算,但将并行化、容错、数据分布和负载均衡等凌乱的细节隐藏到了库中。这个抽象的灵感来源于出现在Lisp和许多其他函数式语言中的 map 和 reduce 原语。我们实现了大部分的计算,包括为输入的每一个逻辑记录应用一个map操作以计算一组中间键值对,然后对所有共享同一个键的值应用一个 reduce 操作以恰当地结合导出的数据。此函数式模型支持用户自定义map和reduce操作,使我们能非常容易地并行处理大型计算,和使用再执行(reexecution)作为主要的容错机制。

这项工作的主要贡献就是一个简单而强大的接口,它完成自动并行化、大规模分布计算,结合该接口的一个实现在大型商用PC集群上获得了很高的性能表现。该编程模型还可以用于同一台机器上多个核心间的并行计算。

第 2 部分描述了基本的编程模型并给出几个例子。第 3 部分描述了 MapReduce 接口专门针对基于集群的计算环境的一个实现。第 4 部分描述了我们发现的这个编程模型的几个很有用的改进(refinements)。第5部分描述了对各种不同任务的实现的性能度量。第 6 部分探索了 MapReduce 在 Google 中的应用,包括使用它作为重写我们的生产索引系统的基础的一些经验。第7部分讨论了相关和未来的工作。

2 编程模型

这个计算需要一组输入键/值对,并生成一组输出键/值对。MapReduce 库的使用者将计算表达为两个函数:map 和 reduce。

map,由用户编写,需要一对输入并生成一组中间键/值对。MapReduce 库将所有与相同键值 I 相关联的值组合到一起,并将它们传递给 reduce 函数。

Reduce 函数,同样由用户编写,接受中间键 I 和这个键的一组值。它将这些值合并以形成一组可能更小的值。通常每次 reduce 调用只生成 0 个或 1 个输出值。中间值靠一个迭代器提供给用户的 reduce 函数。这使我们能够处理大量太大以至于不能装入内存的值列表。

2.1 例子

考虑一下在一个巨大的文档集合中统计每个单词出现次数的问题。使用者会编写与下面伪代码类似的代码:

map(String key, String value);
// key: document name
// value: document contents
for each word w in value:
    EmitIntermediate(w, "1");
    
reduce(String key, String values);
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
    result += ParseInt(v);
Emit(AsString(result));

map 函数发出每个单词加一个相关的出现次数 (count)(在这个简单例子中仅为 1)。reduce 函数对发给一个单词的所有数 (count) 求和。

此外,用户编写代码将输入和输出文件名以及可选的调优参数填入 mapreduce 规范对象中。然后调用 MapReduce 函数,将它传递给规范对象。用户的代码与 MapReduce 库(C++实现)相连接。我们最初的 MapReduce 资料中有这个例子的完整程序【8】。

2.2 类型

尽管前面的伪代码是按照输入输出字符串形式编写的,概念上由用户提供的 map 和 reduce 函数是有相关类型的。

map        (k1, v1)        --> list(k2, v2)
reduce     (k2, list(v2))  --> list(v2)

也就是说,输入键和值与输出键和值来自不同的域。此外,中间键和值与输出键和值来自同一个域。

3 实现

MapRedue 接口的许多不同实现都是可能的。正确的选择取决于环境。例如,一种实现可能适合一个小型的共享内存的机器,另外一种可能适合一个大型的 NUMA 多处理器,而另外一种可能适合一个更大的联网计算机集合。在我们最初的文章发表以后,已经发展出了很多 MapReduce 的开源实现【1, 2】,MapReduce 在各种问题领域的适用性也得到了研究【7, 16】。

这一部分描述了我们的一种 MapReduce 实现,其目标是目前广泛应用在 Google 中的计算环境:由交换千兆以太网连接在一起的大型 PC 集群【4】。在该环境中,机器通常运行 Linux 系统,有双核 x86 处理器以及 4-8GB 内存。个别机器拥有 1GB/s 的网络带宽,但每台机器等分的带宽远远低于 1GB/s。一个计算集群包含了成千上万台机器,因此机器故障是很常见的。存储由直接附在单独机器上的廉价IDE磁盘提供。GFS,Google 内部开发的一个分布式文件系统【10】,用来管理存储在这些磁盘上的数据。文件系统使用复制来提供不可靠的硬件之上的可用性与可靠性。

使用者提交 jobs 给调度系统。每个 job 包含一组任务,且由调度程序映射 (mapped) 到集群间的一组可用的机器上。

3.1 执行概述

通过自动将输入数据分割为一个有 M 个分裂(splits)的组,map 调用分布在多台机器间。输入分裂可以由不同的机器并行处理。reduce 调用通过利用分割函数(比如,hash(key) mod R)将中间键空间划分为 R 片进行分布。分割数 R 和分割函数都是由使用者指定的。

图1展示了在我们的实现中 MapReduce 操作的整体流程。当用户程序调用 MapReduce 函数,以下顺序行为将会发生(图 1 中标记的数字对应下面列中的数字)。

  1. 用户程序中的 MapReduce 库首先将输入文件划分为 M 片,通常每片 16~64MB(由用户通过可选参数控制)。然后启动集群中程序的多个副本。

  2. 这些程序副本中有一个特殊的 master 副本。其他副本则是由 master 分配了 work 的 workers。集群中需要分配 M 个 map tasks 和 R 个 reduce tasks。master 挑选闲置的 workers 且为每个 worker 分配一个 map task 或 reduce task。

  3. 分配了 map task 的一个 worker 读取相应输入划分的内容。它从输入数据中解析出键/值对并将每一对传递给用户定义的map函数。由map函数产生的中间键/值对缓冲在内存中。

  4. 缓冲区的键/值对定期地写入本地磁盘,由 partition 函数划分到 R 个区域中。这些本地磁盘上的缓冲对的位置被传递会 master,它将负责转发这些位置给 reduce workers。

  5. 当一个 reduce worker 被 master 通知了这些位置后,它使用远程进程调用来读取来自 map workers 的本地磁盘中的缓冲数据。当 reduce worker 读取到了所有分区中的中间数据后,它按照中间键将其排序,从而使所有相同键的出现次数组合在了一起。排序是必要的,因为通常很多不同的键被 map 到了同一个 reduce task。如果中间数据太大以至于不能放在内存中,还需要使用一个外部的排序。

  6. reduce worker 对排序好的中间数据执行迭代,对每个唯一的中间键,它将这个键以及相应的一组中间值传递个用户的 reduce 函数。reduce 函数的输出被附加到这个 reduce 分区的最终输出文件中。

  7. 当所有的 map tasks 和 reduce tasks 都完成后,master 唤醒用户程序。在这一点上,用户程序的 MapReduce 调用返回到用户代码处。

成功完成后,mapreduce 执行的输出可以在R个输出文件中获得(每个 reduce task 一个,由用户指定文件名)。通常,用户无需将这 R 个输出文件合并到一个文件中;他们通常将这些文件作为另一个 MapReduce 调用的输入,或者在来自另外一个可以处理划分到了多个文件中的输入的分布式应用程序中使用它们。

3.2 master数据结构

master 中有多种数据结构。对每一个 map task 和 reduce task,它存储了其状态信息(限制,进行,或完成)和 worker 机器的身份(对于非闲置 tasks)。

master 是 map tasks 传播中间文件区域位置到 reduce tasks 的导管。因此,对于每个完成了的 map task,master 存储由这个 map task 生成的 R 个中间文件区域的位置和大小。master 在 map tasks 称后接收到这些位置和大小信息的更新。这些信息将逐步推送到正在进行 reduce tasks 的 workers 中。

3.3 容错

由于 MapReduce 库旨在帮助利用成百上千的机器来处理大量数据,它必须优雅地容忍机器故障。

处理 worker 故障

master 会定期地 ping 每一个 worker。如果在一定时间内没有收到来自某台 worker 的响应,master 将这个 worker 标记为故障。任何由 worker 完成的 map tasks 都被重置为初始闲置状态,因而可以在其他的 workers 中调度。同样,在故障 worker 上的任何正在进行的 map task 和 reduce task 也被重置为闲置状态以便进行重新调度。

故障 worker 上已完成的 map task 需要重新执行,因为它们的输出存储在了故障机器的本地磁盘中导致无法访问。已完成的 reduce tasks 无需重新执行,因为它们的输出存储在了全局文件系统中。

当一个 map task 首先由 worker A 执行然后又由 worker B 执行(因为 A 发生了故障),所有执行 reduce task 的 workers 将被通知重新执行。任何还未从 worker A 读取数据的 reduce task 将从 worker B 读取数据。

MapReduce 适应于大规模的 worker 故障。例如,在一个 MapReduce 操作中,在运行中的集群上的网络维护导致了一组80台机器在几分钟内无法到达。MapReduce master 简单地重新执行无法到达的 worker 机器的工作且继续前进,最终完成 MapReduce 操作。

语义失败

当用户提供的 map 和 reduce 操作是它们他们的输入值的特定函数时,我们的分布式实现生成的输出将与整个程序的无错顺序执行生成的输出相同。

我们依靠 map 和 reduce 任务输出的原子的提交来实现这一性质。每个正在进行的 task 将其输出写入私有临时文件中。一个 reduce task 生成一个这样的文件,map task 生成 R 个这样的文件(每个 reduce task 一个)。当一个 map task 完成后,worker 发送一条消息给 master,这条消息中包含了 R 个临时文件的名字。如果 master 接收到了来自一个已完成的 map task 的完成消息,它将忽略这条消息。否则,它将这 R 个文件名记录到 master 数据结构中。

当一个 reduce task 完成后,reduce worker 自动重命名其临时输出文件为最终输出文件。如果同一个 reduce task 在多台机器上执行,同一个最终输出文件的多个重命名调用将被执行。我们依靠由底层文件系统提供的原子的重命名操作来保证最终文件系统状态仅包含来自一个 reduce 任务执行生成的数据。

绝大多数的map和reduce操作是确定的,事实上,我们的语义等价于这种情况下的一次顺序执行,这使得程序员能够非常容易地推断程序的行为。当 map 和/或 reduce 操作不确定时,我们提供了较弱但仍然合理的语义。在不确定操作存在时,一个特定 reduce task R1 的输出等价于由非确定性程序的一次顺序执行 R1 生成的输出。然而,另一个不同的 reduce task R2 的输出可能对应该非确定性程序的另一个不同顺序执行 R2 的输出。

考虑 map task M 和 reduce task R1 和 R2。令 e(Ri) 作为作为 R1 的执行(这确实是一个这样的执行)。较弱的语义出现因为 e(R1) 可能读取了 M 的一次执行生成的输出,e(R2) 可能读取了 M 的另一次执行生成的输出。

3.4 局部性

在我们的计算环境中,网络带宽是一个相对稀缺的资源。我们靠充分利用输入数据(由 GFS 管理【10】)存储在组成集群的机器的本地磁盘中这一事实来节省网络带宽。GFS 将每个文件分成 64MB 的块且在不同机器上存储了每个块的多个副本(通常 3 个)。MapReeuce master 考虑每个输入文件的位置信息且试图调度一台含有相应输入数据的机器上的一个 map task。如果失败,它将试图调度与该任务的输入的复制品相邻的一个 map task(例如,同一网络交换机中包含相同数据的两台机器)。当在一个集群的 workers 重要部分运行大型 MapReduce 操作时,大多数输入数据都是本地读取的,并不消耗网络带宽。

3.5 Task粒度

我们将 map 阶段细分为 M 个片段,reduce 阶段细分为 R 个片段,如前所述。理想情况下,M 和 R 应该远高于 worker 机器的数量。每个 worker 执行多个不同 tasks 改善了负载均衡,且当一个 worker 故障后加快了恢复速度:它完成的多个 map tasks 可以分布到所有其他 worker 机器上重新执行。

由于 master 必须做 O(M+R) 此调度决策和在内存中保持 O(MR) 个状态,如前所述,在我们的实现中 M 和 R 的数量大小是有实际界限的。(然而,内存的使用量很小。O(MR) 个状态中大约包含每个 map/reduce task 对一字节的数据。)

此外,R通常受到用户限制,因为每个 reduce task 的输出最终保存在一个单独的输出文件中。在实践中,我们倾向于选择M因而每个独立 task 大约有 16MB 到 64MB 的输入数据(因而之前所述的局部优化达到最搞笑),且我们让R是我们希望使用的机器数量的一个小的倍数。我们通常以 M=200000, R=5000,使用 2000 台 worker 机器执行 MapReduce。

3.6 备份 Tasks

延长 MapReduce 操作总时间的一个普遍原因是一个掉队者(straggler),也就是说,在这个计算中有一台机器花了异常长的时间来完成最后几个 map 或 reduce tasks。掉队者会以一大堆的理由出现。比如说,一台拥有坏磁盘的机器可能经历频繁的矫正错误从而使读取性能从 30MB/s 降低到了 1MB/s。集群调度系统可能在这个机器上调度了其他任务,导致它更慢地执行 MapReduce 代码,由于竞争 CPU、内存、本地磁盘或网络带宽等资源。我们经历的一个最近的问题是机器初始化代码中的一个 bug 导致处理器缓存失效:受影响的机器计算速度放慢了 100 倍。

我们有一个通用机制来减轻掉队者问题。当一个 MapReduce 操作接近完成时,master 将调度还在进行的任务的备份执行。无论是原始或者备份执行完成,这个任务都被标记为完成。我们调整了这个机制,因而它增加了该计算的计算资源的使用,但不超过几个百分点。我们发现它大大降低了完成大型 MapReduce 操作的时间。作为一个例子,当没有备份 task 机制时,在 5.3 部分描述的排序程序多花了 44% 的时间完成。

4 改进

虽然由简单编写的 map和 reduce 函数提供的基本功能已足以满足大多数需求,我们发现了一些有用的扩展。这包括:

  • 用户指定的分区(partition)函数来决定如何将中间键值对映射到 R 个 reduce 碎片;

  • 排序保证:我们的实现保证这 R 个 reduce 分区中的每个,中间键值对都按键的升序处理;

  • 用户指定的结合(combiner)函数的作用是,在同一个 map task 内,对按照同一个键生成的中间值进行局部结合,以减少必须在网络间传输的中间数据数量;

  • 自定义输入输出类型,为了读新的输入格式和生成新的输出格式;

  • 在单机上执行简单 debug 和小规模测试的一种方式。

在【8】中有对这几项的详细讨论。

5 性能表现

在此部分,我们利用大型集群上的两个计算来测量 MapReduce 的性能表现。一个计算通过搜索大约 1TB 的数据来找到一个特定的模式。另一个计算对大约 1TB 的数据进行排序。这两个程序代表由 MapReduce 用户编写的真正程序的一个大的子集-----程序的一个类用来从一个表示(representation)向另一个表示 shuffle 数据,另一个类从大数据集中提取小部分关注的数据。

5.1 集群配置

所有程序都在一个拥有大约 1800 台机器的集群上执行。每台机器拥有两个支持超线程的 2GHz 的 Intel Xeon 处理器,4GB 内存,两个 160GB 的 IDE 磁盘,和千兆以太网接入。这些机器被安排在一个二级树形的交换网络中,该网络根部大约有 100~200Gbps 的聚合带宽。所有机器都在同一个托管设施中,因此任何一对机器间的往返通信时间不超过 1 毫秒。

虽然有 4GB 内存,但是大约 1~1.5GB 保留给了运行在集群上的其他任务。这些程序在一个周末的下午执行,此时 CPUs,磁盘和网络带宽基本都空闲。

5.2 Grep

grep 程序扫描了 10^10 个 100 字节的记录,搜索一个相对稀有的三字符模式串(该模式串大约出现在 92337 个记录中)。输入被划分为了大约 64MB 大小的片(M=15000),整个输出都放在了一个文件中(R=1)。

图 2 展示了计算随时间推移的进展。Y 轴显示了输入数据的扫描速率。随着安排到 MapReduce 计算的机器越来越多,速率也在逐步提升,当安排了 1764 个 workers 时速度达到峰值 30GB/s 以上。map 任务结束后,速率来时下降且在大约 80 秒时到达 0。整个计算从开始到结束大约花费了 150 秒。这包括 1 分钟的启动消耗。这个消耗来自向所有 workers 机器传播程序、延迟与 GFS 的交互以开启一组 1000 个输入文件,和获取局部优化所需的信息。