对于新项目,我们建议您使用新的 Kinesis Data Analytics 工作室,而不是 SQL 应用程序的 Kinesis Data Analytics。Kinesis Data Analytics Studio 将易用性与高级分析功能相结合,使您能够在几分钟内构建复杂的流处理应用程序。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
为应用程序目标创建 Lambda 函数
您的 Kinesis Data Analytics 应用程序可以使用Amazon Lambda函数作为输出。Kinesis Data Analytics 提供了用于创建 Lambda 函数的模板,这些函数用作应用程序的目的地。可以将这些模板作为应用程序输出后处理的起点。
在 Node.js 中创建 Lambda 函数目标
控制台上提供了以下用于在 Node.js 中创建目标 Lambda 函数的模板:
| Lambda 作为输出蓝图 | 语言和版本 | 描述 |
|---|---|---|
kinesis-analytics-output |
Node.js 12.x | 将输出记录从 Kinesis Data Analytics 应用程序传输到自定义目的地。 |
在 Python 中创建 Lambda 函数目标
控制台上提供了以下用于在 Python 中创建目标 Lambda 函数的模板:
| Lambda 作为输出蓝图 | 语言和版本 | 描述 |
|---|---|---|
kinesis-analytics-output-sns |
Python 2.7 | 将输出记录从 Kinesis Data Analytics 应用程序传递到Amazon SNS。 |
kinesis-analytics-output-ddb |
Python 2.7 | 将输出记录从 Kinesis Data Analytics 应用程序传递到Amazon DynamoDB。 |
在 Java 中创建 Lambda 函数目标
要在 Java 中创建目标 Lambda 函数,请使用 Java 事件
以下代码演示了使用 Java 的目标 Lambda 函数示例:
public class LambdaFunctionHandler implements RequestHandler<KinesisAnalyticsOutputDeliveryEvent, KinesisAnalyticsOutputDeliveryResponse> { @Override public KinesisAnalyticsOutputDeliveryResponse handleRequest(KinesisAnalyticsOutputDeliveryEvent event, Context context) { context.getLogger().log("InvocatonId is : " + event.invocationId); context.getLogger().log("ApplicationArn is : " + event.applicationArn); List<KinesisAnalyticsOutputDeliveryResponse.Record> records = new ArrayList<KinesisAnalyticsOutputDeliveryResponse.Record>(); KinesisAnalyticsOutputDeliveryResponse response = new KinesisAnalyticsOutputDeliveryResponse(records); event.records.stream().forEach(record -> { context.getLogger().log("recordId is : " + record.recordId); context.getLogger().log("record retryHint is :" + record.lambdaDeliveryRecordMetadata.retryHint); // Add logic here to transform and send the record to final destination of your choice. response.records.add(new Record(record.recordId, KinesisAnalyticsOutputDeliveryResponse.Result.Ok)); }); return response; } }
在.NET 中创建 Lambda 函数目标
要在.NET 中创建目标 Lambda 函数,请使用 .NET 事件
以下代码演示了使用 C# 的目标 Lambda 函数示例:
public class Function { public KinesisAnalyticsOutputDeliveryResponse FunctionHandler(KinesisAnalyticsOutputDeliveryEvent evnt, ILambdaContext context) { context.Logger.LogLine($"InvocationId: {evnt.InvocationId}"); context.Logger.LogLine($"ApplicationArn: {evnt.ApplicationArn}"); var response = new KinesisAnalyticsOutputDeliveryResponse { Records = new List<KinesisAnalyticsOutputDeliveryResponse.Record>() }; foreach (var record in evnt.Records) { context.Logger.LogLine($"\tRecordId: {record.RecordId}"); context.Logger.LogLine($"\tRetryHint: {record.RecordMetadata.RetryHint}"); context.Logger.LogLine($"\tData: {record.DecodeData()}"); // Add logic here to send to the record to final destination of your choice. var deliveredRecord = new KinesisAnalyticsOutputDeliveryResponse.Record { RecordId = record.RecordId, Result = KinesisAnalyticsOutputDeliveryResponse.OK }; response.Records.Add(deliveredRecord); } return response; } }
有关在.NET 中创建用于预处理的 Lambda 函数和目标的更多信息,请参阅Amazon.Lambda.KinesisAnalyticsEvents