Skip to content

Commit

Permalink
Refactor code to improve performance and readability
Browse files Browse the repository at this point in the history
  • Loading branch information
qiaoborui committed Mar 11, 2024
1 parent 58831a4 commit 66bb359
Showing 1 changed file with 71 additions and 68 deletions.
139 changes: 71 additions & 68 deletions wzxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,56 +14,58 @@
logger.setLevel(logging.INFO)

# Create handlers
file_handler = logging.FileHandler('wozaixiaoyuan.log')
console_handler = logging.StreamHandler()
fileHandler = logging.FileHandler('wozaixiaoyuan.log')
consoleHandler = logging.StreamHandler()

# Create formatters and add them to the handlers
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
fileHandler.setFormatter(formatter)
consoleHandler.setFormatter(formatter)

# Add handlers to the logger
logger.addHandler(file_handler)
logger.addHandler(console_handler)
logger.addHandler(fileHandler)
logger.addHandler(consoleHandler)


class User:
def __init__(self, username, password, school_id):
def __init__(self, username, password, schoolId):
self.username = username
self.password = password
self.school_id = school_id
self.sign_data = []
self.schoolId = schoolId
self.signData = []
self.cookie = None

# Try to read JWSESSION from local cache and test login status
self.test_cached_session()
self.testCachedSession()

def encrypt(self, text):
key = (str(self.username) + "0000000000000000")[:16]
cipher = AES.new(key.encode('utf-8'), AES.MODE_ECB)
padded_text = pad(text.encode('utf-8'), AES.block_size)
encrypted_text = cipher.encrypt(padded_text)
return b64encode(encrypted_text).decode('utf-8')
paddedText = pad(text.encode('utf-8'), AES.block_size)
encryptedText = cipher.encrypt(paddedText)
return b64encode(encryptedText).decode('utf-8')

def login(self):
encrypted_text = self.encrypt(self.password)
login_url = 'https://gw.wozaixiaoyuan.com/basicinfo/mobile/login/username'
encryptedText = self.encrypt(self.password)
loginUrl = 'https://gw.wozaixiaoyuan.com/basicinfo/mobile/login/username'
params = {
"schoolId": self.school_id,
"schoolId": self.schoolId,
"username": self.username,
"password": encrypted_text
"password": encryptedText
}
login_req = requests.post(login_url, params=params)
text = json.loads(login_req.text)
loginReq = requests.post(loginUrl, params=params)
text = json.loads(loginReq.text)
if text['code'] == 0:
set_cookie = login_req.headers['Set-Cookie']
jws = re.search(r'JWSESSION=.*?;', str(set_cookie)).group(0)
setCookie = loginReq.headers['Set-Cookie']
jws = re.search(r'JWSESSION=.*?;', str(setCookie)).group(0)
self.cookie = jws
self.write_jws_to_cache({self.username: jws}) # Write obtained JWSESSION to local cache
self.writeJwsToCache({self.username: jws}) # Write obtained JWSESSION to local cache
return True
else:
logging.error(f"{self.username} login error, please check account password!")
return False

def test_login_status(self):
def testLoginStatus(self):
if self.cookie:
headers = {'Host': "gw.wozaixiaoyuan.com", 'Cookie': self.cookie}
url = "https://gw.wozaixiaoyuan.com/health/mobile/health/getBatch"
Expand All @@ -79,33 +81,33 @@ def test_login_status(self):
logging.error("Please log in first!")
return False

def write_jws_to_cache(self, jws_dict):
def writeJwsToCache(self, jwsDict):
try:
with open("users_jws.json", 'r') as file:
data = json.load(file)
except FileNotFoundError:
data = {}

data.update(jws_dict)
data.update(jwsDict)

with open("users_jws.json", 'w') as file:
json.dump(data, file)

def read_jws_from_cache(self):
def readJwsFromCache(self):
try:
with open("users_jws.json", 'r') as file:
jws_dict = json.load(file)
return jws_dict
jwsDict = json.load(file)
return jwsDict
except FileNotFoundError:
return {}

def test_cached_session(self):
cached_jws = self.read_jws_from_cache()
if cached_jws:
self.cookie = cached_jws.get(self.username)
def testCachedSession(self):
cachedJws = self.readJwsFromCache()
if cachedJws:
self.cookie = cachedJws.get(self.username)
if self.cookie:
# Test login status
if self.test_login_status():
if self.testLoginStatus():
return
else:
logging.error("JWSESSION in local cache expired, re-login...")
Expand All @@ -128,7 +130,7 @@ def test_cached_session(self):
else:
logging.error("Login failed, please check account password!")

def get_sign_list(self):
def getSignList(self):
if not self.cookie:
logging.error("Please log in first!")
return
Expand All @@ -141,31 +143,31 @@ def get_sign_list(self):
if 'code' in data and data['code'] == 0 and 'data' in data:
for item in data['data']:
if item.get('type') == 0 and item.get('signStatus') == 1:
sign_info = {'signId': item.get('signId'), 'id': item.get('id'),'userArea':item.get('userArea'),'areaList':item.get('areaList')}
self.sign_data.append(sign_info)
signInfo = {'signId': item.get('signId'), 'id': item.get('id'),'userArea':item.get('userArea'),'areaList':item.get('areaList')}
self.signData.append(signInfo)
logging.info("Sign-in list retrieved successfully!")
else:
logging.error("Failed to retrieve sign-in list!")

def generate_check_in_request(self, area_list, user_area):
check_in_data = {}
area_json_data = self.find_area_json(area_list, user_area)
if area_json_data:
area_json, latitude, longitude = area_json_data
check_in_data = {
def generateCheckInRequest(self, areaList, userArea):
checkInData = {}
areaJsonData = self.findAreaJson(areaList, userArea)
if areaJsonData:
areaJson, latitude, longitude = areaJsonData
checkInData = {
"latitude": float(latitude),
"longitude": float(longitude),
"nationcode": "156",
"country": "China",
"areaJSON": area_json,
"areaJSON": areaJson,
"inArea": 1
}
return json.dumps(check_in_data)
return json.dumps(checkInData)

def find_area_json(self, area_list, user_area):
for area in area_list:
if area.get('name') == user_area:
area_json = {
def findAreaJson(self, areaList, userArea):
for area in areaList:
if area.get('name') == userArea:
areaJson = {
"type": 0,
"circle": {
"latitude": area.get('latitude'),
Expand All @@ -175,62 +177,63 @@ def find_area_json(self, area_list, user_area):
"id": area.get('id'),
"name": area.get('name')
}
return (json.dumps(area_json), area.get('latitude'), area.get('longitude'))
return (json.dumps(areaJson), area.get('latitude'), area.get('longitude'))
return None

def night_sign(self):
self.get_sign_list()
if (sign_count := len(self.sign_data)) == 0:
def nightSign(self):
self.getSignList()
if (signCount := len(self.signData)) == 0:
logging.info("No sign-in task found!")
return
for sign in self.sign_data:
check_in_data = self.generate_check_in_request(sign.get('areaList'), sign.get('userArea'))
for sign in self.signData:
checkInData = self.generateCheckInRequest(sign.get('areaList'), sign.get('userArea'))
headers = {'Host': "gw.wozaixiaoyuan.com", 'Cookie': self.cookie}
id_ = sign.get('id')
sign_id = sign.get('signId')
url = f"https://gw.wozaixiaoyuan.com/sign/mobile/receive/doSignByArea?id={id_}&schoolId={self.school_id}&signId={sign_id}"
res = requests.post(url, headers=headers, data=check_in_data)
signId = sign.get('signId')
url = f"https://gw.wozaixiaoyuan.com/sign/mobile/receive/doSignByArea?id={id_}&schoolId={self.schoolId}&signId={signId}"
res = requests.post(url, headers=headers, data=checkInData)
text = json.loads(res.text)
if text['code'] == 0:
logging.info(f"{sign.get('userArea')} sign-in successful!")
else:
logging.error(f"{sign.get('userArea')} sign-in failed!")


def run_users():
def runUsers():
with open('users.toml', 'r') as file:
data = toml.load(file)['user']

for i, user_data in enumerate(data):
for i, userData in enumerate(data):
if i != 0:
print("-" * 50) # Separator between different users
logging.info(f"Running user {user_data.get('name')}...")
logging.info(f"Running user {userData.get('name')}...")
try:
u = User(user_data.get('username'), user_data.get('password'), user_data.get('school_id'))
u.night_sign()
u = User(userData.get('username'), userData.get('password'), userData.get('schoolId'))
u.nightSign()
except Exception as e:
logging.error(f"An error occurred while running user {user_data.get('name')}: {str(e)}")
logging.error(f"An error occurred while running user {userData.get('name')}: {str(e)}")
if i == len(data) - 1:
print("-" * 50)


if __name__ == "__main__":
# Define your cron expression here
cron_expression = "*/5 22 * * *" # Runs at 1:00 AM every day
cronExpression = "*/5 22 * * *" # Runs at 1:00 AM every day

# Create a cron iterator
cron = croniter(cron_expression)
cron = croniter(cronExpression)

# Infinite loop to check if it's time to execute the task
while True:
# Get the next scheduled time
next_run_time = cron.get_next(float)
nextRunTime = cron.get_next(float)

# Calculate the delay until the next scheduled time
delay = next_run_time - time.time()
delay = nextRunTime - time.time()

# Sleep until the next scheduled time
if delay > 0:
time.sleep(delay)

# Execute the task
run_users()
runUsers()

0 comments on commit 66bb359

Please sign in to comment.