mirror of
https://github.com/xuthus83/LittlePaimon.git
synced 2024-12-16 13:40:53 +08:00
✨ 命令别名
增强,可以在Web UI
中进行设置,优化群聊学习
This commit is contained in:
parent
a850fdbfc7
commit
512685a707
@ -1,2 +1,3 @@
|
||||
from .config.manage import ConfigManager, ConfigModel, config
|
||||
from .plugin.manage import PluginManager, HIDDEN_PLUGINS, MatcherInfo, PluginInfo
|
||||
from .command import handle_command_alias
|
||||
|
46
LittlePaimon/config/command/__init__.py
Normal file
46
LittlePaimon/config/command/__init__.py
Normal file
@ -0,0 +1,46 @@
|
||||
import re
|
||||
from typing import List
|
||||
|
||||
from nonebot.adapters.onebot.v11 import MessageEvent, GroupMessageEvent
|
||||
from nonebot.message import event_preprocessor
|
||||
from tortoise.queryset import Q
|
||||
|
||||
from LittlePaimon.config import config
|
||||
from LittlePaimon.database import CommandAlias, AliasMode
|
||||
|
||||
|
||||
@event_preprocessor
|
||||
async def handle_command_alias(event: MessageEvent):
|
||||
if not config.command_alias_enable:
|
||||
return
|
||||
msgs = event.get_message()
|
||||
if len(msgs) < 1 or msgs[0].type != 'text':
|
||||
return
|
||||
msg = str(msgs[0]).lstrip()
|
||||
if not msg:
|
||||
return
|
||||
if isinstance(event, GroupMessageEvent):
|
||||
filter_arg = Q(group_id=str(event.group_id)) | Q(group_id='all')
|
||||
else:
|
||||
filter_arg = Q(group_id='all')
|
||||
all_alias = await CommandAlias.filter(filter_arg).order_by('priority')
|
||||
new_msg = modify_msg(all_alias, msg)
|
||||
event.message[0].data['text'] = new_msg
|
||||
|
||||
|
||||
def combine_msg(new_command: str, extra_msg: str, is_reverse: bool):
|
||||
return (new_command + extra_msg) if not is_reverse else (extra_msg + new_command)
|
||||
|
||||
|
||||
def modify_msg(all_alias: List[CommandAlias], msg: str):
|
||||
for alias in all_alias:
|
||||
if alias.is_regex:
|
||||
msg = re.sub(alias.alias, alias.command, msg)
|
||||
else:
|
||||
if alias.mode == AliasMode.prefix and msg.startswith(alias.alias):
|
||||
msg = combine_msg(alias.command, msg[len(alias.alias):], alias.is_reverse)
|
||||
elif alias.mode == AliasMode.suffix and msg.endswith(alias.alias):
|
||||
msg = combine_msg(msg[:-len(alias.alias)], alias.command, alias.is_reverse)
|
||||
elif alias.mode == AliasMode.full_match and msg == alias.alias:
|
||||
msg = alias.command
|
||||
return msg
|
@ -45,6 +45,8 @@ class ConfigModel(BaseModel):
|
||||
secret_key: str = Field('49c294d32f69b732ef6447c18379451ce1738922a75cd1d4812ef150318a2ed0', alias='Web端token密钥')
|
||||
admin_theme: Literal['default', 'antd', 'ang', 'dark'] = Field('default', alias='Web端主题')
|
||||
|
||||
command_alias_enable: bool = Field(True, alias='启用命令别名')
|
||||
|
||||
@property
|
||||
def alias_dict(self):
|
||||
return {v.alias: k for k, v in self.__fields__.items()}
|
||||
|
@ -4,66 +4,75 @@ from pathlib import Path
|
||||
from tortoise import Tortoise
|
||||
from nonebot.log import logger
|
||||
from LittlePaimon.utils import scheduler, logger as my_logger
|
||||
from LittlePaimon.utils.path import GENSHIN_DB_PATH, SUB_DB_PATH, GENSHIN_VOICE_DB_PATH, MANAGER_DB_PATH, YSC_TEMP_IMG_PATH
|
||||
from LittlePaimon.utils.path import GENSHIN_DB_PATH, SUB_DB_PATH, GENSHIN_VOICE_DB_PATH, MANAGER_DB_PATH, \
|
||||
YSC_TEMP_IMG_PATH
|
||||
from .models import *
|
||||
|
||||
|
||||
DATABASE = {
|
||||
"connections": {
|
||||
"paimon_genshin": {
|
||||
"engine": "tortoise.backends.sqlite",
|
||||
"credentials": {"file_path": GENSHIN_DB_PATH},
|
||||
'connections': {
|
||||
'paimon_genshin': {
|
||||
'engine': 'tortoise.backends.sqlite',
|
||||
'credentials': {'file_path': GENSHIN_DB_PATH},
|
||||
},
|
||||
"paimon_subscription": {
|
||||
"engine": "tortoise.backends.sqlite",
|
||||
"credentials": {"file_path": SUB_DB_PATH},
|
||||
'paimon_subscription': {
|
||||
'engine': 'tortoise.backends.sqlite',
|
||||
'credentials': {'file_path': SUB_DB_PATH},
|
||||
},
|
||||
'paimon_genshin_voice': {
|
||||
"engine": "tortoise.backends.sqlite",
|
||||
"credentials": {"file_path": GENSHIN_VOICE_DB_PATH},
|
||||
'engine': 'tortoise.backends.sqlite',
|
||||
'credentials': {'file_path': GENSHIN_VOICE_DB_PATH},
|
||||
},
|
||||
'paimon_manager': {
|
||||
"engine": "tortoise.backends.sqlite",
|
||||
"credentials": {"file_path": MANAGER_DB_PATH},
|
||||
}
|
||||
'paimon_manage': {
|
||||
'engine': 'tortoise.backends.sqlite',
|
||||
'credentials': {'file_path': MANAGER_DB_PATH},
|
||||
},
|
||||
# 'memory_db': 'sqlite://:memory:'
|
||||
},
|
||||
"apps": {
|
||||
"paimon_genshin": {
|
||||
"models": ['LittlePaimon.database.models.player_info',
|
||||
'LittlePaimon.database.models.abyss_info',
|
||||
'LittlePaimon.database.models.character',
|
||||
'LittlePaimon.database.models.cookie'],
|
||||
"default_connection": "paimon_genshin",
|
||||
'apps': {
|
||||
'paimon_genshin': {
|
||||
'models': [player_info.__name__,
|
||||
abyss_info.__name__,
|
||||
character.__name__,
|
||||
cookie.__name__],
|
||||
'default_connection': 'paimon_genshin',
|
||||
},
|
||||
"paimon_subscription": {
|
||||
"models": ['LittlePaimon.database.models.subscription'],
|
||||
"default_connection": "paimon_subscription",
|
||||
'paimon_subscription': {
|
||||
'models': [subscription.__name__],
|
||||
'default_connection': 'paimon_subscription',
|
||||
},
|
||||
"paimon_genshin_voice": {
|
||||
"models": ['LittlePaimon.database.models.genshin_voice'],
|
||||
"default_connection": "paimon_genshin_voice",
|
||||
'paimon_genshin_voice': {
|
||||
'models': [genshin_voice.__name__],
|
||||
'default_connection': 'paimon_genshin_voice',
|
||||
},
|
||||
"paimon_manager": {
|
||||
"models": ['LittlePaimon.database.models.manager'],
|
||||
"default_connection": "paimon_manager",
|
||||
}
|
||||
'paimon_manage': {
|
||||
'models': [manage.__name__],
|
||||
'default_connection': 'paimon_manage',
|
||||
},
|
||||
# 'memory_db': {
|
||||
# 'models': [memory_db.__name__],
|
||||
# 'default_connection': 'memory_db',
|
||||
# }
|
||||
},
|
||||
'use_tz': False,
|
||||
'timezone': 'Asia/Shanghai'
|
||||
}
|
||||
|
||||
|
||||
def register_database(db_name: str, models: List[Union[str, Path]], db_path: Optional[Union[str, Path]]):
|
||||
def register_database(db_name: str, models: str, db_path: Optional[Union[str, Path]]):
|
||||
"""
|
||||
注册数据库
|
||||
"""
|
||||
if db_name in DATABASE['connections'] and db_name in DATABASE['apps']:
|
||||
DATABASE['apps'][db_name]['models'].extend(models)
|
||||
DATABASE['apps'][db_name]['models'].append(models)
|
||||
else:
|
||||
DATABASE['connections'][db_name] = {
|
||||
"engine": "tortoise.backends.sqlite",
|
||||
"credentials": {"file_path": db_path},
|
||||
'engine': 'tortoise.backends.sqlite',
|
||||
'credentials': {'file_path': db_path},
|
||||
}
|
||||
DATABASE['apps'][db_name] = {
|
||||
"models": models,
|
||||
"default_connection": db_name,
|
||||
'models': [models],
|
||||
'default_connection': db_name,
|
||||
}
|
||||
|
||||
|
||||
@ -74,9 +83,9 @@ async def connect():
|
||||
try:
|
||||
await Tortoise.init(DATABASE)
|
||||
await Tortoise.generate_schemas()
|
||||
logger.opt(colors=True).success("<u><y>[数据库]</y></u><g>连接成功</g>")
|
||||
logger.opt(colors=True).success('<u><y>[数据库]</y></u><g>连接成功</g>')
|
||||
except Exception as e:
|
||||
logger.opt(colors=True).warning(f"<u><y>[数据库]</y></u><r>连接失败:{e}</r>")
|
||||
logger.opt(colors=True).warning(f'<u><y>[数据库]</y></u><r>连接失败:{e}</r>')
|
||||
raise e
|
||||
|
||||
|
||||
@ -85,7 +94,7 @@ async def disconnect():
|
||||
断开数据库连接
|
||||
"""
|
||||
await Tortoise.close_connections()
|
||||
logger.opt(colors=True).success("<u><y>[数据库]</y></u><r>连接已断开</r>")
|
||||
logger.opt(colors=True).success('<u><y>[数据库]</y></u><r>连接已断开</r>')
|
||||
|
||||
|
||||
@scheduler.scheduled_job('cron', hour=0, minute=0, misfire_grace_time=10)
|
||||
@ -112,3 +121,5 @@ async def daily_reset():
|
||||
if YSC_TEMP_IMG_PATH.exists():
|
||||
shutil.rmtree(YSC_TEMP_IMG_PATH)
|
||||
YSC_TEMP_IMG_PATH.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
await MysAuthKey.filter()
|
||||
|
@ -3,6 +3,8 @@ from .cookie import *
|
||||
from .player_info import *
|
||||
from .subscription import *
|
||||
from .genshin_voice import *
|
||||
from .manager import *
|
||||
from .manage import *
|
||||
from .abyss_info import *
|
||||
from .learning_chat import *
|
||||
# from .memory_db import *
|
||||
|
||||
from . import abyss_info, character, cookie, genshin_voice, manage, other, player_info, subscription
|
||||
|
@ -1,171 +0,0 @@
|
||||
import time
|
||||
from typing import List, Optional, Iterator
|
||||
from pydantic import BaseModel
|
||||
|
||||
try:
|
||||
import ujson as json
|
||||
except ImportError:
|
||||
import json
|
||||
|
||||
from tortoise import fields
|
||||
from tortoise.models import Model
|
||||
|
||||
|
||||
class BanWord(BaseModel):
|
||||
keywords: str
|
||||
group_id: int
|
||||
reason: Optional[str]
|
||||
time: Optional[int]
|
||||
|
||||
|
||||
class BanWords(BaseModel):
|
||||
bans: List[BanWord] = []
|
||||
|
||||
def __len__(self):
|
||||
return len(self.bans)
|
||||
|
||||
def __getitem__(self, item):
|
||||
return self.bans[item]
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
self.bans[key] = value
|
||||
|
||||
def __delitem__(self, key):
|
||||
del self.bans[key]
|
||||
|
||||
def __iter__(self) -> Iterator[BanWord]:
|
||||
return iter(self.bans)
|
||||
|
||||
def __reversed__(self):
|
||||
return reversed(self.bans)
|
||||
|
||||
def append(self, ban: BanWord):
|
||||
self.bans.append(ban)
|
||||
|
||||
def index(self, ban: BanWord) -> int:
|
||||
return self.bans.index(ban)
|
||||
|
||||
|
||||
|
||||
# @staticmethod
|
||||
# def tortoise_decoder(text: str) -> List["BanWord"]:
|
||||
# print('ban_decoder:', text)
|
||||
# return [BanWord.parse_obj(item) for item in json.loads(text)]
|
||||
#
|
||||
# @staticmethod
|
||||
# def tortoise_encoder(models: List["BanWord"]) -> str:
|
||||
# print('ban_encoder:', models)
|
||||
# if not models:
|
||||
# return ''
|
||||
# elif isinstance(models[0], BanWord):
|
||||
# return json.dumps([model.dict() for model in models])
|
||||
|
||||
|
||||
class Answer(BaseModel):
|
||||
keywords: str
|
||||
group_id: int
|
||||
count: int
|
||||
time: int
|
||||
messages: List[str]
|
||||
|
||||
# @staticmethod
|
||||
# def tortoise_decoder(text: str) -> List["Answer"]:
|
||||
# print('answer_decoder:', text)
|
||||
# return [Answer.parse_obj(item) for item in json.loads(text)]
|
||||
#
|
||||
# @staticmethod
|
||||
# def tortoise_encoder(models: List["Answer"]) -> str:
|
||||
# print('answer_encoder:', models)
|
||||
# if not models:
|
||||
# return ''
|
||||
# elif isinstance(models[0], BanWord):
|
||||
# return json.dumps([model.dict() for model in models])
|
||||
|
||||
|
||||
class Answers(BaseModel):
|
||||
answers: List[Answer] = []
|
||||
|
||||
def __len__(self):
|
||||
return len(self.answers)
|
||||
|
||||
def __getitem__(self, item):
|
||||
return self.answers[item]
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
self.answers[key] = value
|
||||
|
||||
def __delitem__(self, key):
|
||||
del self.answers[key]
|
||||
|
||||
def __iter__(self) -> Iterator[Answer]:
|
||||
return iter(self.answers)
|
||||
|
||||
def __reversed__(self):
|
||||
return reversed(self.answers)
|
||||
|
||||
def append(self, answer: Answer):
|
||||
self.answers.append(answer)
|
||||
|
||||
def index(self, answer: Answer) -> int:
|
||||
return self.answers.index(answer)
|
||||
|
||||
|
||||
class Message(Model):
|
||||
id: int = fields.IntField(pk=True, generated=True, auto_increment=True)
|
||||
"""自增主键"""
|
||||
group_id: int = fields.IntField()
|
||||
"""群id"""
|
||||
user_id: int = fields.IntField()
|
||||
"""用户id"""
|
||||
raw_message: str = fields.TextField()
|
||||
"""原始消息"""
|
||||
is_plain_text: bool = fields.BooleanField()
|
||||
"""是否为纯文本"""
|
||||
plain_text: str = fields.TextField()
|
||||
"""纯文本"""
|
||||
keywords: str = fields.TextField()
|
||||
"""关键词"""
|
||||
time: int = fields.IntField()
|
||||
"""时间戳"""
|
||||
|
||||
class Meta:
|
||||
table = 'Message'
|
||||
indexes = ('time',)
|
||||
ordering = ['-time']
|
||||
|
||||
|
||||
class Context(Model):
|
||||
id: int = fields.IntField(pk=True, generated=True, auto_increment=True)
|
||||
"""自增主键"""
|
||||
keywords: str = fields.TextField()
|
||||
"""关键词"""
|
||||
time: int = fields.IntField(default=int(time.time()))
|
||||
"""时间戳"""
|
||||
count: int = fields.IntField(default=1)
|
||||
"""次数"""
|
||||
answers: Answers = fields.JSONField(encoder=Answers.json, decoder=Answers.parse_raw, default=Answers())
|
||||
"""答案列表"""
|
||||
clear_time: Optional[int] = fields.IntField(null=True)
|
||||
"""清除时间戳"""
|
||||
ban: BanWords = fields.JSONField(encoder=BanWords.json, decoder=BanWords.parse_raw, default=BanWords())
|
||||
"""禁用词列表"""
|
||||
|
||||
class Meta:
|
||||
table = 'Context'
|
||||
indexes = ('keywords', 'count', 'time')
|
||||
ordering = ['-time', '-count']
|
||||
|
||||
|
||||
class BlackList(Model):
|
||||
id: int = fields.IntField(pk=True, generated=True, auto_increment=True)
|
||||
"""自增主键"""
|
||||
group_id: int = fields.IntField()
|
||||
"""群id"""
|
||||
answers: List[str] = fields.JSONField(default=[])
|
||||
"""答案"""
|
||||
answers_reserve: List[str] = fields.JSONField(default=[])
|
||||
"""备用答案"""
|
||||
|
||||
class Meta:
|
||||
table = 'BlackList'
|
||||
indexes = ('group_id',)
|
@ -1,5 +1,6 @@
|
||||
import datetime
|
||||
from typing import List
|
||||
from enum import Enum
|
||||
|
||||
from tortoise import fields
|
||||
from tortoise.models import Model
|
||||
@ -43,3 +44,30 @@ class PluginStatistics(Model):
|
||||
|
||||
class Meta:
|
||||
table = 'plugin_statistics'
|
||||
|
||||
|
||||
class AliasMode(Enum):
|
||||
prefix: str = '前缀'
|
||||
suffix: str = '后缀'
|
||||
full_match: str = '全匹配'
|
||||
|
||||
|
||||
class CommandAlias(Model):
|
||||
id = fields.IntField(pk=True, generated=True, auto_increment=True)
|
||||
command: str = fields.TextField()
|
||||
"""目标命令"""
|
||||
alias: str = fields.TextField()
|
||||
"""命令别名"""
|
||||
mode: AliasMode = fields.CharEnumField(AliasMode, max_length=10)
|
||||
"""别名模式"""
|
||||
is_regex: bool = fields.BooleanField(default=False)
|
||||
"""是否为正则表达式"""
|
||||
is_reverse: bool = fields.BooleanField(default=False)
|
||||
"""是否反转"""
|
||||
group_id: str = fields.CharField(max_length=30)
|
||||
"""启用的群id,all为全局启用"""
|
||||
priority: int = fields.IntField(default=99)
|
||||
"""优先级,数字越大优先级越高"""
|
||||
|
||||
class Meta:
|
||||
table = 'command_alias'
|
16
LittlePaimon/database/models/memory_db.py
Normal file
16
LittlePaimon/database/models/memory_db.py
Normal file
@ -0,0 +1,16 @@
|
||||
import datetime
|
||||
|
||||
from tortoise import fields
|
||||
from tortoise.models import Model
|
||||
|
||||
|
||||
class MysAuthKey(Model):
|
||||
id: int = fields.IntField(pk=True, generated=True, auto_increment=True)
|
||||
user_id: str = fields.TextField()
|
||||
"""用户id"""
|
||||
uid: str = fields.TextField()
|
||||
"""原神uid"""
|
||||
authkey: str = fields.TextField()
|
||||
"""authkey"""
|
||||
generate_time: datetime.datetime = fields.DatetimeField(auto_now_add=True)
|
||||
"""生成时间"""
|
@ -67,32 +67,40 @@ class LearningChat:
|
||||
self.role = 'superuser' if event.user_id in SUPERUSERS else event.sender.role
|
||||
self.config = config_manager.get_group_config(self.data.group_id)
|
||||
self.ban_users = set(chat_config.ban_users + self.config.ban_users)
|
||||
self.ban_words = set(chat_config.ban_words + self.config.ban_words)
|
||||
|
||||
async def _learn(self) -> Result:
|
||||
# logger.debug('群聊学习', f'收到来自群<m>{self.data.group_id}</m>的消息<m>{self.data.message}</m>')
|
||||
if not chat_config.total_enable or not self.config.enable or self.data.user_id in self.ban_users:
|
||||
# 如果未开启群聊学习或者发言人在屏蔽列表中,跳过
|
||||
if not chat_config.total_enable or not self.config.enable:
|
||||
logger.debug('群聊学习', f'➤该群<m>{self.data.group_id}</m>未开启群聊学习,跳过')
|
||||
# 如果未开启群聊学习,跳过
|
||||
return Result.Pass
|
||||
elif self.data.user_id in self.ban_users:
|
||||
# 发言人在屏蔽列表中,跳过
|
||||
logger.debug('群聊学习', f'➤发言人<m>{self.data.user_id}</m>在屏蔽列表中,跳过')
|
||||
elif self.to_me and '不可以' in self.data.message:
|
||||
# 如果是对某句话进行禁言
|
||||
return Result.Ban
|
||||
elif self.to_me and any(w in self.data.message for w in {'学说话', '快学', '开启学习', '闭嘴', '别学', '关闭学习'}):
|
||||
elif self.to_me and any(
|
||||
w in self.data.message for w in {'学说话', '快学', '开启学习', '闭嘴', '别学', '关闭学习'}):
|
||||
return Result.SetEnable
|
||||
elif not await self._check_allow(self.data):
|
||||
# 本消息不合法,跳过
|
||||
logger.debug('群聊学习', f'➤消息未通过校验,跳过')
|
||||
return Result.Pass
|
||||
elif self.reply:
|
||||
# 如果是回复消息
|
||||
if not (message := await ChatMessage.get_or_none(message_id=self.reply.message_id)):
|
||||
# 回复的消息在数据库中有记录
|
||||
logger.debug('群聊学习', '➤是否学习:回复的消息不在数据库中,不学习')
|
||||
logger.debug('群聊学习', '➤回复的消息不在数据库中,跳过')
|
||||
return Result.Pass
|
||||
if message.user_id in self.ban_users:
|
||||
# 且回复的人不在屏蔽列表中
|
||||
logger.debug('群聊学习', '➤回复的人在屏蔽列表中,跳过')
|
||||
return Result.Pass
|
||||
if not await self._check_allow(message):
|
||||
# 且回复的内容通过校验
|
||||
logger.debug('群聊学习', '➤是否学习:回复的消息未通过校验,不学习')
|
||||
logger.debug('群聊学习', '➤回复的消息未通过校验,跳过')
|
||||
return Result.Pass
|
||||
# 则将该回复作为该消息的答案
|
||||
await self._set_answer(message)
|
||||
@ -102,7 +110,7 @@ class LearningChat:
|
||||
# 获取本群一个小时内的最后5条消息
|
||||
if messages[0].message == self.data.message:
|
||||
# 判断是否为复读中
|
||||
logger.debug('群聊学习', '➤是否学习:复读中,不学习')
|
||||
logger.debug('群聊学习', '➤复读中,跳过')
|
||||
return Result.Repeat
|
||||
for message in messages:
|
||||
# 如果5条内有相关信息,就作为该消息的答案
|
||||
@ -114,7 +122,7 @@ class LearningChat:
|
||||
# 如果没有相关信息
|
||||
if messages[0].user_id in self.ban_users or not await self._check_allow(messages[0]):
|
||||
# 且最后一条消息的发送者不在屏蔽列表中并通过校验
|
||||
logger.debug('群聊学习', '➤是否学习:最后一条消息未通过校验,不学习')
|
||||
logger.debug('群聊学习', '➤最后一条消息未通过校验,跳过')
|
||||
return Result.Pass
|
||||
# 则作为最后一条消息的答案
|
||||
await self._set_answer(messages[0])
|
||||
@ -162,23 +170,24 @@ class LearningChat:
|
||||
elif result == Result.Repeat and (messages := await ChatMessage.filter(group_id=self.data.group_id,
|
||||
time__gte=self.data.time - 3600).limit(
|
||||
self.config.repeat_threshold)):
|
||||
# 如果达到阈值,进行复读
|
||||
# 如果达到阈值,且bot没有回复过,且不是全都为同一个人在说,则进行复读
|
||||
if len(messages) >= self.config.repeat_threshold and all(
|
||||
message.message == self.data.message and message.user_id != self.bot_id for message in
|
||||
messages):
|
||||
message.message == self.data.message and message.user_id != self.bot_id
|
||||
for message in messages) and not all(
|
||||
message.user_id == messages[0].user_id for message in messages):
|
||||
if random.random() < self.config.break_probability:
|
||||
logger.debug('群聊学习', f'➤➤是否回复:达到复读阈值,打断复读!')
|
||||
logger.debug('群聊学习', f'➤➤达到复读阈值,打断复读!')
|
||||
return [random.choice(BREAK_REPEAT_WORDS)]
|
||||
else:
|
||||
logger.debug('群聊学习', f'➤➤是否回复:达到复读阈值,复读<m>{messages[0].message}</m>')
|
||||
logger.debug('群聊学习', f'➤➤达到复读阈值,复读<m>{messages[0].message}</m>')
|
||||
return [self.data.message]
|
||||
else:
|
||||
# 回复
|
||||
if self.data.is_plain_text and len(self.data.plain_text) <= 1:
|
||||
logger.debug('群聊学习', '➤➤是否回复:消息过短,不回复')
|
||||
logger.debug('群聊学习', '➤➤消息过短,不回复')
|
||||
return None
|
||||
if not (context := await ChatContext.get_or_none(keywords=self.data.keywords)):
|
||||
logger.debug('群聊学习', '➤➤是否回复:尚未有已学习的回复,不回复')
|
||||
logger.debug('群聊学习', '➤➤尚未有已学习的回复,不回复')
|
||||
return None
|
||||
|
||||
# 获取回复阈值
|
||||
@ -195,7 +204,7 @@ class LearningChat:
|
||||
else:
|
||||
answer_count_threshold = 1
|
||||
cross_group_threshold = 1
|
||||
|
||||
logger.debug('群聊学习', f'➤➤本次回复阈值为<m>{answer_count_threshold}</m>,跨群阈值为<m>{cross_group_threshold}</m>')
|
||||
# 获取满足跨群条件的回复
|
||||
answers_cross = await ChatAnswer.filter(context=context, count__gte=answer_count_threshold,
|
||||
keywords__in=await ChatAnswer.annotate(
|
||||
@ -215,7 +224,7 @@ class LearningChat:
|
||||
# answer.count -= answer_count_threshold - 1
|
||||
candidate_answers.append(answer)
|
||||
if not candidate_answers:
|
||||
logger.debug('群聊学习', '➤➤是否回复:没有符合条件的候选回复')
|
||||
logger.debug('群聊学习', '➤➤没有符合条件的候选回复')
|
||||
return None
|
||||
|
||||
# 从候选回复中进行选择
|
||||
@ -225,13 +234,13 @@ class LearningChat:
|
||||
per_list.append(1 - sum(per_list))
|
||||
answer_dict = tuple(zip(candidate_answers, per_list))
|
||||
logger.debug('群聊学习',
|
||||
f'➤➤是否回复:候选回复有<m>{"|".join([f"""{a.keywords}({round(p, 3)})""" for a, p in answer_dict])}|不回复({round(per_list[-1], 3)})</m>')
|
||||
f'➤➤候选回复有<m>{"|".join([f"""{a.keywords}({round(p, 3)})""" for a, p in answer_dict])}|不回复({round(per_list[-1], 3)})</m>')
|
||||
|
||||
if (result := random.choices(candidate_answers + [None], weights=per_list)[0]) is None:
|
||||
logger.debug('群聊学习', '➤➤是否回复:但不进行回复')
|
||||
logger.debug('群聊学习', '➤➤但不进行回复')
|
||||
return None
|
||||
result_message = random.choice(result.messages)
|
||||
logger.debug('群聊学习', f'➤➤是否回复:将回复<m>{result_message}</m>')
|
||||
logger.debug('群聊学习', f'➤➤将回复<m>{result_message}</m>')
|
||||
return [result_message]
|
||||
|
||||
async def _ban(self, message_id: Optional[int] = None) -> bool:
|
||||
@ -345,9 +354,13 @@ class LearningChat:
|
||||
|
||||
popularity: List[Tuple[int, List[ChatMessage]]] = sorted(total_messages.items(),
|
||||
key=cmp_to_key(group_popularity_cmp))
|
||||
logger.debug('群聊学习', f'主动发言:群热度排行<m>{">".join([str(g[0]) for g in popularity])}</m>')
|
||||
logger.debug('群聊学习', f'主动发言:群热度排行<m>{">>".join([str(g[0]) for g in popularity])}</m>')
|
||||
for group_id, messages in popularity:
|
||||
if len(messages) < 30:
|
||||
continue
|
||||
|
||||
config = config_manager.get_group_config(group_id)
|
||||
ban_words = set(chat_config.ban_words + config.ban_words + ['[CQ:xml', '[CQ:json', '[CQ:at', '[CQ:video', '[CQ:record', '[CQ:share'])
|
||||
|
||||
# 是否开启了主动发言
|
||||
if not config.speak_enable:
|
||||
@ -378,6 +391,12 @@ class LearningChat:
|
||||
weights=[answer.count + 1 if answer.time >= today_time else answer.count
|
||||
for answer in answers])[0]
|
||||
message = random.choice(answer.messages)
|
||||
if len(message) < 2:
|
||||
continue
|
||||
if message.startswith('[') and message.endswith(']'):
|
||||
continue
|
||||
if any(word in message for word in ban_words):
|
||||
continue
|
||||
speak_list.append(message)
|
||||
while random.random() < config.speak_continuously_probability and len(
|
||||
speak_list) < config.speak_continuously_max_len:
|
||||
@ -390,6 +409,12 @@ class LearningChat:
|
||||
weights=[a.count + 1 if a.time >= today_time else a.count
|
||||
for a in follow_answers])[0]
|
||||
message = random.choice(answer.messages)
|
||||
if len(message) < 2:
|
||||
continue
|
||||
if message.startswith('[') and message.endswith(']'):
|
||||
continue
|
||||
if any(word in message for word in ban_words):
|
||||
continue
|
||||
speak_list.append(message)
|
||||
else:
|
||||
break
|
||||
@ -439,18 +464,19 @@ class LearningChat:
|
||||
|
||||
async def _check_allow(self, message: Union[ChatMessage, ChatAnswer]) -> bool:
|
||||
raw_message = message.message if isinstance(message, ChatMessage) else message.messages[0]
|
||||
keywords = message.keywords
|
||||
if len(raw_message) < 2:
|
||||
return False
|
||||
if any(i in raw_message for i in
|
||||
{'[CQ:xml', '[CQ:json', '[CQ:at', '[CQ:video', '[CQ:record', '[CQ:share'}):
|
||||
# logger.debug('群聊学习', f'➤检验<m>{keywords}</m><r>不通过</r>')
|
||||
return False
|
||||
if any(i in raw_message for i in self.config.ban_words):
|
||||
if any(i in raw_message for i in self.ban_words):
|
||||
# logger.debug('群聊学习', f'➤检验<m>{keywords}</m><r>不通过</r>')
|
||||
return False
|
||||
if raw_message.startswith('[') and raw_message.endswith(']'):
|
||||
# logger.debug('群聊学习', f'➤检验<m>{keywords}</m><r>不通过</r>')
|
||||
return False
|
||||
if ban_word := await ChatBlackList.get_or_none(keywords=keywords):
|
||||
if ban_word := await ChatBlackList.get_or_none(keywords=message.keywords):
|
||||
if ban_word.global_ban or message.group_id in ban_word.ban_group_id:
|
||||
# logger.debug('群聊学习', f'➤检验<m>{keywords}</m><r>不通过</r>')
|
||||
return False
|
||||
|
@ -18,6 +18,7 @@ from tortoise.models import Model
|
||||
from LittlePaimon.database import register_database
|
||||
from LittlePaimon.utils.path import DATABASE_PATH
|
||||
from .config import config_manager
|
||||
|
||||
config = config_manager.config
|
||||
|
||||
JSON_DUMPS = functools.partial(json.dumps, ensure_ascii=False)
|
||||
@ -125,4 +126,4 @@ class ChatBlackList(Model):
|
||||
indexes = ('keywords',)
|
||||
|
||||
|
||||
register_database(db_name='LearningChat', models=['LittlePaimon.plugins.Learning_Chat.models'], db_path=DATABASE_PATH / 'LearningChat.db')
|
||||
register_database(db_name='LearningChat', models=__name__, db_path=DATABASE_PATH / 'LearningChat.db')
|
||||
|
@ -36,7 +36,7 @@ global_config_form = Form(
|
||||
actions=[Action(label='保存', level=LevelEnum.success, type='submit'),
|
||||
Action(label='重置', level=LevelEnum.warning, type='reset')]
|
||||
)
|
||||
group_select = Select(label='分群配置', name='group_id', source='/LittlePaimon/api/get_group_list',
|
||||
group_select = Select(label='分群配置', name='group_id', source='${group_list}',
|
||||
placeholder='选择群')
|
||||
group_config_form = Form(
|
||||
title='分群配置',
|
||||
@ -106,9 +106,10 @@ blacklist_table = TableCRUD(mode='table',
|
||||
api='delete:/LittlePaimon/api/delete_chat?type=blacklist&id=${id}')
|
||||
],
|
||||
footable=True,
|
||||
columns=[TableColumn(type='tpl', tpl='${keywords|truncate:15}', label='内容/关键词',
|
||||
columns=[TableColumn(type='tpl', tpl='${keywords|truncate:20}', label='内容/关键词',
|
||||
name='keywords',
|
||||
searchable=True, popOver={'mode': 'dialog', 'title': '全文',
|
||||
'className': 'break-all',
|
||||
'body': {'type': 'tpl',
|
||||
'tpl': '${keywords}'}}),
|
||||
TableColumn(label='已禁用的群', name='bans', searchable=True),
|
||||
@ -117,7 +118,7 @@ message_table = TableCRUD(mode='table',
|
||||
title='',
|
||||
syncLocation=False,
|
||||
api='/LittlePaimon/api/get_chat_messages',
|
||||
interval=6000,
|
||||
interval=12000,
|
||||
headerToolbar=[ActionType.Ajax(label='删除所有聊天记录',
|
||||
level=LevelEnum.warning,
|
||||
confirmText='确定要删除所有聊天记录吗?',
|
||||
@ -135,8 +136,9 @@ message_table = TableCRUD(mode='table',
|
||||
columns=[TableColumn(label='消息ID', name='message_id'),
|
||||
TableColumn(label='群ID', name='group_id', searchable=True),
|
||||
TableColumn(label='用户ID', name='user_id', searchable=True),
|
||||
TableColumn(type='tpl', tpl='${message|truncate:15}', label='消息', name='message',
|
||||
TableColumn(type='tpl', tpl='${raw_message|truncate:20}', label='消息', name='message',
|
||||
searchable=True, popOver={'mode': 'dialog', 'title': '消息全文',
|
||||
'className': 'break-all',
|
||||
'body': {'type': 'tpl',
|
||||
'tpl': '${raw_message}'}}),
|
||||
TableColumn(type='tpl', tpl='${time|date:YYYY-MM-DD HH\\:mm\\:ss}', label='时间',
|
||||
@ -147,7 +149,7 @@ answer_table = TableCRUD(
|
||||
syncLocation=False,
|
||||
footable=True,
|
||||
api='/LittlePaimon/api/get_chat_answers',
|
||||
interval=6000,
|
||||
interval=12000,
|
||||
headerToolbar=[ActionType.Ajax(label='删除所有已学习的回复',
|
||||
level=LevelEnum.warning,
|
||||
confirmText='确定要删除所有已学习的回复吗?',
|
||||
@ -162,8 +164,8 @@ answer_table = TableCRUD(
|
||||
api='delete:/LittlePaimon/api/delete_chat?type=answer&id=${id}')],
|
||||
columns=[TableColumn(label='ID', name='id', visible=False),
|
||||
TableColumn(label='群ID', name='group_id', searchable=True),
|
||||
TableColumn(type='tpl', tpl='${keywords|truncate:15}', label='内容/关键词', name='keywords',
|
||||
searchable=True, popOver={'mode': 'dialog', 'title': '内容全文',
|
||||
TableColumn(type='tpl', tpl='${keywords|truncate:20}', label='内容/关键词', name='keywords',
|
||||
searchable=True, popOver={'mode': 'dialog', 'title': '内容全文', 'className': 'break-all',
|
||||
'body': {'type': 'tpl', 'tpl': '${keywords}'}}),
|
||||
TableColumn(type='tpl', tpl='${time|date:YYYY-MM-DD HH\\:mm\\:ss}', label='最后学习时间', name='time',
|
||||
sortable=True),
|
||||
@ -176,7 +178,7 @@ answer_table_on_context = TableCRUD(
|
||||
syncLocation=False,
|
||||
footable=True,
|
||||
api='/LittlePaimon/api/get_chat_answers?context_id=${id}&page=${page}&perPage=${perPage}&orderBy=${orderBy}&orderDir=${orderDir}',
|
||||
interval=6000,
|
||||
interval=12000,
|
||||
headerToolbar=[ActionType.Ajax(label='删除该内容所有回复',
|
||||
level=LevelEnum.warning,
|
||||
confirmText='确定要删除该条内容已学习的回复吗?',
|
||||
@ -191,8 +193,8 @@ answer_table_on_context = TableCRUD(
|
||||
api='delete:/LittlePaimon/api/delete_chat?type=answer&id=${id}')],
|
||||
columns=[TableColumn(label='ID', name='id', visible=False),
|
||||
TableColumn(label='群ID', name='group_id'),
|
||||
TableColumn(type='tpl', tpl='${keywords|truncate:15}', label='内容/关键词', name='keywords',
|
||||
searchable=True, popOver={'mode': 'dialog', 'title': '内容全文',
|
||||
TableColumn(type='tpl', tpl='${keywords|truncate:20}', label='内容/关键词', name='keywords',
|
||||
searchable=True, popOver={'mode': 'dialog', 'title': '内容全文', 'className': 'break-all',
|
||||
'body': {'type': 'tpl', 'tpl': '${keywords}'}}),
|
||||
TableColumn(type='tpl', tpl='${time|date:YYYY-MM-DD HH\\:mm\\:ss}', label='最后学习时间', name='time',
|
||||
sortable=True),
|
||||
@ -204,7 +206,7 @@ context_table = TableCRUD(mode='table',
|
||||
title='',
|
||||
syncLocation=False,
|
||||
api='/LittlePaimon/api/get_chat_contexts',
|
||||
interval=6000,
|
||||
interval=12000,
|
||||
headerToolbar=[ActionType.Ajax(label='删除所有学习内容',
|
||||
level=LevelEnum.warning,
|
||||
confirmText='确定要删除所有已学习的内容吗?',
|
||||
@ -225,9 +227,9 @@ context_table = TableCRUD(mode='table',
|
||||
],
|
||||
footable=True,
|
||||
columns=[TableColumn(label='ID', name='id', visible=False),
|
||||
TableColumn(type='tpl', tpl='${keywords|truncate:15}', label='内容/关键词',
|
||||
TableColumn(type='tpl', tpl='${keywords|truncate:20}', label='内容/关键词',
|
||||
name='keywords', searchable=True,
|
||||
popOver={'mode': 'dialog', 'title': '内容全文',
|
||||
popOver={'mode': 'dialog', 'title': '内容全文', 'className': 'break-all',
|
||||
'body': {'type': 'tpl', 'tpl': '${keywords}'}}),
|
||||
TableColumn(type='tpl', tpl='${time|date:YYYY-MM-DD HH\\:mm\\:ss}',
|
||||
label='最后学习时间', name='time', sortable=True),
|
||||
@ -271,6 +273,6 @@ blacklist_page = PageSchema(url='/chat/blacklist', icon='fa fa-ban', label='禁
|
||||
database_page = PageSchema(label='数据库', icon='fa fa-database',
|
||||
children=[message_page, context_page, answer_page, blacklist_page])
|
||||
config_page = PageSchema(url='/chat/configs', icon='fa fa-wrench', label='配置',
|
||||
schema=Page(title='配置', body=[global_config_form, group_select, group_config_form]))
|
||||
schema=Page(title='配置', initApi='/LittlePaimon/api/get_group_list', body=[global_config_form, group_select, group_config_form]))
|
||||
chat_page = PageSchema(label='群聊学习', icon='fa fa-wechat (alias)', children=[config_page, database_page])
|
||||
admin_app.pages[0].children.append(chat_page)
|
||||
|
@ -1,127 +0,0 @@
|
||||
from argparse import Namespace
|
||||
from nonebot import on_shell_command
|
||||
from nonebot.rule import ArgumentParser
|
||||
from nonebot.plugin import PluginMetadata
|
||||
from nonebot.params import ShellCommandArgs
|
||||
from nonebot.adapters.onebot.v11 import Bot, MessageEvent, PrivateMessageEvent
|
||||
|
||||
from .handler import handle, get_id
|
||||
from .alias_list import aliases
|
||||
|
||||
__plugin_meta__ = PluginMetadata(
|
||||
name="命令别名",
|
||||
description="为机器人指令创建别名",
|
||||
usage=(
|
||||
"添加别名:alias {name}={command}\n"
|
||||
"查看别名:alias {name}\n"
|
||||
"别名列表:alias -p\n"
|
||||
"删除别名:unalias {name}\n"
|
||||
"清空别名:unalias -a"
|
||||
),
|
||||
extra={
|
||||
"unique_name": "alias",
|
||||
"example": "alias '喷水'='echo 呼风唤雨'\nunalias '喷水'",
|
||||
"author": "meetwq <meetwq@gmail.com>",
|
||||
"version": "0.3.2",
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
alias_parser = ArgumentParser()
|
||||
alias_parser.add_argument("-p", "--print", action="store_true")
|
||||
alias_parser.add_argument("-g", "--globally", action="store_true")
|
||||
alias_parser.add_argument("names", nargs="*")
|
||||
|
||||
alias = on_shell_command("alias", parser=alias_parser, priority=10, block=True)
|
||||
|
||||
unalias_parser = ArgumentParser()
|
||||
unalias_parser.add_argument("-a", "--all", action="store_true")
|
||||
unalias_parser.add_argument("-g", "--globally", action="store_true")
|
||||
unalias_parser.add_argument("names", nargs="*")
|
||||
|
||||
unalias = on_shell_command("unalias", parser=unalias_parser, priority=10)
|
||||
|
||||
|
||||
@alias.handle()
|
||||
async def _(bot: Bot, event: MessageEvent, args: Namespace = ShellCommandArgs()):
|
||||
gl = args.globally
|
||||
id = "global" if gl else get_id(event)
|
||||
word = "全局别名" if gl else "别名"
|
||||
|
||||
if args.print:
|
||||
message = "全局别名:" if gl else ""
|
||||
alias_all = aliases.get_alias_all(id)
|
||||
for name in sorted(alias_all):
|
||||
message += f"\n{name}='{alias_all[name]}'"
|
||||
if not gl:
|
||||
alias_all_gl = aliases.get_alias_all("global")
|
||||
if alias_all_gl:
|
||||
message += "\n全局别名:"
|
||||
for name in sorted(alias_all_gl):
|
||||
message += f"\n{name}='{alias_all_gl[name]}'"
|
||||
message = message.strip()
|
||||
if message:
|
||||
await alias.finish(message)
|
||||
else:
|
||||
await alias.finish(f"尚未添加任何{word}")
|
||||
|
||||
is_admin = event.sender.role in ["admin", "owner"]
|
||||
is_superuser = str(event.user_id) in bot.config.superusers
|
||||
is_private = isinstance(event, PrivateMessageEvent)
|
||||
|
||||
if gl and not is_superuser:
|
||||
await alias.finish("管理全局别名需要超级用户权限!")
|
||||
|
||||
if not (is_admin or is_superuser or is_private):
|
||||
await alias.finish("管理别名需要群管理员权限!")
|
||||
|
||||
message = ""
|
||||
names = args.names
|
||||
for name in names:
|
||||
if "=" in name:
|
||||
name, command = name.split("=", 1)
|
||||
if name and command and aliases.add_alias(id, name, command):
|
||||
message += f"成功添加{word}:{name}='{command}'\n"
|
||||
else:
|
||||
command = aliases.get_alias(id, name)
|
||||
if command:
|
||||
message += f"{name}='{command}'\n"
|
||||
else:
|
||||
message += f"不存在的{word}:{name}\n"
|
||||
|
||||
message = message.strip()
|
||||
if message:
|
||||
await alias.send(message)
|
||||
|
||||
|
||||
@unalias.handle()
|
||||
async def _(bot: Bot, event: MessageEvent, args: Namespace = ShellCommandArgs()):
|
||||
gl = args.globally
|
||||
id = "global" if gl else get_id(event)
|
||||
word = "全局别名" if gl else "别名"
|
||||
|
||||
is_admin = event.sender.role in ["admin", "owner"]
|
||||
is_superuser = str(event.user_id) in bot.config.superusers
|
||||
is_private = isinstance(event, PrivateMessageEvent)
|
||||
|
||||
if gl and not is_superuser:
|
||||
await alias.finish("管理全局别名需要超级用户权限!")
|
||||
|
||||
if not (is_admin or is_superuser or is_private):
|
||||
await alias.finish("管理别名需要群管理员权限!")
|
||||
|
||||
if args.all and aliases.del_alias_all(id):
|
||||
await unalias.finish(f"成功删除所有{word}")
|
||||
|
||||
message = ""
|
||||
names = args.names
|
||||
for name in names:
|
||||
if aliases.get_alias(id, name):
|
||||
if aliases.del_alias(id, name):
|
||||
message += f"成功删除{word}:{name}\n"
|
||||
else:
|
||||
message += f"不存在的{word}:{name}\n"
|
||||
|
||||
message = message.strip()
|
||||
if message:
|
||||
await unalias.send(message)
|
@ -1,61 +0,0 @@
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
data_path = Path("data/alias")
|
||||
if not data_path.exists():
|
||||
data_path.mkdir(parents=True)
|
||||
|
||||
|
||||
class AliasList:
|
||||
def __init__(self, path: Path):
|
||||
self.path = path
|
||||
self.list = self._load_alias()
|
||||
|
||||
def _load_alias(self) -> dict:
|
||||
if self.path.exists():
|
||||
return json.load(self.path.open("r", encoding="utf-8"))
|
||||
else:
|
||||
return {}
|
||||
|
||||
def _dump_alias(self) -> bool:
|
||||
json.dump(
|
||||
self.list,
|
||||
self.path.open("w", encoding="utf-8"),
|
||||
indent=4,
|
||||
separators=(",", ": "),
|
||||
ensure_ascii=False,
|
||||
)
|
||||
return True
|
||||
|
||||
def add_alias(self, id: str, name: str, command: str) -> bool:
|
||||
if id not in self.list:
|
||||
self.list[id] = {}
|
||||
self.list[id][name] = command
|
||||
return self._dump_alias()
|
||||
|
||||
def del_alias(self, id: str, name: str) -> bool:
|
||||
if id not in self.list:
|
||||
return False
|
||||
self.list[id].pop(name, "")
|
||||
if not self.list[id]:
|
||||
self.list.pop(id, {})
|
||||
return self._dump_alias()
|
||||
|
||||
def del_alias_all(self, id: str) -> bool:
|
||||
self.list.pop(id, {})
|
||||
return self._dump_alias()
|
||||
|
||||
def get_alias(self, id: str, name: str) -> str:
|
||||
if id not in self.list:
|
||||
return ""
|
||||
if name not in self.list[id]:
|
||||
return ""
|
||||
return self.list[id][name]
|
||||
|
||||
def get_alias_all(self, id: str) -> dict:
|
||||
if id not in self.list:
|
||||
return {}
|
||||
return self.list[id].copy()
|
||||
|
||||
|
||||
aliases = AliasList(data_path / "aliases.json")
|
@ -1,30 +0,0 @@
|
||||
from nonebot.adapters.onebot.v11 import MessageEvent, GroupMessageEvent
|
||||
from nonebot.message import event_preprocessor
|
||||
|
||||
from .parser import parse_msg
|
||||
|
||||
|
||||
@event_preprocessor
|
||||
async def handle(event: MessageEvent):
|
||||
msgs = event.get_message()
|
||||
if len(msgs) < 1 or msgs[0].type != "text":
|
||||
return
|
||||
msg = str(msgs[0]).lstrip()
|
||||
if not msg:
|
||||
return
|
||||
|
||||
|
||||
try:
|
||||
msg = parse_msg(msg, get_id(event))
|
||||
event.message[0].data["text"] = msg
|
||||
except Exception:
|
||||
return
|
||||
|
||||
|
||||
def get_id(event: MessageEvent) -> str:
|
||||
if event.message_type == 'group':
|
||||
return 'group_' + str(event.group_id)
|
||||
elif event.message_type == 'guild':
|
||||
return 'guild_' + str(event.guild_id)
|
||||
else:
|
||||
return 'private_' + str(event.user_id)
|
@ -1,48 +0,0 @@
|
||||
import shlex
|
||||
from expandvars import expand, ExpandvarsException
|
||||
|
||||
from .alias_list import aliases
|
||||
|
||||
|
||||
def parse_msg(msg: str, id: str) -> str:
|
||||
alias_all = aliases.get_alias_all(id)
|
||||
alias_all_gl = aliases.get_alias_all("global")
|
||||
alias_all_gl.update(alias_all)
|
||||
for name in sorted(alias_all_gl, reverse=True):
|
||||
if msg.startswith(name):
|
||||
return replace_msg(name, msg, alias_all_gl[name])
|
||||
return msg
|
||||
|
||||
|
||||
def replace_msg(cmd: str, msg: str, alias: str) -> str:
|
||||
if "$" not in alias:
|
||||
return alias + msg[len(cmd):]
|
||||
|
||||
args = parse_args(cmd, msg)
|
||||
env = set_env(args)
|
||||
return parse_alias(alias, env)
|
||||
|
||||
|
||||
def parse_args(cmd: str, msg: str) -> list:
|
||||
if cmd.strip() == msg.strip():
|
||||
return []
|
||||
arg = msg[len(cmd):]
|
||||
try:
|
||||
return shlex.split(arg)
|
||||
except ValueError:
|
||||
return [arg]
|
||||
|
||||
|
||||
def set_env(args: list) -> dict:
|
||||
env = {}
|
||||
for i, arg in enumerate(args, start=1):
|
||||
env[str(i)] = arg
|
||||
env["a"] = " ".join(args)
|
||||
return env
|
||||
|
||||
|
||||
def parse_alias(alias: str, env: dict = {}) -> str:
|
||||
try:
|
||||
return expand(alias, environ=env)
|
||||
except ExpandvarsException:
|
||||
return alias
|
@ -6,6 +6,7 @@ from .login import route as login_route
|
||||
from .plugin import route as plugin_route
|
||||
from .status import route as status_route
|
||||
from .utils import authentication
|
||||
from .command_alias import route as command_alias_route
|
||||
|
||||
BaseApiRouter = APIRouter(prefix='/LittlePaimon/api')
|
||||
|
||||
@ -14,3 +15,4 @@ BaseApiRouter.include_router(plugin_route)
|
||||
BaseApiRouter.include_router(bot_info_route)
|
||||
BaseApiRouter.include_router(status_route)
|
||||
BaseApiRouter.include_router(login_route)
|
||||
BaseApiRouter.include_router(command_alias_route)
|
||||
|
@ -19,10 +19,20 @@ route = APIRouter()
|
||||
|
||||
|
||||
@route.get('/get_group_list', response_class=JSONResponse, dependencies=[authentication()])
|
||||
async def get_group_list():
|
||||
@cache(datetime.timedelta(minutes=3))
|
||||
async def get_group_list(include_all: bool = False):
|
||||
try:
|
||||
group_list = await get_bot().get_group_list()
|
||||
return [{'label': f'{group["group_name"]}({group["group_id"]})', 'value': group['group_id']} for group in group_list]
|
||||
group_list = [{'label': f'{group["group_name"]}({group["group_id"]})', 'value': group['group_id']} for group in group_list]
|
||||
if include_all:
|
||||
group_list.insert(0, {'label': '全局', 'value': 'all'})
|
||||
return {
|
||||
'status': 0,
|
||||
'msg': 'ok',
|
||||
'data': {
|
||||
'group_list': group_list
|
||||
}
|
||||
}
|
||||
except ValueError:
|
||||
return {
|
||||
'status': -100,
|
||||
@ -31,6 +41,7 @@ async def get_group_list():
|
||||
|
||||
|
||||
@route.get('/get_group_members', response_class=JSONResponse, dependencies=[authentication()])
|
||||
@cache(datetime.timedelta(minutes=3))
|
||||
async def get_group_members(group_id: int):
|
||||
try:
|
||||
return await get_bot().get_group_member_list(group_id=group_id)
|
||||
@ -42,7 +53,7 @@ async def get_group_members(group_id: int):
|
||||
|
||||
|
||||
@route.get('/get_groups_and_members', response_class=JSONResponse, dependencies=[authentication()])
|
||||
@cache(datetime.timedelta(minutes=10))
|
||||
@cache(datetime.timedelta(minutes=3))
|
||||
async def get_groups_and_members():
|
||||
result = []
|
||||
try:
|
||||
@ -87,7 +98,7 @@ async def get_groups_and_members():
|
||||
|
||||
|
||||
@route.get('/get_friend_list', response_class=JSONResponse, dependencies=[authentication()])
|
||||
@cache(datetime.timedelta(minutes=10))
|
||||
@cache(datetime.timedelta(minutes=3))
|
||||
async def get_friend_list():
|
||||
try:
|
||||
bot: Bot = get_bot()
|
||||
|
55
LittlePaimon/web/api/command_alias.py
Normal file
55
LittlePaimon/web/api/command_alias.py
Normal file
@ -0,0 +1,55 @@
|
||||
from fastapi import APIRouter
|
||||
from fastapi.responses import JSONResponse
|
||||
|
||||
from tortoise.queryset import Q
|
||||
from LittlePaimon.database import CommandAlias
|
||||
from LittlePaimon.config import ConfigManager
|
||||
from LittlePaimon.config.command import modify_msg
|
||||
|
||||
from .utils import authentication
|
||||
|
||||
route = APIRouter()
|
||||
|
||||
|
||||
@route.get('/command_alias', response_class=JSONResponse, dependencies=[authentication()])
|
||||
async def get_command_alias():
|
||||
alias = await CommandAlias.all().order_by('priority').values()
|
||||
return {
|
||||
'status': 0,
|
||||
'msg': 'ok',
|
||||
'data': {
|
||||
'command_alias_enable': ConfigManager.config.command_alias_enable,
|
||||
'items': alias,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@route.post('/command_alias', response_class=JSONResponse, dependencies=[authentication()])
|
||||
async def add_command_alias(data: dict):
|
||||
ConfigManager.config.update(command_alias_enable=data['command_alias_enable'])
|
||||
ConfigManager.save()
|
||||
data = data['items']
|
||||
await CommandAlias.filter(id__not_in=[a['id'] for a in data if a.get('id')]).delete()
|
||||
for alias in data:
|
||||
alias['priority'] = data.index(alias)
|
||||
if alias.get('id'):
|
||||
await CommandAlias.filter(id=alias.pop('id')).update(**alias)
|
||||
else:
|
||||
await CommandAlias.create(**alias)
|
||||
return {
|
||||
'status': 0,
|
||||
'msg': '命令别名保存成功'
|
||||
}
|
||||
|
||||
|
||||
@route.get('/test_command_alias', response_class=JSONResponse, dependencies=[authentication()])
|
||||
async def test_command_alias(group_id: str, message: str):
|
||||
all_alias = await CommandAlias.filter(Q(group_id=group_id) | Q(group_id='all')).order_by('priority')
|
||||
msg = modify_msg(all_alias, message)
|
||||
return {
|
||||
'status': 0,
|
||||
'msg': '测试成功',
|
||||
'data': {
|
||||
'new_msg': msg
|
||||
}
|
||||
}
|
71
LittlePaimon/web/pages/command_alias.py
Normal file
71
LittlePaimon/web/pages/command_alias.py
Normal file
@ -0,0 +1,71 @@
|
||||
from amis import Form, Switch, InputSubForm, Hidden, ButtonGroupSelect, InputText, Radios, Checkbox, Select, Static, Alert, Html, PageSchema, Page
|
||||
|
||||
main_form = Form(
|
||||
title='命令别名',
|
||||
initApi='get:/LittlePaimon/api/command_alias',
|
||||
api='post:/LittlePaimon/api/command_alias',
|
||||
submitText='保存',
|
||||
body=[
|
||||
Switch(name='command_alias_enable', label='功能开关', onText='全局启用', offText='全局关闭'),
|
||||
InputSubForm(
|
||||
name='items',
|
||||
label='已设置的命令别名',
|
||||
multiple=True,
|
||||
btnLabel='${alias} >> ${command}',
|
||||
draggable=True,
|
||||
addable=True,
|
||||
removable=True,
|
||||
addButtonText='添加命令别名',
|
||||
showErrorMsg=False,
|
||||
form=Form(
|
||||
title='命令别名',
|
||||
body=[
|
||||
Hidden(name='id'),
|
||||
ButtonGroupSelect(name='is_regex', label='匹配模式', value=False,
|
||||
options=[
|
||||
{'label': '普通匹配', 'value': False},
|
||||
{'label': '正则匹配', 'value': True}
|
||||
]),
|
||||
InputText(name='alias', label='命令别名', required=True),
|
||||
InputText(name='command', label='原命令', required=True),
|
||||
Radios(name='mode', label='匹配位置', value='前缀', hiddenOn='${is_regex == true}', required=True,
|
||||
options=[
|
||||
{
|
||||
'label': '前缀',
|
||||
'value': '前缀'
|
||||
},
|
||||
{
|
||||
'label': '后缀',
|
||||
'value': '后缀'
|
||||
},
|
||||
{
|
||||
'label': '全匹配',
|
||||
'value': '全匹配'
|
||||
}
|
||||
]),
|
||||
Checkbox(name='is_reverse', label='是否反转', hiddenOn='${is_regex == true}'),
|
||||
Select(name='group_id', label='设置群', value='all', required=True,
|
||||
source='${group_list}')
|
||||
]
|
||||
)
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
test_form = Form(
|
||||
title='测试',
|
||||
api='get:/LittlePaimon/api/test_command_alias?group_id=${group_id}&message=${message}',
|
||||
submitText='测试',
|
||||
body=[
|
||||
Select(name='group_id', label='测试群', value='all', required=True, source='${group_list}'),
|
||||
InputText(name='message', label='测试消息', required=True),
|
||||
Static(className='text-red-600', name='new_msg', label='命令别名修改后消息', visibleOn="typeof data.new_msg !== 'undefined'")
|
||||
]
|
||||
)
|
||||
|
||||
tips = Alert(level='info',
|
||||
body=Html(html='命令别名的详细用法和配置例子可以在<a href="https://docs.paimon.cherishmoon.fun/configs/manage/plugin-manage.html#%E8%87%AA%E5%AE%9A%E4%B9%89%E5%91%BD%E4%BB%A4%E5%88%AB%E5%90%8D" target="_blank">文档</a>中查看,配置保存后,可以在下方的"测试"一栏进行实时测试。'))
|
||||
|
||||
|
||||
page = PageSchema(url='/bot_config/command_alias', icon='fa fa-angle-double-right', label='命令别名',
|
||||
schema=Page(title='', initApi='/LittlePaimon/api/get_group_list?include_all=true', body=[tips, main_form, test_form]))
|
@ -5,6 +5,7 @@ from .home_page import page as home_page
|
||||
from .plugin_manage import page as plugin_manage_page
|
||||
from .private_cookie import page as private_cookie_page
|
||||
from .public_cookie import page as public_cookie_page
|
||||
from .command_alias import page as command_alias_page
|
||||
|
||||
|
||||
github_logo = Tpl(className='w-full',
|
||||
@ -21,7 +22,7 @@ admin_app = App(brandName='LittlePaimon',
|
||||
PageSchema(label='Cookie管理', icon='fa fa-key',
|
||||
children=[public_cookie_page, private_cookie_page]),
|
||||
PageSchema(label='机器人配置', icon='fa fa-wrench',
|
||||
children=[plugin_manage_page, config_page])
|
||||
children=[plugin_manage_page, config_page, command_alias_page])
|
||||
]}],
|
||||
footer=f'<div class="p-2 text-center bg-blue-100">Copyright © 2021 - 2022 <a href="https://github.com/CMHopeSunshine/LittlePaimon" target="_blank" class="link-secondary">LittlePaimon v{__version__}</a> X<a target="_blank" href="https://github.com/baidu/amis" class="link-secondary" rel="noopener"> amis v2.2.0</a></div>')
|
||||
|
||||
|
@ -121,12 +121,11 @@ cards_curd = CardsCRUD(mode='cards',
|
||||
syncLocation=False,
|
||||
api='/LittlePaimon/api/get_plugins',
|
||||
loadDataOnce=True,
|
||||
source='${rows | filter:name:match:keywords_name | filter:description:match:keywords_description | filter:status:match:status}',
|
||||
source='${rows | filter:name:match:keywords_name | filter:description:match:keywords_description}',
|
||||
filter={
|
||||
'body': [
|
||||
InputText(name='keywords_name', label='插件名'),
|
||||
InputText(name='keywords_description', label='插件描述'),
|
||||
Switch(name='status', label='插件状态', onText='启用', offText='禁用')
|
||||
InputText(name='keywords_description', label='插件描述')
|
||||
]
|
||||
},
|
||||
perPage=12,
|
||||
|
Loading…
x
Reference in New Issue
Block a user