
Flink Connector

ClickHouse Supported

This is the official Apache Flink sink connector supported by ClickHouse. It is built on Flink's AsyncSinkBase and the official ClickHouse Java client.

The connector supports Apache Flink's DataStream API. Table API support is planned for a future release.

Requirements

  • Java 11+ (for Flink 1.17+) or 17+ (for Flink 2.0+)
  • Apache Flink 1.17+

The connector is split into two artifacts to support both Flink 1.17+ and Flink 2.0+. Choose the artifact that matches your desired Flink version:

| Flink Version | Artifact | ClickHouse Java Client Version | Required Java |
|---|---|---|---|
| latest | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 2.0.1 | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 2.0.0 | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 1.20.2 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.19.3 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.18.1 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.17.2 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |

Note: the connector has not been tested against Flink versions earlier than 1.17.

Installation & Setup

Import as a Dependency

For Flink 2.x:

<dependency>
    <groupId>com.clickhouse.flink</groupId>
    <artifactId>flink-connector-clickhouse-2.0.0</artifactId>
    <version>{{ stable_version }}</version>
    <classifier>all</classifier>
</dependency>

For Flink 1.17–1.20:

<dependency>
    <groupId>com.clickhouse.flink</groupId>
    <artifactId>flink-connector-clickhouse-1.17</artifactId>
    <version>{{ stable_version }}</version>
    <classifier>all</classifier>
</dependency>

Download the binary

The name pattern of the binary JAR is:

flink-connector-clickhouse-${flink_version}-${stable_version}-all.jar

where:

  • flink_version is the Flink line the artifact targets (1.17 or 2.0.0, per the table above)
  • stable_version is the connector release version

You can find all available released JAR files in the Maven Central Repository.
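For example, assuming a (hypothetical) connector release 1.0.0, the binary for the Flink 1.17 artifact would be named:

```text
flink-connector-clickhouse-1.17-1.0.0-all.jar
```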

Using the DataStream API

Snippet

Configure ClickHouseClient

ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(url, username, password, database, tableName);

Let's say you want to insert raw CSV data as-is:

1. Create an ElementConverter:

ElementConverter<String, ClickHousePayload> convertorString = new ClickHouseConvertor<>(String.class);

2. Create the sink and set the format using setClickHouseFormat:

ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
        convertorString,
        MAX_BATCH_SIZE,
        MAX_IN_FLIGHT_REQUESTS,
        MAX_BUFFERED_REQUESTS,
        MAX_BATCH_SIZE_IN_BYTES,
        MAX_TIME_IN_BUFFER_MS,
        MAX_RECORD_SIZE_IN_BYTES,
        clickHouseClientConfig
);

csvSink.setClickHouseFormat(ClickHouseFormat.CSV);

3. Finally, connect your DataStream to the sink:

data.sinkTo(csvSink);
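Putting the steps above together, a complete job could look like the sketch below. The connection values, the batching constants, and the sample data are placeholder assumptions for illustration, and the import statements for the connector classes are omitted since they depend on the artifact you chose; only the Flink imports are shown.

```java
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ClickHouseCsvJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder connection settings -- replace with your deployment's values
        ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(
                "https://localhost:8443", "default", "password", "default", "events");

        // Convert each String record into a ClickHousePayload, as in the snippet above
        ElementConverter<String, ClickHousePayload> convertorString =
                new ClickHouseConvertor<>(String.class);

        // Illustrative batching values -- tune for your workload
        ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
                convertorString,
                1_000,            // maxBatchSize
                4,                // maxInFlightRequests
                10_000,           // maxBufferedRequests
                8 * 1024 * 1024,  // maxBatchSizeInBytes
                5_000,            // maxTimeInBufferMS
                1024 * 1024,      // maxRecordSizeInBytes
                clickHouseClientConfig);
        csvSink.setClickHouseFormat(ClickHouseFormat.CSV);

        // Sample CSV rows -- in a real job this would be your actual source
        DataStream<String> rows = env.fromElements("1,alice", "2,bob");
        rows.sinkTo(csvSink);
        env.execute("clickhouse-csv-sink");
    }
}
```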

More examples and snippets can be found in our tests.

Quick Start Example

We have created a Maven-based example for an easy start with the ClickHouse sink.

For more detailed instructions, see the Example Guide.

DataStream API Connection Options

ClickHouse Client Options

| Parameter | Description | Default Value |
|---|---|---|
| url | Fully qualified ClickHouse URL | N/A |
| username | ClickHouse database username | N/A |
| password | ClickHouse database password | N/A |
| database | ClickHouse database name | N/A |
| table | ClickHouse table name | N/A |

Sink Options

The following options come directly from Flink's AsyncSinkBase:

| Parameter | Description | Default Value |
|---|---|---|
| maxBatchSize | Maximum number of records inserted in a single batch | N/A |
| maxInFlightRequests | Maximum number of in-flight requests allowed before the sink applies backpressure | N/A |
| maxBufferedRequests | Maximum number of records that may be buffered in the sink before backpressure is applied | N/A |
| maxBatchSizeInBytes | Maximum size (in bytes) a batch may reach; all batches sent are smaller than or equal to this size | N/A |
| maxTimeInBufferMS | Maximum time a record may stay in the sink before being flushed | N/A |
| maxRecordSizeInBytes | Maximum record size the sink will accept; larger records are automatically rejected | N/A |
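To make these options concrete, here is an illustrative set of values (the numbers are assumptions for the example, not recommended defaults): a batch is flushed once 500 records or 4 MiB accumulate, or after 2 seconds, whichever comes first, and backpressure kicks in at 8 in-flight requests or 10,000 buffered records.

```java
// Illustrative tuning constants for the ClickHouseAsyncSink constructor.
// These values are assumptions for the example, not recommended defaults.
public class SinkTuning {
    public static final int  MAX_BATCH_SIZE           = 500;              // flush once 500 records accumulate
    public static final int  MAX_IN_FLIGHT_REQUESTS   = 8;                // backpressure beyond 8 concurrent inserts
    public static final int  MAX_BUFFERED_REQUESTS    = 10_000;           // backpressure beyond 10,000 buffered records
    public static final long MAX_BATCH_SIZE_IN_BYTES  = 4L * 1024 * 1024; // flush once a batch reaches 4 MiB
    public static final long MAX_TIME_IN_BUFFER_MS    = 2_000L;           // flush after 2 s even if the batch is small
    public static final long MAX_RECORD_SIZE_IN_BYTES = 1L * 1024 * 1024; // reject single records larger than 1 MiB
}
```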

Using the Table API

Table API support is planned for a future release. This section will be updated once available.

Snippet

Planned for a future release — this section will provide a usage snippet for configuring the Table API.

Quick Start Example

Planned for a future release — a complete end-to-end example will be added once Table API support becomes available.

Table API Connection Options

Planned for a future release — this section will be updated once available.

Supported data types

The table below provides a quick reference for converting data types when inserting from Flink into ClickHouse.

| Java Type | ClickHouse Type | Supported | Serialize Method |
|---|---|---|---|
| byte/Byte | Int8 | ✅ | DataWriter.writeInt8 |
| short/Short | Int16 | ✅ | DataWriter.writeInt16 |
| int/Integer | Int32 | ✅ | DataWriter.writeInt32 |
| long/Long | Int64 | ✅ | DataWriter.writeInt64 |
| BigInteger | Int128 | ✅ | DataWriter.writeInt128 |
| BigInteger | Int256 | ✅ | DataWriter.writeInt256 |
| short/Short | UInt8 | ✅ | DataWriter.writeUInt8 |
| int/Integer | UInt8 | ✅ | DataWriter.writeUInt8 |
| int/Integer | UInt16 | ✅ | DataWriter.writeUInt16 |
| long/Long | UInt32 | ✅ | DataWriter.writeUInt32 |
| long/Long | UInt64 | ✅ | DataWriter.writeUInt64 |
| BigInteger | UInt64 | ✅ | DataWriter.writeUInt64 |
| BigInteger | UInt128 | ✅ | DataWriter.writeUInt128 |
| BigInteger | UInt256 | ✅ | DataWriter.writeUInt256 |
| BigDecimal | Decimal | ✅ | DataWriter.writeDecimal |
| BigDecimal | Decimal32 | ✅ | DataWriter.writeDecimal |
| BigDecimal | Decimal64 | ✅ | DataWriter.writeDecimal |
| BigDecimal | Decimal128 | ✅ | DataWriter.writeDecimal |
| BigDecimal | Decimal256 | ✅ | DataWriter.writeDecimal |
| float/Float | Float | ✅ | DataWriter.writeFloat32 |
| double/Double | Double | ✅ | DataWriter.writeFloat64 |
| boolean/Boolean | Boolean | ✅ | DataWriter.writeBoolean |
| String | String | ✅ | DataWriter.writeString |
| String | FixedString | ✅ | DataWriter.writeFixedString |
| LocalDate | Date | ✅ | DataWriter.writeDate |
| LocalDate | Date32 | ✅ | DataWriter.writeDate32 |
| LocalDateTime | DateTime | ✅ | DataWriter.writeDateTime |
| ZonedDateTime | DateTime | ✅ | DataWriter.writeDateTime |
| LocalDateTime | DateTime64 | ✅ | DataWriter.writeDateTime64 |
| ZonedDateTime | DateTime64 | ✅ | DataWriter.writeDateTime64 |
| int/Integer | Time | ❌ | N/A |
| long/Long | Time64 | ❌ | N/A |
| byte/Byte | Enum8 | ✅ | DataWriter.writeInt8 |
| int/Integer | Enum16 | ✅ | DataWriter.writeInt16 |
| java.util.UUID | UUID | ✅ | DataWriter.writeIntUUID |
| String | JSON | ✅ | DataWriter.writeJSON |
| Array<Type> | Array<Type> | ✅ | DataWriter.writeArray |
| Map<K,V> | Map<K,V> | ✅ | DataWriter.writeMap |
| Tuple<Type,..> | Tuple<T1,T2,..> | ✅ | DataWriter.writeTuple |
| Object | Variant | ❌ | N/A |

Notes:

  • A ZoneId must be provided when performing date operations.
  • Precision and scale must be provided when performing decimal operations.
  • In order for ClickHouse to parse a Java String as JSON, you need to enable enableJsonSupportAsString in ClickHouseClientConfig.
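As a sketch, the JSON note above might be wired up as follows. The flag name enableJsonSupportAsString comes from the note; the setter form shown here is an assumption, not the confirmed API, so check the connector's javadoc for the real signature.

```java
// Hypothetical sketch: allow Java Strings to be parsed as JSON by ClickHouse.
// The setter form below is an assumption; only the flag name is documented above.
ClickHouseClientConfig clickHouseClientConfig =
        new ClickHouseClientConfig(url, username, password, database, tableName);
clickHouseClientConfig.enableJsonSupportAsString();
```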

Metrics

The connector exposes the following additional metrics on top of Flink's existing metrics:

| Metric | Description | Type |
|---|---|---|
| numBytesSend | Total number of bytes sent to ClickHouse | Counter |
| numRecordSend | Total number of records sent to ClickHouse | Counter |
| numRequestSubmitted | Total number of requests sent (actual number of flushes performed) | Counter |
| numOfDroppedBatches | Total number of batches dropped due to non-retryable failures | Counter |
| numOfDroppedRecords | Total number of records dropped due to non-retryable failures | Counter |
| totalBatchRetries | Total number of batch retries due to retryable failures | Counter |
| writeLatencyHistogram | Histogram of successful write latency distribution (ms) | Histogram |
| writeFailureLatencyHistogram | Histogram of failed write latency distribution (ms) | Histogram |
| triggeredByMaxBatchSizeCounter | Total number of flushes triggered by reaching maxBatchSize | Counter |
| triggeredByMaxBatchSizeInBytesCounter | Total number of flushes triggered by reaching maxBatchSizeInBytes | Counter |
| triggeredByMaxTimeInBufferMSCounter | Total number of flushes triggered by reaching maxTimeInBufferMS | Counter |
| actualRecordsPerBatch | Histogram of actual batch size distribution | Histogram |
| actualBytesPerBatch | Histogram of actual bytes per batch distribution | Histogram |

Limitations

  • Currently, the sink does not support exactly-once semantics.

ClickHouse Version Compatibility and Security

  • All artifacts and versions of the connector are tested with all active LTS versions of ClickHouse.
  • See the ClickHouse security policy for known security vulnerabilities and how to report a vulnerability.
  • We recommend keeping the connector up to date so you do not miss security fixes and new improvements.
  • If you have an issue with migration, please create a GitHub issue and we will respond!

Contributing and Support

If you'd like to contribute to the project or report an issue, we welcome your input! Visit our GitHub repository to open an issue, suggest improvements, or submit a pull request.

Please check the contribution guide in the repository before starting. Thank you for helping improve the ClickHouse Flink connector!