Overview
The flink-connector-jdbc version used here can write data to ClickHouse only through the Flink DataStream API; the Table API and Flink SQL are not yet supported.
Dependencies
Add the following two dependencies to pom.xml: the Flink JDBC connector and the ClickHouse JDBC driver.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.12.7</version>
</dependency>
<dependency>
    <groupId>com.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.3.2-patch8</version>
    <classifier>http</classifier>
    <exclusions>
        <exclusion>
            <groupId>*</groupId>
            <artifactId>*</artifactId>
        </exclusion>
    </exclusions>
</dependency>
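Code Example
The following code shows how to write data to ClickHouse through a Flink JDBC sink. The source here is a single in-memory element; with minor changes it can be replaced by Kafka or another service.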
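import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinkClickHouse {
    // Replace {host}, {port}, and {database} with the actual ClickHouse connection values
    private static final String CLICKHOUSE_URL = "jdbc:ch://{host}:{port}/{database}";
    private static final String CLICKHOUSE_JDBC_DRIVER = "com.clickhouse.jdbc.ClickHouseDriver";
    private static final String CLICKHOUSE_USER_NAME = "";
    private static final String CLICKHOUSE_PASSWD = "";

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /**
         * Kafka or another service can be substituted as the source here
         **/

        // SQL statement; question marks are the placeholders
        String sql = "insert into test_jdbc(name, user_id) values(?, ?)";

        // Mock data
        Tuple2<String, Integer> bjTp = Tuple2.of("zhaoliwei", 10086);

        JdbcConnectionOptions jdbcOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl(CLICKHOUSE_URL)                 // ClickHouse service address
                .withDriverName(CLICKHOUSE_JDBC_DRIVER)  // ClickHouse JDBC driver class
                .withUsername(CLICKHOUSE_USER_NAME)      // ClickHouse user name
                .withPassword(CLICKHOUSE_PASSWD)         // ClickHouse password
                .build();

        env.fromElements(bjTp)
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                // Attach the JDBC sink
                .addSink(JdbcSink.sink(
                        sql,
                        // Bind the tuple fields to the two placeholders in the SQL statement
                        (ps, tp) -> {
                            ps.setString(1, tp.f0);
                            ps.setInt(2, tp.f1);
                        },
                        jdbcOptions)
                );

        // Run the job; a failure now propagates instead of being silently swallowed
        env.execute();
    }
}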
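The comment in the example notes that Kafka or another service can serve as the source. Records from such a source usually arrive as strings and have to be converted into the (name, user_id) pair that the insert statement expects. The following is a minimal sketch of that conversion step in plain Java. The class name UserRecordParser and the "name,user_id" line format are assumptions made for illustration; they do not come from the original job.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

// Hypothetical helper: converts one text record into a (name, userId) pair.
// The "name,user_id" line format is an assumption, not part of the original example.
final class UserRecordParser {

    static Map.Entry<String, Integer> parse(String line) {
        if (line == null) {
            throw new IllegalArgumentException("line must not be null");
        }
        // Split on the last comma so that the id is always the final field
        int comma = line.lastIndexOf(',');
        if (comma <= 0 || comma == line.length() - 1) {
            throw new IllegalArgumentException("expected name,user_id but got: " + line);
        }
        String name = line.substring(0, comma).trim();
        if (name.isEmpty()) {
            throw new IllegalArgumentException("name must not be empty: " + line);
        }
        // A non-numeric id raises NumberFormatException, a subclass of IllegalArgumentException
        int userId = Integer.parseInt(line.substring(comma + 1).trim());
        return new SimpleImmutableEntry<>(name, userId);
    }

    public static void main(String[] args) {
        Map.Entry<String, Integer> record = parse("zhaoliwei,10086");
        System.out.println(record.getKey() + " -> " + record.getValue());
    }
}
```

Inside the Flink job, logic like this would typically sit in a map step between the source and the sink and return Tuple2.of(name, userId), so the statement builder shown above could stay unchanged.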
Deployment
Package the code above into a JAR and submit it to Flink as a job.
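Before packaging, the {host}, {port}, and {database} placeholders in CLICKHOUSE_URL need real values. One option, sketched below under stated assumptions, is to build the URL from launch arguments rather than hard-coding it. The class ClickHouseUrlBuilder and the argument order are hypothetical; the default of 8123 is ClickHouse's standard HTTP port, which matches the http classifier of the driver dependency.

```java
// Hypothetical helper, not part of the original job: assembles the JDBC URL
// from launch arguments so the same JAR can target different ClickHouse instances.
final class ClickHouseUrlBuilder {

    // ClickHouse's default HTTP port
    static final int DEFAULT_HTTP_PORT = 8123;

    static String buildUrl(String host, int port, String database) {
        if (host == null || host.isEmpty()) {
            throw new IllegalArgumentException("host must not be empty");
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        if (database == null || database.isEmpty()) {
            throw new IllegalArgumentException("database must not be empty");
        }
        return "jdbc:ch://" + host + ":" + port + "/" + database;
    }

    // Assumed argument order: <host> <database> [port]
    static String fromArgs(String[] args) {
        if (args.length < 2) {
            throw new IllegalArgumentException("usage: <host> <database> [port]");
        }
        int port = args.length >= 3 ? Integer.parseInt(args[2]) : DEFAULT_HTTP_PORT;
        return buildUrl(args[0], port, args[1]);
    }

    public static void main(String[] args) {
        // prints "jdbc:ch://localhost:8123/default"
        System.out.println(fromArgs(new String[] {"localhost", "default"}));
    }
}
```

With something like this in place, the job's main method could pass fromArgs(args) to withUrl(...) in place of the constant, and the connection values would be supplied when the job is submitted.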