对于新项目,我们建议您使用新的 Kinesis Data Analytics 工作室,而不是 SQL 应用程序的 Kinesis Data Analytics。Kinesis Data Analytics Studio 将易用性与高级分析功能相结合,使您能够在几分钟内构建复杂的流处理应用程序。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
示例:创建受限警报
在这个 Amazon Kinesis 数据分析应用程序中,查询在通过演示流创建的应用程序内流上持续运行。有关更多信息,请参阅连续查询:如果任何行显示股票价格变动大于 1%,这些行将被插入另一个应用程序内部流中。应用程序会限制警报,以便在股票价格变化时立即发送警报。但每个股票代号每分钟向应用程序内部流发送的警报不超过一个。
创建受限警报应用程序
按照 Kinesis 数据分析入门练习中的说明创建 Kinesis Data Analytics 应用程序。
在 Kinesis Data Analytics 的 SQL 编辑器中,将应用程序代码替换为以下代码:
CREATE OR REPLACE STREAM "CHANGE_STREAM" (ticker_symbol VARCHAR(4), sector VARCHAR(12), change DOUBLE, price DOUBLE); CREATE OR REPLACE PUMP "change_pump" AS INSERT INTO "CHANGE_STREAM" SELECT STREAM ticker_symbol, sector, change, price FROM "SOURCE_SQL_STREAM_001" WHERE (ABS(Change / (Price - Change)) * 100) > 1; -- ** Trigger Count and Limit ** -- Counts "triggers" or those values that evaluated true against the previous where clause -- Then provides its own limit on the number of triggers per hour per ticker symbol to what -- is specified in the WHERE clause CREATE OR REPLACE STREAM TRIGGER_COUNT_STREAM ( ticker_symbol VARCHAR(4), change REAL, trigger_count INTEGER); CREATE OR REPLACE PUMP trigger_count_pump AS INSERT INTO TRIGGER_COUNT_STREAM SELECT STREAM ticker_symbol, change, trigger_count FROM ( SELECT STREAM ticker_symbol, change, COUNT(*) OVER W1 as trigger_count FROM "CHANGE_STREAM" --window to perform aggregations over last minute to keep track of triggers WINDOW W1 AS (PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING) ) WHERE trigger_count >= 1;应用程序代码中的
SELECT语句将在SOURCE_SQL_STREAM_001中筛选出显示股票价格更改大于 1% 的行,并使用数据泵将这些行插入另一个应用程序内部流CHANGE_STREAM。然后,应用程序为受限警报创建第二个名为
TRIGGER_COUNT_STREAM的流。第二个查询从一个窗口中选择记录,每次记录被允许进入该窗口时,该窗口都向前跳,以便每个股票报价每分钟只有一个记录被写入到流中。-
选择 Save and run SQL。
该示例将流输出到与以下内容类似的 TRIGGER_COUNT_STREAM: