バッチ処理
Maven依存関係設定
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>flink-demo</artifactId>
    <version>1.0.0</version>

    <properties>
        <flink.version>1.17.0</flink.version>
        <!-- The examples use Java lambdas. Set the language level explicitly:
             older maven-compiler-plugin defaults may use a pre-Java-8 source level
             and fail to compile them. Flink 1.17 supports Java 8 and 11. -->
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <!-- Make source encoding independent of the build machine's platform default. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- DataSet API (org.apache.flink.api.java.ExecutionEnvironment), used by the
             batch word count. Declared explicitly rather than relying on it arriving
             transitively through flink-streaming-java. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- DataStream API (StreamExecutionEnvironment), used by the stream examples. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Client/executor classes needed to run jobs locally, e.g. from an IDE. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</project>
バッチワードカウント実装
package com.example.flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
public class BatchWordCount {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.readTextFile("data/input.txt")
.flatMap(new Tokenizer())
.groupBy(0)
.sum(1)
.print();
}
public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String input, Collector<Tuple2<String, Integer>> collector) {
for (String term : input.split("\\s+")) {
collector.collect(new Tuple2<>(term, 1));
}
}
}
}
有界ストリーム処理
package com.example.flink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
public class BoundedStreamWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.readTextFile("data/input.txt")
.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
for (String token : line.split(" ")) {
out.collect(Tuple2.of(token, 1));
}
})
.returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(t -> t.f0)
.sum(1)
.print();
env.execute();
}
}
無界ストリーム処理
package com.example.flink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
public class UnboundedStreamProcessor {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.socketTextStream("localhost", 9999)
.flatMap((String sentence, Collector<String> out) -> {
for (String word : sentence.split(" ")) {
out.collect(word);
}
})
.returns(Types.STRING)
.map(word -> new Tuple2<>(word, 1))
.returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(t -> t.f0)
.sum(1)
.print();
env.execute();
}
}