本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
步骤 3:创建并运行 Kinesis Data Analytics 应用程序
在本练习中,您将创建一个以数据流作为源和汇点的 Kinesis Data Analytics 应用程序。
本节包含以下步骤:
创建两个Amazon Kinesis Data Streams
在为本练习创建 Kinesis Data Analytics 应用程序之前,请创建两个 Kinesis 数据流(ExampleInputStream和ExampleOutputStream)。您的应用程序将这些数据流用于应用程序源和目标流。
您可以使用 Amazon Kinesis 控制台或以下Amazon CLI命令创建这些直播。有关控制台的说明,请参阅 Amazon Kinesis 数据流开发者指南中的创建和更新数据流。
创建数据流 (Amazon CLI)
-
要创建第一个直播 (
ExampleInputStream),请使用以下 Amazon Kinesiscreate-streamAmazon CLI 命令。$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser -
要创建应用程序用来写入输出的第二个流,请运行同一命令(将流名称更改为
ExampleOutputStream)。$ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
将示例记录写入输入流
在本节中,您使用 Python 脚本将示例记录写入流,以供应用程序处理。
注意
此部分需要 Amazon SDK for Python (Boto)
-
使用以下内容创建名为
stock.py的文件:import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'EVENT_TIME': datetime.datetime.now().isoformat(), 'TICKER': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'PRICE': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis')) -
在本教程的后面部分,您运行
stock.py脚本,以将数据发送到应用程序。$ python stock.py
下载并检查 Apache Flink 流式处理 Java 代码
此示例的 Java 应用程序代码可从以下网址获得 GitHub。要下载应用程序代码,请执行以下操作:
使用以下命令克隆远程存储库:
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples导航到
amazon-kinesis-data-analytics-java-examples/GettingStarted_1_6目录。
请注意有关应用程序代码的以下信息:
项目对象模型 (pom.xml)
文件包含有关应用程序配置和依赖关系的信息,包括适用于 Apache Flink 的 Kinesis Data Analytics 库。 BasicStreamingJob.java文件包含定义应用程序功能的main方法。该应用程序使用 Kinesis 源从源码流中读取数据。以下片段创建了 Kinesis 源代码:
return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));您的应用程序使用
StreamExecutionEnvironment对象创建源和接收连接器以访问外部资源。该应用程序将使用静态属性创建源和接收连接器。要使用动态应用程序属性,请使用
createSourceFromApplicationProperties和createSinkFromApplicationProperties方法以创建连接器。这些方法读取应用程序的属性来配置连接器。有关运行时属性的更多信息,请参阅运行时属性。
编译应用程序代码
在本节中,您使用 Apache Maven 编译器创建应用程序的 Java 代码。有关安装 Apache Maven 和 Java 开发工具包 (JDK) 的信息,请参阅完成练习的先决条件。
注意
要在 1.11 之前的 Apache Flink 版本中使用 Kinesis 连接器,你需要下载连接器的源代码并按照 Apache Flink 文档
编译应用程序代码
-
要使用您的应用程序代码,您将其编译和打包成 JAR 文件。您可以通过两种方式之一编译和打包您的代码:
使用命令行 Maven 工具。在包含
pom.xml文件的目录中通过运行以下命令创建您的 JAR 文件:mvn package注意
对于 Kinesis Data Analytics Runtime 版本 1.0.1,不需要-dflink.Version 参数;只有版本 1.1.0 及更高版本才需要该参数。有关更多信息,请参阅指定应用程序的 Apache Flink 版本:
设置开发环境。有关详细信息,请参阅您的开发环境文档。
您可以作为 JAR 文件上传您的包,也可以将包压缩为 ZIP 文件并上传。如果您使用 Amazon CLI 创建应用程序,您可以指定您的代码内容类型(JAR 或 ZIP)。
-
如果编译时出错,请验证
JAVA_HOME环境变量设置正确。
如果应用程序成功编译,则创建以下文件:
target/aws-kinesis-analytics-java-apps-1.0.jar
上传 Apache Flink 流式处理 Java 代码
在此部分,您创建了一个Amazon Smple Simple Storage Service (Amazon S3) 存储桶,并上传了应用程序代码。
上传应用程序代码
通过以下网址打开 Amazon S3 控制台:https://console.aws.amazon.com/s3/
。 -
选择 Create bucket (创建存储桶)。
-
ka-app-code-在存储段名称字段中输入。将后缀(如您的用户名)添加到存储桶名称,以使其具有全局唯一性。选择下一步。<username> -
在配置选项步骤中,让设置保持原样,然后选择下一步。
-
在设置权限步骤中,让设置保持原样,然后选择下一步。
-
选择创建桶。
-
在 Amazon S3 控制台中,选择 ka-app-code-
<username>存储桶,然后选择上传。 -
在选择文件步骤中,选择添加文件。导航到您在上一步中创建的
aws-kinesis-analytics-java-apps-1.0.jar文件。选择下一步。 -
在设置权限步骤中,让设置保持原样。选择下一步。
-
在设置属性步骤中,让设置保持原样。请选择 Upload(上传)。
您的应用程序代码现在存储在 Amazon S3 存储桶中,您的应用程序可以在其中访问它。
创建并运行 Kinesis Data Analytics 应用程序
您可以使用控制台或创建和运行 Kinesis Data Analytics 应用程序Amazon CLI。
注意
当您使用控制台创建应用程序时,会为您创建Amazon Identity and Access Management (IAM) 和 A CloudWatch mazon Logs 资源。当您使用 Amazon CLI 创建应用程序时,您可以单独创建这些资源。
创建并运行应用程序(控制台)
按照以下步骤,使用控制台创建、配置、更新和运行应用程序。
创建 应用程序
打开 Kinesis Data Analytics 控制台,网址为 https://console.aws.amazon.com/kinesisanalytics
。 -
在 Amazon Kinesis Data Analytics 控制面板上,选择创建分析应用程序。
-
在 Kinesis Analytics – 创建应用程序页面上,提供应用程序详细信息,如下所示:
-
对于 Application name (应用程序名称),输入
MyApplication。 -
对于描述,输入
My java test app。 -
对于 Runtime (运行时),请选择 Apache Flink。
注意
Kinesis Data Analytics 使用 Apache Flink 版本 1.8.2 或 1.6.2。
将版本下拉列表更改为 Apache Flink 1.6。
-
-
对于访问权限,请选择 Create / update IAM role (创建/更新 IAM 角色)
kinesis-analytics-MyApplication-us-west-2。 -
选择创建应用程序。
注意
当您使用控制台创建 Kinesis Data Analytics 应用程序时,您可以选择为应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源使用您的应用程序名称和区域命名,如下所示:
-
策略:
kinesis-analytics-service-MyApplication-us-west-2 -
角色:
kinesis-analytics-MyApplication-us-west-2
编辑 IAM 策略
编辑 IAM 策略以添加访问 Kinesis 数据流的权限。
通过以下网址打开 IAM 控制台:https://console.aws.amazon.com/iam/
。 -
选择策略。选择控制台在上一部分中为您创建的
kinesis-analytics-service-MyApplication-us-west-2策略。 -
在 Summary (摘要) 页面上,选择 Edit policy (编辑策略)。请选择 JSON 选项卡。
-
将以下策略示例中突出显示的部分添加到策略中。将示例账户 ID (
012345678901) 替换为您的账户 ID。{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-username/java-getting-started-1.0.jar" ] }, { "Sid": "DescribeLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:] }012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" }
配置应用程序
-
在MyApplication页面上,选择配置。
-
在 Configure application (配置应用程序) 页面上,提供 Code location (代码位置):
-
对于 Amazon S3 存储桶,输入
ka-app-code-。<username> -
在 Amazon S3 对象的路径中,输入
java-getting-started-1.0.jar。
-
-
在 Access to application resources (对应用程序的访问权限) 下,对于 Access permissions (访问权限),选择 Create / update IAM role (创建/更新 IAM 角色)
kinesis-analytics-MyApplication-us-west-2。 -
输入以下应用程序属性和值:
组 ID 键 值 ProducerConfigPropertiesflink.inputstream.initposLATESTProducerConfigPropertiesaws.regionus-west-2ProducerConfigPropertiesAggregationEnabledfalse -
在 Monitoring (监控) 下,确保 Monitoring metrics level (监控指标级别) 设置为 Application (应用程序)。
-
要进行CloudWatch 记录,请选中 “启用” 复选框。
-
选择更新。
注意
当您选择启用亚马逊 CloudWatch 日志记录时,Kinesis Data Analytics 会为您创建日志组和日志流。这些资源的名称如下所示:
-
日志组:
/aws/kinesis-analytics/MyApplication -
日志流:
kinesis-analytics-log-stream
运行应用程序
-
在MyApplication页面上,选择 “运行”。确认该操作。
-
当应用程序正在运行时,请刷新页面。控制台将显示 Application graph (应用程序图表)。
停止应用程序
在MyApplication页面上,选择 “停止”。确认该操作。
更新应用程序
使用控制台,您可以更新应用程序设置,例如应用程序属性、监控设置,或应用程序 JAR 文件的位置和文件名。如果您需要更新应用程序代码,也可以从 Amazon S3 存储桶重新加载应用程序 JAR。
在MyApplication页面上,选择配置。更新应用程序设置,然后选择更新。
创建并运行应用程序 (Amazon CLI)
在本节中,您将使用创建和运行 Kinesis Data Analytics 应用程序。Amazon CLI适用于 Apache Flink 的 Kinesis Data Analytics 使用该kinesisanalyticsv2Amazon CLI命令创建 Kinesis Data Analytics 应用程序并与之交互。
创建权限策略
首先,使用两个语句创建权限策略:一个语句授予对源流执行 read 操作的权限,另一个语句授予对接收器流执行 write 操作的权限。然后,您将策略附加到 IAM 角色(您在下一节中创建)。因此,当 Kinesis Data Analytics 担任该角色时,该服务具有从源流读取和写入接收流的必要权限。
使用以下代码创建 KAReadSourceStreamWriteSinkStream 权限策略。替换为用于创建 Amazon S3 存储桶以存储应用程序代码的用户名。将 Amazon 资源名称 (ARN) 中的账户 ID (username) 替换为您的账户 ID。012345678901
{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": ["arn:aws:s3:::ka-app-code-username", "arn:aws:s3:::ka-app-code-username/*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }
有关创建权限策略的 step-by-step 说明,请参阅 IAM 用户指南中的教程:创建并附加您的第一个客户托管策略。
注意
要访问其他亚马逊服务,您可以使用Amazon SDK for Java。Kinesis Data Analytics 会自动将软件开发工具包所需的证书设置为与您的应用程序关联的服务执行 IAM 角色的证书。无需执行其他步骤。
创建 IAM 角色
在本节中,您将创建一个 IAM 角色,Kinesis Data Analytics 应用程序可以假定该角色来读取源流并写入接收流。
未经许可,Kinesis Data Analytics 无法访问您的直播。您可以通过 IAM 角色授予这些权限。每个 IAM 角色都附加了两个策略。信任策略授予 Kinesis Data Analytics a Kinesis Data Analytics,权限策略决定了
您将在上一部分中创建的权限策略附加到此角色。
创建 IAM 角色
通过以下网址打开 IAM 控制台:https://console.aws.amazon.com/iam/
。 -
在导航窗格中,选择 Roles (角色) 和 Create Role (创建角色)。
-
在 “选择可信身份类型” 下,选择 “Amazon服务”。在 Choose the service that will use this role (选择将使用此角色的服务) 下,选择 Kinesis。在选择您的使用案例下,选择 Kinesis Analytics。
选择Next: Permissions(下一步: 权限)。
-
在 Attach permissions policies 页面上,选择 Next: Review。在创建角色后,您可以附加权限策略。
-
在创建角色页面上
KA-stream-rw-role,输入角色名称。选择 Create role(创建角色)。现在,您已经创建了一个名为的新 IAM 角色
KA-stream-rw-role。接下来,您更新角色的信任和权限策略。 -
将权限策略附加到角色。
注意
在本练习中,Kinesis Data Analytics 扮演这个角色,既可以从 Kinesis 数据流(源)读取数据,也可以将输出写入另一个 Kinesis 数据流。因此,您附加在上一步(创建权限策略)中创建的策略。
-
在 Summary (摘要) 页上,选择 Permissions (权限) 选项卡。
-
选择附加策略。
-
在搜索框中,输入
KAReadSourceStreamWriteSinkStream(您在上一部分中创建的策略)。 -
选择 KAReadSourceStreamWriteSinkStream 策略,然后选择 “附加策略”。
-
现在,您已经创建了应用程序用来访问资源的服务执行角色。记下新角色的 ARN。
有关创建角色的 step-by-step 说明,请参阅 IAM 用户指南中的创建 IAM 角色(控制台)。
Kinesis Data Analytics
-
将以下 JSON 代码保存到名为
create_request.json的文件中。将示例角色 ARN 替换为您之前创建的角色的 ARN。将存储桶 ARN 后缀 () 替换为在前一部分中选择的后缀。将服务执行角色中的示例账户 ID (username) 替换为您的账户 ID。012345678901{ "ApplicationName": "test", "ApplicationDescription": "my java test app", "RuntimeEnvironment": "FLINK-1_6", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/KA-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username", "FileKey": "java-getting-started-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "flink.stream.initpos" : "LATEST", "aws.region" : "us-west-2", "AggregationEnabled" : "false" } }, { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2" } } ] } } } -
使用上述请求执行
CreateApplication操作来创建应用程序:aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json
应用程序现已创建。您在下一步中启动应用程序。
启动应用程序
在本节中,您使用 StartApplication 操作来启动应用程序。
启动应用程序
-
将以下 JSON 代码保存到名为
start_request.json的文件中。{ "ApplicationName": "test", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } } -
使用上述请求执行
StartApplication操作来启动应用程序:aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json
应用程序正在运行。您可以在亚马逊 CloudWatch 控制台上查看 Kinesis Data Analytics 指标,以验证应用程序是否正常运行。
停止应用程序
在本节中,您使用 StopApplication 操作来停止应用程序。
停止应用程序
-
将以下 JSON 代码保存到名为
stop_request.json的文件中。{ "ApplicationName": "test" } -
使用下面的请求执行
StopApplication操作来停止应用程序:aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json
应用程序现已停止。
添加 CloudWatch 日志选项
您可以使用Amazon CLI向您的应用程序添加 Amazon CloudWatch 日志流。有关在应用程序中使用 CloudWatch Logs 的信息,请参阅设置应用程序日志记录。
更新环境属性
在本节中,您使用 UpdateApplication 操作更改应用程序的环境属性,而无需重新编译应用程序代码。在此示例中,您更改了源流和目标流的区域。
更新应用程序的环境属性
-
将以下 JSON 代码保存到名为
update_properties_request.json的文件中。{"ApplicationName": "test", "CurrentApplicationVersionId": 1, "ApplicationConfigurationUpdate": { "EnvironmentPropertyUpdates": { "PropertyGroups": [ { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "flink.stream.initpos" : "LATEST", "aws.region" : "us-west-2", "AggregationEnabled" : "false" } }, { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2" } } ] } } } -
使用前面的请求执行
UpdateApplication操作以更新环境属性:aws kinesisanalyticsv2 update-application --cli-input-json file://update_properties_request.json
更新应用程序代码
在您需要使用新版本的代码包更新应用程序代码时,您可以使用 UpdateApplication Amazon CLI 操作。
要使用Amazon CLI,请从 Amazon S3 存储桶中删除之前的代码包,上传新版本,然后调用UpdateApplication,指定相同的 Amazon S3 存储桶和对象名称。应用程序将使用新的代码包重新启动。
以下示例 UpdateApplication 操作请求重新加载应用程序代码并重新启动应用程序。将 CurrentApplicationVersionId 更新为当前的应用程序版本。您可以使用 ListApplications 或 DescribeApplication 操作检查当前的应用程序版本。将存储桶名称后缀 (<username>) 更新为在创建两个Amazon Kinesis Data Streams一节中选择的后缀。
{ "ApplicationName": "test", "CurrentApplicationVersionId":1, "ApplicationConfigurationUpdate": { "ApplicationCodeConfigurationUpdate": { "CodeContentUpdate": { "S3ContentLocationUpdate": { "BucketARNUpdate": "arn:aws:s3:::ka-app-code-username", "FileKeyUpdate": "java-getting-started-1.0.jar" } } } } }