步骤 1:准备 - Amazon Kinesis Data Analytics 开发者指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

对于新项目,我们建议您使用新的 Kinesis Data Analytics 工作室,而不是 SQL 应用程序的 Kinesis Data Analytics。Kinesis Data Analytics Studio 将易用性与高级分析功能相结合,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

步骤 1:准备

为本练习创建 Amazon Kinesis Data Analytics 应用程序,创建 Kinesis Data Analytics 应用程序。将其中一个流配置为应用程序的流媒体源,将另一个流配置为 Kinesis Data Analytics 保存应用程序输出的目的地。

步骤 1.1:创建输入和输出数据流

在本节中,您将创建两个 Kinesis 直播:ExampleInputStreamExampleOutputStream。您可以使用 Amazon Web Services Management Console或 Amazon CLI 创建这些流。

  • 要使用 控制台
    1. 登录到 Amazon Web Services Management Console,然后通过以下网址打开 Kinesis 控制台:https://console.aws.amazon.com/kinesisvideo/home

    2. 选择创建数据流。创建带有一个名为 ExampleInputStream 的分片的流。有关更多信息,请参阅 Amazon Kinesis D ata Stre ams 开发人员指南的。

    3. 重复上一步骤以创建带有一个名为 ExampleOutputStream 的分片的流。

  • 使用 Amazon CLI
    1. 使用以下 Kinesiscreate-streamAmazon CLI 命令创建第一个数据流 (ExampleInputStream)。

      $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1 \ --profile adminuser
    2. 运行同一命令,同时将流名称更改为 ExampleOutputStream。此命令创建应用程序用来写入输出的第二个流。

步骤 1.2:将示例记录写入输入流

在此步骤中,您运行 Python 代码以持续生成示例记录,并将这些记录写入 ExampleInputStream 流。

{"heartRate": 60, "rateType":"NORMAL"} ... {"heartRate": 180, "rateType":"HIGH"}
  1. 安装 Python 和 pip

    有关安装 Python 的信息,请访问 Python 网站。

    您可以使用 pip 安装依赖项。有关安装 pip 的信息,请参阅 pip 网站上的安装

  2. 运行以下 Python 代码。代码中的 put-record 命令将 JSON 记录写入到流。

    from enum import Enum import json import random import boto3 STREAM_NAME = 'ExampleInputStream' class RateType(Enum): normal = 'NORMAL' high = 'HIGH' def get_heart_rate(rate_type): if rate_type == RateType.normal: rate = random.randint(60, 100) elif rate_type == RateType.high: rate = random.randint(150, 200) else: raise TypeError return {'heartRate': rate, 'rateType': rate_type.value} def generate(stream_name, kinesis_client, output=True): while True: rnd = random.random() rate_type = RateType.high if rnd < 0.01 else RateType.normal heart_rate = get_heart_rate(rate_type) if output: print(heart_rate) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(heart_rate), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis'))

下一个步骤

步骤 2:创建应用程序