本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
教程:在 Amazon MSK 上使用自定义信任库
以下教程演示了如何安全地连接(传输中加密)到 Kafka 集群,该集群使用自定义、私有甚至自托管的证书颁发机构 (CA) 颁发的服务器证书。
为了通过 TLS 将任何 Kafka 客户端安全地连接到 Kafka 集群,Kafka 客户端(例如示例 Flink 应用程序)必须信任 Kafka 集群的服务器证书(从发行 CA 到根级 CA)提供的完整信任链。作为自定义 Truststore 的示例,我们将使用启用双向 TLS (MTLS) 身份验证的 Amazon MSK 集群。这意味着 MSK 集群节点使用由Certificate Manager 私有Amazon证书颁发机构 (ACM Private CA) 颁发的服务器证书,该证书是您的账户和区域的私有证书,因此执行 Flink 应用程序的 Java 虚拟机 (JVM) 的默认信任库不受信任。
注意
-
密钥库用于存储应用程序应向服务器或客户端出示的私钥和身份证书以供验证。
-
Truststore 用于存储来自认证机构 (CA) 的证书,这些证书用于验证服务器在 SSL 连接中提供的证书。
您还可以使用本教程中的技术进行 Kinesis Data Analytics 应用程序与其他 Apache Kafka 源之间的交互,例如:
托管在Amazon(A mazon EC2 或亚马逊
EKS )中的自定义 Apache Kafka 集群 托管在 Confluent Kafka
集群Amazon 通过Amazon Direct Connect
或 VPN 访问的本地 Kafka 集群
您的应用程序将使用自定义序列化和反序列化架构来替代加载自定义信任库open的方法。这使信任库在应用程序重新启动或替换线程后可供应用程序使用。
使用以下代码检索和存储自定义信任库:
public static void initializeKafkaTruststore() { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); URL inputUrl = classLoader.getResource("kafka.client.truststore.jks"); File dest = new File("/tmp/kafka.client.truststore.jks"); try { FileUtils.copyURLToFile(inputUrl, dest); } catch (Exception ex) { throw new FlinkRuntimeException("Failed to initialize Kakfa truststore", ex); } }
注意
Apache Flink 要求信任库采用 。
注意
要设置本练习所需的先决条件,请先完成入门指南 (DataStream API)练习。
本教程包含以下部分:
使用亚马逊 MSK 集群创建 VPC
要创建示例 VPC 和 Amazon MSK 集群以从 Kinesis Data Analytics 应用程序进行访问,请按照亚马逊 MSK 入门教程进行操作。
完成本教程时,还要执行以下操作:
在步骤 3:创建主题中,重复该
kafka-topics.sh --create命令以创建名为的目标主题AmazonKafkaTutorialTopicDestination:bin/kafka-topics.sh --create --bootstrap-serverZooKeeperConnectionString--replication-factor 3 --partitions 1 --topic AWSKafkaTutorialTopicDestination注意
如果
kafka-topics.sh命令返回ZooKeeperClientTimeoutException,请验证 Kafka 集群的安全组是否具有允许来自客户端实例私有 IP 地址的所有流量的入站规则。记录集群的引导服务器列表。您可以使用以下命令获取引导服务器列表(
ClusterArn替换为 MSK 集群的 ARN):aws kafka get-bootstrap-brokers --region us-west-2 --cluster-arnClusterArn{... "BootstrapBrokerStringTls": "b-2.awskafkatutorialcluste.t79r6y.c4.kafka.us-west-2.amazonaws.com:9094,b-1.awskafkatutorialcluste.t79r6y.c4.kafka.us-west-2.amazonaws.com:9094,b-3.awskafkatutorialcluste.t79r6y.c4.kafka.us-west-2.amazonaws.com:9094" }在按照本教程和先决条件教程中的步骤进行操作时,请务必在代码、命令和控制台条目中使用所选Amazon区域。
创建自定义信任库并将其应用于您的集群
在本节中,您将创建自定义证书颁发机构 (CA),使用它生成自定义信任库,并将其应用于您的 MSK 集群。
要创建和应用您的自定义信任库,请按照适用于 A pache Kafka 的 Amazon Managed Streaming Streaming 开发者指南中的客户端身份验证教程进行操作。
创建应用程序代码
在本节中,您将下载并编译应用程序 JAR 文件。
此示例的 Java 应用程序代码可从中获得 GitHub。要下载应用程序代码,请执行以下操作:
如果尚未安装 Git 客户端,请安装它。有关更多信息,请参阅安装 Git
。 使用以下命令克隆远程存储库:
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples应用程序代码位于
amazon-kinesis-data-analytics-java-examples/CustomKeystore。你可以检查代码,熟悉适用于 Apache FlAmazon Kinesis Data Analytics 代码的结构。使用命令行 Maven 工具或首选开发环境创建 JAR 文件。要使用命令行 Maven 工具编译 JAR 文件,请输入以下内容:
mvn package -Dflink.version=1.15.3如果构建成功,则会创建以下文件:
target/flink-app-1.0-SNAPSHOT.jar注意
提供的源代码依赖于 Java 11 中的库。如果你使用的是开发环境,
上传 Apache Flink 流式处理 Java 代码
在本节中,您将应用程序代码上传到您入门指南 (DataStream API)在本教程中创建的 Amazon S3 存储桶。
注意
如果您从入门教程中删除了 Amazon S3 存储桶,请再次执行该上传 Apache Flink 流式处理 Java 代码步骤。
-
在 Amazon S3 控制台中,选择 ka-app-code-
<username>存储桶,然后选择上传。 -
在选择文件步骤中,选择添加文件。导航到您在上一步中创建的
flink-app-1.0-SNAPSHOT.jar文件。 您无需更改该对象的任何设置,因此,请选择 Upload (上传)。
您的应用程序代码现在存储在 Amazon S3 桶中,您的应用程序可以在其中访问。
创建 应用程序
打开 Kinesis Data Analytics 控制台,网址为 https://console.aws.amazon.com/kinesisanalytics
。 -
在 Amazon Kinesis Data Analytics 控制面板上,选择创建分析应用程序。
-
在 Kinesis Analytics – 创建应用程序页面上,提供应用程序详细信息,如下所示:
-
对于 Application name (应用程序名称),输入
MyApplication。 -
对于运行时,选择 Apache Flink 版本 1.15.2。
-
-
对于访问权限,请选择 Create / update IAM role (创建/更新 IAM 角色)
kinesis-analytics-MyApplication-us-west-2。 -
选择创建应用程序。
注意
当您使用控制台创建适用于 Apache Flink 的 Amazon Kinesis 数据分析时,您可以选择为应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源使用您的应用程序名称和区域命名,如下所示:
-
策略:
kinesis-analytics-service-MyApplication-us-west-2 -
角色:
kinesis-analytics-MyApplication-us-west-2
配置应用程序
-
在MyApplication页面上,选择配置。
-
在 Configure application (配置应用程序) 页面上,提供 Code location (代码位置):
-
对于 Amazon S3 存储桶,输入
ka-app-code-。<username> -
在 Amazon S3 对象的路径中,输入
flink-app-1.0-SNAPSHOT.jar。
-
-
在 Access to application resources (对应用程序的访问权限) 下,对于 Access permissions (访问权限),选择 Create / update IAM role (创建/更新 IAM 角色)
kinesis-analytics-MyApplication-us-west-2。注意
当您使用控制台指定应用程序资源(例如日志或 VPC)时,控制台会修改您的应用程序执行角色以授予访问这些资源的权限。
-
在 Properties (属性) 下面,选择 Add Group (添加组)。输入以下属性:
组 ID 键 值 KafkaSourcetopic AmazonKafkaTutorialTopic KafkaSourcebootstrap.servers 您以前保存的引导服务器列表KafkaSourcesecurity.protocol SSL KafkaSourcessl.truststore.location /usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts KafkaSourcessl.truststore.password changeit 注意
默认证书的 ssl.truststore.pas sword 是 “更改它”,如果您使用的是默认证书,则无需更改此值。
再次选择 Add Group (添加组)。输入以下属性:
组 ID 键 值 KafkaSinktopic AmazonKafkaTutorialTopicDestination KafkaSinkbootstrap.servers 您以前保存的引导服务器列表KafkaSinksecurity.protocol SSL KafkaSinkssl.truststore.location /usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts KafkaSinkssl.truststore.password changeit KafkaSink交易.timeout.ms 1000 应用程序代码读取上述应用程序属性以配置用于与您的 VPC 和 Amazon MSK 集群交互的源和接收器。有关使用属性的更多信息,请参阅运行时属性。
-
在 Snapshots (快照) 下面,选择 Disable (禁用)。这样,就可以轻松更新应用程序,而无需加载无效的应用程序状态数据。
-
在 Monitoring (监控) 下,确保 Monitoring metrics level (监控指标级别) 设置为 Application (应用程序)。
-
要进行CloudWatch 记录,请选中 “启用” 复选框。
-
在 Virtual Private Cloud (VPC) 部分中,选择要与应用程序关联的 VPC。选择与您的 VPC 关联的子网和安全组,您希望应用程序使用它们访问 VPC 资源。
-
选择更新。
注意
当您选择启用 CloudWatch 日志记录时,Kinesis Data Analytics 会为您创建日志组和日志流。这些资源的名称如下所示:
-
日志组:
/aws/kinesis-analytics/MyApplication -
日志流:
kinesis-analytics-log-stream
该日志流用于监控应用程序。
运行应用程序
可以通过运行应用程序、打开 Apache Flink 仪表板并选择所需的 Flink 作业来查看 Flink 任务图。
测试应用程序
在本节中,您将记录写入到源主题。应用程序从源主题中读取记录,并将其写入到目标主题中。您可以通过向源主题写入记录并从目标主题读取记录来验证应用程序是否正常运行。
要写入和读取主题中的记录,请按照 Amazon MSK 入门教程中的步骤 6:生成和使用数据中的步骤进行操作。
要从目标主题中读取,请在到集群的第二个连接中使用目标主题名称,而不是源主题:
bin/kafka-console-consumer.sh --bootstrap-serverBootstrapBrokerString--consumer.config client.properties --topic AmazonKafkaTutorialTopicDestination --from-beginning
如果在目标主题中没有任何记录,请参阅问题排查主题中的无法访问 VPC 中的资源一节。