如何用高并行度在文件夹里面同时读取不同csv

来源:5-6 自定义数据源方式RichParallelSourceFunction之Scala实现

LucienLi

2019-08-29

老师你好,

我最近在开发csv source文件读取,设置成并行度1的时候并没有什么问题,但是设置多并行度的时候发现程序会重复读取csv文件,如果文件夹中几百个csv就会出现重复读取多次,用的是RichParallelSourceFunction,请问怎么解决?
代码如下
getFileTree(path).filter(_.getName.endsWith(fileExtension)).foreach(file => {
println(file)
val csvInput = env.readCsvFile[RawDepthTickData](file.getAbsolutePath, ignoreFirstLine = true)
.collect()
csvInput.foreach(tick => ctx.collect(tick))
del(file)
})

写回答

1回答

Michael_PK

2019-08-29

重复的图贴下,看看,

0
0

新一代大数据计算引擎 Flink从入门到实战

入行或转型大数据新姿势,多语言系统化讲解,极速入门Flink

969 学习 · 296 问题

查看课程