mirror of
https://github.com/xuthus83/LittlePaimon.git
synced 2024-12-16 13:40:53 +08:00
104 lines
3.8 KiB
Python
104 lines
3.8 KiB
Python
|
import os
|
||
|
import re
|
||
|
from io import BytesIO
|
||
|
from os import path
|
||
|
from typing import Union
|
||
|
|
||
|
import nonebot
|
||
|
from aiocqhttp.event import Event
|
||
|
from aiocqhttp.message import Message
|
||
|
from filetype.filetype import guess_mime
|
||
|
from PIL import Image
|
||
|
from nonebot.message import MessageSegment
|
||
|
|
||
|
|
||
|
from ._util import download_async, get_md5, get_random_file, logger, bot
|
||
|
|
||
|
class Res:
|
||
|
base_dir = path.join(path.dirname(__file__), 'res')
|
||
|
image_dir = path.join(base_dir, 'image')
|
||
|
record_dir = path.join(base_dir, 'record')
|
||
|
img_cache_dir = path.join(image_dir, 'cache')
|
||
|
|
||
|
def check_exist(res_path: str) -> None:
|
||
|
return path.exists(res_path)
|
||
|
|
||
|
@classmethod
|
||
|
def image(cls, pic_path: str) -> 'MessageSegment':
|
||
|
if cls.check_exist(pic_path):
|
||
|
return MessageSegment.image(f'file:///{pic_path}')
|
||
|
elif cls.check_exist(path.join(cls.image_dir, pic_path)):
|
||
|
return MessageSegment.image(f'file:///{path.join(cls.image_dir, pic_path)}')
|
||
|
else:
|
||
|
return '【图片丢了】'
|
||
|
|
||
|
@classmethod
|
||
|
def record(cls, rec_path) -> 'MessageSegment':
|
||
|
if cls.check_exist(rec_path):
|
||
|
return MessageSegment.record(f'file:///{rec_path}')
|
||
|
elif cls.check_exist(path.join(cls.record_dir, rec_path)):
|
||
|
return MessageSegment.record(f'file:///{path.join(cls.record_dir, rec_path)}')
|
||
|
else:
|
||
|
return '【图片丢了】'
|
||
|
|
||
|
@classmethod
|
||
|
def cardimage(cls, pic_path: str) -> 'MessageSegment':
|
||
|
if cls.check_exist(pic_path):
|
||
|
return f"[CQ:cardimage,file=file:///{pic_path}]"
|
||
|
elif cls.check_exist(path.join(cls.image_dir, pic_path)):
|
||
|
return f"[CQ:cardimage,file=file:///{path.join(cls.image_dir, pic_path)}]"
|
||
|
else:
|
||
|
return '【图片丢了】'
|
||
|
|
||
|
@classmethod
|
||
|
def get_random_image(cls, folder=None) -> 'MessageSegment':
|
||
|
if not folder:
|
||
|
image_path = cls.image_dir
|
||
|
else:
|
||
|
image_path = path.join(cls.image_dir, folder)
|
||
|
image_name = get_random_file(image_path)
|
||
|
return cls.image(path.join(image_path, image_name))
|
||
|
|
||
|
@classmethod
|
||
|
def get_random_record(cls, folder=None) -> 'MessageSegment':
|
||
|
if not folder:
|
||
|
record_path = cls.record_dir
|
||
|
else:
|
||
|
record_path = path.join(cls.record_dir, folder)
|
||
|
rec_name = get_random_file(record_path)
|
||
|
return cls.record(path.join(record_path, rec_name))
|
||
|
|
||
|
@classmethod
|
||
|
async def image_from_url(cls, url: str, cache=True) -> 'MessageSegment':
|
||
|
fname = get_md5(url)
|
||
|
image = path.join(cls.img_cache_dir, f'{fname}.jpg')
|
||
|
if not path.exists(image) or not cache:
|
||
|
image = await download_async(url, cls.img_cache_dir, f'{fname}.jpg')
|
||
|
return cls.image(image)
|
||
|
|
||
|
@classmethod
|
||
|
async def cardimage_from_url(cls, url: str, cache=True) -> 'MessageSegment':
|
||
|
fname = get_md5(url)
|
||
|
image = path.join(cls.img_cache_dir, f'{fname}.jpg')
|
||
|
if not path.exists(image) or not cache:
|
||
|
image = await download_async(url, cls.img_cache_dir, f'{fname}.jpg')
|
||
|
return cls.cardimage(image)
|
||
|
|
||
|
@classmethod
|
||
|
def image_from_memory(cls, data: Union[bytes, Image.Image]) -> 'MessageSegment':
|
||
|
if isinstance(data, Image.Image):
|
||
|
out = BytesIO()
|
||
|
data.save(out,format='png')
|
||
|
data = out.getvalue()
|
||
|
if not isinstance(data, bytes):
|
||
|
raise ValueError('不支持的参数类型')
|
||
|
ftype = guess_mime(data)
|
||
|
if not ftype or not ftype.startswith('image'):
|
||
|
raise ValueError('不是有效的图片类型')
|
||
|
fn = get_md5(data)
|
||
|
save_path = path.join(cls.img_cache_dir, fn)
|
||
|
with open(save_path, 'wb') as f:
|
||
|
f.write(data)
|
||
|
return cls.image(path.join(cls.img_cache_dir, fn))
|
||
|
|