优化部分代码

This commit is contained in:
CMHopeSunshine 2023-01-13 23:42:19 +08:00
parent 9c45b3da95
commit 85532a7a9e
6 changed files with 71 additions and 75 deletions

View File

@ -14,10 +14,12 @@ from nonebot.typing import T_State
from LittlePaimon.database import PlayerAlias
from LittlePaimon.utils import NICKNAME
from LittlePaimon.config import config
from LittlePaimon.utils.alias import get_match_alias, WEAPON_TYPE_ALIAS, type_file, alias_file
from LittlePaimon.utils.alias import get_match_alias, WEAPON_TYPE_ALIAS
from LittlePaimon.utils.message import MessageBuild, fullmatch_rule
from LittlePaimon.utils.tool import freq_limiter
from LittlePaimon.utils.typing import COMMAND_START_RE
from LittlePaimon.utils.files import load_json
from LittlePaimon.utils.path import JSON_DATA
from .draw_daily_material import draw_material
from .draw_map import init_map, draw_map, get_full_map
from .SereniteaPot import draw_pot_materials
@ -202,11 +204,11 @@ async def _(bot: Bot, event: MessageEvent, state: T_State, type: str = Arg('type
await total_wiki.finish(MessageSegment.image(API[type].format(proxy=config.github_proxy, name=name)))
elif type.startswith('角色') or type in {'参考面板', '收益曲线'}:
if name == '全部':
matches = type_file['角色']['元素类型']
matches = load_json(JSON_DATA / '类型.json')['角色']['元素类型']
elif re.match('^[火水冰雷风岩草](元素|属性|系)?$', name):
matches = {'角色': type_file['角色']['元素类型'][name[0]]}
matches = {'角色': load_json(JSON_DATA / '类型.json')['角色']['元素类型'][name[0]]}
elif re.match('^' + '|'.join(WEAPON_TYPE_ALIAS.keys()) + '$', name):
matches = {'角色': type_file['角色']['武器类型'][WEAPON_TYPE_ALIAS[name]]}
matches = {'角色': load_json(JSON_DATA / '类型.json')['角色']['武器类型'][WEAPON_TYPE_ALIAS[name]]}
elif alias := await PlayerAlias.get_or_none(user_id=str(event.user_id), alias=name):
final_name = alias.character
matches = {}
@ -220,15 +222,15 @@ async def _(bot: Bot, event: MessageEvent, state: T_State, type: str = Arg('type
matches = get_match_alias(name, '角色')
elif type.startswith('武器'):
if name == '全部':
matches = type_file['武器']
matches = load_json(JSON_DATA / '类型.json')['武器']
elif re.match('^' + '|'.join(WEAPON_TYPE_ALIAS.keys()) + '$', name):
matches = {'武器': type_file['武器'][WEAPON_TYPE_ALIAS[name]]}
matches = {'武器': load_json(JSON_DATA / '类型.json')['武器'][WEAPON_TYPE_ALIAS[name]]}
else:
matches = get_match_alias(name, '武器')
elif type.startswith('原魔'):
matches = {'原魔': list(alias_file['原魔'].keys())} if name == '全部' else get_match_alias(name, '原魔')
matches = {'原魔': list(load_json(JSON_DATA / 'alias.json')['原魔'].keys())} if name == '全部' else get_match_alias(name, '原魔')
elif type.startswith('圣遗物'):
matches = {'圣遗物': list(alias_file['圣遗物'].keys())} if name == '全部' else get_match_alias(name, '圣遗物')
matches = {'圣遗物': list(load_json(JSON_DATA / 'alias.json')['圣遗物'].keys())} if name == '全部' else get_match_alias(name, '圣遗物')
elif type.startswith('七圣召唤'):
if name == '全部':
matches = await get_match_card(name)
@ -238,10 +240,10 @@ async def _(bot: Bot, event: MessageEvent, state: T_State, type: str = Arg('type
matches = {'特产': s}
else:
if re.match('^[火水冰雷风岩草](元素|属性|系)?$', name):
matches = {'角色': type_file['角色']['元素类型'][name[0]]}
matches = {'角色': load_json(JSON_DATA / '类型.json')['角色']['元素类型'][name[0]]}
elif re.match('^' + '|'.join(WEAPON_TYPE_ALIAS.keys()) + '$', name):
matches = {'角色': type_file['角色']['武器类型'][WEAPON_TYPE_ALIAS[name]],
'武器': type_file['武器'][WEAPON_TYPE_ALIAS[name]]}
matches = {'角色': load_json(JSON_DATA / '类型.json')['角色']['武器类型'][WEAPON_TYPE_ALIAS[name]],
'武器': load_json(JSON_DATA / '类型.json')['武器'][WEAPON_TYPE_ALIAS[name]]}
elif alias := await PlayerAlias.get_or_none(user_id=str(event.user_id), alias=name):
final_name = alias.character
if type in {'材料', '攻略', '图鉴'}:

View File

@ -144,15 +144,17 @@ async def _(event: MessageEvent, bot: Bot, msg: Message = Arg('msg'), groups: st
@DRIVER.on_bot_connect
async def _():
if (reboot_file := (Path() / 'rebooting.json')).exists():
bot = get_bot()
reboot_data = load_json(reboot_file)
if reboot_data['session_type'] == 'group':
await bot.send_group_msg(group_id=reboot_data['session_id'],
message=f'{NICKNAME}已重启完成,当前版本为{__version__}')
else:
await bot.send_private_msg(user_id=reboot_data['session_id'],
message=f'{NICKNAME}已重启完成,当前版本为{__version__}')
if not (reboot_file := (Path() / 'rebooting.json')).exists():
return
bot = get_bot()
reboot_data = load_json(reboot_file)
if reboot_data['session_type'] == 'group':
await bot.send_group_msg(group_id=reboot_data['session_id'],
message=f'{NICKNAME}已重启完成,当前版本为{__version__}')
else:
await bot.send_private_msg(user_id=reboot_data['session_id'],
message=f'{NICKNAME}已重启完成,当前版本为{__version__}')
if 'group_card' in reboot_data:
for group_id, card_info in reboot_data['group_card'].items():
await bot.set_group_card(group_id=int(group_id), user_id=int(bot.self_id), card=card_info)
await asyncio.sleep(0.25)

View File

@ -4,7 +4,7 @@ from nonebot import get_driver
from .logger import logger
from .scheduler import scheduler
__version__ = '3.0.4'
__version__ = '3.0.5'
DRIVER = get_driver()
try:

View File

@ -1,15 +1,9 @@
import re
from difflib import get_close_matches
from typing import Union, Literal, List, Optional, Dict
from .files import load_json
from .path import JSON_DATA, GACHA_RES
from .path import JSON_DATA
alias_file = load_json(JSON_DATA / 'alias.json')
info_file = load_json(JSON_DATA / 'genshin_info.json')
weapon_file = load_json(JSON_DATA / 'weapon.json')
item_type_file = load_json(GACHA_RES / 'type.json')
type_file = load_json(JSON_DATA / '类型.json')
WEAPON_TYPE_ALIAS = {
'单手剑': '单手剑',
@ -33,7 +27,7 @@ def get_id_by_name(name: str) -> Optional[str]:
:param name: 角色名
:return: id字符串
"""
name_list = alias_file['角色']
name_list = load_json(JSON_DATA / 'alias.json')['角色']
for role_id, alias in name_list.items():
if name in alias:
return role_id
@ -47,7 +41,7 @@ def get_name_by_id(role_id: Union[str, int]) -> Optional[str]:
"""
if isinstance(role_id, int):
role_id = str(role_id)
name_list = alias_file['角色']
name_list = load_json(JSON_DATA / 'alias.json')['角色']
return name_list[role_id][0] if role_id in name_list else None
@ -57,7 +51,7 @@ def get_alias_by_name(name: str) -> Optional[List[str]]:
:param name: 角色名
:return: 别名列表
"""
name_list = alias_file['角色']
name_list = load_json(JSON_DATA / 'alias.json')['角色']
return next((r for r in name_list.values() if name in r), None)
@ -78,6 +72,7 @@ def get_match_alias(name: str, types: Union[List[ALIAS_TYPE], ALIAS_TYPE] = None
elif isinstance(types, str):
types = [types]
matches = {}
alias_file = load_json(JSON_DATA / 'alias.json')
include_flag = False
for type in types:
alias_list = alias_file[type]
@ -137,11 +132,10 @@ def get_chara_icon(name: Optional[str] = None, chara_id: Optional[int] = None,
"""
if name and not chara_id:
chara_id = get_id_by_name(name)
if info := info_file.get(str(chara_id)):
if info := load_json(JSON_DATA / 'genshin_info.json').get(str(chara_id)):
side_icon = info['SideIconName']
else:
return None
# UI_AvatarIcon_Side_Wanderer
if icon_type == 'side':
return side_icon
elif icon_type == 'avatar':
@ -155,5 +149,5 @@ def get_chara_icon(name: Optional[str] = None, chara_id: Optional[int] = None,
def get_weapon_icon(name: str) -> Optional[str]:
icon_list = weapon_file['Icon']
icon_list = load_json(JSON_DATA / 'weapon.json')['Icon']
return icon_list.get(name)

View File

@ -2,7 +2,6 @@ import contextlib
import hashlib
import json
import random
import re
import string
import time
from typing import Optional, Literal, Union, Tuple
@ -143,12 +142,11 @@ def mihoyo_sign_headers(cookie: str, extra_headers: Optional[dict] = None) -> di
return header
async def check_retcode(data: dict, cookie_info, cookie_type: str, user_id: str, uid: str) -> bool:
async def check_retcode(data: dict, cookie_info, user_id: str, uid: str) -> bool:
"""
检查数据响应状态冰进行响应处理
:param data: 数据
:param cookie_info: cookie信息
:param cookie_type: cookie类型
:param user_id: 用户id
:param uid: 原神uid
:return: 数据是否有效
@ -156,7 +154,7 @@ async def check_retcode(data: dict, cookie_info, cookie_type: str, user_id: str,
if not data:
return False
if data['retcode'] in [10001, -100]:
if cookie_type == 'private':
if isinstance(cookie_info, PrivateCookie):
if cookie_info.status == 1:
cookie_info.status = 0
await cookie_info.save()
@ -164,7 +162,7 @@ async def check_retcode(data: dict, cookie_info, cookie_type: str, user_id: str,
elif cookie_info.status == 0:
await cookie_info.delete()
logger.info('原神Cookie', f'用户<m>{user_id}</m>的私人cookie<m>{uid}</m>连续失效,<r>已删除</r>')
elif cookie_type == 'public':
elif isinstance(cookie_info, PublicCookie):
await CookieCache.filter(cookie=cookie_info.cookie).delete()
await cookie_info.delete()
logger.info('原神Cookie', f'<m>{cookie_info.id}</m>号公共cookie已失效<r>已删除</r>')
@ -175,11 +173,11 @@ async def check_retcode(data: dict, cookie_info, cookie_type: str, user_id: str,
return False
elif data['retcode'] == 10101:
cookie_info.status = 2
if cookie_type == 'private':
if isinstance(cookie_info, PrivateCookie):
cookie_info.status = 2
await cookie_info.save()
logger.info('原神Cookie', f'用户<m>{user_id}</m>的私人cookie<m>{uid}</m>已达到每日30次查询上限')
elif cookie_type == 'public':
elif isinstance(cookie_info, PublicCookie):
cookie_info.status = 2
await cookie_info.save()
logger.info('原神Cookie', f'<m>{cookie_info.id}</m>号公共cookie已达到每日30次查询上限')
@ -189,13 +187,15 @@ async def check_retcode(data: dict, cookie_info, cookie_type: str, user_id: str,
logger.info('原神Cookie', f'UID<m>{cookie_info.uid}</m>使用的缓存cookie已达到每日30次查询上限')
return False
else:
if cookie_type == 'public' and data['retcode'] != 1034:
if isinstance(cookie_info, PublicCookie) and data['retcode'] != 1034:
await CookieCache.update_or_create(uid=uid, defaults={'cookie': cookie_info.cookie})
return True
async def get_cookie(user_id: str, uid: str, check: bool = True, own: bool = False) -> Tuple[
Union[None, PrivateCookie, PublicCookie, CookieCache], str]:
async def get_cookie(user_id: str,
uid: str,
check: bool = True,
own: bool = False) -> Union[None, PrivateCookie, PublicCookie, CookieCache]:
"""
获取可用的cookie
:param user_id: 用户id
@ -205,16 +205,16 @@ async def get_cookie(user_id: str, uid: str, check: bool = True, own: bool = Fal
"""
query = Q(status=1) | Q(status=0) if check else Q(status=1)
if private_cookie := await PrivateCookie.filter(Q(Q(query) & Q(user_id=user_id) & Q(uid=uid))).first():
return private_cookie, 'private'
return private_cookie
elif not own:
if cache_cookie := await CookieCache.get_or_none(uid=uid):
return cache_cookie, 'cache'
return cache_cookie
elif private_cookie := await PrivateCookie.filter(Q(Q(query) & Q(user_id=user_id))).first():
return private_cookie, 'private'
return private_cookie
else:
return await PublicCookie.filter(Q(query)).first(), 'public'
return await PublicCookie.filter(Q(query)).first()
else:
return None, ''
return None
async def get_bind_game_info(cookie: str, mys_id: str):
@ -245,7 +245,7 @@ async def get_mihoyo_public_data(
server_id = 'cn_qd01' if uid[0] == '5' else 'cn_gf01'
check = True
while True:
cookie_info, cookie_type = await get_cookie(user_id, uid, check)
cookie_info = await get_cookie(user_id, uid, check)
check = False
if not cookie_info:
return '当前没有可使用的cookie请绑定私人cookie或联系超级管理员添加公共cookie'
@ -280,7 +280,7 @@ async def get_mihoyo_public_data(
data = None
data = data.json() if data else {'retcode': 999}
nb_logger.debug(data)
if await check_retcode(data, cookie_info, cookie_type, user_id, uid):
if await check_retcode(data, cookie_info, user_id, uid):
return data
@ -291,7 +291,7 @@ async def get_mihoyo_private_data(
role_id: Optional[str] = None,
month: Optional[str] = None):
server_id = 'cn_qd01' if uid[0] == '5' else 'cn_gf01'
cookie_info, _ = await get_cookie(user_id, uid, True, True)
cookie_info = await get_cookie(user_id, uid, True, True)
if not cookie_info:
return '未绑定私人cookie绑定方法二选一\n1.通过米游社扫码绑定:\n请发送指令[原神扫码绑定]\n2.获取cookie的教程\ndocs.qq.com/doc/DQ3JLWk1vQVllZ2Z1\n获取后,使用[ysb cookie]指令绑定' \
+ (f'或前往{config.CookieWeb_url}网页添加绑定' if config.CookieWeb_enable else '')
@ -348,7 +348,7 @@ async def get_mihoyo_private_data(
data = None
data = data.json() if data else {'retcode': 999}
nb_logger.debug(data)
if await check_retcode(data, cookie_info, 'private', user_id, uid):
if await check_retcode(data, cookie_info, user_id, uid):
return data
else:
return f'你的UID{uid}的cookie疑似失效了'
@ -420,7 +420,7 @@ async def get_authkey_by_stoken(user_id: str, uid: str) -> Tuple[Optional[str],
:return: authkey
"""
server_id = 'cn_qd01' if uid[0] == '5' else 'cn_gf01'
cookie_info, _ = await get_cookie(user_id, uid, True, True)
cookie_info = await get_cookie(user_id, uid, True, True)
if not cookie_info:
return '未绑定私人cookie绑定方法二选一\n1.通过米游社扫码绑定:\n请发送指令[原神扫码绑定]\n2.获取cookie的教程\ndocs.qq.com/doc/DQ3JLWk1vQVllZ2Z1\n获取后,使用[ysb cookie]指令绑定' \
+ (f'或前往{config.CookieWeb_url}网页添加绑定' if config.CookieWeb_enable else ''), False, cookie_info
@ -454,15 +454,15 @@ async def get_authkey_by_stoken(user_id: str, uid: str) -> Tuple[Optional[str],
async def get_enka_data(uid):
try:
url = f'https://enka.network/u/{uid}/__data.json'
resp = await aiorequests.get(url=url, headers={'User-Agent': 'LittlePaimon/3.0'}, follow_redirects=True)
data = resp.json()
nb_logger.debug(data)
return data
except Exception:
url = f'https://enka.microgg.cn/u/{uid}/__data.json'
resp = await aiorequests.get(url=url, headers={'User-Agent': 'LittlePaimon/3.0'}, follow_redirects=True)
data = resp.json()
nb_logger.debug(data)
return data
urls = [
'https://enka.network/u/{uid}/__data.json',
'https://enka.microgg.cn/u/{uid}/__data.json'
]
for url in urls:
with contextlib.suppress(Exception):
resp = await aiorequests.get(url=url.format(uid=uid),
headers={'User-Agent': 'LittlePaimon/3.0'},
follow_redirects=True)
data = resp.json()
nb_logger.debug(data)
return data

View File

@ -50,10 +50,9 @@ def update():
origin.pull()
msg = f'更新完成,版本:{__version__}\n最新更新日志为:\n{repo.head.commit.message.replace(":bug:", "🐛").replace(":sparkles:", "").replace(":memo:", "📝")}\n可使用命令[@bot 重启]重启{NICKNAME}'
except GitCommandError as e:
emsg = e.stdout + '\n' + e.stderr
if 'timeout' in emsg or 'unable to access' in emsg:
if 'timeout' in e.stderr or 'unable to access' in e.stderr:
msg = '更新失败连接git仓库超时请重试或修改源为代理源后再重试。'
elif 'Your local changes' in emsg:
elif 'Your local changes' in e.stderr:
pyproject_file = Path().parent / 'pyproject.toml'
pyproject_raw_content = pyproject_file.read_text(encoding='utf-8')
if raw_plugins_load := re.search(r'^plugins = \[.+]$', pyproject_raw_content, flags=re.M):
@ -66,13 +65,12 @@ def update():
origin.pull()
msg = f'更新完成,版本:{__version__}\n最新更新日志为:\n{repo.head.commit.message.replace(":bug:", "🐛").replace(":sparkles:", "").replace(":memo:", "📝")}\n可使用命令[@bot 重启]重启{NICKNAME}'
except GitCommandError as e:
emsg = e.stdout + '\n' + e.stderr
if 'timeout' in emsg or 'unable to access' in emsg:
if 'timeout' in e.stderr or 'unable to access' in e.stderr:
msg = '更新失败连接git仓库超时请重试或修改源为代理源后再重试。'
elif ' Your local changes' in emsg:
msg = f'更新失败,本地修改过文件导致冲突,请解决冲突后再更新。\n{emsg}'
elif ' Your local changes' in e.stderr:
msg = f'更新失败,本地修改过文件导致冲突,请解决冲突后再更新。\n{e.stderr}'
else:
msg = f'更新失败,错误信息:{emsg},请尝试手动进行更新'
msg = f'更新失败,错误信息:{e.stderr},请尝试手动进行更新'
finally:
if raw_plugins_load:
pyproject_new_content = pyproject_file.read_text(encoding='utf-8')
@ -83,5 +81,5 @@ def update():
logger.info('派蒙更新', f'更新结束,还原插件:{raw_plugins_load.group()}')
return msg
else:
msg = f'更新失败,错误信息:{emsg},请尝试手动进行更新'
msg = f'更新失败,错误信息:{e.stderr},请尝试手动进行更新'
return msg