Skip to content

Commit

Permalink
feat:add apollo configuration to load env file
Browse files Browse the repository at this point in the history
issue:#11064
  • Loading branch information
刘欢(liuhuan101) committed Nov 26, 2024
1 parent 79db920 commit 66bdcc3
Show file tree
Hide file tree
Showing 11 changed files with 427 additions and 0 deletions.
6 changes: 6 additions & 0 deletions api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@
```cli
cp .env.example .env
```
```
Please note:dify supports loading env file configuration from the configuration-center(apollo).
If you use apollo, first configure the dify configuration on apollo,
and then before starting the project, configure the following parameters in the environment variables:
CONFIGURATION_TYPE=apollo,APOLLO_APP_ID=xxx,APOLLO_CLUSTER=xxx,APOLLO_CONFIG_URL=xxx,APOLLO_NAMESPACE=xxx
```
3. Generate a `SECRET_KEY` in the `.env` file.

bash for Linux
Expand Down
2 changes: 2 additions & 0 deletions api/configs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from extensions.ext_configuration import Configuration
from .app_config import DifyConfig

Configuration().init_configuration()
dify_config = DifyConfig()
Empty file.
Empty file.
267 changes: 267 additions & 0 deletions api/extensions/configuration/apollo/apollo_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
import json
import threading
import time

from .util import *


class ApolloClient:

def __init__(self, config_url, app_id, cluster='default', secret='', start_hot_update=True,
change_listener=None, _notification_map=None):

# Core routing parameters
self.config_url = config_url
self.cluster = cluster
self.app_id = app_id

# Non-core parameters
self.ip = init_ip()
self.secret = secret

# Check the parameter variables

# Private control variables
self._cycle_time = 5
self._stopping = False
self._cache = {}
self._no_key = {}
self._hash = {}
self._pull_timeout = 75
self._cache_file_path = os.path.expanduser('~') + '/data/apollo/cache/'
self._long_poll_thread = None
self._change_listener = change_listener # "add" "delete" "update"
if _notification_map is None:
_notification_map = {'application': -1}
self._notification_map = _notification_map
self.last_release_key = None
# Private startup method
self._path_checker()
if start_hot_update:
self._start_hot_update()

# start the heartbeat thread
heartbeat = threading.Thread(target=self._heartBeat)
heartbeat.daemon = True
heartbeat.start()

def get_json_from_net(self, namespace='application'):
url = '{}/configs/{}/{}/{}?releaseKey={}&ip={}'.format(self.config_url, self.app_id, self.cluster, namespace,
"", self.ip)
try:
code, body = http_request(url, timeout=3, headers=self._signHeaders(url))
if code == 200:
data = json.loads(body)
data = data["configurations"]
return_data = {CONFIGURATIONS: data}
return return_data
else:
return None
except Exception as e:
logging.getLogger(__name__).error(str(e))
return None

def get_value(self, key, default_val=None, namespace='application'):
try:
# read memory configuration
namespace_cache = self._cache.get(namespace)
val = get_value_from_dict(namespace_cache, key)
if val is not None:
return val

no_key = no_key_cache_key(namespace, key)
if no_key in self._no_key:
return default_val

# read the network configuration
namespace_data = self.get_json_from_net(namespace)
val = get_value_from_dict(namespace_data, key)
if val is not None:
self._update_cache_and_file(namespace_data, namespace)
return val

# read the file configuration
namespace_cache = self._get_local_cache(namespace)
val = get_value_from_dict(namespace_cache, key)
if val is not None:
self._update_cache_and_file(namespace_cache, namespace)
return val

# If all of them are not obtained, the default value is returned and the local cache is set to None
self._set_local_cache_none(namespace, key)
return default_val
except Exception as e:
logging.getLogger(__name__).error("get_value has error, [key is %s], [namespace is %s], [error is %s], ",
key, namespace, e)
return default_val

# Set the key of a namespace to none, and do not set default val to ensure the real-time correctness of the function call.
# If the user does not have the same default val twice and the default val is used here, there may be a problem.
def _set_local_cache_none(self, namespace, key):
no_key = no_key_cache_key(namespace, key)
self._no_key[no_key] = key

def _start_hot_update(self):
self._long_poll_thread = threading.Thread(target=self._listener)
# When the asynchronous thread is started, the daemon thread will automatically exit when the main thread is launched.
self._long_poll_thread.daemon = True
self._long_poll_thread.start()

def stop(self):
self._stopping = True
logging.getLogger(__name__).info("Stopping listener...")

# Call the set callback function, and if it is abnormal, try it out
def _call_listener(self, namespace, old_kv, new_kv):
if self._change_listener is None:
return
if old_kv is None:
old_kv = {}
if new_kv is None:
new_kv = {}
try:
for key in old_kv:
new_value = new_kv.get(key)
old_value = old_kv.get(key)
if new_value is None:
# If newValue is empty, it means key, and the value is deleted.
self._change_listener("delete", namespace, key, old_value)
continue
if new_value != old_value:
self._change_listener("update", namespace, key, new_value)
continue
for key in new_kv:
new_value = new_kv.get(key)
old_value = old_kv.get(key)
if old_value is None:
self._change_listener("add", namespace, key, new_value)
except BaseException as e:
logging.getLogger(__name__).warning(str(e))

def _path_checker(self):
if not os.path.isdir(self._cache_file_path):
makedirs_wrapper(self._cache_file_path)

# update the local cache and file cache
def _update_cache_and_file(self, namespace_data, namespace='application'):
# update the local cache
self._cache[namespace] = namespace_data
# update the file cache
new_string = json.dumps(namespace_data)
new_hash = hashlib.md5(new_string.encode('utf-8')).hexdigest()
if self._hash.get(namespace) == new_hash:
pass
else:
with open(os.path.join(self._cache_file_path, '%s_configuration_%s.txt' % (self.app_id, namespace)),
'w') as f:
f.write(new_string)
self._hash[namespace] = new_hash

# get the configuration from the local file
def _get_local_cache(self, namespace='application'):
cache_file_path = os.path.join(self._cache_file_path, '%s_configuration_%s.txt' % (self.app_id, namespace))
if os.path.isfile(cache_file_path):
with open(cache_file_path) as f:
result = json.loads(f.readline())
return result
return {}

def _long_poll(self):
notifications = []
for key in self._cache:
namespace_data = self._cache[key]
notification_id = -1
if NOTIFICATION_ID in namespace_data:
notification_id = self._cache[key][NOTIFICATION_ID]
notifications.append({
NAMESPACE_NAME: key,
NOTIFICATION_ID: notification_id
})
try:
# if the length is 0 it is returned directly
if len(notifications) == 0:
return
url = '{}/notifications/v2'.format(self.config_url)
params = {
'appId': self.app_id,
'cluster': self.cluster,
'notifications': json.dumps(notifications, ensure_ascii=False)
}
param_str = url_encode_wrapper(params)
url = url + '?' + param_str
code, body = http_request(url, self._pull_timeout, headers=self._signHeaders(url))
http_code = code
if http_code == 304:
logging.getLogger(__name__).debug('No change, loop...')
return
if http_code == 200:
data = json.loads(body)
for entry in data:
namespace = entry[NAMESPACE_NAME]
n_id = entry[NOTIFICATION_ID]
logging.getLogger(__name__).info("%s has changes: notificationId=%d", namespace, n_id)
self._get_net_and_set_local(namespace, n_id, call_change=True)
return
else:
logging.getLogger(__name__).warning('Sleep...')
except Exception as e:
logging.getLogger(__name__).warning(str(e))

def _get_net_and_set_local(self, namespace, n_id, call_change=False):
namespace_data = self.get_json_from_net(namespace)
namespace_data[NOTIFICATION_ID] = n_id
old_namespace = self._cache.get(namespace)
self._update_cache_and_file(namespace_data, namespace)
if self._change_listener is not None and call_change:
old_kv = old_namespace.get(CONFIGURATIONS)
new_kv = namespace_data.get(CONFIGURATIONS)
self._call_listener(namespace, old_kv, new_kv)

def _listener(self):
logging.getLogger(__name__).info('start long_poll')
while not self._stopping:
self._long_poll()
time.sleep(self._cycle_time)
logging.getLogger(__name__).info("stopped, long_poll")

# add the need for endorsement to the header
def _signHeaders(self, url):
headers = {}
if self.secret == '':
return headers
uri = url[len(self.config_url):len(url)]
time_unix_now = str(int(round(time.time() * 1000)))
headers['Authorization'] = 'Apollo ' + self.app_id + ':' + signature(time_unix_now, uri, self.secret)
headers['Timestamp'] = time_unix_now
return headers

def _heartBeat(self):
while not self._stopping:
for namespace in self._notification_map:
self._do_heartBeat(namespace)
time.sleep(60 * 10) # 10分钟

def _do_heartBeat(self, namespace):
url = '{}/configs/{}/{}/{}?ip={}'.format(self.config_url, self.app_id, self.cluster, namespace,
self.ip)
try:
code, body = http_request(url, timeout=3, headers=self._signHeaders(url))
if code == 200:
data = json.loads(body)
if self.last_release_key == data["releaseKey"]:
return None
self.last_release_key = data["releaseKey"]
data = data["configurations"]
self._update_cache_and_file(data, namespace)
else:
return None
except Exception as e:
logging.getLogger(__name__).error(str(e))
return None

def get_all_dicts(self, namespace):
namespace_data = self._cache.get(namespace)
if namespace_data is None:
namespace_data = self.get_json_from_net(namespace).get(CONFIGURATIONS)
self._update_cache_and_file(namespace_data, namespace)
return namespace_data
39 changes: 39 additions & 0 deletions api/extensions/configuration/apollo/python_3x.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import logging
import os
import ssl
import urllib.request
from urllib import parse
from urllib.error import HTTPError

# Create an SSL context that allows for a lower level of security
ssl_context = ssl.create_default_context()
ssl_context.set_ciphers('HIGH:!DH:!aNULL')
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE

# Create an opener object and pass in a custom SSL context
opener = urllib.request.build_opener(urllib.request.HTTPSHandler(context=ssl_context))

urllib.request.install_opener(opener)


def http_request(url, timeout, headers={}):
try:
request = urllib.request.Request(url, headers=headers)
res = urllib.request.urlopen(request, timeout=timeout)
body = res.read().decode("utf-8")
return res.code, body
except HTTPError as e:
if e.code == 304:
logging.getLogger(__name__).warning("http_request error,code is 304, maybe you should check secret")
return 304, None
logging.getLogger(__name__).warning("http_request error,code is %d, msg is %s", e.code, e.msg)
raise e


def url_encode(params):
return parse.urlencode(params)


def makedirs_wrapper(path):
os.makedirs(path, exist_ok=True)
48 changes: 48 additions & 0 deletions api/extensions/configuration/apollo/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import hashlib
import socket

from .python_3x import *

# define constants
CONFIGURATIONS = "configurations"
NOTIFICATION_ID = "notificationId"
NAMESPACE_NAME = "namespaceName"


# add timestamps uris and keys
def signature(timestamp, uri, secret):
import base64
import hmac
string_to_sign = '' + timestamp + '\n' + uri
hmac_code = hmac.new(secret.encode(), string_to_sign.encode(), hashlib.sha1).digest()
return base64.b64encode(hmac_code).decode()


def url_encode_wrapper(params):
return url_encode(params)


def no_key_cache_key(namespace, key):
return "{}{}{}".format(namespace, len(namespace), key)


# Returns whether the obtained value is obtained, and None if it does not
def get_value_from_dict(namespace_cache, key):
if namespace_cache:
kv_data = namespace_cache.get(CONFIGURATIONS)
if kv_data is None:
return None
if key in kv_data:
return kv_data[key]
return None


def init_ip():
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(('8.8.8.8', 53))
ip = s.getsockname()[0]
return ip
finally:
s.close()
return ""
22 changes: 22 additions & 0 deletions api/extensions/configuration/apollo_configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import os

from extensions.configuration.apollo.apollo_client import ApolloClient
from extensions.configuration.base_configuration import BaseConfiguration


class ApolloConfiguration(BaseConfiguration):
"""Implementation for Apollo Configuration."""

def load_config_to_env_file(self):
client = ApolloClient(app_id=os.environ.get("APOLLO_APP_ID"), cluster=os.environ.get("APOLLO_CLUSTER"),
config_url=os.environ.get("APOLLO_CONFIG_URL"), start_hot_update=False,
_notification_map={os.environ.get("APOLLO_NAMESPACE"): -1})

# Get the path to the .env file
env_path = os.path.join(os.getcwd(), '.env')

apollo_config_dicts = client.get_all_dicts(os.environ.get("APOLLO_NAMESPACE"))
# Obtain the value of the configuration item from the Apollo configuration center and write it to the .env file
with open(env_path, 'w') as f:
for key, value in apollo_config_dicts.items():
f.write(f"{key}={value}\n")
Loading

0 comments on commit 66bdcc3

Please sign in to comment.