对于新项目,我们建议您使用新的 Kinesis Data Analytics 工作室,而不是 SQL 应用程序的 Kinesis Data Analytics。Kinesis Data Analytics Studio 将易用性与高级分析功能相结合,使您能够在几分钟内构建复杂的流处理应用程序。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
步骤 1:准备
为本练习创建 Amazon Kinesis Data Analytics 应用程序,创建 Kinesis Data Analytics 应用程序。将其中一个流配置为应用程序的流媒体源,将另一个流配置为 Kinesis Data Analytics 保存应用程序输出的目的地。
步骤 1.1:创建输入和输出数据流
在本节中,您将创建两个 Kinesis 直播:ExampleInputStream和ExampleOutputStream。您可以使用 Amazon Web Services Management Console或 Amazon CLI 创建这些流。
-
要使用 控制台
登录到 Amazon Web Services Management Console,然后通过以下网址打开 Kinesis 控制台:https://console.aws.amazon.com/kinesisvideo/home
。 -
选择创建数据流。创建带有一个名为
ExampleInputStream的分片的流。有关更多信息,请参阅 Amazon Kinesis D ata Stre ams 开发人员指南的。 -
重复上一步骤以创建带有一个名为
ExampleOutputStream的分片的流。
-
使用 Amazon CLI
-
使用以下 Kinesis
create-streamAmazon CLI 命令创建第一个数据流 (ExampleInputStream)。$ aws kinesis create-stream \ --stream-nameExampleInputStream\ --shard-count 1 \ --region us-east-1 \ --profile adminuser -
运行同一命令,同时将流名称更改为
ExampleOutputStream。此命令创建应用程序用来写入输出的第二个流。
-
步骤 1.2:将示例记录写入输入流
在此步骤中,您运行 Python 代码以持续生成示例记录,并将这些记录写入 ExampleInputStream 流。
{"heartRate": 60, "rateType":"NORMAL"} ... {"heartRate": 180, "rateType":"HIGH"}
-
安装 Python 和
pip。有关安装 Python 的信息,请访问 Python
网站。 您可以使用 pip 安装依赖项。有关安装 pip 的信息,请参阅 pip 网站上的安装
。 -
运行以下 Python 代码。代码中的
put-record命令将 JSON 记录写入到流。from enum import Enum import json import random import boto3 STREAM_NAME = 'ExampleInputStream' class RateType(Enum): normal = 'NORMAL' high = 'HIGH' def get_heart_rate(rate_type): if rate_type == RateType.normal: rate = random.randint(60, 100) elif rate_type == RateType.high: rate = random.randint(150, 200) else: raise TypeError return {'heartRate': rate, 'rateType': rate_type.value} def generate(stream_name, kinesis_client, output=True): while True: rnd = random.random() rate_type = RateType.high if rnd < 0.01 else RateType.normal heart_rate = get_heart_rate(rate_type) if output: print(heart_rate) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(heart_rate), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis'))