mirror of
https://github.com/xuthus83/LittlePaimon.git
synced 2024-10-21 16:27:15 +08:00
327 lines
11 KiB
Python
327 lines
11 KiB
Python
import datetime
|
|
import json
|
|
import os
|
|
import random
|
|
from pathlib import Path
|
|
from typing import List
|
|
|
|
from apscheduler.triggers.date import DateTrigger
|
|
from nonebot import get_bot, require, logger
|
|
from nonebot import on_regex
|
|
from nonebot.adapters.onebot.v11 import GroupMessageEvent
|
|
from nonebot.adapters.onebot.v11 import MessageSegment, escape
|
|
from nonebot.rule import Rule
|
|
from sqlitedict import SqliteDict
|
|
|
|
from .download_data import voice_list_by_mys, voice_detail_by_mys
|
|
from .util import get_path, require_file
|
|
|
|
scheduler = require('nonebot_plugin_apscheduler').scheduler
|
|
|
|
# dir_data = os.path.join(os.path.dirname(__file__), 'data')
|
|
dir_data = Path() / 'data' / 'LittlePaimon' / 'guess_voice' / 'data'
|
|
|
|
# data_path = os.path.join(os.path.dirname(__file__), 'voice')
|
|
data_path = Path() / 'data' / 'LittlePaimon' / 'guess_voice' / 'voice'
|
|
|
|
# data2_path = os.path.join(os.path.dirname(__file__), 'voice2')
|
|
data2_path = Path() / 'data' / 'LittlePaimon' / 'guess_voice' / 'voice2'
|
|
|
|
# if not os.path.exists(dir_data):
|
|
# os.makedirs(dir_data)
|
|
|
|
dir_data.mkdir(parents=True, exist_ok=True)
|
|
|
|
db = {}
|
|
|
|
|
|
def init_db(db_dir, db_name='db.sqlite', tablename='unnamed') -> SqliteDict:
|
|
db_cache_key = db_name + tablename
|
|
if db.get(db_cache_key):
|
|
return db[db_cache_key]
|
|
db[db_cache_key] = SqliteDict(str(get_path(db_dir, db_name)),
|
|
tablename=tablename,
|
|
encode=json.dumps,
|
|
decode=json.loads,
|
|
autocommit=True)
|
|
return db[db_cache_key]
|
|
|
|
|
|
user_db = init_db('data', 'user.sqlite')
|
|
voice_db = init_db('data', 'voice.sqlite')
|
|
voice2_db = init_db('data', 'voice2.sqlite', tablename='voice')
|
|
process = {}
|
|
|
|
with open(os.path.join(os.path.dirname(__file__), 'character.json'), 'r', encoding="utf-8") as f:
|
|
character_json: dict = json.loads(f.read())
|
|
|
|
|
|
def create_guess_matcher(role_name, second, group_id):
|
|
def check_group(event: GroupMessageEvent):
|
|
if event.group_id == group_id:
|
|
return True
|
|
return False
|
|
|
|
alias_list = character_json.get(role_name, [])
|
|
re_str = role_name + '|' + '|'.join(alias_list)
|
|
guess_matcher = on_regex(re_str, temp=True, rule=Rule(check_group))
|
|
guess_matcher.plugin_name = "Guess_voice"
|
|
|
|
@guess_matcher.handle()
|
|
async def _(event: GroupMessageEvent):
|
|
guess = Guess(event.group_id, time=second)
|
|
if guess.is_start():
|
|
await guess.add_answer(event.user_id, event.message.extract_plain_text())
|
|
|
|
|
|
def get_voice_by_language(data, language_name):
|
|
if language_name == '中':
|
|
return data['chn']
|
|
elif language_name == '日':
|
|
return data['jap']
|
|
elif language_name == '英':
|
|
return data['eng']
|
|
elif language_name == '韩':
|
|
return data['kor']
|
|
|
|
|
|
def char_name_by_name(name):
|
|
names = character_json.keys()
|
|
# 是否就是正确的名字
|
|
if name in names:
|
|
return name
|
|
#
|
|
for item in names:
|
|
nickname = character_json[item]
|
|
if name in nickname:
|
|
return item
|
|
return name
|
|
|
|
|
|
async def get_random_voice(name, language='中'):
|
|
char_name = char_name_by_name(name)
|
|
if language == '中':
|
|
# 如果是中文 则选择米游社的源
|
|
mys_list = await voice_list_by_mys()
|
|
char_voices = mys_list.get(char_name)
|
|
if not char_voices:
|
|
return
|
|
voice_list = await voice_detail_by_mys(char_voices['content_id'])
|
|
else:
|
|
voice_list = voice_db.get(char_name)
|
|
|
|
if not voice_list:
|
|
return
|
|
temp_voice_list = []
|
|
for v in voice_list:
|
|
voice_path = get_voice_by_language(v, language)
|
|
if voice_path:
|
|
temp_voice_list.append(voice_path)
|
|
if not temp_voice_list:
|
|
return
|
|
|
|
voice_path = random.choice(temp_voice_list)
|
|
|
|
if language == '中':
|
|
# 如果是中文 则选择米游社的源
|
|
path = os.path.join(data_path, language, char_name, os.path.basename(voice_path))
|
|
await require_file(file=path, url=voice_path)
|
|
else:
|
|
path = os.path.join(data_path, voice_path)
|
|
|
|
return path
|
|
|
|
|
|
class Guess:
|
|
time: int
|
|
group_id: int
|
|
group = {}
|
|
retry_count = 0
|
|
|
|
def __init__(self, group_id: int, time=30):
|
|
self.time = time
|
|
self.group_id = group_id
|
|
self.group = process.get(self.group_id)
|
|
|
|
def is_start(self):
|
|
if not self.group:
|
|
return False
|
|
return self.group['start']
|
|
|
|
def set_start(self):
|
|
process[self.group_id] = {'start': True}
|
|
|
|
def set_end(self):
|
|
process[self.group_id] = {}
|
|
|
|
async def start(self, language: List[str] = None):
|
|
if not language:
|
|
language = ['中']
|
|
# 随机一个语言
|
|
language = random.choice(language)
|
|
# 随机选择一个语音
|
|
if language == '中':
|
|
# 如果是中文 则选择米游社的源
|
|
mys_list = await voice_list_by_mys()
|
|
|
|
answer = random.choice(list(mys_list.keys()))
|
|
try:
|
|
db_dict = {
|
|
answer: await
|
|
voice_detail_by_mys(mys_list[answer]['content_id'])
|
|
}
|
|
|
|
self.retry_count = 0
|
|
except KeyError as e:
|
|
if self.retry_count == 4:
|
|
self.retry_count = 0
|
|
raise Exception('获取语音数据失败')
|
|
self.retry_count += 1
|
|
return await self.start([language])
|
|
|
|
else:
|
|
answer = random.choice(list(voice_db.keys()))
|
|
db_dict = voice_db
|
|
|
|
temp_voice_list = []
|
|
for v in db_dict[answer]:
|
|
if not (answer in v['text']):
|
|
voice_path = get_voice_by_language(v, language)
|
|
if voice_path:
|
|
temp_voice_list.append(voice_path)
|
|
|
|
if not temp_voice_list:
|
|
logger.info('随机到了个哑巴,, 重新随机.. 如果反复出现这个 你应该检查一下数据库')
|
|
return await self.start([language])
|
|
|
|
voice_path = random.choice(temp_voice_list)
|
|
|
|
if language == '中':
|
|
# 如果是中文 则选择米游社的源
|
|
path = os.path.join(data_path, language, answer, os.path.basename(voice_path))
|
|
await require_file(file=path, url=voice_path)
|
|
else:
|
|
path = os.path.join(data_path, voice_path)
|
|
|
|
# 记录答案
|
|
process[self.group_id] = {
|
|
'start': True,
|
|
'answer': answer,
|
|
'ok': set()
|
|
}
|
|
|
|
job_id = str(self.group_id) + '_guess_voice'
|
|
if scheduler.get_job(job_id, 'default'):
|
|
scheduler.remove_job(job_id, 'default')
|
|
|
|
now = datetime.datetime.now()
|
|
notify_time = now + datetime.timedelta(seconds=self.time)
|
|
scheduler.add_job(self.end_game, trigger=DateTrigger(notify_time),
|
|
id=job_id,
|
|
misfire_grace_time=60,
|
|
coalesce=True,
|
|
jobstore='default',
|
|
max_instances=1)
|
|
create_guess_matcher(answer, self.time, self.group_id)
|
|
return MessageSegment.record(file=Path(path))
|
|
|
|
async def start2(self):
|
|
# hard mode
|
|
if not os.path.exists(data2_path):
|
|
print('请到github下载genshin_voice压缩包解压到 ' + data2_path)
|
|
raise Exception('困难模式语音文件夹不存在')
|
|
|
|
names = list(voice2_db.keys())
|
|
if not names:
|
|
raise Exception('数据库错误. 请重新下载数据库文件')
|
|
|
|
names.remove('')
|
|
names.remove('派蒙')
|
|
names.remove('旅行者')
|
|
answer = random.choice(names)
|
|
answer_info = None
|
|
while True:
|
|
info = random.choice(voice2_db[answer])
|
|
if answer not in info['text']:
|
|
answer_info = info
|
|
break
|
|
|
|
if not answer_info:
|
|
return await self.start2()
|
|
|
|
# 记录答案
|
|
process[self.group_id] = {
|
|
'start': True,
|
|
'answer': answer,
|
|
'ok': set()
|
|
}
|
|
|
|
job_id = str(self.group_id) + '_guess_voice'
|
|
if scheduler.get_job(job_id, 'default'):
|
|
scheduler.remove_job(job_id, 'default')
|
|
|
|
now = datetime.datetime.now()
|
|
notify_time = now + datetime.timedelta(seconds=self.time)
|
|
scheduler.add_job(self.end_game, trigger=DateTrigger(notify_time),
|
|
id=job_id,
|
|
misfire_grace_time=60,
|
|
coalesce=True,
|
|
jobstore='default',
|
|
max_instances=1)
|
|
|
|
path = os.path.join(data2_path, answer_info['file'] + '.mp3')
|
|
|
|
return MessageSegment.record(file=Path(path))
|
|
|
|
async def end_game(self):
|
|
self.group = process.get(self.group_id)
|
|
ok_list = list(process[self.group_id]['ok'])
|
|
if len(ok_list) > 1: # 只允许1个人猜对
|
|
return
|
|
if not ok_list:
|
|
msg = '还没有人猜中呢'
|
|
else:
|
|
msg = '回答正确的人: ' + ' '.join([str(MessageSegment.at(qq)) for qq in ok_list])
|
|
msg = '正确答案是 %s\n%s' % (self.group['answer'], msg)
|
|
try:
|
|
await get_bot().send_group_msg(group_id=self.group_id, message=msg)
|
|
except Exception as e:
|
|
logger.error(e)
|
|
|
|
# 清理记录
|
|
process[self.group_id] = {}
|
|
|
|
# 记录到数据库给之后奖励做处理
|
|
user_group = user_db.get(self.group_id, {})
|
|
if not user_group:
|
|
user_db[self.group_id] = {}
|
|
|
|
for user in ok_list:
|
|
info = user_group.get(str(user), {'count': 0})
|
|
info['count'] += 1
|
|
user_group[user] = info
|
|
user_db[self.group_id] = user_group
|
|
|
|
# 只添加正确的答案
|
|
async def add_answer(self, qq: int, msg: str):
|
|
if self.group.get('answer'):
|
|
process[self.group_id]['ok'].add(qq)
|
|
job_id = str(self.group_id) + '_guess_voice'
|
|
if scheduler.get_job(job_id, 'default'):
|
|
scheduler.remove_job(job_id, 'default')
|
|
await self.end_game()
|
|
|
|
# 获取排行榜
|
|
async def get_rank(self, bot, event):
|
|
user_list = user_db.get(self.group_id, {})
|
|
|
|
user_list = sorted(user_list.items(), key=lambda v: v[1]['count'])
|
|
user_list.reverse()
|
|
num = 0
|
|
msg = '本群猜语音排行榜:'
|
|
for user, data in user_list[:10]:
|
|
user = await bot.get_group_member_info(group_id=event.group_id, user_id=user)
|
|
num += 1
|
|
msg += f"\n第{num}名: {escape(user['card']) or escape(user['nickname'])}, 猜对{data['count']}次"
|
|
return msg
|