Skip to content

Commit

Permalink
refactor code
Browse files Browse the repository at this point in the history
  • Loading branch information
qiaoborui committed Mar 12, 2024
1 parent 8183ce9 commit 196b853
Showing 1 changed file with 79 additions and 64 deletions.
143 changes: 79 additions & 64 deletions wzxy.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
from datetime import datetime
import requests
import json
import re
import traceback
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from base64 import b64encode
import logging
import toml
from croniter import croniter
import time
import os
from dateutil import tz

# Create a logger
logger = logging.getLogger()
Expand All @@ -28,44 +32,44 @@


class User:
def __init__(self, username, password, schoolId):
def __init__(self, username, password, school_id):
self.username = username
self.password = password
self.schoolId = schoolId
self.signData = []
self.school_id = school_id
self.sign_data = []
self.cookie = None

# Try to read JWSESSION from local cache and test login status
self.testCachedSession()
self.test_cached_session()

def encrypt(self, text):
key = (str(self.username) + "0000000000000000")[:16]
cipher = AES.new(key.encode('utf-8'), AES.MODE_ECB)
paddedText = pad(text.encode('utf-8'), AES.block_size)
encryptedText = cipher.encrypt(paddedText)
return b64encode(encryptedText).decode('utf-8')
padded_text = pad(text.encode('utf-8'), AES.block_size)
encrypted_text = cipher.encrypt(padded_text)
return b64encode(encrypted_text).decode('utf-8')

def login(self):
encryptedText = self.encrypt(self.password)
loginUrl = 'https://gw.wozaixiaoyuan.com/basicinfo/mobile/login/username'
encrypted_text = self.encrypt(self.password)
login_url = 'https://gw.wozaixiaoyuan.com/basicinfo/mobile/login/username'
params = {
"schoolId": self.schoolId,
"schoolId": self.school_id,
"username": self.username,
"password": encryptedText
"password": encrypted_text
}
loginReq = requests.post(loginUrl, params=params)
text = json.loads(loginReq.text)
login_req = requests.post(login_url, params=params)
text = json.loads(login_req.text)
if text['code'] == 0:
setCookie = loginReq.headers['Set-Cookie']
jws = re.search(r'JWSESSION=.*?;', str(setCookie)).group(0)
set_cookie = login_req.headers['Set-Cookie']
jws = re.search(r'JWSESSION=.*?;', str(set_cookie)).group(0)
self.cookie = jws
self.writeJwsToCache({self.username: jws}) # Write obtained JWSESSION to local cache
self.write_jws_to_cache({self.username: jws}) # Write obtained JWSESSION to local cache
return True
else:
logging.error(f"{self.username} login error, please check account password!")
return False

def testLoginStatus(self):
def test_login_status(self):
if self.cookie:
headers = {'Host': "gw.wozaixiaoyuan.com", 'Cookie': self.cookie}
url = "https://gw.wozaixiaoyuan.com/health/mobile/health/getBatch"
Expand All @@ -81,33 +85,33 @@ def testLoginStatus(self):
logging.error("Please log in first!")
return False

def writeJwsToCache(self, jwsDict):
def write_jws_to_cache(self, jws_dict):
try:
with open("users_jws.json", 'r') as file:
data = json.load(file)
except FileNotFoundError:
data = {}

data.update(jwsDict)
data.update(jws_dict)

with open("users_jws.json", 'w') as file:
json.dump(data, file)

def readJwsFromCache(self):
def read_jws_from_cache(self):
try:
with open("users_jws.json", 'r') as file:
jwsDict = json.load(file)
return jwsDict
jws_dict = json.load(file)
return jws_dict
except FileNotFoundError:
return {}

def testCachedSession(self):
cachedJws = self.readJwsFromCache()
if cachedJws:
self.cookie = cachedJws.get(self.username)
def test_cached_session(self):
cached_jws = self.read_jws_from_cache()
if cached_jws:
self.cookie = cached_jws.get(self.username)
if self.cookie:
# Test login status
if self.testLoginStatus():
if self.test_login_status():
return
else:
logging.error("JWSESSION in local cache expired, re-login...")
Expand All @@ -130,7 +134,7 @@ def testCachedSession(self):
else:
logging.error("Login failed, please check account password!")

def getSignList(self):
def get_sign_list(self):
if not self.cookie:
logging.error("Please log in first!")
return
Expand All @@ -143,31 +147,31 @@ def getSignList(self):
if 'code' in data and data['code'] == 0 and 'data' in data:
for item in data['data']:
if item.get('type') == 0 and item.get('signStatus') == 1:
signInfo = {'signId': item.get('signId'), 'id': item.get('id'),'userArea':item.get('userArea'),'areaList':item.get('areaList')}
self.signData.append(signInfo)
sign_info = {'signId': item.get('signId'), 'id': item.get('id'),'userArea':item.get('userArea'),'areaList':item.get('areaList')}
self.sign_data.append(sign_info)
logging.info("Sign-in list retrieved successfully!")
else:
logging.error("Failed to retrieve sign-in list!")

def generateCheckInRequest(self, areaList, userArea):
checkInData = {}
areaJsonData = self.findAreaJson(areaList, userArea)
if areaJsonData:
areaJson, latitude, longitude = areaJsonData
checkInData = {
def generate_check_in_request(self, area_list, user_area):
check_in_data = {}
area_json_data = self.find_area_json(area_list, user_area)
if area_json_data:
area_json, latitude, longitude = area_json_data
check_in_data = {
"latitude": float(latitude),
"longitude": float(longitude),
"nationcode": "156",
"country": "China",
"areaJSON": areaJson,
"areaJSON": area_json,
"inArea": 1
}
return json.dumps(checkInData)
return json.dumps(check_in_data)

def findAreaJson(self, areaList, userArea):
for area in areaList:
if area.get('name') == userArea:
areaJson = {
def find_area_json(self, area_list, user_area):
for area in area_list:
if area.get('name') == user_area:
area_json = {
"type": 0,
"circle": {
"latitude": area.get('latitude'),
Expand All @@ -177,54 +181,63 @@ def findAreaJson(self, areaList, userArea):
"id": area.get('id'),
"name": area.get('name')
}
return (json.dumps(areaJson), area.get('latitude'), area.get('longitude'))
return (json.dumps(area_json), area.get('latitude'), area.get('longitude'))
return None

def nightSign(self):
self.getSignList()
if (signCount := len(self.signData)) == 0:
def night_sign(self):
self.get_sign_list()
if (sign_count := len(self.sign_data)) == 0:
logging.info("No sign-in task found!")
return
for sign in self.signData:
checkInData = self.generateCheckInRequest(sign.get('areaList'), sign.get('userArea'))
for sign in self.sign_data:
check_in_data = self.generate_check_in_request(sign.get('areaList'), sign.get('userArea'))
logging.debug(f"Check-in data: {check_in_data}")
headers = {'Host': "gw.wozaixiaoyuan.com", 'Cookie': self.cookie}
id_ = sign.get('id')
signId = sign.get('signId')
url = f"https://gw.wozaixiaoyuan.com/sign/mobile/receive/doSignByArea?id={id_}&schoolId={self.schoolId}&signId={signId}"
res = requests.post(url, headers=headers, data=checkInData)
sign_id = sign.get('signId')
url = f"https://gw.wozaixiaoyuan.com/sign/mobile/receive/doSignByArea?id={id_}&schoolId={self.school_id}&signId={sign_id}"
res = requests.post(url, headers=headers, data=check_in_data)
text = json.loads(res.text)
if text['code'] == 0:
logging.info(f"{sign.get('userArea')} sign-in successful!")
else:
logging.error(f"{sign.get('userArea')} sign-in failed!")


def runUsers():
def run_users():
with open('users.toml', 'r') as file:
data = toml.load(file)['user']

# show the count of users
logging.info(f"Running {len(data)} user(s)...")

for i, userData in enumerate(data):

for i, user_data in enumerate(data):
if i != 0:
print("-" * 50) # Separator between different users
logging.info(f"Running user {userData.get('name')}...")
logging.info(f"Running user {user_data.get('name')}...")
try:
u = User(userData.get('username'), userData.get('password'), userData.get('schoolId'))
u.nightSign()
u = User(user_data.get('username'), user_data.get('password'), user_data.get('school_id'))
u.night_sign()
except Exception as e:
logging.error(f"An error occurred while running user {userData.get('name')}: {str(e)}")
logging.error(f"An error occurred while running user {user_data.get('name')}: {str(e)}")
logging.error(traceback.format_exc())
if i == len(data) - 1:
print("-" * 50)


if __name__ == "__main__":
# judge whether is the development environment
if os.environ.get('ENV') == 'DEV':
# Set the logging level to DEBUG
logger.setLevel(logging.DEBUG)
# Run the script
run_users()
exit(0)

# Define your cron expression here
cronExpression = "*/5 22 * * *" # Runs at 1:00 AM every day
cronExpression = "*/5 22 * * *" # Run the script every day at 22:00

now = datetime.now().replace(tzinfo=tz.gettz())

# Create a cron iterator
cron = croniter(cronExpression)
cron = croniter(cronExpression,now)

# Log the welcome message
logging.info("Welcome to Wozaixiaoyuan Night Sign-in Script!")
Expand All @@ -234,6 +247,8 @@ def runUsers():
# Get the next scheduled time
nextRunTime = cron.get_next(float)

logging.info(f"Next scheduled run time: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(nextRunTime))}")

# Calculate the delay until the next scheduled time
delay = nextRunTime - time.time()

Expand All @@ -242,4 +257,4 @@ def runUsers():
time.sleep(delay)

# Execute the task
runUsers()
run_users()

0 comments on commit 196b853

Please sign in to comment.