-
Notifications
You must be signed in to change notification settings - Fork 1
/
oilproc.pig
223 lines (190 loc) · 8.57 KB
/
oilproc.pig
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
/*
Pig file to input, process, and combine the input data for the gas price prediction model
5/20/2016
Martin John Madsen
*/
-- Load the raw comma-separated inputs: seven feature series (input1-input5,
-- input7, input8; input6 is currently disabled) plus the price series (input0).
-- input1 is later reduced to a weekly average, and all series are joined on a
-- key built from the calculated year and week.
-- For local runs, input1..input8 can instead be loaded from 'input_1.txt' ..
-- 'input_8.txt' with the same schemas.
-- NOTE(review): the '[email protected]' part of each wasb:// URI looks like a
-- redacted container@account address -- confirm and restore before running.
input1 = LOAD 'wasb://[email protected]/input_1_20140601/'
    USING PigStorage(',')
    AS (labela:chararray, labelb:chararray, labelc:chararray,
        datestr:chararray, freq, value1:float, unit);

input2 = LOAD 'wasb://[email protected]/input_2_20140601/'
    USING PigStorage(',')
    AS (labela:chararray, labelb:chararray,
        datestr:chararray, freq, value2:int, unit);

input3 = LOAD 'wasb://[email protected]/input_3_20140601/'
    USING PigStorage(',')
    AS (labela:chararray, labelb:chararray,
        datestr:chararray, freq, value3:float, unit);

input4 = LOAD 'wasb://[email protected]/input_4_20140601/'
    USING PigStorage(',')
    AS (labela:chararray, labelb:chararray,
        datestr:chararray, freq, value4:int, unit);

input5 = LOAD 'wasb://[email protected]/input_5_20140601/'
    USING PigStorage(',')
    AS (labela:chararray, labelb:chararray,
        datestr:chararray, freq, value5:float, unit);

-- input6 is disabled; kept here for reference.
--input6 = LOAD 'input_6.txt' USING PigStorage(',')
--    AS (labela:chararray, labelb:chararray, datestr:chararray, freq, value6:int, unit);

input7 = LOAD 'wasb://[email protected]/input_7_20140601/'
    USING PigStorage(',')
    AS (labela:chararray, labelb:chararray,
        datestr:chararray, freq, value7:int, unit);

input8 = LOAD 'wasb://[email protected]/input_8_20140601/'
    USING PigStorage(',')
    AS (labela:chararray, labelb:chararray,
        datestr:chararray, freq, value8:int, unit);

-- Price series (the value to be predicted), laid out like input2..input8.
input0 = LOAD 'wasb://[email protected]/price_0_20140601/'
    USING PigStorage(',')
    AS (labela:chararray, labelb:chararray,
        datestr:chararray, freq, value0:float, unit);
-- Parse each series' yyyyMMdd date string into a DateTime and keep only the
-- parsed date and the measured value.
input1a = FOREACH input1 GENERATE ToDate(datestr, 'yyyyMMdd') AS (date1:DateTime), value1;
input2a = FOREACH input2 GENERATE ToDate(datestr, 'yyyyMMdd') AS (date2:DateTime), value2;
input3a = FOREACH input3 GENERATE ToDate(datestr, 'yyyyMMdd') AS (date3:DateTime), value3;
input4a = FOREACH input4 GENERATE ToDate(datestr, 'yyyyMMdd') AS (date4:DateTime), value4;
input5a = FOREACH input5 GENERATE ToDate(datestr, 'yyyyMMdd') AS (date5:DateTime), value5;
--input6a = FOREACH input6 GENERATE ToDate(datestr, 'yyyyMMdd') AS (date6:DateTime), value6;
input7a = FOREACH input7 GENERATE ToDate(datestr, 'yyyyMMdd') AS (date7:DateTime), value7;
input8a = FOREACH input8 GENERATE ToDate(datestr, 'yyyyMMdd') AS (date8:DateTime), value8;

-- The prices being predicted occur later than the feature observations.
-- To line them up with the features, date0 is the price date moved back by
-- three days ('P3D'); it is only used to compute the week. date0real keeps the
-- unshifted price date.
input0a = FOREACH input0 GENERATE
    SubtractDuration(ToDate(datestr, 'yyyyMMdd'), 'P3D') AS (date0:DateTime),
    ToDate(datestr, 'yyyyMMdd') AS (date0real:DateTime),
    value0;
-- Derive the (year, week) pair that every series is later grouped/joined on.
--
-- GetWeek returns the week of the *week-year*, so it must be paired with
-- GetWeekYear rather than the calendar GetYear. With GetYear, dates near a
-- year boundary get an inconsistent pair: e.g. 2014-12-29 is week 1 of
-- week-year 2015, but (GetYear, GetWeek) yields (2014, 1), which collides with
-- the first week of January 2014 and corrupts the weekly average and the joins.
-- All series are switched together so their keys remain comparable.
-- The output field is still named `year` so downstream statements are unchanged.
input1b = FOREACH input1a GENERATE
    GetWeekYear(date1) AS year,
    GetWeek(date1)     AS week,
    date1,
    value1;
input2b = FOREACH input2a GENERATE
    GetWeekYear(date2) AS year,
    GetWeek(date2)     AS week,
    date2,
    value2;
input3b = FOREACH input3a GENERATE
    GetWeekYear(date3) AS year,
    GetWeek(date3)     AS week,
    date3,
    value3;
input4b = FOREACH input4a GENERATE
    GetWeekYear(date4) AS year,
    GetWeek(date4)     AS week,
    date4,
    value4;
input5b = FOREACH input5a GENERATE
    GetWeekYear(date5) AS year,
    GetWeek(date5)     AS week,
    date5,
    value5;
--input6b = FOREACH input6a GENERATE
--    GetWeekYear(date6) AS year,
--    GetWeek(date6)     AS week,
--    date6,
--    value6;
input7b = FOREACH input7a GENERATE
    GetWeekYear(date7) AS year,
    GetWeek(date7)     AS week,
    date7,
    value7;
input8b = FOREACH input8a GENERATE
    GetWeekYear(date8) AS year,
    GetWeek(date8)     AS week,
    date8,
    value8;
-- For the price series the year/week come from the shifted date0, while the
-- emitted date0 column carries the real (unshifted) price date.
input0b = FOREACH input0a GENERATE
    GetWeekYear(date0) AS year,
    GetWeek(date0)     AS week,
    date0real AS date0,
    value0;
-- Collapse input1 to one row per (year, week): the latest date1 seen in that
-- week and the mean of value1 over the week.
input1b_by_week = GROUP input1b BY (year, week);
input1c = FOREACH input1b_by_week {
    -- newest date first, then keep just that one
    dates_desc  = ORDER input1b.date1 BY date1 DESC;
    latest_date = LIMIT dates_desc 1;
    GENERATE
        FLATTEN(group)       AS (year, week),
        FLATTEN(latest_date) AS date1,
        AVG(input1b.value1)  AS meanval1;
}
-- Replace the (year, week) pair with a single chararray key, yearweek
-- (year immediately followed by week), used for all subsequent joins.
-- input1d additionally keeps the week number as its own column.
input1d = FOREACH input1c GENERATE CONCAT((chararray) year, (chararray) week) AS yearweek, date1, week, meanval1;
input2d = FOREACH input2b GENERATE CONCAT((chararray) year, (chararray) week) AS yearweek, date2, value2;
input3d = FOREACH input3b GENERATE CONCAT((chararray) year, (chararray) week) AS yearweek, date3, value3;
input4d = FOREACH input4b GENERATE CONCAT((chararray) year, (chararray) week) AS yearweek, date4, value4;
input5d = FOREACH input5b GENERATE CONCAT((chararray) year, (chararray) week) AS yearweek, date5, value5;
--input6d = FOREACH input6b GENERATE CONCAT((chararray) year, (chararray) week) AS yearweek, date6, value6;
input7d = FOREACH input7b GENERATE CONCAT((chararray) year, (chararray) week) AS yearweek, date7, value7;
input8d = FOREACH input8b GENERATE CONCAT((chararray) year, (chararray) week) AS yearweek, date8, value8;
input0d = FOREACH input0b GENERATE CONCAT((chararray) year, (chararray) week) AS yearweek, date0, value0;
-- Fold the feature series together one at a time with inner joins on yearweek.
-- After each join, project back to a single yearweek column (taken from the
-- left side) plus the accumulated feature columns.
input12 = JOIN input1d BY yearweek, input2d BY yearweek;
input12a = FOREACH input12 GENERATE
    input1d::yearweek AS yearweek,
    date1, week, meanval1, value2;

input123 = JOIN input12a BY yearweek, input3d BY yearweek;
input123a = FOREACH input123 GENERATE
    input12a::yearweek AS yearweek,
    date1, week, meanval1, value2, value3;

input1234 = JOIN input123a BY yearweek, input4d BY yearweek;
input1234a = FOREACH input1234 GENERATE
    input123a::yearweek AS yearweek,
    date1, week, meanval1, value2, value3, value4;

input12345 = JOIN input1234a BY yearweek, input5d BY yearweek;
input12345a = FOREACH input12345 GENERATE
    input1234a::yearweek AS yearweek,
    date1, week, meanval1, value2, value3, value4, value5;

input123457 = JOIN input12345a BY yearweek, input7d BY yearweek;
input123457a = FOREACH input123457 GENERATE
    input12345a::yearweek AS yearweek,
    date1, week, meanval1, value2, value3, value4, value5, value7;

input1234578 = JOIN input123457a BY yearweek, input8d BY yearweek;
input1234578a = FOREACH input1234578 GENERATE
    input123457a::yearweek AS yearweek,
    date1, week, meanval1, value2, value3, value4, value5, value7, value8;
-- Attach the price (output) column to the combined features, then divide the
-- rows into training and testing sets by date1.
alldata = JOIN input1234578a BY yearweek, input0d BY yearweek;

-- Rows with date1 after the cut-off date go to the test set; rows on or
-- before it go to the training set.
SPLIT alldata INTO
    testdiv  IF date1 >  ToDate('2015-9-1'),
    traindiv IF date1 <= ToDate('2015-9-1');

-- Sort each set chronologically before projecting the output columns.
traindata = ORDER traindiv BY date1 ASC;
testdata  = ORDER testdiv  BY date1 ASC;

-- Feature files: date, week number and the feature values.
trainfeatures = FOREACH traindata GENERATE date1, week, meanval1, value2, value3, value4, value5, value7, value8;
testfeatures  = FOREACH testdata  GENERATE date1, week, meanval1, value2, value3, value4, value5, value7, value8;

-- Value files: feature date, real price date and the price.
trainvalues = FOREACH traindata GENERATE date1, date0, value0;
testvalues  = FOREACH testdata  GENERATE date1, date0, value0;

STORE trainfeatures INTO 'wasb://[email protected]/trainfeatures.txt' USING PigStorage(',');
STORE testfeatures  INTO 'wasb://[email protected]/testfeatures.txt'  USING PigStorage(',');
STORE trainvalues   INTO 'wasb://[email protected]/trainvalues.txt'   USING PigStorage(',');
STORE testvalues    INTO 'wasb://[email protected]/testvalues.txt'    USING PigStorage(',');

-- Local-run alternatives:
--STORE trainfeatures into 'trainfeatures.txt' using PigStorage(',');
--STORE testfeatures into 'testfeatures.txt' using PigStorage(',');
--STORE trainvalues into 'trainvalue.txt' using PigStorage(',');
--STORE testvalues into 'testvalues.txt' using PigStorage(',');