Skip to content

Commit

Permalink
add vsag library (#549)
Browse files Browse the repository at this point in the history
  • Loading branch information
wxyucs authored Oct 29, 2024
1 parent 9239445 commit 266b14f
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 5 deletions.
1 change: 1 addition & 0 deletions .github/workflows/benchmarks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ jobs:
- vearch
- vespa
- voyager
- vsag
- weaviate
include:
- library: pynndescent
Expand Down
12 changes: 12 additions & 0 deletions ann_benchmarks/algorithms/vsag/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
FROM ann-benchmarks

WORKDIR /home/app

RUN date

RUN lscpu

RUN pip3 install pyvsag==0.0.7

RUN python3 -c 'import pyvsag'

16 changes: 16 additions & 0 deletions ann_benchmarks/algorithms/vsag/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
float:
any:
- base_args: ['@metric', '@dimension']
constructor: Vsag
disabled: false
docker_tag: ann-benchmarks-vsag
module: ann_benchmarks.algorithms.vsag
name: vsag
run_groups:
HNSW:
args:
M: [8, 12, 16, 24, 32, 36, 48, 64]
ef_construction: 300
use_int8: [4, 8]
rs: [0, 0.3, 0.5, 1]
query_args: [[10, 20, 30, 40, 60, 80, 120, 200, 400, 600, 800]]
77 changes: 77 additions & 0 deletions ann_benchmarks/algorithms/vsag/module.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import pyvsag
import numpy as np
import json
import struct
from ..base.module import BaseANN

class Vsag(BaseANN):
def __init__(self, metric, dim, method_param):
self._metric = {"euclidean": "l2", "angular": "ip"}[metric]

self._params = dict()
self._params["M"] = method_param["M"]
self._params["efc"] = method_param["ef_construction"]

self._params["sq"] = -1
if method_param["use_int8"] != 0:
self._params["sq"] = method_param["use_int8"]

if "alpha" in method_param:
self._params["a"] = method_param["alpha"]
else:
self._params["a"] = 1.0

if "rs" in method_param:
self._params["rs"] = method_param["rs"]
else:
self._params["rs"] = 1

self.name = "vsag (%s)" % (self._params)
self._ef = 0
print(self._params)

def fit(self, X):
index_params = {
"dtype": "float32",
"metric_type": "l2",
"dim": len(X[0]),
"hnsw": {
"max_degree": self._params["M"],
"ef_construction": self._params["efc"],
"ef_search": self._params["efc"],
"max_elements": len(X),
"use_static": False,
"sq_num_bits": self._params["sq"],
"alpha": self._params["a"],
"redundant_rate": self._params["rs"]
}
}
print(index_params)
self._index = pyvsag.Index("hnsw", json.dumps(index_params))
if self._metric == "ip":
X[np.linalg.norm(X, axis=1) == 0] = 1.0 / np.sqrt(X.shape[1])
X /= np.linalg.norm(X, axis=1)[:, np.newaxis]
self._index.build(vectors=X,
ids=range(len(X)),
num_elements=len(X),
dim=len(X[0]))


def set_query_arguments(self, ef):
self._ef = ef
self.name = "efs_%s_%s" % (self._ef, self._params)

def query(self, v, n):
search_params = {
"hnsw": {
"ef_search": self._ef
}
}
length = 1
if self._metric == "ip":
length = np.linalg.norm(v)
if length == 0:
length = 1
ids, dists = self._index.knn_search(vector=v / length, k=n, parameters=json.dumps(search_params))
return ids

12 changes: 7 additions & 5 deletions ann_benchmarks/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def positive_int(input_str: str) -> int:
return i


def run_worker(cpu: int, args: argparse.Namespace, queue: multiprocessing.Queue) -> None:
def run_worker(cpu: int, mem_limit: int, args: argparse.Namespace, queue: multiprocessing.Queue) -> None:
"""
Executes the algorithm based on the provided parameters.
Expand All @@ -58,6 +58,7 @@ def run_worker(cpu: int, args: argparse.Namespace, queue: multiprocessing.Queue)
Args:
cpu (int): The CPU number to be used in the execution.
mem_limit (int): The memory to be used in the execution.
args (argparse.Namespace): User provided arguments for running workers.
queue (multiprocessing.Queue): The multiprocessing queue that contains the algorithm definitions.
Expand All @@ -69,8 +70,6 @@ def run_worker(cpu: int, args: argparse.Namespace, queue: multiprocessing.Queue)
if args.local:
run(definition, args.dataset, args.count, args.runs, args.batch)
else:
memory_margin = 500e6 # reserve some extra memory for misc stuff
mem_limit = int((psutil.virtual_memory().available - memory_margin) / args.parallelism)
cpu_limit = str(cpu) if not args.batch else f"0-{multiprocessing.cpu_count() - 1}"

run_docker(definition, args.dataset, args.count, args.runs, args.timeout, args.batch, cpu_limit, mem_limit)
Expand Down Expand Up @@ -252,8 +251,11 @@ def create_workers_and_execute(definitions: List[Definition], args: argparse.Nam
for definition in definitions:
task_queue.put(definition)

memory_margin = 500e6 # reserve some extra memory for misc stuff
mem_limit = int((psutil.virtual_memory().available - memory_margin) / args.parallelism)

try:
workers = [multiprocessing.Process(target=run_worker, args=(i + 1, args, task_queue)) for i in range(args.parallelism)]
workers = [multiprocessing.Process(target=run_worker, args=(i + 1, mem_limit, args, task_queue)) for i in range(args.parallelism)]
[worker.start() for worker in workers]
[worker.join() for worker in workers]
finally:
Expand Down Expand Up @@ -343,4 +345,4 @@ def main():
else:
logger.info(f"Order: {definitions}")

create_workers_and_execute(definitions, args)
create_workers_and_execute(definitions, args)

0 comments on commit 266b14f

Please sign in to comment.