步骤 1:准备数据 - Amazon Kinesis Data Analytics 开发者指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

对于新项目,我们建议您使用新的 Kinesis Data Analytics 工作室,而不是 SQL 应用程序的 Kinesis Data Analytics。Kinesis Data Analytics Studio 将易用性与高级分析功能相结合,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

步骤 1:准备数据

在为本示例创建 AAmazon Kinesis Data Analytics 应用程序之前,您需要创建一个 Kinesis 数据流用作应用程序的流媒体源。另外,您还需运行 Python 代码来将模拟血压数据写入流中。

步骤 1.1:创建 Kinesis Data 流

在本节中,您将创建名为的 Kinesis 数据流ExampleInputStream。您可以使用 Amazon Web Services Management Console或 Amazon CLI 创建该数据流。

  • 使用控制台:

    1. 登录到 Amazon Web Services Management Console,然后通过以下网址打开 Kinesis 控制台:https://console.aws.amazon.com/kinesisvideo/home

    2. 在导航窗格中,选择 Data Streams (数据流)。然后选择创建 Kinesis 流

    3. 对于名称,请键入 ExampleInputStream。对于分片数,请键入 1

  • 或者,要使用 Amazon CLI 创建数据流,请运行以下命令:

    $ aws kinesis create-stream --stream-name ExampleInputStream --shard-count 1

步骤 1.2:将示例记录写入输入流

在此步骤中,您运行 Python 代码以不断生成示例记录并将其写入您创建的数据流中。

  1. 安装 Python 和 pip。

    有关安装 Python 的信息,请参阅 Python

    您可以使用 pip 安装依赖项。有关安装 pip 的信息,请参阅 pip 文档中的安装

  2. 运行以下 Python 代码。可以将区域改为您要用于此示例的区域。代码中的 put-record 命令将 JSON 记录写入到流。

    from enum import Enum import json import random import boto3 STREAM_NAME = "ExampleInputStream" class PressureType(Enum): low = 'LOW' normal = 'NORMAL' high = 'HIGH' def get_blood_pressure(pressure_type): pressure = {'BloodPressureLevel': pressure_type.value} if pressure_type == PressureType.low: pressure['Systolic'] = random.randint(50, 80) pressure['Diastolic'] = random.randint(30, 50) elif pressure_type == PressureType.normal: pressure['Systolic'] = random.randint(90, 120) pressure['Diastolic'] = random.randint(60, 80) elif pressure_type == PressureType.high: pressure['Systolic'] = random.randint(130, 200) pressure['Diastolic'] = random.randint(90, 150) else: raise TypeError return pressure def generate(stream_name, kinesis_client): while True: rnd = random.random() pressure_type = ( PressureType.low if rnd < 0.005 else PressureType.high if rnd > 0.995 else PressureType.normal) blood_pressure = get_blood_pressure(pressure_type) print(blood_pressure) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(blood_pressure), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis'))
下一个步骤

步骤 2:创建分析应用程序