104 lines
3.8 KiB
Python
Raw Normal View History

2022-03-13 21:25:42 +08:00
import os
import re
from io import BytesIO
from os import path
from typing import Union
import nonebot
from aiocqhttp.event import Event
from aiocqhttp.message import Message
from filetype.filetype import guess_mime
from PIL import Image
from nonebot.message import MessageSegment
from ._util import download_async, get_md5, get_random_file, logger, bot
class Res:
base_dir = path.join(path.dirname(__file__), 'res')
image_dir = path.join(base_dir, 'image')
record_dir = path.join(base_dir, 'record')
img_cache_dir = path.join(image_dir, 'cache')
def check_exist(res_path: str) -> None:
return path.exists(res_path)
@classmethod
def image(cls, pic_path: str) -> 'MessageSegment':
if cls.check_exist(pic_path):
return MessageSegment.image(f'file:///{pic_path}')
elif cls.check_exist(path.join(cls.image_dir, pic_path)):
return MessageSegment.image(f'file:///{path.join(cls.image_dir, pic_path)}')
else:
return '【图片丢了】'
@classmethod
def record(cls, rec_path) -> 'MessageSegment':
if cls.check_exist(rec_path):
return MessageSegment.record(f'file:///{rec_path}')
elif cls.check_exist(path.join(cls.record_dir, rec_path)):
return MessageSegment.record(f'file:///{path.join(cls.record_dir, rec_path)}')
else:
return '【图片丢了】'
@classmethod
def cardimage(cls, pic_path: str) -> 'MessageSegment':
if cls.check_exist(pic_path):
return f"[CQ:cardimage,file=file:///{pic_path}]"
elif cls.check_exist(path.join(cls.image_dir, pic_path)):
return f"[CQ:cardimage,file=file:///{path.join(cls.image_dir, pic_path)}]"
else:
return '【图片丢了】'
@classmethod
def get_random_image(cls, folder=None) -> 'MessageSegment':
if not folder:
image_path = cls.image_dir
else:
image_path = path.join(cls.image_dir, folder)
image_name = get_random_file(image_path)
return cls.image(path.join(image_path, image_name))
@classmethod
def get_random_record(cls, folder=None) -> 'MessageSegment':
if not folder:
record_path = cls.record_dir
else:
record_path = path.join(cls.record_dir, folder)
rec_name = get_random_file(record_path)
return cls.record(path.join(record_path, rec_name))
@classmethod
async def image_from_url(cls, url: str, cache=True) -> 'MessageSegment':
fname = get_md5(url)
image = path.join(cls.img_cache_dir, f'{fname}.jpg')
if not path.exists(image) or not cache:
image = await download_async(url, cls.img_cache_dir, f'{fname}.jpg')
return cls.image(image)
@classmethod
async def cardimage_from_url(cls, url: str, cache=True) -> 'MessageSegment':
fname = get_md5(url)
image = path.join(cls.img_cache_dir, f'{fname}.jpg')
if not path.exists(image) or not cache:
image = await download_async(url, cls.img_cache_dir, f'{fname}.jpg')
return cls.cardimage(image)
@classmethod
def image_from_memory(cls, data: Union[bytes, Image.Image]) -> 'MessageSegment':
if isinstance(data, Image.Image):
out = BytesIO()
data.save(out,format='png')
data = out.getvalue()
if not isinstance(data, bytes):
raise ValueError('不支持的参数类型')
ftype = guess_mime(data)
if not ftype or not ftype.startswith('image'):
raise ValueError('不是有效的图片类型')
fn = get_md5(data)
save_path = path.join(cls.img_cache_dir, fn)
with open(save_path, 'wb') as f:
f.write(data)
return cls.image(path.join(cls.img_cache_dir, fn))