From dfcae8abedf9b99ce7231539dc7d5fa0b0b7a5fd Mon Sep 17 00:00:00 2001
From: CMHopeSunshine <277073121@qq.com>
Date: Mon, 3 Oct 2022 18:18:20 +0800
Subject: [PATCH] =?UTF-8?q?:sparkles:=20=E6=96=B0=E5=A2=9E`=E7=B1=B3?=
=?UTF-8?q?=E6=B8=B8=E7=A4=BE=E5=B8=96=E5=AD=90=E9=93=BE=E6=8E=A5=E6=88=AA?=
=?UTF-8?q?=E5=9B=BE`=EF=BC=8C`=E6=B6=88=E6=81=AF=E5=B9=BF=E6=92=AD`?=
=?UTF-8?q?=E6=94=AF=E6=8C=81=E5=9B=BE=E7=89=87=EF=BC=8C=E4=BF=AE=E5=A4=8D?=
=?UTF-8?q?=E4=BB=8E=E6=98=B5=E7=A7=B0=E8=8E=B7=E5=8F=96uid=E6=8A=A5?=
=?UTF-8?q?=E9=94=99=E7=9A=84=E9=97=AE=E9=A2=98?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
LittlePaimon/manager/bot_manager/__init__.py | 2 +-
LittlePaimon/plugins/Mihoyo_bbs/__init__.py | 36 +++
LittlePaimon/plugins/Paimon_Abyss/__init__.py | 2 +-
LittlePaimon/plugins/tools/__init__.py | 11 +-
LittlePaimon/utils/brower.py | 225 +++++-------------
LittlePaimon/utils/message.py | 2 +-
6 files changed, 111 insertions(+), 167 deletions(-)
create mode 100644 LittlePaimon/plugins/Mihoyo_bbs/__init__.py
diff --git a/LittlePaimon/manager/bot_manager/__init__.py b/LittlePaimon/manager/bot_manager/__init__.py
index 26152fa..5055b8f 100644
--- a/LittlePaimon/manager/bot_manager/__init__.py
+++ b/LittlePaimon/manager/bot_manager/__init__.py
@@ -120,7 +120,7 @@ async def _(event: MessageEvent, state: T_State, msg: Message = CommandArg()):
@broadcast.got('groups', prompt='要广播到哪些群呢?多个群以空格隔开,或发送"全部"向所有群广播')
-async def _(event: MessageEvent, bot: Bot, msg: str = ArgPlainText('msg'), groups: str = ArgPlainText('groups')):
+async def _(event: MessageEvent, bot: Bot, msg: Message = Arg('msg'), groups: str = ArgPlainText('groups')):
group_list = await bot.get_group_list()
group_list = [g['group_id'] for g in group_list]
if groups in {'全部', '所有', 'all'}:
diff --git a/LittlePaimon/plugins/Mihoyo_bbs/__init__.py b/LittlePaimon/plugins/Mihoyo_bbs/__init__.py
new file mode 100644
index 0000000..06361a7
--- /dev/null
+++ b/LittlePaimon/plugins/Mihoyo_bbs/__init__.py
@@ -0,0 +1,36 @@
+from nonebot import on_regex
+from nonebot.params import RegexMatched
+from nonebot.adapters.onebot.v11 import MessageEvent, MessageSegment
+from nonebot.plugin import PluginMetadata
+
+from LittlePaimon.utils import logger
+from LittlePaimon.utils.brower import screenshot
+
+__plugin_meta__ = PluginMetadata(
+ name='米游社',
+ description='米游社',
+ usage='',
+ extra={
+ 'author': '惜月',
+ 'version': '3.0',
+ 'priority': 20,
+ }
+)
+
+post_screenshot = on_regex(r'(https://)?(m\.)?bbs.mihoyo.com/.+/article/\d+', priority=20, block=False, state={
+ 'pm_name': '米游社帖子截图',
+ 'pm_description': '(被动技能)自动对消息中的米游社帖子链接内容进行截图发送',
+ 'pm_usage': '米游社帖子截图',
+ 'pm_priority': 1
+})
+
+
+@post_screenshot.handle()
+async def _(event: MessageEvent, url: str = RegexMatched()):
+ logger.info('米游社', f'开始截图帖子{url}')
+ try:
+ img = await screenshot(url, elements=['.mhy-article-page__main'])
+ except Exception:
+ logger.info('米游社', f'帖子{url}截图失败')
+ return
+ await post_screenshot.finish(MessageSegment.image(img))
diff --git a/LittlePaimon/plugins/Paimon_Abyss/__init__.py b/LittlePaimon/plugins/Paimon_Abyss/__init__.py
index 115f111..45abbc2 100644
--- a/LittlePaimon/plugins/Paimon_Abyss/__init__.py
+++ b/LittlePaimon/plugins/Paimon_Abyss/__init__.py
@@ -80,4 +80,4 @@ async def _(event: MessageEvent):
result = await draw_team(str(event.user_id))
except Exception as e:
result = f'制作深渊配队时出错:{e}'
- await abyss_team.finish(result)
+ await abyss_team.finish(result, at_sender=True)
diff --git a/LittlePaimon/plugins/tools/__init__.py b/LittlePaimon/plugins/tools/__init__.py
index bd1c31a..c9b4aa8 100644
--- a/LittlePaimon/plugins/tools/__init__.py
+++ b/LittlePaimon/plugins/tools/__init__.py
@@ -1,11 +1,11 @@
from nonebot import on_command
from nonebot.params import CommandArg
from nonebot.rule import Rule
-from nonebot.adapters.onebot.v11 import Message, MessageEvent
+from nonebot.adapters.onebot.v11 import Message, MessageEvent, MessageSegment
from nonebot.plugin import PluginMetadata
from LittlePaimon import SUPERUSERS
from LittlePaimon.manager.plugin_manager import plugin_manager as pm
-from LittlePaimon.utils.brower import AsyncPlaywright
+from LittlePaimon.utils.brower import screenshot
async def permission_check(event: MessageEvent) -> bool:
@@ -37,7 +37,10 @@ screenshot_cmd = on_command('网页截图', priority=10, block=True, rule=Rule(p
async def _(event: MessageEvent, msg: Message = CommandArg()):
await screenshot_cmd.send('正在尝试截图,请稍等...')
url = msg.extract_plain_text().strip()
- img = await AsyncPlaywright.screenshot(url)
- await screenshot_cmd.send(img)
+ try:
+ img = await screenshot(url)
+ await screenshot_cmd.send(MessageSegment.image(img))
+ except Exception:
+ await screenshot_cmd.send('网页截图失败,无法访问该网页,请稍候再试')
diff --git a/LittlePaimon/utils/brower.py b/LittlePaimon/utils/brower.py
index bff1e9c..a93c261 100644
--- a/LittlePaimon/utils/brower.py
+++ b/LittlePaimon/utils/brower.py
@@ -1,174 +1,79 @@
-# https://github.com/HibiKier/zhenxun_bot/blob/main/utils
-import asyncio
-from pathlib import Path
-from typing import Optional, Literal, Union, List, Dict
-from nonebot import logger
-from nonebot.adapters.onebot.v11 import MessageSegment
-from playwright.async_api import Browser, async_playwright, Page, BrowserContext
+from typing import Optional, Literal, Tuple, Union, List, AsyncGenerator
+from playwright.async_api import Page, Browser, Playwright, async_playwright
+from contextlib import asynccontextmanager
+from LittlePaimon import DRIVER
+from LittlePaimon.utils import logger
+_playwright: Optional[Playwright] = None
_browser: Optional[Browser] = None
-async def init(**kwargs) -> Optional[Browser]:
+def get_brower() -> Browser:
+ assert _browser
+ return _browser
+
+
+@DRIVER.on_startup
+async def start_browser():
+ global _playwright
global _browser
- browser = await async_playwright().start()
try:
- _browser = await browser.chromium.launch(**kwargs)
- return _browser
- except Exception:
- await asyncio.get_event_loop().run_in_executor(None, install)
- _browser = await browser.chromium.launch(**kwargs)
- return None
+ _playwright = await async_playwright().start()
+ _browser = await _playwright.chromium.launch()
+ except NotImplementedError:
+ logger.warning('Playwright', '初始化失败,请关闭FASTAPI_RELOAD')
+ except Exception as e:
+ logger.warning('Playwright', f'初始化失败,错误信息:{e}')
+ if _browser:
+ await _browser.close()
-async def get_browser(**kwargs) -> Browser:
- return _browser or await init(**kwargs)
+@DRIVER.on_shutdown
+async def shutdown_browser():
+ if _browser:
+ await _browser.close()
+ if _playwright:
+ await _playwright.stop()
-def install():
- """自动安装、更新 Chromium"""
- logger.info("正在检查 Chromium 更新")
- import sys
- from playwright.__main__ import main
-
- sys.argv = ["", "install", "chromium"]
+@asynccontextmanager
+async def get_new_page(**kwargs) -> AsyncGenerator[Page, None]:
+ assert _browser, "playwright尚未初始化"
+ page = await _browser.new_page(**kwargs)
try:
- main()
- except SystemExit:
- pass
+ yield page
+ finally:
+ await page.close()
-class AsyncPlaywright:
- @classmethod
- async def _new_page(cls, user_agent: Optional[str] = None, **kwargs) -> Page:
- """
- 说明:
- 获取一个新页面
- 参数:
- :param user_agent: 请求头
- """
- browser = await get_browser()
- if browser:
- return await browser.new_page(user_agent=user_agent, **kwargs)
- logger.info('获取浏览器失败')
- raise BrowserIsNone('获取浏览器失败')
+async def screenshot(url: str,
+ *,
+ elements: Optional[Union[List[str]]] = None,
+ timeout: Optional[float] = 60000,
+ wait_until: Literal["domcontentloaded", "load", "networkidle"] = "networkidle",
+ viewport_size: Tuple[int, int] = (1920, 1080),
+ full_page=True,
+ **kwargs):
+ if not url.startswith(('https://', 'http://')):
+ url = f'https://{url}'
+ viewport_size = {'width': viewport_size[0], 'height': viewport_size[1]}
+ brower = get_brower()
+ page = await brower.new_page(
+ viewport=viewport_size,
+ **kwargs)
+ try:
+ await page.goto(url, wait_until=wait_until, timeout=timeout)
+ assert page
+ if not elements:
+ return await page.screenshot(timeout=timeout, full_page=full_page)
+ for e in elements:
+ card = await page.query_selector(e)
+ assert card
+ clip = await card.bounding_box()
+ return await page.screenshot(clip=clip, timeout=timeout, full_page=full_page, path='test.png')
- @classmethod
- async def new_context(cls, user_agent: Optional[str] = None, **kwargs) -> BrowserContext:
- """
- 说明:
- 获取一个新上下文
- 参数:
- :param user_agent: 请求头
- """
- browser = await get_browser()
- if browser:
- return await browser.new_context(user_agent=user_agent, **kwargs)
- logger.info('获取浏览器失败')
- raise BrowserIsNone('获取浏览器失败')
-
- @classmethod
- async def goto(
- cls,
- url: str,
- *,
- timeout: Optional[float] = 100000,
- wait_until: Optional[
- Literal["domcontentloaded", "load", "networkidle"]
- ] = "networkidle",
- referer: str = None,
- **kwargs
- ) -> Optional[Page]:
- """
- 说明:
- goto
- 参数:
- :param url: 网址
- :param timeout: 超时限制
- :param wait_until: 等待类型
- :param referer:
- """
- page = None
- try:
- page = await cls._new_page(**kwargs)
- await page.goto(url, timeout=timeout, wait_until=wait_until, referer=referer)
- return page
- except Exception as e:
- logger.warning(f"Playwright 访问 url:{url} 发生错误 {type(e)}:{e}")
- if page:
- await page.close()
- return None
-
- @classmethod
- async def screenshot(
- cls,
- url: str,
- *,
- element: Optional[Union[str, List[str]]] = None,
- path: Optional[Union[Path, str]] = None,
- wait_time: Optional[int] = None,
- viewport_size: Dict[str, int] = None,
- wait_until: Optional[
- Literal["domcontentloaded", "load", "networkidle"]
- ] = "networkidle",
- timeout: float = None,
- **kwargs
- ) -> Optional[MessageSegment]:
- """
- 说明:
- 截图,该方法仅用于简单快捷截图,复杂截图请操作 page
- 参数:
- :param url: 网址
- :param path: 存储路径
- :param element: 元素选择
- :param wait_time: 等待截取超时时间
- :param viewport_size: 窗口大小
- :param wait_until: 等待类型
- :param timeout: 超时限制
- """
- if not url.startswith(('https://', 'http://')):
- url = f'https://{url}'
- page = None
- if viewport_size is None:
- viewport_size = dict(width=1920, height=1080)
- if path and isinstance(path, str):
- path = Path(path)
- try:
- page = await cls.goto(url, wait_until=wait_until, **kwargs)
- if page is None:
- return MessageSegment.text('截图失败,无法访问网页,请稍候再试')
- await page.set_viewport_size(viewport_size)
- if element:
- if isinstance(element, str):
- if wait_time:
- card = await page.wait_for_selector(element, timeout=wait_time * 1000)
- else:
- card = await page.query_selector(element)
- else:
- card = page
- for e in element:
- if wait_time:
- card = await card.wait_for_selector(e, timeout=wait_time * 1000)
- else:
- card = await card.query_selector(e)
- else:
- card = page
- if path:
- img = await card.screenshot(path=path, timeout=timeout)
- else:
- img = await card.screenshot(timeout=timeout)
- return MessageSegment.image(img)
- except Exception as e:
- logger.warning(f"Playwright 截图 url:{url} element:{element} 发生错误 {type(e)}:{e}")
- return MessageSegment.text(f'截图失败,报错信息:{e}')
- finally:
- if page:
- await page.close()
-
-
-class UrlPathNumberNotEqual(Exception):
- pass
-
-
-class BrowserIsNone(Exception):
- pass
+ except Exception as e:
+ raise e
+ finally:
+ if page:
+ await page.close()
diff --git a/LittlePaimon/utils/message.py b/LittlePaimon/utils/message.py
index a0cdedd..fbdda83 100644
--- a/LittlePaimon/utils/message.py
+++ b/LittlePaimon/utils/message.py
@@ -304,7 +304,7 @@ async def get_uid(event: Optional[MessageEvent] = None, user_id: Optional[str] =
return bind_uid.uid
else:
if event:
- if nickname_uid := re.search(r'[1258]\d{8}', event.sender.card):
+ if nickname_uid := re.search(r'[1258]\d{8}', event.sender.card or event.sender.nickname):
return nickname_uid.group()
else:
if group_id: