mirror of
https://github.com/xuthus83/LittlePaimon.git
synced 2024-12-16 13:40:53 +08:00
234 lines
7.5 KiB
Python
234 lines
7.5 KiB
Python
import os
|
||
import json
|
||
import traceback
|
||
import hashlib
|
||
import random
|
||
import time
|
||
import uuid
|
||
import requests
|
||
from hoshino import logger, aiorequests
|
||
from io import BytesIO
|
||
import base64
|
||
import datetime
|
||
import functools
|
||
import inspect
|
||
|
||
# user_cookies.json数据文件模版,如没有.json文件就会按这个模版生成文件
|
||
user_cookies_example = {
|
||
"通用": [
|
||
{
|
||
"cookie": "",
|
||
"no": 1
|
||
},
|
||
{
|
||
"cookie": "",
|
||
"no": 2
|
||
}
|
||
],
|
||
"私人":{}
|
||
|
||
}
|
||
user_cookies = {}
|
||
def load_data():
|
||
path = os.path.join(os.path.dirname(__file__), 'user_data','user_cookies.json')
|
||
if not os.path.exists(path):
|
||
with open(path,'w',encoding='UTF-8') as f:
|
||
json.dump(user_cookies_example,f,ensure_ascii=False)
|
||
try:
|
||
with open(path, encoding='utf8') as f:
|
||
data = json.load(f)
|
||
for k, v in data.items():
|
||
user_cookies[k] = v
|
||
except:
|
||
traceback.print_exc()
|
||
|
||
def save_data():
|
||
path = os.path.join(os.path.dirname(__file__), 'user_data','user_cookies.json')
|
||
try:
|
||
with open(path, 'w', encoding='utf8') as f:
|
||
json.dump(user_cookies, f, ensure_ascii=False, indent=2)
|
||
except:
|
||
traceback.print_exc()
|
||
|
||
# 缓存装饰器 ttl为过期时间 参数use_cache决定是否使用缓存,默认为True
|
||
def cache(ttl=datetime.timedelta(hours=1), **kwargs):
|
||
def wrap(func):
|
||
cache_data = {}
|
||
|
||
@functools.wraps(func)
|
||
async def wrapped(*args, **kw):
|
||
nonlocal cache_data
|
||
bound = inspect.signature(func).bind(*args, **kw)
|
||
bound.apply_defaults()
|
||
ins_key = '|'.join(['%s_%s' % (k, v) for k, v in bound.arguments.items()])
|
||
default_data = {"time": None, "value": None}
|
||
data = cache_data.get(ins_key, default_data)
|
||
|
||
now = datetime.datetime.now()
|
||
if 'use_cache' not in kw:
|
||
kw['use_cache'] = True
|
||
if not kw['use_cache'] or not data['time'] or now - data['time'] > ttl:
|
||
try:
|
||
data['value'] = await func(*args, **kw)
|
||
data['time'] = now
|
||
cache_data[ins_key] = data
|
||
except Exception as e:
|
||
raise e
|
||
|
||
return data['value']
|
||
|
||
return wrapped
|
||
|
||
return wrap
|
||
|
||
class Dict(dict):
|
||
__setattr__ = dict.__setitem__
|
||
__getattr__ = dict.__getitem__
|
||
|
||
# 图片转b64,q为质量(压缩比例)
|
||
def pil2b64(data, q=85):
|
||
bio = BytesIO()
|
||
data = data.convert("RGB")
|
||
data.save(bio, format='JPEG', quality=q)
|
||
base64_str = base64.b64encode(bio.getvalue()).decode()
|
||
return 'base64://' + base64_str
|
||
|
||
# md5加密
|
||
def md5(text: str) -> str:
|
||
md5 = hashlib.md5()
|
||
md5.update(text.encode())
|
||
return md5.hexdigest()
|
||
|
||
# 米游社headers的ds_token,对应版本2.11.1
|
||
def get_ds(q="", b=None) -> str:
|
||
if b:
|
||
br = json.dumps(b)
|
||
else:
|
||
br = ""
|
||
s = "xV8v4Qu54lUKrEYFZkJhB8cuOh9Asafs"
|
||
t = str(int(time.time()))
|
||
r = str(random.randint(100000, 200000))
|
||
c = md5("salt=" + s + "&t=" + t + "&r=" + r + "&b=" + br + "&q=" + q)
|
||
return f"{t},{r},{c}"
|
||
|
||
# 米游社爬虫headers
|
||
def get_headers(cookie, q='',b=None):
|
||
headers ={
|
||
'DS': get_ds(q, b),
|
||
'Origin': 'https://webstatic.mihoyo.com',
|
||
'Cookie': cookie,
|
||
'x-rpc-app_version': "2.11.1",
|
||
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS '
|
||
'X) AppleWebKit/605.1.15 (KHTML, like Gecko) miHoYoBBS/2.11.1',
|
||
'x-rpc-client_type': '5',
|
||
'Referer': 'https://webstatic.mihoyo.com/'
|
||
}
|
||
#print(headers)
|
||
return headers
|
||
|
||
# 检查cookie是否有效,通过查看个人主页是否返回ok来判断
|
||
async def check_cookie(cookie):
|
||
url = 'https://bbs-api.mihoyo.com/user/wapi/getUserFullInfo?gids=2'
|
||
headers ={
|
||
'DS': get_ds(),
|
||
'Origin': 'https://webstatic.mihoyo.com',
|
||
'Cookie': cookie,
|
||
'x-rpc-app_version': "2.11.1",
|
||
'x-rpc-client_type': '5',
|
||
'Referer': 'https://webstatic.mihoyo.com/'
|
||
}
|
||
res = await aiorequests.get(url=url,headers=headers)
|
||
res = await res.json()
|
||
if res['retcode'] != 0:
|
||
return False
|
||
else:
|
||
return True
|
||
|
||
# 通过qq号获取最后查询的uid
|
||
def get_uid_by_qq(qq):
|
||
if qq not in user_cookies['私人']:
|
||
return None
|
||
return user_cookies['私人'][qq]['last_query']
|
||
|
||
# 检查qq号是否绑定uid
|
||
def check_uid_by_qq(qq, uid):
|
||
if qq not in user_cookies['私人']:
|
||
return False
|
||
for cookie in user_cookies['私人'][qq]['cookies']:
|
||
if cookie['uid'] == uid:
|
||
return True
|
||
return False
|
||
|
||
# 更新qq最后查询的uid
|
||
def update_last_query_to_qq(qq, uid):
|
||
if qq not in user_cookies['私人']:
|
||
user_cookies['私人'][qq] = {}
|
||
user_cookies['私人'][qq]['cookies'] = []
|
||
user_cookies['私人'][qq]['last_query'] = uid
|
||
save_data()
|
||
|
||
# 绑定uid、cookie到qq号
|
||
async def bind_cookie(qq, uid, cookie):
|
||
if qq not in user_cookies['私人']:
|
||
user_cookies['私人'][qq] = {}
|
||
user_cookies['私人'][qq]['cookies'] = []
|
||
f = False
|
||
for c in user_cookies['私人'][qq]['cookies']:
|
||
if c['uid'] == uid:
|
||
c['cookie'] = cookie
|
||
f = True
|
||
break
|
||
if not f:
|
||
c = {'cookie':cookie,'uid':uid}
|
||
user_cookies['私人'][qq]['cookies'].append(c)
|
||
user_cookies['私人'][qq]['last_query'] = uid
|
||
save_data()
|
||
|
||
# 绑定cookie到公共cookie池
|
||
async def bind_public_cookie(cookie):
|
||
if not await check_cookie(cookie):
|
||
return '这cookie没有用哦,检查一下是不是复制错了或者过期了(试试重新登录米游社再获取)'
|
||
else:
|
||
user_cookies['通用'].append({"cookie": cookie, "no": len(user_cookies['通用']) + 1})
|
||
save_data()
|
||
return '添加公共cookie成功'
|
||
|
||
# 获取公共池可用的cookie
|
||
async def get_public_cookie():
|
||
for cookie in user_cookies['通用']:
|
||
if await check_cookie(cookie['cookie']):
|
||
logger.info(f'--CMgenshin:调用公共cookie池-{cookie["no"]}号执行操作==')
|
||
return cookie['cookie']
|
||
else:
|
||
logger.info(f'--CMgenshin:公共cookie池-{cookie["no"]}号已失效--')
|
||
logger.error('--CMgenshin:原神查询公共cookie池已全部失效--')
|
||
return None
|
||
|
||
# 获取可用的cookie,优先获取私人cookie,没有则获取公共池cookie
|
||
async def get_cookie(qq, uid, only_private = False, only_match_uid = False):
|
||
if qq not in user_cookies['私人']:
|
||
if only_private:
|
||
return None
|
||
else:
|
||
return await get_public_cookie()
|
||
else:
|
||
valid_cookie = []
|
||
for cookie in user_cookies['私人'][qq]['cookies']:
|
||
if not await check_cookie(cookie['cookie']):
|
||
logger.error(f'--CMgenshin:qq{qq}下的cookie-{cookie["uid"]}已失效--')
|
||
else:
|
||
valid_cookie.append(cookie)
|
||
if cookie['uid'] == uid:
|
||
logger.info(f'--CMgenshin:调用qq{qq}的cookie-{cookie["uid"]}执行操作--')
|
||
return cookie['cookie']
|
||
if valid_cookie and only_match_uid:
|
||
logger.info(f'--CMgenshin:调用qq{qq}的cookie-{cookie["uid"]}执行操作--')
|
||
return random.choice(valid_cookie)['cookie']
|
||
else:
|
||
if only_private:
|
||
return None
|
||
else:
|
||
return await get_public_cookie()
|
||
|
||
# 初始化读取cookie数据
|
||
load_data() |