对于新项目,我们建议您使用新的 Kinesis Data Analytics 工作室,而不是 SQL 应用程序的 Kinesis Data Analytics。Kinesis Data Analytics Studio 将易用性与高级分析功能相结合,使您能够在几分钟内构建复杂的流处理应用程序。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
时间戳和 ROWTIME 列
应用程序内部流包含名为 ROWTIME 的特殊列。当 Amazon Kinesis Data Analytics 在第一个应用程序内流中插入一行时,它会存储时间戳。 ROWTIME反映了 Amazon Kinesis Data Analytics 在读取流媒体源后将记录插入到第一个应用程序内数据流的时间戳。之后,该 ROWTIME 值在您的整个应用程序中进行维护。
注意
当您将记录从一个应用程序内数据流传输到另一个应用程序内流时,无需明确复制该ROWTIME列,Amazon Kinesis Data Analytics 会为您复制此列。
Amazon Kinesis Data Analytics 保证ROWTIME数值是单调增加的。您可以在基于时间的窗口式查询中使用此时间戳。有关更多信息,请参阅窗口式查询:
您可以访问 SELECT 语句中的 ROWTIME 列,就像应用程序内部流中的任何其他列一样。例如:
SELECT STREAM ROWTIME, some_col_1, some_col_2 FROM SOURCE_SQL_STREAM_001
了解流式分析中的各种时间
除了 ROWTIME 之外,在实时流式应用程序中还存在其他类型的时间。这些时间是:
-
事件时间-事件发生时的时间戳。它有时也称为客户端时间。经常需要在分析中使用此时间,因为它是事件发生时的时间。但是,许多事件源(例如手机和 Web 客户端)没有可靠的时钟,这可能会导致时间不准确。此外,连接问题可能会导致记录没有按照事件发生顺序出现在流中。
-
采集时间 — 将记录添加到流媒体源的时间戳。Amazon Kinesis Data Streams
APPROXIMATE_ARRIVAL_TIME在提供此时间戳的每条记录中都包含一个名为的字段。有时这也称为服务器端时间。此接收时间通常非常接近事件时间。如果记录接收到流时存在任何种类的延迟,会导致不准确,这种情况通常很少见。此外,接收时间很少出现顺序问题,但由于流数据的分布特点,它也会出现。因此,接收时间通常准确地反映按顺序排列的事件时间。 -
处理时间 — Amazon Kinesis Data Analytics 在第一个应用程序内数据流中插入一行时的时间戳。Amazon Kinesis Data Analytics 在每个应用程序内流中存在的
ROWTIME列中提供此时间戳。处理时间总是单调增加。但如果应用程序滞后,则处理时间不准确。(如果应用程序滞后,则处理时间无法准确反映事件时间)。此ROWTIME相对于时钟来说很准确,但可能不是事件实际发生的时间。
在基于时间的窗口式查询中使用这些时间有优点也有缺点。我们建议您选择这些时间中的一个或多个,并根据您的使用案例场景选择一种策略来处理相关缺点。
注意
如果您使用的是基于行的窗口,则时间不是问题,您可以忽略本部分。
我们建议采用双窗口策略,这两个窗口基于不同的时间,即 ROWTIME 和其他时间(接收时间或事件时间)中的一个。
-
使用
ROWTIME作为第一个窗口,控制查询发送结果的频率,如以下示例所示。它不用作逻辑时间。 -
使用其他时间中您希望与分析关联的逻辑时间。该时间表示事件的发生时间。在以下示例中,分析目标是按股票行情机对记录分组并返回计数。
此策略的优势是它可以使用事件发生时间。它可以轻松处理您的应用程序落后或事件无序到达的情况。当将记录放入应用程序内部流时,如果应用程序落后,记录仍然按第二个窗口中的逻辑时间分组。查询使用 ROWTIME 确保处理顺序。落后的任何记录(与 ROWTIME 值相比,接收时间戳显示的值较早)也会成功处理。
针对入门练习中使用的演示流,考虑以下查询。查询使用 GROUP BY 子句,并在一分钟滚动窗口中发送股票行情机计数。
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("ingest_time" timestamp, "APPROXIMATE_ARRIVAL_TIME" timestamp, "ticker_symbol" VARCHAR(12), "symbol_count" integer); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND) AS "ingest_time", STEP("SOURCE_SQL_STREAM_001".APPROXIMATE_ARRIVAL_TIME BY INTERVAL '60' SECOND) AS "APPROXIMATE_ARRIVAL_TIME", "TICKER_SYMBOL", COUNT(*) AS "symbol_count" FROM "SOURCE_SQL_STREAM_001" GROUP BY "TICKER_SYMBOL", STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND), STEP("SOURCE_SQL_STREAM_001".APPROXIMATE_ARRIVAL_TIME BY INTERVAL '60' SECOND);
在 GROUP BY 中,您首先在一分钟窗口中基于 ROWTIME 对记录分组,然后基于 APPROXIMATE_ARRIVAL_TIME 分组。
结果中的时间戳值已向下舍入为最接近的 60 秒间隔。查询发送的第一组结果显示第一分钟的记录。发送的第二组结果基于 ROWTIME 显示后续分钟内的记录。最后一条记录指示应用程序在应用程序内部流中放入记录是最晚的(与接收时间戳相比,它显示最晚的 ROWTIME 值)。
ROWTIME INGEST_TIME TICKER_SYMBOL SYMBOL_COUNT --First one minute window. 2016-07-19 17:05:00.0 2016-07-19 17:05:00.0 ABC 10 2016-07-19 17:05:00.0 2016-07-19 17:05:00.0 DEF 15 2016-07-19 17:05:00.0 2016-07-19 17:05:00.0 XYZ 6 –-Second one minute window. 2016-07-19 17:06:00.0 2016-07-19 17:06:00.0 ABC 11 2016-07-19 17:06:00.0 2016-07-19 17:06:00.0 DEF 11 2016-07-19 17:06:00.0 2016-07-19 17:05:00.0 XYZ 1 *** ***late-arriving record, instead of appearing in the result of the first 1-minute windows (based on ingest_time, it is in the result of the second 1-minute window.
您可以将结果推送到下游数据库,以汇总结果来得到每分钟的最终准确计数。例如,您可以将应用程序输出配置为将结果保存到可以写入 Amazon Redshift 表的 Kinesis Data Firehose 传输流中。结果出现在 Amazon Redshift 表中后,您可以查询该表以计算分组总数Ticker_Symbol。对于 XYZ,总数是精确的 (6+1),即使记录延迟到达也是如此。