Skip to content

Commit

Permalink
feat: 服务注册与发现
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaoxiaolulu committed Aug 30, 2024
1 parent 07a6e18 commit 71c0f4d
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 358 deletions.
304 changes: 18 additions & 286 deletions .idea/workspace.xml

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions unit-backend/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,3 +328,7 @@
}

ASGI_APPLICATION = "config.asgi.application"

CONSUL_HOST = "127.0.0.1"
CONSUL_PORT = 8500
CONSUL_DNS_PORT = 8600
49 changes: 49 additions & 0 deletions unit-backend/utils/grpconsul.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import random
import socket
from dns import resolver
from config import settings


class ServiceConsul(object):

def __init__(self):
self.consul_host = settings.CONSUL_HOST
self.consul_dns_port = settings.CONSUL_DNS_PORT

@staticmethod
def extract_ip_prefix(encoded_address: str):
match = encoded_address.split(".")[0]
return match

def decode_address(self, encoded_address):
hex_address = self.extract_ip_prefix(encoded_address)
binary_address = bytes.fromhex(hex_address)
return socket.inet_ntoa(binary_address)

def fetch_user_service_addresses(self, srv_record_name):
custom_resolver = resolver.Resolver()
custom_resolver.nameservers = [self.consul_host]
custom_resolver.port = self.consul_dns_port

try:
answer = custom_resolver.resolve(srv_record_name, 'SRV')
selected_rdata = random.choice(answer)
ip_address = self.decode_address(selected_rdata.target.to_text())
return ip_address, selected_rdata.port
except resolver.NoAnswer as e:
print(f"No answer found for the query: {srv_record_name}")
except resolver.NXDOMAIN as e:
print(f"Domain not found: {srv_record_name}")
except Exception as e:
print(f"Error occurred while resolving DNS: {str(e)}")


def main():
srv_record_name = 'unit_executor.service.consul'
cons = ServiceConsul()
address, port = cons.fetch_user_service_addresses(srv_record_name)
print(address, port)


if __name__ == "__main__":
main()
4 changes: 0 additions & 4 deletions unit-executor/register/consul.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
import uuid
from abc import ABC
import consul
import requests
import random


class ConsulRegister(object):
Expand Down
10 changes: 7 additions & 3 deletions unit-executor/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
atomic_bomb_engine==0.41.3
atomic_bomb_engine~=0.41.0
Faker==8.11.0
grpcio==1.64.1
jmespath==1.0.1
Expand All @@ -7,10 +7,14 @@ loguru==0.6.0
MySQL-python==1.2.5
numpy==1.24.2
protobuf==5.28.0
pydantic==2.8.2
pydantic~=2.7.2
python_dateutil==2.8.2
Requests==2.32.3
requests_toolbelt==1.0.0
rsa==4.7.2
simplejson==3.17.0
urllib3==2.2.2
urllib3~=2.2.1
typing~=3.7.4.3
python-dateutil~=2.8.2
asyncio~=3.4.3
mysqlclient~=2.1.1
179 changes: 114 additions & 65 deletions unit-executor/unitrunner/test/base.py
Original file line number Diff line number Diff line change
@@ -1,66 +1,115 @@
import json
import sys
from unitrunner.engine.base import run_test, run_api
from utils.logger import logger
# import json
# import sys
# from unitrunner.engine.base import run_test, run_api
# from utils.logger import logger
#
# if __name__ == '__main__':
# # "loop/for/while/http/if"
# case_data = [{
# 'name': "测试场景名称1",
# 'cases': [{
# "title": "测试用例2",
# "host": "http://httpbin.org/post",
# "interface": {
# "url": "/post",
# "name": "登录",
# "method": "post",
# },
# "headers": {
# 'content-Type': "application/json"
# },
# "request": {
# 'json': {"mobile_phone": "${{user_mobile}}", "pwd": "lemonban"},
# },
# 'setup_script': "print('前置脚本123')",
# 'teardown_script': "test.assertion('相等',200,response.status_code)"
# }]
# }]
# config = {
# 'ENV': {
# "host": 'http://httpbin.org',
# 'user_mobile': 999999999},
# 'db': [{}, {}],
# 'global_func': "print('前置脚本123')",
# 'rerun': 1
# }
# # response = run_test(env_config=config, case_data=case_data, debug=False)
# # sys.stdout.write("测试结果\n")
# # sys.stdout.write(str(response))
# # sys.stdout.write("\n测试结果\n")
# api_doc = api_data = {
# "type": "loop",
# "parameters": {
# "count": 2
# },
# "children": [{
# "type": "http",
# "title": "demo",
# "interface": {
# "url": "http://httpbin.org/post",
# "name": "33333",
# "method": "POST"
# },
# "headers": {},
# "request": {"data": {}},
# "setup_script": "",
# "teardown_script": "",
# "extract": {},
# "validators": []
# }]
# }
# responses = run_api(api_data=api_doc)
# sys.stdout.write(str(responses))
# logger.info(
# f"-------- 测试结果 ----------\n"
# f"{json.dumps(responses, indent=4, ensure_ascii=False)}\n"
# )

if __name__ == '__main__':
# "loop/for/while/http/if"
case_data = [{
'name': "测试场景名称1",
'cases': [{
"title": "测试用例2",
"host": "http://httpbin.org/post",
"interface": {
"url": "/post",
"name": "登录",
"method": "post",
},
"headers": {
'content-Type': "application/json"
},
"request": {
'json': {"mobile_phone": "${{user_mobile}}", "pwd": "lemonban"},
},
'setup_script': "print('前置脚本123')",
'teardown_script': "test.assertion('相等',200,response.status_code)"
}]
}]
config = {
'ENV': {
"host": 'http://httpbin.org',
'user_mobile': 999999999},
'db': [{}, {}],
'global_func': "print('前置脚本123')",
'rerun': 1
}
# response = run_test(env_config=config, case_data=case_data, debug=False)
# sys.stdout.write("测试结果\n")
# sys.stdout.write(str(response))
# sys.stdout.write("\n测试结果\n")
api_doc = api_data = {
"type": "loop",
"parameters": {
"count": 2
},
"children": [{
"type": "http",
"title": "demo",
"interface": {
"url": "http://httpbin.org/post",
"name": "33333",
"method": "POST"
},
"headers": {},
"request": {"data": {}},
"setup_script": "",
"teardown_script": "",
"extract": {},
"validators": []
}]
}
responses = run_api(api_data=api_doc)
sys.stdout.write(str(responses))
logger.info(
f"-------- 测试结果 ----------\n"
f"{json.dumps(responses, indent=4, ensure_ascii=False)}\n"
)
import random
import re
import socket
import struct
from dns import resolver

# 配置自定义 DNS 服务器
custom_resolver = resolver.Resolver()
custom_resolver.nameservers = ['127.0.0.1']
# 设置端口号(这里假设所有服务器都使用 8600 端口)
custom_resolver.port = 8600 # 注意:通常不需要对每个服务器单独设置,除非它们使用不同端口


def extract_ip_prefix(encoded_address: str):
match = encoded_address.split(".")[0]
return match


def decode_address(encoded_address):
hex_address = extract_ip_prefix(encoded_address)
binary_address = bytes.fromhex(hex_address)
return socket.inet_ntoa(binary_address)


def srv_to_address_random_choice(srv_record_name):
try:
answer = custom_resolver.resolve(srv_record_name, 'SRV')
selected_rdata = random.choice(answer)
ip_address = decode_address(selected_rdata.target.to_text())
print(f"Service: {srv_record_name} - {ip_address}:{selected_rdata.port} ")
return ip_address, selected_rdata.port
except resolver.NoAnswer as e:
print(f"No answer found for the query: {srv_record_name}")
except resolver.NXDOMAIN as e:
print(f"Domain not found: {srv_record_name}")
except Exception as e:
print(f"Error occurred while resolving DNS: {str(e)}")


def main():
# 查询 SRV 记录
srv_record_name = 'unit_executor.service.consul'
# srv_record_name = 'consul.service.consul'
srv_to_address_random_choice(srv_record_name)


if __name__ == "__main__":
main()

0 comments on commit 71c0f4d

Please sign in to comment.