对于新项目,我们建议您使用新的 Kinesis Data Analytics 工作室,而不是 SQL 应用程序的 Kinesis Data Analytics。Kinesis Data Analytics Studio 将易用性与高级分析功能相结合,使您能够在几分钟内构建复杂的流处理应用程序。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 Lambda 函数预处理数据
如果流中的数据需要转变格式、转换、扩充或筛选,您可以使用 Amazon Lambda 函数预处理数据。您可以在执行应用程序 SQL 代码或应用程序通过数据流创建架构之前执行此操作。
使用 Lambda 函数预处理记录在以下场景中很有用:
-
将记录从其他格式(例如 KPL 或 GZIP)转换为 Kinesis Data Analytics 可以分析的格式。Kinesis Data Analytics 目前支持 JSON 或 CSV 数据格式。
-
将数据扩展为聚合或异常检测等操作更易访问的格式。例如,如果多个数据值存储在同一字符串中,您可以将数据展开为多个分开的列。
-
使用其他亚马逊服务(例如外推或错误更正)丰富数据。
-
将复杂的字符串转换应用于记录字段。
-
用于整理数据的数据筛选。
使用 Lambda 函数预处理记录
创建 Kinesis Data Analytics 应用程序时,您可以在 “Connect 源” 页面中启用 Lambda 预处理。
使用 Lambda 函数预处理 Kinesis Data Analytics 应用程序中的记录
登录Amazon Web Services Management Console并打开 Kinesis Data Analytics 控制台,网址为 https://console.aws.amazon.com/kinesisanalytics
。 -
在应用程序的 “Connect 源” 页面上,在 “记录预处理方式”Amazon Lambda 部分中选择 “启用”。
-
要使用您已经创建的 Lambda 函数,请在 Lambda 函数下拉列表中选择该函数。
-
要从其中一个 Lambda 预处理模板创建新的 Lambda 函数,请从下拉列表中选择该模板。然后,选择 View <template name> in Lambda (在 Lambda 中查看 <模板名称>) 以编辑该函数。
-
要创建新的 Lambda 函数,请选择新建。有关创建 Lambda 函数的信息,请参阅Amazon Lambda开发人员指南中的创建 HelloWorld Lambda 函数和浏览控制台。
-
选择要使用的 Lambda 函数的版本。要使用最新版本,请选择 $LATEST。
当您选择或创建 Lambda 函数进行记录预处理时,将在执行应用程序 SQL 代码或应用程序根据记录生成架构之前对记录进行预处理。
Lambda 预处理权限
要使用 Lambda 预处理,应用程序的 IAM 角色需要以下权限策略:
{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "<FunctionARN>" }
Lambda 预处理指标
您可以使用 Amazon CloudWatch 监控 Lambda 调用的数量、处理的字节数、成功和失败等。有关 Kinesis Data Analytics Lambda 预处理发布的 CloudWatch 指标的信息,请参阅 Amazon Kinesis Analytics 指标。
Amazon Lambda与 Kinesis 制作人库一起使用
Kinesis Producer Lib rary (KPL) 将用户格式化的小型记录聚合为最大 1 MB 的较大记录,以更好地利用 Amazon Kinesis Data Streams 的吞吐量。适用于 Java 的 Kinesis Client Library (KCL) 支持分解这些记录。但在将 Amazon Lambda 作为流使用者时,必须使用特殊模块取消聚合记录。
要获取必要的项目代码和说明,请参阅Amazon Lambda上的 Kinesis Producer 库分解模块
数据预处理事件输入数据模型/记录响应模型
要预处理记录,您的 Lambda 函数必须符合所需的事件输入数据和记录响应模型。
事件输入数据模型
Kinesis Data Analytics 持续读取来自 Kinesis 数据流或 Kinesis Data Firehose 传输流的数据。对于检索的每批记录,该服务管理如何将每批记录传递给您的 Lambda 函数。您的函数将接收到的记录列表作为输入。在您的函数中,您对列表进行迭代,并应用业务逻辑来完成您的预处理要求 (如数据格式转换或扩充)。
预处理函数的输入模型略有不同,具体取决于数据是从 Kinesis 数据流还是 Kinesis Data Firehose 传输流接收的。
如果源是一个 Kinesis Data Firehose 传输流,则事件输入数据模型如下所示:
Kinesis Data Firehose 请求数据模型
| 字段 | 描述 | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
invocationId |
Lambda 调用 ID(随机 GUID)。 | ||||||||||||
applicationArn |
Kinesis Data Analytics 应用程序Amazon 资源名称(ARN | ||||||||||||
streamArn |
传输流 ARN | ||||||||||||
记录
|
|||||||||||||
以下示例显示来自 Firehose 传输流的输入:
{ "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test", "records":[ { "recordId":"49572672223665514422805246926656954630972486059535892482", "data":"aGVsbG8gd29ybGQ=", "kinesisFirehoseRecordMetadata":{ "approximateArrivalTimestamp":1520280173 } } ] }
如果源是 Kinesis 数据流,则事件输入数据模型如下所示:
Kinesis 流请求数据模型
| 字段 | 描述 | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
invocationId |
Lambda 调用 ID(随机 GUID)。 | ||||||||||||||||||
applicationArn |
Kinesis Data Analytics 应用程序 ARN | ||||||||||||||||||
streamArn |
传输流 ARN | ||||||||||||||||||
记录
|
|||||||||||||||||||
以下示例显示来自 Kinesis 数据流的输入:
{ "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test", "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "data": "aGVsbG8gd29ybGQ=", "kinesisStreamRecordMetadata":{ "shardId" :"shardId-000000000003", "partitionKey":"7400791606", "sequenceNumber":"49572672223665514422805246926656954630972486059535892482", "approximateArrivalTimestamp":1520280173 } } ] }
记录响应模型
必须返回发送到 Lambda 函数,并从您的 Lambda 预处理函数返回的所有记录 (带记录 ID)。它们必须包含以下参数,否则 Kinesis Data Analytics 会拒绝这些参数并将其视为数据预处理故障。可对记录的数据负载部分进行转换,以满足预处理要求。
响应数据模型
记录
|
以下示例显示来自 Lambda 函数的输出:
{ "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "result": "Ok", "data": "SEVMTE8gV09STEQ=" } ] }
常见的数据预处理失败情况
以下是预处理失败的常见原因。
-
并非批量中发送到 Lambda 函数的所有记录(带有记录 ID)都会返回到 Kinesis Data Analytics 服务。
-
响应中缺少记录 ID、状态或数据负载字段。对于
Dropped或ProcessingFailed记录,数据负载字段是可选的。 -
Lambda 函数超时不足以预处理数据。
-
Lambda 函数的响应超过了Amazon Lambda服务施加的响应限制。
对于数据预处理失败,Kinesis Data Analytics 会继续对同一组记录重试 Lambda 调用,直到成功为止。您可以监控以下 CloudWatch 指标,以深入了解故障。
-
Kinesis Data Analytics 应用程序
MillisBehindLatest:表示应用程序在读取流媒体源时落后多远。 -
Kinesis Data Analytics 应用程序
InputPreprocessingCloudWatch 指标:指明成功和失败的数量以及其他统计数据。有关更多信息,请参阅 Amazon Kinesis Analytics 指标。 -
Amazon Lambda函数 CloudWatch 指标和日志。