老师请问,使用window滚动窗口和不加窗口的结果为什么不一样?

来源:7-12 基于ProcessingTime的Keyed滚动窗口实战

Java___小菜鸟

2022-02-03

1、socket终端输入:w,q,q,q,q,q,q,q,q,q,w,w,w,w,w,w 即为9个q和7个w
使用滚动窗口打印结果为:(w,16)
public static void test02(StreamExecutionEnvironment env) {
//数据源接入
env.socketTextStream(“192.168.1.201”,9527)
.flatMap(new FlatMapFunction<String, String>() {
@Override
//数据读进来处理后,然后返回
public void flatMap(String value, Collector out) throws Exception {
String[] words = value.split(",");
//增强的for循环
for(String word:words){
out.collect(word.toLowerCase().trim());
}
}
}).filter(new FilterFunction() {
@Override
public boolean filter(String value) throws Exception {
return StringUtils.isNotEmpty(value);
}
//每个单词赋值一个1
}).map(new MapFunction<String, Tuple2<String,Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return new Tuple2<>(value,1);
}
//根据key的值统计出现的次数
}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
}).windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5))).
sum(1).print();
}
图片描述

2.socket终端同样输入:w,q,q,q,q,q,q,q,q,q,w,w,w,w,w,w 即为9个q和7个w
注释掉//windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5))).
则打印的结果是正确的。
代码如下:
public static void test02(StreamExecutionEnvironment env) {
//数据源接入
env.socketTextStream(“192.168.1.201”,9527)
.flatMap(new FlatMapFunction<String, String>() {
@Override
//数据读进来处理后,然后返回
public void flatMap(String value, Collector out) throws Exception {
String[] words = value.split(",");
//增强的for循环
for(String word:words){
out.collect(word.toLowerCase().trim());
}
}
}).filter(new FilterFunction() {
@Override
public boolean filter(String value) throws Exception {
return StringUtils.isNotEmpty(value);
}
//每个单词赋值一个1
}).map(new MapFunction<String, Tuple2<String,Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return new Tuple2<>(value,1);
}
//根据key的值统计出现的次数
}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
}).//windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5))).
sum(1).print();
}
图片描述

写回答

1回答

Michael_PK

2022-02-07

1)第二个是没有开窗的,所以你看到的结果是对的

2)第一个是因为你开了窗口,直接使用sum函数出来结果就不对。你可以这么测试,就能看出中间的执行过程:你keyBy之后先把后面的代码都注释掉,直接print出来,发现其实就是每个单词的个数。但是结合上了window,就后面就得跟上windowfunction才能正确计算了。

1
1
Java___小菜鸟
非常感谢!已经解决
2022-02-15
共1条回复

Flink+ClickHouse 玩转企业级实时大数据开发

已经在做大数据,Flink助力轻松提薪;尚未入行,让你弯道超车

1000 学习 · 225 问题

查看课程