Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add replication file generation and replication from file [WIP] #385

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,11 @@
<version>1.9.2</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.cedarsoftware</groupId>
<artifactId>json-io</artifactId>
<version>4.10.1</version>
</dependency>
</dependencies>

<repositories>
Expand Down
53 changes: 39 additions & 14 deletions src/main/java/de/komoot/photon/App.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package de.komoot.photon;


import com.beust.jcommander.JCommander;
import com.beust.jcommander.ParameterException;

import de.komoot.photon.elasticsearch.ReplicationUpdater;
import de.komoot.photon.elasticsearch.Replicatior;
import de.komoot.photon.elasticsearch.Server;
import de.komoot.photon.nominatim.NominatimConnector;
import de.komoot.photon.nominatim.NominatimUpdater;
Expand All @@ -12,12 +14,12 @@
import spark.Request;
import spark.Response;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;

import static spark.Spark.*;


@Slf4j
public class App {

Expand Down Expand Up @@ -47,6 +49,11 @@ public static void main(String[] rawArgs) throws Exception {
return;
}

if (args.isReplicate()) {
startReplicationUpdate(args);
return;
}

boolean shutdownES = false;
final Server esServer = new Server(args).start();
try {
Expand All @@ -67,11 +74,11 @@ public static void main(String[] rawArgs) throws Exception {
// no special action specified -> normal mode: start search API
startApi(args, esClient);
} finally {
if (shutdownES) esServer.shutdown();
if (shutdownES)
esServer.shutdown();
}
}


/**
* dump elastic search index and create a new and empty one
*
Expand All @@ -87,7 +94,6 @@ private static void startRecreatingIndex(Server esServer) {
log.info("deleted photon index and created an empty new one.");
}


/**
* take nominatim data and dump it to json
*
Expand All @@ -97,7 +103,8 @@ private static void startJsonDump(CommandLineArgs args) {
try {
final String filename = args.getJsonDump();
final JsonDumper jsonDumper = new JsonDumper(filename, args.getLanguages());
NominatimConnector nominatimConnector = new NominatimConnector(args.getHost(), args.getPort(), args.getDatabase(), args.getUser(), args.getPassword());
NominatimConnector nominatimConnector = new NominatimConnector(args.getHost(), args.getPort(), args.getDatabase(), args.getUser(),
args.getPassword());
nominatimConnector.setImporter(jsonDumper);
nominatimConnector.readEntireDatabase(args.getCountryCodes().split(","));
log.info("json dump was created: " + filename);
Expand All @@ -106,7 +113,6 @@ private static void startJsonDump(CommandLineArgs args) {
}
}


/**
* take nominatim data to fill elastic search index
*
Expand All @@ -130,11 +136,22 @@ private static void startNominatimImport(CommandLineArgs args, Server esServer,
log.info("imported data from nominatim to photon with languages: " + args.getLanguages());
}

/**
* Run generation of replication files once
*
* @param args the arguments from the command line
*/
private static void startReplicationUpdate(CommandLineArgs args) {
final NominatimUpdater nominatimUpdater = new NominatimUpdater(args.getHost(), args.getPort(), args.getDatabase(), args.getUser(), args.getPassword());
Updater updater = new ReplicationUpdater(new File(args.getReplicationDirectory()), null);
nominatimUpdater.setUpdater(updater);
nominatimUpdater.update();
}

/**
* start api to accept search requests via http
*
* @param args
* @param args the arguments from the command line
* @param esNodeClient
*/
private static void startApi(CommandLineArgs args, Client esNodeClient) {
Expand All @@ -159,11 +176,19 @@ private static void startApi(CommandLineArgs args, Client esNodeClient) {
// setup update API
final NominatimUpdater nominatimUpdater = new NominatimUpdater(args.getHost(), args.getPort(), args.getDatabase(), args.getUser(), args.getPassword());
Updater updater = new de.komoot.photon.elasticsearch.Updater(esNodeClient, args.getLanguages());
nominatimUpdater.setUpdater(updater);

get("/nominatim-update", (Request request, Response response) -> {
new Thread(() -> nominatimUpdater.update()).start();
return "nominatim update started (more information in console output) ...";
});
String replicationUrl = args.getReplicationUrl();
if (replicationUrl == null) {
if (args.isGenerateFiles()) {
updater = new ReplicationUpdater(new File(args.getReplicationDirectory()), updater);
}
nominatimUpdater.setUpdater(updater);
get("/nominatim-update", (Request request, Response response) -> {
new Thread(() -> nominatimUpdater.update()).start();
return "nominatim update started (more information in console output) ...";
});
} else {
Replicatior replicator = new Replicatior(new File(args.getReplicationDirectory()), args.getReplicationUrl(), args.getReplicationInterval(), updater);
replicator.start();
}
}
}
15 changes: 15 additions & 0 deletions src/main/java/de/komoot/photon/CommandLineArgs.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,21 @@ public class CommandLineArgs {

@Parameter(names = "-cors-origin", description = "enable cross-site resource sharing for the specified origin (default CORS not supported)")
private String corsOrigin = null;

@Parameter(names = "-replicate", description = "generate a replication file")
private boolean replicate = false;

@Parameter(names = "-replication-dir", description = "directory for replication files (default '.')")
private String replicationDirectory = new File(".").getAbsolutePath();

@Parameter(names = "-generate-files", description = "generate replication files while updating from Nominatim")
private boolean generateFiles = false;

@Parameter(names = "-replication-url", description = "url of base directory for file based replication (default none)")
private String replicationUrl = null;

@Parameter(names = "-replication-interval", description = "interval between trying to read replication files in minutes (default 60 minutes)")
private int replicationInterval = 60;

@Parameter(names = "-h", description = "show help / usage")
private boolean usage = false;
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/de/komoot/photon/elasticsearch/PhotonAction.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package de.komoot.photon.elasticsearch;

import de.komoot.photon.PhotonDoc;

/**
* Container class for an update action and a PhotonDoc
*
* @author Simon
*
*/
public class PhotonAction {

public enum ACTION {CREATE, DELETE, UPDATE, UPDATE_OR_CREATE};

public ACTION action;

public long id;

public PhotonDoc doc;
}
123 changes: 123 additions & 0 deletions src/main/java/de/komoot/photon/elasticsearch/ReplicationState.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package de.komoot.photon.elasticsearch;

import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Date;

import javax.annotation.Nonnull;

import org.json.JSONObject;

import de.komoot.photon.utils.DateFormatter;
import lombok.Getter;
import lombok.Setter;

/**
* Container for the replication state
*
* @author Simon
*
*/
@Getter
@Setter
public class ReplicationState {

private static final String TIMESTAMP_KEY = "timestamp";
private static final String FORMAT_KEY = "format";
private static final String SEQUENCE_NUMBER_KEY = "sequenceNumber";

final long sequenceNumber;
final String format;
final Date timeStamp;

final DateFormatter dateFormatter;

/**
* Construct a new instance
*
* @param sequenceNumber the current sequence number
* @param format the format of the replication file
* @param timeStamp the current time
*/
public ReplicationState(long sequenceNumber, @Nonnull String format, @Nonnull Date timeStamp) {
this.sequenceNumber = sequenceNumber;
this.format = format;
this.timeStamp = timeStamp;
dateFormatter = new DateFormatter();
}

/**
* Read the replication state from a local file
*
* @param stateFilePath the Path to the file
* @return a ReplicationState instance
* @throws IOException if something goes wrong
*/
@Nonnull
public static ReplicationState readState(@Nonnull Path stateFilePath) throws IOException {
String content = new String(Files.readAllBytes(stateFilePath), Charset.forName("UTF-8"));
JSONObject state = new JSONObject(content);

return new ReplicationState(state.getLong(SEQUENCE_NUMBER_KEY), state.getString(FORMAT_KEY), null);
}

/**
* Read the replication state from an url
*
* @param urlString the url from the file
* @return a ReplicationState instance
* @throws IOException if something goes wrong
*/
@Nonnull
public static ReplicationState readState(@Nonnull String urlString) throws IOException {
URL url = new URL(urlString);
HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();
urlConnection.setRequestProperty("User-Agent", "Photon");
urlConnection.setInstanceFollowRedirects(true);
try {
InputStream in = new BufferedInputStream(urlConnection.getInputStream());
ByteArrayOutputStream result = new ByteArrayOutputStream();
byte[] buffer = new byte[1024];
int length;
while ((length = in.read(buffer)) != -1) {
result.write(buffer, 0, length);
}

JSONObject state = new JSONObject(result.toString("UTF-8"));
return new ReplicationState(state.getLong(SEQUENCE_NUMBER_KEY), state.getString(FORMAT_KEY), null);
} finally {
urlConnection.disconnect();
}
}

/**
* Get a JSON representation of the replication state
*
* @return a JSONObject containg the state
*/
@Nonnull
public JSONObject getJson() {
final JSONObject state = new JSONObject();
state.put(SEQUENCE_NUMBER_KEY, sequenceNumber);
state.put(FORMAT_KEY, format);
state.put(TIMESTAMP_KEY, dateFormatter.format(timeStamp));
return state;
}

/**
* Get a JSON representation of the replication state as a String
*
* @return a String containing JSON
*/
@Nonnull
public String toJsonString() {
return getJson().toString(4);
}
}
Loading