🐛 修复扫码绑定及原神日历的一些错误 (#511)

* 🔧 自动更新依赖文件

* 调整playwright默认安装的浏览器内核

新增安装playwright时需要下载的浏览器内核,默认为firefox

* 添加有关playwright的说明

* 修复文件错名及优化配置项

* 使用配置模型及修复部分错误导致的显示异常

1. 使用小派蒙配置模型
2. 修复图片生成时丢失html样式

* 修复扫码绑定及原神日历的一些错误

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
This commit is contained in:
nicklly 2024-08-05 09:04:34 +08:00 committed by GitHub
parent d9c1698062
commit 5ff7d59641
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 201 additions and 109 deletions

View File

@ -1,3 +1,5 @@
from typing import Optional
from nonebot import get_bot, on_command
from nonebot.adapters.onebot.v11 import MessageEvent, MessageSegment
from nonebot.plugin import PluginMetadata
@ -5,7 +7,7 @@ from nonebot.plugin import PluginMetadata
from LittlePaimon.database import GeneralSub
from LittlePaimon.utils import scheduler, logger, DRIVER, SUPERUSERS
from LittlePaimon.utils.message import CommandObjectID, CommandSwitch, CommandTime
from .generate import *
from .draw_calendar import *
__plugin_meta__ = PluginMetadata(
name="原神日历",
@ -28,7 +30,7 @@ calendar = on_command('原神日历', aliases={'原神日程', '活动日历'},
@calendar.handle()
async def _(event: MessageEvent, sub_id=CommandObjectID(), switch=CommandSwitch(), sub_time=CommandTime()):
if switch is None:
im = await generate_day_schedule('cn')
im = await generate_day_schedule('cn', viewport={"width": 600, "height": 10})
await calendar.finish(MessageSegment.image(im))
else:
if event.sender.role not in ['admin', 'owner'] and event.user_id not in SUPERUSERS:
@ -74,7 +76,7 @@ async def send_calendar(sub_id: int, sub_type: str, extra_id: Optional[int]):
else:
api = 'send_group_msg'
data = {'group_id': sub_id}
im = await generate_day_schedule('cn')
im = await generate_day_schedule('cn', viewport={"width": 600, "height": 10})
data['message'] = MessageSegment.image(im)
await get_bot().call_api(api, **data)
logger.info('原神日历', '', {sub_type: sub_id}, '推送成功', True)

View File

@ -1,14 +1,16 @@
import os
import asyncio
import math
import functools
import re
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
from dateutil.relativedelta import relativedelta
from LittlePaimon.utils.requests import aiorequests
res = Path(__file__).parent / 'template'
# type 0 普通常驻任务深渊 1 新闻 2 蛋池 3 限时活动H5
event_data = {
'cn': [],
@ -30,30 +32,65 @@ ignored_key_words = [
"防沉迷",
"问卷",
"公平运营",
"纪行",
"有奖活动",
"反馈功能"
"反馈功能",
"常用FAQ",
'七圣召唤',
'官方社区'
]
ignored_ann_ids = [
423, # 《原神》玩家社区一览
495, # 有奖问卷调查开启!
762, # 《原神》公平运营声明
1263, # 米游社《原神》专属工具一览
2388, # 原神问卷调查
2522, # 《原神》防沉迷系统说明
2911, # 功能反馈
423, # 《原神》玩家社区一览
422, # 《原神》防沉迷系统说明
762, # 《原神》公平运营声明
]
list_api = 'https://hk4e-api.mihoyo.com/common/hk4e_cn/announcement/api/getAnnList?game=hk4e&game_biz=hk4e_cn&lang=zh-cn&bundle_id=hk4e_cn&platform=pc&region=cn_gf01&level=55&uid=100000000'
detail_api = 'https://hk4e-api.mihoyo.com/common/hk4e_cn/announcement/api/getAnnContent?game=hk4e&game_biz=hk4e_cn&lang=zh-cn&bundle_id=hk4e_cn&platform=pc&region=cn_gf01&level=55&uid=100000000'
async def query_data(url) -> Optional[dict]:
def cache(ttl=timedelta(hours=1), arg_key=None):
def wrap(func):
cache_data = {}
@functools.wraps(func)
async def wrapped(*args, **kw):
nonlocal cache_data
default_data = {"time": None, "value": None}
ins_key = 'default'
if arg_key:
ins_key = arg_key + str(kw.get(arg_key, ''))
data = cache_data.get(ins_key, default_data)
else:
data = cache_data.get(ins_key, default_data)
now = datetime.now()
if not data['time'] or now - data['time'] > ttl:
try:
data['value'] = await func(*args, **kw)
data['time'] = now
cache_data[ins_key] = data
except Exception as e:
raise e
return data['value']
return wrapped
return wrap
@cache(ttl=timedelta(hours=3), arg_key='url')
async def query_data(url):
try:
req = await aiorequests.get(url)
return req.json()
except Exception:
return None
except:
pass
return None
async def load_event_cn():
@ -62,41 +99,58 @@ async def load_event_cn():
if result and 'retcode' in result and result['retcode'] == 0 and detail_result and 'retcode' in detail_result and \
detail_result['retcode'] == 0:
event_data['cn'] = []
event_detail = {detail['ann_id']: detail for detail in detail_result['data']['list']}
event_detail = {}
for detail in detail_result['data']['list']:
event_detail[detail['ann_id']] = detail
datalist = result['data']['list']
for data in datalist:
for item in data['list']:
# 1 活动公告 2 游戏公告
if item['type'] == 2:
ignore = any(ann_id == item["ann_id"] for ann_id in ignored_ann_ids)
ignore = False
for ann_id in ignored_ann_ids:
if ann_id == item["ann_id"]:
ignore = True
break
if ignore:
continue
for keyword in ignored_key_words:
if keyword in item['title']:
ignore = True
break
if ignore:
continue
start_time = datetime.strptime(item['start_time'], "%Y-%m-%d %H:%M:%S")
end_time = datetime.strptime(item['end_time'], "%Y-%m-%d %H:%M:%S")
start_time = datetime.strptime(item['start_time'], r"%Y-%m-%d %H:%M:%S")
end_time = datetime.strptime(item['end_time'], r"%Y-%m-%d %H:%M:%S")
# 从正文中查找开始时间
if event_detail[item["ann_id"]]:
content = event_detail[item["ann_id"]]['content']
searchObj = re.search('(\d+)\/(\d+)\/(\d+)\s(\d+):(\d+):(\d+)', content, re.M | re.I)
searchObj = re.search(
r'(\d+)\/(\d+)\/(\d+)\s(\d+):(\d+):(\d+)', content, re.M | re.I)
try:
datelist = searchObj.groups()
datelist = searchObj.groups() # ('2021', '9', '17')
if datelist and len(datelist) >= 6:
ctime = datetime.strptime(
f'{datelist[0]}-{datelist[1]}-{datelist[2]} {datelist[3]}:{datelist[4]}:{datelist[5]}',
"%Y-%m-%d %H:%M:%S")
r"%Y-%m-%d %H:%M:%S")
if start_time < ctime < end_time:
start_time = ctime
except Exception as e:
pass
event = {'title': item['title'], 'start': start_time, 'end': end_time, 'forever': False, 'type': 0,
'banner': item['banner'], 'color': '#2196f3'}
event = {
'title': item['title'],
'start': start_time,
'end': end_time,
'forever': False,
'type': 0,
'banner': item['banner'],
'color': '#2196f3'
}
if '任务' in item['title']:
event['forever'] = True
event['color'] = '#f764ad'
@ -112,25 +166,38 @@ async def load_event_cn():
event['banner'] = item['banner']
event['color'] = '#580dda'
event_data['cn'].append(event)
for i in range(2):
curmon = datetime.now() + relativedelta(months=i)
nextmon = curmon + relativedelta(months=1)
event_data['cn'].append({'title': '「深境螺旋」· 上半段',
'start': datetime.strptime(curmon.strftime("%Y/%m/01 04:00"), "%Y/%m/%d %H:%M"),
'end': datetime.strptime(curmon.strftime("%Y/%m/16 03:59"), "%Y/%m/%d %H:%M"),
'forever': False, 'type': 3, 'color': '#580dda', 'banner': res / 'sy.jpg'})
event_data['cn'].append({'title': '「深境螺旋」· 下半段 ',
'start': datetime.strptime(curmon.strftime("%Y/%m/16 04:00"), "%Y/%m/%d %H:%M"),
'end': datetime.strptime(nextmon.strftime("%Y/%m/01 03:59"), "%Y/%m/%d %H:%M"),
'forever': False, 'type': 3, 'color': '#580dda', 'banner': res / 'sy.jpg'})
# 深渊提醒
i = 0
while i < 2:
current_month = datetime.today() + relativedelta(months=i)
next_month = current_month + relativedelta(months=1)
event_data['cn'].append({
'title': '「深境螺旋」',
'start': datetime.strptime(current_month.strftime("%Y/%m/16 04:00"), r"%Y/%m/%d %H:%M"),
'end': datetime.strptime(next_month.strftime("%Y/%m/16 03:59"), r"%Y/%m/%d %H:%M"),
'forever': False,
'type': 3,
'color': '#580dda',
'banner': res / 'sy.jpg'
})
event_data['cn'].append({
'title': '「幻想真境剧诗」',
'start': datetime.strptime(current_month.strftime("%Y/%m/1 04:00"), r"%Y/%m/%d %H:%M"),
'end': datetime.strptime(next_month.strftime("%Y/%m/1 03:59"), r"%Y/%m/%d %H:%M"),
'forever': False,
'type': 3,
'color': '#580dda',
'banner': res / 'sy.jpg'
})
i = i + 1
return 0
return 1
async def load_event(server):
return await load_event_cn() if server == 'cn' else 1
if server == 'cn':
return await load_event_cn()
return 1
def get_pcr_now(offset):
@ -148,31 +215,37 @@ async def get_events(server, offset, days):
pcr_now = datetime.now()
if pcr_now.hour < 4:
pcr_now -= timedelta(days=1)
pcr_now = pcr_now.replace(hour=18, minute=0, second=0, microsecond=0)
pcr_now = pcr_now.replace(
hour=18, minute=0, second=0, microsecond=0) # 用晚6点做基准
await lock[server].acquire()
try:
t = pcr_now.strftime('%y%m%d')
if event_updated[server] != t and await load_event(server) == 0:
event_updated[server] = t
if event_updated[server] != t:
if await load_event(server) == 0:
event_updated[server] = t
finally:
lock[server].release()
start = pcr_now + timedelta(days=offset)
end = start + timedelta(days=days)
end -= timedelta(hours=18)
for event in event_data[server]:
if end > event['start'] and start < event['end']:
event['start_days'] = math.ceil((event['start'] - start) / timedelta(days=1))
event['left_days'] = math.floor((event['end'] - start) / timedelta(days=1))
events.append(event)
events.sort(key=lambda item: item["type"] * 100 - item['left_days'], reverse=True)
end -= timedelta(hours=18) # 晚上12点结束
for event in event_data[server]:
if end > event['start'] and start < event['end']: # 在指定时间段内 已开始 且 未结束
event['start_days'] = math.ceil((event['start'] - start) / timedelta(days=1)) # 还有几天开始
event['left_days'] = math.floor((event['end'] - start) / timedelta(days=1)) # 还有几天结束
events.append(event)
# 按type从大到小 按剩余天数从小到大
events.sort(key=lambda item: item["type"] * 100 - item['left_days'], reverse=True)
return events
if __name__ == '__main__':
async def main():
await load_event_cn()
print(await get_events('cn', 0, 15))
loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(main())

View File

@ -0,0 +1,70 @@
import jinja2
from LittlePaimon.utils.browser import get_new_page
from .data_source import *
from datetime import datetime, timedelta
body = []
weeks = []
weekList = ['', '', '', '', '', '', '']
template_path = str(Path(__file__).parent / 'template')
template_name = "calendar.html"
env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_path), enable_async=True)
async def generate_day_schedule(server='cn', **kwargs):
events = await get_events(server, 0, 15)
has_prediction = False
""" 追加数据前先执行清除,以防数据叠加 """
body.clear()
weeks.clear()
t = datetime.now()
for i in range(7):
d2 = (t + timedelta(days=i)).strftime("%Y-%m-%d")
""" 分割 [年|月|日]"""
date_full = str(d2).split("-")
current = 'm-events-calendar__table-header-current' if t.strftime("%d") == date_full[2] else ""
date = re.search(r'0\d+', date_full[1]).group(0).replace('0', '') if re.search(r'0\d+', date_full[1]) else \
date_full[1]
week = datetime(int(date_full[0]), int(date_full[1]), int(date_full[2])).isoweekday()
weeks.append({
'week': f'星期{weekList[week - 1]}',
'date': f'{date}.{date_full[2]}',
'current': current
})
for event in events:
if event['start_days'] > 0:
has_prediction = True
template = env.get_template('calendar.html')
for event in events:
if event['start_days'] <= 0:
time = '即将结束' if event["left_days"] == 0 else f'{str(event["left_days"])}天后结束'
body.append({
'title': event['title'],
'time': time,
'online': f'{datetime.strftime(event["start"], r"%m-%d")} ~ {datetime.strftime(event["end"], r"%m-%d")}',
'color': event['color'],
'banner': event['banner']
})
if has_prediction:
for event in events:
if event['start_days'] > 0:
time = '即将开始' if event["start_days"] == 0 else f'{str(event["start_days"])}天后开始'
body.append({
'title': event['title'],
'time': time,
'online': f'{datetime.strftime(event["start"], r"%m-%d")} ~ {datetime.strftime(event["end"], r"%m-%d")}',
'color': event['color'],
'banner': event['banner']
})
content = await template.render_async(body=body, css_path=template_path, week=weeks)
async with get_new_page(**kwargs) as page:
await page.goto('file://' + template_path + "/" + template_name)
await page.set_content(content, wait_until='networkidle')
await page.wait_for_timeout(0)
pic = await page.screenshot(full_page=True)
return pic

View File

@ -1,55 +0,0 @@
import jinja2
from .event import *
from datetime import datetime, timedelta
from LittlePaimon.utils.browser import get_new_page
body = []
weeks = []
weekList = ['', '', '', '', '', '', '']
template_path = Path(__file__).parent / 'template'
env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_path), enable_async=True)
async def generate_day_schedule(server='cn'):
events = await get_events(server, 0, 15)
body.clear()
weeks.clear()
t = datetime.now()
for i in range(7):
d2 = (t + timedelta(days=i)).strftime("%Y-%m-%d")
""" 分割 [年|月|日]"""
date_full = str(d2).split("-")
current = 'm-events-calendar__table-header-current' if t.strftime("%d") == date_full[2] else ''
date = re.search('0\d+', date_full[1]).group(0).replace('0', '') if re.search('0\d+', date_full[1]) else \
date_full[1]
week = datetime(int(date_full[0]), int(date_full[1]), int(date_full[2])).isoweekday()
weeks.append({'week': f'星期{weekList[week - 1]}', 'date': f'{date}.{date_full[2]}', 'current': current})
has_prediction = any(event['start_days'] > 0 for event in events)
template = env.get_template('calendar.html')
for event in events:
if event['start_days'] <= 0:
time = '即将结束' if event["left_days"] == 0 else f'{str(event["left_days"])}天后结束'
body.append({'title': event['title'], 'time': time,
'online': f'{datetime.strftime(event["start"], "%m-%d")} ~ {datetime.strftime(event["end"], "%m-%d")}',
'color': event['color'], 'banner': event['banner']})
if has_prediction:
for event in events:
if event['start_days'] > 0:
time = '即将开始' if event["start_days"] == 0 else f'{str(event["start_days"])}天后开始'
body.append({'title': event['title'], 'time': time,
'online': f'{datetime.strftime(event["start"], "%m-%d")} ~ {datetime.strftime(event["end"], "%m-%d")}',
'color': event['color'], 'banner': event['banner']})
content = await template.render_async(body=body, week=weeks)
async with get_new_page(viewport={'width': 600, 'height': 10}) as page:
await page.goto(f'file://{template_path}/calendar.html')
await page.set_content(content, wait_until='networkidle')
await page.wait_for_timeout(0.2)
return await page.screenshot(full_page=True)

View File

@ -52,7 +52,6 @@
{% endfor%}
</div>
</div>
<div class="text">All Rights Reserved.@NepPure<br>Create By @nicklly for LittlePaimon</div>
</div>
</div>

View File

@ -84,7 +84,7 @@ def generate_qrcode(url):
async def create_login_data():
device_id = ''.join(random.choices((ascii_letters + digits), k=64))
app_id = '4'
app_id = '1'
data = {'app_id': app_id,
'device': device_id}
res = await aiorequests.post('https://hk4e-sdk.mihoyo.com/hk4e_cn/combo/panda/qrcode/fetch?',
@ -102,6 +102,9 @@ async def check_login(login_data: dict):
'ticket': login_data['ticket'],
'device': login_data['device']}
res = await aiorequests.post('https://hk4e-sdk.mihoyo.com/hk4e_cn/combo/panda/qrcode/query?',
headers={
'x-rpc-device_id': login_data['device']
},
json=data)
return res.json()
@ -151,7 +154,7 @@ async def check_qrcode():
send_msg = None
status_data = await check_login(data)
if status_data['retcode'] != 0:
send_msg = '绑定二维码已过期,请重新发送扫码绑定指令'
send_msg = status_data['message']
running_login_data.pop(user_id)
elif status_data['data']['stat'] == 'Confirmed':
game_token = json.loads(status_data['data']['payload']['raw'])