示例:从查询中聚合部分结果 - Amazon Kinesis Data Analytics 开发者指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

对于新项目,我们建议您使用新的 Kinesis Data Analytics 工作室,而不是 SQL 应用程序的 Kinesis Data Analytics。Kinesis Data Analytics Studio 将易用性与高级分析功能相结合,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

示例:从查询中聚合部分结果

如果 Amazon Kinesis 数据流包含的事件时间与摄取时间不完全匹配的记录,则滚动窗口中的精选结果包含在该窗口内到达但未必发生的记录。在这种情况下,滚动窗口只包含您需要的部分结果集。您可以通过多种方法来纠正这一问题:

  • 仅使用滚动窗口,并使用 upsert 通过数据库或数据仓库在后处理中聚合部分结果。这种方法在处理应用程序时很有效。它为聚合运算符(summinmax 等)无限期地处理后期数据。这种方法的缺点是,您必须在数据库层开发和维护额外的应用程序逻辑。

  • 使用滚动和滑动窗口,这会在早期生成部分结果,还会继续在滑动窗口期间生成完整的结果。此方法使用 overwrite 操作而非 upsert 操作来处理新近数据,这样就不需要在数据库层添加任何其他应用程序逻辑。这种方法的缺点是它使用更多的 Kinesis 处理单元 (KPU),但仍然会产生两个结果,这可能不适用于某些用例。

有关滚动和滑动窗口的更多信息,请参阅窗口式查询

在以下过程中,滚动窗口聚合会生成两个部分结果(发送到 CALC_COUNT_SQL_STREAM 应用程序内部流),它们必须合并以生成最终结果。然后,应用程序生成第二个聚合(发送到 DESTINATION_SQL_STREAM 应用程序内部流),它合并这两个部分结果。

创建使用事件时间聚合部分结果的应用程序
  1. 登录到 Amazon Web Services Management Console,然后通过以下网址打开 Kinesis 控制台:https://console.aws.amazon.com/kinesisvideo/home

  2. 在导航窗格中,选择 Data Analytics (数据分析)。按照教程中所述创建 Kinesis Data Analytics 应用程序。

  3. 在 SQL 编辑器中,将应用程序代码替换为以下内容:

    CREATE OR REPLACE STREAM "CALC_COUNT_SQL_STREAM" (TICKER VARCHAR(4), TRADETIME TIMESTAMP, TICKERCOUNT DOUBLE); CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (TICKER VARCHAR(4), TRADETIME TIMESTAMP, TICKERCOUNT DOUBLE); CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS INSERT INTO "CALC_COUNT_SQL_STREAM" ("TICKER","TRADETIME", "TICKERCOUNT") SELECT STREAM "TICKER_SYMBOL", STEP("SOURCE_SQL_STREAM_001"."ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime", COUNT(*) AS "TickerCount" FROM "SOURCE_SQL_STREAM_001" GROUP BY STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '1' MINUTE), STEP("SOURCE_SQL_STREAM_001"."APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE), TICKER_SYMBOL; CREATE PUMP "AGGREGATED_SQL_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" ("TICKER","TRADETIME", "TICKERCOUNT") SELECT STREAM "TICKER", "TRADETIME", SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT" FROM "CALC_COUNT_SQL_STREAM" WINDOW W1 AS (PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING);

    应用程序代码中的 SELECT 语句将在 SOURCE_SQL_STREAM_001 中筛选出显示股票价格更改大于 1% 的行,并使用数据泵将这些行插入另一个应用程序内部流 CHANGE_STREAM

  4. 选择 Save and run SQL

第一个数据泵将流输出到与以下内容类似的 CALC_COUNT_SQL_STREAM。请注意,结果集不完整:


                显示部分结果的控制台屏幕截图。

然后,第二个泵将流输出到 DESTINATION_SQL_STREAM,其中包含完整的结果集:


                显示完整结果的控制台屏幕截图。