Add dayofyear, weekofyear, month, dayofmonth, minute, second, next_da… #268

Open · wants to merge 6 commits into master

Changes from 2 commits
@@ -380,10 +380,95 @@ trait NonAggregateFunctions {

/** Non-Aggregate function: Extracts the year as an integer from a given date/timestamp/string.
*
* Differs from `Column#year` by wrapping its result into an `Option` in case the column
* cannot be parsed into a valid date.
*
* apache/spark
*/
def year[T](date: AbstractTypedColumn[T, String]): date.ThisType[T, Option[Int]] =
date.typed(untyped.year(date.untyped))
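
A minimal usage sketch of the Option-wrapping behaviour shared by all the extractors added below (the Foo case class and the input values here are illustrative only, not part of the PR, and an implicit SparkSession is assumed):

case class Foo(a: String)
val ds = TypedDataset.create(Seq(Foo("2018-02-26"), Foo("not a date")))
// year(...) is typed as Option[Int]: Some(2018) for the parseable row,
// None for the unparseable one, instead of a silent null.
val years = ds.select(year(ds[String]('a)))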

/** Non-Aggregate function: Extracts the day of the year as an integer from a given date/timestamp/string.
*
* Differs from `Column#dayofyear` by wrapping its result into an `Option` in case the column
* cannot be parsed into a valid date.
*
* apache/spark
*/
def dayofyear[T](date: AbstractTypedColumn[T, String]): date.ThisType[T, Option[Int]] =
date.typed(untyped.dayofyear(date.untyped))

/** Non-Aggregate function: Extracts the week number as an integer from a given date/timestamp/string.
*
* Differs from `Column#weekofyear` by wrapping its result into an `Option` in case the column
* cannot be parsed into a valid date.
*
* apache/spark
*/
def weekofyear[T](date: AbstractTypedColumn[T, String]): date.ThisType[T, Option[Int]] =
date.typed(untyped.weekofyear(date.untyped))

/** Non-Aggregate function: Extracts the month as an integer from a given date/timestamp/string.
*
* Differs from `Column#month` by wrapping its result into an `Option` in case the column
* cannot be parsed into a valid date.
*
* apache/spark
*/
def month[T](date: AbstractTypedColumn[T, String]): date.ThisType[T, Option[Int]] =
date.typed(untyped.month(date.untyped))

/** Non-Aggregate function: Extracts the day of the month as an integer from a given date/timestamp/string.
*
* Differs from `Column#dayofmonth` by wrapping its result into an `Option` in case the column
* cannot be parsed into a valid date.
*
* apache/spark
*/
def dayofmonth[T](date: AbstractTypedColumn[T, String]): date.ThisType[T, Option[Int]] =
date.typed(untyped.dayofmonth(date.untyped))

/** Non-Aggregate function: Extracts the minutes as an integer from a given date/timestamp/string.
*
* Differs from `Column#minute` by wrapping its result into an `Option` in case the column
* cannot be parsed into a valid date.
*
* apache/spark
*/
def minute[T](date: AbstractTypedColumn[T, String]): date.ThisType[T, Option[Int]] =
date.typed(untyped.minute(date.untyped))

/** Non-Aggregate function: Extracts the seconds as an integer from a given date/timestamp/string.
*
* Differs from `Column#second` by wrapping its result into an `Option` in case the column
* cannot be parsed into a valid date.
*
* apache/spark
*/
def second[T](date: AbstractTypedColumn[T, String]): date.ThisType[T, Option[Int]] =
date.typed(untyped.second(date.untyped))

/**
* Non-Aggregate function: Given a date column, returns the first date which is later than the value
* of the date column that is on the specified day of the week.
*
* For example, `next_day('2015-07-27', "Sunday")` returns 2015-08-02 because that is the first
* Sunday after 2015-07-27.
*
* The day-of-week parameter is case-insensitive and accepts:
* "Su", "Sun", "Sunday",
* "Mo", "Mon", "Monday",
* "Tu", "Tue", "Tuesday",
* "We", "Wed", "Wednesday",
* "Th", "Thu", "Thursday",
* "Fr", "Fri", "Friday",
* "Sa", "Sat", "Saturday".
*
* Differs from `Column#next_day` by wrapping its result into an `Option` in case the column
* cannot be parsed into a valid date.
*
* apache/spark
*/
def next_day[T](date: AbstractTypedColumn[T, String], dayOfWeek: String): date.ThisType[T, Option[java.sql.Date]] =
Contributor Author:

Right now it doesn't compile. In Spark it returns java.sql.Date; I'm not sure whether I should add a TypedEncoder for that or use something else.

Contributor:

I don't see any issue with an encoder for java.sql.Date; if that's what's returned in vanilla Spark, we can simply follow.
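
For reference, one low-effort way to obtain such an encoder without touching Catalyst internals would be frameless's existing Injection machinery. This is only a sketch, assuming a millisecond round-trip through Long is acceptable:

// Hypothetical: derives TypedEncoder[java.sql.Date] from TypedEncoder[Long],
// since frameless can lift an Injection[A, B] into a TypedEncoder[A].
implicit val dateToLong: Injection[java.sql.Date, Long] =
  Injection(_.getTime, millis => new java.sql.Date(millis))

The trade-off is that the column is then stored as a Long rather than as a Catalyst DateType, so a dedicated TypedEncoder is probably the better long-term answer.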

date.typed(untyped.next_day(date.untyped, dayOfWeek))
}
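
A similar usage sketch for next_day (Foo is again illustrative, and an encoder for java.sql.Date is assumed to be in scope, per the discussion above):

case class Foo(a: String)
val ds = TypedDataset.create(Seq(Foo("2015-07-27")))
// Some(2015-08-02), the first Sunday strictly after 2015-07-27;
// None if the string cannot be parsed as a date.
val nextSunday = ds.select(next_day(ds[String]('a), "Sunday"))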
@@ -1126,34 +1126,160 @@ class NonAggregateFunctionsTests extends TypedDatasetSuite {
//check(forAll(prop[Option[Vector[Boolean]], Long] _))
}

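// Shared property for the date-part extractors (named propYear because it was
// introduced for `year`, but reused by all of them): the typed function must
// agree with the corresponding untyped Spark function on every input.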
def propYear(typedDS: TypedDataset[X1[String]])
(typedCol: TypedColumn[X1[String], Option[Int]], sparkFunc: Column => Column): Prop = {
val spark = session
import spark.implicits._

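// Row#get returns Any; a null cell (unparseable date) falls through to None.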
val nullHandler: Row => Option[Int] = _.get(0) match {
case i: Int => Some(i)
case _ => None
}

val sparkResult = typedDS.dataset
.select(sparkFunc($"a"))
.map(nullHandler)
.collect()
.toList

val typed = typedDS
.select(typedCol)
.collect()
.run()
.toList

typed ?= sparkResult
}

test("year") {
val spark = session
import spark.implicits._

def prop(data: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
val ds = TypedDataset.create(data)
propYear(ds)(year(ds[String]('a)), untyped.year)
}

check(forAll(dateTimeStringGen)(data => prop(data.map(X1.apply))))
check(forAll(prop _))
}

test("dayofyear") {
val spark = session
import spark.implicits._

def prop(data: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
val ds = TypedDataset.create(data)
propYear(ds)(dayofyear(ds[String]('a)), untyped.dayofyear)
}

check(forAll(dateTimeStringGen)(data => prop(data.map(X1.apply))))
check(forAll(prop _))
}

test("weekofyear") {
val spark = session
import spark.implicits._

def prop(data: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
val ds = TypedDataset.create(data)
propYear(ds)(weekofyear(ds[String]('a)), untyped.weekofyear)
}

check(forAll(dateTimeStringGen)(data => prop(data.map(X1.apply))))
check(forAll(prop _))
}

test("month") {
val spark = session
import spark.implicits._

def prop(data: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
val ds = TypedDataset.create(data)
propYear(ds)(month(ds[String]('a)), untyped.month)
}

check(forAll(dateTimeStringGen)(data => prop(data.map(X1.apply))))
check(forAll(prop _))
}

test("dayofmonth") {
val spark = session
import spark.implicits._

def prop(data: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
val ds = TypedDataset.create(data)
propYear(ds)(dayofmonth(ds[String]('a)), untyped.dayofmonth)
}

check(forAll(dateTimeStringGen)(data => prop(data.map(X1.apply))))
check(forAll(prop _))
}

test("minute") {
val spark = session
import spark.implicits._

def prop(data: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
val ds = TypedDataset.create(data)
propYear(ds)(minute(ds[String]('a)), untyped.minute)
}

check(forAll(dateTimeStringGen)(data => prop(data.map(X1.apply))))
check(forAll(prop _))
}

test("second") {
val spark = session
import spark.implicits._

def prop(data: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
val ds = TypedDataset.create(data)
propYear(ds)(second(ds[String]('a)), untyped.second)
}

check(forAll(dateTimeStringGen)(data => prop(data.map(X1.apply))))
check(forAll(prop _))
}

test("next_day") {
val spark = session
import spark.implicits._

val weekDays = List(
"SU", "SUN", "SUNDAY",
"MO", "MON", "MONDAY",
"TU", "TUE", "TUESDAY",
"WE", "WED", "WEDNESDAY",
"TH", "THU", "THURSDAY",
"FR", "FRI", "FRIDAY",
"SA", "SAT", "SATURDAY"
)

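// As in propYear: next_day yields null for unparseable dates; map it to None.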
val nullHandler: Row => Option[java.sql.Date] = _.get(0) match {
case d: java.sql.Date => Some(d)
case _ => None
}

def prop(data: List[X1[String]], dayOfWeek: String)(implicit E: Encoder[Option[java.sql.Date]]): Prop = {
val typedDS = TypedDataset.create(data)

val sparkResult = typedDS.toDF()
.select(untyped.next_day($"a", dayOfWeek))
.map(nullHandler)
.collect()
.toList

val typed = typedDS
.select(next_day(typedDS[String]('a), dayOfWeek))
.collect()
.run()
.toList

typed ?= sparkResult
}

check(forAll(dateTimeStringGen, Gen.oneOf(weekDays))((data, dayOfWeek) => prop(data.map(X1.apply), dayOfWeek)))
check(forAll(prop _))
}
}
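
The tests above rely on a dateTimeStringGen generator that is not part of this diff. For readers following along, a plausible sketch of such a generator (an assumption; the PR's actual definition may differ):

import org.scalacheck.Gen

// Mixes well-formed "yyyy-MM-dd HH:mm:ss" strings with arbitrary garbage so
// that both the Some and None branches of the typed functions are exercised.
val dateTimeStringGen: Gen[List[String]] = Gen.listOf(Gen.oneOf(
  for {
    y  <- Gen.choose(1970, 2100)
    m  <- Gen.choose(1, 12)
    d  <- Gen.choose(1, 28)
    h  <- Gen.choose(0, 23)
    mi <- Gen.choose(0, 59)
    s  <- Gen.choose(0, 59)
  } yield f"$y%04d-$m%02d-$d%02d $h%02d:$mi%02d:$s%02d",
  Gen.alphaStr
))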