This repository was archived by the owner on Jan 29, 2022. It is now read-only.

Mortar changes for mongo-hadoop connector #102

Open · wants to merge 9 commits into base: master

File: MongoOutputFormat.java (mapreduce API)
@@ -54,7 +54,7 @@ public OutputCommitter getOutputCommitter(final TaskAttemptContext context) {
      * Get the record writer that points to the output collection.
      */
     public RecordWriter<K, V> getRecordWriter(final TaskAttemptContext context) {
-        return new MongoRecordWriter(MongoConfigUtil.getOutputCollections(context.getConfiguration()), context);
+        return new MongoRecordWriter(MongoConfigUtil.getOutputCollections(context.getConfiguration()), context, updateKeys, multiUpdate);
     }

     public MongoOutputFormat() {

File: MongoOutputFormat.java (mapred API)
@@ -20,6 +20,7 @@
 import com.mongodb.hadoop.mapred.output.MongoOutputCommitter;
 import com.mongodb.hadoop.mapred.output.MongoRecordWriter;
 import com.mongodb.hadoop.util.MongoConfigUtil;
+
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCommitter;
@@ -33,7 +34,17 @@

 @SuppressWarnings("deprecation")
 public class MongoOutputFormat<K, V> implements OutputFormat<K, V> {

+    private final String[] updateKeys;
+    private final boolean multiUpdate;
+
+    public MongoOutputFormat() {
+        this(null, false);
+    }
+
+    public MongoOutputFormat(String[] updateKeys, boolean multiUpdate) {
+        this.updateKeys = updateKeys;
+        this.multiUpdate = multiUpdate;
+    }

     public void checkOutputSpecs(final FileSystem ignored, final JobConf job) throws IOException {
@@ -50,7 +61,7 @@ public OutputCommitter getOutputCommitter(final TaskAttemptContext context) {

     public RecordWriter<K, V> getRecordWriter(final FileSystem ignored, final JobConf job, final String name,
                                               final Progressable progress) {
-        return new MongoRecordWriter<K, V>(MongoConfigUtil.getOutputCollections(job), job);
+        return new MongoRecordWriter<K, V>(MongoConfigUtil.getOutputCollections(job), job, updateKeys, multiUpdate);
     }

 }
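
For context, a minimal sketch of how a caller could opt into the new update behavior. The output URI, the "userId" key, and the value types are illustrative assumptions, not part of this patch; Hadoop's usual setOutputFormat(Class) path still goes through the no-arg constructor, so the parameterized constructor is for code that instantiates the format directly (such as a Pig StoreFunc).

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordWriter;
    import org.apache.hadoop.mapred.Reporter;

    import com.mongodb.hadoop.io.BSONWritable;
    import com.mongodb.hadoop.mapred.MongoOutputFormat;
    import com.mongodb.hadoop.util.MongoConfigUtil;

    public class UpsertExample {
        public static RecordWriter<Text, BSONWritable> openWriter() {
            JobConf conf = new JobConf();
            // Illustrative collection URI; setOutputURI is assumed here.
            MongoConfigUtil.setOutputURI(conf, "mongodb://localhost:27017/demo.users");
            // Upsert on "userId" and update every matching document (multi = true).
            MongoOutputFormat<Text, BSONWritable> format =
                    new MongoOutputFormat<Text, BSONWritable>(new String[] {"userId"}, true);
            return format.getRecordWriter(null /* ignored */, conf, "demo", Reporter.NULL);
        }
    }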

File: MongoRecordWriter.java (mapred API)
@@ -23,6 +23,7 @@
 import com.mongodb.MongoException;
 import com.mongodb.hadoop.MongoOutput;
 import com.mongodb.hadoop.io.BSONWritable;
+
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reporter;
@@ -38,11 +39,21 @@ public class MongoRecordWriter<K, V> implements RecordWriter<K, V> {
     private final List<DBCollection> collections;

     private final JobConf configuration;
+
+    private final String[] updateKeys;
+    private final boolean multiUpdate;

     public MongoRecordWriter(final List<DBCollection> c, final JobConf conf) {
+        this(c, conf, null, false);
+    }
+
+    public MongoRecordWriter(final List<DBCollection> c, final JobConf conf, String[] updateKeys, boolean multiUpdate) {
         collections = c;
         configuration = conf;
         numberOfHosts = c.size();
+        this.updateKeys = updateKeys;
+        this.multiUpdate = multiUpdate;
     }

@@ -75,7 +86,23 @@ public void write(final K key, final V value) throws IOException {

         try {
             DBCollection dbCollection = getDbCollectionByRoundRobin();
-            dbCollection.save(o);
+
+            if (updateKeys == null) {
+                dbCollection.save(o);
+            } else {
+                // Build the query from the update keys, pulling each key out of the record.
+                DBObject query = new BasicDBObject(updateKeys.length);
+                for (String updateKey : updateKeys) {
+                    query.put(updateKey, o.get(updateKey));
+                    o.removeField(updateKey);
+                }
+                // If _id is null, drop it so we don't overwrite an existing _id with null.
+                if (o.get("_id") == null) {
+                    o.removeField("_id");
+                }
+                DBObject set = new BasicDBObject().append("$set", o);
+                dbCollection.update(query, set, true, multiUpdate);
+            }
         } catch (final MongoException e) {
             throw new IOException("can't write to mongo", e);
         }
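
To make the branch above concrete, here is the update the writer effectively issues for an illustrative record {userId: 42, count: 5} with updateKeys = {"userId"}; the field names and values are made up for the example:

    // Query built from the update keys pulled out of the record:
    DBObject query = new BasicDBObject("userId", 42);
    // Everything left in the record rides along under $set:
    DBObject set = new BasicDBObject("$set", new BasicDBObject("count", 5));
    // upsert = true creates the document when no match exists;
    // multiUpdate = true updates every document matching the query.
    dbCollection.update(query, set, true, multiUpdate);

Removing the update keys from the record before building $set is safe for upserts: MongoDB copies the equality fields of the query into a newly created document, so the keys still end up stored.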

File: MongoRecordWriter.java (mapreduce API)
@@ -74,7 +74,7 @@ public MongoRecordWriter(final List<DBCollection> c, final TaskAttemptContext ct
         collections = new ArrayList<DBCollection>(c);
         context = ctx;
         this.updateKeys = updateKeys;
-        this.multiUpdate = false;
+        this.multiUpdate = multi;
         this.numberOfHosts = c.size();

         //authenticate if necessary - but don't auth twice on same DB

File: MongoCollectionSplitter.java
@@ -122,8 +122,17 @@ public MongoCollectionSplitter(final Configuration conf) {

     protected void init() {
         MongoURI inputURI = MongoConfigUtil.getInputURI(conf);
-        this.inputCollection = MongoConfigUtil.getCollection(inputURI);
-        DB db = this.inputCollection.getDB();
+
+        DB db;
+        try {
+            this.inputCollection = MongoConfigUtil.getCollection(inputURI);
+            db = this.inputCollection.getDB();
+        } catch (Exception e) {
+            String message = e.getMessage() + "\n\nMongo connection strings are required to be of the form:\n"
+                + "  mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]]/database.collection";
+            throw new IllegalStateException(message, e);
+        }
+
         this.mongo = db.getMongo();
         MongoURI authURI = MongoConfigUtil.getAuthURI(conf);
         if (authURI != null) {
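
A quick illustration of what the new guard catches, assuming the imports used elsewhere in this patch; the configuration key and the malformed URI are assumptions for the example:

    Configuration conf = new Configuration();
    // Missing the "/database.collection" suffix the message warns about:
    conf.set("mongo.input.uri", "mongodb://localhost:27017");
    MongoURI inputURI = MongoConfigUtil.getInputURI(conf);
    // Resolving the collection is the step init() now wraps: instead of a bare
    // driver exception, callers get an IllegalStateException whose message
    // spells out the required connection-string form.
    DBCollection inputCollection = MongoConfigUtil.getCollection(inputURI);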

File: ShardChunkMongoSplitter.java
@@ -123,6 +123,20 @@ public List<InputSplit> calculateSplits() throws SplitFailedException {
             numChunks++;
         }

+        return createSplitList(numChunks, shardToSplits);
+    }
+
+    /**
+     * Round-robins splits across shards. The splits end up as map tasks that
+     * run in the same order as the splits, so consecutive map tasks should sit
+     * on separate shards; that way, as map tasks complete, the work stays
+     * spread evenly across the shard machines.
+     *
+     * @param numChunks     number of chunks
+     * @param shardToSplits map from shard name to the list of splits on that shard
+     */
+    protected static List<InputSplit> createSplitList(int numChunks,
+            Map<String, LinkedList<InputSplit>> shardToSplits) {
         final List<InputSplit> splits = new ArrayList<InputSplit>(numChunks);
         int splitIndex = 0;
         while (splitIndex < numChunks) {
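
The body of createSplitList is truncated above. As a reading aid, here is a minimal sketch of how the loop could complete, written to match the round-robin behavior the tests below assert; it is an assumption, not the patch's verbatim code, and like the tests it relies on a stable iteration order over the map's values:

    // Sketch (assumed, not verbatim): sweep the shards repeatedly, popping
    // one split from each non-empty queue per pass, so consecutive splits
    // come from different shards.
    while (splitIndex < numChunks) {
        for (LinkedList<InputSplit> shardSplits : shardToSplits.values()) {
            if (!shardSplits.isEmpty()) {
                splits.add(shardSplits.removeFirst());
                splitIndex++;
            }
        }
    }
    return splits;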

File: ShardChunkMongoSplitterTest.java (new file)
@@ -0,0 +1,78 @@
package com.mongodb.hadoop.splitter;

import static org.junit.Assert.assertEquals;

import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.mapreduce.InputSplit;
import org.junit.Test;

import com.mongodb.MongoURI;

@SuppressWarnings("deprecation")
public class ShardChunkMongoSplitterTest {
@Test
public void testCreateSplitList_oneShard() {
int numChunks = 2;
InputSplit split1 = new TestMongoInputSplit(new MongoURI("mongodb://split1"));
InputSplit split2 = new TestMongoInputSplit(new MongoURI("mongodb://split2"));
LinkedList<InputSplit> shardSplits = new LinkedList<InputSplit>(Arrays.asList(split1, split2));

Map<String,LinkedList<InputSplit>> shardToSplits = new HashMap<String, LinkedList<InputSplit>>();
shardToSplits.put("shard1", shardSplits);

List<InputSplit> splits = ShardChunkMongoSplitter.createSplitList(numChunks, shardToSplits);
assertEquals(split1, splits.get(0));
assertEquals(split2, splits.get(1));
}

@Test
public void testCreateSplitList_twoEvenShards() {
int numChunks = 4;
InputSplit split1 = new TestMongoInputSplit(new MongoURI("mongodb://split1"));
InputSplit split2 = new TestMongoInputSplit(new MongoURI("mongodb://split2"));
InputSplit split3 = new TestMongoInputSplit(new MongoURI("mongodb://split3"));
InputSplit split4 = new TestMongoInputSplit(new MongoURI("mongodb://split4"));
LinkedList<InputSplit> shardSplits1 = new LinkedList<InputSplit>(Arrays.asList(split1, split2));
LinkedList<InputSplit> shardSplits2 = new LinkedList<InputSplit>(Arrays.asList(split3, split4));

Map<String,LinkedList<InputSplit>> shardToSplits = new HashMap<String, LinkedList<InputSplit>>();
shardToSplits.put("shard1", shardSplits1);
shardToSplits.put("shard2", shardSplits2);

List<InputSplit> splits = ShardChunkMongoSplitter.createSplitList(numChunks, shardToSplits);
assertEquals(split1, splits.get(0));
assertEquals(split3, splits.get(1));
assertEquals(split2, splits.get(2));
assertEquals(split4, splits.get(3));
}

@Test
public void testCreateSplitList_twoUnevenShards() {
int numChunks = 6;
InputSplit split1 = new TestMongoInputSplit(new MongoURI("mongodb://split1"));
InputSplit split2 = new TestMongoInputSplit(new MongoURI("mongodb://split2"));
InputSplit split3 = new TestMongoInputSplit(new MongoURI("mongodb://split3"));
InputSplit split4 = new TestMongoInputSplit(new MongoURI("mongodb://split4"));
InputSplit split5 = new TestMongoInputSplit(new MongoURI("mongodb://split5"));
InputSplit split6 = new TestMongoInputSplit(new MongoURI("mongodb://split6"));
LinkedList<InputSplit> shardSplits1 = new LinkedList<InputSplit>(Arrays.asList(split1, split2));
LinkedList<InputSplit> shardSplits2 = new LinkedList<InputSplit>(Arrays.asList(split3, split4, split5, split6));

Map<String,LinkedList<InputSplit>> shardToSplits = new HashMap<String, LinkedList<InputSplit>>();
shardToSplits.put("shard1", shardSplits1);
shardToSplits.put("shard2", shardSplits2);

List<InputSplit> splits = ShardChunkMongoSplitter.createSplitList(numChunks, shardToSplits);
assertEquals(split1, splits.get(0));
assertEquals(split3, splits.get(1));
assertEquals(split2, splits.get(2));
assertEquals(split4, splits.get(3));
assertEquals(split5, splits.get(4));
assertEquals(split6, splits.get(5));
}
}

File: TestMongoInputSplit.java (new file)
@@ -0,0 +1,11 @@
package com.mongodb.hadoop.splitter;

import com.mongodb.MongoURI;
import com.mongodb.hadoop.input.MongoInputSplit;

public class TestMongoInputSplit extends MongoInputSplit {

public TestMongoInputSplit(MongoURI inputURI) {
this.inputURI = inputURI;
}
}