对于新项目,我们建议您使用新的 Kinesis Data Analytics 工作室,而不是 SQL 应用程序的 Kinesis Data Analytics。Kinesis Data Analytics Studio 将易用性与高级分析功能相结合,使您能够在几分钟内构建复杂的流处理应用程序。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
示例:检测流上的热点 (HOTSPOTS 函数)
Amazon Kinesis Data Analytics 提供了该HOTSPOTS功能,它可以定位和返回有关数据中相对密集区域的信息。有关更多信息,请参阅 Amazon Kinesis Data Analytics SQL 参考中的热点。
在本练习中,您将编写应用程序代码以查找应用程序的流式传输源上的热点。要设置应用程序,请执行以下步骤:
-
设置流媒体源-设置 Kinesis 直播并写入样本坐标数据,如下所示:
{"x": 7.921782426109737, "y": 8.746265312709893, "is_hot": "N"} {"x": 0.722248626528026, "y": 4.648868803193405, "is_hot": "Y"}本示例提供了用于填充流的 Python 脚本。
x和y值是随机生成的,一些记录集中在特定位置周围。如果脚本有意生成值作为热点的一部分,
is_hot字段将作为指示器提供。这可以帮助您评估热点检测函数是否正常运行。 -
创建应用程序-然后使用创建 Kinesis 数据分析应用程序。Amazon Web Services Management Console通过将流式传输源映射到应用程序内部流 (
SOURCE_SQL_STREAM_001) 来配置应用程序输入。应用程序启动时,Kinesis Data Analytics 会持续读取流媒体源并将记录插入应用程序内流。在本练习中,您将为应用程序使用以下代码:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "x" DOUBLE, "y" DOUBLE, "is_hot" VARCHAR(4), HOTSPOTS_RESULT VARCHAR(10000) ); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT "x", "y", "is_hot", "HOTSPOTS_RESULT" FROM TABLE ( HOTSPOTS( CURSOR(SELECT STREAM "x", "y", "is_hot" FROM "SOURCE_SQL_STREAM_001"), 1000, 0.2, 17) );此代码读取
SOURCE_SQL_STREAM_001中的行,分析它是否有大量热点,并将生成的数据写入到另一个应用程序内部流 (DESTINATION_SQL_STREAM)。您使用数据泵将流插入到应用程序内部流。有关更多信息,请参阅应用程序内部流和数据泵: -
配置输出-将应用程序输出配置为将数据从应用程序发送到外部目的地,即另一个 Kinesis 数据流。查看热点分数并确定哪些分数表明出现了热点 (并且您需要收到警报)。您可以使用 Amazon Lambda 函数进一步处理热点信息并配置警报。
-
验证输出-该示例包括一个从输出流读取数据并以图形方式显示的 JavaScript 应用程序,因此您可以实时查看该应用程序生成的热点。
该练习使用美国西部(俄勒冈)(us-west-2) 来创建这些直播和您的应用程序。如果您使用任何其他区域,请相应地更新代码。