对于新项目,我们建议您使用新的 Kinesis Data Analytics 工作室,而不是 SQL 应用程序的 Kinesis Data Analytics。Kinesis Data Analytics Studio 将易用性与高级分析功能相结合,使您能够在几分钟内构建复杂的流处理应用程序。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 Lambda 函数作为输出
通过将 Amazon Lambda 作为目标,您可以更轻松地执行 SQL 结果后处理,然后再将其发送到最终目标。常见的后处理任务包括:
-
将多行聚合为一条记录
-
将当前结果与过去的结果相结合以解决迟到数据的问题
-
根据信息类型传输到不同的目标
-
记录格式转换 (如转换为 Protobuf)
-
字符串操作或转换
-
分析处理后的数据扩充
-
地理空间使用案例的自定义处理
-
数据加密
Lambda 函数可以向各种Amazon服务和其他目的地提供分析信息,包括:
有关创建 Lambda 应用程序的更多信息,请参阅入门Amazon Lambda。
主题
Lambda 作为输出权限
要使用 Lambda 作为输出,应用程序的 Lambda 输出 IAM 角色需要以下权限策略:
{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "FunctionARN" }
Lambda 作为输出指标
您可以使用 Amazon CloudWatch 监控发送的字节数、成功和失败等。有关 Kinesis Data Analytics 使用 Lambda 作为输出发布的 CloudWatch 指标的信息,请参阅 Amazon Kinesis Analytics 指标。
Lambda 作为输出事件输入数据模型和记录响应模型
要发送 Kinesis Data Analytics 输出记录,您的 Lambda 函数必须符合所需的事件输入数据和记录响应模型。
事件输入数据模型
Kinesis Data Analytics 使用以下请求模型将应用程序的输出记录作为输出函数持续发送到 Lambda。在您的函数中,遍历列表并应用业务逻辑来完成输出要求 (例如,在将数据发送到最终目标之前先进行数据转换)。
| 字段 | 描述 | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
invocationId |
Lambda 调用 ID(随机 GUID)。 | ||||||||||||
applicationArn |
Kinesis Data Analytics 应用程序ARN Amazon 资源名称。 | ||||||||||||
记录
|
|||||||||||||
注意
retryHint 是一个每次传输失败时都会增加的值。该值不会持久不变,并在应用程序中断时重置。
记录响应模型
作为输出函数(带有记录 ID)发送到您的 Lambda 的每条记录都必须使用Ok或进行确认DeliveryFailed,并且必须包含以下参数。否则,Kinesis Data Analytics 会将其视为交付失败。
记录
|
Lambda 输出调用频率
Kinesis Data Analytics 应用程序会缓冲输出记录并频繁调用Amazon Lambda目标函数。
-
如果在数据分析应用程序中使用滚动窗口将记录发送到目标应用程序内部流,每次触发滚动窗口时,都会调用 Amazon Lambda 目标函数。例如,如果使用 60 秒的滚动窗口将记录发送到目标应用程序内流,则每 60 秒调用一次 Lambda 函数。
-
如果记录作为连续查询或滑动窗口发送到应用程序内的目标应用程序流,则 Lambda 目标函数大约每秒调用一次。
注意
每个 Lambda 函数调用请求的有效负载大小限制适用。超过这些限制会导致输出记录被拆分并在多个 Lambda 函数调用之间发送。
添加一个 Lambda 函数以用作输出
以下过程演示如何添加 Lambda 函数作为 Kinesis Data Analytics 应用程序的输出。
登录Amazon Web Services Management Console并打开 Kinesis Data Analytics 控制台,网址为 https://console.aws.amazon.com/kinesisanalytics
。 -
选择列表中的应用程序,然后选择 Application details。
-
在 Destination 部分,选择 Connect new destination。
-
对于 Destination 项,选择 Amazon Lambda function。
-
在将记录传送到Amazon Lambda部分中,选择现有的 Lambda 函数和版本,或者选择新建。
-
如果您要创建新的 Lambda 函数,请记下:
-
选择提供的模板之一。有关更多信息,请参阅 为应用程序目标创建 Lambda 函数。
-
将在新的浏览器选项卡中打开 Create Function (创建函数) 页。在 Name (名称) 框中,为函数指定一个有意义的名称(例如,
myLambdaFunction)。 -
针对您的应用程序用后处理功能更新模板。有关创建 Lambda 函数的信息,请参阅Amazon Lambda开发人员指南中的入门。
-
在 Kinesis Data Analytics 控制台的 Lambda 函数列表中,选择您刚刚创建的 Lambda 函数。选择 $LATES T 获取 Lambda 函数版本。
-
-
在 In-application stream 部分,选择 Choose an existing in-application stream。对于 In-application stream name,选择应用程序的输出流。所选输出流的结果将发送到 Lambda 输出函数。
-
保持表单其余部分为默认值,然后选择 Save and continue。
现在,您的应用程序将记录从应用程序内流发送到您的 Lambda 函数。您可以在亚马逊 CloudWatch 控制台中查看默认模板的结果。监控AWS/KinesisAnalytics/LambdaDelivery.OkRecords指标以查看传输到 Lambda 函数的记录数量。
常见的 Lambda 作为输出故障
以下是向 Lambda 函数交付失败的常见原因。
-
并非批处理中发送到 Lambda 函数的所有记录(带有记录 ID)都会返回到 Kinesis Data Analytics 服务。
-
响应中缺少记录 ID 或状态字段。
-
Lambda 函数超时不足以完成 Lambda 函数中的业务逻辑。
-
Lambda 函数中的业务逻辑不会捕获所有错误,导致因未处理的异常而产生超时和反向压力。这些消息通常称为“毒丸”消息。
对于数据传输失败,Kinesis Data Analytics 会继续对同一组记录重试 Lambda 调用,直到成功为止。要深入了解故障,您可以监控以下 CloudWatch 指标:
-
Kinesis Data Analytics 应用程序 Lambda as 输出 CloudWatch 指标:指明成功和失败的数量以及其他统计数据。有关更多信息,请参阅 Amazon Kinesis Analytics 指标。
-
Amazon Lambda函数 CloudWatch 指标和日志。