From c340bc87ecad654b13553683026154efa0ce0436 Mon Sep 17 00:00:00 2001 From: liuzhenghua-jk Date: Fri, 5 Jul 2024 09:00:24 +0000 Subject: [PATCH] feature: max_active_requests flushable --- .../app/features/rate_limiting/rate_limit.py | 30 ++++++++++++++----- api/services/app_generate_service.py | 2 +- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/api/core/app/features/rate_limiting/rate_limit.py b/api/core/app/features/rate_limiting/rate_limit.py index 2e5e15f6f7ea42..5ca59a9479b9b3 100644 --- a/api/core/app/features/rate_limiting/rate_limit.py +++ b/api/core/app/features/rate_limiting/rate_limit.py @@ -11,7 +11,8 @@ class RateLimit: - _ACTIVE_REQUESTS = "dify:rate_limit:{}:active_requests" + _MAX_ACTIVE_REQUESTS_KEY = "dify:rate_limit:{}:max_active_requests" + _ACTIVE_REQUESTS_KEY = "dify:rate_limit:{}:active_requests" _UNLIMITED_REQUEST_ID = "unlimited_request_id" _REQUEST_MAX_ALIVE_TIME = 10 * 60 # 10 minutes _ACTIVE_REQUESTS_COUNT_FLUSH_INTERVAL = 5 * 60 # recalculate request_count from request_detail every 5 minutes @@ -29,28 +30,41 @@ def __init__(self, client_id: str, max_active_requests: int): self.initialized = True self.client_id = client_id self.max_active_requests = max_active_requests - self.active_requests_key = self._ACTIVE_REQUESTS.format(client_id) + self.active_requests_key = self._ACTIVE_REQUESTS_KEY.format(client_id) + self.max_active_requests_key = self._MAX_ACTIVE_REQUESTS_KEY.format(client_id) self.last_recalculate_time = float('-inf') - self.flush_active_requests() + self.flush_cache(use_local_value=True) - def flush_active_requests(self): + def flush_cache(self, use_local_value=False): + self.last_recalculate_time = time.time() + # flush max active requests + if use_local_value or not redis_client.exists(self.max_active_requests_key): + with redis_client.pipeline() as pipe: + pipe.set(self.max_active_requests_key, self.max_active_requests) + pipe.expire(self.max_active_requests_key, timedelta(days=1)) + pipe.execute() + else: + with redis_client.pipeline() as pipe: + self.max_active_requests = int(redis_client.get(self.max_active_requests_key).decode('utf-8')) + redis_client.expire(self.max_active_requests_key, timedelta(days=1)) + + # flush max active requests (in-transit request list) if not redis_client.exists(self.active_requests_key): return - redis_client.expire(self.active_requests_key, timedelta(days=1)) request_details = redis_client.hgetall(self.active_requests_key) + redis_client.expire(self.active_requests_key, timedelta(days=1)) timeout_requests = [k for k, v in request_details.items() if time.time() - float(v.decode('utf-8')) > RateLimit._REQUEST_MAX_ALIVE_TIME] if timeout_requests: redis_client.hdel(self.active_requests_key, *timeout_requests) - self.last_recalculate_time = time.time() def enter(self, request_id: Optional[str] = None) -> str: + if time.time() - self.last_recalculate_time > RateLimit._ACTIVE_REQUESTS_COUNT_FLUSH_INTERVAL: + self.flush_cache() if self.max_active_requests <= 0: return RateLimit._UNLIMITED_REQUEST_ID if not request_id: request_id = RateLimit.gen_request_key() - if time.time() - self.last_recalculate_time > RateLimit._ACTIVE_REQUESTS_COUNT_FLUSH_INTERVAL: - self.flush_active_requests() redis_client.hset(self.active_requests_key, request_id, str(time.time())) active_requests_count = redis_client.hlen(self.active_requests_key) diff --git a/api/services/app_generate_service.py b/api/services/app_generate_service.py index 4516b49dfd2a2a..09a50dd702739a 100644 --- a/api/services/app_generate_service.py +++ b/api/services/app_generate_service.py @@ -34,7 +34,7 @@ def generate(cls, app_model: App, rate_limit = RateLimit(app_model.id, max_active_request) request_id = RateLimit.gen_request_key() try: - rate_limit.enter(request_id) + request_id = rate_limit.enter(request_id) if app_model.mode == AppMode.COMPLETION.value: return CompletionAppGenerator().generate( app_model=app_model,