CentroidStatistics VectorTile Job #104

Open · wants to merge 3 commits into base: master
Changes from 1 commit
Init for centroid stats
moradology committed Oct 18, 2018
commit 022b5b62760159fe7983558e24b2edeb4a564637
2 changes: 2 additions & 0 deletions .gitignore
@@ -83,3 +83,5 @@ dist/
emr/terraform/auth.json

derby.log

data/*
15 changes: 15 additions & 0 deletions centroid-stats.sh
@@ -0,0 +1,15 @@
#!/bin/bash

ROOT_DIR=$(pwd)
echo "$ROOT_DIR"
cd src
sbt "project analytics" assembly
zip -d analytics/target/scala-2.11/osmesa-analytics.jar 'META-INF/*.SF' 'META-INF/*.RSA' 'META-INF/*SF'
docker run \
  -v /tmp:/tmp/data \
  -v "$ROOT_DIR/src/analytics/target/scala-2.11/osmesa-analytics.jar":/opt/osmesa-analytics.jar \
  -p 4040:4040 \
  bde2020/spark-master:2.3.1-hadoop2.7 \
  /spark/bin/spark-submit --class osmesa.analytics.oneoffs.CentroidStats /opt/osmesa-analytics.jar \
    --history file:///tmp/data/isle-of-man-internal.osh.orc \
    --output file:///tmp/data/stats-test.orc
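
Once the container exits, the resulting ORC can be sanity-checked from a local spark-shell. A minimal sketch; the output path below is an assumption (the --output option expects a trailing / and the job resolves a centroid-stats segment against it, so the location produced by the exact URI above may differ):

// spark-shell --master "local[*]"
// Assumed output location; point this at wherever the job actually wrote.
val stats = spark.read.orc("file:///tmp/data/centroid-stats")
stats.printSchema()
stats
  .select("changeset", "road_km_added", "roads_added", "buildings_added", "pois_added")
  .orderBy(org.apache.spark.sql.functions.desc("roads_added"))
  .show(10, truncate = false)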
16 changes: 16 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,16 @@
version: '3.0'
services:
  stat-gen:
    image: osm_analytics:latest
    container_name: stat-gen
    volumes:
      - ./data:/tmp/data
      - /Users/nzimmerman/git_repos/osmesa/src/analytics/target/scala-2.11/osmesa-analytics.jar:/opt/osmesa-analytics2.jar
    ports:
      - "4040:4040"
    command: >
      /spark/bin/spark-submit --class osmesa.analytics.oneoffs.CentroidStats /opt/osmesa-analytics2.jar
      --history file:///tmp/data/isle-of-man-internal.osh.orc
      --output file:///tmp/data/stats-test.orc
159 changes: 159 additions & 0 deletions src/analytics/src/main/scala/osmesa/analytics/oneoffs/CentroidStats.scala
@@ -0,0 +1,159 @@
package osmesa.analytics.oneoffs

import java.net.URI

import cats.implicits._
import com.monovore.decline.{CommandApp, Opts}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{SaveMode, SparkSession}
import osmesa.analytics.Analytics
import osmesa.common.ProcessOSM
import osmesa.common.functions._
import osmesa.common.functions.osm._
import org.locationtech.geomesa.spark.jts._


object CentroidStats extends CommandApp(
  name = "centroid-stats",
  header = "centroid statistics",
  main = {
    val historyOpt =
      Opts.option[String]("history", help = "Location of the History ORC file to process.")
    val outputOpt =
      Opts.option[URI](long = "output", help = "Output URI prefix; trailing / must be included")

    (historyOpt, outputOpt).mapN { (historySource, output) =>
      implicit val spark: SparkSession = Analytics.sparkSession("ChangesetStats").withJTS
      import spark.implicits._

      val history = spark.read.orc(historySource)

      val pointGeoms = ProcessOSM.geocode(ProcessOSM.constructPointGeometries(
        // pre-filter to POI nodes
        history.where('type === "node" and isPOI('tags))
      ).withColumn("minorVersion", lit(0)))

      val wayGeoms = ProcessOSM.geocode(ProcessOSM.reconstructWayGeometries(
        // pre-filter to interesting ways
        history.where('type === "way" and (isBuilding('tags) or isRoad('tags) or isWaterway('tags) or isPOI('tags))),
        // let reconstructWayGeometries do its thing; nodes are cheap
        history.where('type === "node")
      ).drop('geometryChanged))

      @transient val idByUpdated = Window.partitionBy('id).orderBy('updated)

      val augmentedWays = wayGeoms
        .withColumn("length", st_length('geom))
        .withColumn("delta",
          when(isRoad('tags) or isWaterway('tags),
            coalesce(abs('length - (lag('length, 1) over idByUpdated)), lit(0)))
            .otherwise(lit(0)))

      val wayChangesetStats = augmentedWays
        .withColumn("road_m_added",
          when(isRoad('tags) and 'version === 1 and 'minorVersion === 0, 'length)
            .otherwise(lit(0)))
        .withColumn("road_m_modified",
          when(isRoad('tags) and not('version === 1 and 'minorVersion === 0), 'delta)
            .otherwise(lit(0)))
        .withColumn("waterway_m_added",
          when(isWaterway('tags) and 'version === 1 and 'minorVersion === 0, 'length)
            .otherwise(lit(0)))
        .withColumn("waterway_m_modified",
          when(isWaterway('tags) and not('version === 1 and 'minorVersion === 0), 'delta)
            .otherwise(lit(0)))
        .withColumn("roads_added",
          when(isRoad('tags) and 'version === 1 and 'minorVersion === 0, lit(1))
            .otherwise(lit(0)))
        .withColumn("roads_modified",
          when(isRoad('tags) and not('version === 1 and 'minorVersion === 0), lit(1))
            .otherwise(lit(0)))
        .withColumn("waterways_added",
          when(isWaterway('tags) and 'version === 1 and 'minorVersion === 0, lit(1))
            .otherwise(lit(0)))
        .withColumn("waterways_modified",
          when(isWaterway('tags) and not('version === 1 and 'minorVersion === 0), lit(1))
            .otherwise(lit(0)))
        .withColumn("buildings_added",
          when(isBuilding('tags) and 'version === 1 and 'minorVersion === 0, lit(1))
            .otherwise(lit(0)))
        .withColumn("buildings_modified",
          when(isBuilding('tags) and not('version === 1 and 'minorVersion === 0), lit(1))
            .otherwise(lit(0)))
        .withColumn("pois_added",
          when(isPOI('tags) and 'version === 1 and 'minorVersion === 0, lit(1))
            .otherwise(lit(0)))
        .withColumn("pois_modified",
          when(isPOI('tags) and not('version === 1 and 'minorVersion === 0), lit(1))
            .otherwise(lit(0)))
        .groupBy('changeset)
        .agg(
          sum('road_m_added / 1000).as('road_km_added),
          sum('road_m_modified / 1000).as('road_km_modified),
          sum('waterway_m_added / 1000).as('waterway_km_added),
          sum('waterway_m_modified / 1000).as('waterway_km_modified),
          sum('roads_added).as('roads_added),
          sum('roads_modified).as('roads_modified),
          sum('waterways_added).as('waterways_added),
          sum('waterways_modified).as('waterways_modified),
          sum('buildings_added).as('buildings_added),
          sum('buildings_modified).as('buildings_modified),
          sum('pois_added).as('pois_added),
          sum('pois_modified).as('pois_modified),
          count_values(flatten(collect_list('countries))) as 'countries
        )

      val pointChangesetStats = pointGeoms
        .withColumn("pois_added",
          when(isPOI('tags) and 'version === 1, lit(1))
            .otherwise(lit(0)))
        .withColumn("pois_modified",
          when(isPOI('tags) and 'version > 1, lit(1))
            .otherwise(lit(0)))
        .groupBy('changeset)
        .agg(
          sum('pois_added) as 'pois_added,
          sum('pois_modified) as 'pois_modified,
          count_values(flatten(collect_list('countries))) as 'countries
        )

      // coalesce values to deal with nulls introduced by the outer join
      val rawChangesetStats = wayChangesetStats
        .withColumnRenamed("pois_added", "way_pois_added")
        .withColumnRenamed("pois_modified", "way_pois_modified")
        .withColumnRenamed("countries", "way_countries")
        .join(pointChangesetStats
          .withColumnRenamed("pois_added", "node_pois_added")
          .withColumnRenamed("pois_modified", "node_pois_modified")
          .withColumnRenamed("countries", "node_countries"),
          Seq("changeset"),
          "full_outer")
        .withColumn("road_km_added", coalesce('road_km_added, lit(0)))
        .withColumn("road_km_modified", coalesce('road_km_modified, lit(0)))
        .withColumn("waterway_km_added", coalesce('waterway_km_added, lit(0)))
        .withColumn("waterway_km_modified", coalesce('waterway_km_modified, lit(0)))
        .withColumn("roads_added", coalesce('roads_added, lit(0)))
        .withColumn("roads_modified", coalesce('roads_modified, lit(0)))
        .withColumn("waterways_added", coalesce('waterways_added, lit(0)))
        .withColumn("waterways_modified", coalesce('waterways_modified, lit(0)))
        .withColumn("buildings_added", coalesce('buildings_added, lit(0)))
        .withColumn("buildings_modified", coalesce('buildings_modified, lit(0)))
        .withColumn("pois_added",
          coalesce('way_pois_added, lit(0)) + coalesce('node_pois_added, lit(0)))
        .withColumn("pois_modified",
          coalesce('way_pois_modified, lit(0)) + coalesce('node_pois_modified, lit(0)))
        .withColumn("countries", merge_counts('node_countries, 'way_countries))

      rawChangesetStats
        .repartition(50)
        .write
        .mode(SaveMode.Overwrite)
        .orc(output.resolve("centroid-stats").toString)

      spark.stop()
    }
  }
)
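
A note on the length-delta logic above: idByUpdated windows each way by id ordered by updated, and delta compares each version's length against the previous version's via lag; version === 1 and minorVersion === 0 then splits "added" from "modified" contributions. A toy sketch of the same pattern outside the OSM schema (the column names, and ordering by version/minorVersion instead of the real updated timestamp, are illustrative stand-ins):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object DeltaSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("delta-sketch").getOrCreate()
    import spark.implicits._

    // Toy history: (id, version, minorVersion, length) for two ways
    val ways = Seq(
      (1L, 1, 0, 100.0),
      (1L, 2, 0, 160.0),
      (2L, 1, 0, 50.0)
    ).toDF("id", "version", "minorVersion", "length")

    // Stand-in for the idByUpdated window (ordered here by version, not updated)
    val byVersion = Window.partitionBy('id).orderBy('version, 'minorVersion)

    val withDelta = ways
      // change in length relative to the previous version of the same id; 0 for the first version
      .withColumn("delta", coalesce(abs('length - lag('length, 1).over(byVersion)), lit(0)))
      // the full length counts as "added" on the initial version only
      .withColumn("m_added", when('version === 1 and 'minorVersion === 0, 'length).otherwise(lit(0)))
      // later versions contribute only the change in length
      .withColumn("m_modified", when(not('version === 1 and 'minorVersion === 0), 'delta).otherwise(lit(0)))

    withDelta.orderBy('id, 'version).show()
    spark.stop()
  }
}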

2 changes: 1 addition & 1 deletion src/common/src/main/scala/osmesa/common/ProcessOSM.scala
@@ -726,7 +726,7 @@ object ProcessOSM {
    val countryLookup = new CountryLookup()

    partition.map { row =>
-     val countryCodes = Option(row.getAs[Array[Byte]]("geom")).map(_.readWKB) match {
+     val countryCodes = Option(row.getAs[jts.Geometry]("geom")) match {
        case Some(geom) => countryLookup.lookup(geom).map(x => x.code)
        case None => Seq.empty[String]
      }
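
For context on the ProcessOSM change above: the geom column is now read directly as a JTS geometry rather than as WKB bytes that need decoding. A minimal spark-shell sketch of the new access pattern, assuming the org.locationtech.jts package (adjust the import to whatever jts resolves to in this codebase) and a placeholder lookup in place of CountryLookup:

import org.apache.spark.sql.Row
import org.locationtech.jts.geom.Geometry

// Hypothetical stand-in for CountryLookup.lookup
def lookupCodes(geom: Geometry): Seq[String] = Seq(geom.getGeometryType)

def codesFor(row: Row): Seq[String] =
  Option(row.getAs[Geometry]("geom")) match {
    // the geometry is already materialized on the row; no WKB round-trip needed
    case Some(geom) => lookupCodes(geom)
    case None       => Seq.empty[String]
  }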