Skip to content

Commit

Permalink
mongo支持aggregate
Browse files Browse the repository at this point in the history
  • Loading branch information
xiyoo0812 committed Nov 9, 2023
1 parent df9eee8 commit af90ee6
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 0 deletions.
6 changes: 6 additions & 0 deletions script/agent/mongo_agent.lua
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ function MongoAgent:find_and_modify(db_query, hash_key, db_id)
return router_mgr:call_mongo_hash(key, "rpc_mongo_find_and_modify", db_id or MAIN_DBID, key, tunpack(db_query))
end

--db_query: {pipeline, cursor, [xxkey, xxvalue] ...}
function MongoAgent:aggregate(db_query, hash_key, db_id)
local key = hash_key or mrandom()
return router_mgr:call_mongo_hash(key, "rpc_mongo_aggregate", db_id or MAIN_DBID, key, tunpack(db_query))
end

--db_query: {coll_name, selector}
function MongoAgent:count(db_query, hash_key, db_id)
return router_mgr:call_mongo_hash(hash_key or mrandom(), "rpc_mongo_count", db_id or MAIN_DBID, tunpack(db_query))
Expand Down
6 changes: 6 additions & 0 deletions script/driver/mongo.lua
Original file line number Diff line number Diff line change
Expand Up @@ -427,4 +427,10 @@ function MongoDB:find_and_modify(co_name, update, selector, upsert, fields, new)
return self:runCommand("findAndModify", co_name, "query", selector, "update", update, "fields", fields, "upsert", upsert, "new", new)
end

-- https://docs.mongodb.com/manual/reference/command/aggregate/
-- pipeline: { { ["$match"]={pid = 123456} }, { ["$group"]={_id="date",count={["$sum"]=1}} } }
function MongoDB:aggregate(co_name, pipeline, cursor, ...)
return self:runCommand("aggregate", co_name, "pipeline", pipeline, "cursor", cursor, ...)
end

return MongoDB
11 changes: 11 additions & 0 deletions script/store/mongo_mgr.lua
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ function MongoMgr:__init()
event_mgr:add_listener(self, "rpc_mongo_update", "update")
event_mgr:add_listener(self, "rpc_mongo_execute", "execute")
event_mgr:add_listener(self, "rpc_mongo_find_one", "find_one")
event_mgr:add_listener(self, "rpc_mongo_aggregate", "aggregate")
event_mgr:add_listener(self, "rpc_mongo_drop_indexes", "drop_indexes")
event_mgr:add_listener(self, "rpc_mongo_create_indexes", "create_indexes")
event_mgr:add_listener(self, "rpc_mongo_find_and_modify", "find_and_modify")
Expand Down Expand Up @@ -105,6 +106,16 @@ function MongoMgr:find_and_modify(db_id, primary_id, coll_name, obj, selector, u
return MONGO_FAILED, "mongo db not exist"
end

function MongoMgr:aggregate(db_id, coll_name, pipeline, cursor, ...)
log_debug("[MongoMgr][aggregate]: {}, pipeline:{}", coll_name, pipeline)
local mongodb = self:get_db(db_id)
if mongodb then
local ok, res_oe = mongodb:aggregate(coll_name, pipeline, cursor, ...)
return ok and SUCCESS or MONGO_FAILED, res_oe
end
return MONGO_FAILED, "mongo db not exist"
end

function MongoMgr:count(db_id, coll_name, selector, limit, skip)
local mongodb = self:get_db(db_id)
if mongodb then
Expand Down

0 comments on commit af90ee6

Please sign in to comment.