-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathdc_pipeline.py
150 lines (112 loc) · 3.5 KB
/
dc_pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import pandas as pd
import requests
import folium
import luigi
# Calendar year -> URL of that year's permit CSV on the DC open-data portal.
YEARS = {
    2012: 'http://opendata.dc.gov/datasets/5f4ea2f25c9a45b29e15e53072126739_7.csv',
    2013: 'http://opendata.dc.gov/datasets/4911fcf3527246ae9bf81b5553a48c4d_6.csv',
    2014: 'http://opendata.dc.gov/datasets/d4891ca6951947538f6707a6b07ae225_5.csv',
    2015: 'http://opendata.dc.gov/datasets/981c105beef74af38cc4090992661264_25.csv',
    2016: 'http://opendata.dc.gov/datasets/5d14ae7dcd1544878c54e61edda489c3_24.csv',
}
class DownloadTask(luigi.Task):
    """
    Download one year of DC permit data from the open-data portal.

    The source URL is looked up in the module-level ``YEARS`` mapping and the
    response body is written to ``./data/permits-dc-<year>.csv``.
    """
    # Calendar year to fetch; must be one of the keys of YEARS.
    year = luigi.IntParameter(default=2012)

    def run(self):
        """
        Fetch the CSV for ``self.year`` and write it to ``self.output()``.

        Raises:
            ValueError: if ``self.year`` has no URL in ``YEARS``.
            requests.HTTPError: if the portal answers with an error status.
        """
        try:
            url = YEARS[self.year]
        except KeyError:
            raise ValueError(
                'no download URL configured for year %s; known years: %s'
                % (self.year, sorted(YEARS))
            ) from None
        # Bounded wait so a stalled portal cannot hang the worker forever.
        response = requests.get(url, timeout=60)
        # Fail the task on HTTP errors rather than saving an error page as CSV.
        response.raise_for_status()
        with self.output().open('w') as out:
            out.write(response.text)

    def output(self):
        return luigi.LocalTarget('./data/permits-dc-%s.csv' % str(self.year))
class cleanCSV(luigi.Task):
    """
    Clean one year's raw permit CSV into the format wanted for analysis.

    TODO: keep the ward, fee, permit and geospatial columns (the exact column
    names in the portal export still need to be confirmed).
    """
    # Year of the raw file to clean; forwarded unchanged to DownloadTask so
    # callers such as mergeDatasets can request a specific year.
    year = luigi.IntParameter(default=2012)

    def requires(self):
        return DownloadTask(year=self.year)

    def run(self):
        pass

    def output(self):
        pass
class mergeDatasets(luigi.Task):
    """
    Merge the cleaned per-year datasets.

    Depends on one ``cleanCSV`` task for every year listed in ``YEARS``.
    """

    def requires(self):
        # Take the years from YEARS rather than a separate hard-coded range,
        # so the download URLs and the merged years cannot drift apart.
        return [cleanCSV(year=year) for year in sorted(YEARS)]

    def run(self):
        pass

    def output(self):
        pass
class importIntoPandasDF(luigi.Task):
    """
    Convert the merged CSV data into a pandas DataFrame and save it as a pickle file.
    """

    def requires(self):
        # Must return a task *instance*: luigi calls complete()/output() on
        # each dependency, which fails on the bare class.
        return mergeDatasets()

    def run(self):
        pass

    def output(self):
        pass
class computeWards(luigi.Task):
    """Count the permits issued in each ward, per year, and persist the counts.

    Intended approach: call ``value_counts()`` on the ward column to get the
    number of permits issued per ward.
    """

    def requires(self):
        upstream = importIntoPandasDF()
        return upstream

    def run(self):
        ...  # not implemented yet

    def output(self):
        return None
class makeMap(luigi.Task):
    """
    Make a map of development.
    """

    def requires(self):
        return importIntoPandasDF()

    def run(self):
        """
        Build a Folium map centred on Washington, DC.

        Starter code showing how to create a map object and plot a marker.
        """
        # Named dev_map rather than `map` to avoid shadowing the builtin.
        dev_map = folium.Map(location=[38.9072, -77.0369],
                             zoom_start=12)
        # Example marker only: these coordinates ("Mt. Hood Meadows") are in
        # Oregon, far outside the DC view configured above.
        folium.Marker([45.3288, -121.6625], popup='Mt. Hood Meadows').add_to(dev_map)
        # TODO: plot permit locations and save dev_map to this task's output.

    def output(self):
        pass
class makePredictions(luigi.Task):
    """
    Predict next year's amount of ward development from the per-ward sums
    using a simple linear regression.

    NB: This is not good modeling; it is here to demonstrate the plumbing.
    """

    def requires(self):
        return computeWards()

    def run(self):
        """Fit counts against years and predict the value for 2017."""
        # Local imports, matching the original's local sklearn import.
        import numpy as np
        from sklearn.linear_model import LinearRegression

        # NOTE(review): computeWards.output() currently returns None, so this
        # read only works once that task writes a CSV target. The layout
        # assumed below (counts in column 0, years in column 1) comes from
        # the original skeleton's indexing — confirm against computeWards.
        df = pd.read_csv(self.input().path)
        data = np.asarray(df)
        lr = LinearRegression()
        # THIS DEPENDS ON HOW YOU SHAPE YOUR DATA - X should be years, y should
        # be counts. scikit-learn needs a 2-D feature matrix (n_samples, 1).
        X = data[:, 1].reshape(-1, 1)
        y = data[:, 0]
        lr.fit(X, y)
        # predict() also expects a 2-D input: one sample with one feature.
        lr.predict(np.array([[2017]]))

    def output(self):
        pass
class makeReport(luigi.Task):
    """Collect the predictions and the development map into a saved report."""

    def requires(self):
        predictions = makePredictions()
        development_map = makeMap()
        return (predictions, development_map)

    def run(self):
        ...  # not implemented yet

    def output(self):
        return None
if __name__ == '__main__':
    # Schedule only the download step (default year) with the local scheduler.
    cli_args = ['DownloadTask', '--local-scheduler']
    luigi.run(cli_args)