本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
示例:从不同账户的 Kinesis 流中读取
此示例演示如何创建一个 Amazon Kinesis Data Analytics 应用程序,该应用程序从不同账户的 Kinesis 直播中读取数据。在此示例中,您将为源 Kinesis 直播使用一个帐户,为 Kinesis Data Analytics 应用程序和接收器 Kinesis 直播使用第二个帐户。
先决条件
在本教程中,您将修改入门示例以读取其他账户中的 Kinesis 直播中的数据。在继续之前,请完成入门指南 (DataStream API)教程。
您需要使用两个 Amazon 账户以完成本教程:一个账户用于源流,另一个账户用于应用程序和接收器流。将您用于入门教程的 Amazon 账户用于应用程序和接收器流。将一个不同的 Amazon 账户用于源流。
设置
您将使用命名的配置文件访问两个 Amazon 账户。修改您的Amazon凭证和配置文件,使其包含两个配置文件,其中包含两个账户的区域和连接信息。
以下示例凭证文件包含两个命名的配置文件:ka-source-stream-account-profile 和 ka-sink-stream-account-profile。将您用于入门教程的账户作为接收器流账户。
[ka-source-stream-account-profile] aws_access_key_id=AKIAIOSFODNN7EXAMPLE aws_secret_access_key=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY [ka-sink-stream-account-profile] aws_access_key_id=AKIAI44QH8DHBEXAMPLE aws_secret_access_key=je7MtGbClwBF/2Zp9Utk/h3yCo8nvbEXAMPLEKEY
以下示例配置文件包含具有区域和输出格式信息的相同命名配置文件。
[profile ka-source-stream-account-profile] region=us-west-2 output=json [profile ka-sink-stream-account-profile] region=us-west-2 output=json
注意
本教程不使用 ka-sink-stream-account-profile。它是作为如何使用配置文件访问两个不同 Amazon 账户的示例提供的。
有关将命名配置文件与一起使用的更多信息Amazon CLI,请参阅Amazon Command Line Interface文档中的命名配置文件。
创建源 Kinesis 流
在本节中,您将在源账户中创建 Kinesis 直播。
输入以下命令以创建应用程序将用于输入的 Kinesis 流。请注意,--profile 参数指定要使用的账户配置文件。
$ aws kinesis create-stream \ --stream-name SourceAccountExampleInputStream \ --shard-count 1 \ --profile ka-source-stream-account-profile
创建和更新 IAM 角色和策略
要允许跨Amazon账户访问对象,您需要在源账户中创建 IAM 角色和策略。然后,您修改 sink 账户中的 IAM 策略。有关创建 IAM 角色和策略的信息,请参阅《Amazon Identity and Access Management用户指南》中的以下主题:
接收器账户角色和策略
编辑入门教程中的
kinesis-analytics-service-MyApplication-us-west-2策略。该策略允许担任源账户中的角色,以便读取源流。注意
当您使用控制台创建应用程序时,控制台会创建一个名为的策略和一个名
kinesis-analytics-service-为的角色<application name>-<application region>kinesis-analytics-。<application name>-<application region>将下面突出显示的部分添加到策略中。将示例账户 ID (
SOURCE01234567) 替换为将用于源流的账户的 ID。{ "Version": "2012-10-17", "Statement": [{ "Sid": "AssumeRoleInSourceAccount", "Effect": "Allow", "Action": "sts:AssumeRole", "Resource": "arn:aws:iam::SOURCE01234567:role/KA-Source-Stream-Role" },{ "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-username/aws-kinesis-analytics-java-apps-1.0.jar" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:SINK012345678:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:SINK012345678:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:SINK012345678:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] } ] }打开
kinesis-analytics-MyApplication-us-west-2角色,并记下其 Amazon 资源名称 (ARN)。您需要在下一节中使用该名称。角色 ARN 如下所示。arn:aws:iam::SINK012345678:role/service-role/kinesis-analytics-MyApplication-us-west-2
源账户角色和策略
在名为
KA-Source-Stream-Policy的源账户中创建一个策略。将以下 JSON 用于该策略。将示例账号替换为源账户的账号。{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadInputStream", "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:ListShards" ], "Resource": "arn:aws:kinesis:us-west-2:SOURCE123456784:stream/SourceAccountExampleInputStream" } ] }在名为
KA-Source-Stream-Role的源账户中创建一个角色。执行以下操作以使用 Kinesis Analytics 使用案例创建角色:在 IAM 管理控制台中,选择创建角色。
在 “创建角色” 页面上,选择 “Amazon服务”。在服务列表中,选择 Kinesis。
在 Select your use case (选择使用案例) 部分中,选择 Kinesis Analytics。
选择Next: Permissions(下一步: 权限)。
添加您在上一步中创建的
KA-Source-Stream-Policy权限策略。选择 Next:Tags (下一步: 标签)。选择 Next: Review(下一步: 审核)。
将角色命名为
KA-Source-Stream-Role。应用程序将使用该角色以访问源流。
将接收器账户中的
kinesis-analytics-MyApplication-us-west-2ARN 添加到源账户中的KA-Source-Stream-Role角色的信任关系中:KA-Source-Stream-Role在 IAM 控制台中打开。选择 Trust Relationships 选项卡。
选择 Edit trust relationship (编辑信任关系)。
将以下代码用于信任关系。将示例账户 ID (
) 替换为接收器账户 ID。SINK012345678{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam:::role/service-role/kinesis-analytics-MyApplication-us-west-2" }, "Action": "sts:AssumeRole" } ] }SINK012345678
更新 Python 脚本
在本节中,您更新生成示例数据的 Python 脚本以使用源账户配置文件。
使用以下突出显示的更改更新 stock.py 脚本。
import json import boto3 import random import datetimeimport os os.environ['AWS_PROFILE'] ='ka-source-stream-account-profile' os.environ['AWS_DEFAULT_REGION'] = 'us-west-2'kinesis = boto3.client('kinesis') def getReferrer(): data = {} now = datetime.datetime.now() str_now = now.isoformat() data['event_time'] = str_now data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['price'] = round(price, 2) return data while True: data = json.dumps(getReferrer()) print(data) kinesis.put_record( StreamName="SourceAccountExampleInputStream", Data=data, PartitionKey="partitionkey")
更新 Java 应用程序
在本节中,您更新 Java 应用程序代码,以便从源流中读取时担任源账户角色。
对 BasicStreamingJob.java 文件进行以下更改。将示例源账号 (SOURCE01234567) 替换为您的源账号。
package com.amazonaws.services.kinesisanalytics; import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;import java.io.IOException; import java.util.Map; import java.util.Properties; /** * A basic Kinesis Data Analytics for Java application with Kinesis data streams * as source and sink. */ public class BasicStreamingJob { private static final String region = "us-west-2"; private static final String inputStreamName ="SourceAccountExampleInputStream";private static final String outputStreamName = ExampleOutputStream;private static final String roleArn = "arn:aws:iam::SOURCE01234567:role/KA-Source-Stream-Role"; private static final String roleSessionName = "ksassumedrolesession";private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties();inputProperties.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE"); inputProperties.setProperty(AWSConfigConstants.AWS_ROLE_ARN, roleArn); inputProperties.setProperty(AWSConfigConstants.AWS_ROLE_SESSION_NAME, roleSessionName);inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static KinesisStreamsSink<String> createSinkFromStaticConfig() { Properties outputProperties = new Properties(); outputProperties.setProperty(AWSConfigConstants.AWS_REGION, region); return KinesisStreamsSink.<String>builder() .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema()) .setStreamName(outputProperties.getProperty("OUTPUT_STREAM", "ExampleOutputStream")) .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) .build(); } public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> input = createSourceFromStaticConfig(env); input.addSink(createSinkFromStaticConfig()); env.execute("Flink Streaming Java API Skeleton"); } }
构建、上传和运行应用程序
执行以下操作以更新和运行应用程序:
在具有
pom.xml文件的目录中运行以下命令,以再次构建应用程序。mvn package -Dflink.version=1.15.3从您的Amazon Simple Storage Service (Amazon S3) 存储桶删除之前的 JAR 文件,然后将新
aws-kinesis-analytics-java-apps-1.0.jar文件上传到 S3 存储桶。在 Kinesis Data Analytics 控制台的应用程序页面中,选择配置、更新以重新加载应用程序 JAR 文件。
运行
stock.py脚本以将数据发送到源流。python stock.py
现在,该应用程序从另一个账户的 Kinesis 流中读取数据。
您可以检查 ExampleOutputStream 流的 PutRecords.Bytes 指标,以验证应用程序是否正常工作。如果在输出流中具有活动,则应用程序正常工作。