CMHopeSunshine fa12023db0 小派蒙
2022-03-13 21:25:42 +08:00

174 lines
6.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import urllib
import requests
from asyncio import sleep
import time
import os
import hoshino,os
from hoshino import R,MessageSegment,aiorequests,logger,Service
sv = Service('原神抽卡记录导出')
def checkApi(url) -> bool:
if not url:
#print("url为空")
return "url为空"
if "getGachaLog" not in url:
#print("错误的url检查是否包含getGachaLog")
return "错误的url检查是否包含getGachaLog"
try:
r = requests.get(url)
s = r.content.decode("utf-8")
j = json.loads(s)
except Exception as e:
#print("API请求解析出错" + str(e))
return "API请求解析出错" + str(e)
if not j["data"]:
if j["message"] == "authkey valid error":
return "authkey错误"
else:
return "数据为空,错误代码:" + j["message"]
return "OK"
def getApi(url: str, gachaType: str, size: str, page: int, end_id="") -> str:
parsed = urllib.parse.urlparse(url)
querys = urllib.parse.parse_qsl(parsed.query)
param_dict = dict(querys)
param_dict["size"] = size
param_dict["gacha_type"] = gachaType
param_dict["page"] = page
param_dict["lang"] = "zh-cn"
param_dict["end_id"] = end_id
param = urllib.parse.urlencode(param_dict)
path = url.split("?")[0]
api = path + "?" + param
return api
async def getGachaLogs(url: str, gachaTypeId: str, gachaTypeDict: dict) -> list:
size = "20"
# api限制一页最大20
gachaList = []
gachaList_temp = []
end_id = "0"
c = 0
for page in range(1, 9999):
#print(f"正在获取 {gachaTypeDict[gachaTypeId]} 第 {page} 页", flush=True)
api = getApi(url, gachaTypeId, size, page, end_id)
r = requests.get(api)
s = r.content.decode("utf-8")
j = json.loads(s)
gacha = j["data"]["list"]
if not len(gacha):
break
for i in gacha:
gachaList_temp.append(i)
end_id = j["data"]["list"][-1]["id"]
await sleep(0.3)
if gachaList_temp:
gachaList.append({'uid':gachaList_temp[0]['uid']})
gachaList_temp.reverse()
for i in gachaList_temp:
c += 1
if i['rank_type'] == "5":
fivechar = {}
fivechar['name'] = i['name']
fivechar['item_type'] = i['item_type']
fivechar['count'] = c
c = 0
gachaList.append(fivechar)
gachaList.append({'未出货':c})
return gachaList
# def getGachaTypes(url: str) -> list:
# tmp_url = url.replace("getGachaLog", "getConfigList")
# parsed = urllib.parse.urlparse(tmp_url)
# querys = urllib.parse.parse_qsl(parsed.query)
# param_dict = dict(querys)
# param_dict["lang"] = "zh-cn"
# param = urllib.parse.urlencode(param_dict)
# path = tmp_url.split("?")[0]
# tmp_url = path + "?" + param
# r = requests.get(tmp_url)
# s = r.content.decode("utf-8")
# configList = json.loads(s)
# return configList["data"]["gacha_type_list"]
# def mergeDataFunc(localData: dict, gachaData: dict) -> dict:
# gachaTypes = gachaData["gachaType"]
# gachaTypeIds = [banner["key"] for banner in gachaTypes]
# gachaTypeNames = [banner["name"] for banner in gachaTypes]
# gachaTypeDict = dict(zip(gachaTypeIds, gachaTypeNames))
# for banner in gachaTypeDict:
# bannerLocal = localData["gachaLog"][banner]
# bannerGet = gachaData["gachaLog"][banner]
# if bannerGet == bannerLocal:
# pass
# else:
# #print("合并", gachaTypeDict[banner])
# flaglist = [1] * len(bannerGet)
# loc = [[i["time"], i["name"]] for i in bannerLocal]
# for i in range(len(bannerGet)):
# gachaGet = bannerGet[i]
# get = [gachaGet["time"], gachaGet["name"]]
# if get in loc:
# pass
# else:
# flaglist[i] = 0
# print("获取到", len(flaglist), "条记录")
# tempData = []
# for i in range(len(bannerGet)):
# if flaglist[i] == 0:
# gachaGet = bannerGet[i]
# tempData.insert(0, gachaGet)
# print("追加", len(tempData), "条记录")
# for i in tempData:
# localData["gachaLog"][banner].insert(0, i)
# return localData
@sv.on_prefix('原神抽卡记录导出')
async def get_gacha_record(bot,ev):
url = ev.message.extract_plain_text().strip()
spliturl = url.split("?")
spliturl[0] = "https://hk4e-api.mihoyo.com/event/gacha_info/api/getGachaLog"
url = "?".join(spliturl)
checkApimsg = checkApi(url)
if checkApimsg == 'OK':
await bot.send(ev,'获取抽卡记录中,请稍候...',at_sender=True)
gachaTypes = {'200': '常驻祈愿', '301': '角色活动祈愿', '302': '武器活动祈愿'}
gachaData = {}
gachaData["gachaLog"] = {}
for gachaType in gachaTypes.items():
gachaLog = await getGachaLogs(url, gachaType[0], gachaTypes)
gachaData["gachaLog"][gachaType[1]] = gachaLog
uid_flag = 1
for gachaType in gachaData["gachaLog"]:
for log in gachaData["gachaLog"][gachaType]:
if uid_flag and log["uid"]:
gachaData["uid"] = log["uid"]
uid_flag = 0
#gen_path = os.path.dirname(os.path.realpath(sys.argv[0]))
uid = gachaData["uid"]
mergeData = gachaData
msg = f'你的抽卡记录如下\nuid: {uid}\n'
for gachaType in gachaTypes.values():
if gachaType in mergeData['gachaLog']:
msg += gachaType +': '
if len(mergeData['gachaLog'][gachaType]) <= 2:
msg += '没有五星记录'
else:
for role in mergeData['gachaLog'][gachaType]:
if 'name' in role:
msg += f"{role['name']}({role['count']}) "
msg += '\n'
await bot.send(ev,msg,at_sender=True)
else:
await bot.send(ev,checkApimsg,at_sender=True)