vix_spider.py
import json
import logging
from datetime import datetime

from billiard import Process
from kafka import KafkaProducer
from scrapy import Spider
from scrapy import signals as scrapy_signals
from scrapy.crawler import Crawler
from twisted.internet import reactor

from config import user_agent

# Set the root logger level.
logging.basicConfig(level=logging.DEBUG)

class VIXCollectorPipeline:
    """Scrapy item pipeline that publishes scraped VIX data to Kafka.

    Parameters
    ----------
    server : list
        List of Kafka broker addresses.
    topic : str
        Kafka topic to which the stream of data records is published.
    """

    def __init__(self, server, topic):
        self.server = server
        self.topic = topic
        self.item = None
        self.producer = KafkaProducer(
            bootstrap_servers=server,
            value_serializer=lambda x: json.dumps(x).encode('utf-8'))

    @classmethod
    def from_crawler(cls, crawler):
        # Pull the broker list and topic off the spider instance.
        return cls(server=crawler.spider.server,
                   topic=crawler.spider.topic)

    def process_item(self, item, spider):
        # Keep the latest scraped item; it is sent once the spider closes.
        self.item = item
        return item

    def close_spider(self, spider):
        # Send the VIX record through the Kafka producer and shut it down.
        self.producer.send(topic=self.topic, value=self.item)
        self.producer.flush()
        self.producer.close()
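
# Consumer-side sketch (illustrative only, not used by the spider): records
# published by VIXCollectorPipeline can be read back with kafka-python's
# KafkaConsumer. This is a minimal example; the deserializer simply mirrors
# the producer's value_serializer above.
def consume_vix_records(server, topic):
    from kafka import KafkaConsumer
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=server,
        auto_offset_reset='earliest',
        value_deserializer=lambda m: json.loads(m.decode('utf-8')))
    for record in consumer:
        # Each value mirrors the item yielded by the spider, e.g.
        # {'VIX': 17.23, 'Timestamp': '2021-03-01 09:30:00'}.
        print(record.value)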

class VIXSpiderSpider(Spider):
    """Scrapy spider that extracts the VIX quote from cnbc.com.

    Parameters
    ----------
    current_dt : datetime.datetime
        Timestamp of the real-time data (EST).
    server : list
        List of Kafka broker addresses.
    topic : str
        Kafka topic to which the stream of data records is published.

    Yields
    ------
    dict
        Dictionary that represents the scraped item.
    """

    name = 'vix_reports_spider'
    allowed_domains = ['www.cnbc.com']
    start_urls = ['https://www.cnbc.com/quotes/?symbol=.VIX']
    custom_settings = {
        'ITEM_PIPELINES': {
            'vix_spider.VIXCollectorPipeline': 100
        }
    }

    def __init__(self, current_dt, server, topic):
        super().__init__()
        self.current_dt = datetime.strftime(current_dt, "%Y-%m-%d %H:%M:%S")
        self.server = server
        self.topic = topic

    def parse(self, response):
        # The quote is rendered in a <span class="last original"> element;
        # extract_first() returns None if the page layout changes.
        vix = response.xpath(
            "//span[@class='last original']/text()").extract_first()
        yield {'VIX': float(vix),
               'Timestamp': self.current_dt}
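
# Note: the XPath above matches the markup cnbc.com served when this spider
# was written and may break if the page changes. It can be verified
# interactively with Scrapy's shell, e.g.:
#   scrapy shell 'https://www.cnbc.com/quotes/?symbol=.VIX'
#   >>> response.xpath("//span[@class='last original']/text()").extract_first()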

class CrawlerScript(Process):
    """Runs the spider multiple times within one script by executing each
    crawl in a separate billiard process (works around Twisted's
    ReactorNotRestartable error).

    Parameters
    ----------
    current_dt : datetime.datetime
        Timestamp of the real-time data (EST).
    server : list
        List of Kafka broker addresses.
    topic : str
        Kafka topic to which the stream of data records is published.
    """

    def __init__(self, current_dt, server, topic):
        Process.__init__(self)
        self.current_dt = current_dt
        self.server = server
        self.topic = topic
        self.crawler = Crawler(
            VIXSpiderSpider,
            settings={
                'USER_AGENT': user_agent
            }
        )
        # Stop the reactor once the spider has finished.
        self.crawler.signals.connect(reactor.stop,
                                     signal=scrapy_signals.spider_closed)

    def run(self):
        # The extra arguments are forwarded to VIXSpiderSpider.__init__.
        self.crawler.crawl(self.current_dt, self.server, self.topic)
        reactor.run()

def run_vix_spider(current_dt, server, topic):
    """Run a single crawl in a child process and wait for it to finish."""
    crawler = CrawlerScript(current_dt, server, topic)
    crawler.start()
    # Block here until the crawl (and the Kafka hand-off) has finished.
    crawler.join()
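
# Usage sketch: the entry point that schedules this spider is not part of
# this module, so the broker address and topic below are placeholder
# assumptions for illustration only.
if __name__ == '__main__':
    run_vix_spider(current_dt=datetime.now(),
                   server=['localhost:9092'],  # assumed Kafka broker address
                   topic='vix')                # assumed topic name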