LittlePaimon/utils/message_util.py

258 lines
9.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
from io import BytesIO
from pathlib import Path
from time import time
from typing import Union, Optional, Tuple
from PIL import Image
from littlepaimon_utils import aiorequests
from littlepaimon_utils.files import load_image
from nonebot import get_bot, logger
from nonebot.adapters.onebot.v11 import MessageEvent, Message, MessageSegment
from .db_util import get_last_query, update_last_query
# 加载敏感违禁词列表
ban_word = []
with open(Path(__file__).parent / 'json_data' / 'ban_word.txt', 'r', encoding='utf-8') as f:
for line in f:
ban_word.append(line.strip())
class MessageBuild:
@classmethod
def Image(cls,
img: Union[Image.Image, Path, str],
*,
size: Optional[Union[Tuple[int, int], float]] = None,
crop: Optional[Tuple[int, int, int, int]] = None,
quality: Optional[int] = 100,
mode: Optional[str] = None
) -> MessageSegment:
"""
说明:
图片预处理并构造成MessageSegment
:param img: 图片Image对象或图片路径
:param size: 预处理尺寸
:param crop: 预处理裁剪大小
:param quality: 预处理图片质量
:param mode: 预处理图像模式
:return: MessageSegment.image
"""
if isinstance(img, str) or isinstance(img, Path):
img = load_image(path=img, size=size, mode=mode, crop=crop)
else:
if size:
if isinstance(size, float):
img = img.resize((int(img.size[0] * size), int(img.size[1] * size)), Image.ANTIALIAS)
elif isinstance(size, tuple):
img = img.resize(size, Image.ANTIALIAS)
if crop:
img = img.crop(crop)
if mode:
img = img.convert(mode)
bio = BytesIO()
img.save(bio, format='JPEG' if img.mode == 'RGB' else 'PNG', quality=quality)
return MessageSegment.image(bio)
@classmethod
async def StaticImage(cls,
url: str,
size: Optional[Tuple[int, int]] = None,
crop: Optional[Tuple[int, int, int, int]] = None,
quality: Optional[int] = 100,
mode: Optional[str] = None,
tips: Optional[str] = None,
is_check_time: Optional[bool] = True,
check_time_day: Optional[int] = 3
):
"""
从url下载图片并预处理并构造成MessageSegment如果url的图片已存在本地则直接读取本地图片
:param url: 图片url
:param size: 预处理尺寸
:param crop: 预处理裁剪大小
:param quality: 预处理图片质量
:param mode: 预处理图像模式
:param tips: url中不存在该图片时的提示语
:param is_check_time: 是否检查本地图片最后修改时间
:param check_time_day: 检查本地图片最后修改时间的天数,超过该天数则重新下载图片
:return: MessageSegment.image
"""
path = Path() / 'resources' / url
if path.exists() and (
not is_check_time or (is_check_time and not check_time(path.stat().st_mtime, check_time_day))):
img = Image.open(path)
else:
path.parent.mkdir(parents=True, exist_ok=True)
img = await aiorequests.get_img(url='https://static.cherishmoon.fun/' + url, save_path=path)
if img == 'No Such File':
return MessageBuild.Text(tips or '缺少该静态资源')
if size:
img = img.resize(size)
if crop:
img = img.crop(crop)
if mode:
img = img.convert(mode)
bio = BytesIO()
img.save(bio, format='JPEG' if img.mode == 'RGB' else 'PNG', quality=quality)
return MessageSegment.image(bio)
@classmethod
def Text(cls, text: str) -> MessageSegment:
"""
过滤文本中的敏感违禁词
:param text: 文本
:return: MessageSegment.text
"""
for word in ban_word[2:]:
if word and word in text:
text = text.replace(word, '*' * len(word))
return MessageSegment.text(text)
@classmethod
def Record(cls, path: str) -> MessageSegment:
return MessageSegment.record(path)
@classmethod
async def StaticRecord(cls, url: str) -> MessageSegment:
"""
从url中下载音频文件并构造成MessageSegment如果本地已有该音频文件则直接读取本地文件
:param url: 语音url
:return: MessageSegment.record
"""
path = Path() / 'data' / url
if not path.exists():
path.parent.mkdir(parents=True, exist_ok=True)
resp = await aiorequests.get(url='https://static.cherishmoon.fun/' + url)
content = resp.content
path.write_bytes(content)
return MessageSegment.record(file=path)
@classmethod
def Video(cls, path: str) -> MessageSegment:
return MessageSegment.video(path)
@classmethod
async def StaticVideo(cls, url: str) -> MessageSegment:
"""
从url中下载视频文件并构造成MessageSegment如果本地已有该视频文件则直接读取本地文件
:param url: 视频url
:return: MessageSegment.video
"""
path = Path() / 'data' / url
if not path.exists():
path.parent.mkdir(parents=True, exist_ok=True)
resp = await aiorequests.get(url='https://static.cherishmoon.fun/' + url)
content = resp.content
path.write_bytes(content)
return MessageSegment.video(file=path)
async def get_at_target(msg):
for msg_seg in msg:
if msg_seg.type == "at":
return msg_seg.data['qq']
return None
# message预处理获取uid、干净的msg、user_id、是否缓存
async def get_uid_in_msg(event: MessageEvent, msg: Message):
msg = msg.extract_plain_text().strip()
if not msg:
uid = await get_last_query(str(event.user_id))
return uid, '', str(event.user_id), True
user_id = await get_at_target(event.message) or str(event.user_id)
# msg = re.sub(r'\[CQ.*?\]', '', msg)
use_cache = False if '-r' in msg else True
msg = msg.replace('-r', '').strip()
find_uid = r'(?P<uid>(1|2|5)\d{8})'
for msg_seg in event.message:
if msg_seg.type == 'text':
match = re.search(find_uid, msg_seg.data['text'])
if match:
await update_last_query(user_id, match.group('uid'), 'uid')
return match.group('uid'), msg.replace(match.group('uid'), '').strip(), user_id, use_cache
uid = await get_last_query(user_id)
return uid, msg.strip(), user_id, use_cache
# 向超级用户私聊发送cookie删除信息
async def send_cookie_delete_msg(cookie_info):
msg = ''
if cookie_info['type'] == 'public':
msg = f'公共池的{cookie_info["no"]}号cookie已失效'
elif cookie_info['type'] == 'private':
if cookie_info['uid']:
msg = f'用户{cookie_info["user_id"]}的uid{cookie_info["uid"]}的cookie已失效'
elif cookie_info['mys_id']:
msg = f'用户{cookie_info["user_id"]}的mys_id{cookie_info["mys_id"]}的cookie已失效'
if msg:
logger.info(f'---{msg}---')
for superuser in get_bot().config.superusers:
try:
await get_bot().send_private_msg(user_id=superuser, message=msg + ',派蒙帮你删除啦!')
except Exception as e:
logger.error(f'发送cookie删除消息失败: {e}')
def get_message_id(event):
if event.message_type == 'private':
return event.user_id
elif event.message_type == 'group':
return event.group_id
elif event.message_type == 'guild':
return event.channel_id
def uid_userId_to_dict(uid, user_id) -> Tuple[dict, Message]:
total_result = Message()
query_dict = {}
if isinstance(uid, str) and isinstance(user_id, str):
query_dict[uid] = user_id
elif isinstance(uid, list) and isinstance(user_id, str):
for u in uid:
query_dict[u] = user_id
elif isinstance(uid, list) and isinstance(user_id, list):
for u, us in zip(uid, user_id):
if u is not None:
query_dict[u] = us
else:
total_result += MessageSegment.text(f'派蒙没有{us}{u}信息哦请把uid给派蒙吧~')
return query_dict, total_result
def replace_all(raw_text: str, text_list: Union[str, list]):
if not text_list:
return raw_text
else:
if isinstance(text_list, str):
text_list = [text_list]
for text in text_list:
raw_text = raw_text.replace(text, '')
return raw_text
def transform_uid(msg):
if not msg:
return None
if isinstance(msg, Message):
msg = msg.extract_plain_text().strip()
check_uid = msg.split(' ')
uid_list = []
for check in check_uid:
uid = re.search(r'(?P<uid>(1|2|5)\d{8})', check)
if uid:
uid_list.append(uid.group('uid'))
return uid_list if len(uid_list) > 1 else uid_list[0] if uid_list else None
# 检查该时间戳和当前时间戳相差是否超过n天 超过则返回True
def check_time(time_stamp, n=1):
time_stamp = int(time_stamp)
now = int(time())
if (now - time_stamp) / 86400 > n:
return True
else:
return False