本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
为Data Analytics for Apache Flink Powe Flink
Apache Flink 提供连接器以从文件、套接字、集合和自定义源中读取。在应用程序代码中,您可以使用 Apache Flink 源
Kinesis Data Streams
该FlinkKinesisConsumer源从 Amazon Kinesis 数据流向您的应用程序提供流式数据。
创建FlinkKinesisConsumer
以下代码示例说明了如何创建 FlinkKinesisConsumer:
Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); DataStream<string> input = env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
有关使用 FlinkKinesisConsumer 的更多信息,请参阅下载并检查 Apache Flink 流式处理 Java 代码。
创建使用FlinkKinesisConsumer EFO 消费者的
FlinkKinesisConsumer 现在支持增强型扇出 (EFO)
如果 Kinesis 使用者使用 EFO,则 Kinesis Data Streams 服务会为其提供自己的专用带宽,而不是让使用者与其他使用者共享直播的固定带宽。
有关对 Kinesis 消费者使用 EFO 的更多信息,请参阅 FLIP-128:增强版Amazon Kinesis 消费者风扇输出
您可以通过在 Kinesis 使用者上设置以下参数来启用 EFO 使用者:
RECORD_PUBLISHER_TYPE:将此参数设置为 E FO,让您的应用程序使用 EFO 使用者访问 Kinesis 数据流数据。
EFO_CONSUMER_NAME:将此参数设置为在该直播的使用者中唯一的字符串值。在同一 Kinesis Data Stream 中重复使用消费者名称将导致先前使用该名称的使用者被终止。
要将 a 配置FlinkKinesisConsumer为使用 EFO,请向使用者添加以下参数:
consumerConfig.putIfAbsent(RECORD_PUBLISHER_TYPE, "EFO"); consumerConfig.putIfAbsent(EFO_CONSUMER_NAME, "basic-efo-flink-app");
有关使用 EFO 使用者的 Kinesis Data Analytics 应用程序的示例,请参阅EFO 消费者。
Amazon MSK
该KafkaSource来源向您的应用程序提供来自 Amazon MSK 主题的流式传输数据。
创建KafkaSource
以下代码示例说明了如何创建 KafkaSource:
KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
有关使用 KafkaSource 的更多信息,请参阅MSK 复制。