尝试绕过geetest

This commit is contained in:
CMHopeSunshine 2022-10-07 00:22:57 +08:00
parent dfcae8abed
commit 890994925d
2 changed files with 63 additions and 11 deletions

View File

@ -1,20 +1,64 @@
import asyncio import asyncio
import datetime import datetime
import json
import random import random
import time import time
from nonebot import get_bot from nonebot import get_bot
from collections import defaultdict from collections import defaultdict
from typing import Tuple from typing import Tuple, Dict, Any, Optional, Union
from LittlePaimon import DRIVER from LittlePaimon import DRIVER
from LittlePaimon.database.models import MihoyoBBSSub, LastQuery from LittlePaimon.database.models import MihoyoBBSSub, LastQuery, PrivateCookie
from LittlePaimon.utils import logger, scheduler from LittlePaimon.utils import logger, scheduler, aiorequests
from LittlePaimon.utils.api import get_mihoyo_private_data, get_sign_reward_list from LittlePaimon.utils.api import get_mihoyo_private_data, get_sign_reward_list, mihoyo_sign_headers, check_retcode
from LittlePaimon.manager.plugin_manager import plugin_manager as pm from LittlePaimon.manager.plugin_manager import plugin_manager as pm
from .draw import SignResult, draw_result from .draw import SignResult, draw_result
SIGN_ACTION_API = 'https://api-takumi.mihoyo.com/event/bbs_sign_reward/sign'
GEETEST_HEADER = {"Accept": "*/*",
"X-Requested-With": "com.mihoyo.hyperion",
"User-Agent": 'Mozilla/5.0 (Linux; Android 12; Unspecified Device) AppleWebKit/537.36 (KHTML, like Gecko) '
'Version/4.0 Chrome/103.0.5060.129 Mobile Safari/537.36 miHoYoBBS/2.35.2',
"Referer": "https://webstatic.mihoyo.com/",
"Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7"
}
sign_reward_list: dict = {} sign_reward_list: dict = {}
async def pass_geetest(data: Dict[str, Any]):
url = f'https://api.geetest.com/ajax.php?gt={data["gt"]}&challenge={data["challenge"]}&lang=zh-cn&pt=3&client_type=web_mobile'
resp = await aiorequests.get(url, headers=GEETEST_HEADER)
if resp.status_code == 200:
resp_data = json.loads(resp.text.replace('(', '').replace(')', ''))
if 'success' in resp_data['status'] and 'success' in resp_data['data']['result']:
return resp_data['data']['validate']
return None
async def sign_action(user_id: str, uid: str, validate: Optional[dict] = None, last_data: Optional[dict] = None) -> \
Union[dict, str]:
server_id = 'cn_qd01' if uid[0] == '5' else 'cn_gf01'
cookie_info = await PrivateCookie.get_or_none(user_id=user_id, uid=uid)
if last_data and validate:
extra_headers = {
'x-rpc-challenge': last_data['data']['challenge'],
'x-rpc-validate': validate,
'x-rpc-seccode': f'{validate}|jordan'
}
else:
extra_headers = None
resp = await aiorequests.post(SIGN_ACTION_API, headers=mihoyo_sign_headers(cookie_info.cookie, extra_headers),
json={
'act_id': 'e202009291139501',
'uid': uid,
'region': server_id
})
data = resp.json()
if await check_retcode(data, cookie_info, 'private', user_id, uid):
return data
else:
return f'你的UID{uid}的cookie疑似失效了'
async def mhy_bbs_sign(user_id: str, uid: str) -> Tuple[SignResult, str]: async def mhy_bbs_sign(user_id: str, uid: str) -> Tuple[SignResult, str]:
""" """
执行米游社原神签到返回签到成功天数或失败原因 执行米游社原神签到返回签到成功天数或失败原因
@ -33,8 +77,11 @@ async def mhy_bbs_sign(user_id: str, uid: str) -> Tuple[SignResult, str]:
signed_days = sign_info['data']['total_sign_day'] - 1 signed_days = sign_info['data']['total_sign_day'] - 1
logger.info('米游社原神签到', '', {'用户': user_id, 'UID': uid}, '今天已经签过了', True) logger.info('米游社原神签到', '', {'用户': user_id, 'UID': uid}, '今天已经签过了', True)
return SignResult.DONE, f'UID{uid}今天已经签过了,获得的奖励为\n{sign_reward_list[signed_days]["name"]}*{sign_reward_list[signed_days]["cnt"]}' return SignResult.DONE, f'UID{uid}今天已经签过了,获得的奖励为\n{sign_reward_list[signed_days]["name"]}*{sign_reward_list[signed_days]["cnt"]}'
for i in range(6): validate = None
sign_data = await get_mihoyo_private_data(uid, user_id, 'sign_action') sign_data = None
for i in range(3):
sign_data = await sign_action(user_id, uid, validate, sign_data)
validate = await pass_geetest(sign_data['data'])
if isinstance(sign_data, str): if isinstance(sign_data, str):
logger.info('米游社原神签到', '', {'用户': user_id, 'UID': uid}, f'获取数据失败, {sign_data}', False) logger.info('米游社原神签到', '', {'用户': user_id, 'UID': uid}, f'获取数据失败, {sign_data}', False)
return SignResult.FAIL, f'{uid}签到失败,{sign_data}\n' return SignResult.FAIL, f'{uid}签到失败,{sign_data}\n'
@ -53,12 +100,13 @@ async def mhy_bbs_sign(user_id: str, uid: str) -> Tuple[SignResult, str]:
return SignResult.SUCCESS, f'签到成功,获得的奖励为\n{sign_reward_list[signed_days]["name"]}*{sign_reward_list[signed_days]["cnt"]}' return SignResult.SUCCESS, f'签到成功,获得的奖励为\n{sign_reward_list[signed_days]["name"]}*{sign_reward_list[signed_days]["cnt"]}'
else: else:
logger.info('米游社原神签到', '', {'用户': user_id, 'UID': uid}, f'出现验证码,重试第{i}', False) logger.info('米游社原神签到', '', {'用户': user_id, 'UID': uid}, f'出现验证码,重试第{i}', False)
await asyncio.sleep(random.randint(10, 15)) await asyncio.sleep(random.randint(30, 45))
logger.info('米游社原神签到', '', {'用户': user_id, 'UID': uid}, '尝试6次签到失败无法绕过验证码', False) logger.info('米游社原神签到', '', {'用户': user_id, 'UID': uid}, '尝试6次签到失败无法绕过验证码', False)
return SignResult.FAIL, f'{uid}签到失败,无法绕过验证码' return SignResult.FAIL, f'{uid}签到失败,无法绕过验证码'
@scheduler.scheduled_job('cron', hour=pm.config.auto_sign_hour, minute=pm.config.auto_sign_minute, misfire_grace_time=10) @scheduler.scheduled_job('cron', hour=pm.config.auto_sign_hour, minute=pm.config.auto_sign_minute,
misfire_grace_time=10)
async def _(): async def _():
await bbs_auto_sign() await bbs_auto_sign()
@ -96,7 +144,7 @@ async def bbs_auto_sign():
if result == SignResult.DONE: if result == SignResult.DONE:
await asyncio.sleep(random.randint(3, 8)) await asyncio.sleep(random.randint(3, 8))
else: else:
await asyncio.sleep(random.randint(20, 30)) await asyncio.sleep(random.randint(30, 45))
for group_id, sign_result in sign_result_group.items(): for group_id, sign_result in sign_result_group.items():
# 发送签到结果到群 # 发送签到结果到群

View File

@ -113,13 +113,14 @@ def mihoyo_headers(cookie, q='', b=None) -> dict:
} }
def mihoyo_sign_headers(cookie: str) -> dict: def mihoyo_sign_headers(cookie: str, extra_headers: Optional[dict] = None) -> dict:
""" """
生成米游社签到headers 生成米游社签到headers
:param cookie: cookie :param cookie: cookie
:param extra_headers: 额外的headers参数
:return: headers :return: headers
""" """
return { header = {
'User_Agent': 'Mozilla/5.0 (Linux; Android 12; Unspecified Device) AppleWebKit/537.36 (KHTML, like Gecko) ' 'User_Agent': 'Mozilla/5.0 (Linux; Android 12; Unspecified Device) AppleWebKit/537.36 (KHTML, like Gecko) '
'Version/4.0 Chrome/103.0.5060.129 Mobile Safari/537.36 miHoYoBBS/2.35.2', 'Version/4.0 Chrome/103.0.5060.129 Mobile Safari/537.36 miHoYoBBS/2.35.2',
'Cookie': cookie, 'Cookie': cookie,
@ -132,6 +133,9 @@ def mihoyo_sign_headers(cookie: str) -> dict:
'=e202009291139501&utm_source=bbs&utm_medium=mys&utm_campaign=icon', '=e202009291139501&utm_source=bbs&utm_medium=mys&utm_campaign=icon',
'x-rpc-app_version': '2.35.2' 'x-rpc-app_version': '2.35.2'
} }
if extra_headers:
header.update(extra_headers)
return header
async def check_retcode(data: dict, cookie_info, cookie_type: str, user_id: str, uid: str) -> bool: async def check_retcode(data: dict, cookie_info, cookie_type: str, user_id: str, uid: str) -> bool: