Spark connector¶
First-party Apache Spark DataSource v2 connector. Read and write GVDB collections from Spark (Scala, Java, or PySpark) for batch and streaming workloads.
Coordinates¶
Built for Spark 3.5+ on Scala 2.13; requires JVM 17.
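The published artifact coordinates are not listed in this section. As an illustration only, assuming a conventional `gvdb-spark-connector_2.13` artifact under an `io.gvdb` group (a hypothetical name; check the project's releases for the real coordinates), the connector could be pulled in at session start via Spark's standard `spark.jars.packages` config:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # Hypothetical coordinates for illustration; verify against the release page.
    .config("spark.jars.packages", "io.gvdb:gvdb-spark-connector_2.13:<version>")
    .getOrCreate()
)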
Write¶
spark_write.py
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType, StructField, LongType, FloatType, ArrayType, StringType
)
import random

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("gvdb-spark-write-example")
    .getOrCreate()
)

DIMENSION = 128
NUM_VECTORS = 10_000

# Random Gaussian vectors plus two metadata columns (name, score).
data = [
    (i, [random.gauss(0, 1) for _ in range(DIMENSION)], f"item_{i}", random.random())
    for i in range(NUM_VECTORS)
]

schema = StructType([
    StructField("id", LongType(), False),
    StructField("vector", ArrayType(FloatType()), False),
    StructField("name", StringType(), True),
    StructField("score", FloatType(), True),
])

df = spark.createDataFrame(data, schema)

# The collection is auto-created (using gvdb.dimension) if it doesn't exist;
# rows are sent in 5000-row insert RPCs.
df.write.format("io.gvdb.spark") \
    .option("gvdb.target", "localhost:50051") \
    .option("gvdb.collection", "spark_embeddings") \
    .option("gvdb.dimension", str(DIMENSION)) \
    .option("gvdb.metric", "cosine") \
    .option("gvdb.index_type", "auto") \
    .option("gvdb.batch_size", "5000") \
    .mode("append") \
    .save()
Read¶
df_read = spark.read.format("io.gvdb.spark") \
    .option("gvdb.target", "localhost:50051") \
    .option("gvdb.collection", "spark_embeddings") \
    .option("gvdb.include_metadata", "true") \
    .load()

df_read.show(5, truncate=True)
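As a quick round-trip check, you can compare row counts and confirm the stored vectors kept their dimension; this sketch uses only the `id` and `vector` columns described under Schema mapping below:

from pyspark.sql import functions as F

# One row per vector written; each vector array should have DIMENSION elements.
assert df_read.count() == NUM_VECTORS
df_read.select("id", F.size("vector").alias("dim")).show(3)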
Options¶
| Option | Required | Default | Description |
|---|---|---|---|
| `gvdb.target` | yes | — | `host:port` of the proxy or single-node server |
| `gvdb.collection` | yes | — | Collection name |
| `gvdb.dimension` | writes only | — | Vector dimension (on auto-create) |
| `gvdb.metric` | no | `cosine` | `l2`, `ip`, or `cosine` |
| `gvdb.index_type` | no | `auto` | Any supported index type |
| `gvdb.batch_size` | no | `1000` | Rows per insert RPC |
| `gvdb.api_key` | no | — | API key for RBAC |
| `gvdb.tls` | no | `false` | Enable TLS |
| `gvdb.include_metadata` | reads only | `false` | Include metadata columns |
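The connection options compose as you would expect. A minimal sketch for a TLS-enabled deployment with RBAC, using only options from the table above (the target host and API key are placeholders):

# Placeholder target and API key; substitute your deployment's values.
df.write.format("io.gvdb.spark") \
    .option("gvdb.target", "gvdb.example.com:443") \
    .option("gvdb.collection", "spark_embeddings") \
    .option("gvdb.tls", "true") \
    .option("gvdb.api_key", "<your-api-key>") \
    .mode("append") \
    .save()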
Schema mapping¶
- `id` → GVDB vector ID (`LongType`)
- `vector` → dense vector (`ArrayType(FloatType)`)
- All other columns → per-vector metadata (JSON-serialized)

Custom column names can be configured via `gvdb.id_column` / `gvdb.vector_column`.
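A short sketch of these two options in use, assuming a DataFrame `df_docs` whose key and embedding columns are named `doc_id` and `embedding` (hypothetical names, not from the connector docs):

# df_docs is a hypothetical DataFrame with columns doc_id (long),
# embedding (array<float>), and any extra metadata columns.
df_docs.write.format("io.gvdb.spark") \
    .option("gvdb.target", "localhost:50051") \
    .option("gvdb.collection", "docs") \
    .option("gvdb.id_column", "doc_id") \
    .option("gvdb.vector_column", "embedding") \
    .mode("append") \
    .save()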
Write modes¶
- `append` — inserts new vectors; writing an existing ID fails with an error.
- `overwrite` — drops and recreates the collection, then writes.
- `upsert` (via `.option("gvdb.write_mode", "upsert")`) — idempotent and safe to re-run; see the sketch below.
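Re-running the earlier write as an upsert replaces rows with matching IDs instead of failing. A sketch, assuming `gvdb.write_mode` composes with `.mode("append")` (the interaction between the two is not spelled out here):

# Upsert: rows with existing IDs are replaced, new IDs are inserted.
df.write.format("io.gvdb.spark") \
    .option("gvdb.target", "localhost:50051") \
    .option("gvdb.collection", "spark_embeddings") \
    .option("gvdb.write_mode", "upsert") \
    .mode("append") \
    .save()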
Streaming¶
Structured Streaming writes are supported:
stream = spark.readStream...
stream.writeStream.format("io.gvdb.spark") \
    .option("gvdb.target", "localhost:50051") \
    .option("gvdb.collection", "stream_embeddings") \
    .option("checkpointLocation", "/tmp/gvdb-checkpoint") \
    .start()
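As a self-contained illustration, Spark's built-in `rate` source can stand in for a real stream; the synthesized `vector` column and the transformation below are assumptions for demonstration, not part of the connector docs:

from pyspark.sql import functions as F

DIMENSION = 128

# Turn the rate source's (timestamp, value) rows into (id, vector) rows;
# in practice the stream would come from Kafka, files, etc.
stream = (
    spark.readStream.format("rate").option("rowsPerSecond", "100").load()
    .select(
        F.col("value").alias("id"),
        F.array([F.rand().cast("float") for _ in range(DIMENSION)]).alias("vector"),
    )
)

query = (
    stream.writeStream.format("io.gvdb.spark")
    .option("gvdb.target", "localhost:50051")
    .option("gvdb.collection", "stream_embeddings")
    .option("checkpointLocation", "/tmp/gvdb-checkpoint")
    .start()
)
query.awaitTermination()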
See also¶
- Flink connector for real-time streaming
- Java client for direct gRPC usage
- Streaming ingestion use case