LittlePaimon/utils/auth_util.py

214 lines
7.9 KiB
Python
Raw Permalink Normal View History

2022-04-30 14:13:39 +08:00
import hashlib
2022-05-20 18:44:18 +08:00
import json
2022-04-30 14:13:39 +08:00
import random
import string
2022-05-20 18:44:18 +08:00
from collections import defaultdict
from time import time
from littlepaimon_utils import aiorequests
2022-04-30 14:13:39 +08:00
from nonebot import logger
2022-05-20 18:44:18 +08:00
from .db_util import get_cookie_cache, update_cookie_cache, delete_cookie_cache
2022-04-30 14:13:39 +08:00
from .db_util import get_private_cookie, delete_cookie
from .db_util import get_public_cookie, limit_public_cookie
2022-05-20 18:44:18 +08:00
from .message_util import send_cookie_delete_msg
2022-04-30 14:13:39 +08:00
# Two-key cooldown limiter
class FreqLimiter2:
    """Track per-(key1, key2) cooldowns using wall-clock timestamps.

    ``next_time[key1][key2]`` holds the ``time()`` value at which the pair
    becomes available again; pairs never seen before default to ``0.0`` and
    are therefore immediately available.
    """

    def __init__(self, default_cd_seconds: int = 60):
        """Create a limiter.

        :param default_cd_seconds: cooldown applied when ``start_cd`` is
            called without a positive ``cd_time``
        """
        self.next_time = defaultdict(lambda: defaultdict(float))
        self.default_cd = default_cd_seconds

    def check(self, key1, key2) -> bool:
        """Return True when the cooldown for (key1, key2) has elapsed."""
        now = time()
        return bool(now >= self.next_time[key1][key2])

    def start_cd(self, key1, key2, cd_time=0):
        """Start a cooldown for (key1, key2).

        :param cd_time: cooldown in seconds; values <= 0 fall back to the
            limiter's default cooldown
        """
        now = time()
        duration = cd_time if cd_time > 0 else self.default_cd
        self.next_time[key1][key2] = now + duration

    def left_time(self, key1, key2) -> int:
        """Return the remaining cooldown in seconds (truncated, plus one)."""
        remaining = self.next_time[key1][key2] - time()
        return int(remaining) + 1
2022-04-30 14:13:39 +08:00
# Pick a usable cookie
async def get_use_cookie(user_id, uid='', mys_id='', action=''):
    """Select a cookie to use for ``action``.

    Selection order:

    1. a private cookie of ``user_id`` whose uid or mys_id matches the
       requested ``uid`` / ``mys_id``;
    2. the cookie cached for ``uid``;
    3. a random private cookie of ``user_id``;
    4. a public cookie.

    :param user_id: user whose private cookies are looked up
    :param uid: uid to match against private cookies and the cookie cache
    :param mys_id: mys_id to match against private cookies
    :param action: action name, used only in log messages
    :return: the cached cookie object, a dict describing a private or public
        cookie, or ``None`` when no public cookie is available
    """
    cached = await get_cookie_cache(uid, 'uid')
    owned = await get_private_cookie(user_id, 'user_id')

    if owned:
        # Prefer a private cookie bound to exactly the requested uid / mys_id.
        for owner, cookie_str, bound_uid, bound_mys in owned:
            if (uid and bound_uid == uid) or (mys_id and bound_mys == mys_id):
                logger.info(f'---派蒙调用用户{owner}的uid{bound_uid}私人cookie执行{action}操作---')
                return {'type': 'private', 'user_id': owner, 'cookie': cookie_str, 'uid': bound_uid,
                        'mys_id': bound_mys}

    if cached:
        if cached['type'] != 'public':
            logger.info(f'---派蒙调用{uid}的缓存私人cookie执行{action}操作---')
        else:
            logger.info(f'---派蒙调用{uid}的缓存公共cookie执行{action}操作---')
        return cached

    if owned:
        # No exact match and nothing cached: fall back to any private cookie.
        picked = random.choice(owned)
        logger.info(f'---派蒙调用用户{picked[0]}的uid{picked[2]}私人cookie执行{action}操作---')
        return {'type': 'private', 'user_id': picked[0], 'cookie': picked[1], 'uid': picked[2],
                'mys_id': picked[3]}

    shared = await get_public_cookie()
    if shared:
        logger.info(f'---派蒙调用{shared[0]}号公共cookie执行{action}操作---')
        return {'type': 'public', 'cookie': shared[1], 'no': shared[0]}
    logger.info('---派蒙当前没有可用的公共cookie可能是都达到了上限或没有公共cookie---')
    return None
# Fetch a usable private cookie
async def get_own_cookie(uid='', mys_id='', action=''):
    """Return the first private cookie stored for ``uid`` or ``mys_id``.

    ``uid`` takes precedence; ``mys_id`` is consulted only when ``uid`` is
    falsy.

    :param uid: uid to look the cookie up by
    :param mys_id: mys_id to look the cookie up by
    :param action: action name, used only in the log message
    :return: dict describing the private cookie, or ``None`` when neither key
        is given or no cookie is found
    """
    if uid:
        rows = await get_private_cookie(uid, 'uid')
    elif mys_id:
        rows = await get_private_cookie(mys_id, 'mys_id')
    else:
        return None

    if not rows:
        return None

    first = rows[0]
    logger.info(f'---派蒙调用用户{first[0]}的uid{first[2]}私人cookie执行{action}操作---')
    return {'type': 'private', 'user_id': first[0], 'cookie': first[1], 'uid': first[2], 'mys_id': first[3]}
# Check the response retcode: 10001 means the cookie has expired,
# 10101 means the daily limit of 30 queries has been reached
async def check_retcode(data, cookie, uid):
    """Inspect ``data['retcode']`` and update cookie bookkeeping accordingly.

    :param data: response dict containing a ``retcode`` key
    :param cookie: cookie dict as returned by the cookie getters in this module
    :param uid: uid the request was made for
    :return: ``True`` when the retcode is none of the handled error codes (the
        cookie is then cached for ``uid``); ``False`` when the cookie was
        deleted or a non-private cookie hit the daily limit; a message string
        when a private cookie hit the daily limit
    """
    code = data['retcode']

    # -100 is handled the same way as 10001: the cookie is deleted.
    if code in (10001, -100):
        await delete_cookie(cookie['cookie'], cookie['type'])
        await send_cookie_delete_msg(cookie)
        return False

    if code != 10101:
        await update_cookie_cache(cookie['cookie'], uid, 'uid')
        return True

    kind = cookie['type']
    if kind == 'private':
        logger.info(f'用户{cookie["user_id"]}的uid{cookie["uid"]}私人cookie达到了每日30次查询上限')
        return '私人cookie达到了每日30次查询上限'
    if kind == 'public':
        logger.info(f'{cookie["no"]}号公共cookie达到了每日30次查询上限')
        await limit_public_cookie(cookie['cookie'])
        await delete_cookie_cache(cookie['cookie'], key='cookie')
    return False
# MD5 hashing
def md5(text: str) -> str:
    """Return the hexadecimal MD5 digest of ``text``.

    The text is encoded with ``str.encode()``'s default encoding before
    hashing.
    """
    return hashlib.md5(text.encode()).hexdigest()
# Generate a random hexadecimal string
def random_hex(length):
    """Return a random upper-case hexadecimal string of exactly ``length`` characters.

    :param length: number of hex digits to produce
    :return: string of ``length`` characters drawn from ``0-9A-F``; an empty
        string when ``length`` is zero or negative
    """
    if length <= 0:
        return ''
    # randrange excludes the upper bound, so the value always fits in
    # ``length`` hex digits (randint(0, 16 ** length) could yield one extra).
    value = random.randrange(16 ** length)
    return format(value, f'0{length}X')
def random_text(length: int) -> str:
    """
    Generate a random string of the given length.

    Characters are drawn independently from lowercase ASCII letters and
    digits, so they may repeat and ``length`` may exceed the alphabet size.

    :param length: number of characters to produce
    :return: random string of ``length`` characters
    """
    alphabet = string.ascii_lowercase + string.digits
    # choices() samples with replacement; sample() would never repeat a
    # character and raises ValueError once length exceeds the 36-char alphabet.
    return ''.join(random.choices(alphabet, k=length))
2022-08-19 01:15:45 +08:00
def get_ds(q: str = '', b: dict = None, mhy_bbs_sign: bool = False) -> str:
    """
    Build the ``DS`` token used in miyoushe (mihoyo BBS) request headers.

    The token is ``"<timestamp>,<random int>,<md5>"`` where the md5 is taken
    over the salt, timestamp, random int, JSON-encoded body and query string.

    :param q: query string
    :param b: request body; falsy values are signed as an empty string
    :param mhy_bbs_sign: whether the token is for the miyoushe forum sign-in
        (selects a different salt)
    :return: ds_token
    """
    body = '' if not b else json.dumps(b)
    salt = 't0qEgfub6cvueAPgR5m9aQWWVciEer7v' if mhy_bbs_sign else 'xV8v4Qu54lUKrEYFZkJhB8cuOh9Asafs'
    timestamp = str(int(time()))
    nonce = str(random.randint(100000, 200000))
    digest = md5(f'salt={salt}&t={timestamp}&r={nonce}&b={body}&q={q}')
    return ','.join((timestamp, nonce, digest))
2022-04-30 14:13:39 +08:00
# Request headers for miyoushe API calls
def get_headers(cookie, q='', b=None):
    """Build the request headers for a miyoushe API call.

    :param cookie: value for the ``Cookie`` header
    :param q: query string, passed to ``get_ds`` for signing
    :param b: request body, passed to ``get_ds`` for signing
    :return: dict of header names to values
    """
    user_agent = ('Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS '
                  'X) AppleWebKit/605.1.15 (KHTML, like Gecko) miHoYoBBS/2.11.1')
    return {
        'DS': get_ds(q, b),
        'Origin': 'https://webstatic.mihoyo.com',
        'Cookie': cookie,
        'x-rpc-app_version': "2.11.1",
        'User-Agent': user_agent,
        'x-rpc-client_type': '5',
        'Referer': 'https://webstatic.mihoyo.com/',
    }
def get_old_version_ds(mhy_bbs: bool = False) -> str:
    """
    Build the old-style ``DS`` token for miyoushe request headers.

    The token is ``"<timestamp>,<6 random chars>,<md5>"`` where the md5 is
    taken over the salt, timestamp and random characters only.

    :param mhy_bbs: selects the alternative salt when true
    :return: ds_token
    """
    salt = '9nQiU3AV0rJSIBWgdynfoGMGKaklfbM7' if mhy_bbs else 'z8DRIUjNDT7IT5IZXvrUAxyupA1peND9'
    timestamp = str(int(time()))
    alphabet = string.ascii_lowercase + string.digits
    nonce = ''.join(random.sample(alphabet, 6))
    digest = md5(f"salt={salt}&t={timestamp}&r={nonce}")
    return ','.join((timestamp, nonce, digest))
def get_sign_headers(cookie):
    """Build the request headers for the miyoushe sign-in endpoint.

    :param cookie: value for the ``Cookie`` header
    :return: dict of header names to values, including a freshly generated
        device id and old-style DS token
    """
    # NOTE(review): 'User_Agent' and 'X_Requested_With' use underscores rather
    # than the hyphenated HTTP header names; kept as-is — confirm whether
    # that is intentional.
    user_agent = ('Mozilla/5.0 (Linux; Android 10; MIX 2 Build/QKQ1.190825.002; wv) AppleWebKit/537.36 ('
                  'KHTML, like Gecko) Version/4.0 Chrome/83.0.4103.101 Mobile Safari/537.36 '
                  'miHoYoBBS/2.34.1')
    referer = ('https://webstatic.mihoyo.com/bbs/event/signin-ys/index.html'
               '?bbs_auth_required=true&act_id=e202009291139501'
               '&utm_source=bbs&utm_medium=mys&utm_campaign=icon')
    return {
        'User_Agent': user_agent,
        'Cookie': cookie,
        'x-rpc-device_id': random_hex(32),
        'Origin': 'https://webstatic.mihoyo.com',
        'X_Requested_With': 'com.mihoyo.hyperion',
        'DS': get_old_version_ds(mhy_bbs=True),
        'x-rpc-client_type': '5',
        'Referer': referer,
        'x-rpc-app_version': '2.34.1',
    }
# Check whether a cookie is valid by requesting the user's profile page
async def check_cookie(cookie):
    """Return True when the user-info endpoint answers with ``retcode == 0``.

    :param cookie: value for the ``Cookie`` header
    :return: ``True`` if the response's ``retcode`` is 0, otherwise ``False``
    """
    endpoint = 'https://bbs-api.mihoyo.com/user/wapi/getUserFullInfo?gids=2'
    request_headers = {
        'DS': get_ds(),
        'Origin': 'https://webstatic.mihoyo.com',
        'Cookie': cookie,
        'x-rpc-app_version': "2.11.1",
        'x-rpc-client_type': '5',
        'Referer': 'https://webstatic.mihoyo.com/',
    }
    response = await aiorequests.get(url=endpoint, headers=request_headers)
    payload = response.json()
    return payload['retcode'] == 0
2022-04-30 14:13:39 +08:00
2022-05-20 18:44:18 +08:00