mirror of
https://github.com/xuthus83/LittlePaimon.git
synced 2024-12-16 13:40:53 +08:00
✨ 更新enka
api地址,优化资源检查方式
This commit is contained in:
parent
8f8bca9177
commit
66530afe46
@ -4,7 +4,7 @@ from nonebot import get_driver
|
||||
from .logger import logger
|
||||
from .scheduler import scheduler
|
||||
|
||||
__version__ = '3.0.5'
|
||||
__version__ = '3.0.6'
|
||||
|
||||
DRIVER = get_driver()
|
||||
try:
|
||||
@ -13,7 +13,7 @@ except Exception:
|
||||
SUPERUSERS = []
|
||||
|
||||
if not SUPERUSERS or SUPERUSERS == ['123456']:
|
||||
logger.warning('请在.env.prod文件中配置超级用户SUPERUSERS')
|
||||
logger.warning('超级用户配置', '请在.env.prod文件中配置超级用户SUPERUSERS')
|
||||
|
||||
try:
|
||||
NICKNAME: str = list(DRIVER.config.nickname)[0]
|
||||
|
@ -35,8 +35,9 @@ LOGIN_TICKET_INFO_API = 'https://webapi.account.mihoyo.com/Api/cookie_accountinf
|
||||
def md5(text: str) -> str:
|
||||
"""
|
||||
md5加密
|
||||
:param text: 文本
|
||||
:return: md5加密后的文本
|
||||
|
||||
:param text: 文本
|
||||
:return: md5加密后的文本
|
||||
"""
|
||||
md5_ = hashlib.md5()
|
||||
md5_.update(text.encode())
|
||||
@ -46,8 +47,9 @@ def md5(text: str) -> str:
|
||||
def random_hex(length: int) -> str:
|
||||
"""
|
||||
生成指定长度的随机字符串
|
||||
:param length: 长度
|
||||
:return: 随机字符串
|
||||
|
||||
:param length: 长度
|
||||
:return: 随机字符串
|
||||
"""
|
||||
result = hex(random.randint(0, 16 ** length)).replace('0x', '').upper()
|
||||
if len(result) < length:
|
||||
@ -58,8 +60,9 @@ def random_hex(length: int) -> str:
|
||||
def random_text(length: int) -> str:
|
||||
"""
|
||||
生成指定长度的随机字符串
|
||||
:param length: 长度
|
||||
:return: 随机字符串
|
||||
|
||||
:param length: 长度
|
||||
:return: 随机字符串
|
||||
"""
|
||||
return ''.join(random.sample(string.ascii_lowercase + string.digits, length))
|
||||
|
||||
@ -67,10 +70,11 @@ def random_text(length: int) -> str:
|
||||
def get_ds(q: str = '', b: dict = None, mhy_bbs_sign: bool = False) -> str:
|
||||
"""
|
||||
生成米游社headers的ds_token
|
||||
:param q: 查询
|
||||
:param b: 请求体
|
||||
:param mhy_bbs_sign: 是否为米游社讨论区签到
|
||||
:return: ds_token
|
||||
|
||||
:param q: 查询
|
||||
:param b: 请求体
|
||||
:param mhy_bbs_sign: 是否为米游社讨论区签到
|
||||
:return: ds_token
|
||||
"""
|
||||
br = json.dumps(b) if b else ''
|
||||
if mhy_bbs_sign:
|
||||
@ -415,6 +419,7 @@ async def get_cookie_token_by_stoken(stoken: str, mys_id: str) -> Optional[str]:
|
||||
async def get_authkey_by_stoken(user_id: str, uid: str) -> Tuple[Optional[str], bool, Optional[PrivateCookie]]:
|
||||
"""
|
||||
根据stoken获取authkey
|
||||
|
||||
:param user_id: 用户id
|
||||
:param uid: 原神uid
|
||||
:return: authkey
|
||||
@ -455,8 +460,8 @@ async def get_authkey_by_stoken(user_id: str, uid: str) -> Tuple[Optional[str],
|
||||
|
||||
async def get_enka_data(uid):
|
||||
urls = [
|
||||
'https://enka.network/u/{uid}/__data.json',
|
||||
'https://enka.microgg.cn/u/{uid}/__data.json'
|
||||
'https://enka.network/api/uid/{uid}',
|
||||
'https://enka.microgg.cn/api/uid/{uid}'
|
||||
]
|
||||
for url in urls:
|
||||
with contextlib.suppress(Exception):
|
||||
|
@ -6,8 +6,6 @@ from pathlib import Path
|
||||
from ssl import SSLCertVerificationError
|
||||
from typing import Union
|
||||
|
||||
import httpx
|
||||
import tqdm.asyncio
|
||||
from ruamel import yaml
|
||||
|
||||
from .requests import aiorequests
|
||||
@ -16,22 +14,26 @@ from .requests import aiorequests
|
||||
def load_json(path: Union[Path, str], encoding: str = 'utf-8'):
|
||||
"""
|
||||
读取本地json文件,返回文件数据。
|
||||
:param path: 文件路径
|
||||
:param encoding: 编码,默认为utf-8
|
||||
:return: 数据
|
||||
|
||||
:param path: 文件路径
|
||||
:param encoding: 编码,默认为utf-8
|
||||
:return: 数据
|
||||
"""
|
||||
if isinstance(path, str):
|
||||
path = Path(path)
|
||||
if not path.name.endswith('.json'):
|
||||
path = path.with_suffix('.json')
|
||||
return json.loads(path.read_text(encoding=encoding)) if path.exists() else {}
|
||||
|
||||
|
||||
async def load_json_from_url(url: str, path: Union[Path, str] = None, force_refresh: bool = False) -> dict:
|
||||
"""
|
||||
从网络url中读取json,当有path参数时,如果path文件不存在,就会从url下载保存到path,如果path文件存在,则直接读取path
|
||||
:param url: url
|
||||
:param path: 本地json文件路径
|
||||
:param force_refresh: 是否强制重新下载
|
||||
:return: json字典
|
||||
|
||||
:param url: url
|
||||
:param path: 本地json文件路径
|
||||
:param force_refresh: 是否强制重新下载
|
||||
:return: json字典
|
||||
"""
|
||||
if path and Path(path).exists() and not force_refresh:
|
||||
return load_json(path=path)
|
||||
@ -48,23 +50,24 @@ async def load_json_from_url(url: str, path: Union[Path, str] = None, force_refr
|
||||
def save_json(data: dict, path: Union[Path, str] = None, encoding: str = 'utf-8'):
|
||||
"""
|
||||
保存json文件
|
||||
:param data: json数据
|
||||
:param path: 保存路径
|
||||
:param encoding: 编码
|
||||
|
||||
:param data: json数据
|
||||
:param path: 保存路径
|
||||
:param encoding: 编码
|
||||
"""
|
||||
if isinstance(path, str):
|
||||
path = Path(path)
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with path.open('w', encoding=encoding) as f:
|
||||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||||
path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding=encoding)
|
||||
|
||||
|
||||
def load_yaml(path: Union[Path, str], encoding: str = 'utf-8'):
|
||||
"""
|
||||
读取本地yaml文件,返回字典。
|
||||
:param path: 文件路径
|
||||
:param encoding: 编码,默认为utf-8
|
||||
:return: 字典
|
||||
|
||||
:param path: 文件路径
|
||||
:param encoding: 编码,默认为utf-8
|
||||
:return: 字典
|
||||
"""
|
||||
if isinstance(path, str):
|
||||
path = Path(path)
|
||||
@ -75,9 +78,10 @@ def load_yaml(path: Union[Path, str], encoding: str = 'utf-8'):
|
||||
def save_yaml(data: dict, path: Union[Path, str] = None, encoding: str = 'utf-8'):
|
||||
"""
|
||||
保存yaml文件
|
||||
:param data: 数据
|
||||
:param path: 保存路径
|
||||
:param encoding: 编码
|
||||
|
||||
:param data: 数据
|
||||
:param path: 保存路径
|
||||
:param encoding: 编码
|
||||
"""
|
||||
if isinstance(path, str):
|
||||
path = Path(path)
|
||||
@ -89,26 +93,3 @@ def save_yaml(data: dict, path: Union[Path, str] = None, encoding: str = 'utf-8'
|
||||
indent=2,
|
||||
Dumper=yaml.RoundTripDumper,
|
||||
allow_unicode=True)
|
||||
|
||||
|
||||
async def download(url: str, save_path: Union[Path, str]):
|
||||
"""
|
||||
下载文件(带进度条)
|
||||
:param url: url
|
||||
:param save_path: 保存路径
|
||||
"""
|
||||
if isinstance(save_path, str):
|
||||
save_path = Path(save_path)
|
||||
save_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
async with httpx.AsyncClient().stream(method='GET', url=url, follow_redirects=True) as datas:
|
||||
size = int(datas.headers['Content-Length'])
|
||||
f = save_path.open('wb')
|
||||
async for chunk in tqdm.asyncio.tqdm(iterable=datas.aiter_bytes(1),
|
||||
desc=url.split('/')[-1],
|
||||
unit='iB',
|
||||
unit_scale=True,
|
||||
unit_divisor=1024,
|
||||
total=size,
|
||||
colour='green'):
|
||||
f.write(chunk)
|
||||
f.close()
|
||||
|
@ -1,74 +0,0 @@
|
||||
import datetime
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
|
||||
from LittlePaimon.database import PublicCookie, PrivateCookie, LastQuery, DailyNoteSub, MihoyoBBSSub
|
||||
from .logger import logger
|
||||
|
||||
|
||||
async def migrate_database():
|
||||
old_db_path = Path() / 'data' / 'LittlePaimon' / 'user_data' / 'user_data.db'
|
||||
if not old_db_path.exists():
|
||||
return
|
||||
logger.info('派蒙数据库迁移', '开始迁移数据库')
|
||||
conn = sqlite3.connect(old_db_path)
|
||||
cursor = conn.cursor()
|
||||
# 迁移公共cookie
|
||||
try:
|
||||
cursor.execute('SELECT cookie FROM public_cookies;')
|
||||
cookie = cursor.fetchall()
|
||||
for c in cookie:
|
||||
await PublicCookie.create(cookie=c[0])
|
||||
logger.info('派蒙数据库迁移', f'成功迁移公共cookie<m>{c[0][:20]}...</m>')
|
||||
except Exception:
|
||||
logger.info('派蒙数据库迁移', '公共cookie没有可迁移的数据')
|
||||
# 迁移私人cookie
|
||||
try:
|
||||
cursor.execute('SELECT user_id, uid, mys_id, cookie, stoken FROM private_cookies;')
|
||||
cookie = cursor.fetchall()
|
||||
for c in cookie:
|
||||
await PrivateCookie.update_or_create(user_id=c[0], uid=c[1], mys_id=c[2], cookie=c[3], stoken=c[4])
|
||||
logger.info('派蒙数据库迁移', f'成功迁移用户<m>{c[0]}</m>的UID<m>{c[1]}</m>的<m>私人cookie</m>')
|
||||
except Exception:
|
||||
logger.info('派蒙数据库迁移', '私人cookie没有可迁移的数据')
|
||||
# 最后查询记录迁移
|
||||
try:
|
||||
cursor.execute('SELECT user_id, uid FROM last_query;')
|
||||
query = cursor.fetchall()
|
||||
for q in query:
|
||||
await LastQuery.update_or_create(user_id=q[0], uid=q[1], last_time=datetime.datetime.now())
|
||||
logger.info('派蒙数据库迁移', f'成功迁移UID查询记录<m>{len(query)}</m>条')
|
||||
except Exception:
|
||||
logger.info('派蒙数据库迁移', 'UID查询记录没有可迁移的数据')
|
||||
# 实时便签提醒迁移
|
||||
try:
|
||||
cursor.execute('SELECT user_id, uid, count, remind_group FROM note_remind;')
|
||||
note = cursor.fetchall()
|
||||
for n in note:
|
||||
await DailyNoteSub.update_or_create(user_id=n[0], uid=n[1], remind_type='private' if n[3] == n[1] else 'group', group_id=n[3], resin_num=n[2])
|
||||
logger.info('派蒙数据库迁移', f'成功迁移用户<m>{n[0]}</m>的UID<m>{n[1]}</m>的米游社自动签到')
|
||||
except Exception:
|
||||
logger.info('派蒙数据库迁移', '米游社自动签到没有可迁移的数据')
|
||||
# 米游社签到迁移
|
||||
try:
|
||||
cursor.execute('SELECT user_id, uid, group_id FROM bbs_sign;')
|
||||
sign = cursor.fetchall()
|
||||
for s in sign:
|
||||
await MihoyoBBSSub.update_or_create(user_id=s[0], uid=s[1], group_id=s[2], sub_event='米游社原神签到')
|
||||
logger.info('派蒙数据库迁移', f'成功迁移用户<m>{s[0]}</m>的UID<m>{s[1]}</m>的米游社原神签到')
|
||||
except Exception:
|
||||
logger.info('派蒙数据库迁移', '米游社原神签到没有可迁移的数据')
|
||||
# 米游币获取迁移
|
||||
try:
|
||||
cursor.execute('SELECT user_id, uid, group_id FROM coin_bbs_sign;')
|
||||
sign = cursor.fetchall()
|
||||
for s in sign:
|
||||
await MihoyoBBSSub.update_or_create(user_id=s[0], uid=s[1], group_id=s[2], sub_event='米游币自动获取')
|
||||
logger.info('派蒙数据库迁移', f'成功迁移用户<m>{s[0]}</m>的UID<m>{s[1]}</m>的米游币自动获取')
|
||||
except Exception:
|
||||
logger.info('派蒙数据库迁移', '米游币自动获取没有可迁移的数据')
|
||||
|
||||
conn.close()
|
||||
|
||||
# 将old_db_path文件改名为old_db_path.bak
|
||||
old_db_path.rename(old_db_path.parent / f'{old_db_path.name}.bak')
|
@ -142,9 +142,10 @@ class aiorequests:
|
||||
async def download(url: str, save_path: Path, exclude_json: bool = False):
|
||||
"""
|
||||
下载文件(带进度条)
|
||||
:param url: url
|
||||
:param save_path: 保存路径
|
||||
:param exclude_json: 是否排除json文件
|
||||
|
||||
:param url: url
|
||||
:param save_path: 保存路径
|
||||
:param exclude_json: 是否排除json文件
|
||||
"""
|
||||
save_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
async with httpx.AsyncClient().stream(method='GET', url=url, follow_redirects=True) as datas:
|
||||
|
@ -4,6 +4,7 @@ import functools
|
||||
import hashlib
|
||||
import inspect
|
||||
import time
|
||||
import zipfile
|
||||
from collections import defaultdict
|
||||
from pathlib import Path
|
||||
|
||||
@ -87,31 +88,47 @@ def cache(ttl=datetime.timedelta(hours=1)):
|
||||
|
||||
async def check_resource():
|
||||
logger.info('资源检查', '开始检查资源')
|
||||
try:
|
||||
resource_list = await aiorequests.get(
|
||||
f'{config.github_proxy}https://raw.githubusercontent.com/CMHopeSunshine/LittlePaimonRes/main/resources_list.json',
|
||||
follow_redirects=True)
|
||||
resource_list = resource_list.json()
|
||||
except Exception:
|
||||
logger.info('资源检查', '读取资源列表<r>失败</r>,请尝试更换<m>github资源地址</m>')
|
||||
return
|
||||
flag = False
|
||||
for resource in resource_list:
|
||||
file_path = RESOURCE_BASE_PATH / resource['path']
|
||||
if file_path.exists():
|
||||
if not resource['lock'] or hashlib.md5(file_path.read_bytes()).hexdigest() == resource['hash']:
|
||||
continue
|
||||
else:
|
||||
file_path.unlink()
|
||||
flag = True
|
||||
if not (
|
||||
(RESOURCE_BASE_PATH / 'LittlePaimon').is_dir() and
|
||||
len(list((RESOURCE_BASE_PATH / 'LittlePaimon').rglob('*'))) >= 50):
|
||||
try:
|
||||
await aiorequests.download(
|
||||
url=f'{config.github_proxy}https://raw.githubusercontent.com/CMHopeSunshine/LittlePaimonRes/main/{resource["path"]}',
|
||||
save_path=file_path, exclude_json=resource['path'].split('.')[-1] != 'json')
|
||||
await asyncio.sleep(0.5)
|
||||
url=f'{config.github_proxy}https://raw.githubusercontent.com/CMHopeSunshine/LittlePaimonRes/main/resources.zip',
|
||||
save_path=RESOURCE_BASE_PATH / '小派蒙基础资源.zip')
|
||||
zipfile.ZipFile(RESOURCE_BASE_PATH / '小派蒙基础资源.zip').extractall(RESOURCE_BASE_PATH)
|
||||
(RESOURCE_BASE_PATH / '小派蒙基础资源.zip').unlink()
|
||||
|
||||
await aiorequests.download(
|
||||
url=f'{config.github_proxy}https://raw.githubusercontent.com/CMHopeSunshine/GenshinWikiMap/master/resources/genshin_resources.zip',
|
||||
save_path=RESOURCE_BASE_PATH / '原神图标资源.zip')
|
||||
zipfile.ZipFile(RESOURCE_BASE_PATH / '原神图标资源.zip').extractall(RESOURCE_BASE_PATH / 'LittlePaimon')
|
||||
(RESOURCE_BASE_PATH / '原神图标资源.zip').unlink()
|
||||
logger.info('资源检查', '<g>资源下载完成</g>')
|
||||
except Exception:
|
||||
logger.warning('资源检查', f'下载<m>{resource["path"]}</m>时<r>出错</r>,请尝试更换<m>github资源地址</m>')
|
||||
if flag:
|
||||
logger.info('资源检查', '<g>资源下载完成</g>')
|
||||
logger.warning('资源检查', '下载<m>资源</m>时<r>出错</r>,请尝试更换<m>github资源地址</m>')
|
||||
else:
|
||||
logger.info('资源检查', '<g>资源完好,无需下载</g>')
|
||||
try:
|
||||
resource_list = await aiorequests.get(
|
||||
f'{config.github_proxy}https://raw.githubusercontent.com/CMHopeSunshine/LittlePaimonRes/main/resources_list.json',
|
||||
follow_redirects=True)
|
||||
resource_list = resource_list.json()
|
||||
except Exception:
|
||||
logger.warning('资源检查', '读取资源列表<r>失败</r>,请尝试更换<m>github资源地址</m>')
|
||||
return
|
||||
flag = False
|
||||
for resource in resource_list:
|
||||
file_path = RESOURCE_BASE_PATH / resource['path']
|
||||
if file_path.exists():
|
||||
if not resource['lock'] or hashlib.md5(file_path.read_bytes()).hexdigest() == resource['hash']:
|
||||
continue
|
||||
else:
|
||||
file_path.unlink()
|
||||
try:
|
||||
await aiorequests.download(
|
||||
url=f'{config.github_proxy}https://raw.githubusercontent.com/CMHopeSunshine/LittlePaimonRes/main/{resource["path"]}',
|
||||
save_path=file_path, exclude_json=resource['path'].split('.')[-1] != 'json')
|
||||
await asyncio.sleep(0.2)
|
||||
flag = True
|
||||
except Exception:
|
||||
logger.warning('资源检查', f'下载<m>{resource["path"]}</m>时<r>出错</r>,请尝试更换<m>github资源地址</m>')
|
||||
logger.info('资源检查', '<g>资源下载完成</g>' if flag else '<g>资源完好,无需下载</g>')
|
||||
|
Loading…
x
Reference in New Issue
Block a user