CMHopeSunshine fa12023db0 小派蒙
2022-03-13 21:25:42 +08:00

223 lines
6.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import json
import traceback
import hashlib
import random
import time
import uuid
import requests
from hoshino import logger, aiorequests
from io import BytesIO
import base64
import datetime
import functools
import inspect
user_cookies = {
"通用": [
{
"cookie": "",
"no": 1
},
{
"cookie": "",
"no": 2
}
],
"私人":{
}
}
def load_data():
path = os.path.join(os.path.dirname(__file__), 'user_data','user_cookies.json')
if not os.path.exists(path):
with open(path,'w',encoding='UTF-8') as f:
json.dump(user_cookies,f,ensure_ascii=False)
else:
try:
with open(path, encoding='utf8') as f:
user_cookies = json.load(f)
except:
traceback.print_exc()
def save_data():
path = os.path.join(os.path.dirname(__file__), 'user_data','user_cookies.json')
try:
with open(path, 'w', encoding='utf8') as f:
json.dump(user_cookies, f, ensure_ascii=False, indent=2)
except:
traceback.print_exc()
# 缓存装饰器
def cache(ttl=datetime.timedelta(hours=1), **kwargs):
def wrap(func):
cache_data = {}
@functools.wraps(func)
async def wrapped(*args, **kw):
nonlocal cache_data
bound = inspect.signature(func).bind(*args, **kw)
bound.apply_defaults()
ins_key = '|'.join(['%s_%s' % (k, v) for k, v in bound.arguments.items()])
default_data = {"time": None, "value": None}
data = cache_data.get(ins_key, default_data)
now = datetime.datetime.now()
if 'use_cache' not in kw:
kw['use_cache'] = True
if not kw['use_cache'] or not data['time'] or now - data['time'] > ttl:
try:
data['value'] = await func(*args, **kw)
data['time'] = now
cache_data[ins_key] = data
except Exception as e:
raise e
return data['value']
return wrapped
return wrap
# 图片转b64
def pil2b64(data, q=85):
bio = BytesIO()
data = data.convert("RGB")
data.save(bio, format='JPEG', quality=q)
base64_str = base64.b64encode(bio.getvalue()).decode()
return 'base64://' + base64_str
# md5加密
def md5(text: str) -> str:
md5 = hashlib.md5()
md5.update(text.encode())
return md5.hexdigest()
# 米游社headers的ds_token
def get_ds(q="", b=None) -> str:
if b:
br = json.dumps(b)
else:
br = ""
s = "xV8v4Qu54lUKrEYFZkJhB8cuOh9Asafs"
t = str(int(time.time()))
r = str(random.randint(100000, 200000))
c = md5("salt=" + s + "&t=" + t + "&r=" + r + "&b=" + br + "&q=" + q)
return f"{t},{r},{c}"
# 米游社爬虫headers
def get_headers(cookie, q='',b=None):
headers ={
'DS': get_ds(q, b),
'Origin': 'https://webstatic.mihoyo.com',
'Cookie': cookie,
'x-rpc-app_version': "2.11.1",
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS '
'X) AppleWebKit/605.1.15 (KHTML, like Gecko) miHoYoBBS/2.11.1',
'x-rpc-client_type': '5',
'Referer': 'https://webstatic.mihoyo.com/'
}
#print(headers)
return headers
# 检查cookie是否有效
async def check_cookie(cookie):
url = 'https://bbs-api.mihoyo.com/user/wapi/getUserFullInfo?gids=2'
headers ={
'DS': get_ds(),
'Origin': 'https://webstatic.mihoyo.com',
'Cookie': cookie,
'x-rpc-app_version': "2.11.1",
'x-rpc-client_type': '5',
'Referer': 'https://webstatic.mihoyo.com/'
}
res = await aiorequests.get(url=url,headers=headers)
res = await res.json()
if res['retcode'] != 0:
return False
else:
return True
# 通过qq号获取cookie
def get_uid_by_qq(qq):
if qq not in user_cookies['私人']:
return None
return user_cookies['私人'][qq]['last_query']
# 检查qq号是否绑定uid
def check_uid_by_qq(qq, uid):
if qq not in user_cookies['私人']:
return False
for cookie in user_cookies['私人'][qq]['cookies']:
if cookie['uid'] == uid:
return True
return False
# 更新qq最后查询的uid
def update_last_query_to_qq(qq, uid):
if qq not in user_cookies['私人']:
user_cookies['私人'][qq] = {}
user_cookies['私人'][qq]['cookies'] = []
user_cookies['私人'][qq]['last_query'] = uid
save_data()
# 绑定uid、cookie到qq号
async def bind_cookie(qq, uid, cookie):
if qq not in user_cookies['私人']:
user_cookies['私人'][qq] = {}
user_cookies['私人'][qq]['cookies'] = []
if not await check_cookie(cookie):
return '这cookie没有用哦检查一下是不是复制错了或者过期了(试试重新登录米游社再获取)'
else:
f = False
for c in user_cookies['私人'][qq]['cookies']:
if c['uid'] == uid:
c['cookie'] = cookie
f = True
break
if not f:
c = {'cookie':cookie,'uid':uid}
user_cookies['私人'][qq]['cookies'].append(c)
user_cookies['私人'][qq]['last_query'] = uid
save_data()
return '绑定成功啦!'
# 获取公共池可用的cookie
async def get_public_cookie():
for cookie in user_cookies['通用']:
if await check_cookie(cookie['cookie']):
logger.info(f'--CMgenshin调用公共cookie池-{cookie["no"]}号执行操作==')
return cookie['cookie']
else:
logger.info(f'--CMgenshin公共cookie池-{cookie["no"]}号已失效--')
logger.error('--CMgenshin原神查询公共cookie池已全部失效--')
return None
# 获取可用的cookie
async def get_cookie(qq, uid, only_private = False, only_match_uid = False):
if qq not in user_cookies['私人']:
if only_private:
return None
else:
return await get_public_cookie()
else:
valid_cookie = []
for cookie in user_cookies['私人'][qq]['cookies']:
if not await check_cookie(cookie['cookie']):
logger.error(f'--CMgenshinqq{qq}下的cookie-{cookie["uid"]}已失效--')
else:
valid_cookie.append(cookie)
if cookie['uid'] == uid:
logger.info(f'--CMgenshin调用qq{qq}的cookie-{cookie["uid"]}执行操作--')
return cookie['cookie']
if valid_cookie and only_match_uid:
logger.info(f'--CMgenshin调用qq{qq}的cookie-{cookie["uid"]}执行操作--')
return random.choice(valid_cookie)['cookie']
else:
if only_private:
return None
else:
return await get_public_cookie()
load_data()