This repository contains comprehensive examples demonstrating how to use the Milvus Spark Connector for both Scala and Python applications.
The Milvus Spark Connector provides seamless integration between Apache Spark and the Milvus vector database, enabling efficient data processing and vector operations at scale. This repository showcases common usage patterns and best practices.
- Apache Spark environment setup
- Milvus instance running and accessible
- For detailed Spark environment setup and the Spark connector's two data source format parameters, please refer to the main repository: milvus-spark-connector
├── src/main/scala/example/
│   ├── HelloDemo.scala            # Basic usage example
│   ├── read/                      # Data reading examples
│   │   ├── MilvusDemo.scala       # Collection and segment reading
│   │   ├── LocalBinlogDemo.scala  # Local binlog file reading
│   │   └── RemoteBinlogDemo.scala # Remote binlog file reading
│   └── write/                     # Data writing examples
│       ├── FloatVectorDemo.scala  # Float vector data writing
│       └── DoubleVectorDemo.scala # Double vector data writing
└── python/
    ├── pyspark_milvus_demo.py     # Python PySpark demo
    ├── config.py                  # Configuration file
    └── .env.example               # Environment configuration template
The most basic example demonstrating how to connect to Milvus and read data using the Spark connector.
Key Features:
- Simple Milvus connection setup
- Basic data reading from a collection
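A minimal PySpark version of this connection-and-read flow is sketched below. The data source name and option keys (`milvus.uri`, `milvus.token`, `milvus.collection.name`) are assumptions based on typical connector conventions, not confirmed API; check the main milvus-spark-connector repository for the exact format parameters.

```python
# Hypothetical sketch: the format name and option keys below are
# assumptions -- consult the main milvus-spark-connector repository
# for the connector's actual parameters.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("milvus-hello-demo").getOrCreate()

df = (
    spark.read.format("milvus")                                # assumed data source name
    .option("milvus.uri", "http://localhost:19530")            # assumed option key
    .option("milvus.token", "root:Milvus")                     # assumed option key
    .option("milvus.collection.name", "your_collection_name")  # assumed option key
    .load()
)

df.printSchema()
df.show(5)
```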
Once you have obtained a DataFrame from a Milvus collection, you can leverage Spark DataFrame APIs for various data operations:
1. Select Specific Columns
// Scala
val selectedDF = df.select("id", "vector", "metadata")
# Python
selected_df = df.select("id", "vector", "metadata")
2. Filter Data
// Scala
import spark.implicits._ // enables the $"col" column syntax
val filteredDF = df.filter($"id" > 100)
val complexFilterDF = df.filter($"metadata" === "important" && $"score" > 0.8)
# Python
filtered_df = df.filter(df.id > 100)
complex_filter_df = df.filter((df.metadata == "important") & (df.score > 0.8))
3. Count Records
// Scala
val totalCount = df.count()
val filteredCount = df.filter($"score" > 0.5).count()
# Python
total_count = df.count()
filtered_count = df.filter(df.score > 0.5).count()
4. Group By and Aggregation
// Scala
import org.apache.spark.sql.functions.{count, avg, max}
val groupedDF = df.groupBy("category").agg(
  count("*").as("count"),
  avg("score").as("avg_score"),
  max("timestamp").as("latest_timestamp")
)
# Python
from pyspark.sql.functions import count, avg, max
grouped_df = df.groupBy("category").agg(
    count("*").alias("count"),
    avg("score").alias("avg_score"),
    max("timestamp").alias("latest_timestamp")
)
5. Sort Data
// Scala
val sortedDF = df.orderBy($"score".desc, $"timestamp".asc)
# Python
sorted_df = df.orderBy(df.score.desc(), df.timestamp.asc())
6. Additional Operations
// Scala
// Show first 20 rows
df.show(20)
// Get schema information
df.printSchema()
// Collect to local array (use with caution for large datasets)
val localData = df.collect()
# Python
# Show first 20 rows
df.show(20)
# Get schema information
df.printSchema()
# Collect to local array (use with caution for large datasets)
local_data = df.collect()
For more comprehensive DataFrame operations, please refer to the official Spark DataFrame documentation: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html
Comprehensive example showing various ways to read data from Milvus collections.
Key Features:
- Read entire collection data
- Read specific fields from collection
- Apply timestamp filters
- Read specific segment data
- Read segment data with S3 path
Demonstrates reading Milvus binlog files stored locally.
Key Features:
- Read local insert binlog files
- Read local delete binlog files
- Support for various data types (varchar, short, float vector)
Shows how to read Milvus binlog files from remote storage (S3).
Key Features:
- Read remote insert binlog files
- Read remote delete binlog files
- S3 filesystem integration
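For the S3 filesystem integration, the standard Hadoop S3A settings can be supplied when building the Spark session. The `fs.s3a.*` keys below are the usual Hadoop S3A configuration options (endpoint, credentials, path-style access); the endpoint and credential values are placeholders, and this is a sketch rather than the demo's exact setup.

```python
# Sketch of S3A filesystem configuration for reading remote binlogs.
# The fs.s3a.* keys are standard Hadoop S3A options; the values below
# are placeholders for your own storage endpoint and credentials.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("milvus-remote-binlog-demo")
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000")
    .config("spark.hadoop.fs.s3a.access.key", "your_access_key")
    .config("spark.hadoop.fs.s3a.secret.key", "your_secret_key")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .getOrCreate()
)
```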
Demonstrates writing data with float vector fields to Milvus.
Key Features:
- Collection creation with float vector fields
- Data schema definition for float vectors
- Batch data insertion
Shows writing data with double vector fields to Milvus.
Key Features:
- Collection creation with double vector fields
- Data schema definition for double vectors
- Batch data insertion
Important Note: When writing data, ensure that your Spark DataFrame schema matches the vector data type in your Milvus collection. Use FloatType for float vectors and DoubleType for double vectors in your schema definition.
Comprehensive Python example using PySpark with Milvus connector.
Key Features:
- PySpark session configuration
- Milvus data reading with Python
- S3 integration setup
- Data analysis and statistics
- Error handling and validation
Most Scala examples use hardcoded configuration for simplicity:
val uri = "http://localhost:19530"
val token = "root:Milvus"
val collectionName = "your_collection_name"
Python examples use a configuration file approach. Update python/config.py
with your actual values:
MILVUS_CONFIG = {
    "uri": "your_milvus_uri",
    "token": "your_milvus_token",
    "collection_name": "your_collection_name",
    # ... other configurations
}
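One common pattern, sketched here with hypothetical environment variable names (`MILVUS_URI`, `MILVUS_TOKEN`, `MILVUS_COLLECTION` — align them with your own `.env` file), is to populate `MILVUS_CONFIG` from the environment so credentials stay out of source control:

```python
import os

def load_milvus_config(env=None):
    """Build a Milvus config dict from environment variables.

    The variable names used here are illustrative, not ones the
    repository necessarily defines; match them to your .env file.
    """
    env = os.environ if env is None else env
    return {
        "uri": env.get("MILVUS_URI", "http://localhost:19530"),
        "token": env.get("MILVUS_TOKEN", "root:Milvus"),
        "collection_name": env.get("MILVUS_COLLECTION", "your_collection_name"),
    }

MILVUS_CONFIG = load_milvus_config()
```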
- Ensure your Spark environment is properly configured. If you're not sure how to set up Spark, please refer to the main repository: milvus-spark-connector
- Update the connection parameters in each demo file
- Compile and package the project:
sbt clean compile package
- Run the specific demo using spark-submit:
# For HelloDemo
spark-submit-wrapper --jars /xxx/spark-connector-assembly-x.x.x-SNAPSHOT.jar --class "example.HelloDemo" /xxx/milvus-spark-connector-example_2.13-0.1.0-SNAPSHOT.jar
# For specific read/write demos
spark-submit-wrapper --jars /xxx/spark-connector-assembly-x.x.x-SNAPSHOT.jar --class "example.read.MilvusDemo" /xxx/milvus-spark-connector-example_2.13-0.1.0-SNAPSHOT.jar
spark-submit-wrapper --jars /xxx/spark-connector-assembly-x.x.x-SNAPSHOT.jar --class "example.write.FloatVectorDemo" /xxx/milvus-spark-connector-example_2.13-0.1.0-SNAPSHOT.jar
We recommend using uv for Python environment management:
- Install uv if you haven't already:
  - Visit the uv installation guide for detailed instructions
- Navigate to the python directory:
cd python
- Copy the example environment file and configure it:
cp .env.example .env
# Edit .env with your actual Milvus and S3 configuration
- Run the demo using uv:
uv run pyspark_milvus_demo.py
- Main Repository: milvus-spark-connector
- Milvus Documentation: https://milvus.io/docs
- Apache Spark Documentation: https://spark.apache.org/docs/latest/
For questions, issues, or contributions, please refer to the main milvus-spark-connector repository.