示例:向 Kinesis Data Analytics 应用程序添加参考数据 - Amazon Kinesis Data Analytics 开发者指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

对于新项目,我们建议您使用新的 Kinesis Data Analytics 工作室,而不是 SQL 应用程序的 Kinesis Data Analytics。Kinesis Data Analytics Studio 将易用性与高级分析功能相结合,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

示例:向 Kinesis Data Analytics 应用程序添加参考数据

在本练习中,您将参考数据添加到现有的 Amazon Kinesis 数据分析应用程序。有关引用数据的信息,请参阅以下主题:

在本练习中,您将参考数据添加到在 Kinesis Data Analytics 入门练习中创建的应用程序中。引用数据提供每个股票代号对应的公司名称;例如:

Ticker, Company AMZN,Amazon ASD, SomeCompanyA MMB, SomeCompanyB WAS, SomeCompanyC

首先,请完成入门练习中的步骤,以创建一个启动应用程序。然后,执行以下步骤进行设置,并将引用数据添加到应用程序:

  1. 准备数据

    • 将上述参考数据作为对象存储在 Amazon Simple Storage Service (Amazon S3) 中。

    • 创建一个让 Kinesis Data Analytics 可代入以代表您读取 Amazon S3 对象的 IAM 角色。

  2. 将引用数据源添加到您的应用程序。

    Kinesis Data Analytics 读取 Amazon S3 对象并创建应用程序内部参考表,您可以在应用程序代码中查询该参考表。

  3. 测试代码。

    在应用程序代码中,将编写一个联接查询来联接应用程序内部流和应用程序内部引用表,从而获取每个股票代码的对应公司名称。

步骤 1:准备

在本节中,您将示例参考数据作为对象存储在 Amazon S3 存储桶中。您还创建可由 Kinesis Data Analytics 代入以代表您读取对象的 IAM 角色。

将参考数据存储为 Amazon S3 对象

在此步骤中,您将示例参考数据存储为 Amazon S3 对象。

  1. 打开文本编辑器,添加以下数据,然后将文件另存为 TickerReference.csv

    Ticker, Company AMZN,Amazon ASD, SomeCompanyA MMB, SomeCompanyB WAS, SomeCompanyC

  2. TickerReference.csv 文件上传到 S3 存储桶。有关说明,请参阅 Amazon Simple Storage Service 用户指南中的将对象上传到 Amazon S3。

创建 IAM 角色

接下来,创建一个 Kinesis Data Analytics 可以代入的 IAM 角色并读取 Amazon S3 对象。

  1. 在Amazon Identity and Access Management (IAM) 中,创建一个名为的 IAM 角色KinesisAnalytics-ReadS3Object。要创建角色,请按照 IAM 用户指南为亚马逊服务创建角色 (Amazon Web Services Management Console) 中的说明进行操作。

    在 IAM 控制台上,指定以下内容:

    • 对于 “选择角色类型”,选择Amazon Lambda。创建角色后,您将更改信任策略以允许 Kinesis Data Analytics(不是Amazon Lambda)担任该角色。

    • 不要在 Attach Policy 页面上附加任何策略。

  2. 更新 IAM 角色策略:

    1. 在 IAM 控制台上,选择您创建的角色。

    2. 在 “信任关系” 选项卡上,更新信任策略以授予 Kinesis Data Analytics 担任该角色的权限。下面显示了信任策略:

      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "kinesisanalytics.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

    3. 在 “权限” 选项卡上,附加名为 AmazonS3 的亚马逊托管政策ReadOnlyAccess。这向角色授予对 Amazon S3 对象的读取权限。下面显示了此策略:

      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:Get*", "s3:List*" ], "Resource": "*" } ] }

步骤 2:将引用数据源添加到应用程序配置中

在该步骤中,将引用数据源添加到应用程序配置中。要开始操作,需要以下信息:

  • S3 存储桶名称和对象键名称

  • IAM 角色的 Amazon Resource ARN ame (

  1. 在应用程序主页面中,选择 Connect reference data (连接引用数据)

  2. Connect 参考数据源页面中,选择包含您的参考数据对象的 Amazon S3 存储桶,然后输入该对象的密钥名称。

  3. 输入CompanyName应用程序内参考表名称

  4. 在 “访问所选资源” 部分,选择 “从 Kinesis Analytics 可以担任的 IAM 角色中选择”,然后选择您在上一节中创建的 KinesisAnalytics-reads3Objec t IAM 角色。

  5. 选择 Discover schema (发现架构)。控制台检测到引用数据中的两列。

  6. 选择 Save and close

步骤 3:测试:查询应用程序内部引用表

现在,您可以查询应用程序内部引用表 CompanyName。您可以联接股票价格数据和引用表以使用引用信息扩充应用程序。结果显示公司名称。

  1. 用以下内容替换您的应用程序代码。查询会联接应用程序内部输入流和应用程序内部引用表。应用程序代码将结果写入到另一应用程序内部流 DESTINATION_SQL_STREAM

    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ticker_symbol VARCHAR(4), "Company" varchar(20), sector VARCHAR(12), change DOUBLE, price DOUBLE); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, "c"."Company", sector, change, price FROM "SOURCE_SQL_STREAM_001" LEFT JOIN "CompanyName" as "c" ON "SOURCE_SQL_STREAM_001".ticker_symbol = "c"."Ticker";
  2. 验证 SQLResults 选项卡中是否会显示应用程序输出。确保某些行显示公司名称 (示例引用数据不包含所有公司名称)。