Skip to content

Commit

Permalink
Merge pull request qiyeboy#86 from xsren/master
Browse files Browse the repository at this point in the history
增加控制内存使用量和下载、检查IP并发速度的选项
  • Loading branch information
qiyeboy authored May 6, 2017
2 parents 5519764 + de39c2f commit a8cd638
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 20 deletions.
4 changes: 3 additions & 1 deletion IPProxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
from validator.Validator import validator, getMyIP
from spider.ProxyCrawl import startProxyCrawl

from config import TASK_QUEUE_SIZE

if __name__ == "__main__":
myip = getMyIP()
DB_PROXY_NUM = Value('i', 0)
q1 = Queue()
q1 = Queue(maxsize=TASK_QUEUE_SIZE)
q2 = Queue()
p0 = Process(target=start_api_server)
p1 = Process(target=startProxyCrawl, args=(q1, DB_PROXY_NUM,myip))
Expand Down
6 changes: 6 additions & 0 deletions config.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,9 @@ def get_header():

#下面配置squid,现在还没实现
#SQUID={'path':None,'confpath':'C:/squid/etc/squid.conf'}

MAX_CHECK_PROCESS = 2 # maximum number of proxy-checking worker processes spawned by validator()
MAX_CHECK_CONCURRENT_PER_PROCESS = 30 # max concurrent gevent checks per checker process (also the task batch size handed to each process)
TASK_QUEUE_SIZE = 50 # maxsize of the crawl task Queue (q1) — bounds memory use; crawl() blocks/polls when full
MAX_DOWNLOAD_CONCURRENT = 3 # max concurrent gevent downloads when crawling free proxy sites
CHECK_WATI_TIME = 1#seconds to sleep when the checker-process count is at MAX_CHECK_PROCESS. NOTE(review): "WATI" is a typo for "WAIT", kept because validator/Validator.py reads config.CHECK_WATI_TIME by this name
20 changes: 17 additions & 3 deletions spider/ProxyCrawl.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from multiprocessing import Queue, Process, Value

from api.apiServer import start_api_server
from config import THREADNUM, parserList, UPDATE_TIME, MINNUM
from config import THREADNUM, parserList, UPDATE_TIME, MINNUM, MAX_CHECK_CONCURRENT_PER_PROCESS, MAX_DOWNLOAD_CONCURRENT
from db.DataStore import store_data, sqlhelper
from spider.HtmlDownloader import Html_Downloader
from spider.HtmlPraser import Html_Parser
Expand Down Expand Up @@ -47,6 +47,9 @@ def run(self):
spawns = []
for proxy in proxylist:
spawns.append(gevent.spawn(detect_from_db, self.myip, proxy, self.proxies))
if len(spawns) >= MAX_CHECK_CONCURRENT_PER_PROCESS:
gevent.joinall(spawns)
spawns= []
gevent.joinall(spawns)
self.db_proxy_num.value = len(self.proxies)
str = 'IPProxyPool----->>>>>>>>db exists ip:%d' % len(self.proxies)
Expand All @@ -55,7 +58,13 @@ def run(self):
str += '\r\nIPProxyPool----->>>>>>>>now ip num < MINNUM,start crawling...'
sys.stdout.write(str + "\r\n")
sys.stdout.flush()
self.crawl_pool.map(self.crawl, parserList)
spawns = []
for p in parserList:
spawns.append(gevent.spawn(self.crawl, p))
if len(spawns) >= MAX_DOWNLOAD_CONCURRENT:
gevent.joinall(spawns)
spawns= []
gevent.joinall(spawns)
else:
str += '\r\nIPProxyPool----->>>>>>>>now ip num meet the requirement,wait UPDATE_TIME...'
sys.stdout.write(str + "\r\n")
Expand All @@ -74,7 +83,12 @@ def crawl(self, parser):
proxy_str = '%s:%s' % (proxy['ip'], proxy['port'])
if proxy_str not in self.proxies:
self.proxies.add(proxy_str)
self.queue.put(proxy)
while True:
if self.queue.full():
time.sleep(0.1)
else:
self.queue.put(proxy)
break


if __name__ == "__main__":
Expand Down
33 changes: 17 additions & 16 deletions validator/Validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,26 @@ def validator(queue1, queue2, myip):
proc_pool = {} # 所有进程列表
cntl_q = Queue() # 控制信息队列
while True:
if not cntl_q.empty():
# 处理已结束的进程
try:
pid = cntl_q.get()
proc = proc_pool.pop(pid)
proc_ps = psutil.Process(pid)
proc_ps.kill()
proc_ps.wait()
except Exception as e:
pass
# print(e)
# print(" we are unable to kill pid:%s" % (pid))
try:
# proxy_dict = {'source':'crawl','data':proxy}
proxy = queue1.get(timeout=10)
if len(proc_pool) >= config.MAX_CHECK_PROCESS:
time.sleep(config.CHECK_WATI_TIME)
continue
proxy = queue1.get()
tasklist.append(proxy)
if len(tasklist) > 500:
if len(tasklist) >= config.MAX_CHECK_CONCURRENT_PER_PROCESS:
p = Process(target=process_start, args=(tasklist, myip, queue2, cntl_q))
p.start()
proc_pool[p.pid] = p
Expand All @@ -58,20 +73,6 @@ def validator(queue1, queue2, myip):
proc_pool[p.pid] = p
tasklist = []

if not cntl_q.empty():
# 处理已结束的进程
try:
pid = cntl_q.get()
proc = proc_pool.pop(pid)
proc_ps = psutil.Process(pid)
proc_ps.kill()
proc_ps.wait()
except Exception as e:
pass
# print(e)
# print(" we are unable to kill pid:%s" % (pid))


def process_start(tasks, myip, queue2, cntl):
spawns = []
for task in tasks:
Expand Down

0 comments on commit a8cd638

Please sign in to comment.