本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
创建并运行应用程序 (CLI)
在本节中,您将使用创建和运行 Kinesis Data Analytics 应用程序。Amazon Command Line Interface使用 k inesisanalyticsv2Amazon CLI 命令创建 Kinesis Data Analytics 应用程序并与之交互。
创建权限策略
注意
您必须为应用程序创建一个权限策略和角色。如果未创建这些 IAM 资源,应用程序将无法访问其数据和日志流。
首先,您创建一个包含两个语句的权限策略:一个语句授予对源流进行读取操作的权限,另一个语句授予对接收流进行写入操作的权限。然后,您将策略附加到 IAM 角色(您在下一节中创建)。因此,当 Kinesis Data Analytics 担任该角色时,该服务具有从源流读取和写入接收流的必要权限。
使用以下代码创建 KAReadSourceStreamWriteSinkStream 权限策略。username替换为用于创建 Amazon S3 存储桶以存储应用程序代码的用户名。将亚马逊资源名称 (ARN) 中的账户 ID 替换(012345678901)为您的账户 ID。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-username/getting-started-scala-1.0.jar" ] }, { "Sid": "DescribeLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }
有关创建权限策略的 step-by-step 说明,请参阅 IAM 用户指南中的教程:创建并附加您的第一个客户托管策略。
创建 IAM policy
在本节中,您将创建一个 IAM 角色,Kinesis Data Analytics 应用程序可以假定该角色来读取源流并写入接收流。
未经许可,Kinesis Data Analytics 无法访问您的直播。您可以通过 IAM 角色授予这些权限。每个 IAM 角色都附加了两个策略。信任策略授予 Kinesis Data Analytics 代入角色的权限,权限策略决定了Kinesis Data Analytics 代入角色后可以执行的操作。
您将在上一部分中创建的权限策略附加到此角色。
创建 IAM 角色
通过以下网址打开 IAM 控制台:https://console.aws.amazon.com/iam/
。 在导航窗格中,选择角色,然后选择创建角色。
在 “选择可信身份类型” 下,选择 “Amazon服务”
在 Choose the service that will use this role (选择将使用此角色的服务) 下,选择 Kinesis。
在选择您的使用案例下,选择 Kinesis Analytics。
选择Next: Permissions(下一步: 权限)。
在 Attach permissions policies 页面上,选择 Next: Review。在创建角色后,您可以附加权限策略。
在创建角色页面上
KA-stream-rw-role,输入角色名称。选择 Create role(创建角色)。现在,您已经创建了一个名为的新 IAM 角色
KA-stream-rw-role。接下来,更新角色的信任和权限策略将权限策略附加到角色。
注意
在本练习中,Kinesis Data Analytics 扮演这个角色,既可以从 Kinesis 数据流(源)读取数据,也可以将输出写入另一个 Kinesis 数据流。因此,您可以附加在上一步中创建一个 “创建权限策略” 中创建的策略。
在 Summary (摘要) 页上,选择 Permissions (权限) 选项卡。
选择附加策略。
在搜索框中,输入
KAReadSourceStreamWriteSinkStream(您在上一部分中创建的策略)。选择
KAReadSourceStreamWriteSinkStream策略,然后选择附加策略。
现在,您已经创建了应用程序用来访问资源的服务执行角色。记下新角色的 ARN。
有关创建角色的 step-by-step 说明,请参阅 IAM 用户指南中的创建 IAM 角色(控制台)。
创建应用程序
将以下 JSON 代码保存到名为 create_request.json 的文件中。将示例角色 ARN 替换为您之前创建的角色的 ARN。将存储桶 ARN 后缀(用户名)替换为您在上一节中选择的后缀。将服务执行角色中的示例账户 ID (012345678901) 替换为您的账户 ID。
{ "ApplicationName": "getting_started", "ApplicationDescription": "Scala getting started application", "RuntimeEnvironment": "FLINK-1_15", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/KA-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username", "FileKey": "getting-started-scala-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleInputStream", "flink.stream.initpos" : "LATEST" } }, { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleOutputStream" } } ] } }, "CloudWatchLoggingOptions": [ { "LogStreamARN": "arn:aws:logs:us-west-2:012345678901:log-group:MyApplication:log-stream:kinesis-analytics-log-stream" } ] }
CreateApplication使用以下请求执行以创建应用程序:
aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json
应用程序现已创建。您在下一步中启动应用程序。
启动应用程序
在本节中,您使用 StartApplication 操作来启动应用程序。
启动应用程序
将以下 JSON 代码保存到名为
start_request.json的文件中。{ "ApplicationName": "getting_started", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }使用上述请求执行
StartApplication操作来启动应用程序:aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json
应用程序正在运行。您可以在亚马逊 CloudWatch 控制台上查看 Kinesis Data Analytics 指标,以验证应用程序是否正常运行。
停止应用程序
在本节中,您使用 StopApplication 操作来停止应用程序。
停止应用程序
将以下 JSON 代码保存到名为
stop_request.json的文件中。{ "ApplicationName": "s3_sink" }使用上述请求执行
StopApplication操作以停止应用程序:aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json
应用程序现已停止。
添加 CloudWatch 日志选项
您可以使用Amazon CLI向您的应用程序添加 Amazon CloudWatch 日志流。有关在应用程序中使用 CloudWatch 日志的信息,请参阅设置应用程序日志。
更新环境属性
在本节中,您使用 UpdateApplication 操作更改应用程序的环境属性,而无需重新编译应用程序代码。在此示例中,您将更改源流和目标流的区域。
更新应用程序的环境属性
将以下 JSON 代码保存到名为
update_properties_request.json的文件中。{ "ApplicationName": "getting_started", "CurrentApplicationVersionId": 1, "ApplicationConfigurationUpdate": { "EnvironmentPropertyUpdates": { "PropertyGroups": [ { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleInputStream", "flink.stream.initpos" : "LATEST" } }, { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleOutputStream" } } ] } }使用前面的请求执行
UpdateApplication操作以更新环境属性:aws kinesisanalyticsv2 update-application --cli-input-json file://update_properties_request.json
更新应用程序代码
当您需要使用新版本的代码包更新应用程序代码时,可以使用 UpdateApplicationCLI 操作。
注意
要加载具有相同文件名的新版本的应用程序代码,必须指定新的对象版本。有关使用 Amazon S3 对象版本的更多信息,请参阅启用或禁用版本控制。
要使用Amazon CLI,请从 Amazon S3 存储桶中删除您之前的代码包,上传新版本,然后调用UpdateApplication,指定相同的 Amazon S3 存储桶和对象名称以及新的对象版本。应用程序将使用新的代码包重新启动。
以下示例 UpdateApplication 操作请求重新加载应用程序代码并重新启动应用程序。将 CurrentApplicationVersionId 更新为当前的应用程序版本。您可以使用 ListApplications 或 DescribeApplication 操作检查当前的应用程序版本。<username>使用您在部分中选择的后缀更新存储创建相关资源段名称后缀 ()。
{{ "ApplicationName": "getting_started", "CurrentApplicationVersionId": 1, "ApplicationConfigurationUpdate": { "ApplicationCodeConfigurationUpdate": { "CodeContentUpdate": { "S3ContentLocationUpdate": { "BucketARNUpdate": "arn:aws:s3:::ka-app-code-<username>", "FileKeyUpdate": "getting-started-scala-1.0.jar", "ObjectVersionUpdate": "SAMPLEUehYngP87ex1nzYIGYgfhypvDU" } } } } }