Skip to content

Commit

Permalink
feature: max_active_requests flushable
Browse files Browse the repository at this point in the history
  • Loading branch information
liuzhenghua-jk committed Jul 5, 2024
1 parent acd0269 commit c340bc8
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 9 deletions.
30 changes: 22 additions & 8 deletions api/core/app/features/rate_limiting/rate_limit.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@


class RateLimit:
_ACTIVE_REQUESTS = "dify:rate_limit:{}:active_requests"
_MAX_ACTIVE_REQUESTS_KEY = "dify:rate_limit:{}:max_active_requests"
_ACTIVE_REQUESTS_KEY = "dify:rate_limit:{}:active_requests"
_UNLIMITED_REQUEST_ID = "unlimited_request_id"
_REQUEST_MAX_ALIVE_TIME = 10 * 60 # 10 minutes
_ACTIVE_REQUESTS_COUNT_FLUSH_INTERVAL = 5 * 60 # recalculate request_count from request_detail every 5 minutes
Expand All @@ -29,28 +30,41 @@ def __init__(self, client_id: str, max_active_requests: int):
self.initialized = True
self.client_id = client_id
self.max_active_requests = max_active_requests
self.active_requests_key = self._ACTIVE_REQUESTS.format(client_id)
self.active_requests_key = self._ACTIVE_REQUESTS_KEY.format(client_id)
self.max_active_requests_key = self._MAX_ACTIVE_REQUESTS_KEY.format(client_id)
self.last_recalculate_time = float('-inf')
self.flush_active_requests()
self.flush_cache(use_local_value=True)

def flush_active_requests(self):
def flush_cache(self, use_local_value=False):
self.last_recalculate_time = time.time()
# flush max active requests
if use_local_value or not redis_client.exists(self.max_active_requests_key):
with redis_client.pipeline() as pipe:
pipe.set(self.max_active_requests_key, self.max_active_requests)
pipe.expire(self.max_active_requests_key, timedelta(days=1))
pipe.execute()
else:
with redis_client.pipeline() as pipe:
self.max_active_requests = int(redis_client.get(self.max_active_requests_key).decode('utf-8'))
redis_client.expire(self.max_active_requests_key, timedelta(days=1))

# flush max active requests (in-transit request list)
if not redis_client.exists(self.active_requests_key):
return
redis_client.expire(self.active_requests_key, timedelta(days=1))
request_details = redis_client.hgetall(self.active_requests_key)
redis_client.expire(self.active_requests_key, timedelta(days=1))
timeout_requests = [k for k, v in request_details.items() if
time.time() - float(v.decode('utf-8')) > RateLimit._REQUEST_MAX_ALIVE_TIME]
if timeout_requests:
redis_client.hdel(self.active_requests_key, *timeout_requests)
self.last_recalculate_time = time.time()

def enter(self, request_id: Optional[str] = None) -> str:
if time.time() - self.last_recalculate_time > RateLimit._ACTIVE_REQUESTS_COUNT_FLUSH_INTERVAL:
self.flush_cache()
if self.max_active_requests <= 0:
return RateLimit._UNLIMITED_REQUEST_ID
if not request_id:
request_id = RateLimit.gen_request_key()
if time.time() - self.last_recalculate_time > RateLimit._ACTIVE_REQUESTS_COUNT_FLUSH_INTERVAL:
self.flush_active_requests()

redis_client.hset(self.active_requests_key, request_id, str(time.time()))
active_requests_count = redis_client.hlen(self.active_requests_key)
Expand Down
2 changes: 1 addition & 1 deletion api/services/app_generate_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def generate(cls, app_model: App,
rate_limit = RateLimit(app_model.id, max_active_request)
request_id = RateLimit.gen_request_key()
try:
rate_limit.enter(request_id)
request_id = rate_limit.enter(request_id)
if app_model.mode == AppMode.COMPLETION.value:
return CompletionAppGenerator().generate(
app_model=app_model,
Expand Down

0 comments on commit c340bc8

Please sign in to comment.