diff --git a/build.gradle b/build.gradle index de0b2e32..260e4a73 100644 --- a/build.gradle +++ b/build.gradle @@ -7,7 +7,7 @@ apply plugin: 'com.github.ben-manes.versions' ext.configDir = new File(rootDir, 'config') ext.hadoopBinaries = "${rootDir}/hadoop-binaries".toString() -ext.javaDriverVersion = '3.2.1' +ext.javaDriverVersion = '3.12.5' ext.hiveVersion = System.getenv("HIVE_VERSION") ?: '1.2.1' ext.pigVersion = System.getenv("PIG_VERSION") ?: '0.15.0' ext.hadoopVersion = System.getenv("HADOOP_VERSION") ?: '2.7.2' @@ -58,7 +58,7 @@ if(project.hasProperty("clusterVersion")) { } allprojects { - version = '2.0.2' + version = '2.0.3' group = 'org.mongodb.mongo-hadoop' } @@ -184,7 +184,7 @@ configure(subprojects) { options.links 'http://hadoop.apache.org/docs/r2.7.2/api' options.links 'http://api.mongodb.org/java/3.2/' if (JavaVersion.current().isJava8Compatible()) { - options.addStringOption('Xdoclint:none', '-quiet') + options.addStringOption('Xdoclint:none', '-quiet') } } @@ -196,9 +196,9 @@ configure(subprojects) { // tests testJar // } - test { - dependsOn 'jar', 'testsJar', ':startCluster', ':downloadEnronEmails' - } +// test { + // dependsOn 'jar', 'testsJar', ':startCluster', ':downloadEnronEmails' + // } modifyPom { project { @@ -505,4 +505,4 @@ task cleanLogs(type: Delete) { task cleanHadoop(type: Delete, dependsOn: cleanLogs) { delete hadoopHome, hiveHome, pigHome -} +} \ No newline at end of file diff --git a/core/src/main/java/com/mongodb/hadoop/input/MongoInputSplit.java b/core/src/main/java/com/mongodb/hadoop/input/MongoInputSplit.java index 9582e17e..e5e2b113 100644 --- a/core/src/main/java/com/mongodb/hadoop/input/MongoInputSplit.java +++ b/core/src/main/java/com/mongodb/hadoop/input/MongoInputSplit.java @@ -210,18 +210,18 @@ public void setSkip(final Integer skip) { @Override public void write(final DataOutput out) throws IOException { BSONObject spec = BasicDBObjectBuilder.start() - .add("inputURI", getInputURI().toString()) - .add("authURI", getAuthURI() != null ? getAuthURI().toString() : null) - .add("keyField", getKeyField()) - .add("fields", getFields()) - .add("query", getQuery()) - .add("sort", getSort()) - .add("min", getMin()) - .add("max", getMax()) - .add("notimeout", getNoTimeout()) - .add("limit", limit) - .add("skip", skip) - .get(); + .add("inputURI", getInputURI().toString()) + .add("authURI", getAuthURI() != null ? getAuthURI().toString() : null) + .add("keyField", getKeyField()) + .add("fields", getFields()) + .add("query", getQuery()) + .add("sort", getSort()) + .add("min", getMin()) + .add("max", getMax()) + .add("notimeout", getNoTimeout()) + .add("limit", limit) + .add("skip", skip) + .get(); byte[] buf = _bsonEncoder.encode(spec); out.write(buf); } @@ -278,7 +278,7 @@ public DBCursor getCursor() { coll = MongoConfigUtil.getCollection(this.inputURI); } - this.cursor = coll.find(this.query, this.fields).sort(this.sort); + this.cursor = coll.find(this.query, this.fields).hint(new BasicDBObject("_id", 1)).sort(this.sort); if (this.notimeout) { this.cursor.setOptions(Bytes.QUERYOPTION_NOTIMEOUT); } @@ -301,21 +301,21 @@ public DBCursor getCursor() { @Override public String toString() { String result = - "MongoInputSplit{inputURI hosts=" + this.inputURI.getHosts() - + ", inputURI namespace=" + this.inputURI.getDatabase() + "." - + this.inputURI.getCollection(); + "MongoInputSplit{inputURI hosts=" + this.inputURI.getHosts() + + ", inputURI namespace=" + this.inputURI.getDatabase() + "." 
+ + this.inputURI.getCollection(); if (authURI != null) { result += "authURI hosts=" + authURI.getHosts() - + ", authURI database=" + authURI.getDatabase(); + + ", authURI database=" + authURI.getDatabase(); } return result - + ", min=" + this.min + ", max=" + this.max - + ", query=" + this.query - + ", sort=" + this.sort - + ", fields=" + this.fields - + ", limit=" + this.limit - + ", skip=" + this.skip - + ", notimeout=" + this.notimeout + '}'; + + ", min=" + this.min + ", max=" + this.max + + ", query=" + this.query + + ", sort=" + this.sort + + ", fields=" + this.fields + + ", limit=" + this.limit + + ", skip=" + this.skip + + ", notimeout=" + this.notimeout + '}'; } @Override @@ -364,14 +364,14 @@ public boolean equals(final Object o) { return false; } if (limit == null && that.getLimit() != null - || !limit.equals(that.getLimit())) { + || !limit.equals(that.getLimit())) { return false; } if (skip == null && that.getSkip() != null - || !skip.equals(that.getSkip())) { + || !skip.equals(that.getSkip())) { return false; } return true; } -} +} \ No newline at end of file diff --git a/gradle/functions.gradle b/gradle/functions.gradle index 20b69fcc..5aae665f 100644 --- a/gradle/functions.gradle +++ b/gradle/functions.gradle @@ -3,7 +3,7 @@ import org.apache.tools.ant.filters.ReplaceTokens def downloadFile(url) { def tmpDir = new File(System.properties['java.io.tmpdir']) def file = new File(tmpDir, new File(new URL(url).getPath()).getName()) - + def count = 0; while (!file.exists()) { try { @@ -50,6 +50,7 @@ task installPig() << { } task downloadEnronEmails() << { + //extract(dataHome, dataHome, downloadFile('https://mongodb-enron-email.s3-website-us-east-1.amazonaws.com/mongodb-enron-email/enron_mongo.tar.bz2')) extract(dataHome, dataHome, downloadFile('https://s3.amazonaws.com/mongodb-enron-email/enron_mongo.tar.bz2')) } @@ -75,7 +76,7 @@ task copyFiles(dependsOn: [installHadoop, installHive, installPig]) << { safeCopy("streaming/build/libs/mongo-hadoop-streaming-${project(':core').version}.jar", hadoopLib, "mongo-hadoop-streaming.jar") safeCopy("hive/build/libs/mongo-hadoop-hive-${project(':core').version}.jar", hiveHome + '/lib', "mongo-hadoop-hive.jar") safeCopy(findJar(":core", "mongo-java-driver"), hadoopLib, "mongo-java-driver.jar") - + println "Updating cluster configuration" copy { from 'clusterConfigs' @@ -109,4 +110,4 @@ def safeCopy(fromPath, toPath, newName) { if (!copied) { throw new GradleException("Failed to copy a file: " + fromPath, new FileNotFoundException(fromPath)) } -} +} \ No newline at end of file diff --git a/gradle/hadoop.gradle b/gradle/hadoop.gradle index 2f56f284..75da0c12 100644 --- a/gradle/hadoop.gradle +++ b/gradle/hadoop.gradle @@ -14,10 +14,10 @@ def execute(command, args = [], outStream = null, errStream = null, background = env << System.getProperties() def executor = new org.zeroturnaround.exec.ProcessExecutor().command([command.toString()] + args) - .readOutput(true) - .environment(env) - .redirectOutput(outStream) - .redirectError(errStream != null ? errStream : outStream); + .readOutput(true) + .environment(env) + .redirectOutput(outStream) + .redirectError(errStream != null ? 
errStream : outStream); if (!background) { try { def result = executor.execute(); @@ -39,7 +39,7 @@ def stopService(signal, service, name) { if (Os.isFamily(Os.FAMILY_WINDOWS)) { if (signal == 'TERM') { execute("taskkill", "/PID", process.split()[0]) - } else{ + } else{ execute("taskkill", "/F", "/PID", process.split()[0]) } }else{ @@ -139,11 +139,11 @@ task historicalYield(dependsOn: startCluster) << { exec() { commandLine mongoimport, "-d", "mongo_hadoop", "-c", "yield_historical.in", "--drop", - "examples/treasury_yield/src/main/resources/yield_historical_in.json" + "examples/treasury_yield/src/main/resources/yield_historical_in.json" } hadoop("examples/treasury_yield/build/libs/treasury_yield-${project(':core').version}.jar", - "com.mongodb.hadoop.examples.treasury.TreasuryYieldXMLConfig", [ + "com.mongodb.hadoop.examples.treasury.TreasuryYieldXMLConfig", [ "mongo.input.uri=mongodb://localhost:27017/mongo_hadoop.yield_historical.in", "mongo.output.uri=mongodb://localhost:27017/mongo_hadoop.yield_historical.out" ]) @@ -151,14 +151,14 @@ task historicalYield(dependsOn: startCluster) << { task sensorData(dependsOn: 'startCluster') << { hadoop("examples/sensors/build/libs/sensors-${project(':core').version}.jar", - "com.mongodb.hadoop.examples.sensors.Devices", []) + "com.mongodb.hadoop.examples.sensors.Devices", []) hadoop("examples/sensors/build/libs/sensors-${project(':core').version}.jar", - "com.mongodb.hadoop.examples.sensors.Logs", ["io.sort.mb=100"]) + "com.mongodb.hadoop.examples.sensors.Logs", ["io.sort.mb=100"]) } - -task enronEmails(dependsOn: [downloadEnronEmails, startCluster]) << { +//downloadEnronEmails +task enronEmails(dependsOn: [startCluster]) << { // Create BSON file input directory. exec() { commandLine "${hadoopHome}/bin/hdfs", "dfs", "-mkdir", "-p", "/messages" @@ -170,8 +170,8 @@ task enronEmails(dependsOn: [downloadEnronEmails, startCluster]) << { "-c", "org.apache.hadoop.io.compress.BZip2Codec", "-o", "hdfs://localhost:8020/messages"]) // MR job - hadoop("examples/enron/build/libs/enron-${project(':core').version}.jar", - "com.mongodb.hadoop.examples.enron.EnronMail", []) + //hadoop("examples/enron/build/libs/enron-${project(':core').version}.jar", + // "com.mongodb.hadoop.examples.enron.EnronMail", []) } task shakespeare(dependsOn: [downloadShakespeare, startCluster]) << { @@ -214,4 +214,4 @@ def hadoop(jar, className, args, commandArgs = []) { environment << hadoopEnv commandLine line } -} +} \ No newline at end of file diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 0087cd3b..758de960 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 58b57c8d..2d80b69a 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,5 @@ -#Mon Mar 09 18:25:42 PDT 2015 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-4.8.1-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-2.2.1-all.zip diff --git a/gradlew b/gradlew index 91a7e269..6c291a37 100755 --- a/gradlew +++ b/gradlew @@ -161,4 +161,4 @@ function splitJvmOpts() { eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME" -exec "$JAVACMD" "${JVM_OPTS[@]}" 
-classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@" +exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain \ No newline at end of file diff --git a/gradlew.bat b/gradlew.bat index 8a0b282a..c8be9359 100644 --- a/gradlew.bat +++ b/gradlew.bat @@ -1,90 +1,90 @@ -@if "%DEBUG%" == "" @echo off -@rem ########################################################################## -@rem -@rem Gradle startup script for Windows -@rem -@rem ########################################################################## - -@rem Set local scope for the variables with windows NT shell -if "%OS%"=="Windows_NT" setlocal - -@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -set DEFAULT_JVM_OPTS= - -set DIRNAME=%~dp0 -if "%DIRNAME%" == "" set DIRNAME=. -set APP_BASE_NAME=%~n0 -set APP_HOME=%DIRNAME% - -@rem Find java.exe -if defined JAVA_HOME goto findJavaFromJavaHome - -set JAVA_EXE=java.exe -%JAVA_EXE% -version >NUL 2>&1 -if "%ERRORLEVEL%" == "0" goto init - -echo. -echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. - -goto fail - -:findJavaFromJavaHome -set JAVA_HOME=%JAVA_HOME:"=% -set JAVA_EXE=%JAVA_HOME%/bin/java.exe - -if exist "%JAVA_EXE%" goto init - -echo. -echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. - -goto fail - -:init -@rem Get command-line arguments, handling Windowz variants - -if not "%OS%" == "Windows_NT" goto win9xME_args -if "%@eval[2+2]" == "4" goto 4NT_args - -:win9xME_args -@rem Slurp the command line arguments. -set CMD_LINE_ARGS= -set _SKIP=2 - -:win9xME_args_slurp -if "x%~1" == "x" goto execute - -set CMD_LINE_ARGS=%* -goto execute - -:4NT_args -@rem Get arguments from the 4NT Shell from JP Software -set CMD_LINE_ARGS=%$ - -:execute -@rem Setup the command line - -set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar - -@rem Execute Gradle -"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% - -:end -@rem End local scope for the variables with windows NT shell -if "%ERRORLEVEL%"=="0" goto mainEnd - -:fail -rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of -rem the _cmd.exe /c_ return code! -if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 -exit /b 1 - -:mainEnd -if "%OS%"=="Windows_NT" endlocal - -:omega +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS= + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto init + +echo. 
+echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto init + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:init +@rem Get command-line arguments, handling Windowz variants + +if not "%OS%" == "Windows_NT" goto win9xME_args +if "%@eval[2+2]" == "4" goto 4NT_args + +:win9xME_args +@rem Slurp the command line arguments. +set CMD_LINE_ARGS= +set _SKIP=2 + +:win9xME_args_slurp +if "x%~1" == "x" goto execute + +set CMD_LINE_ARGS=%* +goto execute + +:4NT_args +@rem Get arguments from the 4NT Shell from JP Software +set CMD_LINE_ARGS=%$ + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega \ No newline at end of file diff --git a/hive/src/main/java/com/mongodb/hadoop/hive/BSONSerDe.java b/hive/src/main/java/com/mongodb/hadoop/hive/BSONSerDe.java index 00573ed0..7d451453 100644 --- a/hive/src/main/java/com/mongodb/hadoop/hive/BSONSerDe.java +++ b/hive/src/main/java/com/mongodb/hadoop/hive/BSONSerDe.java @@ -22,7 +22,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.serde.serdeConstants; -import org.apache.hadoop.hive.serde2.SerDe; +//import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.AbstractSerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; @@ -62,7 +63,8 @@ * The BSONSerDe class deserializes (parses) and serializes object from BSON to Hive represented object. It's initialized with the hive * columns and hive recognized types as well as other config variables mandated by the StorageHanders. */ -public class BSONSerDe implements SerDe { +//public class BSONSerDe implements SerDe { +public class BSONSerDe extends AbstractSerDe { private static final Log LOG = LogFactory.getLog(BSONSerDe.class); // stores the 1-to-1 mapping of MongoDB fields to hive columns @@ -85,7 +87,7 @@ public class BSONSerDe implements SerDe { public Map hiveToMongo; //CHECKSTYLE:ON - // A row represents a row in the Hive table + // A row represents a row in the Hive table private List row = new ArrayList(); // BSONWritable to hold documents to be serialized. 
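Replacing the deprecated SerDe interface with AbstractSerDe, as the hunks above do, is what keeps BSONSerDe compiling against newer Hive releases, where the old interface is deprecated and dropped in later lines; the set of methods a subclass must provide stays the same. Below is a minimal sketch of the pattern under that assumption. The class name and the single string column are hypothetical stand-ins for the real BSON mapping and are not part of this patch.

import java.util.Arrays;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class MinimalSerDe extends AbstractSerDe {
    private ObjectInspector inspector;

    @Override
    public void initialize(final Configuration conf, final Properties tblProps) throws SerDeException {
        // A real SerDe reads column names and types from tblProps; this sketch exposes one string column.
        inspector = ObjectInspectorFactory.getStandardStructObjectInspector(
            Arrays.asList("line"),
            Arrays.<ObjectInspector>asList(PrimitiveObjectInspectorFactory.javaStringObjectInspector));
    }

    @Override
    public Object deserialize(final Writable blob) throws SerDeException {
        // Turn one storage-format record into a Hive row: a list with one string field.
        return Arrays.asList(blob.toString());
    }

    @Override
    public Writable serialize(final Object obj, final ObjectInspector oi) throws SerDeException {
        // Turn a Hive row back into the storage format.
        return new Text(String.valueOf(obj));
    }

    @Override
    public ObjectInspector getObjectInspector() throws SerDeException {
        return inspector;
    }

    @Override
    public Class<? extends Writable> getSerializedClass() {
        return Text.class;
    }

    @Override
    public SerDeStats getSerDeStats() {
        return null; // no table statistics reported by this sketch
    }
}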
@@ -124,9 +126,9 @@ public void initialize(final Configuration conf, final Properties tblProps) thro // Get the structure and object inspector docTypeInfo = - (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes); + (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes); docOI = - TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(docTypeInfo); + TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(docTypeInfo); // Create the BSONWritable instance for future use. bsonWritable = new BSONWritable(); @@ -209,8 +211,8 @@ public Object deserialize(final Writable writable) throws SerDeException { mongoMapping = fieldName; } else { mongoMapping = hiveToMongo.containsKey(fieldName) - ? hiveToMongo.get(fieldName) - : fieldName; + ? hiveToMongo.get(fieldName) + : fieldName; } value = deserializeField(getValue(doc, mongoMapping), fieldTypeInfo, fieldName); } catch (Exception e) { @@ -250,7 +252,7 @@ public Object deserializeField(final Object value, final TypeInfo valueTypeInfo, case PRIMITIVE: return deserializePrimitive(value, (PrimitiveTypeInfo) valueTypeInfo); case STRUCT: - // Supports both struct and map, but should use struct + // Supports both struct and map, but should use struct return deserializeStruct(value, (StructTypeInfo) valueTypeInfo, ext); case UNION: // Mongo also has no union @@ -306,7 +308,7 @@ private Object deserializeStruct(final Object value, final StructTypeInfo valueT for (int i = 0; i < structNames.size(); i++) { String fieldName = structNames.get(i).toLowerCase(); - // hiveMapping -> prefixed by parent struct names. + // hiveMapping -> prefixed by parent struct names. // For example, in {"wife":{"name":{"first":"Sydney"}}}, // the hiveMapping of "first" is "wife.name.first" String hiveMapping = ext.length() == 0 ? fieldName : ext + "." + fieldName; @@ -320,8 +322,8 @@ private Object deserializeStruct(final Object value, final StructTypeInfo valueT mongoMapping = hiveToMongo.get(hiveMapping); } else { mongoMapping = ext.length() > 0 && hiveToMongo.containsKey(ext) - ? hiveToMongo.get(ext) + "." + fieldName - : hiveMapping; + ? hiveToMongo.get(ext) + "." + fieldName + : hiveMapping; } } @@ -483,7 +485,7 @@ public Class getSerializedClass() { @Override public Writable serialize(final Object obj, final ObjectInspector oi) throws SerDeException { bsonWritable.setDoc( - (BSONObject) serializeStruct(obj, (StructObjectInspector) oi, "")); + (BSONObject) serializeStruct(obj, (StructObjectInspector) oi, "")); return bsonWritable; } //CHECKSTYLE:ON @@ -555,7 +557,7 @@ private Object serializeStruct(final Object obj, final StructObjectInspector str String fieldName, hiveMapping; - // get corresponding mongoDB field + // get corresponding mongoDB field if (ext.length() == 0) { fieldName = columnNames.get(i); hiveMapping = fieldName; @@ -572,10 +574,10 @@ private Object serializeStruct(final Object obj, final StructObjectInspector str int lastDotPos = mongoMapping.lastIndexOf("."); String lastMapping = lastDotPos == -1 ? 
mongoMapping : mongoMapping.substring(lastDotPos + 1); bsonObject.put(lastMapping, - serializeObject(fieldObj, fieldOI, hiveMapping)); + serializeObject(fieldObj, fieldOI, hiveMapping)); } else { bsonObject.put(fieldName, - serializeObject(fieldObj, fieldOI, hiveMapping)); + serializeObject(fieldObj, fieldOI, hiveMapping)); } } @@ -651,4 +653,4 @@ private Object serializePrimitive(final Object obj, final PrimitiveObjectInspect return oi.getPrimitiveJavaObject(obj); } } -} +} \ No newline at end of file diff --git a/hive/src/main/java/com/mongodb/hadoop/hive/MongoStorageHandler.java b/hive/src/main/java/com/mongodb/hadoop/hive/MongoStorageHandler.java index c203a25a..8bc99de2 100644 --- a/hive/src/main/java/com/mongodb/hadoop/hive/MongoStorageHandler.java +++ b/hive/src/main/java/com/mongodb/hadoop/hive/MongoStorageHandler.java @@ -37,7 +37,8 @@ import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.Deserializer; -import org.apache.hadoop.hive.serde2.SerDe; +//import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.AbstractSerDe; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; @@ -57,7 +58,7 @@ * rows in a Hive table */ public class MongoStorageHandler extends DefaultStorageHandler - implements HiveStoragePredicateHandler { + implements HiveStoragePredicateHandler { // stores the location of the collection public static final String MONGO_URI = "mongo.uri"; // get location of where meta-data is stored about the mongo collection @@ -84,24 +85,25 @@ public HiveMetaHook getMetaHook() { } @Override - public Class getSerDeClass() { + //public Class getSerDeClass() { + public Class getSerDeClass() { return BSONSerDe.class; } private Properties getProperties( - final Configuration conf, final String path) throws IOException { + final Configuration conf, final String path) throws IOException { if (properties == null) { properties = - MongoConfigUtil.readPropertiesFromFile(conf, path); + MongoConfigUtil.readPropertiesFromFile(conf, path); } return properties; } @Override public DecomposedPredicate decomposePredicate( - final JobConf jobConf, - final Deserializer deserializer, - final ExprNodeDesc predicate) { + final JobConf jobConf, + final Deserializer deserializer, + final ExprNodeDesc predicate) { BSONSerDe serde = (BSONSerDe) deserializer; // Create a new analyzer capable of handling equality and general @@ -110,19 +112,19 @@ public DecomposedPredicate decomposePredicate( // expressions, but we could push down more than that in the future by // writing our own analyzer. IndexPredicateAnalyzer analyzer = - IndexPredicateAnalyzer.createAnalyzer(false); + IndexPredicateAnalyzer.createAnalyzer(false); // Predicate may contain any column. 
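// Illustrative example, not part of this patch: once every column has been
// allowed (the loop below), a Hive filter such as
//     WHERE age >= 18 AND name LIKE '%smith%'
// is decomposed in two. The comparison on "age" becomes the pushed predicate,
// which the input format later turns into a MongoDB query roughly of the form
//     new BasicDBObject("age", new BasicDBObject("$gte", 18))
// while the LIKE clause, which the analyzer cannot translate, is returned as
// the residual predicate for Hive to evaluate after the scan.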
for (String colName : serde.columnNames) { analyzer.allowColumnName(colName); } List searchConditions = - new LinkedList(); + new LinkedList(); ExprNodeDesc residual = analyzer.analyzePredicate( - predicate, searchConditions); + predicate, searchConditions); DecomposedPredicate decomposed = new DecomposedPredicate(); decomposed.pushedPredicate = - analyzer.translateSearchConditions(searchConditions); + analyzer.translateSearchConditions(searchConditions); decomposed.residualPredicate = (ExprNodeGenericFuncDesc) residual; return decomposed; } @@ -136,10 +138,10 @@ private class MongoHiveMetaHook implements HiveMetaHook { public void preCreateTable(final Table tbl) throws MetaException { Map tblParams = tbl.getParameters(); if (!(tblParams.containsKey(MONGO_URI) - || tblParams.containsKey(PROPERTIES_FILE_PATH))) { + || tblParams.containsKey(PROPERTIES_FILE_PATH))) { throw new MetaException( - format("You must specify '%s' or '%s' in TBLPROPERTIES", - MONGO_URI, PROPERTIES_FILE_PATH)); + format("You must specify '%s' or '%s' in TBLPROPERTIES", + MONGO_URI, PROPERTIES_FILE_PATH)); } } @@ -165,34 +167,34 @@ public void commitDropTable(final Table tbl, final boolean deleteData) throws Me if (tblParams.containsKey(MONGO_URI)) { String mongoURIStr = tblParams.get(MONGO_URI); coll = MongoConfigUtil.getCollection( - new MongoClientURI(mongoURIStr)); + new MongoClientURI(mongoURIStr)); } else if (tblParams.containsKey(PROPERTIES_FILE_PATH)) { String propertiesPathStr = - tblParams.get(PROPERTIES_FILE_PATH); + tblParams.get(PROPERTIES_FILE_PATH); Properties properties; try { properties = - getProperties(getConf(), propertiesPathStr); + getProperties(getConf(), propertiesPathStr); } catch (IOException e) { throw new MetaException( - "Could not read properties file " - + propertiesPathStr + ". Reason: " + e.getMessage()); + "Could not read properties file " + + propertiesPathStr + ". Reason: " + e.getMessage()); } if (!properties.containsKey(MONGO_URI)) { throw new MetaException( - "No URI given in properties file: " - + propertiesPathStr); + "No URI given in properties file: " + + propertiesPathStr); } String uriString = properties.getProperty(MONGO_URI); coll = MongoConfigUtil.getCollection( - new MongoClientURI(uriString)); + new MongoClientURI(uriString)); } else { throw new MetaException( - format( - "Could not find properties '%s' or '%s'. " - + "At least one must be defined. " - + "Collection not dropped.", - MONGO_URI, PROPERTIES_FILE_PATH)); + format( + "Could not find properties '%s' or '%s'. " + + "At least one must be defined. " + + "Collection not dropped.", + MONGO_URI, PROPERTIES_FILE_PATH)); } try { coll.drop(); @@ -243,11 +245,11 @@ private void copyJobProperties(final Properties from, final Map // First, merge properties from the given properties file, if there // was one. These can be overwritten by other table properties later. 
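// Illustrative usage, values hypothetical: a table can either carry the URI
// directly in its TBLPROPERTIES,
//     TBLPROPERTIES('mongo.uri' = 'mongodb://localhost:27017/test.persons')
// or point at a properties file through the PROPERTIES_FILE_PATH table
// property, in which case that file must itself define mongo.uri. Whatever
// the file supplies is merged here first, so explicit table properties can
// still override it, as the comment above notes.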
String propertiesFilePathString = - from.getProperty(PROPERTIES_FILE_PATH); + from.getProperty(PROPERTIES_FILE_PATH); if (propertiesFilePathString != null) { try { Properties properties = - getProperties(getConf(), propertiesFilePathString); + getProperties(getConf(), propertiesFilePathString); for (Map.Entry prop : properties.entrySet()) { String key = (String) prop.getKey(); String value = (String) prop.getValue(); @@ -261,8 +263,8 @@ private void copyJobProperties(final Properties from, final Map } } catch (IOException e) { LOG.error( - "Error while trying to read properties file " - + propertiesFilePathString, e); + "Error while trying to read properties file " + + propertiesFilePathString, e); } } @@ -282,4 +284,4 @@ private void copyJobProperties(final Properties from, final Map to.put(MongoConfigUtil.OUTPUT_URI, mongoURIStr); } } -} +} \ No newline at end of file
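Taken together with the Java driver upgrade to 3.12.5 in build.gradle, the one behavioral change in this patch is the explicit index hint added in MongoInputSplit.getCursor(). Splits carry min/max bounds on the split key (by default _id), and pinning the _id index makes the per-split range scan explicit; newer MongoDB servers also refuse min/max bounds that arrive without a hint. Below is a minimal sketch of the resulting cursor shape, using the legacy DBCollection API this module is built on; the host, namespace, and sort key are illustrative and not taken from the patch.

import com.mongodb.BasicDBObject;
import com.mongodb.Bytes;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;

public class SplitCursorSketch {
    public static void main(final String[] args) {
        MongoClient client = new MongoClient("localhost", 27017);
        DBCollection coll = client.getDB("mongo_hadoop").getCollection("yield_historical_in");

        DBObject query = new BasicDBObject();        // the split's query; empty here
        DBObject fields = null;                      // projection; null keeps every field
        DBObject sort = new BasicDBObject("_id", 1); // the split's sort, if any

        // Pin the scan to the _id index before sorting, mirroring the
        // .hint(new BasicDBObject("_id", 1)) call added to getCursor().
        DBCursor cursor = coll.find(query, fields)
                              .hint(new BasicDBObject("_id", 1))
                              .sort(sort);

        // Same no-timeout option the split applies when "notimeout" is set.
        cursor.setOptions(Bytes.QUERYOPTION_NOTIMEOUT);

        for (final DBObject doc : cursor) {
            System.out.println(doc);
        }
        client.close();
    }
}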