Flink connector
First-party Apache Flink sink connector. It writes streaming embeddings to GVDB with exactly-once semantics by combining Flink checkpoints with GVDB's idempotent upserts.
Coordinates
Built for Flink 1.18+ on JVM 17.
Example
FlinkSinkExample.java

```java
package io.gvdb.examples;

import io.gvdb.client.model.GvdbVector;
import io.gvdb.flink.GvdbSink;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;
import java.util.Map;
import java.util.Random;

public class FlinkSinkExample {

    // Input record: a stable numeric id, the embedding, and one metadata field.
    public record Embedding(long id, float[] vector, String category) {}

    public static void main(String[] args) throws Exception {
        var env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // checkpoint every 5 seconds

        // Generate 5,000 random 128-dimensional embeddings (fixed seed for reproducibility).
        int dimension = 128;
        int count = 5000;
        var random = new Random(42);
        var embeddings = new Embedding[count];
        for (int i = 0; i < count; i++) {
            float[] vec = new float[dimension];
            for (int j = 0; j < dimension; j++) vec[j] = (float) random.nextGaussian();
            embeddings[i] = new Embedding(i, vec, i % 2 == 0 ? "electronics" : "clothing");
        }

        var stream = env.fromCollection(
                Arrays.asList(embeddings),
                TypeInformation.of(Embedding.class)
        );

        // No setWriteMode(...) call: the sink keeps its default UPSERT mode.
        var sink = GvdbSink.<Embedding>builder()
                .setTarget("localhost:50051")
                .setCollection("flink_embeddings")
                .setBatchSize(1000)
                .setMaxRetries(3)
                // Map each record to a GvdbVector: id, vector, metadata.
                .setRecordMapper(e -> new GvdbVector(
                        e.id(),
                        e.vector(),
                        Map.of("category", e.category())
                ))
                .build();

        stream.sinkTo(sink);
        env.execute("gvdb-flink-sink-example");
    }
}
```
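The example uses the loop index as each vector's id. Real streams often carry a string key instead, and the exactly-once guarantee depends on a record mapping to the same id every time it is replayed. The following is a minimal sketch, assuming ids are Java longs (as `e.id()` is in the example). The helper `StableIds.stableId` is hypothetical and not part of the connector: it derives a deterministic 64-bit id from a key using SHA-256.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class StableIds {

    // Hypothetical helper: derive a deterministic 64-bit id from a string key.
    // The same key yields the same id on every run and every JVM, so replayed
    // records upsert onto the same vector. Random UUIDs would not.
    static long stableId(String key) {
        try {
            MessageDigest sha = MessageDigest.getInstance("SHA-256");
            byte[] digest = sha.digest(key.getBytes(StandardCharsets.UTF_8));
            // Use the first 8 bytes of the digest as a big-endian long.
            return ByteBuffer.wrap(digest, 0, 8).getLong();
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to support SHA-256.
            throw new IllegalStateException("SHA-256 unavailable", e);
        }
    }

    public static void main(String[] args) {
        long first = stableId("sku-12345");
        long again = stableId("sku-12345");
        System.out.println(first == again); // true: same key, same id
        System.out.println(stableId("sku-67890")); // a different key gives a different id (with overwhelming probability)
    }
}
```

Inside a job, the mapper would then pass `StableIds.stableId(e.key())` as the id, where `key()` stands for whatever hypothetical key field your record type carries. A 64-bit hash leaves far less room for collisions than a 32-bit `String.hashCode()`.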
Builder options
| Method | Default | Description |
|---|---|---|
| setTarget(host:port) | required | GVDB proxy or single-node endpoint |
| setCollection(name) | required | Target collection |
| setApiKey(key) | none | API key for RBAC |
| setTls(bool) | false | Enable TLS |
| setBatchSize(int) | 1000 | Records per insert RPC |
| setMaxRetries(int) | 3 | Retries on transient errors |
| setRecordMapper(fn) | required | Maps your record type → GvdbVector |
| setWriteMode(mode) | UPSERT | INSERT or UPSERT |
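To make `setBatchSize` and `setMaxRetries` concrete, here is a plain-Java sketch of a buffering writer with bounded retries. It illustrates the idea under stated assumptions and is not the connector's code: `BatchingWriter` and its `sendBatch` callback are hypothetical, a `false` return stands in for a transient error, and `setMaxRetries(3)` is read as one initial attempt plus three retries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch: BatchingWriter and sendBatch are illustrative, not GVDB APIs.
public class BatchingWriter<T> {
    private final int batchSize;
    private final int maxRetries;
    private final Predicate<List<T>> sendBatch; // true = success, false = transient error
    private final List<T> buffer = new ArrayList<>();

    BatchingWriter(int batchSize, int maxRetries, Predicate<List<T>> sendBatch) {
        this.batchSize = batchSize;
        this.maxRetries = maxRetries;
        this.sendBatch = sendBatch;
    }

    // Buffer one record; send a full batch as soon as batchSize records are queued.
    void write(T record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Send whatever is buffered, e.g. the final partial batch.
    void flush() {
        if (buffer.isEmpty()) return;
        List<T> batch = List.copyOf(buffer);
        // One initial attempt plus up to maxRetries retries (a real client would back off).
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (sendBatch.test(batch)) {
                buffer.clear();
                return;
            }
        }
        throw new IllegalStateException("batch failed after " + (maxRetries + 1) + " attempts");
    }

    public static void main(String[] args) {
        List<Integer> sizes = new ArrayList<>();
        var writer = new BatchingWriter<Integer>(1000, 3, batch -> sizes.add(batch.size()));
        for (int i = 0; i < 5250; i++) writer.write(i);
        writer.flush(); // sends the final partial batch
        System.out.println(sizes); // [1000, 1000, 1000, 1000, 1000, 250]
    }
}
```

Any buffering sink must also flush at every checkpoint. Otherwise, records covered by a completed checkpoint could still be sitting in the buffer when the job fails, and Flink would not replay them.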
Exactly-once
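Combine Flink checkpointing with GVDB's upsert idempotency. After a failure, Flink restores the job from the last completed checkpoint and replays the records that arrived after it, so the sink may write some records twice. In UPSERT mode each replayed write overwrites the vector with the same id, leaving the collection in the same state as a failure-free run. This holds only if the record mapper assigns the same id to the same record on every run; INSERT mode does not provide this guarantee.

```java
env.enableCheckpointing(5000);
// No setWriteMode(...) call: the sink keeps its default UPSERT mode,
// so retried and replayed records produce the same result.
```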
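To see why UPSERT matters, the following self-contained simulation (plain Java, no Flink or GVDB dependency) writes ten records, fails after the eighth, and replays from a checkpoint taken after the fifth, as Flink would after restoring. `UpsertStore` and `InsertStore` are hypothetical in-memory stand-ins for a collection in each write mode.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReplaySimulation {

    // Hypothetical stand-in for a collection: UPSERT overwrites by id, INSERT appends.
    interface Store {
        void write(long id, String payload);
        int rows();
    }

    static final class UpsertStore implements Store {
        final Map<Long, String> byId = new HashMap<>();
        public void write(long id, String payload) { byId.put(id, payload); }
        public int rows() { return byId.size(); }
    }

    static final class InsertStore implements Store {
        final List<Long> ids = new ArrayList<>();
        public void write(long id, String payload) { ids.add(id); }
        public int rows() { return ids.size(); }
    }

    // Writes records [0, failAt), crashes, then restores from a checkpoint that
    // covers records [0, checkpoint) and replays records [checkpoint, n).
    static int runWithFailure(Store store, int n, int checkpoint, int failAt) {
        for (int i = 0; i < failAt; i++) store.write(i, "vec-" + i);
        for (int i = checkpoint; i < n; i++) store.write(i, "vec-" + i); // replay
        return store.rows();
    }

    public static void main(String[] args) {
        System.out.println(runWithFailure(new UpsertStore(), 10, 5, 8)); // 10: replayed ids 5..7 overwrite themselves
        System.out.println(runWithFailure(new InsertStore(), 10, 5, 8)); // 13: ids 5..7 are stored twice
    }
}
```

The upsert store ends with exactly the ten records a failure-free run would produce; the insert store keeps three duplicates.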
Back-pressure
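The sink blocks in invoke() when sending another batch would exceed the configured in-flight limit. Because the task thread is blocked, Flink's back-pressure mechanism propagates the slowdown upstream to the sources instead of letting unsent records pile up in memory.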
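The blocking behavior can be sketched with a `java.util.concurrent.Semaphore` that holds one permit per allowed in-flight batch. This is a hypothetical illustration, not the connector's implementation: `BoundedInFlight`, the `maxInFlight` limit, and the 10 ms sleep standing in for an insert RPC are all assumptions. `sendBatchAsync` blocks the calling thread once `maxInFlight` batches are outstanding; in a Flink sink that caller is the task thread, so upstream operators stall too.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a bounded number of in-flight batches.
public class BoundedInFlight {
    private final Semaphore permits;
    private final ExecutorService rpcPool = Executors.newFixedThreadPool(8);
    final AtomicInteger inFlight = new AtomicInteger();
    final AtomicInteger maxObserved = new AtomicInteger();

    BoundedInFlight(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    // Blocks the caller while maxInFlight batches are outstanding.
    void sendBatchAsync() throws InterruptedException {
        permits.acquire();
        rpcPool.submit(() -> {
            int now = inFlight.incrementAndGet();
            maxObserved.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(10); // stand-in for a slow insert RPC
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                inFlight.decrementAndGet();
                permits.release(); // frees a slot, unblocking a waiting caller
            }
        });
    }

    void close() throws InterruptedException {
        rpcPool.shutdown();
        rpcPool.awaitTermination(10, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        var sink = new BoundedInFlight(2);
        for (int b = 0; b < 20; b++) sink.sendBatchAsync();
        sink.close();
        System.out.println("max in flight: " + sink.maxObserved.get()); // at most 2
    }
}
```

The pool has more threads than permits, so the semaphore, not the pool size, is what caps concurrency.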
Source
See also
- Spark connector for batch workloads
- Java client for direct gRPC
- Streaming ingestion use case