示例:写入 Amazon S3 存储桶 - Amazon Kinesis Data Analytics
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

示例:写入 Amazon S3 存储桶

在本练习中,您将创建适用于 Apache FlAmazon Kinesis Data Analytics,该数据流以 Kinesis 数据流作为源,将 Amazon S3 存储桶作为接收器。使用 simple Storage S3 控制台中验证应用程序的输出。

注意

要为本练习设置所需的先决条件,请先完成入门指南 (DataStream API)练习。

创建相关资源

在为本练习创建适用于 Apache Flink 的 Amazon Kinesis 数据分析之前,您需要创建以下依赖资源:

  • Kinesis 数据流 (ExampleInputStream)。

  • 用于存储应用程序代码和输出的 Amazon S3 存储桶和输出(ka-app-code-<username>

注意

在 Kinesis Data Analytics 上启用服务器端加密后,适用于 Apache Flink 的 Kinesis Data Analytics 无法将数据写入Amazon S3。

您可以使用控制台创建 Kinesis 直播桶和Amazon S3 存储桶和Amazon S3 存储桶 有关创建这些资源的说明,请参阅以下主题:

  • Amazon Kinesis Data Streams 开发人员指南中@@ 创建和更新数据流。将数据流命名为 ExampleInputStream

  • 如何创建 S3 存储桶?Amazon Simple Storage Serv ice 通过附加您的登录名,为 Amazon S3 存储桶指定一个全球唯一的名称,例如ka-app-code-<username>。在 Amazon S3 存储桶中创建两个文件夹(codedata)。

如果以下 CloudWatch 资源尚不存在,则应用程序将创建这些资源:

  • 名为 /aws/kinesis-analytics-java/MyApplication 的日志组。

  • 名为 kinesis-analytics-log-stream 的日志流。

将示例记录写入输入流

在本节中,您使用 Python 脚本将示例记录写入流,以供应用程序处理。

注意

此部分需要 Amazon SDK for Python (Boto)

  1. 使用以下内容创建名为 stock.py 的文件:

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
  2. 运行 stock.py 脚本:

    $ python stock.py

    在完成本教程的其余部分时,请将脚本保持运行状态。

下载并检查应用程序代码

此示例的 Java 应用程序代码可从中获得 GitHub。要下载应用程序代码,请执行以下操作:

  1. 如果尚未安装 Git 客户端,请安装它。有关更多信息,请参阅安装 Git

  2. 使用以下命令克隆远程存储库:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples
  3. 导航到 amazon-kinesis-data-analytics-java-examples/S3Sink目录。

应用程序代码位于 S3StreamingSinkJob.java 文件中。请注意有关应用程序代码的以下信息:

  • 该应用程序使用 Kinesis 源从源码流中读取数据。以下片段创建了 Kinesis 源代码:

    return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
  • 您需要添加以下导入语句:

    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
  • 该应用程序使用 Amazon S3 S3 接收桶来写入 Amazon S3 S3。

    接收器在滚动窗口中读取消息,将消息编码为 S3 存储桶对象,然后将编码的对象发送到 S3 接收器。以下代码对要发送到 Amazon S3 的对象进行编码:

    input.map(value -> { // Parse the JSON JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class); return new Tuple2<>(jsonNode.get("ticker").toString(), 1); }).returns(Types.TUPLE(Types.STRING, Types.INT)) .keyBy(v -> v.f0) // Logically partition the stream for each word .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) .sum(1) // Count the appearances by ticker per partition .map(value -> value.f0 + " count: " + value.f1.toString() + "\n") .addSink(createS3SinkFromStaticConfig());
注意

该应用程序使用 FlinkStreamingFileSink 对象写入Amazon S3。有关 Apache 的更多信息StreamingFileSink,请参阅 StreamingFileSinkApache Flink 文档

修改应用程序代码

在本节中,您将修改应用程序代码以将输出写入您的 Amazon S3 存储桶。

使用您的用户名更新以下行,以指定应用程序的输出位置:

private static final String s3SinkPath = "s3a://ka-app-code-<username>/data";

编译应用程序代码

要编译应用程序,请执行以下操作:

  1. 如果还没有 Java 和 Maven,请安装它们。有关更多信息,请参阅入门指南 (DataStream API)教程中的先决条件

  2. 使用以下命令编译应用程序:

    mvn package -Dflink.version=1.15.3

编译应用程序将创建应用程序 JAR 文件 (target/aws-kinesis-analytics-java-apps-1.0.jar)。

注意

提供的源代码依赖于 Java 11 中的库。

上传 Apache Flink 流式处理 Java 代码

在本部分中,您将应用程序代码上传到您在该创建相关资源 将示例记录写入输入流部分创建的 Amazon S3 存储桶。

  1. 在 Amazon S3 控制台中,选择 ka-app-code- <username>存储桶,导航到代码文件夹,然后选择上传

  2. 选择文件步骤中,选择添加文件。导航到您在上一步中创建的 aws-kinesis-analytics-java-apps-1.0.jar 文件。

  3. 您无需更改该对象的任何设置,因此,请选择 Upload (上传)

您的应用程序代码现在存储在 Amazon S3 桶中,您的应用程序可以在其中访问。

创建并运行 Kinesis Data Analytics 应用程序

按照以下步骤,使用控制台创建、配置、更新和运行应用程序。

创建 应用程序

  1. 打开 Kinesis Data Analytics 控制台,网址为 https://console.aws.amazon.com/kinesisanalytics

  2. 在 Kinesis Data Analytics 仪表板上,选择创建分析应用程序

  3. Kinesis Analytics – 创建应用程序页面上,提供应用程序详细信息,如下所示:

    • 对于 Application name (应用程序名称),输入 MyApplication

    • 对于 Runtime (运行时),请选择 Apache Flink

    • 将版本下拉列表保留为 Apache Flink 版本 1.15.2(推荐版本)

  4. 对于访问权限,请选择 Create / update IAM role (创建/更新 IAM 角色) kinesis-analytics-MyApplication-us-west-2

  5. 选择创建应用程序

    注意

    当您使用控制台创建 Kinesis Data Analytics 应用程序时,您可以选择为应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源使用您的应用程序名称和区域命名,如下所示:

    • 对于 Application name (应用程序名称),输入 MyApplication

    • 对于 Runtime (运行时),请选择 Apache Flink

    • 将该版本保留为 Apache Flink 版本 1.15.2(推荐版本)

  6. 对于访问权限,请选择 Create / update IAM role (创建/更新 IAM 角色) kinesis-analytics-MyApplication-us-west-2

  7. 选择创建应用程序

注意

当您使用控制台创建适用于 Apache Flink 的 Amazon Kinesis 数据分析时,您可以选择为应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源使用您的应用程序名称和区域命名,如下所示:

  • 策略:kinesis-analytics-service-MyApplication-us-west-2

  • 角色:kinesis-analytics-MyApplication-us-west-2

编辑 IAM 策略

编辑 IAM 策略以添加权限以访问 Kinesis 数据流。

  1. 通过以下网址打开 IAM 控制台:https://console.aws.amazon.com/iam/

  2. 选择策略。选择控制台在上一部分中为您创建的 kinesis-analytics-service-MyApplication-us-west-2 策略。

  3. Summary (摘要) 页面上,选择 Edit policy (编辑策略)。请选择 JSON 选项卡。

  4. 将以下策略示例中突出显示的部分添加到策略中。将示例账户 ID (012345678901) 替换为您的账户 ID。<username>替换为您的用户名。

    { "Sid": "S3", "Effect": "Allow", "Action": [ "s3:Abort*", "s3:DeleteObject*", "s3:GetObject*", "s3:GetBucket*", "s3:List*", "s3:ListBucket", "s3:PutObject" ], "Resource": [ "arn:aws:s3:::ka-app-code-<username>", "arn:aws:s3:::ka-app-code-<username>/*" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:region:account-id:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:region:account-id:log-group:%LOG_GROUP_PLACEHOLDER%:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:region:account-id:log-group:%LOG_GROUP_PLACEHOLDER%:log-stream:%LOG_STREAM_PLACEHOLDER%" ] } , { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, ] }

配置应用程序

  1. MyApplication页面上,选择配置

  2. Configure application (配置应用程序) 页面上,提供 Code location (代码位置)

    • 对于 Amazon S3 存储桶,输入ka-app-code-<username>

    • 在 Amazon S3 对象的路径中,输入code/aws-kinesis-analytics-java-apps-1.0.jar

  3. Access to application resources (对应用程序的访问权限) 下,对于 Access permissions (访问权限),选择 Create / update IAM role (创建/更新 IAM 角色) kinesis-analytics-MyApplication-us-west-2

  4. Monitoring (监控) 下,确保 Monitoring metrics level (监控指标级别) 设置为 Application (应用程序)

  5. 要进行CloudWatch 记录,请选中 “启用” 复选框。

  6. 选择更新

注意

当您选择启用 CloudWatch 日志记录时,Kinesis Data Analytics 会为您创建日志组和日志流。这些资源的名称如下所示:

  • 日志组:/aws/kinesis-analytics/MyApplication

  • 日志流:kinesis-analytics-log-stream

该日志流用于监控应用程序。这与应用程序用于发送结果的日志流不同。

运行应用程序

  1. MyApplication页面上,选择 “运行”。保持 “不使用快照运行” 选项处于选中状态,然后确认操作。

  2. 当应用程序正在运行时,请刷新页面。控制台将显示 Application graph (应用程序图表)

验证应用程序输出

在 Amazon S3 控制台中,打开 S3 存储桶中的数据文件夹。

几分钟后,将显示包含来自应用程序的聚合数据的对象。

注意

默认情况下,聚合在 Flink 中已启用。要禁用它,请使用下面的命令:

sink.producer.aggregation-enabled' = 'false'

可选:自定义源和接收器

在本节中,您将自定义源对象和接收器对象的设置。

注意

更改以下各节中描述的代码段后,执行以下操作以重新加载应用程序代码:

  • 重复该编译应用程序代码部分中的步骤以编译更新的应用程序代码。

  • 重复上传 Apache Flink 流式处理 Java 代码部分中的步骤,上传更新的应用程序代码。

  • 在控制台的应用程序页面上,选择配置,然后选择更新,将更新的应用程序代码重新加载到您的应用程序中。

配置Data 分区

在本节中,您将配置流式文件接收器在 S3 存储桶中创建的文件夹的名称。您可以通过向流文件接收器添加存储桶分配器来完成此操作。

要自定义在 S3 存储桶中创建的文件夹名称,请执行以下操作:

  1. 将以下导入语句添加到S3StreamingSinkJob.java文件开头:

    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
  2. 将代码中的createS3SinkFromStaticConfig()方法更新为类似于以下内容:

    private static StreamingFileSink<String> createS3SinkFromStaticConfig() { final StreamingFileSink<String> sink = StreamingFileSink .forRowFormat(new Path(s3SinkPath), new SimpleStringEncoder<String>("UTF-8")) .withBucketAssigner(new DateTimeBucketAssigner("yyyy-MM-dd--HH")) .withRollingPolicy(DefaultRollingPolicy.create().build()) .build(); return sink; }

前面的代码示例使用DateTimeBucketAssigner带有自定义日期格式的,在 S3 存储桶中创建文件夹。DateTimeBucketAssigner使用当前系统时间来创建存储段名称。如果您想创建自定义存储桶分配器以进一步自定义创建的文件夹名称,则可以创建一个实现的类BucketAssigner。您可以使用getBucketId方法实现您的自定义逻辑。

的自定义实现BucketAssigner可以使用 Co ntext 参数获取有关记录的更多信息,以确定其目标文件夹。

配置读取频率

在本节中,您将配置源流的读取频率。

默认情况下,Kinesis Streams 消费者每秒从源数据流读取五次。如果有多个客户端从流中读取数据,或者应用程序需要重试读取记录,则此频率将导致问题。您可以通过设置消费者的读取频率来避免这些问题。

要设置 Kinesis 使用者的读取频率,请设置该SHARD_GETRECORDS_INTERVAL_MILLIS设置。

以下代码示例将设置设置SHARD_GETRECORDS_INTERVAL_MILLIS设置为一秒:

kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "1000");

配置写入缓冲

在本节中,您将配置 sink 的写入频率和其他设置。

默认情况下,应用程序每分钟写入一次目标存储桶。您可以通过配置DefaultRollingPolicy对象来更改此间隔和其他设置。

注意

每次应用程序创建检查点时,Apache Flink 流文件接收器都会写入其输出存储桶。默认情况下,应用程序每分钟创建一个检查点。要增加 S3 接收器的写入间隔,还必须增加检查点间隔。

若要配置DefaultRollingPolicy对象,请执行以下操作:

  1. 增加应用程序的CheckpointInterval设置。 UpdateApplication操作的以下输入将检查点间隔设置为 10 分钟:

    { "ApplicationConfigurationUpdate": { "FlinkApplicationConfigurationUpdate": { "CheckpointConfigurationUpdate": { "ConfigurationTypeUpdate" : "CUSTOM", "CheckpointIntervalUpdate": 600000 } } }, "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 5 }

    要使用上述代码,请指定当前应用程序版本。您可以使用ListApplications操作检索应用程序版本。

  2. 将以下导入语句添加到S3StreamingSinkJob.java文件开头:

    import java.util.concurrent.TimeUnit;
  3. 更新S3StreamingSinkJob.java文件中的createS3SinkFromStaticConfig方法,使其如下所示:

    private static StreamingFileSink<String> createS3SinkFromStaticConfig() { final StreamingFileSink<String> sink = StreamingFileSink .forRowFormat(new Path(s3SinkPath), new SimpleStringEncoder<String>("UTF-8")) .withBucketAssigner(new DateTimeBucketAssigner("yyyy-MM-dd--HH")) .withRollingPolicy( DefaultRollingPolicy.create() .withRolloverInterval(TimeUnit.MINUTES.toMillis(8)) .withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) .withMaxPartSize(1024 * 1024 * 1024) .build()) .build(); return sink; }

    前面的代码示例将写入 Amazon S3 存储桶的频率设置为 8 分钟。

有关配置 Apache Flink 流式文件接收器的更多信息,请参阅 Apache Flink 文档中的行编码格式

清理 Amazon 资源

本节包括清理您在 Amazon S3 教程中创建的Amazon资源的过程。

删除 Kinesis Data Analytics

  1. 打开 Kinesis Data Analytics 控制台,网址为 https://console.aws.amazon.com/kinesisanalytics

  2. 在 Kinesis Data Analytics 面板中,选择MyApplication

  3. 在应用程序页面上,选择 “删除”,然后确认删除。

删除 Kinesis Data Streams

  1. 打开 Kinesis 控制台,网址为:https://console.aws.amazon.com/kinesis

  2. 在 Kinesis Data Streams 面板中,选择ExampleInputStream

  3. ExampleInputStream页面上,选择 “删除 Kinesis Stream”,然后确认删除。

删除您的Amazon S3 对象和存储桶

  1. 通过以下网址打开 Simple Storage Service(Amazon S3)控制台:https://console.aws.amazon.com/s3/

  2. 选择 ka-app-code- 存储桶。 <username>

  3. 选择 Delete (删除),然后输入存储桶名称以确认删除。

删除您的 IAM 资源

  1. 通过以下网址打开 IAM 控制台:https://console.aws.amazon.com/iam/

  2. 在导航栏中,选择策略

  3. 在筛选条件控件中,输入 kinesis

  4. 选择 kinesis-analytics-service-MyApplication- <your-region>策略。

  5. 选择 Policy Actions (策略操作),然后选择 Delete (删除)

  6. 在导航栏中,选择 Roles(角色)。

  7. 选择 k inesis-analyticsMyApplication- <your-region>角色。

  8. 选择 Delete role (删除角色),然后确认删除。

删除您的 CloudWatch 资源

  1. 通过 https://console.aws.amazon.com/cloudwatch/ 打开 CloudWatch 主机。

  2. 在导航栏中,选择 Logs(日志)。

  3. 选择 /aws/kinesis-analytics/MyApplication 日志组。

  4. 选择 Delete Log Group (删除日志组),然后确认删除。