TALEND WEBINAR : March 27th, 2018 | Step-by-Step to Enterprise Data Integration

Apache Spark 和 Talend:性能和调优

Apache Spark 和 Talend:性能和调优

  • Petros Nomikos
    I have 3 years of experience with installation, configuration, and troubleshooting of Big Data platforms such as Cloudera, MapR, and HortonWorks. I joined Talend in 2014, and prior to Talend I held positions as manager of technical support, and project manager for data warehouse implementations. In my current role, I assist companies in understanding how to implement Talend in their Big Data Ecosystem.
  • April 12, 2018

首先,我希望感谢我前 2 篇有关“Talend 和 Apache Spark”主题的博客的所有读者。

如果您是第一次阅读此博客系列,且未阅读过我之前的帖子,可以单击此处阅读“Talend 和 Apache Spark:技术入门指南”,和单击此处阅读第二部分“Talend 和 Spark Submit 配置:有何区别?”。

该系列中前两篇有关 Apache Spark 的帖子概述了 Talend 如何与 Spark 协同工作,Talend 和 Spark Submit 之间有何相同点,以及 Talend 中可用于 Spark 作业的配置选项。

在本篇博客中,我们将介绍 Apache Spark 性能和调优。几乎所有使用 Apache Spark 的人都在热议这个话题,即使 Talend 之外的人也不例外。在首次开发和运行 Spark 作业时,您很难绕过下面这些问题。

  • 我应该为我的 Spark 作业分配多少个执行器?
  • 每一执行器需要多少内存?
  • 我应该使用多少个核心?
  • 为何一些 Spark 作业需要耗费数小时来处理 10GB 的数据,我如何解决这个问题?

在本篇博客中,我将逐一探讨这些问题,并给出相关解答和见解。继续这个主题之前,我们先来了解一下本篇博客中将会用到的一些主要概念:

分区:分区指的是分布式数据集的一部分,通常按默认 HDFS 块大小创建。Spark 利用分区对数据集执行并行处理。

任务:任务是指可以在执行器内运行的工作单元。

核心:核心是 CPU 中的处理单元,用于确定可以在执行器内运行的 Spark 并行任务数量。

执行器:在工作线程节点上开启的进程,用于在内存或磁盘中运行您的作业提交。

Application Master:每一 YARN 应用程序都运行着一个负责从资源管理器请求资源的应用程序主进程。分配好资源后,该进程将与节点管理器协同工作,启动其中所需的容器。

Spark 调优

首先,我们介绍如何调优Talend 内的 Apache Spark 作业。如前所述,在 Talend Spark 作业中,您可以找到“Spark 配置”选项卡,并在该选项卡设置调优属性。Talend 中始终默认未勾选此选项

利用此部分的选项,您将可以设置您的 Application Master 和执行器将使用的内存大小和核心数量,以及您的作业需要多少个执行器。当开始在此部分填充数值时,问题就来了,“如何确定需要多少核心或内存才能确保 Application Master 或执行器达到的良好性能?”让我们一起来解决这个问题。

如何为您的 Spark 作业选择核心数量

就此而言,在继续探讨前我们需要考虑几个因素,即:

  1. 数据集大小
  2. 作业完成时间范围要求
  3. 作业正在执行的操作和动作

将这些因素思考一番过后,我们便可以开始配置作业,以便最大限度提高性能。我们先从 Application Master 调优开始。对于 Application Master 来说,我们可以保留默认值,因为它仅执行资源协调而不执行任何处理,这意味着无需大量的内存或核心。

下一步是配置执行器的内存和核心。这里的主要问题在于,应当使用多少执行器、内存和核心。为找到这个问题的答案,我们假定我们拥有一个 Hadoop 集群,其中包含 6 个各带 32 个核心和 120GB 内存的工作线程节点。我们首先想到的可能就是,每个执行器能够执行的并发任务越多,性能就越好。在研究这个问题时,我们可以参考来自 Hadoop 发行版(如以下链接中的 Cloudera 示例)的性能调优指南。根据该指南,如果每个执行器的核心超过 5 个,将会导致 HDFS I/O 出错。因此,要获得最佳性能,建议核心数量保持在 5 个。

接下来,我们看看我们要启动多少个执行器。根据核心和节点数量,我们即可轻松确定执行器数量。如上所述,每个执行器最好使用 5 个核心。那么,我们必须从每个节点所具有的 32 个核心中删除那些不能用于执行作业的核心,因为操作系统和节点上运行的 Hadoop 守护进程需要这些核心。Hadoop 集群管理工具已经帮助我们完成了这项工作,因此我们可以轻易确定出每节点有多少核心可用于执行 Spark 作业。

计算过后,让我们假设每个节点剩余 30 个核心可以使用。鉴于我们已经确定出每个执行器的最佳核心数量是 5 个,这意味着每个节点最多可运行 6 个执行器。非常简单!

最后,我们再计算一下可以使用的内存容量。按照以上硬件规格,我们看到每个节点有 120GB 的内存,但是,正如我在讨论核心数量时所提到的那样,我们不能将全部内存用于执行作业,因为操作系统还会占用一些内存。而我们的 Hadoop 集群管理工具也可以帮我们确定这些内存中有多少可用于执行作业。如果操作系统和 Hadoop 守护进程需要 2GB 内存,那么将剩下 118GB 的内存可用于执行 Spark 作业。鉴于我们已经确定出每个节点可以运行 6 个执行器,这表示每个执行器最多可使用大约 20GB 的内存。即便这并非 100% 准确,我们也应将每个执行器的内存开销计算在内。在上一篇博客中,我曾提到默认开销为 384MB。那么,如果我要从这 20GB 中扣除这一内存开销,粗略估计一下,可以为每一执行器分配的最大内存将是 19GB。

集群资源的动态和固定分配

上文中的数字适用于 Spark 作业中集群资源的固定或动态分配。两者的区别在于动态分配。就动态分配来说,您可以指定所使用执行器的初始数量,即作业可以使用的最小执行器数量(工作负载较少时)以及最大数量(需要更大处理能力时)。虽然将集群的所有资源都用于我们的作业非常好,但是我们需要与在集群上运行的其他作业共享这一处理能力。因此,我们需要根据在回顾之前为调优 Talend Spark 作业而定义的考量因素时确定的需求,来确定可使用最大值的百分比。

配置好作业之后,接下来便可以实际运行作业了!我们做个假设,即便已经设为上述最大设置,我们的 Spark 作业仍旧需要耗费大量时间来完成。我们需要返回到我们的作业,再检查几项设置,确保它们可用于实现最佳性能。

Spark 性能

首先,我们假设我们要连接 Spark 作业中的两个表,在开始优化 Spark 作业前,我们需要考量的一个因素就是数据集的大小。现在,我们查看表的大小,并确定其中一个大小为 50GB,另一个为 100MB 时,我们需要看看是否利用了复制连接的Talend 组件

复制连接

复制连接(又称为 Map 端连接)使用广泛,常用于连接大型表与小型表,以便将来自小型表的数据广播到所有执行器。在这种情况下,由于较小数据集可以放入内存,我们可以使用复制连接将其广播到每一执行器,并优化 Spark 作业的性能。

鉴于表数据需要在执行器级别与端数据结合,通过将较小数据集广播到所有执行器,我们将可以避免通过网络发送大型表。Spark 中许多性能问题都是因为通过网络对大量数据进行随机排序而引起的。对于这一点,我们可以在 Talend 作业中轻松进行检查,只需启用 tMap 中的“使用复制连接”选项即可,如下所示。这样便可以将查找表的数据广播到所有执行器。

 

下一步是看看我们的作业中有哪些操作在执行成本高昂的重新计算

Spark 缓存

为了便于大家了解重新计算,我们举一个简单的例子,即加载包含客户购买数据的文件。从这些数据中,我们希望获取一些指标 –

  • 客户总数量
  • 购买产品数量

在这个例子中,如果我们不使用 Spark 缓存,上述每一操作都将加载数据。而这会造成成本高昂的重新计算,进而影响性能。由于我们知道在之后的作业执行流程中将用到这一数据集,因此最好使用 Spark 缓存,将其缓存在内存中以供日后使用,这样我们就不必不停重新加载了。

在 Talend Spark 作业中,这由 Talend Apache Spark 调色板中可用的tCacheIntCacheOut组件完成,您可以利用 Spark 缓存机制提供不同的可用选项

另外,您可以选择是否仅在磁盘上缓存数据,然后您便可以利用提供的选项为内存、磁盘或二者兼有,序列化缓存的数据。最后,您还可以选择要在 2 个其他节点上复制的缓存数据。最常用的选项是无序列化内存,因为它非常快速,但是当我们知道缓存 RDD 无法放入内存,而我们也不想溢出到磁盘时,就可以选择进行序列化,以便减少数据集使用的空间,但这会带来额外的开销成本,进而影响性能。因此,需要评估您的选项,并选择最适合您需求的选项。

 

如果在这之后仍旧存在性能问题,我们就需要查看一下 Spark 历史记录 Web 界面,了解发生了什么情况。如我在之前的博客中所述,在 Talend“Spark 配置”选项卡的“Spark 历史记录”部分,我们可以启用 Spark 日志记录。Spark 日志记录可在作业完成后保日志并通过 Spark 历史记录 Web 界面提供日志,从而帮助排除 Spark 作业出现的故障问题。最好的做法就是在 Spark 作业中启用 Spark 事件日志记录,以便轻松排除性能问题。

 

启用 Spark 事件日志记录后,即可前往 Spark 历史记录 Web 界面,在该界面中可以看到在查看作业的应用程序数量时,我们可以使用以下选项卡:

在上述 Spark UI 中,我们想要查看“阶段”选项卡,确定影响作业性能的阶段,查看其详情,并检查是否出现类似以下的行为:

 

我们发现,大部分数据都是由一个执行器在处理,剩余的执行器则全部闲置,即便我们已经分配了 10 个执行器。那么,为什么会是这样呢?要回答这个问题,我们需要确定出现问题的作业阶段。比如说,我们注意到,问题是出在 Spark 作业的从压缩文件读取数据的这一环节。由于存档文件在读取时默认为未分区,需要为我们读取的每一存档文件创建一个带单独分区的 RDD,这便造成了我们所见的行为。现在,如果压缩文件的存档格式为可分割式,比如 BZIP2,并且可以在读取时分区,那么我们就可以在 tFileInputDelimited 的高级设置中启用“设置最小分区”属性,然后至少设置与执行器同样多的分区作为起点。

但是,对于像 GZIP 这种无法在读取时重新分区的存档文件,我们可以使用tPartition组件对其进行明确的分区。如下所示,利用该组件我们可以对文件进行重新分区,从而在各执行器之间均匀地分配负载。

在使用我们的 tJDBC 组件从数据库读取时,也可以使用读取时分区,需用到以下属性:

如上所示,仅在特定情况下才会用到重新分区。如果我们确定数据集在我们用于连接的键上发生倾斜,则需要使用不同的方法。那么我们如何确定数据倾斜呢?首先按分区查看数据集,看看数据在我们用于连接的各个键中的分布。以下便是按分区显示的倾斜数据集的一个示例:

在此例中,如果我们无法按不同的键进行重新分区,则需要寻找别的方法来改善 Talend Spark 作业。其中一项广泛使用的技术便是“加盐”。通过加盐,您可以为实际键添加一个伪键,以便均衡数据在各分区的分布。这通过我们在 Spark 作业中的tmap组件即可完成,例如:


如上所示,我们在 tmap 级别将伪键作为数值随机键进行添加,并连同我们的实际键将其与我们另已添加伪键的查找数据集连接。由于连接是基于我们的实际键和为分布而生成的伪键所完成,这将可以帮助我们在连接 Spark 中的数据集时避免可能影响性能的倾斜分区。

结语

我们可以使用多种不同的技术来提升性能并调优 Talend Spark 作业。希望本篇博客中介绍的几点可以为您带来一定帮助。立即在 Talend 上构建更多 Spark 作业吧!

参考文献:

https://spark.apache.org/docs/latest/tuning.html

https://www.cloudera.com/documentation/enterprise/latest/topics/admin_spark_tuning1.html

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-tuning.html

https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.4/bk_spark-component-guide/content/ch_tuning-spark.html

Most Downloaded Resources

Browse our most popular resources - You can never just have one.

Join The Conversation

0 Comments

Leave a Reply

Your email address will not be published. Required fields are marked *