Flinkは「ストリーム処理」を基盤とした分散処理エンジンで、データが到着次第すぐに処理を行います。これはバッチ処理(例:Spark)とは異なり、マルチスレッドで逐次的に動作します。以下に、テキストファイルから読み込んだ単語をカウントする最小限の例を示します。
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.util.Collector;
public class WordCountExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> lines = env.readTextFile("input/word.txt");
SingleOutputStreamOperator<Tuple2<String, Long>> wordCounts = lines
.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
for (String word : line.split(" ")) {
out.collect(Tuple2.of(word, 1L));
}
})
.returns(Types.TUPLE(Types.STRING, Types.LONG))
.keyBy(value -> value.f0)
.sum(1);
wordCounts.print();
env.execute("WordCountExample");
}
}
Flinkアプリケーションは「Source(データ取得)」「Transformation(変換処理)」「Sink(出力)」の3層で構成されます。通常SourceにはKafka、Transformationには各種演算子(map、flatMap、filterなど)、Sinkにはデータベース(HBase)やファイルシステム(HDFS)などが使われます。
基本概念
JobManager / TaskManager / スロット
- JobManager:クライアントやクラスタからのメッセージを受け取り、リソーススケジューリングとタスク分配を担当(マネージャ的役割)
- TaskManager:実際の計算を実行するワーカー(作業者)
- スロット(Slot):1つのTaskManagerが同時に処理できる並列タスク数の上限
並列度
各演算子は1つ以上のサブタスクで構成され、その数が並列度です。コード内で setParallelism(n) を用いて演算子ごとに指定できます。
ParameterTool
Flinkが提供するコマンドライン引数解析ユーティリティ。ParameterTool.fromArgs(args) でインスタンスを取得し、プログラム内で設定値を参照します。
Flink × Kafka 連携
まずMaven依存を追加します(バージョンは1.12.3を想定)。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.12.3</version>
</dependency>
以下のサンプルは、Kafkaから文字列メッセージを消費し、簡単な変換(大文字化)を施して別のトピックに書き出す例です。
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Properties;
public class KafkaToKafkaExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1); // デモ用
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "kafka-broker:9092");
consumerProps.setProperty("group.id", "flink-group");
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "kafka-broker:9092");
DataStream<String> input = env
.addSource(new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), consumerProps));
DataStream<String> transformed = input
.map(value -> "[UPPER] " + value.toUpperCase());
transformed
.addSink(new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), producerProps));
env.execute("KafkaToKafkaTransformation");
}
}
Kafkaクラスタ上で動作させる場合、入力・出力トピックともに事前に作成しておく必要があります。