Sink reports no error but nothing is inserted into ClickHouse

Source: 13-6 Flink integration with ClickHouse (CH) for data insertion

weixin_宝慕林4291967

2022-02-17

package com.chen.flink.app;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
public class ClickHouseApp {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Each input line is expected as: col1,col2,col3
        SingleOutputStreamOperator<Tuple3<String, String, String>> map = env.socketTextStream("localhost", 9527)
                .map(new MapFunction<String, Tuple3<String, String, String>>() {
                    @Override
                    public Tuple3<String, String, String> map(String value) throws Exception {
                        String[] splits = value.split(",");
                        return Tuple3.of(splits[0].trim(), splits[1].trim(), splits[2].trim());
                    }
                });
        map.print("接收到的数据:");
        map.addSink(JdbcSink.sink(
                "insert into ch_test values(?,?,?)",
                (pstmt, x) -> {
                    pstmt.setString(1, x.f0);
                    pstmt.setString(2, x.f1);
                    pstmt.setString(3, x.f2);
                },
                JdbcExecutionOptions.builder().withBatchSize(2).build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:clickhouse://192.168.137.222:8123/chen")
                        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                        .build()
        ));

        env.execute();
    }
}
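A side note on the parsing above: the MapFunction splits on commas and indexes fields 0 through 2, so any line with fewer than three fields throws ArrayIndexOutOfBoundsException and fails the job. A minimal plain-Java sketch of the same split/trim logic with a guard (the `parseLine` helper is hypothetical, for illustration only):

```java
import java.util.Arrays;

public class LineParser {
    // Mirrors the MapFunction in the question: split on commas, trim each field.
    // Returns null for malformed input instead of throwing, so one bad line
    // does not bring down the whole streaming job.
    static String[] parseLine(String value) {
        String[] splits = value.split(",");
        if (splits.length < 3) {
            return null; // malformed line: fewer than three fields
        }
        return new String[] {splits[0].trim(), splits[1].trim(), splits[2].trim()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(parseLine("a, b ,c"))); // [a, b, c]
        System.out.println(parseLine("a,b") == null);              // true
    }
}
```

In a Flink job, a null result could then be dropped with a downstream `filter` before the sink.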


1 Answer

Michael_PK

2022-02-20

First, confirm one thing: does `map.print("接收到的数据:");` actually print any result data?
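If the print does show data, two configuration points in the question's sink are worth checking (a hedged sketch, not a confirmed diagnosis): the JDBC sink only flushes when the batch size is reached or a flush interval fires, and the connection options never set ClickHouse credentials, which some servers require. Both builders below are from flink-connector-jdbc; the username/password values are placeholders:

```java
// Flush after 2 records OR after 200 ms, whichever comes first,
// so records are not stuck waiting for a batch to fill.
JdbcExecutionOptions execOptions = JdbcExecutionOptions.builder()
        .withBatchSize(2)
        .withBatchIntervalMs(200)
        .withMaxRetries(3)
        .build();

// Same connection as in the question, plus explicit credentials
// (placeholders: replace with the real ClickHouse user/password).
JdbcConnectionOptions connOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:clickhouse://192.168.137.222:8123/chen")
        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
        .withUsername("default")
        .withPassword("")
        .build();
```

These two objects would be passed to `JdbcSink.sink(...)` in place of the originals.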


Course: Flink+ClickHouse 玩转企业级实时大数据开发 (enterprise-grade real-time big data development with Flink and ClickHouse)
