示例:在 Scala 中将流媒体数据发送到 Amazon S3 - Amazon Kinesis Data Analytics
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

示例:在 Scala 中将流媒体数据发送到 Amazon S3

注意

从 1.15 版本开始,Flink 是免费的 Scala。应用程序现在可以使用任何 Scala 版本的 Java API。Flink 仍然在一些关键组件中使用 Scala,但不会将 Scala 暴露给用户代码类加载器。因此,用户需要将 Scala 依赖项添加到他们的 jar 存档中。

有关 Flink 1.15 中 Scala 变更的更多信息,请参阅 One Feifteen 中的 Scala Free

在本练习中,您将创建一个使用 Scala 3.2.0 和 Flink 的 Java DataStream API 的简单流媒体应用程序。该应用程序从 Kinesis 流读取数据,使用滑动窗口聚合数据,并将结果写入 S3。

注意

要设置本练习所需的先决条件,请先完成入门 (Scala) 练习。您只需要在 Amazon S3 存储桶data/中创建一个额外的文件夹 ka-app-code-<username>.

下载并检查应用程序代码

此示例的 Python 应用程序代码可从以下网址获得 GitHub。要下载应用程序代码,请执行以下操作:

  1. 如果尚未安装 Git 客户端,请安装它。有关更多信息,请参阅安装 Git

  2. 使用以下命令克隆远程存储库:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples
  3. 导航到 amazon-kinesis-data-analytics-java-examples/scala/S3Sink目录。

请注意有关应用程序代码的以下信息:

  • build.sbt文件包含有关应用程序配置和依赖关系的信息,包括 Kinesis Data Analytics 库。

  • BasicStreamingJob.scala文件包含定义应用程序功能的主要方法。

  • 该应用程序使用 Kinesis 源从源码流中读取数据。以下片段创建了 Kinesis 源代码:

    private def createSource: FlinkKinesisConsumer[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val inputProperties = applicationProperties.get("ConsumerConfigProperties") new FlinkKinesisConsumer[String](inputProperties.getProperty(streamNameKey, defaultInputStreamName), new SimpleStringSchema, inputProperties) }

    该应用程序还使用 a 写 StreamingFileSink 入 Amazon S3 存储桶的内容:

    def createSink: StreamingFileSink[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val s3SinkPath = applicationProperties.get("ProducerConfigProperties").getProperty("s3.sink.path") StreamingFileSink .forRowFormat(new Path(s3SinkPath), new SimpleStringEncoder[String]("UTF-8")) .build() }
  • 应用程序创建源和接收连接器以使用 StreamExecutionEnvironment 对象访问外部资源。

  • 应用程序使用动态应用程序属性创建源连接器和接收器连接器。读取运行时应用程序的属性以配置连接器。有关运行时属性的更多信息,请参阅运行时属性

编译并上传应用程序代码

在本节中,您将编译应用程序代码并上传到 Amazon S3 存储桶。

编译应用程序代码

使用 SBT 编译工具为应用程序构建 Scala 代码。要安装 SBT,请参阅使用 cs 设置安装 sbt。您还需要安装 Java 开发工具包 (JDK)。请参阅完成练习的先决条件

  1. 要使用您的应用程序代码,您将其编译和打包成 JAR 文件。你可以用 SBT 编译和打包你的代码:

    sbt assembly
  2. 如果应用程序成功编译,则创建以下文件:

    target/scala-3.2.0/s3-sink-scala-1.0.jar
上传 Apache Flink 直播 Scala 代码

在本节中,您将创建一个 Simple Storage S3 存储桶并上传您的应用程序代码。

  1. 通过以下网址打开 Simple Storage Service(Amazon S3)控制台:https://console.aws.amazon.com/s3/

  2. 选择创建存储桶

  3. ka-app-code-<username>存储段名称字段中输入。将后缀(如您的用户名)添加到存储桶名称,以使其具有全局唯一性。选择下一步

  4. 配置选项中,保持设置不变,然后选择下一步

  5. 设置权限中,保持设置不变,然后选择下一步

  6. 选择创建桶

  7. 选择ka-app-code-<username>存储桶然后选择 Upload(上载)。

  8. 选择文件步骤中,选择添加文件。导航到您在上一步中创建的 s3-sink-scala-1.0.jar 文件。

  9. 您无需更改该对象的任何设置,因此,请选择 Upload (上传)

您的应用程序代码现在存储在 Amazon S3 桶中,您的应用程序可以在其中访问。

创建并运行应用程序(控制台)

按照以下步骤,使用控制台创建、配置、更新和运行应用程序。

创建 应用程序

  1. 打开 Kinesis Data Analytics 控制台,网址为 https://console.aws.amazon.com/kinesisanalytics

  2. 在 Kinesis Data Analytics 仪表板上,选择创建分析应用程序

  3. Kinesis Analytics – 创建应用程序页面上,提供应用程序详细信息,如下所示:

    • 对于 Application name (应用程序名称),输入 MyApplication

    • 对于描述,输入 My java test app

    • 对于 Runtime (运行时),请选择 Apache Flink

    • 将该版本保留为 Apache Flink 版本 1.15.2(推荐版本)

  4. 对于访问权限,请选择 Create / update IAM role (创建/更新 IAM 角色) kinesis-analytics-MyApplication-us-west-2

  5. 选择创建应用程序

注意

当您使用控制台创建 Kinesis Data Analytics 应用程序时,您可以选择为应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源使用您的应用程序名称和区域命名,如下所示:

  • 策略:kinesis-analytics-service-MyApplication-us-west-2

  • 角色:kinesis-analytics-MyApplication-us-west-2

配置应用程序

可以按照以下步骤配置应用程序。

配置应用程序
  1. MyApplication页面上,选择配置

  2. Configure application (配置应用程序) 页面上,提供 Code location (代码位置)

    • 对于 Amazon S3 存储桶,输入ka-app-code-<username>

    • 在 Amazon S3 对象的路径中,输入s3-sink-scala-1.0.jar

  3. Access to application resources (对应用程序的访问权限) 下,对于 Access permissions (访问权限),选择 Create / update IAM role (创建/更新 IAM 角色) kinesis-analytics-MyApplication-us-west-2

  4. 在 “属性” 下,选择 “添加群组”。

  5. 输入以下信息:

    组 ID
    ConsumerConfigProperties input.stream.name ExampleInputStream
    ConsumerConfigProperties aws.region us-west-2
    ConsumerConfigProperties flink.stream.initpos LATEST

    选择保存

  6. 在 “属性” 下,选择 “添加群组”。

  7. 输入以下信息:

    组 ID
    ProducerConfigProperties s3.sink.path s3a://ka-app-code-<user-name>/data
  8. Monitoring (监控) 下,确保 Monitoring metrics level (监控指标级别) 设置为 Application (应用程序)

  9. 要进行CloudWatch 记录,请选中 “启用” 复选框。

  10. 选择更新

注意

当您选择启用亚马逊 CloudWatch 日志记录时,Kinesis Data Analytics 会为您创建日志组和日志流。这些资源的名称如下所示:

  • 日志组:/aws/kinesis-analytics/MyApplication

  • 日志流:kinesis-analytics-log-stream

编辑 IAM 策略

编辑 IAM 策略以添加权限以访问 Amazon S3 存储桶的权限。

编辑 IAM 策略以添加 S3 存储桶权限
  1. 通过以下网址打开 IAM 控制台:https://console.aws.amazon.com/iam/

  2. 选择策略。选择控制台在上一部分中为您创建的 kinesis-analytics-service-MyApplication-us-west-2 策略。

  3. Summary (摘要) 页面上,选择 Edit policy (编辑策略)。请选择 JSON 选项卡。

  4. 将以下策略示例中突出显示的部分添加到策略中。将示例账户 ID (012345678901) 替换为您的账户 ID。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:Abort*", "s3:DeleteObject*", "s3:GetObject*", "s3:GetBucket*", "s3:List*", "s3:ListBucket", "s3:PutObject" ], "Resource": [ "arn:aws:s3:::ka-app-code-<username>", "arn:aws:s3:::ka-app-code-<username>/*" ] }, { "Sid": "DescribeLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" } ] }

运行应用程序

可以通过运行应用程序、打开 Apache Flink 仪表板并选择所需的 Flink 作业来查看 Flink 任务图。

停止应用程序

要停止应用程序,请在MyApplication页面上选择停止。确认该操作。

创建并运行应用程序 (CLI)

在本节中,您将使用创建和运行 Kinesis Data Analytics 应用程序。Amazon Command Line Interface使用 k inesisanalyticsv2Amazon CLI 命令创建 Kinesis Data Analytics 应用程序并与之交互。

创建权限策略

注意

您必须为应用程序创建一个权限策略和角色。如果未创建这些 IAM 资源,应用程序将无法访问其数据和日志流。

首先,您创建一个包含两个语句的权限策略:一个语句授予对源流进行读取操作的权限,另一个语句授予对接收流进行写入操作的权限。然后,您将策略附加到 IAM 角色(您在下一节中创建)。因此,当 Kinesis Data Analytics 担任该角色时,该服务具有从源流读取和写入接收流的必要权限。

使用以下代码创建 KAReadSourceStreamWriteSinkStream 权限策略。username替换为用于创建 Amazon S3 存储桶以存储应用程序代码的用户名。将亚马逊资源名称 (ARN) 中的账户 ID 替换(012345678901)为您的账户 ID。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-username/getting-started-scala-1.0.jar" ] }, { "Sid": "DescribeLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

有关创建权限策略的 step-by-step 说明,请参阅 IAM 用户指南中的教程:创建并附加您的第一个客户托管策略

创建 IAM 角色

在本节中,您将创建一个 IAM 角色,Kinesis Data Analytics 应用程序可以假定该角色来读取源流并写入接收流。

未经许可,Kinesis Data Analytics 无法访问您的直播。您可以通过 IAM 角色授予这些权限。每个 IAM 角色都附加了两个策略。信任策略授予 Kinesis Data Analytics 代入角色,权限策略决定了 Kinesis Data Analytics a

您将在上一部分中创建的权限策略附加到此角色。

创建 IAM 角色
  1. 通过以下网址打开 IAM 控制台:https://console.aws.amazon.com/iam/

  2. 在导航窗格中,选择角色,然后选择创建角色

  3. “选择可信身份类型” 下,选择 “Amazon服务

  4. Choose the service that will use this role (选择将使用此角色的服务) 下,选择 Kinesis

  5. 选择您的使用案例下,选择 Kinesis Analytics

  6. 选择Next: Permissions(下一步: 权限)

  7. Attach permissions policies 页面上,选择 Next: Review。在创建角色后,您可以附加权限策略。

  8. 创建角色页面上KA-stream-rw-role,输入角色名称。选择 Create role(创建角色)。

    现在,您已经创建了一个名为的新 IAM 角色KA-stream-rw-role。接下来,更新角色的信任和权限策略

  9. 将权限策略附加到角色。

    注意

    在本练习中,Kinesis Data Analytics 扮演这个角色,既可以从 Kinesis 数据流(源)读取数据,也可以将输出写入另一个 Kinesis 数据流。因此,您可以附加您在上一步中创建的策略,即创建权限策略

    1. Summary (摘要) 页上,选择 Permissions (权限) 选项卡。

    2. 选择附加策略

    3. 在搜索框中,输入 KAReadSourceStreamWriteSinkStream(您在上一部分中创建的策略)。

    4. 选择KAReadSourceStreamWriteSinkStream策略,然后选择附加策略

现在,您已经创建了应用程序用来访问资源的服务执行角色。记下新角色的 ARN。

有关创建角色的 step-by-step 说明,请参阅 IAM 用户指南中的创建 IAM 角色(控制台)

创建应用程序

将以下 JSON 代码保存到名为 create_request.json 的文件中。将示例角色 ARN 替换为您之前创建的角色的 ARN。将存储桶 ARN 后缀(用户名)替换为您在上一节中选择的后缀。将服务执行角色中的示例账户 ID (012345678901) 替换为您的账户 ID。

{ "ApplicationName": "s3_sink", "ApplicationDescription": "Scala tumbling window application", "RuntimeEnvironment": "FLINK-1_15", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/KA-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username", "FileKey": "s3-sink-scala-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleInputStream", "flink.stream.initpos" : "LATEST" } }, { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "s3.sink.path" : "s3a://ka-app-code-<username>/data" } } ] } }, "CloudWatchLoggingOptions": [ { "LogStreamARN": "arn:aws:logs:us-west-2:012345678901:log-group:MyApplication:log-stream:kinesis-analytics-log-stream" } ] }

CreateApplication使用以下请求执行以创建应用程序:

aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json

应用程序现已创建。您在下一步中启动应用程序。

启动应用程序

在本节中,您使用 StartApplication 操作来启动应用程序。

启动应用程序
  1. 将以下 JSON 代码保存到名为 start_request.json 的文件中。

    {{ "ApplicationName": "s3_sink", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }
  2. 使用上述请求执行 StartApplication 操作来启动应用程序:

    aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json

应用程序正在运行。您可以在亚马逊 CloudWatch 控制台上查看 Kinesis Data Analytics 指标,以验证应用程序是否正常运行。

停止应用程序

在本节中,您使用 StopApplication 操作来停止应用程序。

停止应用程序
  1. 将以下 JSON 代码保存到名为 stop_request.json 的文件中。

    { "ApplicationName": "s3_sink" }
  2. 使用上述请求执行StopApplication操作以停止应用程序:

    aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json

应用程序现已停止。

添加 CloudWatch 日志选项

您可以使用Amazon CLI向您的应用程序添加 Amazon CloudWatch 日志流。有关在应用程序中使用 CloudWatch 日志的信息,请参阅设置应用程序日志

更新环境属性

在本节中,您使用 UpdateApplication 操作更改应用程序的环境属性,而无需重新编译应用程序代码。在此示例中,您将更改源存储桶和目标直播的区域。

更新应用程序的环境属性
  1. 将以下 JSON 代码保存到名为 update_properties_request.json 的文件中。

    {"ApplicationName": "s3_sink", "CurrentApplicationVersionId": 1, "ApplicationConfigurationUpdate": { "EnvironmentPropertyUpdates": { "PropertyGroups": [ { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleInputStream", "flink.stream.initpos" : "LATEST" } }, { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "s3.sink.path" : "s3a://ka-app-code-<username>/data" } } ] } } }
  2. 使用前面的请求执行 UpdateApplication 操作以更新环境属性:

    aws kinesisanalyticsv2 update-application --cli-input-json file://update_properties_request.json

更新应用程序代码

当您需要使用新版本的代码包更新应用程序代码时,可以使用 UpdateApplicationCLI 操作。

注意

要加载具有相同文件名的新版本的应用程序代码,必须指定新的对象版本。有关使用 Amazon S3 对象版本的更多信息,请参阅启用或禁用版本控制

要使用Amazon CLI,请从 Amazon S3 存储桶中删除您之前的代码包,上传新版本,然后调用UpdateApplication,指定相同的 Amazon S3 存储桶和对象名称以及新的对象版本。应用程序将使用新的代码包重新启动。

以下示例 UpdateApplication 操作请求重新加载应用程序代码并重新启动应用程序。将 CurrentApplicationVersionId 更新为当前的应用程序版本。您可以使用 ListApplicationsDescribeApplication 操作检查当前的应用程序版本。<username>使用您在部分中选择的后缀更新存储创建相关资源段名称后缀 ()。

{ "ApplicationName": "s3_sink", "CurrentApplicationVersionId": 1, "ApplicationConfigurationUpdate": { "ApplicationCodeConfigurationUpdate": { "CodeContentUpdate": { "S3ContentLocationUpdate": { "BucketARNUpdate": "arn:aws:s3:::ka-app-code-username", "FileKeyUpdate": "s3-sink-scala-1.0.jar", "ObjectVersionUpdate": "SAMPLEUehYngP87ex1nzYIGYgfhypvDU" } } } } }

清理 Amazon 资源

本节包含清理在滚动窗口教程中创建的 Amazon 资源的过程。

删除 Kinesis Data Analytics

  1. 打开 Kinesis Data Analytics 控制台,网址为 https://console.aws.amazon.com/kinesisanalytics

  2. 在 Kinesis Data Analytics 面板中,选择MyApplication

  3. 在应用程序的页面中,选择 Delete (删除),然后确认删除。

删除 Kinesis Data Streams

  1. 打开 Kinesis 控制台,网址为:https://console.aws.amazon.com/kinesis

  2. 在 Kinesis Data Streams 面板中,选择ExampleInputStream

  3. ExampleInputStream页面中,选择 “删除 Kinesis Stream”,然后确认删除。

  4. Kinesis stream s 页面中 ExampleOutputStream,选择,选择操作,选择删除,然后确认删除。

删除您的Amazon S3 对象和存储桶

  1. 通过以下网址打开 Simple Storage Service(Amazon S3)控制台:https://console.aws.amazon.com/s3/

  2. 选择 ka-app-code- 存储桶。 <username>

  3. 选择 Delete (删除),然后输入存储桶名称以确认删除。

删除您的 IAM 资源

  1. 通过以下网址打开 IAM 控制台:https://console.aws.amazon.com/iam/

  2. 在导航栏中,选择策略

  3. 在筛选条件控件中,输入 kinesis

  4. 选择 kinesis-analytics-service-MyApplication- <your-region>策略。

  5. 选择 Policy Actions (策略操作),然后选择 Delete (删除)

  6. 在导航栏中,选择 Roles(角色)

  7. 选择 k inesis-analyticsMyApplication- <your-region>角色。

  8. 选择 Delete role (删除角色),然后确认删除。

删除您的 CloudWatch 资源

  1. 通过 https://console.aws.amazon.com/cloudwatch/ 打开 CloudWatch 主机。

  2. 在导航栏中,选择 Logs (日志)

  3. 选择 /aws/kinesis-analytics/MyApplication 日志组。

  4. 选择 Delete Log Group (删除日志组),然后确认删除。