
Commit 3dac7f5

Merge branch 'branch-1.4' into ddf-for-1.5
2 parents: fe1df5e + d781f9d

File tree: 4 files changed (+84 / -37 lines)


jdbc/src/main/java/io/ddf/jdbc/JDBCDDFManager.java

Lines changed: 6 additions & 4 deletions
@@ -82,10 +82,12 @@ public JDBCDDFManager(DataSourceDescriptor dataSourceDescriptor, String
 
     if (engineType.equals("sfdc")) {
       // Special handler for sfdc connection string, add RTK (for cdata driver)
-      URI uriWithRTK = new URI(mJdbcDataSource
-          .getDataSourceUri().getUri().toString()
-          + "RTK='" + cdata.jdbc.salesforce.SalesforceDriver.getRTK()+"';");
-      mJdbcDataSource.getDataSourceUri().setUri(uriWithRTK);
+      String rtkString = System.getenv("SFDC_RTK");
+      String uriString = mJdbcDataSource.getDataSourceUri().getUri().toString();
+      if (!Strings.isNullOrEmpty(rtkString)) {
+        uriString += "RTK='" + rtkString + "';";
+      }
+      mJdbcDataSource.getDataSourceUri().setUri(new URI(uriString));
     }
 
     this.setDataSourceDescriptor(dataSourceDescriptor);
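
The JDBC change above drops the hard-coded call to cdata.jdbc.salesforce.SalesforceDriver.getRTK() and instead appends the runtime key only when an SFDC_RTK environment variable is set. A minimal standalone sketch of that pattern (written in Scala rather than the project's Java; the connection string is made up purely for illustration):

    import java.net.URI

    object RtkUriSketch {
      // Append "RTK='...';" to a JDBC URI only when SFDC_RTK is set and non-empty.
      def withRtk(baseUri: String): URI = {
        val rtk = System.getenv("SFDC_RTK") // null when the variable is unset
        val uriString =
          if (rtk != null && rtk.nonEmpty) baseUri + "RTK='" + rtk + "';"
          else baseUri
        new URI(uriString)
      }

      def main(args: Array[String]): Unit = {
        // Hypothetical sfdc-style connection string, not taken from the project.
        println(withRtk("jdbc:salesforce:User=alice@example.com;"))
      }
    }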

spark/src/main/scala/io/ddf/spark/etl/DateTimeExtractUDF.scala

Lines changed: 15 additions & 18 deletions
@@ -8,24 +8,21 @@ import org.apache.spark.sql.SQLContext
 object DateTimeExtractUDF {
   val extractDateTime: (Object, String) => Integer = {
     (obj: Object, field: String) => {
-      val dateTime = Utils.toDateTimeObject(obj)
-      if(dateTime != null) {
-        val intValue = field.toLowerCase match {
-          case "year" => dateTime.getYear
-          case "month" => dateTime.getMonthOfYear
-          case "weekyear" => dateTime.getWeekyear
-          case "weekofweekyear" => dateTime.getWeekOfWeekyear
-          case "day" => dateTime.getDayOfMonth
-          case "dayofweek" => dateTime.getDayOfWeek
-          case "dayofyear" => dateTime.getDayOfYear
-          case "hour" => dateTime.getHourOfDay
-          case "minute" => dateTime.getMinuteOfHour
-          case "second" => dateTime.getSecondOfMinute
-          case "millisecond" => dateTime.getMillisOfSecond
-        }
-        new Integer(intValue)
-      } else {
-        null
+      field.toLowerCase match {
+        case "year" => DateUDF.parseYear(obj)
+        case "month" => DateUDF.parseMonth(obj)
+        case "quarter" => DateUDF.parseQuarter(obj)
+        case "weekyear" => DateUDF.parseWeekYear(obj)
+        case "weekofyear" => DateUDF.parseWeekOfYear(obj)
+        case "weekofweekyear" => DateUDF.parseWeekOfYear(obj)
+        case "day" => DateUDF.parseDay(obj)
+        case "dayofweek" => DateUDF.parseDayOfWeek(obj)
+        case "dayofyear" => DateUDF.parseDayOfYear(obj)
+        case "hour" => DateUDF.parseHour(obj)
+        case "minute" => DateUDF.parseMinute(obj)
+        case "second" => DateUDF.parseSecond(obj)
+        case "millisecond" => DateUDF.parseMillisecond(obj)
+        case _ => null
       }
     }
   }
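
extractDateTime now dispatches each field name to the matching DateUDF helper (and gains 'quarter' and 'weekofyear'); the old inline match had no default case and would have thrown a MatchError for an unrecognized field, whereas the new `case _ => null` returns null. A self-contained sketch of the same dispatch-by-field-name idea against a Joda-Time DateTime (only a few fields shown; the quarter branch derives the value from the month the same way parseQuarter does):

    import org.joda.time.DateTime

    object ExtractSketch {
      // Minimal dispatch table: field name -> value extracted from a DateTime, null for unknown names.
      def extractField(dt: DateTime, field: String): Integer = field.toLowerCase match {
        case "year"           => dt.getYear
        case "quarter"        => (dt.getMonthOfYear - 1) / 3 + 1
        case "weekofweekyear" => dt.getWeekOfWeekyear
        case "day"            => dt.getDayOfMonth
        case _                => null
      }

      def main(args: Array[String]): Unit = {
        val dt = new DateTime(2015, 1, 22, 20, 23)
        println(extractField(dt, "quarter")) // 1
        println(extractField(dt, "day"))     // 22
      }
    }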

spark/src/main/scala/io/ddf/spark/etl/DateUDFs.scala

Lines changed: 38 additions & 10 deletions
@@ -8,13 +8,16 @@ import org.apache.spark.sql.types.DataTypes
   */
 object DateUDF {
 
-  val parseYear: (String => Integer) = (year: String) => {
-    val datetime = Utils.toDateTimeObject(year)
-    if(datetime != null) {
-      datetime.getYear
-    } else {
-      null
+  val parseYear: (Object => Integer) = {
+    (obj: Object) => {
+      val datetime = Utils.toDateTimeObject(obj)
+      if(datetime != null) {
+        datetime.getYear
+      } else {
+        null
+      }
     }
+
   }
 
   val parseHour: Object => Integer = {
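
parseYear now takes Object instead of String, so the same UDF handles both date strings and epoch-second integers; the tests below exercise both extract('2015-01-22 20:23 +0000', ...) and extract(1433386800, ...). The conversion itself lives in Utils.toDateTimeObject, whose implementation is not part of this diff; the following is only a rough, hypothetical sketch of what such a converter could look like:

    import org.joda.time.DateTime
    import org.joda.time.format.DateTimeFormat

    object ToDateTimeSketch {
      // Hypothetical stand-in for Utils.toDateTimeObject: accept epoch seconds or a
      // formatted date string, return a Joda DateTime, or null for anything else.
      def toDateTime(obj: Object): DateTime = obj match {
        case n: java.lang.Number => new DateTime(n.longValue * 1000L) // epoch seconds -> millis
        case s: String =>
          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm Z").withOffsetParsed().parseDateTime(s)
        case _ => null
      }

      def main(args: Array[String]): Unit = {
        println(toDateTime("2015-01-22 20:23 +0000").getYear)           // 2015
        println(toDateTime(Integer.valueOf(1433386800)).getMonthOfYear) // 6
      }
    }
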
@@ -39,6 +42,29 @@ object DateUDF {
     }
   }
 
+  val parseQuarter: Object => Integer = {
+    (obj: Object) => {
+      val dateTime = Utils.toDateTimeObject(obj)
+      if(dateTime != null) {
+        val month = dateTime.getMonthOfYear
+        if (month >= 1 && month <= 3) {
+          1
+        } else if (month >= 4 && month <= 6) {
+          2
+        } else if (month >= 7 && month <= 9) {
+          3
+        } else if (month >= 10 && month <= 12) {
+          4
+        } else {
+          null
+        }
+
+      } else {
+        null
+      }
+    }
+  }
+
   val parseDayOfWeekAsText: (Object, String) => String = {
     (obj: Object, format: String) => {
       val dateTime = Utils.toDateTimeObject(obj)
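
The chained if/else in parseQuarter maps months 1-3, 4-6, 7-9 and 10-12 onto quarters 1-4 (the final else is unreachable for a valid Joda month). The same mapping can be written arithmetically, as this small standalone check illustrates:

    object QuarterMappingCheck {
      // Arithmetic equivalent of the month-range if/else above.
      def quarterOf(month: Int): Int = (month - 1) / 3 + 1

      def main(args: Array[String]): Unit = {
        assert((1 to 12).map(quarterOf) == Seq(1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4))
        println("months 1..12 map onto quarters " + (1 to 12).map(quarterOf).mkString(","))
      }
    }
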
@@ -80,7 +106,7 @@ object DateUDF {
     }
   }
 
-  val parseWeekOfYear: (Object) => Integer = {
+  val parseWeekYear: (Object) => Integer = {
     (obj: Object) => {
       val dateTime = Utils.toDateTimeObject(obj)
       if(dateTime !=null) {
@@ -91,7 +117,7 @@ object DateUDF {
     }
   }
 
-  val parseWeekOfWeekYear: Object => Integer = {
+  val parseWeekOfYear: Object => Integer = {
     (obj: Object) => {
       val dateTime = Utils.toDateTimeObject(obj)
       if(dateTime != null) {
@@ -159,10 +185,12 @@ object DateUDF {
 
   def registerUDFs(sQLContext: SQLContext) = {
     sQLContext.udf.register("year", parseYear)
+    sQLContext.udf.register("quarter", parseQuarter)
     sQLContext.udf.register("month", parseMonth)
     sQLContext.udf.register("month_as_text", parseMonthAsText)
-    sQLContext.udf.register("weekyear", parseWeekOfYear)
-    sQLContext.udf.register("weekofweekyear", parseWeekOfWeekYear)
+    sQLContext.udf.register("weekyear", parseWeekYear)
+    sQLContext.udf.register("weekofyear", parseWeekOfYear)
+    sQLContext.udf.register("weekofweekyear", parseWeekOfYear)
     sQLContext.udf.register("day", parseDay)
     sQLContext.udf.register("dayofweek", parseDayOfWeek)
     sQLContext.udf.register("dayofweek_as_text", parseDayOfWeekAsText)
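
registerUDFs now exposes the new quarter and weekofyear names, keeps weekofweekyear as an alias of parseWeekOfYear, and points weekyear at the renamed parseWeekYear. A hedged usage sketch; the SparkContext setup, column name and table name are illustrative, not taken from the project:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import io.ddf.spark.etl.DateUDF

    object RegisterUdfSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("date-udf-demo"))
        val sqlContext = new SQLContext(sc)
        DateUDF.registerUDFs(sqlContext)

        import sqlContext.implicits._
        val df = sc.parallelize(Seq("2015-01-22 20:23 +0000")).toDF("ts")
        df.registerTempTable("events")

        // Both the new name and the old alias resolve to the same helper.
        sqlContext.sql("select quarter(ts), weekofyear(ts), weekofweekyear(ts) from events").show()
      }
    }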

spark/src/test/java/io/ddf/spark/etl/UDFTest.java

Lines changed: 25 additions & 5 deletions
@@ -111,12 +111,17 @@ public void testDateTimeExtract() throws DDFException {
     System.out.println(rows.get(0));
     Assert.assertTrue(Integer.parseInt(rows.get(0))== 2015);
 
+    ddf3 = ddf.sql2ddf("select extract('2015-01-22 20:23 +0000', 'quarter') from @this");
+    rows = ddf3.VIEWS.head(1);
+    System.out.println(rows.get(0));
+    Assert.assertTrue(Integer.parseInt(rows.get(0))== 1);
+
     ddf3 = ddf.sql2ddf("select extract('2015-01-22 20:23 +0000', 'weekyear') from @this");
     rows = ddf3.VIEWS.head(1);
     System.out.println(rows.get(0));
     Assert.assertTrue(Integer.parseInt(rows.get(0))== 2015);
 
-    ddf3 = ddf.sql2ddf("select extract('2015-01-22 20:23 +0000', 'weekofweekyear') from @this");
+    ddf3 = ddf.sql2ddf("select extract('2015-01-22 20:23 +0000', 'weekofyear') from @this");
     rows = ddf3.VIEWS.head(1);
     System.out.println(rows.get(0));
     Assert.assertTrue(Integer.parseInt(rows.get(0))== 4);
@@ -167,7 +172,7 @@ public void testDateTimeExtract() throws DDFException {
     System.out.println(rows.get(0));
     Assert.assertTrue(Integer.parseInt(rows.get(0))== 2015);
 
-    ddf3 = ddf.sql2ddf("select extract(1433386800, 'weekofweekyear') from @this");
+    ddf3 = ddf.sql2ddf("select extract(1433386800, 'weekofyear') from @this");
     rows = ddf3.VIEWS.head(1);
     System.out.println(rows.get(0));
     Assert.assertTrue(Integer.parseInt(rows.get(0))== 23);
@@ -177,6 +182,11 @@ public void testDateTimeExtract() throws DDFException {
     System.out.println(rows.get(0));
     Assert.assertTrue(Integer.parseInt(rows.get(0))== 6);
 
+    ddf3 = ddf.sql2ddf("select extract(1433386800, 'quarter') from @this");
+    rows = ddf3.VIEWS.head(1);
+    System.out.println(rows.get(0));
+    Assert.assertTrue(Integer.parseInt(rows.get(0))== 2);
+
     ddf3 = ddf.sql2ddf("select extract(1433386800, 'day') from @this");
     rows = ddf3.VIEWS.head(1);
     System.out.println(rows.get(0));
@@ -223,7 +233,7 @@ public void testDateTimeExtract() throws DDFException {
     System.out.println(rows.get(0));
     Assert.assertTrue(Integer.parseInt(rows.get(0))== 2101);
 
-    ddf3 = ddf.sql2ddf("select extract(4147483647, 'weekofweekyear') from @this");
+    ddf3 = ddf.sql2ddf("select extract(4147483647, 'weekofyear') from @this");
     rows = ddf3.VIEWS.head(1);
     System.out.println(rows.get(0));
     Assert.assertTrue(Integer.parseInt(rows.get(0))== 23);
@@ -233,6 +243,11 @@ public void testDateTimeExtract() throws DDFException {
     System.out.println(rows.get(0));
     Assert.assertTrue(Integer.parseInt(rows.get(0))== 6);
 
+    ddf3 = ddf.sql2ddf("select extract(4147483647, 'quarter') from @this");
+    rows = ddf3.VIEWS.head(1);
+    System.out.println(rows.get(0));
+    Assert.assertTrue(Integer.parseInt(rows.get(0))== 2);
+
     ddf3 = ddf.sql2ddf("select extract(4147483647, 'day') from @this");
     rows = ddf3.VIEWS.head(1);
     System.out.println(rows.get(0));
@@ -275,7 +290,12 @@ public void testIndividualDateTimeExtractUDFs() throws DDFException {
     DDF ddf3 = ddf.sql2ddf("select year('2015-01-22 20:23 +0000') from @this");
     List<String> rows = ddf3.VIEWS.head(1);
     System.out.println(rows.get(0));
-    Assert.assertTrue(Integer.parseInt(rows.get(0)) == 2015);
+
+
+    ddf3 = ddf.sql2ddf("select quarter('2015-01-22 20:23 +0000') from @this");
+    rows = ddf3.VIEWS.head(1);
+    System.out.println(rows.get(0));
+    Assert.assertTrue(Integer.parseInt(rows.get(0)) == 1);
 
     ddf3 = ddf.sql2ddf("select month('2015-01-22 20:23 +0000') from @this");
     rows = ddf3.VIEWS.head(1);
@@ -302,7 +322,7 @@ public void testIndividualDateTimeExtractUDFs() throws DDFException {
     System.out.println(rows.get(0));
     Assert.assertTrue(Integer.parseInt(rows.get(0)) == 2015);
 
-    ddf3 = ddf.sql2ddf("select weekofweekyear('2015-01-22 20:23 +0000') from @this");
+    ddf3 = ddf.sql2ddf("select weekofyear('2015-01-22 20:23 +0000') from @this");
     rows = ddf3.VIEWS.head(1);
     System.out.println(rows.get(0));
     Assert.assertTrue(Integer.parseInt(rows.get(0)) == 4);
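
The new assertions for epoch-second input can be sanity-checked with Joda-Time alone. This standalone sketch reproduces the values the test expects for 1433386800 (2015-06-04T03:00:00Z): quarter 2 and week-of-year 23.

    import org.joda.time.{DateTime, DateTimeZone}

    object ExpectedValuesCheck {
      def main(args: Array[String]): Unit = {
        val dt = new DateTime(1433386800L * 1000L, DateTimeZone.UTC)
        println(dt)                              // 2015-06-04T03:00:00.000Z
        println((dt.getMonthOfYear - 1) / 3 + 1) // 2  -> quarter asserted above
        println(dt.getWeekOfWeekyear)            // 23 -> 'weekofyear' value asserted above
      }
    }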
