为应用程序目标创建 Lambda 函数 - Amazon Kinesis Data Analytics 开发者指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

对于新项目,我们建议您使用新的 Kinesis Data Analytics 工作室,而不是 SQL 应用程序的 Kinesis Data Analytics。Kinesis Data Analytics Studio 将易用性与高级分析功能相结合,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

为应用程序目标创建 Lambda 函数

您的 Kinesis Data Analytics 应用程序可以使用Amazon Lambda函数作为输出。Kinesis Data Analytics 提供了用于创建 Lambda 函数的模板,这些函数用作应用程序的目的地。可以将这些模板作为应用程序输出后处理的起点。

在 Node.js 中创建 Lambda 函数目标

控制台上提供了以下用于在 Node.js 中创建目标 Lambda 函数的模板:

Lambda 作为输出蓝图 语言和版本 描述
kinesis-analytics-output Node.js 12.x 将输出记录从 Kinesis Data Analytics 应用程序传输到自定义目的地。

在 Python 中创建 Lambda 函数目标

控制台上提供了以下用于在 Python 中创建目标 Lambda 函数的模板:

Lambda 作为输出蓝图 语言和版本 描述
kinesis-analytics-output-sns Python 2.7 将输出记录从 Kinesis Data Analytics 应用程序传递到Amazon SNS。
kinesis-analytics-output-ddb Python 2.7 将输出记录从 Kinesis Data Analytics 应用程序传递到Amazon DynamoDB。

在 Java 中创建 Lambda 函数目标

要在 Java 中创建目标 Lambda 函数,请使用 Java 事件类。

以下代码演示了使用 Java 的目标 Lambda 函数示例:

public class LambdaFunctionHandler implements RequestHandler<KinesisAnalyticsOutputDeliveryEvent, KinesisAnalyticsOutputDeliveryResponse> { @Override public KinesisAnalyticsOutputDeliveryResponse handleRequest(KinesisAnalyticsOutputDeliveryEvent event, Context context) { context.getLogger().log("InvocatonId is : " + event.invocationId); context.getLogger().log("ApplicationArn is : " + event.applicationArn); List<KinesisAnalyticsOutputDeliveryResponse.Record> records = new ArrayList<KinesisAnalyticsOutputDeliveryResponse.Record>(); KinesisAnalyticsOutputDeliveryResponse response = new KinesisAnalyticsOutputDeliveryResponse(records); event.records.stream().forEach(record -> { context.getLogger().log("recordId is : " + record.recordId); context.getLogger().log("record retryHint is :" + record.lambdaDeliveryRecordMetadata.retryHint); // Add logic here to transform and send the record to final destination of your choice. response.records.add(new Record(record.recordId, KinesisAnalyticsOutputDeliveryResponse.Result.Ok)); }); return response; } }

在.NET 中创建 Lambda 函数目标

要在.NET 中创建目标 Lambda 函数,请使用 .NET 事件类。

以下代码演示了使用 C# 的目标 Lambda 函数示例:

public class Function { public KinesisAnalyticsOutputDeliveryResponse FunctionHandler(KinesisAnalyticsOutputDeliveryEvent evnt, ILambdaContext context) { context.Logger.LogLine($"InvocationId: {evnt.InvocationId}"); context.Logger.LogLine($"ApplicationArn: {evnt.ApplicationArn}"); var response = new KinesisAnalyticsOutputDeliveryResponse { Records = new List<KinesisAnalyticsOutputDeliveryResponse.Record>() }; foreach (var record in evnt.Records) { context.Logger.LogLine($"\tRecordId: {record.RecordId}"); context.Logger.LogLine($"\tRetryHint: {record.RecordMetadata.RetryHint}"); context.Logger.LogLine($"\tData: {record.DecodeData()}"); // Add logic here to send to the record to final destination of your choice. var deliveredRecord = new KinesisAnalyticsOutputDeliveryResponse.Record { RecordId = record.RecordId, Result = KinesisAnalyticsOutputDeliveryResponse.OK }; response.Records.Add(deliveredRecord); } return response; } }

有关在.NET 中创建用于预处理的 Lambda 函数和目标的更多信息,请参阅Amazon.Lambda.KinesisAnalyticsEvents