Merge pull request #1 from binux/master
Merge original project pull request at 2015-10-09
machinewu committed Oct 9, 2015
2 parents 7d0dce8 + 23a5db0 commit f461cea
Showing 18 changed files with 165 additions and 24 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
@@ -4,7 +4,7 @@ MAINTAINER binux <[email protected]>
 # install python
 RUN apt-get update && \
     apt-get install -y python python-dev python-distribute python-pip && \
-    apt-get install -y libcurl4-openssl-dev libxml2-dev libxslt1-dev python-lxml python-mysqldb
+    apt-get install -y libcurl4-openssl-dev libxml2-dev libxslt1-dev python-lxml python-mysqldb libpq-dev

 # install requirements
 ADD requirements.txt /opt/pyspider/requirements.txt
2 changes: 2 additions & 0 deletions docs/Deployment.md
@@ -89,6 +89,8 @@ builtin:
     None
 ```

+> Hint for PostgreSQL: you need to create the database with UTF8 encoding yourself; pyspider will not create the database for you.
+
 running
 -------

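Because pyspider only issues MySQL-style `CREATE DATABASE IF NOT EXISTS` (and, per the `projectdb`/`resultdb`/`taskdb` changes below, now merely tolerates the failure), the databases must exist up front on PostgreSQL. A minimal sketch of pre-creating them with psycopg2 — host, credentials, and database names here are illustrative assumptions, not taken from this commit:

```python
# Sketch: pre-create UTF-8 databases for pyspider on PostgreSQL.
# Host, user, password, and database names below are assumptions.
import psycopg2

conn = psycopg2.connect(host='localhost', user='postgres', password='postgres')
conn.autocommit = True  # CREATE DATABASE cannot run inside a transaction
cur = conn.cursor()
for name in ('projectdb', 'taskdb', 'resultdb'):
    cur.execute("CREATE DATABASE %s WITH ENCODING 'UTF8'" % name)
cur.close()
conn.close()
```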
2 changes: 1 addition & 1 deletion docs/apis/self.crawl.md
@@ -18,7 +18,7 @@ def on_start(self):

 the following parameters are optional

-* `age` - the period of validity of the task. The page would be regarded as not modified during the period. _default: 0(never recrawl)_ <a name="age" href="#age">¶</a>
+* `age` - the period of validity of the task. The page would be regarded as not modified during the period. _default: -1(never recrawl)_ <a name="age" href="#age">¶</a>

 ```python
 @config(age=10 * 24 * 60 * 60)
2 changes: 1 addition & 1 deletion docs/tutorial/AJAX-and-more-HTTP.md
@@ -10,7 +10,7 @@ AJAX

 [AJAX] is short for asynchronous JavaScript + XML. AJAX is using existing standards to update parts of a web page without loading the whole page. A common usage of AJAX is loading [JSON] data and render to HTML on the client side.

-You may find elements mission in HTML fetched by pyspider or [wget](https://www.gnu.org/software/wget/). When you open it in browser some elements appear after page loaded with(maybe not) a 'loading' animation or words. For example, we want to scrape all channels of Dota 2 from [http://www.twitch.tv/directory/game/Dota%202](http://www.twitch.tv/directory/game/Dota%202)
+You may find elements missing in HTML fetched by pyspider or [wget](https://www.gnu.org/software/wget/). When you open it in browser some elements appear after page loaded with(maybe not) a 'loading' animation or words. For example, we want to scrape all channels of Dota 2 from [http://www.twitch.tv/directory/game/Dota%202](http://www.twitch.tv/directory/game/Dota%202)

 ![twitch](../imgs/twitch.png)

8 changes: 6 additions & 2 deletions pyspider/database/sqlalchemy/projectdb.py
@@ -7,6 +7,7 @@

 import six
 import time
+import sqlalchemy.exc

 from sqlalchemy import create_engine, MetaData, Table, Column, String, Float, Text
 from sqlalchemy.engine.url import make_url
@@ -41,8 +42,11 @@ def __init__(self, url):
         if self.url.database:
             database = self.url.database
             self.url.database = None
-            engine = create_engine(self.url, convert_unicode=False)
-            engine.execute("CREATE DATABASE IF NOT EXISTS %s" % database)
+            try:
+                engine = create_engine(self.url, convert_unicode=False)
+                engine.execute("CREATE DATABASE IF NOT EXISTS %s" % database)
+            except sqlalchemy.exc.OperationalError:
+                pass
             self.url.database = database
         self.engine = create_engine(url, convert_unicode=False)
         self.table.create(self.engine, checkfirst=True)
8 changes: 6 additions & 2 deletions pyspider/database/sqlalchemy/resultdb.py
@@ -9,6 +9,7 @@
 import six
 import time
 import json
+import sqlalchemy.exc

 from sqlalchemy import (create_engine, MetaData, Table, Column,
                         String, Float, LargeBinary)
@@ -40,8 +41,11 @@ def __init__(self, url):
         if self.url.database:
             database = self.url.database
             self.url.database = None
-            engine = create_engine(self.url, convert_unicode=True)
-            engine.execute("CREATE DATABASE IF NOT EXISTS %s" % database)
+            try:
+                engine = create_engine(self.url, convert_unicode=True)
+                engine.execute("CREATE DATABASE IF NOT EXISTS %s" % database)
+            except sqlalchemy.exc.OperationalError:
+                pass
             self.url.database = database
         self.engine = create_engine(url, convert_unicode=True)

8 changes: 6 additions & 2 deletions pyspider/database/sqlalchemy/taskdb.py
@@ -9,6 +9,7 @@
 import six
 import time
 import json
+import sqlalchemy.exc

 from sqlalchemy import (create_engine, MetaData, Table, Column, Index,
                         Integer, String, Float, LargeBinary, func)
@@ -46,8 +47,11 @@ def __init__(self, url):
         if self.url.database:
             database = self.url.database
             self.url.database = None
-            engine = create_engine(self.url, convert_unicode=True)
-            engine.execute("CREATE DATABASE IF NOT EXISTS %s" % database)
+            try:
+                engine = create_engine(self.url, convert_unicode=True)
+                engine.execute("CREATE DATABASE IF NOT EXISTS %s" % database)
+            except sqlalchemy.exc.OperationalError:
+                pass
             self.url.database = database
         self.engine = create_engine(self.url, convert_unicode=True)

2 changes: 1 addition & 1 deletion pyspider/libs/base_handler.py
@@ -216,7 +216,7 @@ def _crawl(self, url, **kwargs):
         """
         task = {}

-        assert len(url) < 1024, "Maximum URL length error: len(url) > 1024"
+        assert len(url) < 1024, "Maximum (1024) URL length error."

         if kwargs.get('callback'):
             callback = kwargs['callback']
2 changes: 1 addition & 1 deletion pyspider/libs/utils.py
@@ -226,7 +226,7 @@ def pretty_unicode(string):
     try:
         return string.decode("utf8")
     except UnicodeDecodeError:
-        return string.decode('Latin-1').encode('unicode_escape')
+        return string.decode('Latin-1').encode('unicode_escape').decode("utf8")


 def unicode_string(string):
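The appended `.decode("utf8")` matters on Python 3, where `encode('unicode_escape')` returns bytes: without it, `pretty_unicode` would hand back bytes instead of text for non-UTF-8 input. A standalone sketch of that code path (not part of the commit):

```python
# Standalone sketch of the fixed fallback path for non-UTF-8 input.
raw = b'\xff\xfe'  # not valid UTF-8, so the utf8 decode above would raise

escaped = raw.decode('Latin-1').encode('unicode_escape')
print(type(escaped))           # <class 'bytes'> -- the old return value
print(escaped.decode('utf8'))  # \xff\xfe        -- the fixed, textual result
```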
4 changes: 3 additions & 1 deletion pyspider/message_queue/__init__.py
@@ -51,7 +51,9 @@ def connect_message_queue(name, url=None, maxsize=0):
         except:
             db = 0

-        return Queue(name, parsed.hostname, parsed.port, db=db, maxsize=maxsize)
+        password = parsed.password or None
+
+        return Queue(name, parsed.hostname, parsed.port, db=db, maxsize=maxsize, password=password)
     else:
         if url.startswith('kombu+'):
             url = url[len('kombu+'):]
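The new `password` handling relies on the standard URL parser, so a Redis URL of the form `redis://:password@host:port/db` now carries its credentials through to the queue. A small sketch of the parsing this depends on — the URL itself is a made-up example:

```python
# Sketch: how a password embedded in a redis URL surfaces via urlparse.
# The URL is illustrative; connect_message_queue forwards parsed.password.
from six.moves.urllib.parse import urlparse

parsed = urlparse('redis://:s3cret@localhost:6379/0')
print(parsed.hostname, parsed.port, parsed.password)  # localhost 6379 s3cret
```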
2 changes: 1 addition & 1 deletion pyspider/message_queue/beanstalk.py
@@ -38,7 +38,7 @@ def stats(self):
         try:
             with self.lock:
                 stats = self.connection.stats_tube(self.name)
-        except beanstalkc.CommandFailed, err:
+        except beanstalkc.CommandFailed as err:
             # tube is empty
             if err[1] == 'NOT_FOUND':
                 return {}
4 changes: 2 additions & 2 deletions pyspider/message_queue/redis_queue.py
@@ -21,7 +21,7 @@ class RedisQueue(object):
     max_timeout = 0.3

     def __init__(self, name, host='localhost', port=6379, db=0,
-                 maxsize=0, lazy_limit=True):
+                 maxsize=0, lazy_limit=True, password=None):
         """
         Constructor for RedisQueue
@@ -31,7 +31,7 @@ def __init__(self, name, host='localhost', port=6379, db=0,
         for better performance.
         """
         self.name = name
-        self.redis = redis.StrictRedis(host=host, port=port, db=db)
+        self.redis = redis.StrictRedis(host=host, port=port, db=db, password=password)
         self.maxsize = maxsize
         self.lazy_limit = lazy_limit
         self.last_qsize = 0
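With the constructor change, callers that talk to a password-protected Redis can pass credentials directly. A hedged usage sketch — the queue name, credentials, and payload are illustrative, a reachable Redis server is assumed, and the standard `put`/`get` queue interface is assumed from the surrounding class:

```python
# Usage sketch for the extended constructor; all values are illustrative.
from pyspider.message_queue.redis_queue import RedisQueue

queue = RedisQueue('scheduler2fetcher', host='localhost', port=6379,
                   db=0, maxsize=100, password='s3cret')
queue.put({'taskid': 'example', 'url': 'http://example.com/'})
print(queue.get())
```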
17 changes: 9 additions & 8 deletions pyspider/run.py
@@ -286,8 +286,8 @@ def result_worker(ctx, result_cls):
               help='webui bind to host')
 @click.option('--cdn', default='//cdnjscn.b0.upaiyun.com/libs/',
               help='js/css cdn server')
-@click.option('--scheduler-rpc', callback=connect_rpc, help='xmlrpc path of scheduler')
-@click.option('--fetcher-rpc', callback=connect_rpc, help='xmlrpc path of fetcher')
+@click.option('--scheduler-rpc', help='xmlrpc path of scheduler')
+@click.option('--fetcher-rpc', help='xmlrpc path of fetcher')
 @click.option('--max-rate', type=float, help='max rate for each project')
 @click.option('--max-burst', type=float, help='max burst for each project')
 @click.option('--username', envvar='WEBUI_USERNAME',
@@ -440,12 +440,13 @@ def all(ctx, fetcher_num, processor_num, result_worker_num, run_in):

     try:
         # phantomjs
-        phantomjs_config = g.config.get('phantomjs', {})
-        phantomjs_config.setdefault('auto_restart', True)
-        threads.append(run_in(ctx.invoke, phantomjs, **phantomjs_config))
-        time.sleep(2)
-        if threads[-1].is_alive() and not g.get('phantomjs_proxy'):
-            g['phantomjs_proxy'] = '127.0.0.1:%s' % phantomjs_config.get('port', 25555)
+        if not g.get('phantomjs_proxy'):
+            phantomjs_config = g.config.get('phantomjs', {})
+            phantomjs_config.setdefault('auto_restart', True)
+            threads.append(run_in(ctx.invoke, phantomjs, **phantomjs_config))
+            time.sleep(2)
+            if threads[-1].is_alive() and not g.get('phantomjs_proxy'):
+                g['phantomjs_proxy'] = '127.0.0.1:%s' % phantomjs_config.get('port', 25555)

         # result worker
         result_worker_config = g.config.get('result_worker', {})
18 changes: 18 additions & 0 deletions pyspider/webui/login.py
@@ -14,6 +14,21 @@
 login_manager.init_app(app)


+class AnonymousUser(login.AnonymousUserMixin):
+
+    def is_anonymous(self):
+        return True
+
+    def is_active(self):
+        return False
+
+    def is_authenticated(self):
+        return False
+
+    def get_id(self):
+        return
+
+
 class User(login.UserMixin):

     def __init__(self, id, password):
@@ -32,6 +47,9 @@ def is_active(self):
         return self.is_authenticated()


+login_manager.anonymous_user = AnonymousUser
+
+
 @login_manager.request_loader
 def load_user_from_request(request):
     api_key = request.headers.get('Authorization')
1 change: 1 addition & 0 deletions requirements.txt
@@ -19,3 +19,4 @@ six
 amqp>=1.3.0
 redis
 kombu
+psycopg2
1 change: 1 addition & 0 deletions setup.py
@@ -44,6 +44,7 @@
     'SQLAlchemy>=0.9.7',
     'redis',
     'kombu',
+    'psycopg2',
 ]
 if sys.version_info < (3, 0):
     extras_require_all.extend([
37 changes: 36 additions & 1 deletion tests/test_webui.py
@@ -41,8 +41,12 @@ def setUpClass(self):
         run_in_thread(scheduler.xmlrpc_run)
         run_in_thread(scheduler.run)

-        ctx = run.fetcher.make_context('fetcher', [], self.ctx)
+        ctx = run.fetcher.make_context('fetcher', [
+            '--xmlrpc',
+            '--xmlrpc-port', '24444',
+        ], self.ctx)
         fetcher = run.fetcher.invoke(ctx)
+        run_in_thread(fetcher.xmlrpc_run)
         run_in_thread(fetcher.run)

         ctx = run.processor.make_context('processor', [], self.ctx)
@@ -347,6 +351,37 @@ def test_a50_export_csv(self):
         self.assertEqual(rv.status_code, 200)
         self.assertIn(b'url,title,url', rv.data)

+    def test_a60_fetch_via_cannot_connect_fetcher(self):
+        ctx = run.webui.make_context('webui', [
+            '--fetcher-rpc', 'http://localhost:20000/',
+        ], self.ctx)
+        app = run.webui.invoke(ctx)
+        app = app.test_client()
+        rv = app.post('/debug/test_project/run', data={
+            'script': self.script_content,
+            'task': self.task_content
+        })
+        self.assertEqual(rv.status_code, 200)
+        data = json.loads(utils.text(rv.data))
+        self.assertGreater(len(data['logs']), 0)
+        self.assertEqual(len(data['follows']), 0)
+
+    def test_a70_fetch_via_fetcher(self):
+        ctx = run.webui.make_context('webui', [
+            '--fetcher-rpc', 'http://localhost:24444/',
+        ], self.ctx)
+        app = run.webui.invoke(ctx)
+        app = app.test_client()
+        rv = app.post('/debug/test_project/run', data={
+            'script': self.script_content,
+            'task': self.task_content
+        })
+        self.assertEqual(rv.status_code, 200)
+        data = json.loads(utils.text(rv.data))
+        self.assertEqual(len(data['logs']), 0, data['logs'])
+        self.assertIn(b'follows', rv.data)
+        self.assertGreater(len(data['follows']), 0)
+
     def test_h000_auth(self):
         ctx = run.webui.make_context('webui', [
             '--scheduler-rpc', 'http://localhost:23333/',
69 changes: 69 additions & 0 deletions tools/migrate.py
@@ -0,0 +1,69 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
# http://binux.me
# Created on 2015-09-30 23:22:46

import click
import logging
from pyspider.database.base.projectdb import ProjectDB
from pyspider.database.base.taskdb import TaskDB
from pyspider.database.base.resultdb import ResultDB
from pyspider.database import connect_database
from pyspider.libs.utils import unicode_obj
from multiprocessing.pool import ThreadPool as Pool

logging.getLogger().setLevel(logging.INFO)


def taskdb_migrating(project, from_connection, to_connection):
    logging.info("taskdb: %s", project)
    f = connect_database(from_connection)
    t = connect_database(to_connection)
    t.drop(project)
    for status in range(1, 5):
        for task in f.load_tasks(status, project=project):
            t.insert(project, task['taskid'], task)


def resultdb_migrating(project, from_connection, to_connection):
    logging.info("resultdb: %s", project)
    f = connect_database(from_connection)
    t = connect_database(to_connection)
    t.drop(project)
    for result in f.select(project):
        t.save(project, result['taskid'], result['url'], result['result'])


@click.command()
@click.option('--pool', default=10, help='concurrent worker size.')
@click.argument('from_connection', required=1)
@click.argument('to_connection', required=1)
def migrate(pool, from_connection, to_connection):
    """
    Migrate tool for pyspider
    """
    f = connect_database(from_connection)
    t = connect_database(to_connection)

    if isinstance(f, ProjectDB):
        for each in f.get_all():
            each = unicode_obj(each)
            logging.info("projectdb: %s", each['name'])
            t.drop(each['name'])
            t.insert(each['name'], each)
    elif isinstance(f, TaskDB):
        pool = Pool(pool)
        pool.map(
            lambda x, f=from_connection, t=to_connection: taskdb_migrating(x, f, t),
            f.projects)
    elif isinstance(f, ResultDB):
        pool = Pool(pool)
        pool.map(
            lambda x, f=from_connection, t=to_connection: resultdb_migrating(x, f, t),
            f.projects)


if __name__ == '__main__':
    migrate()
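The tool takes two positional connection strings, source first and destination second, and is run once per database type (projectdb, taskdb, resultdb). For example, with hypothetical connection strings following pyspider's `scheme+dbtype://user@host:port/db` convention: `python tools/migrate.py mysql+taskdb://localhost:3306/taskdb sqlalchemy+postgresql+taskdb://postgres@localhost:5432/taskdb`.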
