Apache Flinkを用いて、ニュース記事の人気ランキングをリアルタイムに算出する方法を解説します。長さ5分のスライディングウィンドウを1分間隔でスライドさせ、ウィンドウごとにクリック数上位N件のニュースを求める処理を実装します。
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.io.PojoCsvInputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.io.File;
import java.net.URL;
import java.sql.Timestamp;
import java.util.*;
import java.util.concurrent.PriorityQueue;
public class NewsPopularityTracker {
/**
 * Assembles and runs the ranking job.
 *
 * <p>Pipeline: read {@code userbehavior.csv} from the classpath, assign event-time
 * timestamps, keep only records whose action is {@code "click"}, count clicks per
 * {@code newsId} in 5-minute windows that slide every minute, then print the top 5
 * articles for each window end.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if job construction or execution fails
 */
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment environment =
            StreamExecutionEnvironment.getExecutionEnvironment();
    environment.setParallelism(1);
    environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    final String[] csvColumns = {"userId", "newsId", "categoryId", "action", "eventTime"};
    final DataStream<UserActivity> activities =
            loadCsvData(environment, "userbehavior.csv", csvColumns);

    // The extractor expects timestamps in ascending order; eventTime is scaled by 1000,
    // presumably converting epoch seconds to milliseconds -- confirm against the data set.
    final AscendingTimestampExtractor<UserActivity> eventTimeExtractor =
            new AscendingTimestampExtractor<UserActivity>() {
                @Override
                public long extractAscendingTimestamp(UserActivity element) {
                    return element.getEventTime() * 1000L;
                }
            };
    final FilterFunction<UserActivity> clicksOnly =
            element -> "click".equals(element.getAction());

    final DataStream<NewsViewCount> perWindowCounts = activities
            .assignTimestampsAndWatermarks(eventTimeExtractor)
            .filter(clicksOnly)
            .keyBy("newsId")
            .timeWindow(Time.minutes(5), Time.minutes(1))
            .aggregate(new ClickCounter(), new WindowResultFormatter());

    // Re-key by window end so that all counts of one window reach the same ranking instance.
    perWindowCounts
            .keyBy(count -> count.getWindowEnd())
            .process(new TopNNewsProcessor(5))
            .print();

    environment.execute("News Popularity Tracking");
}
/**
 * Creates a stream of {@link UserActivity} records from a CSV file found on the classpath.
 *
 * @param env      execution environment the source is attached to
 * @param fileName resource name, resolved through this class's class loader
 * @param fields   names of the {@link UserActivity} fields, in CSV column order
 * @return a stream with one {@link UserActivity} per CSV row
 * @throws IllegalArgumentException if the resource does not exist or does not resolve
 *                                  to a file on the local file system
 */
private static DataStream<UserActivity> loadCsvData(StreamExecutionEnvironment env,
                                                    String fileName,
                                                    String[] fields) {
    URL resource = NewsPopularityTracker.class.getClassLoader().getResource(fileName);
    if (resource == null) {
        // getResource signals "not found" with null; fail with a message that names the file.
        throw new IllegalArgumentException("CSV resource not found on classpath: " + fileName);
    }

    File csvFile;
    try {
        // toURI() decodes percent-escapes (e.g. "%20" for a space) that URL.getPath()
        // would leave in the path and thereby point at a non-existent file.
        csvFile = new File(resource.toURI());
    } catch (java.net.URISyntaxException | IllegalArgumentException e) {
        // File(URI) rejects non-"file:" URIs, e.g. a resource packed inside a jar.
        throw new IllegalArgumentException(
                "CSV resource is not a readable local file: " + resource, e);
    }

    PojoTypeInfo<UserActivity> typeInfo = (PojoTypeInfo<UserActivity>)
            TypeExtractor.createTypeInfo(UserActivity.class);
    return env.createInput(
            new PojoCsvInputFormat<>(Path.fromLocalFile(csvFile), typeInfo, fields),
            typeInfo
    );
}
/**
 * Incremental aggregate that counts how many records were added; the accumulator
 * is the running count and is also the final result.
 */
public static class ClickCounter implements AggregateFunction<UserActivity, Long, Long> {

    /** @return a fresh count of zero */
    @Override
    public Long createAccumulator() {
        return 0L;
    }

    /** Adds one to the running total; the record's content is not inspected. */
    @Override
    public Long add(UserActivity record, Long runningTotal) {
        return runningTotal + 1;
    }

    /** @return the accumulated count unchanged */
    @Override
    public Long getResult(Long runningTotal) {
        return runningTotal;
    }

    /** @return the sum of two partial counts */
    @Override
    public Long merge(Long left, Long right) {
        return left + right;
    }
}
public static class WindowResultFormatter
implements org.apache.flink.api.common.functions.WindowFunction<
Long, NewsViewCount, Tuple, TimeWindow> {
@Override
public void apply(Tuple key, TimeWindow window, Iterable<Long> counts,
Collector<NewsViewCount> out) {
long newsId = ((Tuple)key).getField(0);
long viewCount = counts.iterator().next();
out.collect(NewsViewCount.create(newsId, window.getEnd(), viewCount));
}
}
/**
 * Collects the {@link NewsViewCount} records of one window (the stream is keyed by
 * window end) and, once the event-time timer for that window fires, emits a text
 * report of the N articles with the highest view counts.
 */
public static class TopNNewsProcessor extends KeyedProcessFunction<Long, NewsViewCount, String> {
    private final int topNCount;
    private transient ListState<NewsViewCount> newsState;

    /**
     * @param count number of top entries to report; must be positive
     * @throws IllegalArgumentException if {@code count} is not positive
     */
    public TopNNewsProcessor(int count) {
        if (count <= 0) {
            // A non-positive size would otherwise surface as a NullPointerException
            // from peek() on an empty queue when the timer fires.
            throw new IllegalArgumentException("count must be positive, got " + count);
        }
        this.topNCount = count;
    }

    /** Registers the keyed list state that buffers the counts of the current window. */
    @Override
    public void open(Configuration parameters) {
        newsState = getRuntimeContext().getListState(
                new ListStateDescriptor<>("news-state", NewsViewCount.class)
        );
    }

    /**
     * Buffers the record and schedules the ranking for just after its window end.
     *
     * @throws Exception if the state backend fails to store the record
     */
    @Override
    public void processElement(NewsViewCount view, Context ctx, Collector<String> out)
            throws Exception {
        newsState.add(view);
        // Fires once the watermark passes the window end, i.e. after all counts of this
        // window have arrived; registering the same timestamp repeatedly yields one timer.
        ctx.timerService().registerEventTimeTimer(view.getWindowEnd() + 1);
    }

    /**
     * Ranks the buffered records, clears the buffer and emits the report.
     *
     * @param timestamp timer time, i.e. window end + 1
     * @throws Exception if the state backend fails to read the buffered records
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
            throws Exception {
        // Min-heap capped at topNCount: the head is the weakest entry currently kept.
        PriorityQueue<NewsViewCount> topItems = new PriorityQueue<>(
                Comparator.comparingLong(NewsViewCount::getViewCount)
        );
        for (NewsViewCount item : newsState.get()) {
            if (topItems.size() < topNCount) {
                topItems.offer(item);
            } else if (item.getViewCount() > topItems.peek().getViewCount()) {
                // Strictly greater: a tie with the current minimum does not displace it.
                topItems.poll();
                topItems.offer(item);
            }
        }
        newsState.clear();

        List<NewsViewCount> sortedResults = new ArrayList<>(topItems);
        sortedResults.sort((a, b) -> Long.compare(b.getViewCount(), a.getViewCount()));

        // The timer was registered at window end + 1, so subtract 1 to report the window end.
        out.collect(formatReport(timestamp - 1, sortedResults));
    }

    /** Renders the ranking, highest count first, as a multi-line report. */
    private static String formatReport(long windowEndMillis, List<NewsViewCount> ranked) {
        StringBuilder report = new StringBuilder();
        report.append("【人気ランキング】\n");
        report.append("集計時刻: ").append(new Timestamp(windowEndMillis)).append("\n");
        for (NewsViewCount item : ranked) {
            report.append("記事ID: ").append(item.getNewsId())
                    .append(" | ヒット数: ").append(item.getViewCount()).append("\n");
        }
        return report.toString();
    }
}
}
/**
 * One user-behavior record. The field names match the column list
 * ({@code userId, newsId, categoryId, action, eventTime}) that the job passes to the
 * CSV input format, which fills the fields by name.
 *
 * <p>Fields are private, so each one has a public getter and setter; the implicit
 * public no-argument constructor is kept.
 */
public class UserActivity {
    private long userId;
    private long newsId;
    private int categoryId;
    /** Kind of interaction; the job keeps only records whose value is {@code "click"}. */
    private String action;
    /** Event time; the job multiplies it by 1000 -- presumably epoch seconds, confirm. */
    private long eventTime;

    public long getUserId() {
        return userId;
    }

    public void setUserId(long userId) {
        this.userId = userId;
    }

    public long getNewsId() {
        return newsId;
    }

    public void setNewsId(long newsId) {
        this.newsId = newsId;
    }

    public int getCategoryId() {
        return categoryId;
    }

    public void setCategoryId(int categoryId) {
        this.categoryId = categoryId;
    }

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public long getEventTime() {
        return eventTime;
    }

    public void setEventTime(long eventTime) {
        this.eventTime = eventTime;
    }
}
/**
 * View count of one news article within one window, identified by the window's end time.
 */
public class NewsViewCount {
    private long newsId;
    private long windowEnd;
    private long viewCount;

    /**
     * Factory for a fully populated instance.
     *
     * @param id    news article id
     * @param end   end timestamp of the window the count belongs to
     * @param count number of views counted in that window
     * @return a new {@code NewsViewCount} holding the three values
     */
    public static NewsViewCount create(long id, long end, long count) {
        NewsViewCount result = new NewsViewCount();
        result.newsId = id;
        result.windowEnd = end;
        result.viewCount = count;
        return result;
    }

    public long getNewsId() {
        return newsId;
    }

    public long getWindowEnd() {
        return windowEnd;
    }

    public long getViewCount() {
        return viewCount;
    }
}