Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

add vsag library #549

Merged
merged 1 commit into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/benchmarks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ jobs:
- vearch
- vespa
- voyager
- vsag
- weaviate
include:
- library: pynndescent
Expand Down
12 changes: 12 additions & 0 deletions ann_benchmarks/algorithms/vsag/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Image for benchmarking the vsag library; starts from the `ann-benchmarks` image.
FROM ann-benchmarks

WORKDIR /home/app

# Print the build time and the build host's CPU details into the build log.
RUN date

RUN lscpu

# Install a pinned release of the pyvsag Python package.
RUN pip3 install pyvsag==0.0.7

# Fail the image build right away if the installed package cannot be imported.
RUN python3 -c 'import pyvsag'

16 changes: 16 additions & 0 deletions ann_benchmarks/algorithms/vsag/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Benchmark definition for the vsag HNSW index.
float:
any:
# Positional constructor arguments; the constructor takes (metric, dim, method_param).
- base_args: ['@metric', '@dimension']
# Name of the wrapper class.
constructor: Vsag
disabled: false
docker_tag: ann-benchmarks-vsag
module: ann_benchmarks.algorithms.vsag
name: vsag
run_groups:
HNSW:
# Keys below are the ones the Vsag constructor reads from its method_param dict.
args:
# Forwarded to the index as "max_degree".
M: [8, 12, 16, 24, 32, 36, 48, 64]
# Forwarded to the index as "ef_construction" (and as its initial "ef_search").
ef_construction: 300
# Forwarded to the index as "sq_num_bits".
use_int8: [4, 8]
# Forwarded to the index as "redundant_rate".
rs: [0, 0.3, 0.5, 1]
# NOTE(review): presumably each value is passed to set_query_arguments as the
# search-time ef - confirm against the benchmark runner.
query_args: [[10, 20, 30, 40, 60, 80, 120, 200, 400, 600, 800]]
77 changes: 77 additions & 0 deletions ann_benchmarks/algorithms/vsag/module.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import pyvsag
import numpy as np
import json
import struct
from ..base.module import BaseANN

class Vsag(BaseANN):
    """Benchmark wrapper around a pyvsag HNSW index.

    The index is always created with the L2 metric. For the ``"angular"``
    metric the wrapper instead scales both the indexed vectors and every
    query to unit length; on unit vectors the squared L2 distance equals
    ``2 - 2 * dot``, so the nearest-neighbour order matches cosine similarity.
    """

    def __init__(self, metric, dim, method_param):
        """Collect the build parameters for the index.

        Args:
            metric: ``"euclidean"`` or ``"angular"``; any other value raises
                ``KeyError``.
            dim: Vector dimensionality. Unused here; ``fit`` reads the
                dimension from the data it receives.
            method_param: Mapping with the required keys ``"M"`` and
                ``"ef_construction"`` and the optional keys ``"use_int8"``
                (default 0), ``"alpha"`` (default 1.0) and ``"rs"``
                (default 1).
        """
        self._metric = {"euclidean": "l2", "angular": "ip"}[metric]

        # A use_int8 of 0 (or an absent key) is forwarded as sq_num_bits = -1;
        # any other value is forwarded unchanged.
        sq_bits = method_param.get("use_int8", 0)

        # Insertion order matters: the dict's repr is embedded in self.name.
        self._params = {
            "M": method_param["M"],
            "efc": method_param["ef_construction"],
            "sq": sq_bits if sq_bits != 0 else -1,
            "a": method_param.get("alpha", 1.0),
            "rs": method_param.get("rs", 1),
        }

        self.name = "vsag (%s)" % (self._params,)
        self._ef = 0
        print(self._params)

    @staticmethod
    def _unit_rows(X):
        """Return a copy of ``X`` whose rows are scaled to unit L2 norm.

        All-zero rows are first replaced by a constant row with every entry
        equal to ``1 / sqrt(dim)`` so that the division is well defined.
        """
        unit = np.array(X, copy=True)
        unit[np.linalg.norm(unit, axis=1) == 0] = 1.0 / np.sqrt(unit.shape[1])
        unit /= np.linalg.norm(unit, axis=1)[:, np.newaxis]
        return unit

    def fit(self, X):
        """Create the HNSW index and build it from the rows of ``X``.

        Args:
            X: 2-D array of vectors, one per row. Row ``i`` is stored under
                id ``i``. The array itself is never modified; for the
                angular metric a normalised copy is indexed instead.
        """
        dim = len(X[0])
        index_params = {
            "dtype": "float32",
            # Always L2: the angular metric is handled by normalising vectors.
            "metric_type": "l2",
            "dim": dim,
            "hnsw": {
                "max_degree": self._params["M"],
                "ef_construction": self._params["efc"],
                # Initial search width; each query passes its own ef_search.
                "ef_search": self._params["efc"],
                "max_elements": len(X),
                "use_static": False,
                "sq_num_bits": self._params["sq"],
                "alpha": self._params["a"],
                "redundant_rate": self._params["rs"],
            },
        }
        print(index_params)
        self._index = pyvsag.Index("hnsw", json.dumps(index_params))

        if self._metric == "ip":
            # Work on a copy so the caller's data keeps its original values
            # (in particular, all-zero rows are not overwritten).
            X = self._unit_rows(X)

        self._index.build(
            vectors=X,
            ids=range(len(X)),
            num_elements=len(X),
            dim=dim,
        )

    def set_query_arguments(self, ef):
        """Set the ``ef_search`` value used by later queries and update the name."""
        self._ef = ef
        self.name = "efs_%s_%s" % (self._ef, self._params)

    def query(self, v, n):
        """Return the ids of the ``n`` nearest indexed vectors to ``v``.

        For the angular metric the query is scaled to unit length first; a
        zero-norm query is passed through unscaled.
        """
        search_params = {"hnsw": {"ef_search": self._ef}}

        length = 1
        if self._metric == "ip":
            length = np.linalg.norm(v)
            if length == 0:
                # Avoid dividing by zero for an all-zero query.
                length = 1

        # The search also returns distances, but only the ids are returned.
        ids, dists = self._index.knn_search(
            vector=v / length,
            k=n,
            parameters=json.dumps(search_params),
        )
        return ids

12 changes: 7 additions & 5 deletions ann_benchmarks/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def positive_int(input_str: str) -> int:
return i


def run_worker(cpu: int, args: argparse.Namespace, queue: multiprocessing.Queue) -> None:
def run_worker(cpu: int, mem_limit: int, args: argparse.Namespace, queue: multiprocessing.Queue) -> None:
"""
Executes the algorithm based on the provided parameters.

Expand All @@ -58,6 +58,7 @@ def run_worker(cpu: int, args: argparse.Namespace, queue: multiprocessing.Queue)

Args:
cpu (int): The CPU number to be used in the execution.
mem_limit (int): The memory to be used in the execution.
args (argparse.Namespace): User provided arguments for running workers.
queue (multiprocessing.Queue): The multiprocessing queue that contains the algorithm definitions.

Expand All @@ -69,8 +70,6 @@ def run_worker(cpu: int, args: argparse.Namespace, queue: multiprocessing.Queue)
if args.local:
run(definition, args.dataset, args.count, args.runs, args.batch)
else:
memory_margin = 500e6 # reserve some extra memory for misc stuff
mem_limit = int((psutil.virtual_memory().available - memory_margin) / args.parallelism)
cpu_limit = str(cpu) if not args.batch else f"0-{multiprocessing.cpu_count() - 1}"

run_docker(definition, args.dataset, args.count, args.runs, args.timeout, args.batch, cpu_limit, mem_limit)
Expand Down Expand Up @@ -252,8 +251,11 @@ def create_workers_and_execute(definitions: List[Definition], args: argparse.Nam
for definition in definitions:
task_queue.put(definition)

memory_margin = 500e6 # reserve some extra memory for misc stuff
mem_limit = int((psutil.virtual_memory().available - memory_margin) / args.parallelism)

try:
workers = [multiprocessing.Process(target=run_worker, args=(i + 1, args, task_queue)) for i in range(args.parallelism)]
workers = [multiprocessing.Process(target=run_worker, args=(i + 1, mem_limit, args, task_queue)) for i in range(args.parallelism)]
[worker.start() for worker in workers]
[worker.join() for worker in workers]
finally:
Expand Down Expand Up @@ -343,4 +345,4 @@ def main():
else:
logger.info(f"Order: {definitions}")

create_workers_and_execute(definitions, args)
create_workers_and_execute(definitions, args)
Loading