From 4ccb363ebc23741a630ca9b5c81bf0cfb4f98a28 Mon Sep 17 00:00:00 2001 From: simonpoole Date: Mon, 28 Jan 2019 16:10:31 +0100 Subject: [PATCH] Add replication file generation and replication from files This adds a facility to generate replication files in the "classical" OSM directory layout style and a facility to consume such files from a remote location via http. --- pom.xml | 479 +++++++++--------- src/main/java/de/komoot/photon/App.java | 53 +- .../de/komoot/photon/CommandLineArgs.java | 17 +- .../photon/elasticsearch/PhotonAction.java | 20 + .../elasticsearch/ReplicationState.java | 123 +++++ .../elasticsearch/ReplicationUpdater.java | 158 ++++++ .../photon/elasticsearch/Replicatior.java | 172 +++++++ .../de/komoot/photon/utils/DateFormatter.java | 85 ++++ .../photon/utils/FileSequenceFormatter.java | 87 ++++ .../photon/utils/SequenceFormatter.java | 72 +++ 10 files changed, 1010 insertions(+), 256 deletions(-) create mode 100644 src/main/java/de/komoot/photon/elasticsearch/PhotonAction.java create mode 100644 src/main/java/de/komoot/photon/elasticsearch/ReplicationState.java create mode 100644 src/main/java/de/komoot/photon/elasticsearch/ReplicationUpdater.java create mode 100644 src/main/java/de/komoot/photon/elasticsearch/Replicatior.java create mode 100644 src/main/java/de/komoot/photon/utils/DateFormatter.java create mode 100644 src/main/java/de/komoot/photon/utils/FileSequenceFormatter.java create mode 100644 src/main/java/de/komoot/photon/utils/SequenceFormatter.java diff --git a/pom.xml b/pom.xml index 659d52d54..71591be0b 100644 --- a/pom.xml +++ b/pom.xml @@ -1,245 +1,242 @@ - - 4.0.0 - - de.komoot.photon - photon - 0.3.0 - - - - org.elasticsearch - elasticsearch - ${elasticsearch.version} - - - org.elasticsearch - jna - - - - - org.elasticsearch.plugin - transport-netty4-client - ${elasticsearch.version} - - - org.apache.logging.log4j - log4j-core - ${log4j.version} - - - org.elasticsearch.client - transport - ${elasticsearch.version} - - - 
org.elasticsearch - jna - 4.4.0 - - - postgresql - postgresql - 9.1-901-1.jdbc4 - - - org.slf4j - slf4j-api - ${slf4j.version} - - - org.apache.logging.log4j - log4j-slf4j-impl - ${log4j.version} - - - com.beust - jcommander - 1.32 - - - org.openstreetmap.osmosis - osmosis-hstore-jdbc - 0.43.1 - - - org.apache.commons - commons-lang3 - 3.3.2 - - - org.codehaus.jackson - jackson-jaxrs - 1.4.0 - - - com.neovisionaries - nv-i18n - 1.4 - - - org.projectlombok - lombok - 1.16.18 - provided - - - org.springframework - spring-jdbc - 4.0.0.RELEASE - - - commons-dbcp - commons-dbcp - 1.4 - - - com.vividsolutions - jts - 1.13 - - - com.sparkjava - spark-core - 2.8.0 - - - com.google.code.findbugs - jsr305 - 2.0.2 - - - postgis - postgis-jdbc-jts - 1.1.5 - - - junit - junit - 4.12 - test - - - - org.mockito - mockito-core - 2.13.0 - test - - - - com.google.guava - guava - 19.0 - - - net.lingala.zip4j - zip4j - 1.2.3 - - - commons-io - commons-io - 2.4 - - - org.json - json - 20140107 - - - commons-beanutils - commons-beanutils - 1.9.2 - test - - - - - - - datanucleus - datanucleus - http://www.datanucleus.org/downloads/maven2/ - - - - - - - org.apache.maven.plugins - maven-shade-plugin - 2.2 - - true - - - *:* - - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA - - - - - - - package - - shade - - - - - - de.komoot.photon.App - - - - - - - - org.apache.maven.plugins - maven-resources-plugin - 2.6 - - - org.apache.maven.plugins - maven-compiler-plugin - 3.1 - - UTF-8 - - - - - - es/ - - - src/main/resources/ - - - - - 1.8 - 1.8 - UTF-8 - UTF-8 - UTF-8 - + + 4.0.0 + de.komoot.photon + photon + 0.3.0 + + + org.elasticsearch + elasticsearch + ${elasticsearch.version} + + + org.elasticsearch + jna + + + + + org.elasticsearch.plugin + transport-netty4-client + ${elasticsearch.version} + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + + + org.elasticsearch.client + transport + ${elasticsearch.version} + + + org.elasticsearch + jna + 4.4.0 + + + postgresql + postgresql 
+ 9.1-901-1.jdbc4 + + + org.slf4j + slf4j-api + ${slf4j.version} + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + + + com.beust + jcommander + 1.32 + + + org.openstreetmap.osmosis + osmosis-hstore-jdbc + 0.43.1 + + + org.apache.commons + commons-lang3 + 3.3.2 + + + org.codehaus.jackson + jackson-jaxrs + 1.4.0 + + + com.neovisionaries + nv-i18n + 1.4 + + + org.projectlombok + lombok + 1.16.18 + provided + + + org.springframework + spring-jdbc + 4.0.0.RELEASE + + + commons-dbcp + commons-dbcp + 1.4 + + + com.vividsolutions + jts + 1.13 + + + com.sparkjava + spark-core + 2.8.0 + + + com.google.code.findbugs + jsr305 + 2.0.2 + + + postgis + postgis-jdbc-jts + 1.1.5 + + + junit + junit + 4.12 + test + + + org.mockito + mockito-core + 2.13.0 + test + + + com.google.guava + guava + 19.0 + + + net.lingala.zip4j + zip4j + 1.2.3 + + + commons-io + commons-io + 2.4 + + + org.json + json + 20140107 + + + commons-beanutils + commons-beanutils + 1.9.2 + test + + + com.cedarsoftware + json-io + 4.10.1 + + + + + datanucleus + datanucleus + http://www.datanucleus.org/downloads/maven2/ + + + + + + org.apache.maven.plugins + maven-shade-plugin + 2.2 + + true + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + package + + shade + + + + + + de.komoot.photon.App + + + + + + + + org.apache.maven.plugins + maven-resources-plugin + 2.6 + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + UTF-8 + + + + + + es/ + + + src/main/resources/ + + + + + 1.8 + 1.8 + UTF-8 + UTF-8 + UTF-8 5.5.0 1.7.25 2.8.2 - - -Xdoclint:none - - - + -Xdoclint:none + + \ No newline at end of file diff --git a/src/main/java/de/komoot/photon/App.java b/src/main/java/de/komoot/photon/App.java index 1cfb2402f..c6588d0a5 100644 --- a/src/main/java/de/komoot/photon/App.java +++ b/src/main/java/de/komoot/photon/App.java @@ -1,8 +1,10 @@ package de.komoot.photon; - import com.beust.jcommander.JCommander; import com.beust.jcommander.ParameterException; + +import 
de.komoot.photon.elasticsearch.ReplicationUpdater; +import de.komoot.photon.elasticsearch.Replicatior; import de.komoot.photon.elasticsearch.Server; import de.komoot.photon.nominatim.NominatimConnector; import de.komoot.photon.nominatim.NominatimUpdater; @@ -11,12 +13,12 @@ import spark.Request; import spark.Response; +import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import static spark.Spark.*; - @Slf4j public class App { @@ -43,6 +45,11 @@ public static void main(String[] rawArgs) throws Exception { return; } + if (args.isReplicate()) { + startReplicationUpdate(args); + return; + } + boolean shutdownES = false; final Server esServer = new Server(args).start(); try { @@ -63,11 +70,11 @@ public static void main(String[] rawArgs) throws Exception { // no special action specified -> normal mode: start search API startApi(args, esClient); } finally { - if (shutdownES) esServer.shutdown(); + if (shutdownES) + esServer.shutdown(); } } - /** * dump elastic search index and create a new and empty one * @@ -83,7 +90,6 @@ private static void startRecreatingIndex(Server esServer) { log.info("deleted photon index and created an empty new one."); } - /** * take nominatim data and dump it to json * @@ -93,7 +99,8 @@ private static void startJsonDump(CommandLineArgs args) { try { final String filename = args.getJsonDump(); final JsonDumper jsonDumper = new JsonDumper(filename, args.getLanguages()); - NominatimConnector nominatimConnector = new NominatimConnector(args.getHost(), args.getPort(), args.getDatabase(), args.getUser(), args.getPassword()); + NominatimConnector nominatimConnector = new NominatimConnector(args.getHost(), args.getPort(), args.getDatabase(), args.getUser(), + args.getPassword()); nominatimConnector.setImporter(jsonDumper); nominatimConnector.readEntireDatabase(args.getCountryCodes().split(",")); log.info("json dump was created: " + filename); @@ -102,7 +109,6 @@ private static void startJsonDump(CommandLineArgs args) { 
} } - /** * take nominatim data to fill elastic search index * @@ -126,11 +132,22 @@ private static void startNominatimImport(CommandLineArgs args, Server esServer, log.info("imported data from nominatim to photon with languages: " + args.getLanguages()); } + /** + * Run generation of replication files once + * + * @param args the arguments from the command line + */ + private static void startReplicationUpdate(CommandLineArgs args) { + final NominatimUpdater nominatimUpdater = new NominatimUpdater(args.getHost(), args.getPort(), args.getDatabase(), args.getUser(), args.getPassword()); + Updater updater = new ReplicationUpdater(new File(args.getReplicationDirectory()), null); + nominatimUpdater.setUpdater(updater); + nominatimUpdater.update(); + } /** * start api to accept search requests via http * - * @param args + * @param args the arguments from the command line * @param esNodeClient */ private static void startApi(CommandLineArgs args, Client esNodeClient) { @@ -146,11 +163,19 @@ private static void startApi(CommandLineArgs args, Client esNodeClient) { // setup update API final NominatimUpdater nominatimUpdater = new NominatimUpdater(args.getHost(), args.getPort(), args.getDatabase(), args.getUser(), args.getPassword()); Updater updater = new de.komoot.photon.elasticsearch.Updater(esNodeClient, args.getLanguages()); - nominatimUpdater.setUpdater(updater); - - get("/nominatim-update", (Request request, Response response) -> { - new Thread(() -> nominatimUpdater.update()).start(); - return "nominatim update started (more information in console output) ..."; - }); + String replicationUrl = args.getReplicationUrl(); + if (replicationUrl == null) { + if (args.isGenerateFiles()) { + updater = new ReplicationUpdater(new File(args.getReplicationDirectory()), updater); + } + nominatimUpdater.setUpdater(updater); + get("/nominatim-update", (Request request, Response response) -> { + new Thread(() -> nominatimUpdater.update()).start(); + return "nominatim update started 
(more information in console output) ..."; + }); + } else { + Replicatior replicator = new Replicatior(new File(args.getReplicationDirectory()), args.getReplicationUrl(), args.getReplicationInterval(), updater); + replicator.start(); + } } } diff --git a/src/main/java/de/komoot/photon/CommandLineArgs.java b/src/main/java/de/komoot/photon/CommandLineArgs.java index 3557c0af6..c129ad308 100644 --- a/src/main/java/de/komoot/photon/CommandLineArgs.java +++ b/src/main/java/de/komoot/photon/CommandLineArgs.java @@ -55,7 +55,22 @@ public class CommandLineArgs { @Parameter(names = "-listen-ip", description = "listen to address (default '0.0.0.0')") private String listenIp = "0.0.0.0"; - + + @Parameter(names = "-replicate", description = "generate a replication file") + private boolean replicate = false; + + @Parameter(names = "-replication-dir", description = "directory for replication files (default '.')") + private String replicationDirectory = new File(".").getAbsolutePath(); + + @Parameter(names = "-generate-files", description = "generate replication files while updating from Nominatim") + private boolean generateFiles = false; + + @Parameter(names = "-replication-url", description = "url of base directory for file based replication (default none)") + private String replicationUrl = null; + + @Parameter(names = "-replication-interval", description = "interval between trying to read replication files in minutes (default 60 minutes)") + private int replicationInterval = 60; + @Parameter(names = "-h", description = "show help / usage") private boolean usage = false; } diff --git a/src/main/java/de/komoot/photon/elasticsearch/PhotonAction.java b/src/main/java/de/komoot/photon/elasticsearch/PhotonAction.java new file mode 100644 index 000000000..91a8129e5 --- /dev/null +++ b/src/main/java/de/komoot/photon/elasticsearch/PhotonAction.java @@ -0,0 +1,20 @@ +package de.komoot.photon.elasticsearch; + +import de.komoot.photon.PhotonDoc; + +/** + * Container class for an update 
action and a PhotonDoc + * + * @author Simon + * + */ +public class PhotonAction { + + public enum ACTION {CREATE, DELETE, UPDATE, UPDATE_OR_CREATE}; + + public ACTION action; + + public long id; + + public PhotonDoc doc; +} diff --git a/src/main/java/de/komoot/photon/elasticsearch/ReplicationState.java b/src/main/java/de/komoot/photon/elasticsearch/ReplicationState.java new file mode 100644 index 000000000..29f80a9cc --- /dev/null +++ b/src/main/java/de/komoot/photon/elasticsearch/ReplicationState.java @@ -0,0 +1,123 @@ +package de.komoot.photon.elasticsearch; + +import java.io.BufferedInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Date; + +import javax.annotation.Nonnull; + +import org.json.JSONObject; + +import de.komoot.photon.utils.DateFormatter; +import lombok.Getter; +import lombok.Setter; + +/** + * Container for the replication state + * + * @author Simon + * + */ +@Getter +@Setter +public class ReplicationState { + + private static final String TIMESTAMP_KEY = "timestamp"; + private static final String FORMAT_KEY = "format"; + private static final String SEQUENCE_NUMBER_KEY = "sequenceNumber"; + + final long sequenceNumber; + final String format; + final Date timeStamp; + + final DateFormatter dateFormatter; + + /** + * Construct a new instance + * + * @param sequenceNumber the current sequence number + * @param format the format of the replication file + * @param timeStamp the current time + */ + public ReplicationState(long sequenceNumber, @Nonnull String format, @Nonnull Date timeStamp) { + this.sequenceNumber = sequenceNumber; + this.format = format; + this.timeStamp = timeStamp; + dateFormatter = new DateFormatter(); + } + + /** + * Read the replication state from a local file + * + * @param stateFilePath the Path to the 
file + * @return a ReplicationState instance + * @throws IOException if something goes wrong + */ + @Nonnull + public static ReplicationState readState(@Nonnull Path stateFilePath) throws IOException { + String content = new String(Files.readAllBytes(stateFilePath), Charset.forName("UTF-8")); + JSONObject state = new JSONObject(content); + + return new ReplicationState(state.getLong(SEQUENCE_NUMBER_KEY), state.getString(FORMAT_KEY), null); + } + + /** + * Read the replication state from a URL + * + * @param urlString the URL of the state file + * @return a ReplicationState instance + * @throws IOException if something goes wrong + */ + @Nonnull + public static ReplicationState readState(@Nonnull String urlString) throws IOException { + URL url = new URL(urlString); + HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection(); + urlConnection.setRequestProperty("User-Agent", "Photon"); + urlConnection.setInstanceFollowRedirects(true); + try { + InputStream in = new BufferedInputStream(urlConnection.getInputStream()); + ByteArrayOutputStream result = new ByteArrayOutputStream(); + byte[] buffer = new byte[1024]; + int length; + while ((length = in.read(buffer)) != -1) { + result.write(buffer, 0, length); + } + + JSONObject state = new JSONObject(result.toString("UTF-8")); + return new ReplicationState(state.getLong(SEQUENCE_NUMBER_KEY), state.getString(FORMAT_KEY), null); + } finally { + urlConnection.disconnect(); + } + } + + /** + * Get a JSON representation of the replication state + * + * @return a JSONObject containing the state + */ + @Nonnull + public JSONObject getJson() { + final JSONObject state = new JSONObject(); + state.put(SEQUENCE_NUMBER_KEY, sequenceNumber); + state.put(FORMAT_KEY, format); + state.put(TIMESTAMP_KEY, dateFormatter.format(timeStamp)); + return state; + } + + /** + * Get a JSON representation of the replication state as a String + * + * @return a String containing JSON + */ + @Nonnull + public String toJsonString() { + return
getJson().toString(4); + } +} diff --git a/src/main/java/de/komoot/photon/elasticsearch/ReplicationUpdater.java b/src/main/java/de/komoot/photon/elasticsearch/ReplicationUpdater.java new file mode 100644 index 000000000..a450f7e57 --- /dev/null +++ b/src/main/java/de/komoot/photon/elasticsearch/ReplicationUpdater.java @@ -0,0 +1,158 @@ +package de.komoot.photon.elasticsearch; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.zip.GZIPOutputStream; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import com.cedarsoftware.util.io.JsonWriter; + +import de.komoot.photon.PhotonDoc; +import de.komoot.photon.elasticsearch.PhotonAction.ACTION; +import de.komoot.photon.utils.FileSequenceFormatter; +import lombok.extern.slf4j.Slf4j; + +/** + * Generate replication files from updates, a further updater can be daisy-chained + * + * @author Simon + * + */ +@Slf4j +public class ReplicationUpdater implements de.komoot.photon.Updater { + + public static final String REPLICATION_FORMAT = "0.0.0"; + + final de.komoot.photon.Updater otherUpdater; + final File baseDirectory; + List actions = new ArrayList<>(); + long sequenceNumber; + final FileSequenceFormatter formatter; + final Path stateFilePath; + + /** + * Construct a new ReplicationUpdater + * + * @param baseDirectory the directory in which we store the replication files + * @param otherUpdater a further updater to call + */ + public ReplicationUpdater(@Nonnull final File baseDirectory, @Nullable final de.komoot.photon.Updater otherUpdater) { + this.otherUpdater = otherUpdater; + this.baseDirectory = baseDirectory; + formatter = new FileSequenceFormatter(baseDirectory); + stateFilePath = Paths.get(baseDirectory + 
"/state.json"); + try { + ReplicationState initialState = ReplicationState.readState(stateFilePath); + sequenceNumber = initialState.getSequenceNumber(); + sequenceNumber++; + } catch (IOException e) { + sequenceNumber = 0; + } + log.info(String.format("Next sequence number %d", sequenceNumber)); + } + + @Override + public void create(PhotonDoc doc) { + if (otherUpdater != null) { + otherUpdater.create(doc); + } + addAction(actions, doc.getPlaceId(), ACTION.CREATE, doc); + } + + @Override + public void update(PhotonDoc doc) { + if (otherUpdater != null) { + otherUpdater.update(doc); + } + addAction(actions, doc.getPlaceId(), ACTION.UPDATE, doc); + } + + @Override + public void delete(Long id) { + if (otherUpdater != null) { + otherUpdater.delete(id); + } + addAction(actions, id, ACTION.DELETE, null); + } + + @Override + public void updateOrCreate(PhotonDoc updatedDoc) { + if (otherUpdater != null) { + otherUpdater.updateOrCreate(updatedDoc); + } + addAction(actions, updatedDoc.getPlaceId(), ACTION.UPDATE_OR_CREATE, updatedDoc); + } + + @Override + public void finish() { + if (otherUpdater != null) { + otherUpdater.finish(); + } + if (actions.isEmpty()) { + log.warn("Update empty"); + return; + } + String json = JsonWriter.objectToJson(actions); + try (PrintStream outputStream = new PrintStream( + new BufferedOutputStream(new GZIPOutputStream(new FileOutputStream(formatter.getFormattedName(sequenceNumber, ".json.gz")))), false, + "UTF-8");) { + outputStream.print(json); + saveState(REPLICATION_FORMAT); + sequenceNumber++; + actions.clear(); + } catch (Exception ex) { + log.error("Exception writing replication file ", ex); + } + } + + /** + * Save the current state in a per-sequence id file and then link the global state file to it + * + * @param replicationFormat the current replication format version + */ + private void saveState(@Nonnull String replicationFormat) { + final ReplicationState state = new ReplicationState(sequenceNumber, replicationFormat, new 
Date()); + PrintStream outputStream = null; + try { + File newStateFile = formatter.getFormattedName(sequenceNumber, ".state.json"); + outputStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(newStateFile))); + outputStream.print(state.toJsonString()); + Files.deleteIfExists(stateFilePath); + Files.createLink(stateFilePath, newStateFile.toPath()); + } catch (IOException e) { + log.error("Exception writing replication state file ", e); + } finally { + if (outputStream != null) { + outputStream.close(); + } + } + } + + /** + * Add an action and a PhotonDoc to the list of updates + * + * @param actions the List of PhotonActions + * @param placeId the place id for the doc + * @param actionType the action to perform + * @param doc the PhotonDoc or null if the action is delete + */ + private void addAction(@Nonnull List actions, long placeId, @Nonnull ACTION actionType, @Nullable PhotonDoc doc) { + PhotonAction action; + action = new PhotonAction(); + action.action = actionType; + action.id = placeId; + action.doc = doc; + actions.add(action); + } +} diff --git a/src/main/java/de/komoot/photon/elasticsearch/Replicatior.java b/src/main/java/de/komoot/photon/elasticsearch/Replicatior.java new file mode 100644 index 000000000..8a30d4b03 --- /dev/null +++ b/src/main/java/de/komoot/photon/elasticsearch/Replicatior.java @@ -0,0 +1,172 @@ +package de.komoot.photon.elasticsearch; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.file.Paths; +import java.util.Date; +import java.util.Deque; +import java.util.List; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.zip.GZIPInputStream; + +import javax.annotation.Nonnull; + +import com.cedarsoftware.util.io.JsonObject; +import 
com.cedarsoftware.util.io.JsonReader; +import com.vividsolutions.jts.geom.Coordinate; +import com.vividsolutions.jts.geom.GeometryFactory; +import com.vividsolutions.jts.geom.Point; + +import de.komoot.photon.Updater; +import de.komoot.photon.utils.SequenceFormatter; +import lombok.extern.slf4j.Slf4j; + +/** + * Update Photon from a remote repository of serialised PhotonDocs + * + * @author Simon Poole + * + */ +@Slf4j +public class Replicatior { + + final String replicationUrlString; + final File baseDirectory; + final SequenceFormatter sequenceFormatter; + final Updater updater; + final int interval; + final GeometryFactory factory; + + /** + * Create a new instance of Replicator + * + * @param replicationUrlString the base URL of where the replication files are stored + * @param interval how often should we query the server in minutes + */ + public Replicatior(@Nonnull final File baseDirectory, @Nonnull final String replicationUrlString, int interval, @Nonnull final Updater updater) { + this.replicationUrlString = replicationUrlString; + this.baseDirectory = baseDirectory; + this.updater = updater; + this.interval = interval; + sequenceFormatter = new SequenceFormatter(9, 3); + // + factory = new GeometryFactory(); + } + + /** + * Start the regular updates + */ + public void start() { + log.info(String.format("Starting replication from %s every %d minutes", replicationUrlString, interval)); + Timer timer = new Timer(); + timer.scheduleAtFixedRate(new ReplicationTask(), 60 * 1000L, interval * 60 * 1000L); + } + + /** + * Read the local and remote state file and then download any missing replication files + * + * FIXME: should likely throw an exception if the difference between sequence numbers is very large + * + */ + class ReplicationTask extends TimerTask { + + @Override + public void run() { + + try { + // get our local state file + ReplicationState lastState = ReplicationState.readState(Paths.get(baseDirectory + "/laststate.json")); + long lastSequenceNumber =
lastState.getSequenceNumber(); + // get the remote state file + ReplicationState currentState = ReplicationState.readState(replicationUrlString + "/state.json"); + long currentSequenceNumber = currentState.getSequenceNumber(); + // get all replication files up to the current one and update the database + for (long i = lastSequenceNumber + 1; i <= currentSequenceNumber; i++) { + log.info(String.format("Runing update for sequence %d", i)); + URL url = new URL(replicationUrlString + "/" + sequenceFormatter.getFormattedName(i, ".json.gz")); + HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection(); + urlConnection.setRequestProperty("User-Agent", "Photon"); + urlConnection.setInstanceFollowRedirects(true); + InputStream in = new BufferedInputStream(new GZIPInputStream(urlConnection.getInputStream())); + JsonReader reader = new JsonReader(in); + reader.addReader(com.vividsolutions.jts.geom.Point.class, new PointReader()); + List actions = (List) reader.readObject(); + int deletions = 0; + int updates = 0; + for (PhotonAction action : actions) { + switch (action.action) { + case DELETE: + updater.delete(action.id); + deletions++; + break; + case CREATE: + updater.create(action.doc); + updates++; + break; + case UPDATE: + updater.update(action.doc); + updates++; + break; + case UPDATE_OR_CREATE: + updater.updateOrCreate(action.doc); + updates++; + break; + } + } + updater.finish(); + lastState = new ReplicationState(i, ReplicationUpdater.REPLICATION_FORMAT, new Date()); + PrintStream outputStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(baseDirectory + "/laststate.json"))); + outputStream.print(lastState.toJsonString()); + outputStream.close(); + reader.close(); + log.info(String.format("Update done. 
%d deletions %d updates or additions.", deletions, updates)); + } + } catch (IOException e) { + log.error("Replicator failing", e); + cancel(); + } + } + } + + /** + * json-io needs some help de-serialising the Point class + * + */ + class PointReader implements JsonReader.JsonClassReaderEx { + + @SuppressWarnings("rawtypes") + @Override + public Point read(Object jOb, Deque> stack, Map args) { + JsonObject coordinates = (JsonObject) ((JsonObject) jOb).get("coordinates"); + Object c = coordinates.get("coords"); + Double[] coords = null; + if (c instanceof Double[]) { + coords = (Double[]) c; + } else if (c instanceof JsonObject && ((JsonObject) c).isArray()) { + Object[] temp = ((JsonObject) c).getArray(); + coords = new Double[temp.length]; + for (int i = 0; i < temp.length; i++) { + coords[i] = (Double) temp[i]; + } + } else if (c instanceof Object[]) { + Object[] temp = (Object[]) c; + coords = new Double[temp.length]; + for (int i = 0; i < temp.length; i++) { + coords[i] = (Double) temp[i]; + } + } else { // crash and burn + throw new IllegalArgumentException("PointReader unknown serialisation " + c.getClass().getCanonicalName() + " " + c.toString()); + } + return factory.createPoint(new Coordinate(coords[0], coords[1])); + } + } +} diff --git a/src/main/java/de/komoot/photon/utils/DateFormatter.java b/src/main/java/de/komoot/photon/utils/DateFormatter.java new file mode 100644 index 000000000..523926239 --- /dev/null +++ b/src/main/java/de/komoot/photon/utils/DateFormatter.java @@ -0,0 +1,85 @@ +// This software is released into the Public Domain. See copying.txt for details. +package de.komoot.photon.utils; + +import java.util.Calendar; +import java.util.Date; +import java.util.GregorianCalendar; +import java.util.TimeZone; + + +/** + * Outputs a date in a format suitable for an OSM XML file. + * + * @author Brett Henderson + */ +public class DateFormatter { + + private GregorianCalendar calendar; + + + /** + * Creates a new instance. 
+ */ + public DateFormatter() { + calendar = new GregorianCalendar(TimeZone.getTimeZone("UTC")); + } + + + /** + * Formats a date in XML format. + * + * @param date + * The date to be formatted. + * @return The string representing the date. + */ + public String format(Date date) { + StringBuilder result; + int year; + int month; + int day; + int hour; + int minute; + int second; + + calendar.setTime(date); + + result = new StringBuilder(20); + + year = calendar.get(Calendar.YEAR); + month = calendar.get(Calendar.MONTH) + 1; + day = calendar.get(Calendar.DATE); + hour = calendar.get(Calendar.HOUR_OF_DAY); + minute = calendar.get(Calendar.MINUTE); + second = calendar.get(Calendar.SECOND); + + result.append(year); + result.append('-'); + if (month < 10) { + result.append('0'); + } + result.append(month); + result.append('-'); + if (day < 10) { + result.append('0'); + } + result.append(day); + result.append('T'); + if (hour < 10) { + result.append('0'); + } + result.append(hour); + result.append(':'); + if (minute < 10) { + result.append('0'); + } + result.append(minute); + result.append(':'); + if (second < 10) { + result.append('0'); + } + result.append(second); + result.append('Z'); + + return result.toString(); + } +} diff --git a/src/main/java/de/komoot/photon/utils/FileSequenceFormatter.java b/src/main/java/de/komoot/photon/utils/FileSequenceFormatter.java new file mode 100644 index 000000000..040eee751 --- /dev/null +++ b/src/main/java/de/komoot/photon/utils/FileSequenceFormatter.java @@ -0,0 +1,87 @@ +// This software is released into the Public Domain. +// this is nicked from osmosis by Brett Henderson https://github.com/openstreetmap/osmosis + +package de.komoot.photon.utils; + +import java.io.File; +import java.io.IOException; +import java.util.StringTokenizer; + + +/** + * Formats replication sequence numbers into file names. 
+ */ +public class FileSequenceFormatter { + + private SequenceFormatter sequenceFormatter; + private File workingDirectory; + + + /** + * Creates a new instance. The minimum length and grouping length will default to 9 and 3 + * respectively. + * + * @param workingDirectory + * The directory from which to base all created files. + */ + public FileSequenceFormatter(File workingDirectory) { + this(workingDirectory, 9, 3); + } + + + /** + * Creates a new instance. + * + * @param minimumLength + * The minimum length file sequence string to generate. For example, setting a length + * of 2 will generate sequence numbers from "00" to "99". + * @param groupingLength + * The number of characters to write before separating with a '/' character. Used for + * creating sequence numbers to be written to files in a nested directory structure. + * @param workingDirectory + * The directory from which to base all created files. + */ + public FileSequenceFormatter(File workingDirectory, int minimumLength, int groupingLength) { + this.workingDirectory = workingDirectory; + + sequenceFormatter = new SequenceFormatter(minimumLength, groupingLength); + } + + + /** + * Formats the sequence number into a file name. Any sub-directories required will be + * automatically created. + * + * @param sequenceNumber + * The sequence number. + * @param fileNameSuffix + * The suffix to append to the end of the file name. + * @return The formatted file. + * @throws IOException + */ + public File getFormattedName(long sequenceNumber, String fileNameSuffix) throws IOException { + String fileName; + StringTokenizer pathTokenizer; + File formattedPath; + + fileName = sequenceFormatter.getFormattedName(sequenceNumber, fileNameSuffix); + + pathTokenizer = new StringTokenizer(fileName, "/"); + + formattedPath = workingDirectory; + while (pathTokenizer.hasMoreTokens()) { + // Move to the next item in the path. 
+ formattedPath = new File(formattedPath, pathTokenizer.nextToken()); + + // If this is a directory within the path (ie. not the final element in the path) then + // ensure it exists and create it if it doesn't exist. + if (pathTokenizer.hasMoreTokens()) { + if (!formattedPath.exists() && !formattedPath.mkdir()) { + throw new IOException("Unable to create directory \"" + formattedPath + "\"."); + } + } + } + + return formattedPath; + } +} diff --git a/src/main/java/de/komoot/photon/utils/SequenceFormatter.java b/src/main/java/de/komoot/photon/utils/SequenceFormatter.java new file mode 100644 index 000000000..9be3aa345 --- /dev/null +++ b/src/main/java/de/komoot/photon/utils/SequenceFormatter.java @@ -0,0 +1,72 @@ +// This software is released into the Public Domain. +// this is nicked from osmosis by Brett Henderson https://github.com/openstreetmap/osmosis + +package de.komoot.photon.utils; + +import java.text.DecimalFormat; +import java.text.DecimalFormatSymbols; +import java.text.NumberFormat; +import java.util.Locale; + + +/** + * Formats replication sequence numbers into file names. + */ +public class SequenceFormatter { + + private NumberFormat sequenceFormat; + + + /** + * Creates a new instance. + * + * @param minimumLength + * The minimum length file sequence string to generate. For example, setting a length + * of 2 will generate sequence numbers from "00" to "99". + * @param groupingLength + * The number of characters to write before separating with a '/' character. Used for + * creating sequence numbers to be written to files in a nested directory structure. 
+ */ + public SequenceFormatter(int minimumLength, int groupingLength) { + DecimalFormatSymbols formatSymbols; + StringBuilder formatString; + + formatSymbols = new DecimalFormatSymbols(Locale.US); + formatSymbols.setGroupingSeparator('/'); + + formatString = new StringBuilder(); + for (int i = 0; i < minimumLength || i <= groupingLength; i++) { + if (i > 0 && groupingLength > 0 && i % groupingLength == 0) { + formatString.append(','); + } + + if (i < minimumLength) { + formatString.append('0'); + } else { + formatString.append('#'); + } + } + formatString.reverse(); + + this.sequenceFormat = new DecimalFormat(formatString.toString(), formatSymbols); + } + + + /** + * Formats the sequence number into a file name. This method only builds the string; + * no directories are created. + * + * @param sequenceNumber + * The sequence number. + * @param fileNameSuffix + * The suffix to append to the end of the file name. + * @return The formatted file name. + */ + public String getFormattedName(long sequenceNumber, String fileNameSuffix) { + String fileName; + + fileName = sequenceFormat.format(sequenceNumber) + fileNameSuffix; + + return fileName; + } +}