Apache Flinkの基本実装ガイド

バッチ処理

Maven依存関係設定

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Minimal POM for the Flink word-count examples in this guide. -->
    <modelVersion>4.0.0</modelVersion>
    
    <!-- Maven coordinates of this demo project. -->
    <groupId>com.example</groupId>
    <artifactId>flink-demo</artifactId>
    <version>1.0.0</version>

    <properties>
        <!-- Single place to set the Flink version; both dependencies below reference it. -->
        <flink.version>1.17.0</flink.version>
    </properties>

    <dependencies>
        <!-- Provides the org.apache.flink.streaming.* classes used by the streaming examples. -->
        <!-- NOTE(review): the batch example's org.apache.flink.api.java.* classes are presumably
             pulled in transitively through this artifact; confirm against the dependency tree. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- NOTE(review): presumably required so the jobs can be launched directly from main()
             (e.g. inside an IDE); confirm against the Flink project-configuration docs. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</project>

バッチワードカウント実装

package com.example.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        
        env.readTextFile("data/input.txt")
            .flatMap(new Tokenizer())
            .groupBy(0)
            .sum(1)
            .print();
    }

    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String input, Collector<Tuple2<String, Integer>> collector) {
            for (String term : input.split("\\s+")) {
                collector.collect(new Tuple2<>(term, 1));
            }
        }
    }
}

有界ストリーム処理

package com.example.flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

public class BoundedStreamWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.readTextFile("data/input.txt")
            .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                for (String token : line.split(" ")) {
                    out.collect(Tuple2.of(token, 1));
                }
            })
            .returns(Types.TUPLE(Types.STRING, Types.INT))
            .keyBy(t -> t.f0)
            .sum(1)
            .print();
        
        env.execute();
    }
}

非有界ストリーム処理

package com.example.flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;

public class UnboundedStreamProcessor {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.socketTextStream("localhost", 9999)
            .flatMap((String sentence, Collector<String> out) -> {
                for (String word : sentence.split(" ")) {
                    out.collect(word);
                }
            })
            .returns(Types.STRING)
            .map(word -> new Tuple2<>(word, 1))
            .returns(Types.TUPLE(Types.STRING, Types.INT))
            .keyBy(t -> t.f0)
            .sum(1)
            .print();
        
        env.execute();
    }
}

タグ: ApacheFlink DataStreamAPI バッチ処理 ストリーム処理 Java

5月18日 08:11 投稿