Vector databases have become essential infrastructure for modern AI applications, enabling efficient similarity search, recommendation systems, and semantic analysis. This comprehensive guide demonstrates deploying Milvus, a cloud-native vector database, on AlmaLinux for production AI workloads, covering installation, optimization, and integration with popular ML frameworks.
Understanding Vector Databases and Milvus
Vector databases are specialized systems designed to store and query high-dimensional vectors:
- Similarity Search: Find nearest neighbors in vector space (see the sketch after this list)
- Scalability: Handle billions of vectors efficiently
- Real-time Performance: Sub-second query latency
- AI Integration: Native support for embeddings
- Hybrid Search: Combine vector and scalar filtering
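The core operation behind all of these features is nearest-neighbor search over dense vectors. As a point of reference, the minimal sketch below (assuming only NumPy) brute-forces the five nearest neighbors of a query vector with plain L2 distance; a vector database such as Milvus performs the same search through specialized indexes so it stays fast at billion-vector scale.
# nn_sketch.py - brute-force nearest-neighbor search (illustration only)
import numpy as np

rng = np.random.default_rng(0)
vectors = rng.random((10_000, 128)).astype("float32")  # the stored embeddings
query = rng.random(128).astype("float32")              # the query embedding

# Compute the L2 distance to every stored vector, then keep the 5 closest
distances = np.linalg.norm(vectors - query, axis=1)
top_k = np.argsort(distances)[:5]
print("nearest ids:", top_k)
print("distances:", distances[top_k])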
Milvus Architecture
Milvus is a cloud-native vector database with distributed architecture:
- Access Layer: Handles client connections and requests
- Coordinator Service: Manages metadata and coordination
- Worker Nodes: Execute queries and indexing
- Storage Layer: Distributed storage for vectors and data
- Message Queue: Ensures data consistency
Prerequisites
Before deploying Milvus:
- AlmaLinux 9 with kernel 5.4+
- Minimum 16GB RAM (32GB+ recommended)
- SSD storage with 100GB+ free space
- Docker and Docker Compose installed
- NVIDIA GPU (optional, for acceleration)
- Python 3.8+ for client applications
Setting Up the Environment
System Preparation
# Update system packages
sudo dnf update -y
# Install required dependencies
sudo dnf install -y \
git \
wget \
curl \
python3-pip \
python3-devel \
gcc \
gcc-c++ \
make \
cmake
# Install Docker
sudo dnf config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
sudo dnf install -y docker-ce docker-ce-cli containerd.io docker-compose-plugin
sudo systemctl start docker
sudo systemctl enable docker
sudo usermod -aG docker $USER  # log out and back in (or run "newgrp docker") for the group change to take effect
# Configure system parameters
cat <<EOF | sudo tee /etc/sysctl.d/99-milvus.conf
vm.max_map_count = 262144
vm.swappiness = 1
net.core.somaxconn = 65535
net.ipv4.tcp_max_syn_backlog = 65535
EOF
sudo sysctl -p /etc/sysctl.d/99-milvus.conf
# Set up storage directories
sudo mkdir -p /milvus/data /milvus/logs /milvus/etcd
sudo chown -R $USER:$USER /milvus
GPU Support Setup (Optional)
# Install NVIDIA drivers and CUDA
sudo dnf config-manager --add-repo https://developer.download.nvidia.com/compute/cuda/repos/rhel9/x86_64/cuda-rhel9.repo
sudo dnf clean all
sudo dnf -y module install nvidia-driver:latest-dkms
sudo dnf install -y cuda-toolkit-12-3
# Install NVIDIA Container Toolkit
distribution=$(. /etc/os-release;echo $ID$VERSION_ID)
curl -s -L https://nvidia.github.io/nvidia-docker/$distribution/nvidia-docker.repo | \
sudo tee /etc/yum.repos.d/nvidia-docker.repo
sudo dnf clean expire-cache
sudo dnf install -y nvidia-container-toolkit
sudo systemctl restart docker
# Verify GPU availability
nvidia-smi
docker run --rm --gpus all nvidia/cuda:12.3.0-base-ubuntu22.04 nvidia-smi
Deploying Milvus Cluster
Single-Node Deployment
# docker-compose.yml - Milvus single-node deployment
version: '3.5'
services:
etcd:
container_name: milvus-etcd
image: quay.io/coreos/etcd:v3.5.5
environment:
- ETCD_AUTO_COMPACTION_MODE=revision
- ETCD_AUTO_COMPACTION_RETENTION=1000
- ETCD_QUOTA_BACKEND_BYTES=4294967296
- ETCD_SNAPSHOT_COUNT=50000
volumes:
- /milvus/etcd:/etcd
command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd
networks:
- milvus
minio:
container_name: milvus-minio
image: minio/minio:RELEASE.2023-03-20T20-16-18Z
environment:
MINIO_ACCESS_KEY: minioadmin
MINIO_SECRET_KEY: minioadmin
ports:
- "9001:9001"
- "9000:9000"
volumes:
- /milvus/data:/minio_data
command: minio server /minio_data --console-address ":9001"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3
networks:
- milvus
standalone:
container_name: milvus-standalone
image: milvusdb/milvus:v2.3.3
command: ["milvus", "run", "standalone"]
environment:
ETCD_ENDPOINTS: etcd:2379
MINIO_ADDRESS: minio:9000
DOCKER_VOLUME_DIRECTORY: /var/lib/milvus
volumes:
- /milvus/data:/var/lib/milvus
- ./milvus.yaml:/milvus/configs/milvus.yaml
ports:
- "19530:19530"
- "9091:9091"
depends_on:
- "etcd"
- "minio"
networks:
- milvus
networks:
milvus:
driver: bridge
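With the file saved as docker-compose.yml, the stack can be brought up and checked from the same directory. The probe on port 9091 is the health endpoint Milvus exposes for liveness checks; the commands below are a minimal sketch assuming Docker Compose v2.
# Start the single-node stack
docker compose up -d
docker compose ps

# Wait for the containers to settle, then probe Milvus
curl -f http://localhost:9091/healthz && echo "Milvus standalone is ready"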
Distributed Deployment
# docker-compose-distributed.yml - Milvus distributed deployment
version: '3.5'
services:
etcd:
container_name: milvus-etcd
image: quay.io/coreos/etcd:v3.5.5
environment:
- ETCD_AUTO_COMPACTION_MODE=revision
- ETCD_AUTO_COMPACTION_RETENTION=1000
- ETCD_QUOTA_BACKEND_BYTES=4294967296
volumes:
- /milvus/etcd:/etcd
command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd
networks:
- milvus
pulsar:
container_name: milvus-pulsar
image: apachepulsar/pulsar:2.8.2
command: bin/pulsar standalone
volumes:
- /milvus/pulsar:/pulsar/data
networks:
- milvus
minio:
container_name: milvus-minio
image: minio/minio:RELEASE.2023-03-20T20-16-18Z
environment:
MINIO_ACCESS_KEY: minioadmin
MINIO_SECRET_KEY: minioadmin
volumes:
- /milvus/data:/minio_data
command: minio server /minio_data --console-address ":9001"
ports:
- "9001:9001"
networks:
- milvus
rootcoord:
container_name: milvus-rootcoord
image: milvusdb/milvus:v2.3.3
command: ["milvus", "run", "rootcoord"]
environment:
ETCD_ENDPOINTS: etcd:2379
MINIO_ADDRESS: minio:9000
PULSAR_ADDRESS: pulsar://pulsar:6650
depends_on:
- "etcd"
- "pulsar"
- "minio"
networks:
- milvus
datacoord:
container_name: milvus-datacoord
image: milvusdb/milvus:v2.3.3
command: ["milvus", "run", "datacoord"]
environment:
ETCD_ENDPOINTS: etcd:2379
MINIO_ADDRESS: minio:9000
PULSAR_ADDRESS: pulsar://pulsar:6650
depends_on:
- "rootcoord"
networks:
- milvus
querycoord:
container_name: milvus-querycoord
image: milvusdb/milvus:v2.3.3
command: ["milvus", "run", "querycoord"]
environment:
ETCD_ENDPOINTS: etcd:2379
MINIO_ADDRESS: minio:9000
PULSAR_ADDRESS: pulsar://pulsar:6650
depends_on:
- "rootcoord"
networks:
- milvus
proxy:
container_name: milvus-proxy
image: milvusdb/milvus:v2.3.3
command: ["milvus", "run", "proxy"]
environment:
ETCD_ENDPOINTS: etcd:2379
MINIO_ADDRESS: minio:9000
PULSAR_ADDRESS: pulsar://pulsar:6650
ports:
- "19530:19530"
- "9091:9091"
depends_on:
- "rootcoord"
- "datacoord"
- "querycoord"
networks:
- milvus
datanode:
container_name: milvus-datanode
image: milvusdb/milvus:v2.3.3
command: ["milvus", "run", "datanode"]
environment:
ETCD_ENDPOINTS: etcd:2379
MINIO_ADDRESS: minio:9000
PULSAR_ADDRESS: pulsar://pulsar:6650
depends_on:
- "datacoord"
networks:
- milvus
querynode:
container_name: milvus-querynode
image: milvusdb/milvus:v2.3.3
command: ["milvus", "run", "querynode"]
environment:
ETCD_ENDPOINTS: etcd:2379
MINIO_ADDRESS: minio:9000
PULSAR_ADDRESS: pulsar://pulsar:6650
depends_on:
- "querycoord"
networks:
- milvus
indexnode:
container_name: milvus-indexnode
image: milvusdb/milvus:v2.3.3
command: ["milvus", "run", "indexnode"]
environment:
ETCD_ENDPOINTS: etcd:2379
MINIO_ADDRESS: minio:9000
PULSAR_ADDRESS: pulsar://pulsar:6650
depends_on:
- "datacoord"
networks:
- milvus
networks:
milvus:
driver: bridge
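Starting the distributed stack works the same way, pointing Compose at the alternate file. A quick sketch (same assumptions as above) to launch it and confirm the proxy reports healthy once it can reach the coordinators:
# Start the distributed deployment
docker compose -f docker-compose-distributed.yml up -d
docker compose -f docker-compose-distributed.yml ps

# The proxy exposes the same health endpoint on port 9091
curl -f http://localhost:9091/healthz
docker compose -f docker-compose-distributed.yml logs --tail=50 proxy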
Milvus Configuration
# milvus.yaml - Milvus configuration file
# This config file is used for Milvus server configuration
etcd:
endpoints:
- localhost:2379
rootPath: by-dev
metaSubPath: meta
kvSubPath: kv
minio:
address: localhost
port: 9000
accessKeyID: minioadmin
secretAccessKey: minioadmin
useSSL: false
bucketName: milvus-bucket
rootPath: files
pulsar:
address: localhost
port: 6650
maxMessageSize: 5242880
rocksmq:
path: /var/lib/milvus/rdb_data
common:
defaultPartitionName: _default
defaultIndexName: _default_idx
entityExpiration: -1
indexSliceSize: 16
gracefulTime: 5000
gracefulStopTimeout: 30
proxy:
port: 19530
internalPort: 19529
http:
enabled: true
port: 9091
queryCoord:
autoHandoff: true
autoBalance: true
overloadedMemoryThresholdPercentage: 90
balanceIntervalSeconds: 60
memoryUsageMaxDifferencePercentage: 30
queryNode:
gracefulStopTimeout: 0
stats:
publishInterval: 1000
dataSync:
flowGraph:
maxQueueLength: 1024
maxParallelism: 1024
segcore:
chunkRows: 1024
interimIndex:
nlist: 128
nprobe: 16
indexNode:
scheduler:
buildParallel: 1
dataNode:
dataSync:
flowGraph:
maxQueueLength: 1024
maxParallelism: 1024
dataCoord:
segment:
maxSize: 512
diskSegmentMaxSize: 2048
sealProportion: 0.25
assignmentExpiration: 2000
maxLifetime: 86400
gc:
interval: 3600
missingTolerance: 86400
dropTolerance: 10800
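After applying the configuration and restarting the containers, a short pymilvus check confirms the server is reachable on the configured proxy port (a minimal sketch; host and port match the settings above).
# check_connection.py
from pymilvus import connections, utility

connections.connect(alias="default", host="localhost", port="19530")
print("Server version:", utility.get_server_version())
print("Collections:", utility.list_collections())
connections.disconnect("default")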
Creating Vector Collections
Python Client Setup
# install_dependencies.py
import subprocess
import sys
dependencies = [
"pymilvus==2.3.3",
"numpy>=1.21.0",
"pandas>=1.3.0",
"scikit-learn>=1.0.0",
"torch>=2.0.0",
"transformers>=4.30.0",
"sentence-transformers>=2.2.0",
"Pillow>=9.0.0",
"tqdm>=4.62.0",
]
for dep in dependencies:
subprocess.check_call([sys.executable, "-m", "pip", "install", dep])
Collection Management
# milvus_collection_manager.py
from pymilvus import (
connections,
Collection,
CollectionSchema,
FieldSchema,
DataType,
utility,
)
import numpy as np
from typing import List, Dict, Any
class MilvusManager:
def __init__(self, host: str = "localhost", port: str = "19530"):
self.host = host
self.port = port
self.connect()
def connect(self):
"""Connect to Milvus server"""
connections.connect(
alias="default",
host=self.host,
port=self.port,
timeout=30,
)
print(f"Connected to Milvus at {self.host}:{self.port}")
def create_collection(
self,
collection_name: str,
dim: int,
metric_type: str = "L2",
description: str = "",
) -> Collection:
"""Create a collection for vector storage"""
# Define fields
fields = [
FieldSchema(
name="id",
dtype=DataType.INT64,
is_primary=True,
auto_id=True,
description="Primary ID"
),
FieldSchema(
name="embedding",
dtype=DataType.FLOAT_VECTOR,
dim=dim,
description="Vector embedding"
),
FieldSchema(
name="metadata",
dtype=DataType.JSON,
description="Additional metadata"
),
]
# Create schema
schema = CollectionSchema(
fields=fields,
description=description or f"Collection for {dim}-dimensional vectors"
)
# Create collection
collection = Collection(
name=collection_name,
schema=schema,
consistency_level="Strong"
)
print(f"Created collection: {collection_name}")
return collection
def create_index(
self,
collection: Collection,
index_type: str = "IVF_FLAT",
metric_type: str = "L2",
params: Dict[str, Any] = None
):
"""Create index for efficient vector search"""
if params is None:
params = {"nlist": 1024}
index_params = {
"index_type": index_type,
"metric_type": metric_type,
"params": params
}
collection.create_index(
field_name="embedding",
index_params=index_params
)
print(f"Created {index_type} index with metric {metric_type}")
def insert_vectors(
self,
collection: Collection,
embeddings: np.ndarray,
metadata: List[Dict[str, Any]] = None
) -> List[int]:
"""Insert vectors into collection"""
if metadata is None:
metadata = [{}] * len(embeddings)
# Prepare data
data = [
embeddings.tolist(),
metadata
]
# Insert
insert_result = collection.insert(data)
collection.flush()
print(f"Inserted {len(embeddings)} vectors")
return insert_result.primary_keys
def search_vectors(
self,
collection: Collection,
query_vectors: np.ndarray,
top_k: int = 10,
search_params: Dict[str, Any] = None,
expr: str = None,
output_fields: List[str] = None
):
"""Search for similar vectors"""
if search_params is None:
search_params = {"nprobe": 16}
if output_fields is None:
output_fields = ["metadata"]
# Load collection
collection.load()
# Search
results = collection.search(
data=query_vectors.tolist(),
anns_field="embedding",
param=search_params,
limit=top_k,
expr=expr,
output_fields=output_fields,
consistency_level="Strong"
)
return results
def create_partition(
self,
collection: Collection,
partition_name: str
):
"""Create partition for data organization"""
collection.create_partition(partition_name)
print(f"Created partition: {partition_name}")
def get_collection_stats(self, collection_name: str) -> Dict[str, Any]:
"""Get collection statistics"""
collection = Collection(collection_name)
stats = collection.num_entities
return {
"name": collection_name,
"num_entities": stats,
"loaded": utility.load_state(collection_name),
"has_index": collection.has_index(),
"partitions": [p.name for p in collection.partitions]
}
Building AI Applications with Milvus
Text Embedding and Semantic Search
# semantic_search.py
from sentence_transformers import SentenceTransformer
import numpy as np
from typing import List, Dict, Any
from milvus_collection_manager import MilvusManager
class SemanticSearchEngine:
def __init__(self, milvus_manager: MilvusManager, model_name: str = "all-MiniLM-L6-v2"):
self.milvus = milvus_manager
self.model = SentenceTransformer(model_name)
self.collection_name = "semantic_search"
self.dim = self.model.get_sentence_embedding_dimension()
# Create collection
self.collection = self.milvus.create_collection(
collection_name=self.collection_name,
dim=self.dim,
metric_type="IP", # Inner Product for normalized vectors
description="Semantic search collection"
)
# Create index
self.milvus.create_index(
collection=self.collection,
index_type="IVF_SQ8",
metric_type="IP",
params={"nlist": 2048}
)
def index_documents(self, documents: List[Dict[str, str]]):
"""Index documents for semantic search"""
# Extract texts
texts = [doc.get("content", "") for doc in documents]
# Generate embeddings
print("Generating embeddings...")
embeddings = self.model.encode(
texts,
normalize_embeddings=True,
show_progress_bar=True,
batch_size=32
)
# Prepare metadata
metadata = []
for doc in documents:
meta = {
"title": doc.get("title", ""),
"url": doc.get("url", ""),
"category": doc.get("category", ""),
"timestamp": doc.get("timestamp", ""),
}
metadata.append(meta)
# Insert into Milvus
self.milvus.insert_vectors(
collection=self.collection,
embeddings=embeddings,
metadata=metadata
)
print(f"Indexed {len(documents)} documents")
def search(
self,
query: str,
top_k: int = 10,
category_filter: str = None
) -> List[Dict[str, Any]]:
"""Search for similar documents"""
# Generate query embedding
query_embedding = self.model.encode(
[query],
normalize_embeddings=True
)
# Build filter expression
expr = None
if category_filter:
expr = f'metadata["category"] == "{category_filter}"'
# Search
results = self.milvus.search_vectors(
collection=self.collection,
query_vectors=query_embedding,
top_k=top_k,
search_params={"nprobe": 64},
expr=expr,
output_fields=["metadata"]
)
# Format results
formatted_results = []
for hits in results:
for hit in hits:
result = {
"score": hit.score,
"id": hit.id,
**hit.entity.get("metadata", {})
}
formatted_results.append(result)
return formatted_results
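Using the engine then amounts to indexing a handful of documents and issuing a natural-language query. The snippet below is a usage sketch with placeholder documents; it assumes the two classes are saved under the file names shown in their header comments.
# semantic_search_demo.py
from milvus_collection_manager import MilvusManager
from semantic_search import SemanticSearchEngine

engine = SemanticSearchEngine(MilvusManager())

documents = [
    {"title": "Intro to Milvus", "content": "Milvus is a vector database built for similarity search.", "category": "database"},
    {"title": "GPU basics", "content": "GPUs accelerate the matrix math behind deep learning.", "category": "hardware"},
]
engine.index_documents(documents)

for hit in engine.search("which system stores embeddings?", top_k=2):
    print(f'{hit["score"]:.3f}  {hit.get("title", "")}')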
Image Similarity Search
# image_search.py
import torch
import torchvision.models as models
import torchvision.transforms as transforms
from PIL import Image
import numpy as np
from typing import List, Dict, Any
import os
from milvus_collection_manager import MilvusManager
class ImageSearchEngine:
def __init__(self, milvus_manager: MilvusManager):
self.milvus = milvus_manager
self.collection_name = "image_search"
# Load pre-trained model
        self.model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
self.model.eval()
# Remove final classification layer
self.model = torch.nn.Sequential(*list(self.model.children())[:-1])
# Image preprocessing
self.transform = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(
mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]
)
])
# Feature dimension
self.dim = 2048
# Create collection
self.collection = self.milvus.create_collection(
collection_name=self.collection_name,
dim=self.dim,
metric_type="L2",
description="Image similarity search"
)
# Create index
self.milvus.create_index(
collection=self.collection,
index_type="IVF_PQ",
metric_type="L2",
params={"nlist": 2048, "m": 16, "nbits": 8}
)
def extract_features(self, image_path: str) -> np.ndarray:
"""Extract features from image"""
# Load and preprocess image
image = Image.open(image_path).convert('RGB')
image_tensor = self.transform(image).unsqueeze(0)
# Extract features
with torch.no_grad():
features = self.model(image_tensor)
features = features.squeeze().numpy()
# L2 normalize
features = features / np.linalg.norm(features)
return features
def index_images(self, image_paths: List[str]):
"""Index images for similarity search"""
embeddings = []
metadata = []
print("Extracting image features...")
for path in image_paths:
try:
# Extract features
features = self.extract_features(path)
embeddings.append(features)
# Create metadata
meta = {
"path": path,
"filename": os.path.basename(path),
"size": os.path.getsize(path),
}
metadata.append(meta)
except Exception as e:
print(f"Error processing {path}: {e}")
continue
# Convert to numpy array
embeddings = np.array(embeddings)
# Insert into Milvus
self.milvus.insert_vectors(
collection=self.collection,
embeddings=embeddings,
metadata=metadata
)
print(f"Indexed {len(embeddings)} images")
def search_similar_images(
self,
query_image_path: str,
top_k: int = 10
) -> List[Dict[str, Any]]:
"""Search for similar images"""
# Extract query features
query_features = self.extract_features(query_image_path)
query_features = query_features.reshape(1, -1)
# Search
results = self.milvus.search_vectors(
collection=self.collection,
query_vectors=query_features,
top_k=top_k,
search_params={"nprobe": 128}
)
# Format results
formatted_results = []
for hits in results:
for hit in hits:
result = {
"distance": hit.distance,
"id": hit.id,
**hit.entity.get("metadata", {})
}
formatted_results.append(result)
return formatted_results
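A usage sketch for the image engine follows the same pattern: index a directory of images, then query with a new image. The photo paths are placeholders, and the module names assume the files above.
# image_search_demo.py
import glob
from milvus_collection_manager import MilvusManager
from image_search import ImageSearchEngine

engine = ImageSearchEngine(MilvusManager())

# Index every JPEG in a local photo directory (placeholder path)
engine.index_images(glob.glob("/data/photos/*.jpg"))

# Find the ten most similar images to a query image
for match in engine.search_similar_images("/data/photos/query.jpg", top_k=10):
    print(f'{match["distance"]:.4f}  {match["filename"]}')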
Performance Optimization
Index Selection Guide
# index_benchmark.py
import time
import numpy as np
from typing import Dict, List
from milvus_collection_manager import MilvusManager
class IndexBenchmark:
def __init__(self, milvus_manager: MilvusManager):
self.milvus = milvus_manager
self.index_configs = {
"FLAT": {
"index_type": "FLAT",
"params": {},
"search_params": {}
},
"IVF_FLAT": {
"index_type": "IVF_FLAT",
"params": {"nlist": 1024},
"search_params": {"nprobe": 16}
},
"IVF_SQ8": {
"index_type": "IVF_SQ8",
"params": {"nlist": 1024},
"search_params": {"nprobe": 16}
},
"IVF_PQ": {
"index_type": "IVF_PQ",
"params": {"nlist": 1024, "m": 16, "nbits": 8},
"search_params": {"nprobe": 16}
},
"HNSW": {
"index_type": "HNSW",
"params": {"M": 16, "efConstruction": 200},
"search_params": {"ef": 64}
},
"ANNOY": {
"index_type": "ANNOY",
"params": {"n_trees": 16},
"search_params": {"search_k": -1}
},
}
def benchmark_index(
self,
collection_name: str,
vectors: np.ndarray,
queries: np.ndarray,
index_name: str,
metric_type: str = "L2"
) -> Dict[str, float]:
"""Benchmark a specific index type"""
config = self.index_configs[index_name]
# Create collection
collection = self.milvus.create_collection(
collection_name=f"{collection_name}_{index_name}",
dim=vectors.shape[1],
metric_type=metric_type
)
# Insert vectors
insert_start = time.time()
self.milvus.insert_vectors(collection, vectors)
insert_time = time.time() - insert_start
# Create index
index_start = time.time()
self.milvus.create_index(
collection=collection,
index_type=config["index_type"],
metric_type=metric_type,
params=config["params"]
)
index_time = time.time() - index_start
# Load collection
collection.load()
# Search benchmark
search_times = []
for query in queries:
search_start = time.time()
self.milvus.search_vectors(
collection=collection,
query_vectors=query.reshape(1, -1),
top_k=10,
search_params=config["search_params"]
)
search_times.append(time.time() - search_start)
avg_search_time = np.mean(search_times)
# Cleanup
collection.drop()
return {
"index_type": index_name,
"insert_time": insert_time,
"index_time": index_time,
"avg_search_time": avg_search_time,
"qps": 1.0 / avg_search_time
}
def run_benchmark(
self,
num_vectors: int = 100000,
dim: int = 128,
num_queries: int = 100
):
"""Run comprehensive benchmark"""
# Generate test data
print(f"Generating {num_vectors} vectors of dimension {dim}")
vectors = np.random.random((num_vectors, dim)).astype('float32')
queries = np.random.random((num_queries, dim)).astype('float32')
# Run benchmarks
results = []
for index_name in self.index_configs.keys():
print(f"\nBenchmarking {index_name}...")
try:
result = self.benchmark_index(
collection_name="benchmark",
vectors=vectors,
queries=queries,
index_name=index_name
)
results.append(result)
print(f"Results: {result}")
except Exception as e:
print(f"Error benchmarking {index_name}: {e}")
return results
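Running the benchmark with a modest dataset gives a first feel for the trade-offs between build time and query throughput before committing to an index type (a sketch; the vector counts are kept deliberately small).
# run_benchmark.py
from milvus_collection_manager import MilvusManager
from index_benchmark import IndexBenchmark

benchmark = IndexBenchmark(MilvusManager())
results = benchmark.run_benchmark(num_vectors=50_000, dim=128, num_queries=50)

# Rank index types by query throughput
for r in sorted(results, key=lambda r: r["qps"], reverse=True):
    print(f'{r["index_type"]:10s}  qps={r["qps"]:8.1f}  build={r["index_time"]:.1f}s  insert={r["insert_time"]:.1f}s')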
GPU Acceleration Configuration
# gpu_config.py
from pymilvus import Collection
def configure_gpu_search(collection: Collection, gpu_id: int = 0):
"""Configure GPU acceleration for search"""
# Set GPU search parameters
search_params = {
"metric_type": "L2",
"params": {
"nprobe": 16,
"gpu_search_threshold": 1000, # Use GPU when result > threshold
"gpu_id": gpu_id
}
}
# Enable GPU index
index_params = {
"index_type": "GPU_IVF_FLAT",
"metric_type": "L2",
"params": {
"nlist": 1024,
"gpu_id": gpu_id
}
}
# Create GPU index
collection.create_index(
field_name="embedding",
index_params=index_params
)
print(f"Configured GPU {gpu_id} for accelerated search")
return search_params
Monitoring and Operations
Prometheus Metrics Setup
# prometheus-config.yaml
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
- job_name: 'milvus'
static_configs:
- targets: ['localhost:9091']
metrics_path: '/metrics'
- job_name: 'milvus-exporter'
static_configs:
- targets: ['localhost:9092']
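To actually scrape these targets, Prometheus can run as a container on the same host. The sketch below assumes the file above is saved as prometheus-config.yaml in the current directory and uses host networking so the scraper can reach localhost:9091.
# Run Prometheus against the configuration above
docker run -d --name prometheus \
  --network host \
  -v $(pwd)/prometheus-config.yaml:/etc/prometheus/prometheus.yml:ro \
  prom/prometheus

# Scrape status is then visible at http://localhost:9090/targets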
Monitoring Dashboard
# monitoring.py
import requests
from datetime import datetime
from typing import Dict, Any
from pymilvus import connections, utility
class MilvusMonitor:
def __init__(self, metrics_url: str = "http://localhost:9091/metrics"):
self.metrics_url = metrics_url
def get_metrics(self) -> Dict[str, Any]:
"""Fetch current metrics from Milvus"""
response = requests.get(self.metrics_url)
metrics = {}
for line in response.text.split('\n'):
if line and not line.startswith('#'):
parts = line.split(' ')
if len(parts) == 2:
metric_name, value = parts
metrics[metric_name] = float(value)
return metrics
def get_system_info(self) -> Dict[str, Any]:
"""Get system information"""
from pymilvus import utility
return {
"version": utility.get_server_version(),
"build_time": utility.get_server_type(),
"collections": utility.list_collections(),
"connections": connections.list_connections(),
}
def check_health(self) -> Dict[str, Any]:
"""Health check for Milvus cluster"""
health_status = {
"timestamp": datetime.now().isoformat(),
"status": "healthy",
"checks": {}
}
# Check connections
try:
connections.connect(alias="health_check", host="localhost", port="19530")
health_status["checks"]["connection"] = "OK"
connections.disconnect("health_check")
except Exception as e:
health_status["status"] = "unhealthy"
health_status["checks"]["connection"] = str(e)
# Check metrics endpoint
try:
response = requests.get(self.metrics_url, timeout=5)
health_status["checks"]["metrics"] = "OK" if response.status_code == 200 else "FAIL"
except Exception as e:
health_status["checks"]["metrics"] = str(e)
return health_status
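Wrapped in a loop, the monitor doubles as a simple liveness watchdog (a usage sketch, assuming monitoring.py is importable):
# monitor_loop.py
import time
from monitoring import MilvusMonitor

monitor = MilvusMonitor("http://localhost:9091/metrics")
while True:
    health = monitor.check_health()
    print(health["timestamp"], health["status"], health["checks"])
    time.sleep(60)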
Backup and Recovery
#!/bin/bash
# backup_milvus.sh
BACKUP_DIR="/backup/milvus/$(date +%Y%m%d_%H%M%S)"
MILVUS_DATA_DIR="/milvus/data"
ETCD_ENDPOINTS="localhost:2379"
echo "Starting Milvus backup..."
# Create backup directory
mkdir -p $BACKUP_DIR
# Backup etcd metadata
echo "Backing up etcd..."
docker exec milvus-etcd etcdctl snapshot save /tmp/snapshot.db \
--endpoints=$ETCD_ENDPOINTS
docker cp milvus-etcd:/tmp/snapshot.db $BACKUP_DIR/etcd-snapshot.db
# Backup MinIO data
echo "Backing up MinIO data..."
docker run --rm \
-v $MILVUS_DATA_DIR:/data \
-v $BACKUP_DIR:/backup \
alpine tar czf /backup/minio-data.tar.gz -C /data .
# Backup configuration
echo "Backing up configuration..."
cp -r /path/to/milvus/configs $BACKUP_DIR/
# Create backup metadata
cat > $BACKUP_DIR/metadata.json <<EOF
{
"timestamp": "$(date -Iseconds)",
"version": "$(docker exec milvus-standalone milvus version)",
"collections": $(docker exec milvus-standalone milvus-cli list collections),
"size": "$(du -sh $BACKUP_DIR | cut -f1)"
}
EOF
echo "Backup completed: $BACKUP_DIR"
Production Best Practices
High Availability Configuration
# ha-deployment.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: milvus-ha-config
data:
milvus.yaml: |
proxy:
replicas: 3
queryNode:
replicas: 3
resources:
requests:
memory: "8Gi"
cpu: "2"
limits:
memory: "16Gi"
cpu: "4"
dataNode:
replicas: 3
resources:
requests:
memory: "4Gi"
cpu: "1"
limits:
memory: "8Gi"
cpu: "2"
indexNode:
replicas: 2
resources:
requests:
memory: "8Gi"
cpu: "2"
nvidia.com/gpu: "1" # GPU for indexing
limits:
memory: "16Gi"
cpu: "4"
nvidia.com/gpu: "1"
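This ConfigMap targets a Kubernetes deployment of Milvus (for example one installed with the official Helm chart or Milvus Operator) rather than the Compose setups above. A minimal sketch of applying it, assuming kubectl access and a milvus namespace; how the values are consumed depends on the chart or operator in use.
# Apply the HA configuration to the cluster
kubectl create namespace milvus --dry-run=client -o yaml | kubectl apply -f -
kubectl apply -n milvus -f ha-deployment.yaml

# Confirm the ConfigMap exists
kubectl get configmap milvus-ha-config -n milvus -o yaml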
Security Configuration
# security_config.py
from pymilvus import connections

def setup_secure_connection():
    """Set up a mutual-TLS (two-way) secured connection to Milvus"""
# Connect with TLS
connections.connect(
alias="secure",
host="milvus.example.com",
port="19530",
secure=True,
client_pem_path="/path/to/client.pem",
client_key_path="/path/to/client.key",
ca_pem_path="/path/to/ca.pem",
server_name="milvus.example.com"
)
print("Established secure connection to Milvus")
Conclusion
Deploying Milvus on AlmaLinux provides a robust vector database platform for AI workloads, enabling efficient similarity search, recommendation systems, and semantic analysis at scale. By following the deployment patterns, optimization techniques, and best practices outlined in this guide, you can build production-ready AI applications that leverage the power of vector embeddings.
The combination of Milvus’s distributed architecture, GPU acceleration capabilities, and comprehensive indexing options makes it an ideal choice for organizations looking to implement vector search in their AI infrastructure.