-
Notifications
You must be signed in to change notification settings - Fork 403
/
Copy pathget_data.py
101 lines (89 loc) · 3.31 KB
/
get_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# -*- coding:utf-8 -*-
"""
Author: BigCat
"""
import argparse
import requests
import pandas as pd
from bs4 import BeautifulSoup
from loguru import logger
from config import os, name_path, data_file_name
# Command-line interface. The single option --name picks which game's data to
# crawl (the help text reads "choose data to crawl: 双色球/大乐透") and defaults
# to "ssq". Arguments are parsed as soon as this module is loaded.
parser = argparse.ArgumentParser()
parser.add_argument(
    "--name",
    type=str,
    default="ssq",
    help="选择爬取数据: 双色球/大乐透",
)
args = parser.parse_args()
def get_url(name):
    """Build the two URL pieces used to query a game's history on 500.com.

    :param name: game identifier substituted into the site path
                 (``spider`` handles "ssq" and "dlt")
    :return: tuple ``(base_url, query_template)``. ``query_template`` has one
             ``{}`` placeholder for the first issue and ends in ``end=`` so
             the last issue can be appended directly.
    """
    history_root = "https://datachart.500.com/{}/history/".format(name)
    return history_root, "newinc/history.php?start={}&end="
def get_current_number(name, timeout=30):
    """Fetch the most recent issue number listed for a game.

    Downloads ``history.shtml`` under the game's history URL and reads the
    ``value`` attribute of the ``<input id="end">`` element found inside the
    ``<div class="wrap_datachart">`` container.

    :param name: game identifier passed to ``get_url``
    :param timeout: seconds to wait on the server before giving up; without
                    a timeout ``requests`` may block indefinitely
    :return: the issue number exactly as it appears in the page (a ``str``,
             not converted to ``int``)
    :raises ValueError: if the expected element or attribute is not in the page
    :raises requests.RequestException: on connection failure or timeout
    """
    url, _ = get_url(name)
    # NOTE(review): verify=False turns off TLS certificate verification. It is
    # kept because the original code relied on it; confirm it is still needed.
    r = requests.get("{}{}".format(url, "history.shtml"), verify=False, timeout=timeout)
    r.encoding = "gb2312"
    soup = BeautifulSoup(r.text, "lxml")

    # Look the elements up one step at a time so that a changed page layout
    # produces a clear error instead of an AttributeError on None.
    wrapper = soup.find("div", class_="wrap_datachart")
    end_input = wrapper.find("input", id="end") if wrapper is not None else None
    if end_input is None or not end_input.has_attr("value"):
        raise ValueError(
            "could not find the latest issue number (input#end) in {}history.shtml".format(url)
        )
    return end_input["value"]
def _row_to_item(tds, red_count, blue_keys):
    """Convert the ``<td>`` cells of one result row into a flat record.

    Cell 0 holds the issue number, the next ``red_count`` cells hold the red
    balls, and the cells after those are stored under ``blue_keys`` in order.
    """
    item = {u"期数": tds[0].get_text().strip()}
    for i in range(red_count):
        item[u"红球_{}".format(i + 1)] = tds[i + 1].get_text().strip()
    for offset, key in enumerate(blue_keys):
        item[key] = tds[red_count + 1 + offset].get_text().strip()
    return item


def spider(name, start, end, mode, timeout=30):
    """Scrape historical draw results for a game.

    :param name: game identifier; "ssq" and "dlt" are supported
    :param start: first issue to request
    :param end: last (most recent) issue to request
    :param mode: "train" writes the result to the game's CSV file and returns
                 it; "predict" only returns it
    :param timeout: seconds to wait on the server before giving up
    :return: ``pandas.DataFrame`` with one row per draw for "train" and
             "predict"; ``None`` for any other mode
    :raises ValueError: if the results table is missing from the response
    :raises requests.RequestException: on connection failure or timeout
    """
    # Column layout per game: how many red-ball cells follow the issue cell,
    # and the record keys for the blue-ball cells that come after them.
    layouts = {
        "ssq": (6, [u"蓝球"]),
        "dlt": (5, [u"蓝球_1", u"蓝球_2"]),
    }

    url, path = get_url(name)
    url = "{}{}{}".format(url, path.format(start), end)
    # NOTE(review): verify=False turns off TLS certificate verification. It is
    # kept because the original code relied on it; confirm it is still needed.
    r = requests.get(url=url, verify=False, timeout=timeout)
    r.encoding = "gb2312"
    soup = BeautifulSoup(r.text, "lxml")

    tbody = soup.find("tbody", attrs={"id": "tdata"})
    if tbody is None:
        raise ValueError("history table (tbody#tdata) not found at {}".format(url))
    trs = tbody.find_all("tr")

    data = []
    layout = layouts.get(name)
    if layout is None:
        # Unsupported game: report it once instead of once per table row.
        if trs:
            logger.warning("抱歉,没有找到数据源!")
    else:
        red_count, blue_keys = layout
        for tr in trs:
            # Collect the cells once per row rather than once per column.
            data.append(_row_to_item(tr.find_all("td"), red_count, blue_keys))

    if mode == "train":
        df = pd.DataFrame(data)
        df.to_csv("{}{}".format(name_path[name]["path"], data_file_name), encoding="utf-8")
        return df
    if mode == "predict":
        return pd.DataFrame(data)
    # Any other mode yields nothing, as before.
    return None
def run(name):
    """Download the full draw history for a game and save it for training.

    Looks up the latest issue number, makes sure the game's data directory
    exists, scrapes every issue from 1 up to the latest in "train" mode, and
    then logs whether the CSV file is present on disk.

    :param name: game identifier; must be a key of ``name_path``
    :return: None
    """
    current_number = get_current_number(name)
    game = name_path[name]
    logger.info("【{}】最新一期期号:{}".format(game["name"], current_number))
    logger.info("正在获取【{}】数据。。。".format(game["name"]))

    # exist_ok avoids the race between checking for the directory and creating it.
    os.makedirs(game["path"], exist_ok=True)
    data = spider(name, 1, current_number, "train")

    # Check the file spider() writes in train mode (same path expression),
    # rather than a hard-coded "data" entry in the current working directory.
    csv_path = "{}{}".format(game["path"], data_file_name)
    if os.path.exists(csv_path):
        logger.info("【{}】数据准备就绪,共{}期, 下一步可训练模型...".format(game["name"], len(data)))
    else:
        logger.error("数据文件不存在!")
if __name__ == "__main__":
    # Script entry point: crawl the game selected with --name. An empty name
    # (e.g. --name "") is rejected before any work starts.
    if args.name:
        run(name=args.name)
    else:
        raise Exception("玩法名称不能为空!")