From 66bdcc35aee749f7ea47b644d87d2e0067a6e073 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=88=98=E6=AC=A2=28liuhuan101=29?=
Date: Tue, 26 Nov 2024 19:10:09 +0800
Subject: [PATCH] feat:add apollo configuration to load env file issue:#11064

---
 api/README.md                                 |   7 +
 api/configs/__init__.py                       |   2 +
 api/extensions/configuration/__init__.py      |   0
 .../configuration/apollo/__init__.py          |   0
 .../configuration/apollo/apollo_client.py     | 321 ++++++++++++++++++
 .../configuration/apollo/python_3x.py         |  46 +++
 api/extensions/configuration/apollo/util.py   |  50 ++++
 .../configuration/apollo_configuration.py     |  38 ++
 .../configuration/base_configuration.py       |  15 +
 .../configuration/configuration_type.py       |   5 +
 api/extensions/ext_configuration.py           |  33 ++
 11 files changed, 517 insertions(+)
 create mode 100644 api/extensions/configuration/__init__.py
 create mode 100644 api/extensions/configuration/apollo/__init__.py
 create mode 100644 api/extensions/configuration/apollo/apollo_client.py
 create mode 100755 api/extensions/configuration/apollo/python_3x.py
 create mode 100755 api/extensions/configuration/apollo/util.py
 create mode 100644 api/extensions/configuration/apollo_configuration.py
 create mode 100644 api/extensions/configuration/base_configuration.py
 create mode 100644 api/extensions/configuration/configuration_type.py
 create mode 100644 api/extensions/ext_configuration.py

diff --git a/api/README.md b/api/README.md
index 461dac4759c6c4..6323f234da7933 100644
--- a/api/README.md
+++ b/api/README.md
@@ -22,6 +22,13 @@
    ```cli
    cp .env.example .env
    ```
+   ```
+   Please note: Dify supports loading the env file configuration from a configuration center (Apollo).
+   If you use Apollo, first create the Dify configuration in Apollo,
+   and then, before starting the project, set the following environment variables:
+   CONFIGURATION_TYPE=apollo, APOLLO_APP_ID=xxx, APOLLO_CLUSTER=xxx, APOLLO_CONFIG_URL=xxx, APOLLO_NAMESPACE=xxx
+   The `.env` file in the working directory is then overwritten with the Apollo configuration on every start.
+   ```
 3. Generate a `SECRET_KEY` in the `.env` file.

    bash for Linux
diff --git a/api/configs/__init__.py b/api/configs/__init__.py
index 3a172601c96382..f816cc42069447 100644
--- a/api/configs/__init__.py
+++ b/api/configs/__init__.py
@@ -1,3 +1,5 @@
+from extensions.ext_configuration import Configuration
 from .app_config import DifyConfig

+Configuration().init_configuration()
 dify_config = DifyConfig()
diff --git a/api/extensions/configuration/__init__.py b/api/extensions/configuration/__init__.py
new file mode 100644
index 00000000000000..e69de29bb2d1d6
diff --git a/api/extensions/configuration/apollo/__init__.py b/api/extensions/configuration/apollo/__init__.py
new file mode 100644
index 00000000000000..e69de29bb2d1d6
diff --git a/api/extensions/configuration/apollo/apollo_client.py b/api/extensions/configuration/apollo/apollo_client.py
new file mode 100644
index 00000000000000..6d5eb0b3643cde
--- /dev/null
+++ b/api/extensions/configuration/apollo/apollo_client.py
@@ -0,0 +1,321 @@
+import hashlib
+import json
+import logging
+import os
+import threading
+import time
+
+from .python_3x import http_request, makedirs_wrapper
+from .util import (
+    CONFIGURATIONS,
+    NAMESPACE_NAME,
+    NOTIFICATION_ID,
+    get_value_from_dict,
+    init_ip,
+    no_key_cache_key,
+    signature,
+    url_encode_wrapper,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class ApolloClient:
+    """Minimal Apollo configuration-center client with a memory cache and a file cache."""
+
+    def __init__(self, config_url, app_id, cluster='default', secret='', start_hot_update=True,
+                 change_listener=None, _notification_map=None):
+        """Create the client, the cache directory and the background threads.
+
+        :param config_url: base URL of the Apollo config service.
+        :param app_id: Apollo application id.
+        :param cluster: Apollo cluster name.
+        :param secret: key used to sign requests; '' disables signing.
+        :param start_hot_update: start the long-polling listener thread when True.
+        :param change_listener: optional callable(action, namespace, key, value),
+            where action is "add", "delete" or "update".
+        :param _notification_map: namespaces refreshed by the heartbeat thread;
+            defaults to {'application': -1}.
+        """
+        # Core routing parameters
+        self.config_url = config_url
+        self.cluster = cluster
+        self.app_id = app_id
+
+        # Non-core parameters
+        self.ip = init_ip()
+        self.secret = secret
+
+        # Private control variables
+        self._cycle_time = 5
+        self._stopping = False
+        self._cache = {}
+        self._no_key = {}
+        self._hash = {}
+        self._pull_timeout = 75
+        self._cache_file_path = os.path.expanduser('~') + '/data/apollo/cache/'
+        self._long_poll_thread = None
+        self._change_listener = change_listener  # "add" "delete" "update"
+        if _notification_map is None:
+            _notification_map = {'application': -1}
+        self._notification_map = _notification_map
+        self.last_release_key = None
+        # Private startup method
+        self._path_checker()
+        if start_hot_update:
+            self._start_hot_update()
+
+        # start the heartbeat thread
+        heartbeat = threading.Thread(target=self._heartBeat)
+        heartbeat.daemon = True
+        heartbeat.start()
+
+    def get_json_from_net(self, namespace='application'):
+        """Fetch a namespace from the config service.
+
+        :return: {CONFIGURATIONS: {...}} on HTTP 200; None on any other status or error.
+        """
+        url = '{}/configs/{}/{}/{}?releaseKey={}&ip={}'.format(self.config_url, self.app_id, self.cluster, namespace,
+                                                                 "", self.ip)
+        try:
+            code, body = http_request(url, timeout=3, headers=self._signHeaders(url))
+            if code == 200:
+                data = json.loads(body)
+                return {CONFIGURATIONS: data["configurations"]}
+            return None
+        except Exception as e:
+            logger.error(str(e))
+            return None
+
+    def get_value(self, key, default_val=None, namespace='application'):
+        """Return the value of key, looking in memory, then the network, then the cache file.
+
+        :return: the value found, or default_val when the key is unknown or a lookup fails.
+        """
+        try:
+            # read memory configuration
+            namespace_cache = self._cache.get(namespace)
+            val = get_value_from_dict(namespace_cache, key)
+            if val is not None:
+                return val
+
+            no_key = no_key_cache_key(namespace, key)
+            if no_key in self._no_key:
+                return default_val
+
+            # read the network configuration
+            namespace_data = self.get_json_from_net(namespace)
+            val = get_value_from_dict(namespace_data, key)
+            if val is not None:
+                self._update_cache_and_file(namespace_data, namespace)
+                return val
+
+            # read the file configuration
+            namespace_cache = self._get_local_cache(namespace)
+            val = get_value_from_dict(namespace_cache, key)
+            if val is not None:
+                self._update_cache_and_file(namespace_cache, namespace)
+                return val
+
+            # Nothing found anywhere: remember the miss and return the default value
+            self._set_local_cache_none(namespace, key)
+            return default_val
+        except Exception as e:
+            logger.error("get_value has error, [key is %s], [namespace is %s], [error is %s], ",
+                         key, namespace, e)
+            return default_val
+
+    # Remember that a key is missing from a namespace. The default value is deliberately not
+    # stored, because callers may pass a different default_val on every call.
+    def _set_local_cache_none(self, namespace, key):
+        no_key = no_key_cache_key(namespace, key)
+        self._no_key[no_key] = key
+
+    def _start_hot_update(self):
+        """Start the long-polling listener as a daemon thread."""
+        self._long_poll_thread = threading.Thread(target=self._listener)
+        # Daemon thread: it does not keep the process alive once the main thread exits.
+        self._long_poll_thread.daemon = True
+        self._long_poll_thread.start()
+
+    def stop(self):
+        """Ask the background loops to exit after their current iteration."""
+        self._stopping = True
+        logger.info("Stopping listener...")
+
+    def _call_listener(self, namespace, old_kv, new_kv):
+        """Report the per-key differences between old_kv and new_kv to the change listener."""
+        if self._change_listener is None:
+            return
+        if old_kv is None:
+            old_kv = {}
+        if new_kv is None:
+            new_kv = {}
+        try:
+            for key in old_kv:
+                new_value = new_kv.get(key)
+                old_value = old_kv.get(key)
+                if new_value is None:
+                    # The key is absent from the new data, so it was deleted.
+                    self._change_listener("delete", namespace, key, old_value)
+                    continue
+                if new_value != old_value:
+                    self._change_listener("update", namespace, key, new_value)
+                    continue
+            for key in new_kv:
+                new_value = new_kv.get(key)
+                old_value = old_kv.get(key)
+                if old_value is None:
+                    self._change_listener("add", namespace, key, new_value)
+        except Exception as e:
+            # A failing listener must not break polling. Exception rather than BaseException,
+            # so that KeyboardInterrupt and SystemExit still propagate.
+            logger.warning(str(e))
+
+    def _path_checker(self):
+        """Create the cache directory when it does not exist yet."""
+        if not os.path.isdir(self._cache_file_path):
+            makedirs_wrapper(self._cache_file_path)
+
+    def _update_cache_and_file(self, namespace_data, namespace='application'):
+        """Store namespace_data in memory and, when its content changed, in the cache file."""
+        # update the local cache
+        self._cache[namespace] = namespace_data
+        # update the file cache
+        new_string = json.dumps(namespace_data)
+        new_hash = hashlib.md5(new_string.encode('utf-8')).hexdigest()
+        if self._hash.get(namespace) != new_hash:
+            with open(os.path.join(self._cache_file_path, '%s_configuration_%s.txt' % (self.app_id, namespace)),
+                      'w') as f:
+                f.write(new_string)
+            self._hash[namespace] = new_hash
+
+    def _get_local_cache(self, namespace='application'):
+        """Load a namespace from its cache file; return {} when the file does not exist."""
+        cache_file_path = os.path.join(self._cache_file_path, '%s_configuration_%s.txt' % (self.app_id, namespace))
+        if os.path.isfile(cache_file_path):
+            with open(cache_file_path) as f:
+                result = json.loads(f.readline())
+            return result
+        return {}
+
+    def _long_poll(self):
+        """Send one notification request and refresh every namespace reported as changed."""
+        notifications = []
+        # Iterate over a snapshot, because other threads add entries to the cache.
+        for key, namespace_data in list(self._cache.items()):
+            notification_id = -1
+            if NOTIFICATION_ID in namespace_data:
+                notification_id = namespace_data[NOTIFICATION_ID]
+            notifications.append({
+                NAMESPACE_NAME: key,
+                NOTIFICATION_ID: notification_id
+            })
+        try:
+            # nothing is cached yet, so there is nothing to watch
+            if not notifications:
+                return
+            url = '{}/notifications/v2'.format(self.config_url)
+            params = {
+                'appId': self.app_id,
+                'cluster': self.cluster,
+                'notifications': json.dumps(notifications, ensure_ascii=False)
+            }
+            param_str = url_encode_wrapper(params)
+            url = url + '?' + param_str
+            code, body = http_request(url, self._pull_timeout, headers=self._signHeaders(url))
+            if code == 304:
+                logger.debug('No change, loop...')
+                return
+            if code == 200:
+                data = json.loads(body)
+                for entry in data:
+                    namespace = entry[NAMESPACE_NAME]
+                    n_id = entry[NOTIFICATION_ID]
+                    logger.info("%s has changes: notificationId=%d", namespace, n_id)
+                    self._get_net_and_set_local(namespace, n_id, call_change=True)
+                return
+            logger.warning('Sleep...')
+        except Exception as e:
+            logger.warning(str(e))
+
+    def _get_net_and_set_local(self, namespace, n_id, call_change=False):
+        """Re-fetch a namespace, cache it with its notification id and notify the listener."""
+        namespace_data = self.get_json_from_net(namespace)
+        if namespace_data is None:
+            # get_json_from_net() returns None on failure: keep the cached data as it is.
+            logger.warning("failed to refresh namespace %s, keeping the cached configuration", namespace)
+            return
+        namespace_data[NOTIFICATION_ID] = n_id
+        # The namespace may not be cached yet, in which case every key counts as added.
+        old_namespace = self._cache.get(namespace) or {}
+        self._update_cache_and_file(namespace_data, namespace)
+        if self._change_listener is not None and call_change:
+            old_kv = old_namespace.get(CONFIGURATIONS)
+            new_kv = namespace_data.get(CONFIGURATIONS)
+            self._call_listener(namespace, old_kv, new_kv)
+
+    def _listener(self):
+        """Long-poll in a loop until stop() is called."""
+        logger.info('start long_poll')
+        while not self._stopping:
+            self._long_poll()
+            time.sleep(self._cycle_time)
+        logger.info("stopped, long_poll")
+
+    def _signHeaders(self, url):
+        """Return the signature headers for url, or {} when no secret is configured."""
+        headers = {}
+        if not self.secret:
+            return headers
+        uri = url[len(self.config_url):]
+        time_unix_now = str(int(round(time.time() * 1000)))
+        headers['Authorization'] = 'Apollo ' + self.app_id + ':' + signature(time_unix_now, uri, self.secret)
+        headers['Timestamp'] = time_unix_now
+        return headers
+
+    def _heartBeat(self):
+        """Refresh every namespace of the notification map until stop() is called."""
+        while not self._stopping:
+            for namespace in self._notification_map:
+                self._do_heartBeat(namespace)
+            time.sleep(60 * 10)  # 10 minutes
+
+    def _do_heartBeat(self, namespace):
+        """Fetch namespace and update the caches when its release key changed."""
+        url = '{}/configs/{}/{}/{}?ip={}'.format(self.config_url, self.app_id, self.cluster, namespace,
+                                                 self.ip)
+        try:
+            code, body = http_request(url, timeout=3, headers=self._signHeaders(url))
+            if code == 200:
+                data = json.loads(body)
+                if self.last_release_key == data["releaseKey"]:
+                    return None
+                self.last_release_key = data["releaseKey"]
+                # Cache entries always have the shape {CONFIGURATIONS: {...}}, which is what
+                # get_value() and get_all_dicts() read.
+                self._update_cache_and_file({CONFIGURATIONS: data["configurations"]}, namespace)
+            else:
+                return None
+        except Exception as e:
+            logger.error(str(e))
+            return None
+
+    def get_all_dicts(self, namespace):
+        """Return every key/value pair of namespace as a flat dict.
+
+        The memory cache is used when present; otherwise the namespace is fetched from the
+        config service, falling back to the local cache file.
+
+        :return: dict of configuration items; {} when nothing could be loaded.
+        """
+        namespace_data = self._cache.get(namespace)
+        if namespace_data is None:
+            namespace_data = self.get_json_from_net(namespace)
+            if namespace_data is None:
+                # get_json_from_net() returns None on failure
+                namespace_data = self._get_local_cache(namespace)
+            if not namespace_data:
+                return {}
+            self._update_cache_and_file(namespace_data, namespace)
+        return namespace_data.get(CONFIGURATIONS) or {}
diff --git a/api/extensions/configuration/apollo/python_3x.py b/api/extensions/configuration/apollo/python_3x.py
new file mode 100755
index 00000000000000..9077a002b32249
--- /dev/null
+++ b/api/extensions/configuration/apollo/python_3x.py
@@ -0,0 +1,46 @@
+import logging
+import os
+import ssl
+import urllib.request
+from urllib import parse
+from urllib.error import HTTPError
+
+# NOTE(security): certificate and hostname verification are disabled for the requests sent
+# through this module's opener. The opener is deliberately not installed globally with
+# urllib.request.install_opener(), because that would disable verification for every
+# urllib request made by the process.
+ssl_context = ssl.create_default_context()
+ssl_context.set_ciphers('HIGH:!DH:!aNULL')
+ssl_context.check_hostname = False
+ssl_context.verify_mode = ssl.CERT_NONE
+
+# Opener used by http_request() only
+opener = urllib.request.build_opener(urllib.request.HTTPSHandler(context=ssl_context))
+
+
+def http_request(url, timeout, headers=None):
+    """Send a GET request and return (status_code, body_text).
+
+    HTTP 304 is returned as (304, None); any other HTTP error is logged and re-raised.
+    """
+    try:
+        request = urllib.request.Request(url, headers=headers or {})
+        with opener.open(request, timeout=timeout) as res:
+            body = res.read().decode("utf-8")
+            return res.code, body
+    except HTTPError as e:
+        if e.code == 304:
+            logging.getLogger(__name__).warning("http_request error,code is 304, maybe you should check secret")
+            return 304, None
+        logging.getLogger(__name__).warning("http_request error,code is %d, msg is %s", e.code, e.msg)
+        raise
+
+
+def url_encode(params):
+    """URL-encode a mapping of query parameters."""
+    return parse.urlencode(params)
+
+
+def makedirs_wrapper(path):
+    """Create path and its missing parents; an existing directory is not an error."""
+    os.makedirs(path, exist_ok=True)
diff --git a/api/extensions/configuration/apollo/util.py b/api/extensions/configuration/apollo/util.py
new file mode 100755
index 00000000000000..6b5d7c2e812d0e
--- /dev/null
+++ b/api/extensions/configuration/apollo/util.py
@@ -0,0 +1,50 @@
+import base64
+import hashlib
+import hmac
+import socket
+
+from .python_3x import url_encode
+
+# define constants
+CONFIGURATIONS = "configurations"
+NOTIFICATION_ID = "notificationId"
+NAMESPACE_NAME = "namespaceName"
+
+
+def signature(timestamp, uri, secret):
+    """Return the base64-encoded HMAC-SHA1 of timestamp, a newline and uri, keyed with secret."""
+    string_to_sign = '' + timestamp + '\n' + uri
+    hmac_code = hmac.new(secret.encode(), string_to_sign.encode(), hashlib.sha1).digest()
+    return base64.b64encode(hmac_code).decode()
+
+
+def url_encode_wrapper(params):
+    """URL-encode a mapping of query parameters."""
+    return url_encode(params)
+
+
+def no_key_cache_key(namespace, key):
+    """Build the key under which a missing (namespace, key) pair is remembered."""
+    return "{}{}{}".format(namespace, len(namespace), key)
+
+
+def get_value_from_dict(namespace_cache, key):
+    """Return namespace_cache[CONFIGURATIONS][key], or None when any level is missing."""
+    if namespace_cache:
+        kv_data = namespace_cache.get(CONFIGURATIONS)
+        if kv_data is None:
+            return None
+        if key in kv_data:
+            return kv_data[key]
+    return None
+
+
+def init_ip():
+    """Return the local address used for outbound traffic, or "" when it cannot be determined."""
+    try:
+        # connect() on a UDP socket only selects the local address; no packet is sent
+        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
+            s.connect(('8.8.8.8', 53))
+            return s.getsockname()[0]
+    except OSError:
+        return ""
diff --git a/api/extensions/configuration/apollo_configuration.py b/api/extensions/configuration/apollo_configuration.py
new file mode 100644
index 00000000000000..508ba9a87bce5e
--- /dev/null
+++ b/api/extensions/configuration/apollo_configuration.py
@@ -0,0 +1,38 @@
+import os
+
+from extensions.configuration.apollo.apollo_client import ApolloClient
+from extensions.configuration.base_configuration import BaseConfiguration
+
+
+class ApolloConfiguration(BaseConfiguration):
+    """Implementation for Apollo Configuration."""
+
+    def load_config_to_env_file(self):
+        """Fetch the configured Apollo namespace and write it to ./.env as KEY=VALUE lines.
+
+        Reads APOLLO_APP_ID, APOLLO_CLUSTER, APOLLO_CONFIG_URL and APOLLO_NAMESPACE from the
+        environment. An existing .env file is overwritten.
+
+        :raises RuntimeError: when no configuration could be loaded; the .env file is left
+            untouched in that case.
+        """
+        namespace = os.environ.get("APOLLO_NAMESPACE")
+        client = ApolloClient(app_id=os.environ.get("APOLLO_APP_ID"), cluster=os.environ.get("APOLLO_CLUSTER"),
+                              config_url=os.environ.get("APOLLO_CONFIG_URL"), start_hot_update=False,
+                              _notification_map={namespace: -1})
+        try:
+            apollo_config_dicts = client.get_all_dicts(namespace)
+        finally:
+            # One-shot load: the client's background heartbeat is not needed afterwards.
+            client.stop()
+        if not apollo_config_dicts:
+            # Never replace an existing .env file with an empty one.
+            raise RuntimeError(f"no configuration could be loaded from Apollo namespace {namespace!r}")
+
+        # Get the path to the .env file
+        env_path = os.path.join(os.getcwd(), '.env')
+
+        # Write the configuration items obtained from Apollo to the .env file
+        with open(env_path, 'w') as f:
+            for key, value in apollo_config_dicts.items():
+                f.write(f"{key}={value}\n")
diff --git a/api/extensions/configuration/base_configuration.py b/api/extensions/configuration/base_configuration.py
new file mode 100644
index 00000000000000..1bb3deb14f3166
--- /dev/null
+++ b/api/extensions/configuration/base_configuration.py
@@ -0,0 +1,15 @@
+"""Abstract interface for configuration center implementations."""
+
+from abc import ABC, abstractmethod
+
+
+class BaseConfiguration(ABC):
+    """Interface for configuration center."""
+
+    def __init__(self):  # noqa: B027
+        pass
+
+    @abstractmethod
+    def load_config_to_env_file(self):
+        """Load the remote configuration and write it to the local env file."""
+        raise NotImplementedError
diff --git a/api/extensions/configuration/configuration_type.py b/api/extensions/configuration/configuration_type.py
new file mode 100644
index 00000000000000..ff1b65cac71f8b
--- /dev/null
+++ b/api/extensions/configuration/configuration_type.py
@@ -0,0 +1,5 @@
+from enum import StrEnum
+
+
+class ConfigurationType(StrEnum):
+    APOLLO = "apollo"
diff --git a/api/extensions/ext_configuration.py b/api/extensions/ext_configuration.py
new file mode 100644
index 00000000000000..f5dc920e0f5af2
--- /dev/null
+++ b/api/extensions/ext_configuration.py
@@ -0,0 +1,33 @@
+import os
+
+from extensions.configuration.base_configuration import BaseConfiguration
+from extensions.configuration.configuration_type import ConfigurationType
+
+
+class Configuration:
+    """Loads the configuration from the configuration center selected by CONFIGURATION_TYPE."""
+
+    def __init__(self):
+        self.storage_runner = None
+
+    def init_configuration(self):
+        """Run the configured loader; do nothing when CONFIGURATION_TYPE is unset or empty."""
+        configuration_type = os.environ.get("CONFIGURATION_TYPE")
+        if configuration_type:
+            storage_factory = self.get_storage_factory(configuration_type)
+            self.storage_runner = storage_factory()
+            self.storage_runner.load_config_to_env_file()
+
+    @staticmethod
+    def get_storage_factory(storage_type: str) -> type[BaseConfiguration]:
+        """Return the configuration class for storage_type.
+
+        :raises ValueError: when storage_type is not a supported configuration type.
+        """
+        match storage_type:
+            case ConfigurationType.APOLLO:
+                from extensions.configuration.apollo_configuration import ApolloConfiguration
+
+                return ApolloConfiguration
+            case _:
+                raise ValueError(f"Unsupported configuration type {storage_type}")