使用 Lambda 函数预处理数据 - Amazon Kinesis Data Analytics 开发者指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

对于新项目,我们建议您使用新的 Kinesis Data Analytics 工作室,而不是 SQL 应用程序的 Kinesis Data Analytics。Kinesis Data Analytics Studio 将易用性与高级分析功能相结合,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Lambda 函数预处理数据

如果流中的数据需要转变格式、转换、扩充或筛选,您可以使用 Amazon Lambda 函数预处理数据。您可以在执行应用程序 SQL 代码或应用程序通过数据流创建架构之前执行此操作。

使用 Lambda 函数预处理记录在以下场景中很有用:

  • 将记录从其他格式(例如 KPL 或 GZIP)转换为 Kinesis Data Analytics 可以分析的格式。Kinesis Data Analytics 目前支持 JSON 或 CSV 数据格式。

  • 将数据扩展为聚合或异常检测等操作更易访问的格式。例如,如果多个数据值存储在同一字符串中,您可以将数据展开为多个分开的列。

  • 使用其他亚马逊服务(例如外推或错误更正)丰富数据。

  • 将复杂的字符串转换应用于记录字段。

  • 用于整理数据的数据筛选。

使用 Lambda 函数预处理记录

创建 Kinesis Data Analytics 应用程序时,您可以在 “Connect 源” 页面中启用 Lambda 预处理。

使用 Lambda 函数预处理 Kinesis Data Analytics 应用程序中的记录
  1. 登录Amazon Web Services Management Console并打开 Kinesis Data Analytics 控制台,网址为 https://console.aws.amazon.com/kinesisanalytics

  2. 在应用程序的 “Connect 源” 页面上,在 “记录预处理方式”Amazon Lambda 部分中选择 “启用”。

  3. 要使用您已经创建的 Lambda 函数,请在 Lambda 函数下拉列表中选择该函数。

  4. 要从其中一个 Lambda 预处理模板创建新的 Lambda 函数,请从下拉列表中选择该模板。然后,选择 View <template name> in Lambda (在 Lambda 中查看 <模板名称>) 以编辑该函数。

  5. 要创建新的 Lambda 函数,请选择新建。有关创建 Lambda 函数的信息,请参阅Amazon Lambda开发人员指南中的创建 HelloWorld Lambda 函数和浏览控制台

  6. 选择要使用的 Lambda 函数的版本。要使用最新版本,请选择 $LATEST

当您选择或创建 Lambda 函数进行记录预处理时,将在执行应用程序 SQL 代码或应用程序根据记录生成架构之前对记录进行预处理。

Lambda 预处理权限

要使用 Lambda 预处理,应用程序的 IAM 角色需要以下权限策略:

{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "<FunctionARN>" }

Lambda 预处理指标

您可以使用 Amazon CloudWatch 监控 Lambda 调用的数量、处理的字节数、成功和失败等。有关 Kinesis Data Analytics Lambda 预处理发布的 CloudWatch 指标的信息,请参阅 Amazon Kinesis Analytics 指标

Amazon Lambda与 Kinesis 制作人库一起使用

Kinesis Producer Lib rary (KPL) 将用户格式化的小型记录聚合为最大 1 MB 的较大记录,以更好地利用 Amazon Kinesis Data Streams 的吞吐量。适用于 Java 的 Kinesis Client Library (KCL) 支持分解这些记录。但在将 Amazon Lambda 作为流使用者时,必须使用特殊模块取消聚合记录。

要获取必要的项目代码和说明,请参阅Amazon Lambda上的 Kinesis Producer 库分解模块 GitHub。您可以使用此项目中的组件在 Amazon Lambda 中通过 Java、Node.js 和 Python 处理 KPL 序列化数据。您也可以将这些组件用在多语言 KCL 应用程序中。

数据预处理事件输入数据模型/记录响应模型

要预处理记录,您的 Lambda 函数必须符合所需的事件输入数据和记录响应模型。

事件输入数据模型

Kinesis Data Analytics 持续读取来自 Kinesis 数据流或 Kinesis Data Firehose 传输流的数据。对于检索的每批记录,该服务管理如何将每批记录传递给您的 Lambda 函数。您的函数将接收到的记录列表作为输入。在您的函数中,您对列表进行迭代,并应用业务逻辑来完成您的预处理要求 (如数据格式转换或扩充)。

预处理函数的输入模型略有不同,具体取决于数据是从 Kinesis 数据流还是 Kinesis Data Firehose 传输流接收的。

如果源是一个 Kinesis Data Firehose 传输流,则事件输入数据模型如下所示:

Kinesis Data Firehose 请求数据模型

字段 描述
invocationId Lambda 调用 ID(随机 GUID)。
applicationArn Kinesis Data Analytics 应用程序Amazon 资源名称(ARN
streamArn 传输流 ARN
记录
字段 描述
recordId 记录 ID (随机 GUID)
kinesisFirehoseRecordMetadata
字段 描述
approximateArrivalTimestamp 传输流记录大致到达时间
data Base64 编码的源记录负载

以下示例显示来自 Firehose 传输流的输入:

{ "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test", "records":[ { "recordId":"49572672223665514422805246926656954630972486059535892482", "data":"aGVsbG8gd29ybGQ=", "kinesisFirehoseRecordMetadata":{ "approximateArrivalTimestamp":1520280173 } } ] }

如果源是 Kinesis 数据流,则事件输入数据模型如下所示:

Kinesis 流请求数据模型

字段 描述
invocationId Lambda 调用 ID(随机 GUID)。
applicationArn Kinesis Data Analytics 应用程序 ARN
streamArn 传输流 ARN
记录
字段 描述
recordId 基于 Kinesis 记录序列号的记录 ID
kinesisStreamRecordMetadata
字段 描述
sequenceNumber 从 Kinesis 流记录中得到的序列号
partitionKey 从 Kinesis 流记录中得到的分区键
shardId 从 Kinesis 流记录中得到的 ShardId
approximateArrivalTimestamp 传输流记录大致到达时间
data Base64 编码的源记录负载

以下示例显示来自 Kinesis 数据流的输入:

{ "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test", "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "data": "aGVsbG8gd29ybGQ=", "kinesisStreamRecordMetadata":{ "shardId" :"shardId-000000000003", "partitionKey":"7400791606", "sequenceNumber":"49572672223665514422805246926656954630972486059535892482", "approximateArrivalTimestamp":1520280173 } } ] }

记录响应模型

必须返回发送到 Lambda 函数,并从您的 Lambda 预处理函数返回的所有记录 (带记录 ID)。它们必须包含以下参数,否则 Kinesis Data Analytics 会拒绝这些参数并将其视为数据预处理故障。可对记录的数据负载部分进行转换,以满足预处理要求。

响应数据模型

记录
字段 描述
recordId 在调用期间,记录 ID 从 Kinesis Data Analytics 传递到 Lambda。转换后的记录必须包含相同记录 ID。原始记录的 ID 和转换记录的 ID 之间如果有不匹配,将被视为数据预处理失败。
result 记录的数据转换的状态。可能的值包括:
  • Ok:记录已成功转换。Kinesis Data Analytics 提取记录进行 SQL 处理。

  • Dropped:该记录是您的处理逻辑故意删除的。Kinesis Data Analytics 从 SQL 处理中删除记录。对于 Dropped 记录,数据负载字段是可选的。

  • ProcessingFailed:记录无法转换。Kinesis Data Analytics 认为您的 Lambda 函数未成功对其进行处理,并将错误写入错误流。有关错误流的更多信息,请参阅错误处理。对于 ProcessingFailed 记录,数据负载字段是可选的。

data 转换后的数据负载 (使用 base64 编码之后)。如果应用程序提取数据格式为 JSON,则每个数据负载可能包含多个 JSON 文档。或者,如果应用程序提取数据格式为 CSV,则每个数据负载可能包含多个 CSV 行 (在每一行中指定行分隔符)。Kinesis Data Analytics 服务成功解析和处理同一数据负载中包含多个 JSON 文档或 CSV 行的数据。

以下示例显示来自 Lambda 函数的输出:

{ "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "result": "Ok", "data": "SEVMTE8gV09STEQ=" } ] }

常见的数据预处理失败情况

以下是预处理失败的常见原因。

  • 并非批量中发送到 Lambda 函数的所有记录(带有记录 ID)都会返回到 Kinesis Data Analytics 服务。

  • 响应中缺少记录 ID、状态或数据负载字段。对于 DroppedProcessingFailed 记录,数据负载字段是可选的。

  • Lambda 函数超时不足以预处理数据。

  • Lambda 函数的响应超过了Amazon Lambda服务施加的响应限制。

对于数据预处理失败,Kinesis Data Analytics 会继续对同一组记录重试 Lambda 调用,直到成功为止。您可以监控以下 CloudWatch 指标,以深入了解故障。

  • Kinesis Data Analytics 应用程序MillisBehindLatest:表示应用程序在读取流媒体源时落后多远。

  • Kinesis Data Analytics 应用程序InputPreprocessing CloudWatch 指标:指明成功和失败的数量以及其他统计数据。有关更多信息,请参阅 Amazon Kinesis Analytics 指标

  • Amazon Lambda函数 CloudWatch 指标和日志。