新增原神猜语音、60秒读世界和点餐功能,优化项目结构

This commit is contained in:
CMHopeSunshine 2022-05-15 20:15:10 +08:00
parent 82bec68346
commit 02173dd8ff
17 changed files with 1218 additions and 242 deletions

101
Guess_voice/__init__.py Normal file
View File

@ -0,0 +1,101 @@
import asyncio
from . import util
from typing import Union
from pathlib import Path
from nonebot import on_command
from nonebot.permission import SUPERUSER
from nonebot.params import CommandArg
from nonebot.exception import FinishedException
from nonebot.adapters.onebot.v11 import PrivateMessageEvent, GroupMessageEvent, MessageSegment, Bot, permission
from .handler import Guess, get_random_voice
from . import download_data
from ..utils.config import config
setting_time = config.paimon_guess_voice # 游戏持续时间
dir_name = Path() / 'LittlePaimon' / 'LittlePaimon' / 'Guess_voice' / 'voice'
dir_name.mkdir(parents=True, exist_ok=True)
guess_game = on_command('原神猜语音', priority=12, block=True)
running_guess_game = on_command('我猜', aliases={'guess', 'ig'}, priority=12, permission=permission.GROUP, block=True)
ys_voice = on_command('原神语音', priority=12, block=True)
update_ys_voice = on_command('更新原神语音资源', priority=12, permission=SUPERUSER, block=True)
async def download_voice(bot: Bot, event: Union[PrivateMessageEvent, GroupMessageEvent]):
await bot.send(event, '资源尚未初始化,现在开始下载资源,这需要较长的时间,请耐心等待')
await download_data.update_voice_data()
await bot.send(event, '资源下载完成,请重新发送指令开始游戏')
@guess_game.handle()
async def guess_genshin_voice(bot: Bot, event: GroupMessageEvent, msg=CommandArg()):
await download_voice(bot, event)
keyword = str(msg).strip()
guess = Guess(event.group_id, time=setting_time)
hard_mode = False
if keyword == '排行榜':
await guess_game.finish(await guess.get_rank(bot, event))
if keyword in ['', '中国', '汉语', '中文', '中国话', 'Chinese', 'cn'] or not keyword:
keyword = ''
elif keyword in ['', '日本', '日语', '霓虹', '日本语', 'Japanese', 'jp']:
keyword = ''
elif keyword in ['', '韩国', '韩语', '棒子', '南朝鲜', '南朝鲜语']:
keyword = ''
elif keyword in ['', '英文', '英语', '洋文', 'English', 'en']:
keyword = ''
elif keyword in ['2', '', '困难', '地狱']:
hard_mode = True
else:
await guess_game.finish(f'没有找到{keyword}的语音')
if guess.is_start():
await guess_game.finish('游戏正在进行中哦')
guess.set_start()
await guess_game.send(f'即将发送一段原神语音,将在{setting_time}秒后公布答案')
await asyncio.sleep(1)
try:
if hard_mode:
await guess_game.finish(await guess.start2())
else:
res = await guess.start(keyword.split())
print(res)
await guess_game.finish(res)
except FinishedException:
pass
except Exception as e:
guess.set_end()
await guess_game.finish(str(e))
@running_guess_game.handle()
async def on_input_chara_name(event: GroupMessageEvent, msg=CommandArg()):
msg = str(msg).strip()
guess = Guess(event.group_id, time=setting_time)
if guess.is_start():
await guess.add_answer(event.user_id, msg)
@ys_voice.handle()
async def get_genshin_voice(bot: Bot, event: Union[PrivateMessageEvent, GroupMessageEvent], msg=CommandArg()):
name = str(msg).strip()
if name.startswith(''):
language = ''
name = name[1:]
else:
language = ''
await download_voice(bot, event)
path = await get_random_voice(name, language)
if not path:
await ys_voice.finish(f'没有找到{name}的语音呢')
await ys_voice.finish(MessageSegment.record(file=Path(path)))
@update_ys_voice.handle()
async def update_genshin_voice(bot: Bot, event: Union[PrivateMessageEvent, GroupMessageEvent]):
await update_ys_voice.send('将在后台开始更新原神语音资源,请耐心等待资源下载完成后再使用原神语音')
await download_data.update_voice_data()
await update_ys_voice.finish('原神语音资源更新完成')

188
Guess_voice/character.json Normal file
View File

@ -0,0 +1,188 @@
{
"神里绫华": [
"神里",
"758"
],
"迪卢克": [
"计量单位",
"迪卢克",
"卢老爷",
"卢姥爷",
"卢锅巴",
"正义人",
"暗夜英雄"
],
"七七": [
"77",
"肚饿真君",
"度厄真君",
"四十九",
"49"
],
"丽莎": [
"图书管理员"
],
"凝光": [
"富婆",
"天权星"
],
"刻晴": [
"刻师傅",
"玉衡星",
"屁斜剑法"
],
"可莉": [
"kelee",
"蹦蹦八个蛋",
"火花骑士",
"炸鱼禁闭真君",
"哒哒哒"
],
"旅行者": [
"爷",
"卑鄙的外乡人",
"荣誉骑士"
],
"枫原万叶": [
"万叶",
"风男"
],
"温迪": [
"诶嘿",
"卖唱的",
"巴巴托斯"
],
"烟绯": [
"璃月罗翔"
],
"班尼特": [
"点赞",
"六星战神",
"点赞哥"
],
"琴": [
"代理团长",
"蒙德砍王"
],
"砂糖": [
"雷莹术士"
],
"胡桃": [
"往生堂堂主",
"堂主"
],
"芭芭拉": [
"内鬼",
"加湿器",
"偶像"
],
"莫娜": [
"半部讨龙真君"
],
"菲谢尔": [
"艾咪",
"小艾咪",
"皇女",
"奥兹发射器"
],
"行秋": [
"秋妹妹"
],
"达达利亚": [
"钱包",
"ATM机",
"达达鸭",
"公子"
],
"迪奥娜": [
"猫猫",
"猫娘",
"DIO娜"
],
"重云": [
"冰元素附魔器"
],
"钟离": [
"帝君",
"未来可期真君",
"社会废人",
"契约之神",
"七天神像的噩梦",
"岩王帝君"
],
"雷泽": [
"狼崽"
],
"香菱": [
"锅巴发射器"
],
"魈": [
"快乐风男",
"打桩机",
"降魔大圣",
"护法夜叉大将"
],
"阿贝多": [
"电梯人",
"白垩之子"
],
"甘雨": [
"王小美",
"椰羊"
],
"优菈": [
"游击小队队长"
],
"北斗": [
"大姐头"
],
"安柏": [
"打火姬",
"侦察骑士"
],
"凯亚": [
"凯子",
"凯子哥",
"矿工头子"
],
"诺艾尔": [
"冰莹术士",
"女仆"
],
"辛焱": [
"黑妹"
],
"罗莎莉亚": [
"修女"
],
"留云借风真君": [
"傲娇真君"
],
"雷电将军": [
"雷神",
"将军"
],
"珊瑚宫心海": [
"心海"
],
"戴因斯雷布": [
"大派蒙"
],
"丘丘人":[
"qq人"
],
"???": [
""
],
"「愚人众」守卫": [
"愚人众守卫"
],
"「怪鸟」": [
"怪鸟"
],
"观察「大盗宝家」":[
"大盗宝家"
]
}

View File

@ -0,0 +1,174 @@
# -*- coding: UTF-8 -*-
"""
该脚本可以直接获取wiki上的语音文件 并保存进数据库中
"""
import json
import os
from pathlib import Path
from bs4 import BeautifulSoup
from aiohttp import ClientSession
from sqlitedict import SqliteDict # TODO 加入requirements
from .util import get_path
from nonebot import logger
# OUT_PUT = Path(__file__).parent / 'voice'
OUT_PUT = Path() / 'data' / 'LittlePaimon' / 'guess_voice' / 'voice'
BASE_URL = 'https://wiki.biligame.com/ys/'
API = {'character_list': '角色', 'voice': '%s语音'}
config = {
# 日 英 韩
'voice_language': ['', '', '']
}
# dir_data = os.path.join(os.path.dirname(__file__), 'data')
dir_data = Path() / 'data' / 'LittlePaimon' / 'guess_voice' / 'data'
# if not os.path.exists(dir_data):
# os.makedirs(dir_data)
dir_data.mkdir(parents=True, exist_ok=True)
############
def init_db(db_dir, db_name='db.sqlite') -> SqliteDict:
return SqliteDict(get_path(db_dir, db_name),
encode=json.dumps,
decode=json.loads,
autocommit=True)
db = init_db('data', 'voice.sqlite')
############
# 获取角色列表
async def get_character_list():
async with ClientSession() as session:
html = await session.get(BASE_URL + API['character_list'])
soup = BeautifulSoup(await html.text(), 'lxml')
char_list = soup.find(attrs={
'class': 'resp-tab-content',
'style': 'display:block;'
})
char_list1 = char_list.find_all(attrs={'class': 'g C5星'})
res = list(set(map(lambda x: x.find('div', class_='L').text, char_list1)))
char_list2 = char_list.find_all(attrs={'class': 'g C5'})
res.extend(list(set(map(lambda x: x.find('div', class_='L').text, char_list2))))
char_list3 = char_list.find_all(attrs={'class': 'g C4星'})
res.extend(list(set(map(lambda x: x.find('div', class_='L').text, char_list3))))
res.sort()
return res
# 获取角色语音
async def get_voice_info(character_name: str):
logger.info('获取数据: %s' % character_name)
async with ClientSession() as session:
html = await session.get(BASE_URL + API['voice'] % character_name)
soup = BeautifulSoup(await html.text(), 'lxml')
if soup.find(text='本页面目前没有内容。您可以在其他页面中'):
return None
voice_list = soup.find_all(attrs={'class': 'visible-md'})[2:]
info_list = []
for item in voice_list:
item_tab = item.find_all(attrs={'class': ''})[1:]
if isinstance(item_tab[1].next, str):
return info_list
info_list.append({
'title': item_tab[0].text,
'text': item_tab[5].text,
'': item_tab[1].next.attrs.get('data-src', ''),
'': item_tab[2].next.attrs.get('data-src', ''),
'': item_tab[3].next.attrs.get('data-src', ''),
'': item_tab[4].next.attrs.get('data-src', ''),
})
return info_list
# 下载音频文件到本地
async def download(url, path):
async with ClientSession() as session:
res = await session.get(url, timeout=30)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "wb") as f:
f.write(await res.read())
async def update_voice_data():
# 获取全部人物列表
char_list = await get_character_list()
for char in char_list:
info = await get_voice_info(char)
if not info:
continue
data = []
for v in info:
chn = ''
jap = ''
eng = ''
kor = ''
for language in config['voice_language']:
url = v[language]
if not url:
continue
path = str(Path() / language / char / Path(url).name)
out_path = OUT_PUT / path
out = str(out_path)
if not out_path.exists():
await download(url, out)
if language == '':
chn = path
elif language == '':
jap = path
elif language == '':
eng = path
elif language == '':
kor = path
logger.info('下载成功: %s -> %s' % (char, path))
data.append({
'title': v['title'],
'text': v['text'],
'chn': chn,
'jap': jap,
'eng': eng,
'kor': kor
})
# 存入数据库
db[char] = data
async def voice_list_by_mys():
url = 'https://api-static.mihoyo.com/common/blackboard/ys_obc/v1/home/content/list?app_sn=ys_obc&channel_id=84'
async with ClientSession() as session:
res = await session.get(url, timeout=30)
json_data = await res.json()
if json_data['retcode']:
raise Exception(json_data['message'])
try:
data_list = json_data['data']['list'][0]['list']
except KeyError as e:
raise Exception('获取语音列表失败, 请联系作者修复')
return {x['title'].split()[0]: x for x in data_list}
async def voice_detail_by_mys(content_id):
url = 'https://bbs.mihoyo.com/ys/obc/content/%s/detail?bbs_presentation_style=no_header' % content_id
async with ClientSession() as session:
res = await session.get(url, timeout=30)
soup = BeautifulSoup(await res.text(), 'lxml')
paragraph_box = soup.select('.obc-tmpl__paragraph-box')
return [{
'text': x.get_text(),
'chn': x.find('source').attrs['src']
} for x in paragraph_box]

304
Guess_voice/handler.py Normal file
View File

@ -0,0 +1,304 @@
import os
import random
import datetime
import json
from apscheduler.triggers.date import DateTrigger
from typing import List
from pathlib import Path
from nonebot import get_bot, require, logger
from nonebot.adapters.onebot.v11 import MessageSegment, escape
from sqlitedict import SqliteDict
from .util import get_path, require_file
from .download_data import voice_list_by_mys, voice_detail_by_mys
scheduler = require('nonebot_plugin_apscheduler').scheduler
# dir_data = os.path.join(os.path.dirname(__file__), 'data')
dir_data = Path() / 'data' / 'LittlePaimon' / 'guess_voice' / 'data'
# data_path = os.path.join(os.path.dirname(__file__), 'voice')
data_path = Path() / 'data' / 'LittlePaimon' / 'guess_voice' / 'voice'
# data2_path = os.path.join(os.path.dirname(__file__), 'voice2')
data2_path = Path() / 'data' / 'LittlePaimon' / 'guess_voice' / 'voice2'
# if not os.path.exists(dir_data):
# os.makedirs(dir_data)
dir_data.mkdir(parents=True, exist_ok=True)
db = {}
def init_db(db_dir, db_name='db.sqlite', tablename='unnamed') -> SqliteDict:
db_cache_key = db_name + tablename
if db.get(db_cache_key):
return db[db_cache_key]
db[db_cache_key] = SqliteDict(get_path(db_dir, db_name),
tablename=tablename,
encode=json.dumps,
decode=json.loads,
autocommit=True)
return db[db_cache_key]
user_db = init_db('data', 'user.sqlite')
voice_db = init_db('data', 'voice.sqlite')
voice2_db = init_db('data', 'voice2.sqlite', tablename='voice')
process = {}
with open(os.path.join(os.path.dirname(__file__), 'character.json'), 'r', encoding="utf-8") as f:
character_json: dict = json.loads(f.read())
def get_voice_by_language(data, language_name):
if language_name == '':
return data['chn']
elif language_name == '':
return data['jap']
elif language_name == '':
return data['eng']
elif language_name == '':
return data['kor']
def char_name_by_name(name):
names = character_json.keys()
# 是否就是正确的名字
if name in names:
return name
#
for item in names:
nickname = character_json[item]
if name in nickname:
return item
return name
async def get_random_voice(name, language=''):
char_name = char_name_by_name(name)
if language == '':
# 如果是中文 则选择米游社的源
mys_list = await voice_list_by_mys()
char_voices = mys_list.get(char_name)
if not char_voices:
return
voice_list = await voice_detail_by_mys(char_voices['content_id'])
else:
voice_list = voice_db.get(char_name)
if not voice_list:
return
temp_voice_list = []
for v in voice_list:
voice_path = get_voice_by_language(v, language)
if voice_path:
temp_voice_list.append(voice_path)
if not temp_voice_list:
return
voice_path = random.choice(temp_voice_list)
if language == '':
# 如果是中文 则选择米游社的源
path = os.path.join(data_path, language, char_name, os.path.basename(voice_path))
await require_file(file=path, url=voice_path)
else:
path = os.path.join(data_path, voice_path)
return path
class Guess:
time: int
group_id: int
group = {}
retry_count = 0
def __init__(self, group_id: int, time=30):
self.time = time
self.group_id = group_id
self.group = process.get(self.group_id)
def is_start(self):
if not self.group:
return False
return self.group['start']
def set_start(self):
process[self.group_id] = {'start': True}
def set_end(self):
process[self.group_id] = {}
async def start(self, language: List[str] = None):
if not language:
language = ['']
# 随机一个语言
language = random.choice(language)
# 随机选择一个语音
if language == '':
# 如果是中文 则选择米游社的源
mys_list = await voice_list_by_mys()
answer = random.choice(list(mys_list.keys()))
try:
db_dict = {
answer: await
voice_detail_by_mys(mys_list[answer]['content_id'])
}
self.retry_count = 0
except KeyError as e:
if self.retry_count == 4:
self.retry_count = 0
raise Exception('获取语音数据失败')
self.retry_count += 1
return await self.start([language])
else:
answer = random.choice(list(voice_db.keys()))
db_dict = voice_db
temp_voice_list = []
for v in db_dict[answer]:
if not (answer in v['text']):
voice_path = get_voice_by_language(v, language)
if voice_path:
temp_voice_list.append(voice_path)
if not temp_voice_list:
logger.info('随机到了个哑巴,, 重新随机.. 如果反复出现这个 你应该检查一下数据库')
return await self.start([language])
voice_path = random.choice(temp_voice_list)
if language == '':
# 如果是中文 则选择米游社的源
path = os.path.join(data_path, language, answer, os.path.basename(voice_path))
await require_file(file=path, url=voice_path)
else:
path = os.path.join(data_path, voice_path)
# 记录答案
process[self.group_id] = {
'start': True,
'answer': answer,
'ok': set()
}
job_id = str(self.group_id) + '_guess_voice'
if scheduler.get_job(job_id, 'default'):
scheduler.remove_job(job_id, 'default')
now = datetime.datetime.now()
notify_time = now + datetime.timedelta(seconds=self.time)
scheduler.add_job(self.end_game, trigger=DateTrigger(notify_time),
id=job_id,
misfire_grace_time=60,
coalesce=True,
jobstore='default',
max_instances=1)
return MessageSegment.record(file=Path(path))
async def start2(self):
# hard mode
if not os.path.exists(data2_path):
print('请到github下载genshin_voice压缩包解压到 ' + data2_path)
raise Exception('困难模式语音文件夹不存在')
names = list(voice2_db.keys())
if not names:
raise Exception('数据库错误. 请重新下载数据库文件')
names.remove('')
names.remove('派蒙')
names.remove('旅行者')
answer = random.choice(names)
answer_info = None
while True:
info = random.choice(voice2_db[answer])
if answer not in info['text']:
answer_info = info
break
if not answer_info:
return await self.start2()
# 记录答案
process[self.group_id] = {
'start': True,
'answer': answer,
'ok': set()
}
job_id = str(self.group_id) + '_guess_voice'
if scheduler.get_job(job_id, 'default'):
scheduler.remove_job(job_id, 'default')
now = datetime.datetime.now()
notify_time = now + datetime.timedelta(seconds=self.time)
scheduler.add_job(self.end_game, trigger=DateTrigger(notify_time),
id=job_id,
misfire_grace_time=60,
coalesce=True,
jobstore='default',
max_instances=1)
path = os.path.join(data2_path, answer_info['file'] + '.mp3')
return MessageSegment.record(file=Path(path))
async def end_game(self):
self.group = process.get(self.group_id)
ok_list = list(process[self.group_id]['ok'])
if len(ok_list) > 1: # 只允许1个人猜对
return
if not ok_list:
msg = '还没有人猜中呢'
else:
msg = '回答正确的人: ' + ' '.join([str(MessageSegment.at(qq)) for qq in ok_list])
msg = '正确答案是 %s\n%s' % (self.group['answer'], msg)
try:
await get_bot().send_group_msg(group_id=self.group_id, message=msg)
except Exception as e:
logger.error(e)
# 清理记录
process[self.group_id] = {}
# 记录到数据库给之后奖励做处理
user_group = user_db.get(self.group_id, {})
if not user_group:
user_db[self.group_id] = {}
for user in ok_list:
info = user_group.get(str(user), {'count': 0})
info['count'] += 1
user_group[user] = info
user_db[self.group_id] = user_group
# 只添加正确的答案
async def add_answer(self, qq: int, msg: str):
if self.group.get('answer') and char_name_by_name(msg) == self.group['answer']:
process[self.group_id]['ok'].add(qq)
job_id = str(self.group_id) + '_guess_voice'
if scheduler.get_job(job_id, 'default'):
scheduler.remove_job(job_id, 'default')
await self.end_game()
# 获取排行榜
async def get_rank(self, bot, event):
user_list = user_db.get(self.group_id, {})
user_list = sorted(user_list.items(), key=lambda v: v[1]['count'])
user_list.reverse()
num = 0
msg = '本群猜语音排行榜:'
for user, data in user_list[:10]:
user = await bot.get_group_member_info(group_id=event.group_id, user_id=user)
num += 1
msg += f"\n{num}名: {escape(user['card']) or escape(user['nickname'])}, 猜对{data['count']}"
return msg

44
Guess_voice/util.py Normal file
View File

@ -0,0 +1,44 @@
import os
from aiohttp import ClientSession
from pathlib import Path
import aiofiles
# def get_path(*paths):
# return os.path.join(os.path.dirname(__file__), *paths)
def get_path(dirname, filename):
return Path() / 'data' / 'LittlePaimon' / 'guess_voice' / dirname / filename
async def require_file(file=None,
r_mode='rb',
encoding=None,
url=None,
use_cache=True,
w_mode='wb',
timeout=30):
async def read():
async with aiofiles.open(file, r_mode, encoding=encoding) as fp:
return await fp.read()
if not any([file, url]):
raise ValueError('file or url not null')
file = file and Path(file)
if file and file.exists() and use_cache:
return await read()
if not url:
raise ValueError('url not null')
async with ClientSession() as session:
res = await session.get(url, timeout=timeout)
content = await res.read()
if file:
os.makedirs(os.path.dirname(file), exist_ok=True)
async with aiofiles.open(file, w_mode, encoding=encoding) as fp:
await fp.write(content)
return content
return await read()

View File

@ -1,10 +1,10 @@
import datetime
from io import BytesIO
# from io import BytesIO
import os
from PIL import Image, ImageDraw, ImageFont
from aiohttp import ClientSession
# from aiohttp import ClientSession
from nonebot.adapters.onebot.v11 import MessageSegment
from ..utils.util import pil2b64
from ..utils.util import pil2b64, get_pic
res_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'res')
@ -113,60 +113,65 @@ async def draw_abyss_card(data, uid, floor_num):
top_img.alpha_composite(role_img, (width, 165))
width += 150
defeat_rank = data['defeat_rank'][0]
if not os.path.exists(os.path.join(res_path, 'role_side_card', f"{defeat_rank['avatar_id']}.png")):
async with ClientSession() as session:
defeat_rank_img = await (await session.get(defeat_rank['avatar_icon'])).read()
defeat_rank_img = Image.open(BytesIO(defeat_rank_img)).convert("RGBA").resize((60, 60))
defeat_rank_img.save(os.path.join(res_path, 'role_side_card', f"{defeat_rank['avatar_id']}.png"))
else:
defeat_rank_img = Image.open(os.path.join(res_path, 'role_side_card', f"{defeat_rank['avatar_id']}.png"))
# if not os.path.exists(os.path.join(res_path, 'role_side_card', f"{defeat_rank['avatar_id']}.png")):
# async with ClientSession() as session:
# defeat_rank_img = await (await session.get(defeat_rank['avatar_icon'])).read()
# defeat_rank_img = Image.open(BytesIO(defeat_rank_img)).convert("RGBA").resize((60, 60))
# defeat_rank_img.save(os.path.join(res_path, 'role_side_card', f"{defeat_rank['avatar_id']}.png"))
# else:
# defeat_rank_img = Image.open(os.path.join(res_path, 'role_side_card', f"{defeat_rank['avatar_id']}.png"))
defeat_rank_img = await get_pic(defeat_rank['avatar_icon'], (60, 60), 'RGBA')
top_draw.text((160, 343), str(defeat_rank['value']), font=get_font(21), fill='white')
top_img.alpha_composite(defeat_rank_img, (280, 320))
damage_rank = data['damage_rank'][0]
if not os.path.exists(os.path.join(res_path, 'role_side_card', f"{damage_rank['avatar_id']}.png")):
async with ClientSession() as session:
damage_rank_img = await (await session.get(damage_rank['avatar_icon'])).read()
damage_rank_img = Image.open(BytesIO(damage_rank_img)).convert("RGBA").resize((60, 60))
damage_rank_img.save(os.path.join(res_path, 'role_side_card', f"{damage_rank['avatar_id']}.png"))
else:
damage_rank_img = Image.open(os.path.join(res_path, 'role_side_card', f"{damage_rank['avatar_id']}.png"))
# if not os.path.exists(os.path.join(res_path, 'role_side_card', f"{damage_rank['avatar_id']}.png")):
# async with ClientSession() as session:
# damage_rank_img = await (await session.get(damage_rank['avatar_icon'])).read()
# damage_rank_img = Image.open(BytesIO(damage_rank_img)).convert("RGBA").resize((60, 60))
# damage_rank_img.save(os.path.join(res_path, 'role_side_card', f"{damage_rank['avatar_id']}.png"))
# else:
# damage_rank_img = Image.open(os.path.join(res_path, 'role_side_card', f"{damage_rank['avatar_id']}.png"))
damage_rank_img = await get_pic(damage_rank['avatar_icon'], (60, 60), 'RGBA')
top_draw.text((495, 343), str(damage_rank['value']), font=get_font(21), fill='white')
top_img.alpha_composite(damage_rank_img, (590, 320))
take_damage_rank = data['take_damage_rank'][0]
if not os.path.exists(os.path.join(res_path, 'role_side_card', f"{take_damage_rank['avatar_id']}.png")):
async with ClientSession() as session:
take_damage_rank_img = await (await session.get(take_damage_rank['avatar_icon'])).read()
take_damage_rank_img = Image.open(BytesIO(take_damage_rank_img)).convert("RGBA").resize((60, 60))
take_damage_rank_img.save(os.path.join(res_path, 'role_side_card', f"{take_damage_rank['avatar_id']}.png"))
else:
take_damage_rank_img = Image.open(
os.path.join(res_path, 'role_side_card', f"{take_damage_rank['avatar_id']}.png"))
# if not os.path.exists(os.path.join(res_path, 'role_side_card', f"{take_damage_rank['avatar_id']}.png")):
# async with ClientSession() as session:
# take_damage_rank_img = await (await session.get(take_damage_rank['avatar_icon'])).read()
# take_damage_rank_img = Image.open(BytesIO(take_damage_rank_img)).convert("RGBA").resize((60, 60))
# take_damage_rank_img.save(os.path.join(res_path, 'role_side_card', f"{take_damage_rank['avatar_id']}.png"))
# else:
# take_damage_rank_img = Image.open(
# os.path.join(res_path, 'role_side_card', f"{take_damage_rank['avatar_id']}.png"))
take_damage_rank_img = await get_pic(take_damage_rank['avatar_icon'], (60, 60), 'RGBA')
top_draw.text((180, 389), str(take_damage_rank['value']), font=get_font(21), fill='white')
top_img.alpha_composite(take_damage_rank_img, (280, 365))
energy_skill_rank = data['energy_skill_rank'][0]
if not os.path.exists(os.path.join(res_path, 'role_side_card', f"{energy_skill_rank['avatar_id']}.png")):
async with ClientSession() as session:
energy_skill_rank_img = await (await session.get(energy_skill_rank['avatar_icon'])).read()
energy_skill_rank_img = Image.open(BytesIO(energy_skill_rank_img)).convert("RGBA").resize((60, 60))
energy_skill_rank_img.save(os.path.join(res_path, 'role_side_card', f"{energy_skill_rank['avatar_id']}.png"))
else:
energy_skill_rank_img = Image.open(
os.path.join(res_path, 'role_side_card', f"{energy_skill_rank['avatar_id']}.png"))
# if not os.path.exists(os.path.join(res_path, 'role_side_card', f"{energy_skill_rank['avatar_id']}.png")):
# async with ClientSession() as session:
# energy_skill_rank_img = await (await session.get(energy_skill_rank['avatar_icon'])).read()
# energy_skill_rank_img = Image.open(BytesIO(energy_skill_rank_img)).convert("RGBA").resize((60, 60))
# energy_skill_rank_img.save(os.path.join(res_path, 'role_side_card', f"{energy_skill_rank['avatar_id']}.png"))
# else:
# energy_skill_rank_img = Image.open(
# os.path.join(res_path, 'role_side_card', f"{energy_skill_rank['avatar_id']}.png"))
energy_skill_rank_img = await get_pic(energy_skill_rank['avatar_icon'], (60, 60), 'RGBA')
top_draw.text((530, 389), str(energy_skill_rank['value']), font=get_font(21), fill='white')
top_img.alpha_composite(energy_skill_rank_img, (590, 365))
normal_skill_rank = data['normal_skill_rank'][0]
if not os.path.exists(os.path.join(res_path, 'role_side_card', f"{normal_skill_rank['avatar_id']}.png")):
async with ClientSession() as session:
normal_skill_rank_img = await (await session.get(normal_skill_rank['avatar_icon'])).read()
normal_skill_rank_img = Image.open(BytesIO(normal_skill_rank_img)).convert("RGBA").resize((60, 60))
normal_skill_rank_img.save(os.path.join(res_path, 'role_side_card', f"{normal_skill_rank['avatar_id']}.png"))
else:
normal_skill_rank_img = Image.open(
os.path.join(res_path, 'role_side_card', f"{normal_skill_rank['avatar_id']}.png"))
# if not os.path.exists(os.path.join(res_path, 'role_side_card', f"{normal_skill_rank['avatar_id']}.png")):
# async with ClientSession() as session:
# normal_skill_rank_img = await (await session.get(normal_skill_rank['avatar_icon'])).read()
# normal_skill_rank_img = Image.open(BytesIO(normal_skill_rank_img)).convert("RGBA").resize((60, 60))
# normal_skill_rank_img.save(os.path.join(res_path, 'role_side_card', f"{normal_skill_rank['avatar_id']}.png"))
# else:
# normal_skill_rank_img = Image.open(
# os.path.join(res_path, 'role_side_card', f"{normal_skill_rank['avatar_id']}.png"))
normal_skill_rank_img = await get_pic(normal_skill_rank['avatar_icon'], (60, 60), 'RGBA')
top_draw.text((195, 435), str(normal_skill_rank['value']), font=get_font(21), fill='white')
top_img.alpha_composite(normal_skill_rank_img, (280, 410))

View File

@ -1,12 +1,12 @@
import random
import datetime
from io import BytesIO
# from io import BytesIO
import os
from PIL import Image, ImageDraw, ImageFont
import matplotlib.pyplot as plt
from aiohttp import ClientSession
# from aiohttp import ClientSession
from nonebot.adapters.onebot.v11 import MessageSegment
from ..utils.util import pil2b64
from ..utils.util import pil2b64, get_pic
res_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'res')
@ -157,14 +157,6 @@ def get_font(size, font='msyhbd.ttc'):
return ImageFont.truetype(os.path.join(res_path, font), size)
async def get_avatar_pic(avatar_url):
async with ClientSession() as session:
res = await session.get(avatar_url)
res = await res.read()
avatar = Image.open(BytesIO(res)).convert("RGBA").resize((135, 135))
return avatar
async def draw_ring(per):
per_list = [per, 1 - per]
colors = ['#507bd0', '#FFFFFF']
@ -267,7 +259,7 @@ async def draw_daily_note_card(data, uid):
exp = data['expeditions']
i = 0
for role in exp:
role_avatar = await get_avatar_pic(role['avatar_side_icon'])
role_avatar = await get_pic(role['avatar_side_icon'], (135, 135), 'RGBA')
bg_img.alpha_composite(role_avatar, (i * 200 + 168, 1537))
bg_img.alpha_composite(await draw_ring(1 - int(role['remained_time']) / 72000), (i * 201 + 101, 1490))
if role['status'] == 'Ongoing':
@ -293,8 +285,7 @@ async def draw_daily_note_card(data, uid):
bg_draw.text((1408, 1588), last_finish_str, fill="#5680d2",
font=get_font(60, '优设标题黑.ttf'))
role_img = random.choice(os.listdir(os.path.join(res_path, 'emoticons')))
role_img = Image.open(os.path.join(res_path, 'emoticons', role_img)).convert('RGBA')
role_img = Image.open(os.path.join(res_path, 'emoticons', random.choice(os.listdir(os.path.join(res_path, 'emoticons'))))).convert('RGBA')
role_img = role_img.resize((int(role_img.size[0] * 3.5), int(role_img.size[1] * 3.5)), Image.ANTIALIAS)
bg_img.alpha_composite(role_img, (1220, 200))
now = datetime.datetime.now().strftime('%m月%d%H:%M')

View File

@ -2,7 +2,7 @@ import random
from PIL import Image, ImageDraw, ImageFont
import os
import matplotlib.pyplot as plt
from io import BytesIO
# from io import BytesIO
from ..utils.util import pil2b64
from nonebot.adapters.onebot.v11 import MessageSegment
@ -30,31 +30,17 @@ async def get_box(t, num):
return box
async def draw_circle(per, colors):
async def draw_ring(per, colors):
plt.pie(per, startangle=90, colors=colors)
fig, ax = plt.subplots(figsize=(6, 6))
ax.pie(per,
wedgeprops={'width': 0.29},
startangle=90,
colors=colors)
buffer = BytesIO()
canvas = plt.get_current_fig_manager().canvas
canvas.draw()
pil_image = Image.frombytes('RGB', canvas.get_width_height(),
canvas.tostring_rgb())
pil_image.save(buffer, 'PNG')
plt.close()
pil_image = pil_image.convert('RGBA')
w, h = pil_image.size
array = pil_image.load()
for i in range(w):
for j in range(h):
pos = array[i, j]
isEdit = (sum([1 for x in pos[0:3] if x > 240]) == 3)
if isEdit:
array[i, j] = (255, 255, 255, 0)
return pil_image.resize((378, 378))
plt.savefig('temp.png', transparent=True)
img = Image.open('temp.png').resize((378, 378)).convert('RGBA')
os.remove('temp.png')
return img
async def draw_monthinfo_card(data):
@ -87,11 +73,10 @@ async def draw_monthinfo_card(data):
bg_img.alpha_composite(await get_box('原石', data['day_data']['current_primogems']), (40, 328))
bg_img.alpha_composite(await get_box('摩拉', data['day_data']['current_mora']), (40, 388))
# 表情
emoticons_list = ['丽莎-干得漂亮', '九条裟罗-开心', '五郎-开心', '优菈-赞扬', '刻晴-点赞', '可莉-好耶', '宵宫-得意', '枫原万叶-偷笑', '派蒙-出货吧', '温迪-期待',
'珊瑚宫心海-祈祷', '琴-赞扬', '神里绫华-偷笑', '胡桃-爱心', '荒泷一斗-大笑', '钟离-我全都要', '雷电将军-轻笑']
emoticon1 = Image.open(os.path.join(res_path, 'emoticons', f'{random.choice(emoticons_list)}.png')).convert('RGBA')
emoticon1 = Image.open(os.path.join(res_path, 'emoticons', random.choice(os.listdir(os.path.join(res_path, 'emoticons'))))).convert('RGBA')
bg_img.alpha_composite(emoticon1, (360, 140))
emoticon2 = Image.open(os.path.join(res_path, 'emoticons', f'{random.choice(emoticons_list)}.png')).convert('RGBA')
emoticon2 = Image.open(os.path.join(res_path, 'emoticons', random.choice(os.listdir(os.path.join(res_path, 'emoticons'))))).convert('RGBA')
bg_img.alpha_composite(emoticon2, (360, 317))
bg_img.alpha_composite(line, (64, 480))
@ -106,7 +91,7 @@ async def draw_monthinfo_card(data):
name_list = [x['action'] for x in data['month_data']['group_by']]
num_list = [x['num'] for x in data['month_data']['group_by']]
color_list = [color[x] for x in name_list]
bg_img.alpha_composite(await draw_circle(per_list, color_list), (-12, 489))
bg_img.alpha_composite(await draw_ring(per_list, color_list), (-12, 489))
# 百分比描述
h = 550
for name in name_list:

View File

@ -1,9 +1,9 @@
import os, random, re
from PIL import Image, ImageDraw, ImageFont
from ..utils.util import pil2b64
from ..utils.util import pil2b64, get_pic
from nonebot.adapters.onebot.v11 import MessageSegment
from aiohttp import ClientSession
from io import BytesIO
# from aiohttp import ClientSession
# from io import BytesIO
import copy
res_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'res')
@ -43,13 +43,14 @@ async def get_chara_card(data):
weapon_bg = Image.open(os.path.join(res_path, 'player_card', f'{data["weapon"]["rarity"]}星武器.png'))
chara_card.alpha_composite(weapon_bg, (0, 227))
# 武器图标
weapon_name = data['weapon']['icon'].split('/')[-1]
if not os.path.exists(os.path.join(res_path, 'weapon', weapon_name)):
async with ClientSession() as session:
weapon_icon = await (await session.get(data['weapon']['icon'])).read()
weapon_icon = Image.open(BytesIO(weapon_icon)).convert("RGBA")
weapon_icon.save(os.path.join(res_path, 'weapon', weapon_name))
weapon_icon = Image.open(os.path.join(res_path, 'weapon', weapon_name)).resize((63, 63))
# weapon_name = data['weapon']['icon'].split('/')[-1]
# if not os.path.exists(os.path.join(res_path, 'weapon', weapon_name)):
# async with ClientSession() as session:
# weapon_icon = await (await session.get(data['weapon']['icon'])).read()
# weapon_icon = Image.open(BytesIO(weapon_icon)).convert("RGBA")
# weapon_icon.save(os.path.join(res_path, 'weapon', weapon_name))
# weapon_icon = Image.open(os.path.join(res_path, 'weapon', weapon_name)).resize((63, 63))
weapon_icon = await get_pic(data['weapon']['icon'], (63, 63), 'RGBA')
chara_card.alpha_composite(weapon_icon, (0, 230))
# 等级信息
chara_draw = ImageDraw.Draw(chara_card)
@ -273,13 +274,14 @@ async def get_chara_card_long(data):
weapon_bg = Image.open(os.path.join(res_path, 'player_card', f'{data["weapon"]["rarity"]}星武器.png'))
chara_card.alpha_composite(weapon_bg, (3, 288))
# 武器图标
weapon_name = data['weapon']['icon'].split('/')[-1]
if not os.path.exists(os.path.join(res_path, 'weapon', weapon_name)):
async with ClientSession() as session:
weapon_icon = await (await session.get(data['weapon']['icon'])).read()
weapon_icon = Image.open(BytesIO(weapon_icon)).convert("RGBA")
weapon_icon.save(os.path.join(res_path, 'weapon', weapon_name))
weapon_icon = Image.open(os.path.join(res_path, 'weapon', weapon_name)).resize((62, 62))
# weapon_name = data['weapon']['icon'].split('/')[-1]
# if not os.path.exists(os.path.join(res_path, 'weapon', weapon_name)):
# async with ClientSession() as session:
# weapon_icon = await (await session.get(data['weapon']['icon'])).read()
# weapon_icon = Image.open(BytesIO(weapon_icon)).convert("RGBA")
# weapon_icon.save(os.path.join(res_path, 'weapon', weapon_name))
# weapon_icon = Image.open(os.path.join(res_path, 'weapon', weapon_name)).resize((62, 62))
weapon_icon = await get_pic(data['weapon']['icon'], (62, 62), 'RGBA')
chara_card.alpha_composite(weapon_icon, (3, 291))
# 等级信息
chara_draw = ImageDraw.Draw(chara_card)
@ -345,13 +347,14 @@ shadow = Image.open(os.path.join(res_path, 'other', 'shadow.png'))
async def draw_reli_icon(data):
base_icon = Image.open(os.path.join(res_path, 'other', f'star{data["rarity"]}.png')).resize((80, 80))
if not os.path.exists(os.path.join(res_path, 'reliquaries', f'{data["id"]}.png')):
async with ClientSession() as session:
icon = await (await session.get(data["icon"])).read()
icon = Image.open(BytesIO(icon)).convert("RGBA").resize((80, 80))
icon.save(os.path.join(res_path, 'reliquaries', f'{data["id"]}.png'))
else:
icon = Image.open(os.path.join(res_path, 'reliquaries', f'{data["id"]}.png')).resize((80, 80))
# if not os.path.exists(os.path.join(res_path, 'reliquaries', f'{data["id"]}.png')):
# async with ClientSession() as session:
# icon = await (await session.get(data["icon"])).read()
# icon = Image.open(BytesIO(icon)).convert("RGBA").resize((80, 80))
# icon.save(os.path.join(res_path, 'reliquaries', f'{data["id"]}.png'))
# else:
# icon = Image.open(os.path.join(res_path, 'reliquaries', f'{data["id"]}.png')).resize((80, 80))
icon = await get_pic(data["icon"], (80, 80), 'RGBA')
base_icon.alpha_composite(icon, (0, 0))
base_icon.alpha_composite(shadow, (40, 60))
base_icon_draw = ImageDraw.Draw(base_icon)
@ -361,13 +364,14 @@ async def draw_reli_icon(data):
async def draw_const_skill_icon(data, name):
base_icon = Image.open(os.path.join(res_path, 'other', '命座.png')).resize((65, 65))
if not os.path.exists(os.path.join(res_path, 'skill_constellations', f'{name}{data["id"]}.png')):
async with ClientSession() as session:
icon = await (await session.get(data["icon"])).read()
icon = Image.open(BytesIO(icon)).convert("RGBA").resize((65, 65))
icon.save(os.path.join(res_path, 'reliquaries', f'{name}{data["id"]}.png'))
else:
icon = Image.open(os.path.join(res_path, 'reliquaries', f'{name}{data["id"]}.png')).resize((65, 65))
# if not os.path.exists(os.path.join(res_path, 'skill_constellations', f'{name}{data["id"]}.png')):
# async with ClientSession() as session:
# icon = await (await session.get(data["icon"])).read()
# icon = Image.open(BytesIO(icon)).convert("RGBA").resize((65, 65))
# icon.save(os.path.join(res_path, 'reliquaries', f'{name}{data["id"]}.png'))
# else:
# icon = Image.open(os.path.join(res_path, 'reliquaries', f'{name}{data["id"]}.png')).resize((65, 65))
icon = await get_pic(data["icon"], (65, 65), 'RGBA')
base_icon.alpha_composite(icon, (0, 0))
if 'is_actived' in data and not data['is_actived']:
unlock_icon = Image.open(os.path.join(res_path, 'other', '命座未解锁.png')).resize((65, 65))

View File

@ -1,140 +1,23 @@
from aiohttp import ClientSession
from urllib.parse import quote
from typing import Union
from nonebot import on_command, on_regex, get_driver
from nonebot.params import CommandArg, RegexGroup
from nonebot.message import event_preprocessor
from nonebot.adapters.onebot.v11 import Bot, GroupMessageEvent, MessageEvent, MessageSegment, FriendRequestEvent, \
GroupRequestEvent
from ..utils.util import FreqLimiter, auto_withdraw
from ..utils.config import config
from asyncio import sleep
import random
# 点餐功能
from .order import *
superuser = int(list(get_driver().config.superusers)[0])
# 新闻功能
from .news import *
duilian = on_command('对联', aliases={'对对联'}, priority=13, block=True)
cat_pic = on_command('猫图', aliases={'来点猫片', '看看猫猫', '来个猫猫'}, priority=13, block=True)
ecy_pic = on_regex(r'^来点(二次元|二刺螈|银发|兽耳|星空|竖屏|横屏)图?$', priority=13, block=True)
ys_pic = on_command('原神壁纸', aliases={'来点原神图', '来点原神壁纸'}, priority=13, block=True)
# 随机图片功能
from .random_img import *
duilian_limit = FreqLimiter(config.paimon_duilian_cd)
cat_lmt = FreqLimiter(config.paimon_cat_cd)
ecy_lmt = FreqLimiter(config.paimon_ecy_cd)
ys_lmt = FreqLimiter(config.paimon_ysp_cd)
# 处理好友和群请求功能
from .auto_handle import *
# 对联功能
from .couplets import *
@duilian.handle()
async def duilian_handler(event: Union[GroupMessageEvent, MessageEvent], msg=CommandArg()):
if not msg:
await duilian.finish('请输入对联内容')
if not duilian_limit.check(event.group_id or event.user_id):
await duilian.finish(f'对联冷却ing(剩余{duilian_limit.left_time(event.group_id or event.user_id)}秒)')
else:
msg = str(msg).split(' ')
word = msg[0].strip()
try:
num = int(msg[1]) if len(msg) > 1 else 1
except:
num = 1
num = num if num < 10 else 10
duilian_limit.start_cd(event.group_id or event.user_id, config.paimon_duilian_cd)
text = quote(str(word))
url = f'https://ai-backend.binwang.me/v0.2/couplet/{text}'
async with ClientSession() as session:
res = await session.get(url)
res = await res.json()
result = ''
for n in range(0, num):
result += res['output'][n] + '\n'
await duilian.finish(f'上联:{word}\n下联:{result}')
# 如果不需要某项功能将其from xx import * 注释掉即可
@cat_pic.handle()
async def cat_pic_handler(event: Union[GroupMessageEvent, MessageEvent]):
if not cat_lmt.check(event.group_id or event.user_id):
await cat_pic.finish(f'猫片冷却ing(剩余{cat_lmt.left_time(event.group_id or event.user_id)}秒)')
else:
await cat_pic.send('派蒙努力找图ing..请稍候...')
cat_lmt.start_cd(event.group_id or event.user_id, config.paimon_cat_cd)
url = 'http://edgecats.net/'
await cat_pic.finish(MessageSegment.image(file=url))
@ecy_pic.handle()
@auto_withdraw(15)
async def ecy_pic_handler(bot: Bot, event: Union[GroupMessageEvent, MessageEvent], regexGroup=RegexGroup()):
urls = [
'https://www.dmoe.cc/random.php',
'https://acg.toubiec.cn/random.php',
'https://api.ixiaowai.cn/api/api.php',
'https://iw233.cn/api.php?sort=iw233'
]
img_type = regexGroup[0]
if img_type in ['二次元', '二刺螈']:
url = random.choice(urls)
elif img_type == '银发':
url = 'https://iw233.cn/api.php?sort=yin'
elif img_type == '兽耳':
url = 'https://iw233.cn/api.php?sort=cat'
elif img_type == '星空':
url = 'https://iw233.cn/api.php?sort=xing'
elif img_type == '竖屏':
url = 'https://iw233.cn/api.php?sort=mp'
elif img_type == '横屏':
url = 'https://iw233.cn/api.php?sort=pc'
else:
url = ''
if not ecy_lmt.check(event.group_id or event.user_id):
await ecy_pic.finish(f'二次元图片冷却ing(剩余{ecy_lmt.left_time(event.group_id or event.user_id)}秒)')
elif url:
await cat_pic.send('派蒙努力找图ing..请稍候...')
ecy_lmt.start_cd(event.group_id or event.user_id, config.paimon_ecy_cd)
return await cat_pic.send(MessageSegment.image(file=url))
@ys_pic.handle()
@auto_withdraw(30)
async def ys_pic_handler(event: Union[GroupMessageEvent, MessageEvent]):
urls = [
'https://api.r10086.com/img-api.php?type=%E5%8E%9F%E7%A5%9E%E6%A8%AA%E5%B1%8F%E7%B3%BB%E5%88%971',
'https://api.r10086.com/img-api.php?type=%E5%8E%9F%E7%A5%9E%E7%AB%96%E5%B1%8F%E7%B3%BB%E5%88%971',
'https://api.dujin.org/pic/yuanshen/',
'https://api.dreamofice.cn/random-v0/img.php?game=ys'
]
if not ys_lmt.check(event.group_id or event.user_id):
await ys_pic.finish(f'原神壁纸冷却ing(剩余{ys_lmt.left_time(event.group_id or event.user_id)}秒)')
else:
await cat_pic.send('派蒙努力找图ing..请稍候...')
ys_lmt.start_cd(event.group_id or event.user_id, config.paimon_ysp_cd)
await ys_pic.finish(MessageSegment.image(file=random.choice(urls)))
@event_preprocessor
async def addFriend(bot: Bot, event: FriendRequestEvent):
superuser_msg = f'{event.user_id}请求添加派蒙为好友, 验证信息为:{event.comment}'
if config.paimon_add_friend:
superuser_msg += ',已自动同意'
await sleep(random.randint(2, 4))
await event.approve(bot)
await sleep(random.randint(3, 6))
await bot.send_private_msg(user_id=event.user_id, message=f'旅行者你好呀,这里是小派蒙,发送/help查看帮助哦')
else:
superuser_msg += ',请主人自行处理哦'
await bot.send_private_msg(user_id=superuser, message=superuser_msg)
@event_preprocessor
async def addFriend(bot: Bot, event: GroupRequestEvent):
if event.sub_type != 'invite':
return
superuser_msg = f'{event.user_id}邀请派蒙加入群{event.group_id}'
if config.paimon_add_group or event.user_id == superuser:
superuser_msg += ',已自动同意'
await sleep(random.randint(2, 4))
await event.approve(bot)
await sleep(random.randint(3, 6))
await bot.send_group_msg(group_id=event.group_id, message=f'旅行者们大家好呀,这里是小派蒙,发送/help查看帮助哦')
else:
superuser_msg += ',请主人自行处理哦'
await bot.send_private_msg(user_id=superuser, message=superuser_msg)

View File

@ -0,0 +1,38 @@
import random
from asyncio import sleep
from ..utils.config import config
from nonebot import get_driver
from nonebot.message import event_preprocessor
from nonebot.adapters.onebot.v11 import Bot, FriendRequestEvent, GroupRequestEvent
superuser = int(list(get_driver().config.superusers)[0])
@event_preprocessor
async def addFriend(bot: Bot, event: FriendRequestEvent):
superuser_msg = f'{event.user_id}请求添加派蒙为好友, 验证信息为:{event.comment}'
if config.paimon_add_friend:
superuser_msg += ',已自动同意'
await sleep(random.randint(2, 4))
await event.approve(bot)
await sleep(random.randint(3, 6))
await bot.send_private_msg(user_id=event.user_id, message=f'旅行者你好呀,这里是小派蒙,发送/help查看帮助哦')
else:
superuser_msg += ',请主人自行处理哦'
await bot.send_private_msg(user_id=superuser, message=superuser_msg)
@event_preprocessor
async def addGroup(bot: Bot, event: GroupRequestEvent):
if event.sub_type != 'invite':
return
superuser_msg = f'{event.user_id}邀请派蒙加入群{event.group_id}'
if config.paimon_add_group or event.user_id == superuser:
superuser_msg += ',已自动同意'
await sleep(random.randint(2, 4))
await event.approve(bot)
await sleep(random.randint(3, 6))
await bot.send_group_msg(group_id=event.group_id, message=f'旅行者们大家好呀,这里是小派蒙,发送/help查看帮助哦')
else:
superuser_msg += ',请主人自行处理哦'
await bot.send_private_msg(user_id=superuser, message=superuser_msg)

View File

@ -0,0 +1,37 @@
from aiohttp import ClientSession
from urllib.parse import quote
from nonebot import on_command
from nonebot.params import CommandArg
from nonebot.adapters.onebot.v11 import MessageEvent
from ..utils.util import FreqLimiter, get_id
from ..utils.config import config
couplets = on_command('对联', aliases={'对对联'}, priority=13, block=True)
couplets_limit = FreqLimiter(config.paimon_couplets_cd)
@couplets.handle()
async def couplets_handler(event: MessageEvent, msg=CommandArg()):
if not msg:
await couplets.finish('请输入对联内容')
if not couplets_limit.check(get_id(event)):
await couplets.finish(f'对联冷却ing(剩余{couplets_limit.left_time(get_id(event))}秒)')
else:
msg = str(msg).split(' ')
word = msg[0].strip()
try:
num = int(msg[1]) if len(msg) > 1 else 1
except:
num = 1
num = num if num < 10 else 10
couplets_limit.start_cd(get_id(event), config.paimon_couplets_cd)
text = quote(str(word))
url = f'https://ai-backend.binwang.me/v0.2/couplet/{text}'
async with ClientSession() as session:
res = await session.get(url)
res = await res.json()
result = ''
for n in range(0, num):
result += res['output'][n] + '\n'
await couplets.finish(f'上联:{word}\n下联:{result}')

83
Paimon_Plugins/news.py Normal file
View File

@ -0,0 +1,83 @@
import re
from aiohttp import ClientSession
from nonebot import on_command, require, get_bot, logger
from nonebot.params import CommandArg
from nonebot.adapters.onebot.v11 import MessageEvent, MessageSegment
from ..utils.util import load_data, save_data, get_id
news60s_pic = on_command('早报', aliases={'今日早报', '今日新闻', '60s读世界'}, priority=13, block=True)
scheduler = require('nonebot_plugin_apscheduler').scheduler
@news60s_pic.handle()
async def news60s_pic_handler(event: MessageEvent, msg=CommandArg()):
msg = str(msg).strip()
if not msg:
url = 'https://api.iyk0.com/60s/'
async with ClientSession() as session:
res = await session.get(url)
res = await res.json()
await news60s_pic.finish(MessageSegment.image(file=res['imageUrl']))
elif msg.startswith('开启推送'):
# 匹配msg中的xx:xx时间
time_str = re.search(r'(\d{2}):(\d{2})', msg)
if time_str:
push_data = load_data('news60s_push.json')
push_id = str(get_id(event))
push_data[push_id] = {
'type': event.message_type,
'hour': int(time_str.group(1)),
'minute': int(time_str.group(2))
}
if event.message_type == 'guild':
push_data[push_id]['guild_id'] = event.guild_id
scheduler.add_job(
func=news60s_push_task,
trigger='cron',
hour=int(time_str.group(1)),
minute=int(time_str.group(2)),
id=str(get_id(event)),
args=(str(get_id(event)),
push_data[str(get_id(event))])
)
save_data(push_data, 'news60s_push.json')
await news60s_pic.finish('开启60s读世界推送成功', at_sender=True)
else:
await news60s_pic.finish('请给出正确的时间格式为12:00', at_sender=True)
elif msg.startswith('关闭推送'):
push_data = load_data('news60s_push.json')
del push_data[str(event.group_id or event.channel_id or event.user_id)]
scheduler.remove_job(str(get_id(event)))
save_data(push_data, 'news60s_push.json')
await news60s_pic.finish('关闭60s读世界推送成功', at_sender=True)
async def news60s_push_task(push_id, push_data: dict):
try:
url = 'https://api.iyk0.com/60s/'
async with ClientSession() as session:
res = await session.get(url)
res = await res.json()
if push_data['type'] == 'group':
await get_bot().send_group_msg(group_id=push_id, message=MessageSegment.image(file=res['imageUrl']))
elif push_data['type'] == 'private':
await get_bot().send_private_msg(user_id=push_id, message=MessageSegment.image(file=res['imageUrl']))
elif push_data['type'] == 'guild':
await get_bot().send_guild_channel_msg(guild_id=push_data['guild_id'], channel_id=push_id,
message=MessageSegment.image(file=res['imageUrl']))
logger.info(f'{push_data["type"]}{push_id}的60秒读世界推送成功')
except Exception as e:
logger.exception(f'{push_data["type"]}{push_id}的60秒读世界推送失败{e}')
for push_id, push_data in load_data('news60s_push.json').items():
scheduler.add_job(
func=news60s_push_task,
trigger='cron',
hour=push_data['hour'],
minute=push_data['minute'],
id=push_id,
args=(push_id,
push_data)
)

36
Paimon_Plugins/order.py Normal file
View File

@ -0,0 +1,36 @@
import random
from aiohttp import ClientSession
from nonebot import on_command
from nonebot.params import CommandArg
from nonebot.adapters.onebot.v11 import Message, MessageEvent, MessageSegment
from ..utils.util import FreqLimiter
order_pic = on_command('点菜', aliases={'点餐', '食谱', '我想吃'}, priority=13, block=True)
order_lmt = FreqLimiter(10)
@order_pic.handle()
async def order_pic_handler(event: MessageEvent, msg=CommandArg()):
msg = str(msg).strip()
if not msg:
return
if not order_lmt.check(event.user_id):
await order_pic.finish(f'点餐冷却ing(剩余{order_lmt.left_time(event.user_id)}秒)')
else:
url = 'https://api.iyk0.com/shipu/?key=' + msg
async with ClientSession() as session:
res = await session.get(url)
res = await res.json()
if res['code'] == 202:
await order_pic.finish('没有找到这种食品哦~')
order_lmt.start_cd(event.user_id, 10)
number = random.randint(1, 3)
recipe_list = []
for i in range(0, number):
recipe = random.choice(res['data'])
if recipe not in recipe_list:
recipe_list.append(recipe)
mes = Message()
for recipe in recipe_list:
mes += MessageSegment.text(recipe['name'] + '\n') + MessageSegment.image(recipe['img']) + '\n'
await order_pic.finish(mes, at_sender=True)

View File

@ -0,0 +1,72 @@
import random
from nonebot import on_command, on_regex
from nonebot.params import RegexGroup
from nonebot.adapters.onebot.v11 import Bot, MessageEvent, MessageSegment
from ..utils.config import config
from ..utils.util import FreqLimiter, auto_withdraw, get_id
cat_lmt = FreqLimiter(config.paimon_cat_cd)
ecy_lmt = FreqLimiter(config.paimon_ecy_cd)
ys_lmt = FreqLimiter(config.paimon_ysp_cd)
cat_img = on_command('猫图', aliases={'来点猫片', '看看猫猫', '来个猫猫'}, priority=13, block=True)
ecy_img = on_regex(r'^来点(二次元|二刺螈|银发|兽耳|星空|竖屏|横屏)图?$', priority=13, block=True)
ys_img = on_command('原神壁纸', aliases={'来点原神图', '来点原神壁纸'}, priority=13, block=True)
@cat_img.handle()
async def cat_img_handler(event: MessageEvent):
if not cat_lmt.check(get_id(event)):
await cat_img.finish(f'猫片冷却ing(剩余{cat_lmt.left_time(get_id(event))}秒)')
else:
await cat_img.send('派蒙努力找图ing..请稍候...')
cat_lmt.start_cd(get_id(event), config.paimon_cat_cd)
url = 'http://edgecats.net/'
await cat_img.finish(MessageSegment.image(file=url))
@ecy_img.handle()
@auto_withdraw(15)
async def ecy_img_handler(bot: Bot, event: MessageEvent, regexGroup=RegexGroup()):
urls = [
'https://www.dmoe.cc/random.php',
'https://acg.toubiec.cn/random.php',
'https://api.ixiaowai.cn/api/api.php',
'https://iw233.cn/api.php?sort=iw233'
]
img_type = regexGroup[0]
if img_type in ['二次元', '二刺螈']:
url = random.choice(urls)
elif img_type == '银发':
url = 'https://iw233.cn/api.php?sort=yin'
elif img_type == '兽耳':
url = 'https://iw233.cn/api.php?sort=cat'
elif img_type == '星空':
url = 'https://iw233.cn/api.php?sort=xing'
elif img_type == '竖屏':
url = 'https://iw233.cn/api.php?sort=mp'
elif img_type == '横屏':
url = 'https://iw233.cn/api.php?sort=pc'
else:
url = ''
if not ecy_lmt.check(get_id(event)):
await ecy_img.finish(f'二次元图片冷却ing(剩余{ecy_lmt.left_time(get_id(event))}秒)')
elif url:
await ecy_img.send('派蒙努力找图ing..请稍候...')
ecy_lmt.start_cd(get_id(event), config.paimon_ecy_cd)
return await ecy_img.send(MessageSegment.image(file=url))
@ys_img.handle()
@auto_withdraw(30)
async def ys_img_handler(event: MessageEvent):
urls = [
'https://api.dujin.org/img/yuanshen/',
'https://api.dreamofice.cn/random-v0/img.php?game=ys'
]
if not ys_lmt.check(get_id(event)):
await ys_img.finish(f'原神壁纸冷却ing(剩余{ys_lmt.left_time(get_id(event))}秒)')
else:
await ys_img.send('派蒙努力找图ing..请稍候...')
ys_lmt.start_cd(get_id(event), config.paimon_ysp_cd)
await ys_img.finish(MessageSegment.image(file=random.choice(urls)))

View File

@ -20,7 +20,7 @@ class PluginConfig(BaseModel):
# 自动签到开始时间(分钟)
paimon_sign_minute: int = 0
# 对联冷却(秒)
paimon_duilian_cd: int = 6
paimon_couplets_cd: int = 6
# 猫图冷却(秒)
paimon_cat_cd: int = 12
# 二次元图冷却(秒)
@ -33,6 +33,8 @@ class PluginConfig(BaseModel):
paimon_add_group: bool = False
# 派蒙聊天开启群组
paimon_chat_group: List[int] = []
# 派蒙猜语音持续时间
paimon_guess_voice: int = 30
driver = get_driver()

View File

@ -1,11 +1,13 @@
import hashlib
from collections import defaultdict
from io import BytesIO
from pathlib import Path
import random
import base64
import datetime
from time import time
import re
import os
import string
import functools
import inspect
@ -138,11 +140,16 @@ class FreqLimiter2:
return int(self.next_time[key1][key2] - time()) + 1
async def get_pic(url):
# 从网络url中获取图片
async def get_pic(url: str, size: tuple = None, mode: str = None):
async with ClientSession() as session:
res = await session.get(url)
res = await res.read()
img = Image.open(BytesIO(res))
if size:
img = img.resize(size)
if mode:
img = img.convert(mode)
return img
@ -361,3 +368,25 @@ async def send_cookie_delete_msg(cookie_info):
await get_bot().send_private_msg(user_id=superuser, message=msg + ',派蒙帮你删除啦!')
except Exception as e:
logger.error(f'发送cookie删除消息失败: {e}')
def load_data(data_file):
data_path = Path() / 'data' / 'LittlePaimon' / data_file
if not data_path.exists():
save_data({}, data_file)
return json.load(data_path.open('r', encoding='utf-8'))
def save_data(data, data_file):
data_path = Path() / 'data' / 'LittlePaimon' / data_file
data_path.parent.mkdir(parents=True, exist_ok=True)
json.dump(data, data_path.open('w', encoding='utf-8'), ensure_ascii=False, indent=2)
def get_id(event):
if event.message_type == 'private':
return event.user_id
elif event.message_type == 'group':
return event.group_id
elif event.message_type == 'guild':
return event.channel_id