groupByInterval not working [Spark: 2.4.4] #5

Open
drahnreb opened this issue Nov 30, 2019 · 3 comments

drahnreb commented Nov 30, 2019

Two joined DataFrames should be grouped by a third (clock) DataFrame, but this error is thrown:
java.lang.NoClassDefFoundError: Could not initialize class com.twosigma.flint.rdd.function.group.Intervalize$

    724         with traceback_utils.SCCallSiteSync(self._sc) as css:
    725             tsrdd = self.timeSeriesRDD.groupByInterval(clock.timeSeriesRDD, scala_key,
--> 726                                                        inclusion, rounding)
    727         return TimeSeriesDataFrame._from_tsrdd(tsrdd, self.sql_ctx)
    728 

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o3361.groupByInterval.
: java.lang.NoClassDefFoundError: Could not initialize class com.twosigma.flint.rdd.function.group.Intervalize$
	at com.twosigma.flint.rdd.OrderedRDD.intervalize(OrderedRDD.scala:560)
	at com.twosigma.flint.timeseries.TimeSeriesRDDImpl.summarizeIntervals(TimeSeriesRDD.scala:1605)
	at com.twosigma.flint.timeseries.TimeSeriesRDDImpl.groupByInterval(TimeSeriesRDD.scala:1493)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
	at py4j.Gateway.invoke(Gateway.java:295)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:251)
	at java.lang.Thread.run(Thread.java:748)

Steps to reproduce

from datetime import datetime
import numpy as np
import pandas as pd

import ts.flint
from ts.flint import FlintContext
flintContext = FlintContext(sqlContext)

values = {'time': [1543672800000, 1543672800100, 1543672800260],
          'F_1': [22, 38, 26],
          'F.2': [7.3, 71.3, 7.9]}

states = {'time': [1543672800100, 1543672800200, 1543672800300],
          'types': ["0", "24", "42"],
          'state': ["False", "True", "True"]}

stops = {'time': [1543672800150, 1543672800200, 1543672800300, 1543672800360]}

states_pd = pd.DataFrame(states, columns=states.keys())
values_pd = pd.DataFrame(values, columns=values.keys())
stops_pd = pd.DataFrame(stops, columns=stops.keys())
states_pd['time'] = pd.to_datetime(states_pd['time'], unit='ms', origin='unix')
values_pd['time'] = pd.to_datetime(values_pd['time'], unit='ms', origin='unix')
stops_pd['time'] = pd.to_datetime(stops_pd['time'], unit='ms', origin='unix')

states_df = spark.createDataFrame(states_pd)
values_df = spark.createDataFrame(values_pd)
stops_df = spark.createDataFrame(stops_pd)

# Convert to Flint DataFrame
flint_df1, flint_states, flint_stops = [flintContext.read \
              .option("isSorted", False) \
              .option("timeColumn", 'time') \
              .option("timeUnit", 'ms') \
              .dataframe(
                # https://github.com/twosigma/flint:
                # 'To create a TimeSeriesRDD from a DataFrame, you have to make sure the DataFrame contains a column named "time" of type LongType'
                df.withColumn("time", (df.time.cast('double')*1000).cast("long"))
              ) for df in [values_df, states_df, stops_df]]

### combine data
tolerance = '100ms'  # exact match or a window such as '100ms'
data_joined = flint_df1.futureLeftJoin(flint_states, tolerance=tolerance)

data_joined.show()
+-----------------------+---+----+-----+-----+
|                   time|F_1| F.2|types|state|
+-----------------------+---+----+-----+-----+
|2018-12-01 14:00:00.000| 22| 7.3|    0|False|
|2018-12-01 14:00:00.100| 38|71.3|    0|False|
|2018-12-01 14:00:00.260| 26| 7.9|   42| True|
+-----------------------+---+----+-----+-----+

### cut into bins
data_binned = data_joined.groupByInterval(flint_stops.select('time'),
                                          inclusion='begin')  # automatically uses the 'time' column

drahnreb commented Dec 3, 2019

Troubleshooting suggests that this is caused by the unsupported Spark version: the build being used is mismatched and is missing the Intervalize class. After compiling a version for Spark 2.4, groupByInterval works without problems on a local cluster.
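
For anyone hitting the same error, here is a minimal sketch of how one could check whether the Flint jar attached to a cluster actually provides the class, assuming a Scala notebook cell; the fully qualified name is taken from the stack trace above:

// Force-load and initialize the Intervalize companion object.
// Throws ClassNotFoundException if the attached Flint jar does not
// contain the class, and otherwise surfaces the static-initializer
// failure behind a "Could not initialize class ..." error.
Class.forName("com.twosigma.flint.rdd.function.group.Intervalize$")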

However, could you please document the code changes to ts-flint that are needed to build a Databricks-compatible version? Thanks.


drahnreb commented Dec 4, 2019

I managed to build it for the current Databricks runtimes (5.3 and above; tested with 6.1).

Steps to reproduce:

0. Requirements: install sbt.

1. git clone the official ts-flint repository at the latest commit 40fd887.

2. cd into the top-level directory of the cloned repo and check the versions specified in build.sbt (a consolidated sketch of these edits follows after the steps). Changes made, e.g.:
   line 34:
   scalaVersion := "2.11.12",
   line 50 (Windows-specific):
   "Local Maven Repository" at "file:///" + Path.userHome.absolutePath + "/.m2/repository",
   line 61:
   val spark = "2.4.4"
   line 65:
   val arrow = "0.12.0"

3. Add an environment variable (based on #71):

   export TERM=xterm-color

4. Build from source (Scala):

   sbt assemblyNoTest

This produces a compiled .jar that runs under the current Databricks runtimes.
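
For reference, a consolidated build.sbt sketch of the step-2 edits. This is a hypothetical minimal fragment, not the full file at commit 40fd887: the real build.sbt contains many more settings, and the spark-sql/arrow-vector dependency entries shown here are illustrative stand-ins for the existing dependency list.

// Hypothetical minimal build.sbt; line-number comments refer to
// the locations edited in the real file at commit 40fd887.
lazy val root = (project in file("."))
  .settings(
    scalaVersion := "2.11.12",  // line 34: Scala version matching Spark 2.4.x
    // line 50 (Windows-specific): resolve artifacts from the local Maven repo
    resolvers += "Local Maven Repository" at
      "file:///" + Path.userHome.absolutePath + "/.m2/repository",
    libraryDependencies ++= {
      val spark = "2.4.4"   // line 61: target Spark version
      val arrow = "0.12.0"  // line 65: matching Arrow version
      Seq(                  // illustrative dependency entries
        "org.apache.spark" %% "spark-sql" % spark % "provided",
        "org.apache.arrow" % "arrow-vector" % arrow
      )
    }
  )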


drahnreb commented Dec 4, 2019

PR: #6
