278 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import random
import string
import time
from httpx import AsyncClient
from nonebot import logger
# 米游社的API列表
bbs_Cookieurl = 'https://webapi.account.mihoyo.com/Api/cookie_accountinfo_by_loginticket?login_ticket={}'
bbs_Cookieurl2 = 'https://api-takumi.mihoyo.com/auth/api/getMultiTokenByLoginTicket?' \
'login_ticket={}&token_types=3&uid={}'
bbs_Taskslist = 'https://bbs-api.mihoyo.com/apihub/sapi/getUserMissionsState' # 获取任务列表
bbs_Signurl = 'https://bbs-api.mihoyo.com/apihub/sapi/signIn?gids={}' # post
bbs_Listurl = 'https://bbs-api.mihoyo.com/post/api/getForumPostList?' \
'forum_id={}&is_good=false&is_hot=false&page_size=20&sort_type=1'
bbs_Detailurl = 'https://bbs-api.mihoyo.com/post/api/getPostFull?post_id={}'
bbs_Shareurl = 'https://bbs-api.mihoyo.com/apihub/api/getShareConf?entity_id={}&entity_type=1'
bbs_Likeurl = 'https://bbs-api.mihoyo.com/apihub/sapi/upvotePost' # post json
mihoyobbs_List = [{
'id': '1',
'forumId': '1',
'name': '崩坏3',
'url': 'https://bbs.mihoyo.com/bh3/'
}, {
'id': '2',
'forumId': '26',
'name': '原神',
'url': 'https://bbs.mihoyo.com/ys/'
}, {
'id': '3',
'forumId': '30',
'name': '崩坏2',
'url': 'https://bbs.mihoyo.com/bh2/'
}, {
'id': '4',
'forumId': '37',
'name': '未定事件簿',
'url': 'https://bbs.mihoyo.com/wd/'
}, {
'id': '5',
'forumId': '34',
'name': '大别野',
'url': 'https://bbs.mihoyo.com/dby/'
}, {
'id': '6',
'forumId': '52',
'name': '崩坏:星穹铁道',
'url': 'https://bbs.mihoyo.com/sr/'
}]
def random_text(num: int) -> str:
return ''.join(random.sample(string.ascii_lowercase + string.digits, num))
class MihoyoBBSCoin:
def __init__(self, cookies):
self.postsList = None
self.headers = {
'DS': old_version_get_ds_token(True),
'cookie': cookies,
'x-rpc-client_type': '2',
'x-rpc-app_version': '2.7.0',
'x-rpc-sys_version': '6.0.1',
'x-rpc-channel': 'mihoyo',
'x-rpc-device_id': random_hex(32),
'x-rpc-device_name': random_text(random.randint(1, 10)),
'x-rpc-device_model': 'Mi 10',
'Referer': 'https://app.mihoyo.com',
'Host': 'bbs-api.mihoyo.com',
'User-Agent': 'okhttp/4.8.0'
}
self.Task_do = {
'bbs_Sign': False,
'bbs_Read_posts': False,
'bbs_Read_posts_num': 3,
'bbs_Like_posts': False,
'bbs_Like_posts_num': 5,
'bbs_Share': False
}
self.mihoyobbs_List_Use = []
self.Today_getcoins = 0
self.Today_have_getcoins = 0 # 这个变量以后可能会用上,先留着了
self.Have_coins = 0
self.is_Right = True # 判断stoken是否失效
# 开始运行
async def task_run(self):
await self.load_mihoyo_bbs_list_use()
start = await self.get_tasks_list()
if self.is_Right is False:
return start
self.postsList = await self.get_list()
sign = await self.signing()
read = await self.read_posts()
like = await self.like_posts()
share = await self.share_post()
im = start + '\n' + sign + '\n' + read + '\n' + like + '\n' + share
return im
async def load_mihoyo_bbs_list_use(self):
for i in [2, 5]:
for k in mihoyobbs_List:
if i == int(k['id']):
self.mihoyobbs_List_Use.append(k)
# 获取任务列表,用来判断做了哪些任务
async def get_tasks_list(self):
# logger.info('正在获取任务列表')
async with AsyncClient() as client:
req = await client.get(url=bbs_Taskslist, headers=self.headers)
data = req.json()
if 'err' in data['message'] or data['retcode'] == -100:
self.is_Right = False
# logger.info('stoken或cookie失效')
return '你的Cookies或Stoken已失效。'
# log.error('获取任务列表失败你的cookie可能已过期请重新设置cookie。')
else:
self.Today_getcoins = data['data']['can_get_points']
self.Today_have_getcoins = data['data']['already_received_points']
self.Have_coins = data['data']['total_points']
# 如果当日可获取米游币数量为0直接判断全部任务都完成了
if self.Today_getcoins == 0:
self.Task_do['bbs_Sign'] = True
self.Task_do['bbs_Read_posts'] = True
self.Task_do['bbs_Like_posts'] = True
self.Task_do['bbs_Share'] = True
else:
# 如果第0个大于或等于62则直接判定任务没做
if data['data']['states'][0]['mission_id'] >= 62:
# logger.info(f'新的一天,今天可以获得{self.Today_getcoins}个米游币')
pass
else:
# logger.info(f'似乎还有任务没完成,今天还能获得{self.Today_getcoins}')
for i in data['data']['states']:
# 58是讨论区签到
if i['mission_id'] == 58:
if i['is_get_award']:
self.Task_do['bbs_Sign'] = True
# 59是看帖子
elif i['mission_id'] == 59:
if i['is_get_award']:
self.Task_do['bbs_Read_posts'] = True
else:
self.Task_do['bbs_Read_posts_num'] -= i['happened_times']
# 60是给帖子点赞
elif i['mission_id'] == 60:
if i['is_get_award']:
self.Task_do['bbs_Like_posts'] = True
else:
self.Task_do['bbs_Like_posts_num'] -= i['happened_times']
# 61是分享帖子
elif i['mission_id'] == 61:
if i['is_get_award']:
self.Task_do['bbs_Share'] = True
# 分享帖子,是最后一个任务,到这里了下面都是一次性任务,直接跳出循环
break
return '今日完成情况!'
# 获取要帖子列表
async def get_list(self) -> list:
temp_list = []
# logger.info('正在获取帖子列表......')
async with AsyncClient() as client:
req = await client.get(url=bbs_Listurl.format(self.mihoyobbs_List_Use[0]['forumId']), headers=self.headers)
data = req.json()
for n in range(5):
temp_list.append([data['data']['list'][n]['post']['post_id'], data['data']['list'][n]['post']['subject']])
# logger.info('已获取{}个帖子'.format(len(temp_list)))
return temp_list
# 进行签到操作
async def signing(self):
if self.Task_do['bbs_Sign']:
# logger.info("讨论区任务已经完成过了~")
return '讨论区任务已经完成过了~'
else:
for i in self.mihoyobbs_List_Use:
async with AsyncClient() as client:
req = await client.post(url=bbs_Signurl.format(i['id']), data={}, headers=self.headers)
data = req.json()
if 'err' not in data['message']:
time.sleep(random.randint(2, 8))
else:
# logger.info("cookie或stoken已失效")
return '你的Cookies已失效。'
return 'SignM:完成!'
# 看帖子
async def read_posts(self):
if self.Task_do['bbs_Read_posts']:
# logger.info("看帖任务已经完成过了~'")
return '看帖任务已经完成过了~'
else:
num_ok = 0
for i in range(self.Task_do['bbs_Read_posts_num']):
async with AsyncClient() as client:
req = await client.get(url=bbs_Detailurl.format(self.postsList[i][0]), headers=self.headers)
data = req.json()
if data['message'] == 'OK':
num_ok += 1
time.sleep(random.randint(2, 8))
# logger.info('ReadM:成功!Read:{}!'.format(str(num_ok)))
return 'ReadM:成功!Read:{}!'.format(str(num_ok))
# 点赞
async def like_posts(self):
if self.Task_do['bbs_Like_posts']:
# logger.info('Like任务已经完成过了~')
return 'Like任务已经完成过了~'
else:
num_ok = 0
num_cancel = 0
for i in range(self.Task_do['bbs_Like_posts_num']):
async with AsyncClient() as client:
req = await client.post(url=bbs_Likeurl, headers=self.headers,
json={'post_id': self.postsList[i][0], 'is_cancel': False})
data = req.json()
if data['message'] == 'OK':
num_ok += 1
# 判断取消点赞是否打开
if True:
time.sleep(random.randint(2, 8))
async with AsyncClient() as client:
req = await client.post(url=bbs_Likeurl, headers=self.headers,
json={'post_id': self.postsList[i][0], 'is_cancel': True})
data = req.json()
if data['message'] == 'OK':
num_cancel += 1
time.sleep(random.randint(2, 8))
# logger.info('LikeM:完成!like:{}dislike:{}!'.format(str(num_ok), str(num_cancel)))
return 'LikeM:完成!like:{}dislike:{}!'.format(str(num_ok), str(num_cancel))
# 分享操作
async def share_post(self):
if self.Task_do['bbs_Share']:
# logger.info('分享任务已经完成过了~')
return '分享任务已经完成过了~'
else:
for _ in range(3):
async with AsyncClient() as client:
req = await client.get(url=bbs_Shareurl.format(self.postsList[0][0]), headers=self.headers)
data = req.json()
if data['message'] == 'OK':
return 'ShareM:完成!'
else:
time.sleep(random.randint(2, 8))
time.sleep(random.randint(2, 8))
# add function
import hashlib
def md5(text):
md5_func = hashlib.md5()
md5_func.update(text.encode())
return md5_func.hexdigest()
def old_version_get_ds_token(mysbbs=False):
if mysbbs:
n = 'fd3ykrh7o1j54g581upo1tvpam0dsgtf'
else:
n = 'h8w582wxwgqvahcdkpvdhbh2w9casgfl'
i = str(int(time.time()))
r = ''.join(random.sample(string.ascii_lowercase + string.digits, 6))
c = md5('salt=' + n + '&t=' + i + '&r=' + r)
return i + ',' + r + ',' + c
def random_hex(length):
result = hex(random.randint(0, 16 ** length)).replace('0x', '').upper()
if len(result) < length:
result = '0' * (length - len(result)) + result
return result