本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
教程:使用 Kinesis Data Analytics 应用程序将数据从 MSK 集群中的一个主题复制到 VPC 中的另一个主题
以下教程演示了如何创建带有 Amazon MSK 集群和两个主题的 Amazon VPC,以及如何创建从一个 Amazon MSK 主题读取并写入另一个主题的 Kinesis Data Analytics 应用程序。
注意
要为本练习设置所需的先决条件,请先完成入门指南 (DataStream API)练习。
本教程包含以下部分:
使用 Amazon MSVPC 集群创建一个 Amazon MSK 集群
要创建示例 VPC 和 Amazon MSK 集群以从 Kinesis Data Analytics 应用程序进行访问,请按照亚马逊 MSK 入门教程进行操作。
在完成本教程时,请注意以下几点:
在步骤 3:创建主题中,重复该
kafka-topics.sh --create命令以创建名为的目标主题AWSKafkaTutorialTopicDestination:bin/kafka-topics.sh --create --zookeeperZooKeeperConnectionString--replication-factor 3 --partitions 1 --topic AmazonKafkaTutorialTopicDestination记录集群的引导服务器列表。您可以使用以下命令获取引导服务器列表(
ClusterArn替换为 MSK 集群的 ARN):aws kafka get-bootstrap-brokers --region us-west-2 --cluster-arnClusterArn{... "BootstrapBrokerStringTls": "b-2.awskafkatutorialcluste.t79r6y.c4.kafka.us-west-2.amazonaws.com:9094,b-1.awskafkatutorialcluste.t79r6y.c4.kafka.us-west-2.amazonaws.com:9094,b-3.awskafkatutorialcluste.t79r6y.c4.kafka.us-west-2.amazonaws.com:9094" }按照教程中的步骤进行操作时,请务必在代码、命令和控制台条目中使用所选Amazon区域。
创建应用程序代码
在本节中,您下载并编译应用程序 JAR 文件。我们建议使用 Java 11。
此示例的 Java 应用程序代码可从中获得 GitHub。要下载应用程序代码,请执行以下操作:
如果尚未安装 Git 客户端,请安装它。有关更多信息,请参阅安装 Git
。 使用以下命令克隆远程存储库:
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples应用程序代码位于
amazon-kinesis-data-analytics-java-examples/KafkaConnectors/KafkaGettingStartedJob.java文件中。您可以检查代码以熟悉 Kinesis Data Analytics 应用程序代码的结构。使用命令行 Maven 工具或首选的开发环境以创建 JAR 文件。要使用命令行 Maven 工具编译 JAR 文件,请输入以下内容:
mvn package -Dflink.version=1.15.3如果构建成功,则会创建以下文件:
target/KafkaGettingStartedJob-1.0.jar注意
提供的源代码依赖于 Java 11 中的库。如果你使用的是开发环境,
上传 Apache Flink 流式处理 Java 代码
在本节中,您将应用程序代码上传到您在入门指南 (DataStream API)教程中创建的 Amazon S3 存储桶。
注意
如果您从入门教程中删除了 Amazon S3 存储桶,请再次执行该上传 Apache Flink 流式处理 Java 代码步骤。
-
在 Amazon S3 控制台中,选择 ka-app-code-
<username>存储桶,然后选择上传。 -
在选择文件步骤中,选择添加文件。导航到您在上一步中创建的
KafkaGettingStartedJob-1.0.jar文件。 您无需更改该对象的任何设置,因此,请选择 Upload (上传)。
您的应用程序代码现在存储在 Amazon S3 桶中,您的应用程序可以在其中访问。
创建 应用程序
打开 Kinesis Data Analytics 控制台,网址为 https://console.aws.amazon.com/kinesisanalytics
。 -
在 Amazon Kinesis Data Analytics 控制面板上,选择创建分析应用程序。
-
在 Kinesis Analytics – 创建应用程序页面上,提供应用程序详细信息,如下所示:
-
对于 Application name (应用程序名称),输入
MyApplication。 -
对于运行时,选择 Apache Flink 版本 1.15.2。
-
-
对于访问权限,请选择 Create / update IAM role (创建/更新 IAM 角色)
kinesis-analytics-MyApplication-us-west-2。 -
选择创建应用程序。
注意
当您使用控制台创建 Kinesis Data Analytics 应用程序时,您可以选择为应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源使用您的应用程序名称和区域命名,如下所示:
-
策略:
kinesis-analytics-service-MyApplication-us-west-2 -
角色:
kinesis-analytics-MyApplication-us-west-2
配置应用程序
-
在MyApplication页面上,选择配置。
-
在 Configure application (配置应用程序) 页面上,提供 Code location (代码位置):
-
对于 Amazon S3 存储桶,输入
ka-app-code-。<username> -
在 Amazon S3 对象的路径中,输入
KafkaGettingStartedJob-1.0.jar。
-
-
在 Access to application resources (对应用程序的访问权限) 下,对于 Access permissions (访问权限),选择 Create / update IAM role (创建/更新 IAM 角色)
kinesis-analytics-MyApplication-us-west-2。注意
当您使用控制台(例如 CloudWatch 日志或 Amazon VPC)指定应用程序资源时,控制台会修改您的应用程序执行角色以授予访问这些资源的权限。
-
在 Properties (属性) 下面,选择 Add Group (添加组)。输入以下属性:
组 ID 键 值 KafkaSourcetopic AmazonKafkaTutorialTopic KafkaSourcebootstrap.servers 您以前保存的引导服务器列表KafkaSourcesecurity.protocol SSL KafkaSourcessl.truststore.location /usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts KafkaSourcessl.truststore.password changeit 注意
默认证书的 ssl.truststore.password 为“changeit”;如果使用默认证书,则不需要更改该值。
再次选择 Add Group (添加组)。输入以下属性:
组 ID 键 值 KafkaSinktopic AmazonKafkaTutorialTopicDestination KafkaSinkbootstrap.servers 您以前保存的引导服务器列表KafkaSinksecurity.protocol SSL KafkaSinkssl.truststore.location /usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts KafkaSinkssl.truststore.password changeit KafkaSink交易.timeout.ms 1000 应用程序代码读取上述应用程序属性以配置用于与您的 VPC 和 Amazon MSK 集群交互的源和接收器。有关使用属性的更多信息,请参阅运行时属性。
-
在 Snapshots (快照) 下面,选择 Disable (禁用)。这样,就可以轻松更新应用程序,而无需加载无效的应用程序状态数据。
-
在 Monitoring (监控) 下,确保 Monitoring metrics level (监控指标级别) 设置为 Application (应用程序)。
-
要进行CloudWatch 记录,请选中 “启用” 复选框。
-
在 Virtual Private Cloud (VPC) 部分中,选择要与应用程序关联的 VPC。选择与您的 VPC 关联的子网和安全组,您希望应用程序使用它们访问 VPC 资源。
-
选择更新。
注意
当您选择启用 CloudWatch 日志记录时,Kinesis Data Analytics 会为您创建日志组和日志流。这些资源的名称如下所示:
-
日志组:
/aws/kinesis-analytics/MyApplication -
日志流:
kinesis-analytics-log-stream
该日志流用于监控应用程序。
运行应用程序
可以通过运行应用程序、打开 Apache Flink 仪表板并选择所需的 Flink 作业来查看 Flink 任务图。
测试应用程序
在本节中,您将记录写入到源主题。应用程序从源主题中读取记录,并将其写入到目标主题中。您可以将记录写入到源主题以及从目标主题中读取记录,以验证应用程序是否正常工作。
要写入和读取主题中的记录,请按照 Amazon MSK 入门教程中的步骤 6:生成和使用数据中的步骤进行操作。
要从目标主题中读取,请在到集群的第二个连接中使用目标主题名称,而不是源主题:
bin/kafka-console-consumer.sh --bootstrap-serverBootstrapBrokerString--consumer.config client.properties --topic AmazonKafkaTutorialTopicDestination --from-beginning
如果在目标主题中没有任何记录,请参阅问题排查主题中的无法访问 VPC 中的资源一节。