-
Notifications
You must be signed in to change notification settings - Fork 0
/
csust_api.py
125 lines (100 loc) · 3.84 KB
/
csust_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import re
import requests
import json
from nonebot import logger
# --- Configuration ---------------------------------------------------------
# Endpoint that every query in this module is POSTed to.
QUERY_URL = "http://yktwd.csust.edu.cn:8988/web/Common/Tsm.html"

# Requests are sent as URL-encoded form bodies.
HEADERS = {
    "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
}

# Maps a campus name to the `aid` value placed in the query payloads.
CAMPUS_IDS = {
    "云塘": "0030000000002501",
    "金盆岭": "0030000000002502",
}
def get_buildings_for_campus(campus_name, aid, *, timeout=10):
    """Query the building list of one campus.

    Args:
        campus_name: Campus name without the "校区" suffix; the suffix is
            appended when the request payload is built.
        aid: Value sent as the ``aid`` field of the query.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        A dict mapping building name to building id, ordered by the integer
        value of the id. An empty dict is returned when the request fails or
        the response cannot be interpreted.
    """
    payload = {
        # The endpoint expects the nested query as a JSON string inside a
        # form field, not as a JSON request body.
        "jsondata": json.dumps(
            {
                "query_elec_building": {
                    "aid": aid,
                    "account": "000001",
                    "area": {
                        "area": campus_name + "校区",
                        "areaname": campus_name + "校区",
                    },
                }
            }
        ),
        "funname": "synjones.onecard.query.elec.building",
        "json": "true",
    }

    try:
        # Without a timeout a stalled server would block the caller forever.
        response = requests.post(
            QUERY_URL, headers=HEADERS, data=payload, timeout=timeout
        )
        response.raise_for_status()
        result = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers JSON decoding errors that are not raised as a
        # RequestException subclass.
        logger.error(f"{campus_name}校区的楼栋查询失败: {e}")
        return {}

    try:
        # `or []` also covers a present-but-null "buildingtab" field.
        building_info = (
            result.get("query_elec_building", {}).get("buildingtab") or []
        )
        buildings = {item["building"]: item["buildingid"] for item in building_info}
        ordered = dict(sorted(buildings.items(), key=lambda item: int(item[1])))
    except (AttributeError, KeyError, TypeError, ValueError) as e:
        # Unexpected response shape, missing keys, or a non-numeric id.
        logger.error(f"{campus_name}校区的楼栋数据解析失败: {e}")
        return {}

    # Log success only after the data has been fully parsed and sorted.
    logger.info(
        f"成功获取到{campus_name}校区的楼栋信息,共 {len(ordered)} 个楼栋。"
    )
    return ordered
def fetch_building_data():
    """Collect the building mapping of every campus listed in CAMPUS_IDS.

    Returns:
        A dict keyed by campus name; each value is whatever
        get_buildings_for_campus returned for that campus.
    """
    # Campuses are queried back to back; no delay is inserted between requests.
    return {
        name: get_buildings_for_campus(name, campus_aid)
        for name, campus_aid in CAMPUS_IDS.items()
    }
def fetch_electricity_data(campus, building_id, room_id, *, timeout=10):
    """Fetch the remaining electricity reading for one dormitory room.

    Args:
        campus: Campus name; must be a key of CAMPUS_IDS.
        building_id: Building id sent as ``buildingid`` in the query.
        room_id: Room identifier, sent as both ``roomid`` and ``room``.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The first number found in the response's ``errmsg`` text as a float;
        the string "未知" when no number can be extracted; or a dict of the
        form ``{"error": ...}`` when the request or JSON decoding fails.

    Raises:
        ValueError: If ``campus`` is not a known campus.
    """
    aid = CAMPUS_IDS.get(campus)
    if not aid:
        raise ValueError(f"未知校区: {campus}")

    encoded_data = {
        # The endpoint expects the nested query as a JSON string inside a
        # form field, not as a JSON request body.
        "jsondata": json.dumps(
            {
                "query_elec_roominfo": {
                    "aid": aid,
                    "account": "000001",
                    "room": {"roomid": room_id, "room": room_id},
                    "floor": {"floorid": "", "floor": ""},
                    "area": {"area": f"{campus}校区", "areaname": f"{campus}校区"},
                    "building": {"buildingid": building_id, "building": ""},
                }
            }
        ),
        "funname": "synjones.onecard.query.elec.roominfo",
        "json": "true",
    }

    try:
        # Without a timeout a stalled server would block the caller forever.
        response = requests.post(
            QUERY_URL, headers=HEADERS, data=encoded_data, timeout=timeout
        )
        response.raise_for_status()
        result = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError is the base of json.JSONDecodeError, so this also covers
        # decoder errors that are not raised as a RequestException subclass.
        logger.error(f"电量查询失败: {e}")
        return {"error": f"查询失败: {e}"}

    # Guard each level so an unexpected response shape yields "未知" rather
    # than an uncaught AttributeError/TypeError.
    info = result.get("query_elec_roominfo") if isinstance(result, dict) else None
    electricity = info.get("errmsg", "未知电量") if isinstance(info, dict) else None
    if not isinstance(electricity, str):
        return "未知"

    # The reading is embedded in the message text; take the first number.
    match = re.search(r"\d+(?:\.\d+)?", electricity)
    return float(match.group()) if match else "未知"
# Script entry point: runs only when the file is executed directly.
if __name__ == "__main__":
    # Example 1: look up the electricity reading of a single room.
    campus, building_id, room_id = "云塘", "557", "A543"
    logger.info(
        fetch_electricity_data(
            campus=campus,
            building_id=building_id,
            room_id=room_id,
        )
    )

    # Example 2: query the building lists of all campuses.
    building_data = fetch_building_data()