示例:提取部分字符串 (SUBSTRING 函数) - Amazon Kinesis Data Analytics 开发者指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

对于新项目,我们建议您使用新的 Kinesis Data Analytics 工作室,而不是 SQL 应用程序的 Kinesis Data Analytics。Kinesis Data Analytics Studio 将易用性与高级分析功能相结合,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

示例:提取部分字符串 (SUBSTRING 函数)

此示例使用SUBSTRING函数在 Amazon Kinesis Data Analytics 中转换字符串。SUBSTRING 函数从特定位置开始提取部分源字符串。有关更多信息,请参阅 Amazon Kinesis Data Analytics SQL 参考中的 S UBST RING。

在本示例中,您将以下记录写入 Amazon Kinesis Data Streams。

{ "REFERRER" : "http://www.amazon.com" } { "REFERRER" : "http://www.amazon.com"} { "REFERRER" : "http://www.amazon.com"} ...

然后,您使用 Kinesis 数据流作为流媒体源,在主机上创建 Amazon Kinesis 数据分析应用程序。发现过程读取有关流式传输源的示例记录,并推断出具有一列 (REFERRER) 的应用程序内部架构,如下所示。


                控制台屏幕截图,显示具有引用站点列中的 URL 列表的应用程序内部架构。

然后,您可以将应用程序代码与 SUBSTRING 函数结合使用,来解析 URL 字符串以检索公司名称。随后将结果数据插入另一个应用程序内部流,如下所示:


                控制台屏幕截图,显示具有应用程序内部流中的结果数据的 Real-time analytics (实时分析) 选项卡。

步骤 1:创建 Kinesis Data Streams

创建 Amazon Kinesis 数据流并按如下方式填充日志记录:

  1. 登录到 Amazon Web Services Management Console,然后通过以下网址打开 Kinesis 控制台:https://console.aws.amazon.com/kinesisvideo/home

  2. 在导航窗格中,选择 Data Streams (数据流)

  3. 选择 Create Kinesis stream (创建 Kinesis 流),然后创建带有一个分片的流。有关更多信息,请参阅 Amazon Kinesis Dat a Streams 开发人员指南中的创建

  4. 运行以下 Python 代码以便填充示例日志记录。这段简单代码不断地将同一日志记录写入到流中。

    import json import boto3 STREAM_NAME = 'ExampleInputStream' def get_data(): return {'REFERRER': 'http://www.amazon.com'} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey='partitionkey') if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis'))

步骤 2:创建 Kinesis Data Analytics 应用程序

接下来,按如下方式创建 Amazon Kinesis 数据分析应用程序:

  1. 打开 Kinesis Data Analytics 控制台,网址为 https://console.aws.amazon.com/kinesisanalytics

  2. 选择 Create application (创建应用程序),键入应用程序名称,然后选择 Create application (创建应用程序)

  3. 在应用程序详细信息页面上,选择 Connect streaming data (连接流数据)

  4. Connect to source (连接到源) 页面上,执行以下操作:

    1. 选择在上一部分中创建的流。

    2. 选择创建 IAM 角色的选项。

    3. 选择 Discover schema (发现架构)。等待控制台显示推断的架构和为创建的应用程序内部流推断架构所使用的示例记录。推断的架构仅包含一列。

    4. 选择 Save and continue

  5. 在应用程序详细信息页面上,选择 Go to SQL editor (转到 SQL编辑器)。要启动应用程序,请在显示的对话框中选择 Yes, start application (是,启动应用程序)

  6. 在 SQL 编辑器中编写应用程序代码并确认结果如下所示:

    1. 复制下面的应用程序代码并将其粘贴到编辑器中。

      -- CREATE OR REPLACE STREAM for cleaned up referrer CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ingest_time" TIMESTAMP, "referrer" VARCHAR(32)); CREATE OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM "APPROXIMATE_ARRIVAL_TIME", SUBSTRING("referrer", 12, (POSITION('.com' IN "referrer") - POSITION('www.' IN "referrer") - 4)) FROM "SOURCE_SQL_STREAM_001";
    2. 选择 Save and run SQL。在 Real-time analytics (实时分析) 选项卡上,可以查看应用程序已创建的所有应用程序内部流并验证数据。