示例:在流中检测数据异常情况 (RANDOM_CUT_FOREST 函数) - Amazon Kinesis Data Analytics 开发者指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

对于新项目,我们建议您使用新的 Kinesis Data Analytics 工作室,而不是 SQL 应用程序的 Kinesis Data Analytics。Kinesis Data Analytics Studio 将易用性与高级分析功能相结合,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

示例:在流中检测数据异常情况 (RANDOM_CUT_FOREST 函数)

Amazon Kinesis Data Analytics 提供了一个函数 (RANDOM_CUT_FOREST),该函数可以根据数字列中的值为每条记录分配异常分数。有关更多信息,请参阅 Amazon Kinesis Data Analytics SQL 参考中的RANDOM_CUT_FOREST函数

在本练习中,您将编写应用程序代码以将异常分数分配给应用程序的流式传输源中的记录。要设置应用程序,请执行以下操作:

  1. 设置流媒体源-设置 Kinesis 数据流并写入示例heartRate数据,如下所示:

    {"heartRate": 60, "rateType":"NORMAL"} ... {"heartRate": 180, "rateType":"HIGH"}

    此过程提供用于填充流的 Python 脚本。heartRate 值将随机生成,99% 的记录具有的 heartRate 值介于 60 和 100 之间,仅 1% 的记录具有的 heartRate 值介于 150 和 200 之间。因此,heartRate 值介于 150 和 200 之间的记录是异常情况。

  2. 配置输入-使用控制台创建 Kinesis Data Analytics 应用程序,并通过将流媒体源映射到应用程序内部流来配置应用程序输入(SOURCE_SQL_STREAM_001)。应用程序启动时,Kinesis Data Analytics 会持续读取流媒体源并将记录插入应用程序内流。

  3. 指定应用程序代码 - 示例使用以下应用程序代码:

    --Creates a temporary stream. CREATE OR REPLACE STREAM "TEMP_STREAM" ( "heartRate" INTEGER, "rateType" varchar(20), "ANOMALY_SCORE" DOUBLE); --Creates another stream for application output. CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "heartRate" INTEGER, "rateType" varchar(20), "ANOMALY_SCORE" DOUBLE); -- Compute an anomaly score for each record in the input stream -- using Random Cut Forest CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "TEMP_STREAM" SELECT STREAM "heartRate", "rateType", ANOMALY_SCORE FROM TABLE(RANDOM_CUT_FOREST( CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"))); -- Sort records by descending anomaly score, insert into output stream CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM * FROM "TEMP_STREAM" ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;

    此代码读取 SOURCE_SQL_STREAM_001 中的行,分配异常分数,并将结果行写入另一个应用程序内部流 (TEMP_STREAM)。随后,应用程序代码将对 TEMP_STREAM 中的记录进行排序,并将结果保存到另一个应用程序内部流 (DESTINATION_SQL_STREAM)。您使用数据泵将流插入到应用程序内部流。有关更多信息,请参阅应用程序内部流和数据泵

  4. 配置输出-将应用程序输出配置为将中的数据保存DESTINATION_SQL_STREAM到外部目的地,即另一个 Kinesis 数据流。查看分配给每条记录的异常分数并确定哪个分数指示应用程序外部的异常情况 (您需要收到这些异常情况的警报)。您可以使用 Amazon Lambda 函数处理这些异常分数并配置警报。

此练习使用美国东部(弗吉尼亚北部us-east-1)() 来创建这些直播和应用程序。如果您使用任何其他区域,则必须相应地更新代码。

下一个步骤

步骤 1:准备