import os import random import datetime import json from apscheduler.triggers.date import DateTrigger from typing import List from pathlib import Path from nonebot import get_bot, require, logger from nonebot.adapters.onebot.v11 import MessageSegment, escape from sqlitedict import SqliteDict from .util import get_path, require_file from nonebot.rule import Rule from nonebot import on_regex from nonebot.adapters.onebot.v11 import GroupMessageEvent from .download_data import voice_list_by_mys, voice_detail_by_mys scheduler = require('nonebot_plugin_apscheduler').scheduler # dir_data = os.path.join(os.path.dirname(__file__), 'data') dir_data = Path() / 'data' / 'LittlePaimon' / 'guess_voice' / 'data' # data_path = os.path.join(os.path.dirname(__file__), 'voice') data_path = Path() / 'data' / 'LittlePaimon' / 'guess_voice' / 'voice' # data2_path = os.path.join(os.path.dirname(__file__), 'voice2') data2_path = Path() / 'data' / 'LittlePaimon' / 'guess_voice' / 'voice2' # if not os.path.exists(dir_data): # os.makedirs(dir_data) dir_data.mkdir(parents=True, exist_ok=True) db = {} def init_db(db_dir, db_name='db.sqlite', tablename='unnamed') -> SqliteDict: db_cache_key = db_name + tablename if db.get(db_cache_key): return db[db_cache_key] db[db_cache_key] = SqliteDict(str(get_path(db_dir, db_name)), tablename=tablename, encode=json.dumps, decode=json.loads, autocommit=True) return db[db_cache_key] user_db = init_db('data', 'user.sqlite') voice_db = init_db('data', 'voice.sqlite') voice2_db = init_db('data', 'voice2.sqlite', tablename='voice') process = {} with open(os.path.join(os.path.dirname(__file__), 'character.json'), 'r', encoding="utf-8") as f: character_json: dict = json.loads(f.read()) def create_guess_matcher(role_name, second, group_id): def check_group(event: GroupMessageEvent): if event.group_id == group_id: return True return False alias_list = character_json[role_name] re_str = role_name + '|' + '|'.join(alias_list) guess_matcher = on_regex(re_str, temp=True, rule=Rule(check_group)) guess_matcher.plugin_name = "Guess_voice" @guess_matcher.handle() async def _(event: GroupMessageEvent): guess = Guess(event.group_id, time=second) if guess.is_start(): await guess.add_answer(event.user_id, event.message.extract_plain_text()) def get_voice_by_language(data, language_name): if language_name == '中': return data['chn'] elif language_name == '日': return data['jap'] elif language_name == '英': return data['eng'] elif language_name == '韩': return data['kor'] def char_name_by_name(name): names = character_json.keys() # 是否就是正确的名字 if name in names: return name # for item in names: nickname = character_json[item] if name in nickname: return item return name async def get_random_voice(name, language='中'): char_name = char_name_by_name(name) if language == '中': # 如果是中文 则选择米游社的源 mys_list = await voice_list_by_mys() char_voices = mys_list.get(char_name) if not char_voices: return voice_list = await voice_detail_by_mys(char_voices['content_id']) else: voice_list = voice_db.get(char_name) if not voice_list: return temp_voice_list = [] for v in voice_list: voice_path = get_voice_by_language(v, language) if voice_path: temp_voice_list.append(voice_path) if not temp_voice_list: return voice_path = random.choice(temp_voice_list) if language == '中': # 如果是中文 则选择米游社的源 path = os.path.join(data_path, language, char_name, os.path.basename(voice_path)) await require_file(file=path, url=voice_path) else: path = os.path.join(data_path, voice_path) return path class Guess: time: int group_id: int group = {} retry_count = 0 def __init__(self, group_id: int, time=30): self.time = time self.group_id = group_id self.group = process.get(self.group_id) def is_start(self): if not self.group: return False return self.group['start'] def set_start(self): process[self.group_id] = {'start': True} def set_end(self): process[self.group_id] = {} async def start(self, language: List[str] = None): if not language: language = ['中'] # 随机一个语言 language = random.choice(language) # 随机选择一个语音 if language == '中': # 如果是中文 则选择米游社的源 mys_list = await voice_list_by_mys() answer = random.choice(list(mys_list.keys())) try: db_dict = { answer: await voice_detail_by_mys(mys_list[answer]['content_id']) } self.retry_count = 0 except KeyError as e: if self.retry_count == 4: self.retry_count = 0 raise Exception('获取语音数据失败') self.retry_count += 1 return await self.start([language]) else: answer = random.choice(list(voice_db.keys())) db_dict = voice_db temp_voice_list = [] for v in db_dict[answer]: if not (answer in v['text']): voice_path = get_voice_by_language(v, language) if voice_path: temp_voice_list.append(voice_path) if not temp_voice_list: logger.info('随机到了个哑巴,, 重新随机.. 如果反复出现这个 你应该检查一下数据库') return await self.start([language]) voice_path = random.choice(temp_voice_list) if language == '中': # 如果是中文 则选择米游社的源 path = os.path.join(data_path, language, answer, os.path.basename(voice_path)) await require_file(file=path, url=voice_path) else: path = os.path.join(data_path, voice_path) # 记录答案 process[self.group_id] = { 'start': True, 'answer': answer, 'ok': set() } job_id = str(self.group_id) + '_guess_voice' if scheduler.get_job(job_id, 'default'): scheduler.remove_job(job_id, 'default') now = datetime.datetime.now() notify_time = now + datetime.timedelta(seconds=self.time) scheduler.add_job(self.end_game, trigger=DateTrigger(notify_time), id=job_id, misfire_grace_time=60, coalesce=True, jobstore='default', max_instances=1) create_guess_matcher(answer, self.time, self.group_id) return MessageSegment.record(file=Path(path)) async def start2(self): # hard mode if not os.path.exists(data2_path): print('请到github下载genshin_voice压缩包解压到 ' + data2_path) raise Exception('困难模式语音文件夹不存在') names = list(voice2_db.keys()) if not names: raise Exception('数据库错误. 请重新下载数据库文件') names.remove('') names.remove('派蒙') names.remove('旅行者') answer = random.choice(names) answer_info = None while True: info = random.choice(voice2_db[answer]) if answer not in info['text']: answer_info = info break if not answer_info: return await self.start2() # 记录答案 process[self.group_id] = { 'start': True, 'answer': answer, 'ok': set() } job_id = str(self.group_id) + '_guess_voice' if scheduler.get_job(job_id, 'default'): scheduler.remove_job(job_id, 'default') now = datetime.datetime.now() notify_time = now + datetime.timedelta(seconds=self.time) scheduler.add_job(self.end_game, trigger=DateTrigger(notify_time), id=job_id, misfire_grace_time=60, coalesce=True, jobstore='default', max_instances=1) path = os.path.join(data2_path, answer_info['file'] + '.mp3') return MessageSegment.record(file=Path(path)) async def end_game(self): self.group = process.get(self.group_id) ok_list = list(process[self.group_id]['ok']) if len(ok_list) > 1: # 只允许1个人猜对 return if not ok_list: msg = '还没有人猜中呢' else: msg = '回答正确的人: ' + ' '.join([str(MessageSegment.at(qq)) for qq in ok_list]) msg = '正确答案是 %s\n%s' % (self.group['answer'], msg) try: await get_bot().send_group_msg(group_id=self.group_id, message=msg) except Exception as e: logger.error(e) # 清理记录 process[self.group_id] = {} # 记录到数据库给之后奖励做处理 user_group = user_db.get(self.group_id, {}) if not user_group: user_db[self.group_id] = {} for user in ok_list: info = user_group.get(str(user), {'count': 0}) info['count'] += 1 user_group[user] = info user_db[self.group_id] = user_group # 只添加正确的答案 async def add_answer(self, qq: int, msg: str): if self.group.get('answer'): process[self.group_id]['ok'].add(qq) job_id = str(self.group_id) + '_guess_voice' if scheduler.get_job(job_id, 'default'): scheduler.remove_job(job_id, 'default') await self.end_game() # 获取排行榜 async def get_rank(self, bot, event): user_list = user_db.get(self.group_id, {}) user_list = sorted(user_list.items(), key=lambda v: v[1]['count']) user_list.reverse() num = 0 msg = '本群猜语音排行榜:' for user, data in user_list[:10]: user = await bot.get_group_member_info(group_id=event.group_id, user_id=user) num += 1 msg += f"\n第{num}名: {escape(user['card']) or escape(user['nickname'])}, 猜对{data['count']}次" return msg