Skip to content

Commit

Permalink
Optimize key based indexing and then scoring
Browse files Browse the repository at this point in the history
  • Loading branch information
rajendrant committed Jan 6, 2019
1 parent d91ce24 commit c9f9801
Show file tree
Hide file tree
Showing 7 changed files with 264 additions and 56 deletions.
14 changes: 14 additions & 0 deletions benchmark/benchmark-large.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,20 @@ for query in three_letter_tests
console.timeEnd('ThreeLetter#fuzzaldrin-plus-fast')
console.log("======")

# Time plain filtering of raw string candidates with two-letter queries.
console.time('TwoLetter#fuzzaldrin-plus-fast-filter')
for q in two_letter_tests
  FuzzaldrinPlusFast.filter lines, q, maxResults: 10
console.timeEnd('TwoLetter#fuzzaldrin-plus-fast-filter')
console.log("======")

# Same workload, but candidates are objects filtered through a `key` field.
dict = ({key: e, val: e} for e in lines)
console.time('TwoLetter#Keybased#Filter')
for q in two_letter_tests
  FuzzaldrinPlusFast.filter dict, q, maxResults: 10, key: 'key'
console.timeEnd('TwoLetter#Keybased#Filter')
console.log("======")

# An example run below
# npm run benchmarklarge
Expand Down
23 changes: 10 additions & 13 deletions fuzzaldrin.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,28 @@ class FuzzaldrinPlusFast
@obj = new binding.Fuzzaldrin()

# Load the candidate list into the native object.
# When options.key is given, candidates are objects: index the string at
# that key, and remember string -> original object so results can be
# mapped back later. NOTE(review): indentation was lost in this capture;
# body structure below reflects the visible statement order — confirm
# against the original file.
setCandidates: (candidates, options = {}) ->
@item_to_val = null
if options.key?
@item_to_val = {}
newcandidates = []
for c in candidates
newcandidates.push c[options.key]
@item_to_val[c[options.key]] = c
candidates = newcandidates
@obj.setCandidates(candidates)

filter: (query, options = {}) ->
options = parseOptions(options)
filtered = @obj.filter query, options.maxResults,
@obj.filter query, options.maxResults,
options.usePathScoring, options.useExtensionBonus
return if @item_to_val? then filtered.map((item) => @item_to_val[item]) else filtered

module.exports =

New: ->
new FuzzaldrinPlusFast()

filter: (candidates, query, options = {}) ->
obj = new FuzzaldrinPlusFast()
obj.setCandidates(candidates, options)
obj.filter(query, options)
if options.key?
options = parseOptions(options)
filtered = binding.filterWithCandidates query, options.maxResults,
options.usePathScoring, options.useExtensionBonus, candidates, options.key
return filtered.map((item) => candidates[item])
else
obj = new FuzzaldrinPlusFast()
obj.setCandidates(candidates, options)
obj.filter(query, options)

score: (candidate, query, options = {}) ->
options = parseOptions(options)
Expand Down
71 changes: 71 additions & 0 deletions src/ConcurrentQueue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
//
// Copyright (c) 2013 Juan Palacios [email protected]
// Subject to the BSD 2-Clause License
// - see < http://opensource.org/licenses/BSD-2-Clause>
//
// Source:
// https://github.com/juanchopanza/cppblog/blob/master/Concurrency/Queue/Queue.h

#ifndef CONCURRENT_QUEUE_H__
#define CONCURRENT_QUEUE_H__

#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>

// A minimal thread-safe FIFO queue: producers push(), consumers pop()
// (blocking until an item is available). Copying is disabled; moving is
// supported so instances can live in a std::vector.
template <typename T>
class ConcurrentQueue
{
public:

  // Returns true when the queue currently holds no items.
  // The answer may be stale as soon as the lock is released — use only
  // as a hint, never as a synchronization primitive.
  bool empty() {
    std::lock_guard<std::mutex> lock(mutex_);
    return queue_.empty();
  }

  // Blocks until an item is available, then removes and returns it.
  // The predicate form of wait() handles spurious wakeups.
  T pop()
  {
    std::unique_lock<std::mutex> lock(mutex_);
    cond_.wait(lock, [this] { return !queue_.empty(); });
    T val = std::move(queue_.front());
    queue_.pop();
    return val;
  }

  // Blocks until an item is available, then moves it into `item`.
  void pop(T& item)
  {
    std::unique_lock<std::mutex> lock(mutex_);
    cond_.wait(lock, [this] { return !queue_.empty(); });
    item = std::move(queue_.front());
    queue_.pop();
  }

  // Enqueues a copy of `item` and wakes one waiting consumer.
  void push(const T& item)
  {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push(item);
    }
    // Notify after releasing the lock so the woken consumer does not
    // immediately block on the mutex we still hold.
    cond_.notify_one();
  }

  ConcurrentQueue() = default;
  ConcurrentQueue(const ConcurrentQueue&) = delete;            // disable copying
  ConcurrentQueue& operator=(const ConcurrentQueue&) = delete; // disable assignment

  // Move constructor: lock the source so a concurrent push() cannot race
  // with the transfer of the underlying queue.
  ConcurrentQueue(ConcurrentQueue&& other) {
    std::lock_guard<std::mutex> lock(other.mutex_);
    queue_ = std::move(other.queue_);
  }

private:
  std::queue<T> queue_;
  std::mutex mutex_;
  std::condition_variable cond_;
};

#endif // CONCURRENT_QUEUE_H__
13 changes: 12 additions & 1 deletion src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include <iostream>
#include <utility>

#include <napi.h>

using namespace std;


Expand All @@ -30,8 +32,15 @@ typedef string Element;
typedef string Candidate;
#endif

// Identifies a scored candidate by the worker thread that scored it and
// the candidate's position within that thread's input stream.
// (Plain `struct` — the C-style `typedef struct` idiom is redundant in C++.)
struct CandidateIndex {
  size_t thread_id;  // which worker thread produced this entry
  size_t index;      // offset of the candidate within that thread's stream
  CandidateIndex(size_t thread_id, size_t index) : thread_id(thread_id), index(index) {}
};

typedef std::vector<Candidate> Candidates;
typedef float Score;
typedef std::vector<CandidateIndex> CandidateIndexes;

struct Options;

Expand Down Expand Up @@ -76,4 +85,6 @@ extern Score path_scorer_score(const Candidate &string, const Element &query, co
extern int countDir(const Candidate &path, int end, char pathSeparator);
extern Candidate getExtension(const Candidate &str);

extern Candidates filter(const Candidates &candidates, const Element &query, const Options &options);
extern CandidateIndexes filter(const vector<Candidates> &candidates, const Element &query, const Options &options);

Napi::Value filter_with_candidates(Napi::Env env, const Napi::Array &candidates, const std::string &key, const std::string &query, const Options &options);
153 changes: 120 additions & 33 deletions src/filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
#include <queue>
#include <functional>
#include <thread>
#include <limits>

#include "ConcurrentQueue.h"

namespace {

struct CandidateScore {
Score score;
Candidate candidate;
CandidateScore(Score score, Candidate candidate) : score(score), candidate(candidate) {}
CandidateIndex index;
CandidateScore(Score score, size_t thread_id, size_t index) : score(score), index(thread_id, index) {}

bool operator<(const CandidateScore& other) const {
return score > other.score;
Expand All @@ -17,29 +20,54 @@ struct CandidateScore {

typedef std::priority_queue<CandidateScore> CandidateScorePriorityQueue;

void thread_worker_filter(const Candidates &candidates,
size_t start, size_t end,
const Element &query, const Options &options,
size_t max_results,
CandidateScorePriorityQueue &results) {
if (start >= end || end >candidates.size())
return;
for (size_t i = start; i < end; i++) {
// Per-worker state: a thread-safe queue feeding batches of candidates to
// the worker, and the worker's private top-k heap of scores. An empty
// batch pushed to `input` is the shutdown sentinel (see thread_worker_filter).
struct ThreadState {
ConcurrentQueue<Candidates> input;   // batches of candidates for this worker
CandidateScorePriorityQueue results; // best scores seen so far (worst on top via inverted operator<)
ThreadState() = default;
};

// Scores every candidate in `candidates` against `query`, keeping at most
// `max_results` best entries in `results`. Entries are recorded as
// (thread_id, start_index + i) so the caller can map them back to the
// candidate's position within this thread's overall input stream.
void filter_internal(const Candidates &candidates,
size_t thread_id,
size_t start_index,
const Element &query, const Options &options,
size_t max_results,
CandidateScorePriorityQueue &results) {
// The scorer choice is loop-invariant; select it once instead of
// re-evaluating the ternary for every candidate.
const auto scoreProvider = options.usePathScoring ? path_scorer_score : scorer_score;
for (size_t i = 0; i < candidates.size(); i++) {
const auto &candidate = candidates[i];
if (candidate.empty()) continue;  // empty strings never match
const auto score = scoreProvider(candidate, query, options);
if (score > 0) {
results.emplace(score, thread_id, start_index + i);
// The heap's top is the worst retained score (operator< is inverted),
// so popping keeps only the best max_results entries.
if (results.size() > max_results)
results.pop();
}
}
}

Candidates sort_priority_queue(CandidateScorePriorityQueue &candidates) {
// Worker-thread entry point. First scores the optional `initial_candidates`
// batch (handed over at thread creation), then consumes further batches
// from thread_state.input until an empty batch arrives, which serves as
// the shutdown sentinel. `start_index` accumulates the number of
// candidates already processed so each batch's results carry a global
// offset within this thread's stream.
void thread_worker_filter(ThreadState &thread_state, size_t thread_id,
const Candidates *initial_candidates,
const Element &query, const Options &options,
size_t max_results) {
size_t start_index = 0;
if (initial_candidates) {
filter_internal(*initial_candidates, thread_id, 0, query, options, max_results,
thread_state.results);
start_index += initial_candidates->size();
}
while (true) {
Candidates candidates;
// Blocks until the producer pushes the next batch.
thread_state.input.pop(candidates);
// Empty batch == sentinel: no more work for this thread.
if(candidates.empty()) break;
filter_internal(candidates, thread_id, start_index, query, options, max_results,
thread_state.results);
start_index += candidates.size();
}
}

CandidateIndexes sort_priority_queue(CandidateScorePriorityQueue &candidates) {
vector<CandidateScore> sorted;
Candidates ret;
CandidateIndexes ret;
sorted.reserve(candidates.size());
ret.reserve(candidates.size());
while(!candidates.empty()) {
Expand All @@ -48,53 +76,112 @@ Candidates sort_priority_queue(CandidateScorePriorityQueue &candidates) {
}
std::sort(sorted.begin(), sorted.end());
for(const auto& item : sorted) {
ret.push_back(item.candidate);
ret.push_back(item.index);
}
return ret;
}

}
} // namespace

Candidates filter(const Candidates &candidates, const Element &query, const Options &options) {
CandidateIndexes filter(const vector<Candidates> &candidates, const Element &query, const Options &options) {
CandidateScorePriorityQueue top_k;
size_t max_results = options.max_results;
if (!max_results || max_results >= candidates.size())
max_results = candidates.size();
if (!max_results)
max_results = std::numeric_limits<size_t>::max();

if (candidates.size() < 10000) {
thread_worker_filter(candidates, 0, candidates.size(), query,
options, max_results, top_k);
if (candidates.size()==1) {
filter_internal(candidates[0], 0, 0, query, options, max_results, top_k);
return sort_priority_queue(top_k);
}

// Split the dataset and pass down to multiple threads.
const size_t max_threads = candidates.size();
vector<thread> threads;
vector<ThreadState> thread_state(max_threads);
for (size_t i = 0; i < max_threads; i++) {
threads.emplace_back(
thread_worker_filter, ref(thread_state[i]), i,
&candidates[i],
ref(query), ref(options), max_results);
}
// Push an empty vector for the threads to terminate.
for (size_t i = 0; i < max_threads; i++) {
Candidates t;
thread_state[i].input.push(t);
}
// Wait for threads to complete and merge the results.
for (size_t i = 0; i < max_threads; i++) {
threads[i].join();
auto &results = thread_state[i].results;
while(!results.empty()) {
top_k.emplace(results.top());
results.pop();
if (top_k.size() > max_results)
top_k.pop();
}
}
return sort_priority_queue(top_k);
}

Napi::Value filter_with_candidates(Napi::Env env, const Napi::Array &candidates,
const std::string &key, const std::string &query, const Options &options) {
CandidateScorePriorityQueue top_k;
size_t max_results = options.max_results;
if (!max_results)
max_results = std::numeric_limits<size_t>::max();

Napi::Array res = Napi::Array::New(env);
const size_t max_threads = 8;
vector<thread> threads;
vector<CandidateScorePriorityQueue> thread_results(max_threads);
vector<ThreadState> thread_state(max_threads);
vector<size_t> chunks;
vector<Candidates> initial_candidates(max_threads);
size_t cur_start = 0;
for (size_t i = 0; i < max_threads; i++) {
size_t chunk_size = candidates.size() / max_threads;
size_t chunk_size = candidates.Length() / max_threads;
// Distribute remainder among the chunks.
if (i < candidates.size() % max_threads) {
if (i < candidates.Length() % max_threads) {
chunk_size++;
}
for(size_t j=0; j<1000 && j<chunk_size; j++) {
initial_candidates[i].push_back(candidates[cur_start+j].ToObject().Get(key).ToString());
}
threads.emplace_back(
thread_worker_filter, ref(candidates),
cur_start, cur_start + chunk_size,
ref(query), ref(options),
max_results, ref(thread_results[i])
);
thread_worker_filter, ref(thread_state[i]), i,
&initial_candidates[i],
ref(query), ref(options), max_results);
cur_start += chunk_size;
chunks.push_back(cur_start);
}
for (size_t i = 0; i < max_threads; i++) {
Candidates c;
for(size_t j=(i==0)?1000:chunks[i-1]+1000; j<chunks[i]; j++) {
c.push_back(candidates[j].ToObject().Get(key).ToString());
}
thread_state[i].input.push(c);
}
// Push an empty vector for the threads to terminate.
for (size_t i = 0; i < max_threads; i++) {
Candidates t;
thread_state[i].input.push(t);
}
// Wait for threads to complete and merge the results.
for (size_t i = 0; i < max_threads; i++) {
threads[i].join();
while(!thread_results[i].empty()) {
top_k.emplace(thread_results[i].top());
thread_results[i].pop();
auto &results = thread_state[i].results;
while(!results.empty()) {
top_k.emplace(results.top());
results.pop();
if (top_k.size() > max_results)
top_k.pop();
}
}
return sort_priority_queue(top_k);
auto indexes = sort_priority_queue(top_k);
for(size_t i=0; i<indexes.size(); i++) {
size_t ind = indexes[i].index;
if (indexes[i].thread_id > 0)
ind += chunks[indexes[i].thread_id-1];
res[i] = Napi::Number::New(env, ind);
}
return res;
}
Loading

0 comments on commit c9f9801

Please sign in to comment.