加入watcher报错,不加watcher就可以正常运行,麻烦老师帮忙看下
来源:29-6 service层集成nacos

我家大狗最最萌
2021-10-15
server代码
import logging
import socket
import sys
import os
import signal
import argparse
from functools import partial
import grpc
from concurrent import futures
from loguru import logger
BASE_DIR = os.path.dirname(os.path.abspath(os.path.dirname(__file__)))
sys.path.insert(0, BASE_DIR)
from common.grpc_health.v1 import health_pb2_grpc
from common.grpc_health.v1.health import HealthServicer
from common.register.consul import ConsulRegister
from user_srv.settings.settings import CONSUL_HOST, CONSUL_PORT, SERVICE_NAME, SERVICE_TAGS, client, NACOS, \
update_config, data
from user_srv.proto import user_pb2_grpc
from user_srv.handler.user import UserService
def get_free_tcp_port():
# 自动获取端口号
tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp.bind(("", 0))
_, port = tcp.getsockname()
tcp.close()
return port
def service():
parser = argparse.ArgumentParser()
parser.add_argument('--ip', nargs='?', type=str, default='192.168.0.105', help='binding ip')
parser.add_argument('--port', nargs='?', type=int, default=0, help='binding port')
args = parser.parse_args()
if args.port == 0:
port = get_free_tcp_port()
else:
port = args.port
logger.add("./logs/user_srv_{time}.log")
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
# 注册用户服务
user_pb2_grpc.add_UserServicer_to_server(UserService(), server)
# 注册健康检查
health_pb2_grpc.add_HealthServicer_to_server(HealthServicer(), server)
server.add_insecure_port(f'{args.ip}:{port}')
import uuid
service_id = str(uuid.uuid1())
# 主进程退出信号监听
'''
windows下支持的信号是有限的
SIGINT CTRL+C中断
SIGTERM kill发出的软件终止
'''
signal.signal(signal.SIGINT, partial(on_exit, service_id=service_id))
signal.signal(signal.SIGTERM, partial(on_exit, service_id=service_id))
logger.info(f'启动服务:{args.ip}:{port}')
server.start()
logger.info('服务注册开始')
register = ConsulRegister(CONSUL_HOST, CONSUL_PORT)
if not register.register(name=SERVICE_NAME, id=service_id, address=args.ip, port=port,
tags=SERVICE_TAGS):
logger.info('服务注册失败')
sys.exit(0)
logger.info('服务注册成功')
server.wait_for_termination()
def on_exit(signo, frame, service_id):
register = ConsulRegister(CONSUL_HOST, CONSUL_PORT)
logger.info(f'开始注销srv {service_id} 服务')
register.deregister(service_id)
logger.info(f'注销srv {service_id} 服务 成功')
sys.exit(0)
if __name__ == '__main__':
logging.basicConfig()
client.add_config_watcher(NACOS['DataId'], NACOS['Group'], update_config)
service()
settings代码
import json
import nacos
from loguru import logger
from playhouse.pool import PooledMySQLDatabase
from playhouse.shortcuts import ReconnectMixin
NACOS = {
"Host": "192.168.0.108",
"Port": 8848,
"NameSpace": "6afe4c1c-1de5-4044-8096-1b2a8f0335c9",
"User": "nacos",
"Password": "nacos",
"DataId": "user-srv",
"Group": "dev"
}
client = nacos.NacosClient(f"{NACOS['Host']}:{NACOS['Port']}",
namespace=NACOS['NameSpace'],
username=NACOS['User'],
password=NACOS['Password'])
data = client.get_config(NACOS['DataId'], NACOS['Group'])
data = json.loads(data)
logger.info(data)
def update_config(args):
print("配置产生变化")
print(args)
CONSUL = data['consul']
# consul配置
CONSUL_HOST = '192.168.0.108'
CONSUL_PORT = 8500
# 服务相关的配置
SERVICE_NAME = data['name']
SERVICE_TAGS = data['tags']
# 使用peewee的连接池, 使用ReconnectMixin来防止出现链接断开查询失败
class ReconnectMysqlDatabase(PooledMySQLDatabase, ReconnectMixin):
pass
MYSQL = data['mysql']
DB = ReconnectMysqlDatabase(MYSQL['db'], host=MYSQL['host'],
port=MYSQL['port'], user=MYSQL['user'],
password=MYSQL['password'])
写回答
1回答
-
bobby
2021-10-18
你留下qq 我加你看看吧
032022-07-12
相似问题