mirror of
https://github.com/xuthus83/LittlePaimon.git
synced 2024-12-16 13:40:53 +08:00
✨ 新增米游社帖子链接截图
,消息广播
支持图片,修复从昵称获取uid报错的问题
This commit is contained in:
parent
7f762617ca
commit
dfcae8abed
@ -120,7 +120,7 @@ async def _(event: MessageEvent, state: T_State, msg: Message = CommandArg()):
|
||||
|
||||
|
||||
@broadcast.got('groups', prompt='要广播到哪些群呢?多个群以空格隔开,或发送"全部"向所有群广播')
|
||||
async def _(event: MessageEvent, bot: Bot, msg: str = ArgPlainText('msg'), groups: str = ArgPlainText('groups')):
|
||||
async def _(event: MessageEvent, bot: Bot, msg: Message = Arg('msg'), groups: str = ArgPlainText('groups')):
|
||||
group_list = await bot.get_group_list()
|
||||
group_list = [g['group_id'] for g in group_list]
|
||||
if groups in {'全部', '所有', 'all'}:
|
||||
|
36
LittlePaimon/plugins/Mihoyo_bbs/__init__.py
Normal file
36
LittlePaimon/plugins/Mihoyo_bbs/__init__.py
Normal file
@ -0,0 +1,36 @@
|
||||
from nonebot import on_regex
|
||||
from nonebot.params import RegexMatched
|
||||
from nonebot.adapters.onebot.v11 import MessageEvent, MessageSegment
|
||||
from nonebot.plugin import PluginMetadata
|
||||
|
||||
from LittlePaimon.utils import logger
|
||||
from LittlePaimon.utils.brower import screenshot
|
||||
|
||||
__plugin_meta__ = PluginMetadata(
|
||||
name='米游社',
|
||||
description='米游社',
|
||||
usage='',
|
||||
extra={
|
||||
'author': '惜月',
|
||||
'version': '3.0',
|
||||
'priority': 20,
|
||||
}
|
||||
)
|
||||
|
||||
post_screenshot = on_regex(r'(https://)?(m\.)?bbs.mihoyo.com/.+/article/\d+', priority=20, block=False, state={
|
||||
'pm_name': '米游社帖子截图',
|
||||
'pm_description': '(被动技能)自动对消息中的米游社帖子链接内容进行截图发送',
|
||||
'pm_usage': '米游社帖子截图',
|
||||
'pm_priority': 1
|
||||
})
|
||||
|
||||
|
||||
@post_screenshot.handle()
|
||||
async def _(event: MessageEvent, url: str = RegexMatched()):
|
||||
logger.info('米游社', f'开始截图帖子<m>{url}</m>')
|
||||
try:
|
||||
img = await screenshot(url, elements=['.mhy-article-page__main'])
|
||||
except Exception:
|
||||
logger.info('米游社', f'帖子<m>{url}</m>截图失败')
|
||||
return
|
||||
await post_screenshot.finish(MessageSegment.image(img))
|
@ -80,4 +80,4 @@ async def _(event: MessageEvent):
|
||||
result = await draw_team(str(event.user_id))
|
||||
except Exception as e:
|
||||
result = f'制作深渊配队时出错:{e}'
|
||||
await abyss_team.finish(result)
|
||||
await abyss_team.finish(result, at_sender=True)
|
||||
|
@ -1,11 +1,11 @@
|
||||
from nonebot import on_command
|
||||
from nonebot.params import CommandArg
|
||||
from nonebot.rule import Rule
|
||||
from nonebot.adapters.onebot.v11 import Message, MessageEvent
|
||||
from nonebot.adapters.onebot.v11 import Message, MessageEvent, MessageSegment
|
||||
from nonebot.plugin import PluginMetadata
|
||||
from LittlePaimon import SUPERUSERS
|
||||
from LittlePaimon.manager.plugin_manager import plugin_manager as pm
|
||||
from LittlePaimon.utils.brower import AsyncPlaywright
|
||||
from LittlePaimon.utils.brower import screenshot
|
||||
|
||||
|
||||
async def permission_check(event: MessageEvent) -> bool:
|
||||
@ -37,7 +37,10 @@ screenshot_cmd = on_command('网页截图', priority=10, block=True, rule=Rule(p
|
||||
async def _(event: MessageEvent, msg: Message = CommandArg()):
|
||||
await screenshot_cmd.send('正在尝试截图,请稍等...')
|
||||
url = msg.extract_plain_text().strip()
|
||||
img = await AsyncPlaywright.screenshot(url)
|
||||
await screenshot_cmd.send(img)
|
||||
try:
|
||||
img = await screenshot(url)
|
||||
await screenshot_cmd.send(MessageSegment.image(img))
|
||||
except Exception:
|
||||
await screenshot_cmd.send('网页截图失败,无法访问该网页,请稍候再试')
|
||||
|
||||
|
||||
|
@ -1,174 +1,79 @@
|
||||
# https://github.com/HibiKier/zhenxun_bot/blob/main/utils
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
from typing import Optional, Literal, Union, List, Dict
|
||||
from nonebot import logger
|
||||
from nonebot.adapters.onebot.v11 import MessageSegment
|
||||
from playwright.async_api import Browser, async_playwright, Page, BrowserContext
|
||||
from typing import Optional, Literal, Tuple, Union, List, AsyncGenerator
|
||||
from playwright.async_api import Page, Browser, Playwright, async_playwright
|
||||
from contextlib import asynccontextmanager
|
||||
from LittlePaimon import DRIVER
|
||||
from LittlePaimon.utils import logger
|
||||
|
||||
_playwright: Optional[Playwright] = None
|
||||
_browser: Optional[Browser] = None
|
||||
|
||||
|
||||
async def init(**kwargs) -> Optional[Browser]:
|
||||
def get_brower() -> Browser:
|
||||
assert _browser
|
||||
return _browser
|
||||
|
||||
|
||||
@DRIVER.on_startup
|
||||
async def start_browser():
|
||||
global _playwright
|
||||
global _browser
|
||||
browser = await async_playwright().start()
|
||||
try:
|
||||
_browser = await browser.chromium.launch(**kwargs)
|
||||
return _browser
|
||||
except Exception:
|
||||
await asyncio.get_event_loop().run_in_executor(None, install)
|
||||
_browser = await browser.chromium.launch(**kwargs)
|
||||
return None
|
||||
_playwright = await async_playwright().start()
|
||||
_browser = await _playwright.chromium.launch()
|
||||
except NotImplementedError:
|
||||
logger.warning('Playwright', '初始化失败,请关闭FASTAPI_RELOAD')
|
||||
except Exception as e:
|
||||
logger.warning('Playwright', f'初始化失败,错误信息:{e}')
|
||||
if _browser:
|
||||
await _browser.close()
|
||||
|
||||
|
||||
async def get_browser(**kwargs) -> Browser:
|
||||
return _browser or await init(**kwargs)
|
||||
@DRIVER.on_shutdown
|
||||
async def shutdown_browser():
|
||||
if _browser:
|
||||
await _browser.close()
|
||||
if _playwright:
|
||||
await _playwright.stop()
|
||||
|
||||
|
||||
def install():
|
||||
"""自动安装、更新 Chromium"""
|
||||
logger.info("正在检查 Chromium 更新")
|
||||
import sys
|
||||
from playwright.__main__ import main
|
||||
|
||||
sys.argv = ["", "install", "chromium"]
|
||||
@asynccontextmanager
|
||||
async def get_new_page(**kwargs) -> AsyncGenerator[Page, None]:
|
||||
assert _browser, "playwright尚未初始化"
|
||||
page = await _browser.new_page(**kwargs)
|
||||
try:
|
||||
main()
|
||||
except SystemExit:
|
||||
pass
|
||||
yield page
|
||||
finally:
|
||||
await page.close()
|
||||
|
||||
|
||||
class AsyncPlaywright:
|
||||
@classmethod
|
||||
async def _new_page(cls, user_agent: Optional[str] = None, **kwargs) -> Page:
|
||||
"""
|
||||
说明:
|
||||
获取一个新页面
|
||||
参数:
|
||||
:param user_agent: 请求头
|
||||
"""
|
||||
browser = await get_browser()
|
||||
if browser:
|
||||
return await browser.new_page(user_agent=user_agent, **kwargs)
|
||||
logger.info('获取浏览器失败')
|
||||
raise BrowserIsNone('获取浏览器失败')
|
||||
async def screenshot(url: str,
|
||||
*,
|
||||
elements: Optional[Union[List[str]]] = None,
|
||||
timeout: Optional[float] = 60000,
|
||||
wait_until: Literal["domcontentloaded", "load", "networkidle"] = "networkidle",
|
||||
viewport_size: Tuple[int, int] = (1920, 1080),
|
||||
full_page=True,
|
||||
**kwargs):
|
||||
if not url.startswith(('https://', 'http://')):
|
||||
url = f'https://{url}'
|
||||
viewport_size = {'width': viewport_size[0], 'height': viewport_size[1]}
|
||||
brower = get_brower()
|
||||
page = await brower.new_page(
|
||||
viewport=viewport_size,
|
||||
**kwargs)
|
||||
try:
|
||||
await page.goto(url, wait_until=wait_until, timeout=timeout)
|
||||
assert page
|
||||
if not elements:
|
||||
return await page.screenshot(timeout=timeout, full_page=full_page)
|
||||
for e in elements:
|
||||
card = await page.query_selector(e)
|
||||
assert card
|
||||
clip = await card.bounding_box()
|
||||
return await page.screenshot(clip=clip, timeout=timeout, full_page=full_page, path='test.png')
|
||||
|
||||
@classmethod
|
||||
async def new_context(cls, user_agent: Optional[str] = None, **kwargs) -> BrowserContext:
|
||||
"""
|
||||
说明:
|
||||
获取一个新上下文
|
||||
参数:
|
||||
:param user_agent: 请求头
|
||||
"""
|
||||
browser = await get_browser()
|
||||
if browser:
|
||||
return await browser.new_context(user_agent=user_agent, **kwargs)
|
||||
logger.info('获取浏览器失败')
|
||||
raise BrowserIsNone('获取浏览器失败')
|
||||
|
||||
@classmethod
|
||||
async def goto(
|
||||
cls,
|
||||
url: str,
|
||||
*,
|
||||
timeout: Optional[float] = 100000,
|
||||
wait_until: Optional[
|
||||
Literal["domcontentloaded", "load", "networkidle"]
|
||||
] = "networkidle",
|
||||
referer: str = None,
|
||||
**kwargs
|
||||
) -> Optional[Page]:
|
||||
"""
|
||||
说明:
|
||||
goto
|
||||
参数:
|
||||
:param url: 网址
|
||||
:param timeout: 超时限制
|
||||
:param wait_until: 等待类型
|
||||
:param referer:
|
||||
"""
|
||||
page = None
|
||||
try:
|
||||
page = await cls._new_page(**kwargs)
|
||||
await page.goto(url, timeout=timeout, wait_until=wait_until, referer=referer)
|
||||
return page
|
||||
except Exception as e:
|
||||
logger.warning(f"Playwright 访问 url:{url} 发生错误 {type(e)}:{e}")
|
||||
if page:
|
||||
await page.close()
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
async def screenshot(
|
||||
cls,
|
||||
url: str,
|
||||
*,
|
||||
element: Optional[Union[str, List[str]]] = None,
|
||||
path: Optional[Union[Path, str]] = None,
|
||||
wait_time: Optional[int] = None,
|
||||
viewport_size: Dict[str, int] = None,
|
||||
wait_until: Optional[
|
||||
Literal["domcontentloaded", "load", "networkidle"]
|
||||
] = "networkidle",
|
||||
timeout: float = None,
|
||||
**kwargs
|
||||
) -> Optional[MessageSegment]:
|
||||
"""
|
||||
说明:
|
||||
截图,该方法仅用于简单快捷截图,复杂截图请操作 page
|
||||
参数:
|
||||
:param url: 网址
|
||||
:param path: 存储路径
|
||||
:param element: 元素选择
|
||||
:param wait_time: 等待截取超时时间
|
||||
:param viewport_size: 窗口大小
|
||||
:param wait_until: 等待类型
|
||||
:param timeout: 超时限制
|
||||
"""
|
||||
if not url.startswith(('https://', 'http://')):
|
||||
url = f'https://{url}'
|
||||
page = None
|
||||
if viewport_size is None:
|
||||
viewport_size = dict(width=1920, height=1080)
|
||||
if path and isinstance(path, str):
|
||||
path = Path(path)
|
||||
try:
|
||||
page = await cls.goto(url, wait_until=wait_until, **kwargs)
|
||||
if page is None:
|
||||
return MessageSegment.text('截图失败,无法访问网页,请稍候再试')
|
||||
await page.set_viewport_size(viewport_size)
|
||||
if element:
|
||||
if isinstance(element, str):
|
||||
if wait_time:
|
||||
card = await page.wait_for_selector(element, timeout=wait_time * 1000)
|
||||
else:
|
||||
card = await page.query_selector(element)
|
||||
else:
|
||||
card = page
|
||||
for e in element:
|
||||
if wait_time:
|
||||
card = await card.wait_for_selector(e, timeout=wait_time * 1000)
|
||||
else:
|
||||
card = await card.query_selector(e)
|
||||
else:
|
||||
card = page
|
||||
if path:
|
||||
img = await card.screenshot(path=path, timeout=timeout)
|
||||
else:
|
||||
img = await card.screenshot(timeout=timeout)
|
||||
return MessageSegment.image(img)
|
||||
except Exception as e:
|
||||
logger.warning(f"Playwright 截图 url:{url} element:{element} 发生错误 {type(e)}:{e}")
|
||||
return MessageSegment.text(f'截图失败,报错信息:{e}')
|
||||
finally:
|
||||
if page:
|
||||
await page.close()
|
||||
|
||||
|
||||
class UrlPathNumberNotEqual(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class BrowserIsNone(Exception):
|
||||
pass
|
||||
except Exception as e:
|
||||
raise e
|
||||
finally:
|
||||
if page:
|
||||
await page.close()
|
||||
|
@ -304,7 +304,7 @@ async def get_uid(event: Optional[MessageEvent] = None, user_id: Optional[str] =
|
||||
return bind_uid.uid
|
||||
else:
|
||||
if event:
|
||||
if nickname_uid := re.search(r'[1258]\d{8}', event.sender.card):
|
||||
if nickname_uid := re.search(r'[1258]\d{8}', event.sender.card or event.sender.nickname):
|
||||
return nickname_uid.group()
|
||||
else:
|
||||
if group_id:
|
||||
|
Loading…
x
Reference in New Issue
Block a user