表 API 连接连接器和连接 - Amazon Kinesis Data Analytics
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

表 API 连接连接器和连接

在 Apache Flink 编程模型中,连接器是应用程序用来从外部来源(例如其他Amazon服务)读取或写入数据的组件。

通过 Apache Flink Table API,你可以使用以下类型的连接器:

  • 表 API 源代码:您可以使用 Table API 源连接器使用 API 调TableEnvironment用或 SQL 查询在自己的数据库中创建表。

  • 表 API 接收器:您可以使用 SQL 命令将表数据写入外部来源,例如 Amazon MSK 主题或 Amazon S3 存储桶。

表 API 源代码

您可以从数据流创建表源。以下代码基于 Amazon MSK 主题创建表:

//create the table final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties); consumer.setStartFromEarliest(); //Obtain stream DataStream<StockRecord> events = env.addSource(consumer); Table table = streamTableEnvironment.fromDataStream(events);

有关表源的更多信息,请参阅 Apache Flink 文档中的表和连接器

表 API 接收器

要将表数据写入接收器,请在 SQL 中创建接收器,然后在该StreamTableEnvironment对象上运行基于 SQL 的接收器。

以下代码示例演示了如何将表数据写到 Amazon S3 接收器:

final String s3Sink = "CREATE TABLE sink_table (" + "event_time TIMESTAMP," + "ticker STRING," + "price DOUBLE," + "dt STRING," + "hr STRING" + ")" + " PARTITIONED BY (ticker,dt,hr)" + " WITH" + "(" + " 'connector' = 'filesystem'," + " 'path' = '" + s3Path + "'," + " 'format' = 'json'" + ") "; //send to s3 streamTableEnvironment.executeSql(s3Sink); filteredTable.executeInsert("sink_table");

你可以使用format参数控制 Kinesis Data Analytics 使用什么格式将输出写入接收器。有关格式的信息,请参阅 Apache Flink 文档中的格式

有关表格接收器的更多信息,请参阅 Apache Flink 文档中的表格和连接器

用户定义的源头和汇点

您可以使用现有的 Apache Kafka 连接器向其他Amazon服务(例如亚马逊 MSK 和Amazon S3)发送数据。要与其他数据源和目标进行交互,您可以定义自己的源和汇点。有关更多信息,请参阅 Apache Flink 文档中的用户定义源和接收器