全ys功能支持私聊&fix bug

This commit is contained in:
CMHopeSunshine 2022-03-14 14:18:25 +08:00
parent 0efe65ce7a
commit 1eb94ec9b1
12 changed files with 72 additions and 334 deletions

View File

@ -3,7 +3,7 @@ from hoshino import R,MessageSegment
from hoshino.typing import CQEvent, Message
from hoshino.util import PriFreqLimiter
import hoshino
from .util import Dict
from ..util import Dict
from hoshino import aiorequests
from .gacha_role import *
from .gacha_wish import more_ten

View File

@ -3,7 +3,7 @@ import re, copy ,json
from enum import Enum
from pathlib import Path
from PIL import Image, PngImagePlugin, ImageDraw, ImageFont
from .util import filter_list, pil2b64,dict_to_object
from ..util import pil2b64
import os
from .gacha_role import init_user_info, user_info, save_user_info
@ -249,5 +249,5 @@ async def more_ten(uid, gacha_data, num, sd):
for i in range(0, num):
item_img = await ten(uid, gacha_data, sd)
img.paste(item_img, (0, 575 * i))
return pil2b64(img)
return pil2b64(img, 75)

View File

@ -1,293 +0,0 @@
# -*- coding: UTF-8 -*-
import base64
import datetime
import functools
import hashlib
import inspect
import json
import os
import re
import time
from io import BytesIO
from pathlib import Path
import aiofiles
import yaml
from hoshino import CanceledException, aiorequests, priv, trigger
from nonebot import *
from PIL import ImageFont
from sqlitedict import SqliteDict
bot = get_bot()
try:
import locale
# 解决部分系统无法格式化中文时间问题
locale.setlocale(locale.LC_CTYPE, 'chinese')
except Exception as e:
pass
class Dict(dict):
__setattr__ = dict.__setitem__
__getattr__ = dict.__getitem__
def md5(context):
return hashlib.md5(context).hexdigest()
def dict_to_object(dict_obj):
if not isinstance(dict_obj, dict):
return dict_obj
inst = Dict()
for k, v in dict_obj.items():
inst[k] = dict_to_object(v)
return inst
# 获取字符串中的关键字
def get_msg_keyword(keyword, msg, is_first=False):
msg = msg[0] if isinstance(msg, tuple) else msg
res = re.split(format_reg(keyword, is_first), msg, 1)
res = tuple(res[::-1]) if len(res) == 2 else False
return ''.join(res) if is_first and res else res
# 格式化配置中的正则表达式
def format_reg(keyword, is_first=False):
keyword = keyword if isinstance(keyword, list) else [keyword]
return f"{'|'.join([f'^{i}' for i in keyword] if is_first else keyword)}"
def get_path(*paths):
return os.path.join(os.path.dirname(__file__), *paths)
db = {}
# 初始化数据库
def init_db(db_dir, db_name='db.sqlite', tablename='unnamed') -> SqliteDict:
db_cache_key = db_name + tablename
if db.get(db_cache_key):
return db[db_cache_key]
db[db_cache_key] = SqliteDict(get_path(db_dir, db_name),
tablename=tablename,
encode=json.dumps,
decode=json.loads,
autocommit=True)
return db[db_cache_key]
# 寻找MessageSegment里的某个关键字的位置
def find_ms_str_index(ms, keyword, is_first=False):
for index, item in enumerate(ms):
if item['type'] == 'text' and re.search(format_reg(keyword, is_first),
item['data']['text']):
return index
return -1
def filter_list(plist, func):
return list(filter(func, plist))
def list_split(items, n):
return [items[i:i + n] for i in range(0, len(items), n)]
def is_group_admin(ctx):
return ctx['sender']['role'] in ['owner', 'admin', 'administrator']
def get_next_day():
return time.mktime((datetime.date.today() +
datetime.timedelta(days=+1)).timetuple()) + 1000
def get_font(size, w='85'):
return ImageFont.truetype(get_path('assets', 'font', f'HYWenHei {w}W.ttf'),
size=size)
def pil2b64(data):
bio = BytesIO()
data = data.convert("RGB")
data.save(bio, format='JPEG', quality=75)
base64_str = base64.b64encode(bio.getvalue()).decode()
return 'base64://' + base64_str
private_prefix = []
# support private message
@message_preprocessor
async def private_handler(bot, ev, _):
if ev.detail_type != 'private':
return
for t in trigger.chain:
for service in t.find_handler(ev):
sv = service.sv
if sv in private_prefix:
if priv.get_user_priv(ev) >= priv.NORMAL:
try:
await service.func(bot, ev)
except CanceledException:
raise
sv.logger.info(
f'Private Message {ev.message_id} triggered {service.func.__name__}.'
)
def support_private(sv):
def wrap(func):
private_prefix.append(sv)
return func
return wrap
def cache(ttl=datetime.timedelta(hours=1), **kwargs):
def wrap(func):
cache_data = {}
@functools.wraps(func)
async def wrapped(*args, **kw):
nonlocal cache_data
bound = inspect.signature(func).bind(*args, **kw)
bound.apply_defaults()
ins_key = '|'.join(['%s_%s' % (k, v) for k, v in bound.arguments.items()])
default_data = {"time": None, "value": None}
data = cache_data.get(ins_key, default_data)
now = datetime.datetime.now()
if not data['time'] or now - data['time'] > ttl:
try:
data['value'] = await func(*args, **kw)
data['time'] = now
cache_data[ins_key] = data
except Exception as e:
raise e
return data['value']
return wrapped
return wrap
async def github(path):
try:
url = f'https://cdn.jsdelivr.net/gh/{path}'
res = await aiorequests.get(url, timeout=10)
return await res.content
except aiorequests.exceptions.ConnectionError:
raise
gh_end_point = 'pcrbot/erinilis-modules/egenshin/'
async def gh_json(file_path):
return json.loads(await github(gh_end_point + file_path), object_hook=Dict)
async def gh_file(file_path, **kw):
kw['url'] = gh_end_point + file_path
return await require_file(**kw)
async def require_file(file=None,
r_mode='rb',
encoding=None,
url=None,
use_cache=True,
w_mode='wb',
timeout=30):
async def read():
async with aiofiles.open(file, r_mode, encoding=encoding) as fp:
return await fp.read()
if not any([file, url]):
raise ValueError('file or url not null')
file = file and Path(file)
if file and file.exists() and use_cache:
return await read()
if not url:
raise ValueError('url not null')
try:
res = await aiorequests.get(url, timeout=timeout)
content = await res.content
except aiorequests.exceptions.ConnectionError:
raise
if file:
os.makedirs(os.path.dirname(file), exist_ok=True)
async with aiofiles.open(file, w_mode, encoding=encoding) as fp:
await fp.write(content)
return content
return await read()
@cache(ttl=datetime.timedelta(minutes=30), arg_key='url')
async def cache_request_json(url):
res = await aiorequests.get(url, timeout=10)
return await res.json(object_hook=Dict)
@cache(ttl=datetime.timedelta(hours=24))
async def get_game_version():
url = 'https://sdk-static.mihoyo.com/hk4e_cn/mdk/launcher/api/resource?key=eYd89JmJ&launcher_id=18'
res = await aiorequests.get(url, timeout=10)
json_data = await res.json(object_hook=Dict)
if json_data.retcode != 0:
raise Exception(json_data.message)
latest = json_data.data.game.latest
return latest.version
running = {}
class process:
def __init__(self, key, timeout=0):
self.key = key
self.timeout = timeout
def get(self):
return running.get(self.key, {})
def start(self):
running[self.key] = {'run': True, 'start_time': time.time()}
return self
def ok(self):
if running.get(self.key):
del running[self.key]
def is_run(self):
run = self.get()
if not run:
return False
if run.get('start_time') + self.timeout < time.time(
) and not self.timeout == 0:
self.ok()
return False
return bool(run.get('run'))
async def get_all_group():
for self_id in bot._wsr_api_clients.keys():
group = await bot.get_group_list(self_id=self_id)
for group_info in group:
yield group_info
async def get_group_info(group_id):
async for group in get_all_group():
if int(group_id) == group['group_id']:
return dict_to_object(group)

View File

@ -180,7 +180,7 @@ async def draw_player_card(data, chara_data, uid, nickname="旅行者"):
bg_img.alpha_composite(chara_card.resize((180, 249)), (840 + (i - 4) * 205, 974))
elif i > 8:
break
bg_img = pil2b64(bg_img, 75)
bg_img = pil2b64(bg_img, 80)
bg_img = MessageSegment.image(bg_img)
return bg_img
@ -262,7 +262,7 @@ async def draw_all_chara_card(data, uid):
n += 1
bg_img.paste(bg_bottom,(0,382+col*474-50))
bg_img = pil2b64(bg_img, 50)
bg_img = pil2b64(bg_img, 55)
bg_img = MessageSegment.image(bg_img)
return bg_img
@ -426,6 +426,6 @@ async def draw_chara_card(data, skill_data, chara_name, uid):
i += 1
bg_draw.text((330, 371), 'Created by 惜月の小派蒙', font=get_font(20), fill='white')
bg_img = pil2b64(bg_img, 65)
bg_img = pil2b64(bg_img, 70)
bg_img = MessageSegment.image(bg_img)
return bg_img

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.3 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.7 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.7 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 66 KiB

View File

@ -1,6 +1,6 @@
from hoshino import MessageSegment, Service, trigger, priv, CanceledException
from hoshino.typing import CQEvent, Message
from ..util import update_last_query_to_qq, bind_cookie
from ..util import update_last_query_to_qq, bind_cookie, bind_public_cookie
from nonebot import message_preprocessor
sv = Service('原神绑定',visible=False,enable_on_default=True)
@ -8,30 +8,30 @@ sv = Service('原神绑定',visible=False,enable_on_default=True)
private_prefix = []
# support private message
@message_preprocessor
async def private_handler(bot, ev, _):
if ev.detail_type != 'private':
return
for t in trigger.chain:
for service in t.find_handler(ev):
sv = service.sv
if sv in private_prefix:
if priv.get_user_priv(ev) >= priv.NORMAL:
try:
await service.func(bot, ev)
except CanceledException:
raise
sv.logger.info(
f'Private Message {ev.message_id} triggered {service.func.__name__}.'
)
# @message_preprocessor
# async def private_handler(bot, ev, _):
# if ev.detail_type != 'private':
# return
# for t in trigger.chain:
# for service in t.find_handler(ev):
# sv = service.sv
# if sv in private_prefix:
# if priv.get_user_priv(ev) >= priv.NORMAL:
# try:
# await service.func(bot, ev)
# except CanceledException:
# raise
# sv.logger.info(
# f'Private Message {ev.message_id} triggered {service.func.__name__}.'
# )
def support_private(sv):
def wrap(func):
private_prefix.append(sv)
return func
return wrap
# def support_private(sv):
# def wrap(func):
# private_prefix.append(sv)
# return func
# return wrap
@support_private(sv)
# @support_private(sv)
@sv.on_prefix(('原神绑定','ysb'))
async def bind(bot,ev):
msg = ev.message.extract_plain_text().strip().split('#')
@ -53,3 +53,10 @@ async def bind(bot,ev):
cookie = msg[1]
res = await bind_cookie(qq,uid,cookie)
await bot.send(ev,res,at_sender=True)
# @support_private(sv)
@sv.on_prefix('添加公共cookie')
async def bing_public(bot,ev):
cookie = ev.message.extract_plain_text().strip()
res = await bind_public_cookie(cookie)
await bot.send(ev,res,at_sender=True)

View File

@ -13,6 +13,7 @@ import datetime
import functools
import inspect
# user_cookies.json数据文件模版如没有.json文件就会按这个模版生成文件
user_cookies_example = {
"通用": [
{
@ -50,7 +51,7 @@ def save_data():
except:
traceback.print_exc()
# 缓存装饰器
# 缓存装饰器 ttl为过期时间 参数use_cache决定是否使用缓存默认为True
def cache(ttl=datetime.timedelta(hours=1), **kwargs):
def wrap(func):
cache_data = {}
@ -81,7 +82,11 @@ def cache(ttl=datetime.timedelta(hours=1), **kwargs):
return wrap
# 图片转b64
class Dict(dict):
__setattr__ = dict.__setitem__
__getattr__ = dict.__getitem__
# 图片转b64q为质量压缩比例
def pil2b64(data, q=85):
bio = BytesIO()
data = data.convert("RGB")
@ -95,7 +100,7 @@ def md5(text: str) -> str:
md5.update(text.encode())
return md5.hexdigest()
# 米游社headers的ds_token
# 米游社headers的ds_token对应版本2.11.1
def get_ds(q="", b=None) -> str:
if b:
br = json.dumps(b)
@ -122,7 +127,7 @@ def get_headers(cookie, q='',b=None):
#print(headers)
return headers
# 检查cookie是否有效
# 检查cookie是否有效通过查看个人主页是否返回ok来判断
async def check_cookie(cookie):
url = 'https://bbs-api.mihoyo.com/user/wapi/getUserFullInfo?gids=2'
headers ={
@ -140,7 +145,7 @@ async def check_cookie(cookie):
else:
return True
# 通过qq号获取cookie
# 通过qq号获取最后查询的uid
def get_uid_by_qq(qq):
if qq not in user_cookies['私人']:
return None
@ -184,6 +189,15 @@ async def bind_cookie(qq, uid, cookie):
save_data()
return '绑定成功啦!'
# 绑定cookie到公共cookie池
async def bind_public_cookie(cookie):
if not await check_cookie(cookie):
return '这cookie没有用哦检查一下是不是复制错了或者过期了(试试重新登录米游社再获取)'
else:
user_cookies['通用'].append({"cookie": cookie, "no": len(user_cookies['通用']) + 1})
save_data()
return '添加公共cookie成功'
# 获取公共池可用的cookie
async def get_public_cookie():
for cookie in user_cookies['通用']:
@ -195,7 +209,7 @@ async def get_public_cookie():
logger.error('--CMgenshin原神查询公共cookie池已全部失效--')
return None
# 获取可用的cookie
# 获取可用的cookie优先获取私人cookie没有则获取公共池cookie
async def get_cookie(qq, uid, only_private = False, only_match_uid = False):
if qq not in user_cookies['私人']:
if only_private:
@ -221,4 +235,5 @@ async def get_cookie(qq, uid, only_private = False, only_match_uid = False):
else:
return await get_public_cookie()
# 初始化读取cookie数据
load_data()

View File

@ -159,6 +159,9 @@ class Service:
def _check_all(self, ev: CQEvent):
if ev.detail_type == 'private':
return True
else:
gid = ev.group_id
return self.check_enabled(gid) and not priv.check_block_group(gid) and priv.check_priv(ev, self.use_priv)

View File

@ -1,15 +1,21 @@
nonebot[scheduler]~=1.8.0
aiocqhttp~=1.4.0
lxml>=4.4.1
pytz>=2019.3
pytz>=2021.3
requests>=2.22.0
sogou_tr_free>=0.0.6
zhconv>=1.4.0
Pillow>=6.2.1
Pillow>=8.4.0
matplotlib>=3.2.0
numpy>=1.18.0
beautifulsoup4>=4.9.0
pygtrie>=2.0
tinydb>=4.0
aiohttp>=3.6
peony-twitter[all]~=1.1.7
openpyxl>=3.0.9
psutil>=5.4.3
PyYAML>=5.4.1
functools>=0.5
uuid>=1.30
filetype
imageio
loguru