2022-05-20 18:44:18 +08:00
|
|
|
|
import re
|
2022-07-03 21:26:33 +08:00
|
|
|
|
from io import BytesIO
|
2022-05-20 18:44:18 +08:00
|
|
|
|
from pathlib import Path
|
2022-07-03 21:26:33 +08:00
|
|
|
|
from time import time
|
2022-05-20 18:44:18 +08:00
|
|
|
|
from typing import Union, Optional, Tuple
|
|
|
|
|
|
2022-07-03 21:26:33 +08:00
|
|
|
|
from PIL import Image
|
|
|
|
|
from littlepaimon_utils import aiorequests
|
|
|
|
|
from littlepaimon_utils.files import load_image
|
2022-05-20 18:44:18 +08:00
|
|
|
|
from nonebot import get_bot, logger
|
|
|
|
|
from nonebot.adapters.onebot.v11 import MessageEvent, Message, MessageSegment
|
|
|
|
|
|
|
|
|
|
from .db_util import get_last_query, update_last_query
|
|
|
|
|
|
2022-06-21 16:26:13 +08:00
|
|
|
|
# Load the sensitive / banned word list: one entry per line of ban_word.txt,
# with surrounding whitespace stripped from every line.
with open(Path(__file__).parent / 'json_data' / 'ban_word.txt', 'r', encoding='utf-8') as f:
    ban_word = [line.strip() for line in f]
|
|
|
|
|
|
2022-05-20 18:44:18 +08:00
|
|
|
|
|
|
|
|
|
class MessageBuild:
    """
    Namespace of classmethod helpers that build OneBot v11 MessageSegment objects
    (image, text, record, video), optionally fetching static resources first.
    """

    @classmethod
    def Image(cls,
              img: Union[Image.Image, Path, str],
              *,
              size: Optional[Union[Tuple[int, int], float]] = None,
              crop: Optional[Tuple[int, int, int, int]] = None,
              quality: Optional[int] = 100,
              mode: Optional[str] = None
              ) -> MessageSegment:
        """
        Pre-process an image and wrap it in an image MessageSegment.

        :param img: PIL image object, or a path to an image (paths are handed to load_image)
        :param size: target size; a float scales both sides by that factor, a tuple is used as the new size
        :param crop: crop box applied after resizing
        :param quality: quality value passed to the image encoder
        :param mode: image mode to convert to
        :return: MessageSegment.image
        """
        if isinstance(img, (str, Path)):
            # Paths are loaded by load_image, which receives the same options.
            img = load_image(path=img, size=size, mode=mode, crop=crop)
        else:
            if size:
                # Image.ANTIALIAS was removed in Pillow 10; Image.LANCZOS is the
                # same resampling filter under its current name.
                if isinstance(size, float):
                    scaled = (int(img.size[0] * size), int(img.size[1] * size))
                    img = img.resize(scaled, Image.LANCZOS)
                elif isinstance(size, tuple):
                    img = img.resize(size, Image.LANCZOS)
            if crop:
                img = img.crop(crop)
            if mode:
                img = img.convert(mode)
        bio = BytesIO()
        # RGB images are encoded as JPEG, every other mode as PNG.
        img.save(bio, format='JPEG' if img.mode == 'RGB' else 'PNG', quality=quality)
        return MessageSegment.image(bio)

    @classmethod
    async def StaticImage(cls,
                          url: str,
                          size: Optional[Tuple[int, int]] = None,
                          crop: Optional[Tuple[int, int, int, int]] = None,
                          quality: Optional[int] = 100,
                          mode: Optional[str] = None,
                          tips: Optional[str] = None,
                          is_check_time: Optional[bool] = True,
                          check_time_day: Optional[int] = 3
                          ) -> MessageSegment:
        """
        Build an image MessageSegment from a static resource, downloading it when needed.

        The local copy under ``resources/<url>`` is used when it exists and is not
        considered stale; otherwise the image is fetched from the static server and
        saved to that path.

        :param url: resource path, relative to both the static server and the local ``resources`` directory
        :param size: size passed to ``resize``
        :param crop: crop box applied after resizing
        :param quality: quality value passed to the image encoder
        :param mode: image mode to convert to
        :param tips: text returned when the resource does not exist on the server
        :param is_check_time: whether to check the local file's last modification time
        :param check_time_day: age in days after which the local file is downloaded again
        :return: MessageSegment.image, or a text segment when the resource is missing
        """
        path = Path() / 'resources' / url
        # Reuse the local file unless the age check is enabled and reports it as too old.
        if path.exists() and (
                not is_check_time or not check_time(path.stat().st_mtime, check_time_day)):
            img = Image.open(path)
        else:
            path.parent.mkdir(parents=True, exist_ok=True)
            img = await aiorequests.get_img(url='https://static.cherishmoon.fun/' + url, save_path=path)
            # get_img signals a missing resource with this sentinel string.
            if img == 'No Such File':
                return MessageBuild.Text(tips or '缺少该静态资源')
        if size:
            img = img.resize(size)
        if crop:
            img = img.crop(crop)
        if mode:
            img = img.convert(mode)
        bio = BytesIO()
        # RGB images are encoded as JPEG, every other mode as PNG.
        img.save(bio, format='JPEG' if img.mode == 'RGB' else 'PNG', quality=quality)
        return MessageSegment.image(bio)

    @classmethod
    def Text(cls, text: str) -> MessageSegment:
        """
        Mask banned words in a text and wrap it in a text MessageSegment.

        Every occurrence of a banned word is replaced by asterisks of the same length.

        :param text: text to filter
        :return: MessageSegment.text
        """
        # NOTE(review): the first two entries of ban_word are skipped on purpose here;
        # presumably they are header lines of ban_word.txt - confirm against the file.
        for word in ban_word[2:]:
            if word and word in text:
                text = text.replace(word, '*' * len(word))
        return MessageSegment.text(text)

    @classmethod
    def Record(cls, path: str) -> MessageSegment:
        """
        Wrap an audio file in a record MessageSegment.

        :param path: audio file location passed to MessageSegment.record
        :return: MessageSegment.record
        """
        return MessageSegment.record(path)

    @classmethod
    async def StaticRecord(cls, url: str) -> MessageSegment:
        """
        Build a record MessageSegment from a static audio resource.

        The file is downloaded to ``data/<url>`` when no local copy exists yet;
        an existing local copy is used as is.

        :param url: resource path, relative to both the static server and the local ``data`` directory
        :return: MessageSegment.record
        """
        path = Path() / 'data' / url
        if not path.exists():
            path.parent.mkdir(parents=True, exist_ok=True)
            # NOTE(review): the response status is not checked before writing, so a
            # failed download may be stored and reused - confirm aiorequests.get raises on errors.
            resp = await aiorequests.get(url='https://static.cherishmoon.fun/' + url)
            content = resp.content
            path.write_bytes(content)
        return MessageSegment.record(file=path)

    @classmethod
    def Video(cls, path: str) -> MessageSegment:
        """
        Wrap a video file in a video MessageSegment.

        :param path: video file location passed to MessageSegment.video
        :return: MessageSegment.video
        """
        return MessageSegment.video(path)

    @classmethod
    async def StaticVideo(cls, url: str) -> MessageSegment:
        """
        Build a video MessageSegment from a static video resource.

        The file is downloaded to ``data/<url>`` when no local copy exists yet;
        an existing local copy is used as is.

        :param url: resource path, relative to both the static server and the local ``data`` directory
        :return: MessageSegment.video
        """
        path = Path() / 'data' / url
        if not path.exists():
            path.parent.mkdir(parents=True, exist_ok=True)
            # NOTE(review): the response status is not checked before writing, so a
            # failed download may be stored and reused - confirm aiorequests.get raises on errors.
            resp = await aiorequests.get(url='https://static.cherishmoon.fun/' + url)
            content = resp.content
            path.write_bytes(content)
        return MessageSegment.video(file=path)
|
|
|
|
|
|
2022-05-20 18:44:18 +08:00
|
|
|
|
|
|
|
|
|
async def get_at_target(msg):
    """
    Find the first "at" (mention) segment of a message.

    :param msg: iterable of message segments exposing ``type`` and ``data``
    :return: the ``qq`` value of the first segment whose type is "at", or None when there is none
    """
    mentioned = (segment.data['qq'] for segment in msg if segment.type == "at")
    return next(mentioned, None)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Message pre-processing: returns the uid, the cleaned text, the user_id and the cache flag
async def get_uid_in_msg(event: MessageEvent, msg: Union[Message, str]):
    """
    Extract the queried uid and related information from a command message.

    :param event: the message event; its ``user_id`` and ``message`` are read
    :param msg: command text, either a Message (reduced to its stripped plain text) or a str
    :return: tuple ``(uid, remaining text, user_id, use_cache)``; ``use_cache`` is False
             when the text contains ``-r``
    """
    if isinstance(msg, Message):
        msg = msg.extract_plain_text().strip()

    if not msg:
        # Empty text: use whatever get_last_query returns for the sender.
        sender = str(event.user_id)
        return await get_last_query(sender), '', sender, True

    # A mentioned user takes precedence over the sender.
    user_id = await get_at_target(event.message) or str(event.user_id)
    use_cache = '-r' not in msg
    msg = msg.replace('-r', '').strip()

    uid_pattern = r'(?P<uid>(1|2|5)\d{8})'
    for segment in event.message:
        if segment.type != 'text':
            continue
        found = re.search(uid_pattern, segment.data['text'])
        if found is None:
            continue
        # First uid found in a text segment wins: store it and strip it from the text.
        uid = found.group('uid')
        await update_last_query(user_id, uid, 'uid')
        return uid, msg.replace(uid, '').strip(), user_id, use_cache

    # No uid in the message: fall back to get_last_query for this user.
    return await get_last_query(user_id), msg.strip(), user_id, use_cache
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Send the cookie-deletion notice to every superuser in a private chat
async def send_cookie_delete_msg(cookie_info):
    """
    Notify all superusers that a cookie has become invalid.

    This function only logs and sends the notice; it does not delete anything itself.
    Nothing is sent when no message can be built from ``cookie_info``.

    :param cookie_info: mapping with a ``type`` key ('public' or 'private'); 'public' reads
                        ``no``, 'private' reads ``uid``, ``mys_id`` and ``user_id``
    """
    msg = ''
    cookie_type = cookie_info['type']
    if cookie_type == 'public':
        msg = f'公共池的{cookie_info["no"]}号cookie已失效'
    elif cookie_type == 'private':
        if cookie_info['uid']:
            msg = f'用户{cookie_info["user_id"]}的uid{cookie_info["uid"]}的cookie已失效'
        elif cookie_info['mys_id']:
            msg = f'用户{cookie_info["user_id"]}的mys_id{cookie_info["mys_id"]}的cookie已失效'

    if not msg:
        return

    logger.info(f'---{msg}---')
    notice = msg + ',派蒙帮你删除啦!'
    for superuser in get_bot().config.superusers:
        # A failure for one superuser is logged and must not stop the others.
        try:
            await get_bot().send_private_msg(user_id=superuser, message=notice)
        except Exception as e:
            logger.error(f'发送cookie删除消息失败: {e}')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_message_id(event):
    """
    Return the id that matches the event's message type.

    :param event: object exposing ``message_type`` and the id attribute for that type
    :return: ``user_id`` for 'private', ``group_id`` for 'group', ``channel_id`` for 'guild',
             None for any other message type
    """
    id_attribute = {
        'private': 'user_id',
        'group': 'group_id',
        'guild': 'channel_id',
    }.get(event.message_type)
    if id_attribute is None:
        return None
    return getattr(event, id_attribute)
|
2022-05-24 17:36:07 +08:00
|
|
|
|
|
|
|
|
|
|
2022-05-25 13:04:34 +08:00
|
|
|
|
def uid_userId_to_dict(uid, user_id) -> Tuple[dict, Message]:
    """
    Pair uids with user ids for a query.

    Supported combinations:
    - ``uid`` str and ``user_id`` str: a single ``{uid: user_id}`` entry
    - ``uid`` list and ``user_id`` str: every uid maps to the same user_id
    - ``uid`` list and ``user_id`` list: paired positionally; a uid of None adds no entry
      and appends a hint text for that user to the returned Message instead

    Any other combination yields an empty dict and an empty Message.

    :param uid: uid or list of uids (list entries may be None)
    :param user_id: user id or list of user ids
    :return: tuple ``(query_dict, total_result)`` - the uid -> user_id mapping and the
             Message holding the hints for users without a uid
    """
    total_result = Message()
    query_dict = {}
    if isinstance(uid, str) and isinstance(user_id, str):
        query_dict[uid] = user_id
    elif isinstance(uid, list) and isinstance(user_id, str):
        for u in uid:
            query_dict[u] = user_id
    elif isinstance(uid, list) and isinstance(user_id, list):
        for u, us in zip(uid, user_id):
            if u is not None:
                query_dict[u] = us
            else:
                # u is always None in this branch, so interpolating it showed the
                # literal "None" to the user; name the missing item (uid) instead.
                total_result += MessageSegment.text(f'派蒙没有{us}的uid信息哦,请把uid给派蒙吧~')
    return query_dict, total_result
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def replace_all(raw_text: str, text_list: Union[str, list]):
    """
    Remove every occurrence of the given substrings from a text.

    :param raw_text: text to clean
    :param text_list: one substring, or a list of substrings, to remove; a falsy value
                      leaves the text untouched
    :return: ``raw_text`` with all listed substrings removed, in list order
    """
    if not text_list:
        return raw_text
    targets = [text_list] if isinstance(text_list, str) else text_list
    cleaned = raw_text
    for target in targets:
        cleaned = cleaned.replace(target, '')
    return cleaned
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def transform_uid(msg):
    """
    Collect the uids contained in a message.

    The text is split on single spaces and at most one uid (a 9-digit number starting
    with 1, 2 or 5) is taken from each part.

    :param msg: Message (reduced to its stripped plain text) or str; falsy input returns None
    :return: a list of uids when more than one was found, the single uid as a str when
             exactly one was found, otherwise None
    """
    if not msg:
        return None
    if isinstance(msg, Message):
        msg = msg.extract_plain_text().strip()

    matches = (re.search(r'(?P<uid>(1|2|5)\d{8})', part) for part in msg.split(' '))
    uid_list = [found.group('uid') for found in matches if found]

    if len(uid_list) > 1:
        return uid_list
    if uid_list:
        return uid_list[0]
    return None
|
2022-05-31 18:27:47 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Check whether a timestamp lies more than n days before the current time; True if it does
def check_time(time_stamp, n=1):
    """
    Tell whether a timestamp is more than ``n`` days old.

    Both the timestamp and the current time are truncated to whole seconds first.

    :param time_stamp: timestamp in seconds, anything accepted by ``int()``
    :param n: number of days (86400 seconds each) to compare against
    :return: True when the age is strictly greater than ``n`` days, otherwise False
    """
    then = int(time_stamp)
    elapsed_days = (int(time()) - then) / 86400
    return elapsed_days > n
|