步骤 1:创建输入和输出流 - Amazon Kinesis Data Analytics 开发者指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

对于新项目,我们建议您使用新的 Kinesis Data Analytics 工作室,而不是 SQL 应用程序的 Kinesis Data Analytics。Kinesis Data Analytics Studio 将易用性与高级分析功能相结合,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

步骤 1:创建输入和输出流

热点示例创建 Amazon Kinesis Data Analytics 应用程序,创建 Kinesis Data Analytics 应用程序。将其中一个流配置为应用程序的流媒体源,将另一个流配置为 Kinesis Data Analytics 保存应用程序输出的目的地。

步骤 1.1:创建 Kinesis Data Streams

在本节中,您将创建两个 Kinesis 数据流:ExampleInputStreamExampleOutputStream

使用控制台或 Amazon CLI 创建这些数据流。

  • 使用控制台创建数据流:

    1. 登录到 Amazon Web Services Management Console,然后通过以下网址打开 Kinesis 控制台:https://console.aws.amazon.com/kinesisvideo/home

    2. 在导航窗格中,选择 Data Streams (数据流)

    3. 选择创建 Kinesis 流,然后创建带有一个名为 ExampleInputStream 的分片的流。

    4. 重复上一步骤以创建带有一个名为 ExampleOutputStream 的分片的流。

  • 要使用 Amazon CLI 创建数据流,请执行以下操作:

    • 使用以下 Kinesiscreate-streamAmazon CLI 命令创建直播(ExampleInputStreamExampleOutputStream)。要创建另一个流 (应用程序将用于写入输出),请运行同一命令以将流名称更改为 ExampleOutputStream

      $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser

步骤 1.2:将示例记录写入输入流

在此步骤中,您运行 Python 代码以持续生成示例记录并将其写入 ExampleInputStream 流。

{"x": 7.921782426109737, "y": 8.746265312709893, "is_hot": "N"} {"x": 0.722248626580026, "y": 4.648868803193405, "is_hot": "Y"}
  1. 安装 Python 和 pip

    有关安装 Python 的信息,请访问 Python 网站。

    您可以使用 pip 安装依赖项。有关安装 pip 的信息,请参阅 pip 网站上的安装

  2. 运行以下 Python 代码。此代码将执行以下操作:

    • 在 (X, Y) 平面上的某个位置生成潜在热点。

    • 为每个热点生成一系列点 (1000 个)。这些点中有 20% 集中在热点周围。其余的点在整个空间内随机生成。

    • put-record 命令将 JSON 记录写入到流。

    重要

    不要将此文件上传到 Web 服务器,因为它包含您的Amazon证书。

    import json from pprint import pprint import random import time import boto3 STREAM_NAME = "ExampleInputStream" def get_hotspot(field, spot_size): hotspot = { 'left': field['left'] + random.random() * (field['width'] - spot_size), 'width': spot_size, 'top': field['top'] + random.random() * (field['height'] - spot_size), 'height': spot_size } return hotspot def get_record(field, hotspot, hotspot_weight): rectangle = hotspot if random.random() < hotspot_weight else field point = { 'x': rectangle['left'] + random.random() * rectangle['width'], 'y': rectangle['top'] + random.random() * rectangle['height'], 'is_hot': 'Y' if rectangle is hotspot else 'N' } return {'Data': json.dumps(point), 'PartitionKey': 'partition_key'} def generate( stream_name, field, hotspot_size, hotspot_weight, batch_size, kinesis_client): """ Generates points used as input to a hotspot detection algorithm. With probability hotspot_weight (20%), a point is drawn from the hotspot; otherwise, it is drawn from the base field. The location of the hotspot changes for every 1000 points generated. """ points_generated = 0 hotspot = None while True: if points_generated % 1000 == 0: hotspot = get_hotspot(field, hotspot_size) records = [ get_record(field, hotspot, hotspot_weight) for _ in range(batch_size)] points_generated += len(records) pprint(records) kinesis_client.put_records(StreamName=stream_name, Records=records) time.sleep(0.1) if __name__ == "__main__": generate( stream_name=STREAM_NAME, field={'left': 0, 'width': 10, 'top': 0, 'height': 10}, hotspot_size=1, hotspot_weight=0.2, batch_size=10, kinesis_client=boto3.client('kinesis'))

下一个步骤

步骤 2:创建 Kinesis Data Analytics 应用程序