sink 不报错 也不能插入clickhouse
来源:13-6 Flink整合CH插入数据
weixin_宝慕林4291967
2022-02-17
package com.chen.flink.app;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
public class ClickHouseApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// col1,col2,col3
SingleOutputStreamOperator<Tuple3<String, String, String>> map = env.socketTextStream("localhost", 9527)
.map(new MapFunction<String, Tuple3<String, String, String>>() {
@Override
public Tuple3<String, String, String> map(String value) throws Exception {
String[] splits = value.split(",");
return Tuple3.of(splits[0].trim(), splits[1].trim(), splits[2].trim());
}
});
map.print("接收到的数据:");
map.addSink(JdbcSink.sink(
"insert into ch_test values(?,?,?)",
(pstmt, x)->{
pstmt.setString(1, x.f0);
pstmt.setString(2, x.f1);
pstmt.setString(3, x.f2);
},
JdbcExecutionOptions.builder().withBatchSize(2).build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:clickhouse://192.168.137.222:8123/chen")
.withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
.build()
));
env.execute();
}
}
写回答
1回答
-
Michael_PK
2022-02-20
先确认一个问题:map.print("接收到的数据:"); 这句话是否有结果数据打印出来?
00
相似问题