diff --git a/LittlePaimon/manager/bot_manager/__init__.py b/LittlePaimon/manager/bot_manager/__init__.py index 26152fa..5055b8f 100644 --- a/LittlePaimon/manager/bot_manager/__init__.py +++ b/LittlePaimon/manager/bot_manager/__init__.py @@ -120,7 +120,7 @@ async def _(event: MessageEvent, state: T_State, msg: Message = CommandArg()): @broadcast.got('groups', prompt='要广播到哪些群呢?多个群以空格隔开,或发送"全部"向所有群广播') -async def _(event: MessageEvent, bot: Bot, msg: str = ArgPlainText('msg'), groups: str = ArgPlainText('groups')): +async def _(event: MessageEvent, bot: Bot, msg: Message = Arg('msg'), groups: str = ArgPlainText('groups')): group_list = await bot.get_group_list() group_list = [g['group_id'] for g in group_list] if groups in {'全部', '所有', 'all'}: diff --git a/LittlePaimon/plugins/Mihoyo_bbs/__init__.py b/LittlePaimon/plugins/Mihoyo_bbs/__init__.py new file mode 100644 index 0000000..06361a7 --- /dev/null +++ b/LittlePaimon/plugins/Mihoyo_bbs/__init__.py @@ -0,0 +1,36 @@ +from nonebot import on_regex +from nonebot.params import RegexMatched +from nonebot.adapters.onebot.v11 import MessageEvent, MessageSegment +from nonebot.plugin import PluginMetadata + +from LittlePaimon.utils import logger +from LittlePaimon.utils.brower import screenshot + +__plugin_meta__ = PluginMetadata( + name='米游社', + description='米游社', + usage='', + extra={ + 'author': '惜月', + 'version': '3.0', + 'priority': 20, + } +) + +post_screenshot = on_regex(r'(https://)?(m\.)?bbs.mihoyo.com/.+/article/\d+', priority=20, block=False, state={ + 'pm_name': '米游社帖子截图', + 'pm_description': '(被动技能)自动对消息中的米游社帖子链接内容进行截图发送', + 'pm_usage': '米游社帖子截图', + 'pm_priority': 1 +}) + + +@post_screenshot.handle() +async def _(event: MessageEvent, url: str = RegexMatched()): + logger.info('米游社', f'开始截图帖子{url}') + try: + img = await screenshot(url, elements=['.mhy-article-page__main']) + except Exception: + logger.info('米游社', f'帖子{url}截图失败') + return + await post_screenshot.finish(MessageSegment.image(img)) diff --git a/LittlePaimon/plugins/Paimon_Abyss/__init__.py b/LittlePaimon/plugins/Paimon_Abyss/__init__.py index 115f111..45abbc2 100644 --- a/LittlePaimon/plugins/Paimon_Abyss/__init__.py +++ b/LittlePaimon/plugins/Paimon_Abyss/__init__.py @@ -80,4 +80,4 @@ async def _(event: MessageEvent): result = await draw_team(str(event.user_id)) except Exception as e: result = f'制作深渊配队时出错:{e}' - await abyss_team.finish(result) + await abyss_team.finish(result, at_sender=True) diff --git a/LittlePaimon/plugins/tools/__init__.py b/LittlePaimon/plugins/tools/__init__.py index bd1c31a..c9b4aa8 100644 --- a/LittlePaimon/plugins/tools/__init__.py +++ b/LittlePaimon/plugins/tools/__init__.py @@ -1,11 +1,11 @@ from nonebot import on_command from nonebot.params import CommandArg from nonebot.rule import Rule -from nonebot.adapters.onebot.v11 import Message, MessageEvent +from nonebot.adapters.onebot.v11 import Message, MessageEvent, MessageSegment from nonebot.plugin import PluginMetadata from LittlePaimon import SUPERUSERS from LittlePaimon.manager.plugin_manager import plugin_manager as pm -from LittlePaimon.utils.brower import AsyncPlaywright +from LittlePaimon.utils.brower import screenshot async def permission_check(event: MessageEvent) -> bool: @@ -37,7 +37,10 @@ screenshot_cmd = on_command('网页截图', priority=10, block=True, rule=Rule(p async def _(event: MessageEvent, msg: Message = CommandArg()): await screenshot_cmd.send('正在尝试截图,请稍等...') url = msg.extract_plain_text().strip() - img = await AsyncPlaywright.screenshot(url) - await screenshot_cmd.send(img) + try: + img = await screenshot(url) + await screenshot_cmd.send(MessageSegment.image(img)) + except Exception: + await screenshot_cmd.send('网页截图失败,无法访问该网页,请稍候再试') diff --git a/LittlePaimon/utils/brower.py b/LittlePaimon/utils/brower.py index bff1e9c..a93c261 100644 --- a/LittlePaimon/utils/brower.py +++ b/LittlePaimon/utils/brower.py @@ -1,174 +1,79 @@ -# https://github.com/HibiKier/zhenxun_bot/blob/main/utils -import asyncio -from pathlib import Path -from typing import Optional, Literal, Union, List, Dict -from nonebot import logger -from nonebot.adapters.onebot.v11 import MessageSegment -from playwright.async_api import Browser, async_playwright, Page, BrowserContext +from typing import Optional, Literal, Tuple, Union, List, AsyncGenerator +from playwright.async_api import Page, Browser, Playwright, async_playwright +from contextlib import asynccontextmanager +from LittlePaimon import DRIVER +from LittlePaimon.utils import logger +_playwright: Optional[Playwright] = None _browser: Optional[Browser] = None -async def init(**kwargs) -> Optional[Browser]: +def get_brower() -> Browser: + assert _browser + return _browser + + +@DRIVER.on_startup +async def start_browser(): + global _playwright global _browser - browser = await async_playwright().start() try: - _browser = await browser.chromium.launch(**kwargs) - return _browser - except Exception: - await asyncio.get_event_loop().run_in_executor(None, install) - _browser = await browser.chromium.launch(**kwargs) - return None + _playwright = await async_playwright().start() + _browser = await _playwright.chromium.launch() + except NotImplementedError: + logger.warning('Playwright', '初始化失败,请关闭FASTAPI_RELOAD') + except Exception as e: + logger.warning('Playwright', f'初始化失败,错误信息:{e}') + if _browser: + await _browser.close() -async def get_browser(**kwargs) -> Browser: - return _browser or await init(**kwargs) +@DRIVER.on_shutdown +async def shutdown_browser(): + if _browser: + await _browser.close() + if _playwright: + await _playwright.stop() -def install(): - """自动安装、更新 Chromium""" - logger.info("正在检查 Chromium 更新") - import sys - from playwright.__main__ import main - - sys.argv = ["", "install", "chromium"] +@asynccontextmanager +async def get_new_page(**kwargs) -> AsyncGenerator[Page, None]: + assert _browser, "playwright尚未初始化" + page = await _browser.new_page(**kwargs) try: - main() - except SystemExit: - pass + yield page + finally: + await page.close() -class AsyncPlaywright: - @classmethod - async def _new_page(cls, user_agent: Optional[str] = None, **kwargs) -> Page: - """ - 说明: - 获取一个新页面 - 参数: - :param user_agent: 请求头 - """ - browser = await get_browser() - if browser: - return await browser.new_page(user_agent=user_agent, **kwargs) - logger.info('获取浏览器失败') - raise BrowserIsNone('获取浏览器失败') +async def screenshot(url: str, + *, + elements: Optional[Union[List[str]]] = None, + timeout: Optional[float] = 60000, + wait_until: Literal["domcontentloaded", "load", "networkidle"] = "networkidle", + viewport_size: Tuple[int, int] = (1920, 1080), + full_page=True, + **kwargs): + if not url.startswith(('https://', 'http://')): + url = f'https://{url}' + viewport_size = {'width': viewport_size[0], 'height': viewport_size[1]} + brower = get_brower() + page = await brower.new_page( + viewport=viewport_size, + **kwargs) + try: + await page.goto(url, wait_until=wait_until, timeout=timeout) + assert page + if not elements: + return await page.screenshot(timeout=timeout, full_page=full_page) + for e in elements: + card = await page.query_selector(e) + assert card + clip = await card.bounding_box() + return await page.screenshot(clip=clip, timeout=timeout, full_page=full_page, path='test.png') - @classmethod - async def new_context(cls, user_agent: Optional[str] = None, **kwargs) -> BrowserContext: - """ - 说明: - 获取一个新上下文 - 参数: - :param user_agent: 请求头 - """ - browser = await get_browser() - if browser: - return await browser.new_context(user_agent=user_agent, **kwargs) - logger.info('获取浏览器失败') - raise BrowserIsNone('获取浏览器失败') - - @classmethod - async def goto( - cls, - url: str, - *, - timeout: Optional[float] = 100000, - wait_until: Optional[ - Literal["domcontentloaded", "load", "networkidle"] - ] = "networkidle", - referer: str = None, - **kwargs - ) -> Optional[Page]: - """ - 说明: - goto - 参数: - :param url: 网址 - :param timeout: 超时限制 - :param wait_until: 等待类型 - :param referer: - """ - page = None - try: - page = await cls._new_page(**kwargs) - await page.goto(url, timeout=timeout, wait_until=wait_until, referer=referer) - return page - except Exception as e: - logger.warning(f"Playwright 访问 url:{url} 发生错误 {type(e)}:{e}") - if page: - await page.close() - return None - - @classmethod - async def screenshot( - cls, - url: str, - *, - element: Optional[Union[str, List[str]]] = None, - path: Optional[Union[Path, str]] = None, - wait_time: Optional[int] = None, - viewport_size: Dict[str, int] = None, - wait_until: Optional[ - Literal["domcontentloaded", "load", "networkidle"] - ] = "networkidle", - timeout: float = None, - **kwargs - ) -> Optional[MessageSegment]: - """ - 说明: - 截图,该方法仅用于简单快捷截图,复杂截图请操作 page - 参数: - :param url: 网址 - :param path: 存储路径 - :param element: 元素选择 - :param wait_time: 等待截取超时时间 - :param viewport_size: 窗口大小 - :param wait_until: 等待类型 - :param timeout: 超时限制 - """ - if not url.startswith(('https://', 'http://')): - url = f'https://{url}' - page = None - if viewport_size is None: - viewport_size = dict(width=1920, height=1080) - if path and isinstance(path, str): - path = Path(path) - try: - page = await cls.goto(url, wait_until=wait_until, **kwargs) - if page is None: - return MessageSegment.text('截图失败,无法访问网页,请稍候再试') - await page.set_viewport_size(viewport_size) - if element: - if isinstance(element, str): - if wait_time: - card = await page.wait_for_selector(element, timeout=wait_time * 1000) - else: - card = await page.query_selector(element) - else: - card = page - for e in element: - if wait_time: - card = await card.wait_for_selector(e, timeout=wait_time * 1000) - else: - card = await card.query_selector(e) - else: - card = page - if path: - img = await card.screenshot(path=path, timeout=timeout) - else: - img = await card.screenshot(timeout=timeout) - return MessageSegment.image(img) - except Exception as e: - logger.warning(f"Playwright 截图 url:{url} element:{element} 发生错误 {type(e)}:{e}") - return MessageSegment.text(f'截图失败,报错信息:{e}') - finally: - if page: - await page.close() - - -class UrlPathNumberNotEqual(Exception): - pass - - -class BrowserIsNone(Exception): - pass + except Exception as e: + raise e + finally: + if page: + await page.close() diff --git a/LittlePaimon/utils/message.py b/LittlePaimon/utils/message.py index a0cdedd..fbdda83 100644 --- a/LittlePaimon/utils/message.py +++ b/LittlePaimon/utils/message.py @@ -304,7 +304,7 @@ async def get_uid(event: Optional[MessageEvent] = None, user_id: Optional[str] = return bind_uid.uid else: if event: - if nickname_uid := re.search(r'[1258]\d{8}', event.sender.card): + if nickname_uid := re.search(r'[1258]\d{8}', event.sender.card or event.sender.nickname): return nickname_uid.group() else: if group_id: