393 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import hashlib
from collections import defaultdict
from io import BytesIO
from pathlib import Path
import random
import base64
import datetime
from time import time
import re
import os
import string
import functools
import inspect
import json
import asyncio
from PIL import Image
from json import JSONDecodeError
from aiohttp import ClientSession
from nonebot import get_bot
from nonebot import logger
from nonebot.exception import FinishedException
from nonebot.adapters.onebot.v11 import MessageEvent, Message
from nonebot.adapters.onebot.v11.exception import ActionFailed
from .db_util import get_private_cookie, delete_cookie
from .db_util import get_public_cookie, limit_public_cookie
from .db_util import get_cookie_cache, update_cookie_cache, delete_cookie_cache
from .db_util import get_last_query, update_last_query
def auto_withdraw(seconds: int = -1):
def wrapper(func):
@functools.wraps(func)
async def wrapped(**kwargs):
try:
message_id = await func(**kwargs)
if message_id and seconds >= 1:
await asyncio.sleep(seconds)
await get_bot().delete_msg(message_id=message_id['message_id'])
except Exception as e:
raise e
return wrapped
return wrapper
# 缓存装饰器 ttl为过期时间 参数use_cache决定是否使用缓存默认为True
def cache(ttl=datetime.timedelta(hours=1), **kwargs):
def wrap(func):
cache_data = {}
@functools.wraps(func)
async def wrapped(*args, **kw):
nonlocal cache_data
bound = inspect.signature(func).bind(*args, **kw)
bound.apply_defaults()
ins_key = '|'.join(['%s_%s' % (k, v) for k, v in bound.arguments.items()])
default_data = {"time": None, "value": None}
data = cache_data.get(ins_key, default_data)
now = datetime.datetime.now()
if 'use_cache' not in kw:
kw['use_cache'] = True
if not kw['use_cache'] or not data['time'] or now - data['time'] > ttl:
try:
data['value'] = await func(*args, **kw)
data['time'] = now
cache_data[ins_key] = data
except Exception as e:
raise e
return data['value']
return wrapped
return wrap
# 异常处理装饰器
def exception_handler():
def wrapper(func):
@functools.wraps(func)
async def wrapped(**kwargs):
event = kwargs['event']
try:
await func(**kwargs)
except FinishedException:
raise
except ActionFailed:
logger.exception('账号可能被风控,消息发送失败')
await get_bot().send(event, f'派蒙可能被风控,也可能是没有该图片资源,消息发送失败')
except JSONDecodeError:
await get_bot().send(event, '派蒙获取信息失败,重试一下吧')
# except IndexError or KeyError as e:
# await get_bot().send(event, f'派蒙获取信息失败,请确认参数无误,{e}')
# except TypeError or AttributeError as e:
# await get_bot().send(event, f'派蒙好像没有该UID的绑定信息 {e}')
except FileNotFoundError as e:
file_name = re.search(r'\'(.*)\'', str(e)).group(1)
file_name = file_name.replace('\\\\', '/').split('/')
file_name = file_name[-2] + '\\' + file_name[-1]
await get_bot().send(event, f"派蒙缺少{file_name}资源,请联系开发者补充")
except Exception as e:
await get_bot().send(event, f'派蒙好像出了点问题,{e}')
return wrapped
return wrapper
# 冷却时间限制器
class FreqLimiter:
def __init__(self, default_cd_seconds: int = 60):
self.next_time = defaultdict(float)
self.default_cd = default_cd_seconds
def check(self, key) -> bool:
return bool(time() >= self.next_time[key])
def start_cd(self, key, cd_time=0):
self.next_time[key] = time() + (cd_time if cd_time > 0 else self.default_cd)
def left_time(self, key) -> int:
return int(self.next_time[key] - time()) + 1
# 双参数冷却时间限制器
class FreqLimiter2:
def __init__(self, default_cd_seconds: int = 60):
self.next_time = defaultdict(lambda: defaultdict(float))
self.default_cd = default_cd_seconds
def check(self, key1, key2) -> bool:
return bool(time() >= self.next_time[key1][key2])
def start_cd(self, key1, key2, cd_time=0):
self.next_time[key1][key2] = time() + (cd_time if cd_time > 0 else self.default_cd)
def left_time(self, key1, key2) -> int:
return int(self.next_time[key1][key2] - time()) + 1
# 从网络url中获取图片
async def get_pic(url: str, size: tuple = None, mode: str = None):
async with ClientSession() as session:
res = await session.get(url)
res = await res.read()
img = Image.open(BytesIO(res))
if size:
img = img.resize(size)
if mode:
img = img.convert(mode)
return img
# 获取可用的cookie
async def get_use_cookie(user_id, uid='', mys_id='', action=''):
cache_cookie = await get_cookie_cache(uid, 'uid')
cookies = await get_private_cookie(user_id, 'user_id')
if not cookies:
if cache_cookie:
if cache_cookie['type'] == 'public':
logger.info(f'---派蒙调用{uid}的缓存公共cookie执行{action}操作---')
else:
logger.info(f'---派蒙调用{uid}的缓存私人cookie执行{action}操作---')
return cache_cookie
public_cookie = await get_public_cookie()
if not public_cookie:
logger.info(f'---派蒙当前没有可用的公共cookie可能是都达到了上限或没有公共cookie---')
return None
else:
logger.info(f'---派蒙调用{public_cookie[0]}号公共cookie执行{action}操作---')
return {'type': 'public', 'cookie': public_cookie[1], 'no': public_cookie[0]}
else:
for user_id_, cookie, uid_, mys_id_ in cookies:
if (uid and uid_ == uid) or (mys_id and mys_id_ == mys_id):
logger.info(f'---派蒙调用用户{user_id_}的uid{uid_}私人cookie执行{action}操作---')
return {'type': 'private', 'user_id': user_id_, 'cookie': cookie, 'uid': uid_, 'mys_id': mys_id_}
if cache_cookie:
if cache_cookie['type'] == 'public':
logger.info(f'---派蒙调用{uid}的缓存公共cookie执行{action}操作---')
else:
logger.info(f'---派蒙调用{uid}的缓存私人cookie执行{action}操作---')
return cache_cookie
use_cookie = random.choice(cookies)
logger.info(f'---派蒙调用用户{use_cookie[0]}的uid{use_cookie[2]}私人cookie执行{action}操作---')
return {'type': 'private', 'user_id': use_cookie[0], 'cookie': use_cookie[1], 'uid': use_cookie[2],
'mys_id': use_cookie[3]}
# 获取可用的私人cookie
async def get_own_cookie(uid='', mys_id='', action=''):
if uid:
cookie = (await get_private_cookie(uid, 'uid'))
elif mys_id:
cookie = (await get_private_cookie(mys_id, 'mys_id'))
else:
cookie = None
if not cookie:
return None
else:
cookie = cookie[0]
logger.info(f'---派蒙调用用户{cookie[0]}的uid{cookie[2]}私人cookie执行{action}操作---')
return {'type': 'private', 'user_id': cookie[0], 'cookie': cookie[1], 'uid': cookie[2], 'mys_id': cookie[3]}
# 检查数据返回状态10001为ck过期了10101为达到每日30次上线了
async def check_retcode(data, cookie, uid):
if data['retcode'] == 10001 or data['retcode'] == -100:
await delete_cookie(cookie['cookie'], cookie['type'])
await send_cookie_delete_msg(cookie)
return False
elif data['retcode'] == 10101:
if cookie['type'] == 'public':
logger.info(f'{cookie["no"]}号公共cookie达到了每日30次查询上限')
await limit_public_cookie(cookie['cookie'])
await delete_cookie_cache(cookie['cookie'], key='cookie')
elif cookie['type'] == 'private':
logger.info(f'用户{cookie["user_id"]}的uid{cookie["uid"]}私人cookie达到了每日30次查询上限')
return '私人cookie达到了每日30次查询上限'
return False
else:
await update_cookie_cache(cookie['cookie'], uid, 'uid')
return True
# 图片转b64q为质量压缩比例
def pil2b64(data, q=85):
bio = BytesIO()
data = data.convert("RGB")
data.save(bio, format='JPEG', quality=q)
base64_str = base64.b64encode(bio.getvalue()).decode()
return 'base64://' + base64_str
# 获取message中的艾特对象
async def get_at_target(msg):
for msg_seg in msg:
if msg_seg.type == "at":
return msg_seg.data['qq']
return None
# message预处理获取uid、干净的msg、user_id、是否缓存
async def get_uid_in_msg(event: MessageEvent, msg: Message):
msg = str(msg).strip()
if not msg:
uid = await get_last_query(str(event.user_id))
return uid, '', str(event.user_id), True
user_id = await get_at_target(event.message) or str(event.user_id)
msg = re.sub(r'\[CQ.*?\]', '', msg)
use_cache = False if '-r' in msg else True
msg = msg.replace('-r', '').strip()
find_uid = r'(?P<uid>(1|2|5)\d{8})'
for msg_seg in event.message:
if msg_seg.type == 'text':
match = re.search(find_uid, msg_seg.data['text'])
if match:
await update_last_query(user_id, match.group('uid'), 'uid')
return match.group('uid'), msg.replace(match.group('uid'), '').strip(), user_id, use_cache
uid = await get_last_query(user_id)
return uid, msg.strip(), user_id, use_cache
# md5加密
def md5(text: str) -> str:
md5 = hashlib.md5()
md5.update(text.encode())
return md5.hexdigest()
# 生成随机字符串
def random_hex(length):
result = hex(random.randint(0, 16 ** length)).replace('0x', '').upper()
if len(result) < length:
result = '0' * (length - len(result)) + result
return result
# 米游社headers的ds_token对应版本2.11.1
def get_ds(q="", b=None) -> str:
if b:
br = json.dumps(b)
else:
br = ""
s = "xV8v4Qu54lUKrEYFZkJhB8cuOh9Asafs"
t = str(int(time()))
r = str(random.randint(100000, 200000))
c = md5("salt=" + s + "&t=" + t + "&r=" + r + "&b=" + br + "&q=" + q)
return f"{t},{r},{c}"
# 米游社爬虫headers
def get_headers(cookie, q='', b=None):
headers = {
'DS': get_ds(q, b),
'Origin': 'https://webstatic.mihoyo.com',
'Cookie': cookie,
'x-rpc-app_version': "2.11.1",
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS '
'X) AppleWebKit/605.1.15 (KHTML, like Gecko) miHoYoBBS/2.11.1',
'x-rpc-client_type': '5',
'Referer': 'https://webstatic.mihoyo.com/'
}
return headers
def get_old_version_ds() -> str:
s = 'h8w582wxwgqvahcdkpvdhbh2w9casgfl'
t = str(int(time()))
r = ''.join(random.sample(string.ascii_lowercase + string.digits, 6))
c = md5("salt=" + s + "&t=" + t + "&r=" + r)
return f"{t},{r},{c}"
def get_sign_headers(cookie):
headers = {
'User_Agent': 'Mozilla/5.0 (Linux; Android 10; MIX 2 Build/QKQ1.190825.002; wv) AppleWebKit/537.36 ('
'KHTML, like Gecko) Version/4.0 Chrome/83.0.4103.101 Mobile Safari/537.36 '
'miHoYoBBS/2.3.0',
'Cookie': cookie,
'x-rpc-device_id': random_hex(32),
'Origin': 'https://webstatic.mihoyo.com',
'X_Requested_With': 'com.mihoyo.hyperion',
'DS': get_old_version_ds(),
'x-rpc-client_type': '5',
'Referer': 'https://webstatic.mihoyo.com/bbs/event/signin-ys/index.html?bbs_auth_required=true&act_id'
'=e202009291139501&utm_source=bbs&utm_medium=mys&utm_campaign=icon',
'x-rpc-app_version': '2.3.0'
}
return headers
# 检查cookie是否有效通过查看个人主页来判断
async def check_cookie(cookie):
url = 'https://bbs-api.mihoyo.com/user/wapi/getUserFullInfo?gids=2'
headers = {
'DS': get_ds(),
'Origin': 'https://webstatic.mihoyo.com',
'Cookie': cookie,
'x-rpc-app_version': "2.11.1",
'x-rpc-client_type': '5',
'Referer': 'https://webstatic.mihoyo.com/'
}
async with ClientSession() as session:
res = await session.get(url=url, headers=headers)
res = await res.json()
if res['retcode'] != 0:
return False
else:
return True
# 向超级用户私聊发送cookie删除信息
async def send_cookie_delete_msg(cookie_info):
msg = ''
if cookie_info['type'] == 'public':
msg = f'公共池的{cookie_info["no"]}号cookie已失效'
elif cookie_info['type'] == 'private':
if cookie_info['uid']:
msg = f'用户{cookie_info["user_id"]}的uid{cookie_info["uid"]}的cookie已失效'
elif cookie_info['mys_id']:
msg = f'用户{cookie_info["user_id"]}的mys_id{cookie_info["mys_id"]}的cookie已失效'
if msg:
logger.info(f'---{msg}---')
for superuser in get_bot().config.superusers:
try:
await get_bot().send_private_msg(user_id=superuser, message=msg + ',派蒙帮你删除啦!')
except Exception as e:
logger.error(f'发送cookie删除消息失败: {e}')
def load_data(data_file):
data_path = Path() / 'data' / 'LittlePaimon' / data_file
if not data_path.exists():
save_data({}, data_file)
return json.load(data_path.open('r', encoding='utf-8'))
def save_data(data, data_file):
data_path = Path() / 'data' / 'LittlePaimon' / data_file
data_path.parent.mkdir(parents=True, exist_ok=True)
json.dump(data, data_path.open('w', encoding='utf-8'), ensure_ascii=False, indent=2)
def get_id(event):
if event.message_type == 'private':
return event.user_id
elif event.message_type == 'group':
return event.group_id
elif event.message_type == 'guild':
return event.channel_id