对于新项目,我们建议您使用新的 Kinesis Data Analytics 工作室,而不是 SQL 应用程序的 Kinesis Data Analytics。Kinesis Data Analytics Studio 将易用性与高级分析功能相结合,使您能够在几分钟内构建复杂的流处理应用程序。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
步骤 2:创建应用程序
在本节中,创建 Amazon Kinesis Data Analytics 应用程序,创建,如下所示:
-
将应用程序输入配置为使用您在中创建的 Kinesis 数据流步骤 1:准备作为流式传输源。
-
在控制台上使用 Anomaly Detection (异常检测) 模板。
创建应用程序
-
按照 Kinesis Data Analytics 入门练习中的步骤 1、2 和 3 进行操作(参见步骤 3.1:创建应用程序)。
-
在源配置中,执行以下操作:
-
指定您在上一部分中创建的流式传输源。
-
在控制台推断架构后,编辑架构并将
heartRate列类型设置为INTEGER.大多数心率值是正常的,发现过程最有可能将
TINYINT类型分配给此列。但有小部分值显示了高心率。如果这些高值不适合TINYINT类型,Kinesis Data Analytics 会将这些行发送到错误流。将数据类型更新为INTEGER,以便能适合所有生成的心率数据。
-
-
在控制台上使用 Anomaly Detection (异常检测) 模板。随后,您更新模板代码以提供适当的列名称。
-
-
通过提供列名称来更新应用程序代码。下面显示了生成的应用程序代码 (将此代码粘贴到 SQL 编辑器中):
--Creates a temporary stream. CREATE OR REPLACE STREAM "TEMP_STREAM" ( "heartRate" INTEGER, "rateType" varchar(20), "ANOMALY_SCORE" DOUBLE); --Creates another stream for application output. CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "heartRate" INTEGER, "rateType" varchar(20), "ANOMALY_SCORE" DOUBLE); -- Compute an anomaly score for each record in the input stream -- using Random Cut Forest CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "TEMP_STREAM" SELECT STREAM "heartRate", "rateType", ANOMALY_SCORE FROM TABLE(RANDOM_CUT_FOREST( CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"))); -- Sort records by descending anomaly score, insert into output stream CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM * FROM "TEMP_STREAM" ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC; -
运行 SQL 代码并在 Kinesis Data Analytics 控制台上查看结果: