何为 MapReduce?

在当今数据驱动的市场上,算法和应用程序全天候在收集有关人员、流程、系统和组织的数据,进而产生了海量的数据。这就给我们提出了一个挑战,即如何在不牺牲洞察有效性的同时,高效快速地处理这些海量的数据。

这便是 MapReduce 编程模型派上用场的地方。MapReduce 最初被 Google 用来分析其搜索结果,由于它能够并行拆分和处理 TB 级数据并快速获得结果,故而广受欢迎。

何为 MapReduce?

MapReduce 是 Hadoop 框架中的一种编程模型或模式,用于访问 Hadoop 文件系统 (HDFS) 中存储的大数据。它是 Hadoop 框架正常运行不可或缺的一个核心组件。

MapReduce 通过将 PB 级数据拆分成较小的数据块然后在 Hadoop 市售服务器中并行处理,以此帮助实现并行处理。最后它还可以聚合来自多个服务器的所有数据,将整合后数据返回至应用程序。

阅读“何为 Hadoop”了解更多 →

例如,一个包含 20,000 台廉价市售服务器且各有 256MB 数据块的 Hadoop 集群可以同时处理大约 5TB 的数据。相比顺序处理如此大的数据集,这可大大节省处理时间。

利用 MapReduce,无需将数据发送至应用程序或逻辑驻留的位置,即可让逻辑在数据已驻留的服务器上执行,进而加快处理速度。数据访问和存储皆基于磁盘,输入通常存储为包含结构化、半结构化或非结构化数据的文件,输出也以文件形式存储。

MapReduce 曾是检索 HDFS 中所存储数据的唯一方式,但那只是过去式了。如今,市场上还出现了其他基于查询的系统,比如 Hive 和 Pig,可使用类 SQL 语句从 HDFS 检索数据。但是,这些通常与采用 MapReduce 模型编写的作业一同运行。这是因为 MapReduce 具有独特的优势。

Ready For More? Download 何为 MapReduce? User Guide now.

View Now

MapReduce 如何运作

MapReduce 有两大核心函数:Map(映射)和 Reduce(归约)。两者依序排列。

  • Map 函数可以从磁盘接收输入作为 <key,value> 对,进行处理,然后生成另一组中间 <key,value> 对作为输出。
  • Reduce 函数也可以接收输入作为 <key,value> 对,并生成 <key,value> 对作为输出。

mapreduce diagram

键和值的类型视使用案例而有所不同。所有输入和输出均存储在 HDFS 中。虽然映射是对初始数据筛选和排序的强制步骤,但是归约函数为可选项。

<k1, v1> -> Map() -> list(<k2, v2>)

<k2, list(v2)> -> Reduce() -> list(<k3, v3>)

Mapper(映射器)和 Reducer(归约器)是分别用于运行映射和归约函数的 Hadoop 服务器,它们为相同还是不同服务器均无关紧要。

映射

输入数据首先拆分为较小的数据块,所有数据块随后被分配到映射器中进行处理。

例如,如果一个文件有 100 条记录要处理,那么可一起运行 100 个映射器,每个映射器处理一条记录。或者可一起运行 50 个映射器,每个映射器处理两条记录。Hadoop 框架根据要处理数据的大小以及每台映射器服务器上可用的内存块,来确定要使用多少映射器。

归约

当所有映射器完成处理过后,框架将对结果进行随机排序和排序,然后将其传递到归约器中。归约器无法在映射器仍在运行时启动。所有具有相同键的映射输出值将分配到单个归约器,该归约器将聚合该键的值。

合并与分区

映射和归约有两个中间步骤。

合并是一个可选流程。合并器是一个可单独在各映射器服务器上运行的归约器。它可以将每一映射器上的数据进一步归约至简化形式,然后将其传递到下游。

这就使得随机排序和排序变得更加简单,因为要处理的数据更少。鉴于归约函数中存在累积函数和结合函数,通常合并器类设为归约器类本身。但是如果需要,合并器也可以作为一个单独的类。

分区流程是指将源自映射器的 <key, value> 对转换成另一 <key, value> 对,以馈送至归约器。它决定应如何将数据呈现给归约器,并将其分配至特定的归约器。

默认分区器确定源自映射器的键的哈希值,并根据此哈希值分配分区。分区数量与归约器相同。因此当完成分区后,一个分区的数据将发送到特定归约器。

Ready For More? Download 何为 MapReduce? User Guide now.

View Now

MapReduce 示例

考虑一种每天接收数百万付款处理请求的电子商务系统。在这些请求期间可能抛出一些异常,比如“付款被支付网关拒绝”、“库存不足”和“地址无效”等。开发人员想要分析过去四天的日志,以便了解哪种异常被抛出了多少次。

使用案例示例

目标是隔离出最容易出现错误的用例,然后采取适当的措施。例如,如果同一支付网关频繁抛出异常,是否是因为服务不可靠或者界面编写很糟糕?如果经常抛出“库存不足”,这是否意味着库存计算服务有待改进,或者需要增加特定产品的库存?

开发人员可以问一些相关的问题,并确定适合的行动方案。要对带有数百万记录的庞大日志执行此分析,MapReduce 可谓是理想的编程模型。多个映射器可同时处理这些日志:一个映射器可以根据日志大小以及映射器服务器上可用的内存块,来处理某一天的日志或其子集。

映射

为简化之便,我们假设 Hadoop 框架仅运行四个映射器。映射器 1、映射器 2、映射器 3 和映射器 4。

输入到映射器的值是日志文件中的一个记录。键可以是文本字符串,例如“文件名 + 行号”。映射器而后处理日志文件的每一个记录,以生成键值对。这里,我们使用一个填充符表示值为“1”。映射器输出如下所示:

映射器 1:<Exception A, 1>, <Exception B, 1>, <Exception A, 1>, <Exception C, 1>, <Exception A, 1>映射器 2:<Exception B, 1>, <Exception B, 1>, <Exception A, 1>, <Exception A, 1>映射器 3:<Exception A, 1>, <Exception C, 1>, <Exception A, 1>, <Exception B, 1>, <Exception A, 1>映射器 4:-<Exception B, 1>, <Exception C, 1>, <Exception C, 1>, <Exception A, 1>

假设每一映射器上运行有一个合并器(合并器 1 …… 合并器 4),用于计算每一异常的计数(与归约器函数相同),则合并器 1 输入将为:

<Exception A, 1>, <Exception B, 1>, <Exception A, 1>, <Exception C, 1>, <Exception A, 1>

合并

合并器 1 的输出将为:

<Exception A, 3>, <Exception B, 1>, <Exception C, 1>

其他合并器的输出将为:

合并器2: <Exception A, 2> <Exception B, 2>
合并器3: <Exception A, 3> <Exception B, 1> <Exception C, 1>
合并器4: <Exception A, 1> <Exception B, 1> <Exception C, 2>

分区

之后,分区器将来自这些合并器的数据分配到归约器,并对这些数据进行排序。

至归约器的输入如下所示:

归约器 1: <Exception A> {3,2,3,1}
归约器 2: <Exception B> {1,2,1,1}
归约器 3: <Exception C> {1,1,2}

如未涉及合并器,则至归约器的输入如下所示:

归约器 1: <Exception A> {1,1,1,1,1,1,1,1,1}
归约器 2: <Exception B> {1,1,1,1,1}
归约器 3: <Exception C> {1,1,1,1}

此处所列的例子非常简单,但当涉及 TB 级的数据时,合并器流程对带宽的改善非常重要。

归约

现在,每一归约器计算异常总计数,如下所示:

归约器 1: <Exception A, 9>
归约器 2: <Exception B, 5>
归约器 3: <Exception C, 4>

以上数据显示,Exception A 被抛出的次数比其他异常更多,因此需多加注意。当要一起处理超过数周或数月的数据时,MapReduce 程序的潜力将真正发挥出来。

如何实施 MapReduce

MapReduce 程序不仅限于 Java,也可使用 C、C++、Python、Ruby、Perl 等编写。以下是典型 MapReduce 作业的主函数示例:

public static void main(String[] args) throws Exception {

JobConf conf = new JobConf(ExceptionCount.class);
conf.setJobName("exceptioncount");

conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);

conf.setMapperClass(Map.class);
conf.setReducerClass(Reduce.class);
conf.setCombinerClass(Reduce.class);

conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);

FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));

JobClient.runJob(conf);

}

主函数中定义了各种参数,如 MapReduce 类名称,映射、归约和合并类,输入和输出类型,输入和输出文件路径等。映射器类扩展 MapReduceBase 并实施映射器接口。归约器类扩展 MapReduceBase 并实施归约器接口。

如需了解详细的代码示例,请参阅 Hadoop 教程

Talend MapReduce 教程

虽然 MapReduce 是解决大数据问题的一种敏捷且具有弹性的方案,但它本身非常复杂,这就意味着,开发人员需要花费大把时间来学习专业知识。企业拥有技能娴熟的人才和稳健的基础架构后,才能利用 MapReduce 有效处理大数据集。

在这种情况下就需要用到 Talend 的数据集成解决方案。它采用一个即用型框架,可整合 Hadoop 生态系统中使用的各种工具,例如 Hive、Pig、Flume、Kafka、HBase 等。借助 Talend Studio 提供的基于 UI 的环境,用户能够从 HDFS 加载和提取数据。

观看 Talend Studio 简介视频。 →

特别是对于 MapReduce 而言,Talend Studio 可帮助用户更为轻松地创建可在 Hadoop 集群上运行的作业,并设置映射器类、归约器类、输入格式和输出格式等参数。

创建 Talend MapReduce 作业(与 Apache Hadoop 作业的定义不同)后,便可将其部署为一项可在大数据集群中源生运行的服务、可执行文件或独立作业。它可以生成一个或多个可反过来执行 MapReduce 算法的 Hadoop MapReduce 作业。

运行 MapReduce 作业前,需配置好 Hadoop 连接。如需详细了解如何使用 Talend 设置 MapReduce 作业,请参阅相关教程

利用 MapReduce 解决大数据问题

MapReduce 编程范式适用于任何可通过并行化解决的复杂问题。

社交媒体网站可利用它来确定在过去一个月获得了多少来自不同国家/地区的新注册用户,进而评估其在不同地域的受欢迎程度。贸易公司可以更快地执行批量对账,并确定哪些情况通常会造成交易失败。搜索引擎可以确定浏览量,而营销人员则可以使用 MapReduce 开展情绪分析。

如需进一步了解 MapReduce 并查看与上述类似的用例,请立即下载 Talend Studio 试用版

 

| Last Updated: January 22nd, 2019