在这篇博客文章中,我们将演示如何使用上的Apache MXNet(孵化) 和Apache Spark对大型数据集运行分布式离线推理。我们将说明离线推理如何起作用、为何离线推理具有挑战性以及如何利用 Amazon EMR 上的 MXNet 和 Spark 来应对这些挑战。
大型数据集上的分布式推理 – 需求与挑战
在进行有关深度学习模型的培训后,可以对新数据上运行推理了。可对需要即时反馈的任务 (如欺诈检测) 执行实时推理。这通常称作_在线推理_。或者,也可在预计算有用时执行离线推理。离线推理的常用案例是用于具有低延迟要求的服务,例如,要求对许多用户-产品分数进行排序和排名的推荐系统。在这些情况下,将使用离线推理来预计算推荐。结果将存储在低延迟存储中,而且将按需使用存储中的推荐。离线推理的另一个使用案例是使用从先进模型中生成的预测回填历史数据。作为一个假想的示例,报纸可利用此设置来使用从人员检测模型中预测的人员姓名回填已存档的照片。分布式推理还可用于基于历史数据测试新模型以验证这些模型在部署到生产之前是否会产生更好的结果。
通常,会在跨数百万条或更多记录的大型数据集上执行分布式推理。在合理的时间范围内处理这类大规模数据集需要一组计算机设置和深度学习功能。借助分布式群集,可使用数据分区、批处理和任务并行化来进行高吞吐量处理。但是,设置深度学习数据处理群集会面临一些挑战:
接下来,本博客文章将介绍如何使用 Amazon EMR 上的 MXNet 和 Spark 来应对这些挑战。
使用 MXNet 和 Spark 进行分布式推理
利用 Amazon EMR,可轻松、经济高效地使用 Spark 和 MXNet 来启动可扩展群集。Amazon EMR 按秒计费,并且可使用 Amazon EC2 竞价型实例来降低工作负载的成本。
Amazon EMR 与 Spark 相结合,可简化群集和分布式作业管理的任务。Spark 是一种群集计算框架,支持各种数据处理应用程序。Spark 还可跨群集对数据进行高效分区以实现处理的并行化。Spark 与Apache Hadoop生态系统及多个其他大数据解决方案紧密集成。
MXNet 是一种快速且可扩展的深度学习框架,该框架已针对 CPU 和 GPU 上的性能进行优化。
我们将演练使用 Amazon EMR 上的 Spark 和 MXNet 在大型数据集上设置和执行分布式推理的步骤。我们将使用 MXNet上提供的预训练的图像识别模型。我们将对包含 60000 张彩色图像的公开可用的 CIFAR-10 数据集运行推理。该示例演示的是在 CPU 上运行的推理,但您可以轻松将其扩展为使用 GPU。
以下列表包含设置和执行的高级步骤,并且以下部分将详述这些步骤:
Amazon EMR 上的 MXNet 和 Spark 群集设置
我们将使用 Amazon EMR 创建带 Spark 和 MXNet 的群集,可使用将该群集安装为应用程序。我们将使用创建包含 4 个 c4.8xlarge 类型的核心实例和 1 个 m3.xlarge 类型的主实例 的群集,但您也可以使用 Amazon EMR 控制台创建该群集。
用于创建群集的命令如下所示。我们假设您拥有创建该命令的正确凭证。
aws emr create-cluster \
--applications Name=MXNet Name=Spark \
--release-label emr-5.10.0 \
--service-role EMR_DefaultRole \
--ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,KeyName=<YOUR-KEYPAIR>,SubnetId=<YOUR-SUBNET-ID> \
--instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m3.xlarge \
InstanceGroupType=CORE,InstanceCount=4,InstanceType=c4.8xlarge \
--log-uri 's3n://<YOUR-S3-BUCKET-FOR-EMR-LOGS>/' \
--bootstrap-actions,Path=s3://aws-dl-emr-bootstrap/mxnet-spark-demo-bootstrap.sh \
--name "mxnet-spark-demo"
复制代码
替换以下参数:
–bootstrap-actions 用于安装 Git、Pillow 和 Boto 库。
请参阅 AWS 文档以了解有关创建和使用 IAM 角色(设置 Amazon EMR 群集所需) 的更多信息。
下面将讨论的代码段位于 mxnet-spark 文件夹下的deeplearning-emrGitHub 存储库中。它包含用于使用 MXNet 和 Spark 运行推理的完整代码。我们还将在后续部分之一中讨论如何使用
spark-submit 提交 Spark 应用程序。
GitHub 存储库中的 mxnet-spark 文件夹包含以下 3 个文件:
初始化
我们将使用(Spark 的 Python 接口) 创建我们的应用程序。Spark 应用程序包含一个运行用户的主函数的 驱动程序 以及一个或多个并行运行各种任务的 执行程序 进程。
为执行 Spark 应用程序,该驱动程序会将工作拆分为多个作业。每个作业会进一步拆分为多个阶段,而每个阶段包含一系列并行运行的独立任务。任务是 Spark 中的最小工作单元并且执行相同代码,每个任务位于不同的数据分区 (大型分布式数据集的逻辑数据块) 中。
图像所有权: Apache Spark 文档
Spark 提供一个用于处理分布式数据集的抽象,即弹性分布式数据集 (RDD)。RDD 是一个已跨可并行操作的群集分区的对象的不可变的分布式集合。可通过并行处理集合或外部数据集来创建 RDD。
从较高的层面来说,分布式推理应用程序的管道如下所示:
默认情况下,Spark 在执行程序上为每个内核创建一个任务。由于 MXNet 具有内置的并行机制来有效使用所有 CPU 内核,因此,我们会将应用程序配置为仅为每个执行程序创建一个任务并让 MXNet 在该实例上使用所有内核。在以下代码中,我们将配置密钥 spark.executor.cores 设置为 1,然后在创建 SparkContext 时传递 conf 对象。在提交应用程序时,您将看到我们还将执行程序数设置为群集上可用的工作程序数。这使每个节点均有一个执行程序并关闭执行程序的动态分配。
JavaScript
conf = SparkConf().setAppName("Distributed Inference using MXNet and Spark")
conf.set('spark.executor.cores', '1')
sc = pyspark.SparkContext(conf=conf)
logger.info("Spark Context created")
复制代码
在群集上加载数据并对数据进行分区
我们已将 CIFAR-10 数据复制到存储桶
mxnet-spark-demo。由于可在所有节点上访问存储在 S3 中的数据,因此,我们无需在驱动程序与执行程序之间移动数据。我们仅在驱动程序上提取 S3 密钥并使用库创建密钥的 RDD,后者是用于访问 AWS 服务的 Python 接口。此 RDD 将进行分区并分发到群集中的执行程序,并且我们将在这些执行程序上直接提取并处理少量图像。
我们将使用辅助标记方法
fetch_s3_keys(来自) 以从 Amazon S3 存储桶获取所有密钥。此方法还采用一个前缀以列出以该前缀开头的密钥。在提交主应用程序时将传递这些参数。
s3_client = get_s3client(args['access_key'], args['secret_key'])
keys = fetch_s3_keys(args['bucket'], args['prefix'], s3_client)
复制代码
由
args[‘batch’]决定的批处理大小是在每个执行程序上可一次性提取、预处理和运行推理的图像的数量。这受每个任务可用的内存量的约束。
args[‘access_key’]和
args[‘secret_key’]是可选参数,用于在使用正确权限设置实例角色的情况下访问其他账户中的 S3 存储桶。该脚本将自动使用在启动时分配给群集的 IAM 角色。
我们会将密钥的 RDD 拆分为多个分区,其中每个分区均包含小批量图像密钥。如果密钥无法划分为多个批处理大小的分区,我们将填充最后一个分区以重用其中一些初始密钥组。这是必需的,因为我们将绑定 (请参阅以下代码) 到固定批处理大小。
JavaScript
n_partitions = n_keys // args['batch']
# if keys cannot be divided by args['batch'] .
if (n_partitions * args['batch'] != n_keys):
keys.extend(keys[: args['batch'] - (n_keys - n_partitions * args['batch'])])
n_partitions = len(keys) // args['batch']
rdd = sc.parallelize(keys, num_slices=n_partitions)
复制代码
提取数据并将数据加载到 Spark 执行程序中
在 Apache Spark 中,您可以在 RDD 上执行两类操作。 转换 对一个 RDD 中的数据运行并创建新的 RDD,而 操作 计算 RDD 中的结果。
RDD 上的转换已 延时评估。 即,Spark 将不会执行转换直到它看到操作,相反,Spark 会通过创建有向无环图来跟踪不同的 RDD 之间的依赖项,这些依赖项会引导操作以形成执行计划。这有助于按需计算 RDD 并在 RDD 的分区丢失时执行恢复。
我们将使用 Spark 的
mapPartitions,该项提供分区记录的迭代器。转换方法对 RDD 的每个分区 (数据块) 单独运行。我们将使用来自的
download_objects 方法作为 RDD 分区上的转换,以将该分区的所有图像从 Amazon S3 下载到内存中。
sc.broadcast(args['bucket'])
rdd = rdd.mapPartitions(lambda k : download_objects(args['bucket'], k))
复制代码
我们将运行其他转换以使用Python Pillow– Python Imaging Library 将内存中的每个图像转换为 numpy 数组对象。我们使用 Pillow 对内存中的图像 (采用 png 格式) 进行解码并将其转换为 numpy 对象。此操作在mxinfer.py的 read_images 和 load_images 中完成。
rdd = rdd.mapPartitions(load_images)
def load_images(images):
Decodes batch of image bytes and returns a 4-D numpy array.
import numpy as np
for image in images:
img_np = readImage(image)
batch.append(img_np)
batch_images = np.concatenate(batch)
logger.info('batch_images.shape:%s'%(str(batch_images.shape)))
return batch_images
def readImage(img_bytes):
Decodes an Image bytearray into 3-D numpy array.
from PIL import Image
import numpy as np
from array import array
img = io.BytesIO(bytearray(img_bytes))
# read the bytearray using OpenCV and convert to RGB
img = Image.open(img)
img = img.convert('RGB')
#resize the image to 224x224
img = img.resize((224, 224), Image.ANTIALIAS)
# reshape the array from (height, width, channel) to (channel, height, width)
img = np.swapaxes(img, 0, 2)
img = np.swapaxes(img, 1, 2)
# add a new axis to hold a batch of images.
img = img[np.newaxis, :]
复制代码
注意: 在此应用程序中,您将看到我们正在 mapPartitions 函数内部而不是文件顶部导入模块 (numpy、mxnet、pillow 等)。这是因为,PySpark 将尝试序列化在模块级别导入的所有模块和任何相关库,而且常常无法挑选模块及模块的任何其他关联二进制文件。否则,Spark 将预计例程和库在节点上可用。我们将在使用
spark-submit 脚本提交应用程序时以代码文件形式发送例程。这些库已安装到所有节点上。还需要注意一点,如果您在函数中使用某个对象的成员,则 Spark 可终止对整个对象的序列化。
在执行程序上使用 MXNet 进行推理
如之前所述,对于此应用程序,我们将对每个节点运行一个执行程序,并对每个执行程序运行一个任务。
运行推理前,我们必须加载模型文件。
MXModel 类 (在mxinfer.py中) 从 MXNet Model Zoo 下载模型文件,然后创建 MXNet 模块,并在第一次使用时将其存储在
MXModel 类中。我们实施了单例模式,这样一来,我们无需在每次预测中都实例化和加载模型。
download_model_files 方法 (在 MXModel 单例类中) 将下载 ResNet-18 模型文件。该模型包含一个扩展名为 .json 的描述神经网络图的符号文件和一个扩展名为 .params 的包含模型参数的二进制文件。对于分类模型,将有一个包含类及其相应标签的 synsets.txt 文件。
下载模型文件后,我们将加载这些文件并在执行以下步骤的
init_module 例程中实例化 MXNet 模块对象:
def init_module(self, s_fname, p_fname, batch_size):
logger.info("initializing model")
import mxnet as mx
#load the symbol file
sym = mx.symbol.load(s_fname)
#load parameters
save_dict = mx.nd.load(p_fname)
arg_params = {}
aux_params = {}
for k, v in save_dict.items():
tp, name = k.split(':', 1)
if tp == 'arg':
arg_params[name] = v
if tp == 'aux':
aux_params[name] = v
mod = mx.mod.Module(symbol=sym)
#bind>mod.bind(for_training = False,>label_shapes = None)
#set parameters
mod.set_params(arg_params, aux_params, allow_missing=True)
复制代码
我们将在首次调用预测方法后立即下载并实例化 MXModel 对象。该预测转换方法还采用包含一批 (大小为
args[‘batch’]) 彩色图像的四维数组 (RGB 的其他 3 个维度) 并调用 MXNet 模块转发方法以生成该批量图像的预测。
注意: 我们出于上一个注释中讨论的原因在此方法内导入 mxnet 和 numpy 模块。
def predict(img_batch, args):
Run predication on batch of images in 4-D numpy array format and return the top_5 probability along with their classes.
import mxnet as mx
import numpy as np
logger.info('predict-args:%s' %(args))
if not MXModel.model_loaded:
MXModel(args['sym_url'], args['param_url'], args['label_url'], args['batch'])
MXModel.mod.forward(Batch([mx.nd.array(img_batch)]))
复制代码
运行 Inference Spark 应用程序
首先克隆deeplearning-emrGitHub 存储库,该库包含用于使用 MXNet 和 Spark 运行推理的代码。
git clone&& cd deeplearning-emr/mxnet-spark
复制代码
我们将使用来自的
spark-submit 脚本运行 Spark 应用程序。
export LD_PATH=$LD_LIBRARY_PATH ; spark-submit --deploy-mode cluster \
--master yarn --conf spark.dynamicAllocation.enabled=false \
--conf spark.executor.memory=40g \
--conf spark.executorEnv.LD_LIBRARY_PATH=\$LD_PATH \
--driver-library-path \$LD_PATH \
--num-executors 4 \
--py-files utils.py,mxinfer.py \
infer_main.py --sym_url 'http://data.mxnet.io/models/imagenet/resnet/18-layers/resnet-18-symbol.json' \
--param_url 'http://data.mxnet.io/models/imagenet/resnet/18-layers/resnet-18-0000.params' \
--label_url 'http://data.mxnet.io/models/imagenet/resnet/synset.txt' \
--batch 64 \
--bucket 'mxnet-spark-demo' \
--prefix 'cifar10/test' \
--output_s3_bucket '<YOUR_S3_BUCKET>' \
--output_s3_key 'cifar10_test_results'
复制代码
注意: 将
<YOUR_S3_BUCKET>替换为要在其中存储结果的 Amazon S3 存储桶。您应具有传递访问/私有密钥或具有实例 IAM 角色中的权限。
spark-submit 的参数包括:
收集预测
最后,我们将使用 Spark 收集操作来收集为每个分区生成的预测并将这些预测写入 Amazon S3。结果将写入到的 S3 位置 (
args[‘output_s3_bucket’],
args[‘output_s3_key’]) 可作为参数传递至
infer_main.py。
JavaScript
output = rdd.collect()
# drop the extra keys that we added to fill the last batch
keys = keys[:n_keys]
output = output[:n_keys]
if args['output_s3_key'] and args['output_s3_bucket']:
with open('/tmp/' + args['output_s3_key'] , 'w+') as f:
for k, o in zip(keys, output):
f.write("Key %s: Prediction: %s\n" % (k, o))
upload_file(args['output_s3_bucket'], args['output_s3_key'], '/tmp/' + args['output_s3_key'], s3_client)
复制代码
监控 Spark 应用程序
您可在 Amazon EMR 控制台中查看 Spark 应用程序历史记录和 YARN 应用程序状态。在整个运行时期间近实时更新应用程序历史记录,并且历史记录可在应用程序完成后 ( 甚至在您终止群集后 ) 最多保留 7 天。它还在一个位置集中提供高级指标,如内存使用率、S3 读取数、HDFS 利用率等。这还将消除对 SSH 转发的需求,这与您使用 YARN UI 时不同。您可以查找这些功能并了解如何在EMR 控制台上的 Spark 应用程序历史记录中使用它们。
来自 EMR 控制台应用程序历史记录的以下屏幕截图显示了应用程序任务、执行时间等。
还可使用驱动程序主机端口 8088 上的 Yarn ResourceManager Web UI 监控在 Amazon EMR 上运行的 Spark 应用程序。下面列出了 Amazon EMR 群集上可用的各种 Web UI 及其访问方式:EMR 上的 YARN Web UI。
以下屏幕截图说明了 Web 监控工具。我们可以查看执行时间表、作业持续时间和任务成功和失败。
Amazon EMR 的另一项出色功能是与Amazon CloudWatch的集成,这将允许监控群集资源和应用程序。在以下屏幕截图中,我们可以看到跨群集节点的 CPU 利用率,该值保持在 25% 以下。
结论
简而言之,我们演示了如何设置一个包含 4 个节点的 Spark 群集,该群集使用 MXNet 跨 Amazon S3 上存储的 10000 个图像运行分布式推理,同时在 5 (4.4) 分钟内完成该处理。
了解更多
未来改进
作者介绍:
Naveen Swamy 是 Amazon AI 的一名软件开发人员 ,负责构建创新性的深度学习工具。他专注于使深度学习可供软件工程师使用以发挥其技能并将其应用于日常应用。在业余时间,他喜欢和家人呆在一起。原文链接: