LittlePaimon/utils/auth_util.py
2022-08-19 01:15:45 +08:00

214 lines
7.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import hashlib
import json
import random
import string
from collections import defaultdict
from time import time
from littlepaimon_utils import aiorequests
from nonebot import logger
from .db_util import get_cookie_cache, update_cookie_cache, delete_cookie_cache
from .db_util import get_private_cookie, delete_cookie
from .db_util import get_public_cookie, limit_public_cookie
from .message_util import send_cookie_delete_msg
# 双参数冷却时间限制器
class FreqLimiter2:
def __init__(self, default_cd_seconds: int = 60):
self.next_time = defaultdict(lambda: defaultdict(float))
self.default_cd = default_cd_seconds
def check(self, key1, key2) -> bool:
return bool(time() >= self.next_time[key1][key2])
def start_cd(self, key1, key2, cd_time=0):
self.next_time[key1][key2] = time() + (cd_time if cd_time > 0 else self.default_cd)
def left_time(self, key1, key2) -> int:
return int(self.next_time[key1][key2] - time()) + 1
# 获取可用的cookie
async def get_use_cookie(user_id, uid='', mys_id='', action=''):
cache_cookie = await get_cookie_cache(uid, 'uid')
cookies = await get_private_cookie(user_id, 'user_id')
if not cookies:
if cache_cookie:
if cache_cookie['type'] == 'public':
logger.info(f'---派蒙调用{uid}的缓存公共cookie执行{action}操作---')
else:
logger.info(f'---派蒙调用{uid}的缓存私人cookie执行{action}操作---')
return cache_cookie
public_cookie = await get_public_cookie()
if not public_cookie:
logger.info(f'---派蒙当前没有可用的公共cookie可能是都达到了上限或没有公共cookie---')
return None
else:
logger.info(f'---派蒙调用{public_cookie[0]}号公共cookie执行{action}操作---')
return {'type': 'public', 'cookie': public_cookie[1], 'no': public_cookie[0]}
else:
for user_id_, cookie, uid_, mys_id_ in cookies:
if (uid and uid_ == uid) or (mys_id and mys_id_ == mys_id):
logger.info(f'---派蒙调用用户{user_id_}的uid{uid_}私人cookie执行{action}操作---')
return {'type': 'private', 'user_id': user_id_, 'cookie': cookie, 'uid': uid_, 'mys_id': mys_id_}
if cache_cookie:
if cache_cookie['type'] == 'public':
logger.info(f'---派蒙调用{uid}的缓存公共cookie执行{action}操作---')
else:
logger.info(f'---派蒙调用{uid}的缓存私人cookie执行{action}操作---')
return cache_cookie
use_cookie = random.choice(cookies)
logger.info(f'---派蒙调用用户{use_cookie[0]}的uid{use_cookie[2]}私人cookie执行{action}操作---')
return {'type': 'private', 'user_id': use_cookie[0], 'cookie': use_cookie[1], 'uid': use_cookie[2],
'mys_id': use_cookie[3]}
# 获取可用的私人cookie
async def get_own_cookie(uid='', mys_id='', action=''):
if uid:
cookie = (await get_private_cookie(uid, 'uid'))
elif mys_id:
cookie = (await get_private_cookie(mys_id, 'mys_id'))
else:
cookie = None
if not cookie:
return None
else:
cookie = cookie[0]
logger.info(f'---派蒙调用用户{cookie[0]}的uid{cookie[2]}私人cookie执行{action}操作---')
return {'type': 'private', 'user_id': cookie[0], 'cookie': cookie[1], 'uid': cookie[2], 'mys_id': cookie[3]}
# 检查数据返回状态10001为ck过期了10101为达到每日30次上线了
async def check_retcode(data, cookie, uid):
if data['retcode'] == 10001 or data['retcode'] == -100:
await delete_cookie(cookie['cookie'], cookie['type'])
await send_cookie_delete_msg(cookie)
return False
elif data['retcode'] == 10101:
if cookie['type'] == 'public':
logger.info(f'{cookie["no"]}号公共cookie达到了每日30次查询上限')
await limit_public_cookie(cookie['cookie'])
await delete_cookie_cache(cookie['cookie'], key='cookie')
elif cookie['type'] == 'private':
logger.info(f'用户{cookie["user_id"]}的uid{cookie["uid"]}私人cookie达到了每日30次查询上限')
return '私人cookie达到了每日30次查询上限'
return False
else:
await update_cookie_cache(cookie['cookie'], uid, 'uid')
return True
# md5加密
def md5(text: str) -> str:
md5 = hashlib.md5()
md5.update(text.encode())
return md5.hexdigest()
# 生成随机字符串
def random_hex(length):
result = hex(random.randint(0, 16 ** length)).replace('0x', '').upper()
if len(result) < length:
result = '0' * (length - len(result)) + result
return result
def random_text(length: int) -> str:
"""
生成指定长度的随机字符串
:param length: 长度
:return: 随机字符串
"""
return ''.join(random.sample(string.ascii_lowercase + string.digits, length))
def get_ds(q: str = '', b: dict = None, mhy_bbs_sign: bool = False) -> str:
"""
生成米游社headers的ds_token
:param q: 查询
:param b: 请求体
:param mhy_bbs_sign: 是否为米游社讨论区签到
:return: ds_token
"""
br = json.dumps(b) if b else ''
if mhy_bbs_sign:
s = 't0qEgfub6cvueAPgR5m9aQWWVciEer7v'
else:
s = 'xV8v4Qu54lUKrEYFZkJhB8cuOh9Asafs'
t = str(int(time()))
r = str(random.randint(100000, 200000))
c = md5(f'salt={s}&t={t}&r={r}&b={br}&q={q}')
return f'{t},{r},{c}'
# 米游社爬虫headers
def get_headers(cookie, q='', b=None):
headers = {
'DS': get_ds(q, b),
'Origin': 'https://webstatic.mihoyo.com',
'Cookie': cookie,
'x-rpc-app_version': "2.11.1",
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS '
'X) AppleWebKit/605.1.15 (KHTML, like Gecko) miHoYoBBS/2.11.1',
'x-rpc-client_type': '5',
'Referer': 'https://webstatic.mihoyo.com/'
}
return headers
def get_old_version_ds(mhy_bbs: bool = False) -> str:
"""
生成米游社旧版本headers的ds_token
"""
if mhy_bbs:
s = '9nQiU3AV0rJSIBWgdynfoGMGKaklfbM7'
else:
s = 'z8DRIUjNDT7IT5IZXvrUAxyupA1peND9'
t = str(int(time()))
r = ''.join(random.sample(string.ascii_lowercase + string.digits, 6))
c = md5(f"salt={s}&t={t}&r={r}")
return f"{t},{r},{c}"
def get_sign_headers(cookie):
headers = {
'User_Agent': 'Mozilla/5.0 (Linux; Android 10; MIX 2 Build/QKQ1.190825.002; wv) AppleWebKit/537.36 ('
'KHTML, like Gecko) Version/4.0 Chrome/83.0.4103.101 Mobile Safari/537.36 '
'miHoYoBBS/2.34.1',
'Cookie': cookie,
'x-rpc-device_id': random_hex(32),
'Origin': 'https://webstatic.mihoyo.com',
'X_Requested_With': 'com.mihoyo.hyperion',
'DS': get_old_version_ds(mhy_bbs=True),
'x-rpc-client_type': '5',
'Referer': 'https://webstatic.mihoyo.com/bbs/event/signin-ys/index.html?bbs_auth_required=true&act_id=e202009291139501&utm_source=bbs&utm_medium=mys&utm_campaign=icon',
'x-rpc-app_version': '2.34.1'
}
return headers
# 检查cookie是否有效通过查看个人主页来判断
async def check_cookie(cookie):
url = 'https://bbs-api.mihoyo.com/user/wapi/getUserFullInfo?gids=2'
headers = {
'DS': get_ds(),
'Origin': 'https://webstatic.mihoyo.com',
'Cookie': cookie,
'x-rpc-app_version': "2.11.1",
'x-rpc-client_type': '5',
'Referer': 'https://webstatic.mihoyo.com/'
}
res = await aiorequests.get(url=url, headers=headers)
res = res.json()
if res['retcode'] != 0:
return False
else:
return True