diff --git a/.github/workflows/benchmarks.yml b/.github/workflows/benchmarks.yml index c1fd7b5ad..22203be52 100644 --- a/.github/workflows/benchmarks.yml +++ b/.github/workflows/benchmarks.yml @@ -70,6 +70,7 @@ jobs: - vearch - vespa - voyager + - vsag - weaviate include: - library: pynndescent diff --git a/ann_benchmarks/algorithms/vsag/Dockerfile b/ann_benchmarks/algorithms/vsag/Dockerfile new file mode 100644 index 000000000..1594767ae --- /dev/null +++ b/ann_benchmarks/algorithms/vsag/Dockerfile @@ -0,0 +1,12 @@ +FROM ann-benchmarks + +WORKDIR /home/app + +RUN date + +RUN lscpu + +RUN pip3 install pyvsag==0.0.7 + +RUN python3 -c 'import pyvsag' + diff --git a/ann_benchmarks/algorithms/vsag/config.yml b/ann_benchmarks/algorithms/vsag/config.yml new file mode 100644 index 000000000..e81b7ddc5 --- /dev/null +++ b/ann_benchmarks/algorithms/vsag/config.yml @@ -0,0 +1,16 @@ +float: + any: + - base_args: ['@metric', '@dimension'] + constructor: Vsag + disabled: false + docker_tag: ann-benchmarks-vsag + module: ann_benchmarks.algorithms.vsag + name: vsag + run_groups: + HNSW: + args: + M: [8, 12, 16, 24, 32, 36, 48, 64] + ef_construction: 300 + use_int8: [4, 8] + rs: [0, 0.3, 0.5, 1] + query_args: [[10, 20, 30, 40, 60, 80, 120, 200, 400, 600, 800]] diff --git a/ann_benchmarks/algorithms/vsag/module.py b/ann_benchmarks/algorithms/vsag/module.py new file mode 100644 index 000000000..fcf7e62e6 --- /dev/null +++ b/ann_benchmarks/algorithms/vsag/module.py @@ -0,0 +1,77 @@ +import pyvsag +import numpy as np +import json +import struct +from ..base.module import BaseANN + +class Vsag(BaseANN): + def __init__(self, metric, dim, method_param): + self._metric = {"euclidean": "l2", "angular": "ip"}[metric] + + self._params = dict() + self._params["M"] = method_param["M"] + self._params["efc"] = method_param["ef_construction"] + + self._params["sq"] = -1 + if method_param["use_int8"] != 0: + self._params["sq"] = method_param["use_int8"] + + if "alpha" in method_param: + self._params["a"] = method_param["alpha"] + else: + self._params["a"] = 1.0 + + if "rs" in method_param: + self._params["rs"] = method_param["rs"] + else: + self._params["rs"] = 1 + + self.name = "vsag (%s)" % (self._params) + self._ef = 0 + print(self._params) + + def fit(self, X): + index_params = { + "dtype": "float32", + "metric_type": "l2", + "dim": len(X[0]), + "hnsw": { + "max_degree": self._params["M"], + "ef_construction": self._params["efc"], + "ef_search": self._params["efc"], + "max_elements": len(X), + "use_static": False, + "sq_num_bits": self._params["sq"], + "alpha": self._params["a"], + "redundant_rate": self._params["rs"] + } + } + print(index_params) + self._index = pyvsag.Index("hnsw", json.dumps(index_params)) + if self._metric == "ip": + X[np.linalg.norm(X, axis=1) == 0] = 1.0 / np.sqrt(X.shape[1]) + X /= np.linalg.norm(X, axis=1)[:, np.newaxis] + self._index.build(vectors=X, + ids=range(len(X)), + num_elements=len(X), + dim=len(X[0])) + + + def set_query_arguments(self, ef): + self._ef = ef + self.name = "efs_%s_%s" % (self._ef, self._params) + + def query(self, v, n): + search_params = { + "hnsw": { + "ef_search": self._ef + } + } + length = 1 + if self._metric == "ip": + length = np.linalg.norm(v) + if length == 0: + length = 1 + ids, dists = self._index.knn_search(vector=v / length, k=n, parameters=json.dumps(search_params)) + return ids + diff --git a/ann_benchmarks/main.py b/ann_benchmarks/main.py index b650d4950..07539775d 100644 --- a/ann_benchmarks/main.py +++ b/ann_benchmarks/main.py @@ -48,7 +48,7 @@ def positive_int(input_str: str) -> int: return i -def run_worker(cpu: int, args: argparse.Namespace, queue: multiprocessing.Queue) -> None: +def run_worker(cpu: int, mem_limit: int, args: argparse.Namespace, queue: multiprocessing.Queue) -> None: """ Executes the algorithm based on the provided parameters. @@ -58,6 +58,7 @@ def run_worker(cpu: int, args: argparse.Namespace, queue: multiprocessing.Queue) Args: cpu (int): The CPU number to be used in the execution. + mem_limit (int): The memory to be used in the execution. args (argparse.Namespace): User provided arguments for running workers. queue (multiprocessing.Queue): The multiprocessing queue that contains the algorithm definitions. @@ -69,8 +70,6 @@ def run_worker(cpu: int, args: argparse.Namespace, queue: multiprocessing.Queue) if args.local: run(definition, args.dataset, args.count, args.runs, args.batch) else: - memory_margin = 500e6 # reserve some extra memory for misc stuff - mem_limit = int((psutil.virtual_memory().available - memory_margin) / args.parallelism) cpu_limit = str(cpu) if not args.batch else f"0-{multiprocessing.cpu_count() - 1}" run_docker(definition, args.dataset, args.count, args.runs, args.timeout, args.batch, cpu_limit, mem_limit) @@ -252,8 +251,11 @@ def create_workers_and_execute(definitions: List[Definition], args: argparse.Nam for definition in definitions: task_queue.put(definition) + memory_margin = 500e6 # reserve some extra memory for misc stuff + mem_limit = int((psutil.virtual_memory().available - memory_margin) / args.parallelism) + try: - workers = [multiprocessing.Process(target=run_worker, args=(i + 1, args, task_queue)) for i in range(args.parallelism)] + workers = [multiprocessing.Process(target=run_worker, args=(i + 1, mem_limit, args, task_queue)) for i in range(args.parallelism)] [worker.start() for worker in workers] [worker.join() for worker in workers] finally: @@ -343,4 +345,4 @@ def main(): else: logger.info(f"Order: {definitions}") - create_workers_and_execute(definitions, args) \ No newline at end of file + create_workers_and_execute(definitions, args)