对于新项目,我们建议您使用新的 Kinesis Data Analytics 工作室,而不是 SQL 应用程序的 Kinesis Data Analytics。Kinesis Data Analytics Studio 将易用性与高级分析功能相结合,使您能够在几分钟内构建复杂的流处理应用程序。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
示例:转换 DateTime 值
Amazon Kinesis Data Analytics 支持将列转换为时间戳。例如,除了ROWTIME列之外,您可能还想将自己的时间戳作为GROUP BY子句的一部分用作另一个基于时间的窗口。Kinesis Data Analytics 提供用于处理日期和时间字段的操作和 SQL 函数。
-
日期和时间运算符-您可以对日期、时间和间隔数据类型执行算术运算。有关更多信息,请参阅 Amazon Kinesis Data Analytics SQL 参考中的日期、时间戳和间隔运算符。
-
SQL 函数-包括以下内容。有关更多信息,请参阅 Amazon Kinesis Data Analytics SQL 参考中的日期和时间函数。
-
EXTRACT()- 从日期、时间、时间戳或间隔表达式中提取一个字段。 -
CURRENT_TIME— 返回执行查询的时间 (UTC)。 -
CURRENT_DATE— 返回执行查询的日期 (UTC)。 -
CURRENT_TIMESTAMP- 返回执行查询的时间戳 (UTC)。 -
LOCALTIME— 返回查询的当前执行时间,由 Kinesis Data Analytics 运行的环境 (UTC) 定义。 -
LOCALTIMESTAMP— 返回由 Kinesis Data Analytics 运行的环境 (UTC) 定义的当前时间戳。
-
-
SQL 扩展-包括以下内容。有关更多信息,请参阅 Amazon Kinesis Data Analytics SQL 参考中的日期和时间函数以及日期时间转换函数。
-
CURRENT_ROW_TIMESTAMP- 为流中的每一行返回一个新的时间戳。 -
TSDIFF- 返回两个时间戳的差异 (以毫秒为单位)。 -
CHAR_TO_DATE— 将串串串串串串串串串串串串串串串 -
CHAR_TO_TIME— 将串串串串串串串串串串串串串 -
CHAR_TO_TIMESTAMP- 将字符串转换为时间戳。 -
DATE_TO_CHAR— 将日期转换为字符串。 -
TIME_TO_CHAR— 将时间转换为字符串。 -
TIMESTAMP_TO_CHAR- 将时间戳转换为字符串。
-
上面大多数 SQL 函数都采用一种格式来转换列。格式是灵活多变的。例如,您可以指定采用格式 yyyy-MM-dd hh:mm:ss 将输入字符串 2009-09-16 03:15:24 转换为时间戳。有关更多信息,请参阅 A mazon Kinesis 数据分析 SQL 参考中的 Char To Timestamp (Sys)。
示例:转换日期
在此示例中,将以下串串串串串串串串串串串串串串串串串串串串串串串串串串串串串
{"EVENT_TIME": "2018-05-09T12:50:41.337510", "TICKER": "AAPL"} {"EVENT_TIME": "2018-05-09T12:50:41.427227", "TICKER": "MSFT"} {"EVENT_TIME": "2018-05-09T12:50:41.520549", "TICKER": "INTC"} {"EVENT_TIME": "2018-05-09T12:50:41.610145", "TICKER": "MSFT"} {"EVENT_TIME": "2018-05-09T12:50:41.704395", "TICKER": "AAPL"} ...
然后,你在主机上创建一个 Amazon Kinesis 数据分析应用程序,将 Kinesis 直播作为流媒体源。发现过程读取流式传输源中的示例记录,并推断出具有两个列 (EVENT_TIME 和 TICKER) 的如下应用程序内部架构。
然后,将该应用程序代码与 SQL 函数结合使用,以多种方式转换 EVENT_TIME 时间戳字段。随后将结果数据插入另一个应用程序内部流,如下面的屏幕截图所示:
步骤 1:创建 Kinesis Data Streams
创建 Amazon Kinesis 数据流并在其中填充事件时间和行情记录,如下所示:
登录到 Amazon Web Services Management Console,然后通过以下网址打开 Kinesis 控制台:https://console.aws.amazon.com/kinesisvideo/home
。 -
在导航窗格中,选择 Data Streams (数据流)。
-
选择 Create Kinesis stream (创建 Kinesis 流),然后创建带有一个分片的流。
-
运行以下 Python 代码以便用示例数据填充流。此简单代码会不断将具有随机股票代号和当前时间戳的记录写入流中。
import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'EVENT_TIME': datetime.datetime.now().isoformat(), 'TICKER': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'PRICE': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis'))
步骤 2:创建Amazon Kinesis Data Analytics 应用程序
按如下方式创建应用程序:
打开 Kinesis Data Analytics 控制台,网址为 https://console.aws.amazon.com/kinesisanalytics
。 -
选择 Create application (创建应用程序),键入应用程序名称,然后选择 Create application (创建应用程序)。
-
在应用程序详细信息页面上,选择 Connect streaming data (连接流数据),以连接到源。
-
在 Connect to source (连接到源) 页面上,执行以下操作:
-
选择在上一部分中创建的流。
-
选择以创建 IAM 角色。
-
选择 Discover schema (发现架构)。等待控制台显示推断的架构和为创建的应用程序内部流推断架构所使用的示例记录。推断的架构有两列。
-
选择 Edit Schema (编辑架构)。将 EVENT_TIME 列的 Column type (列类型) 更改为
TIMESTAMP。 -
选择 Save schema and update stream samples。在控制台保存架构后,选择 Exit (退出)。
-
选择 Save and continue。
-
-
在应用程序详细信息页面上,选择 Go to SQL editor (转到 SQL编辑器)。要启动应用程序,请在显示的对话框中选择 Yes, start application (是,启动应用程序)。
-
在 SQL 编辑器中编写应用程序代码并确认结果,如下所示:
-
复制下面的应用程序代码并将其粘贴到编辑器中。
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( TICKER VARCHAR(4), event_time TIMESTAMP, five_minutes_before TIMESTAMP, event_unix_timestamp BIGINT, event_timestamp_as_char VARCHAR(50), event_second INTEGER); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM TICKER, EVENT_TIME, EVENT_TIME - INTERVAL '5' MINUTE, UNIX_TIMESTAMP(EVENT_TIME), TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME), EXTRACT(SECOND FROM EVENT_TIME) FROM "SOURCE_SQL_STREAM_001" -
选择 Save and run SQL。在 Real-time analytics (实时分析) 选项卡上,可以查看应用程序已创建的所有应用程序内部流并验证数据。
-