Staging deployment and Railway statistics #157

Merged · 24 commits · Oct 1, 2019
Changes from 5 commits
Commits
9dd9832
Add rail-related tracking features
jpolchlo Aug 1, 2019
2132083
Adjust definitions of rail features, disaggregate rail counts
jpolchlo Aug 1, 2019
d3152ee
Fix casting error with JSON
jpolchlo Aug 1, 2019
3762f23
Unify table schemas and union
jpolchlo Aug 2, 2019
e97e21f
Address minor PR review comments
jpolchlo Aug 2, 2019
729259a
Address minor PR review comments
jpolchlo Aug 2, 2019
627f3e4
Write end position in augdiff and changeset streams to checkpoints ta…
jpolchlo Aug 6, 2019
ed5e6c3
Introduce new Makefile for streaming deployment
jpolchlo Aug 16, 2019
6fd0966
Clean up and improve streaming deployment scripts
jpolchlo Aug 19, 2019
4b96c25
Make script executable
jpolchlo Aug 19, 2019
e636e4e
Further changes to deployment scripts
jpolchlo Aug 20, 2019
4ab2bcc
Be smarter about which environment vars to check
jpolchlo Aug 20, 2019
957285c
Improve batch process deployment
jpolchlo Aug 23, 2019
a4b3172
Ensure streaming services are stopped during deployment cycle; simpli…
jpolchlo Aug 23, 2019
01bf457
Modify log messages
jpolchlo Aug 23, 2019
6da0912
Allow easy addition of extra EBS volumes in batch EMR deployment
jpolchlo Sep 18, 2019
06c46d4
Remove useful components to Vectorpipe and rename ChangesetORCCreator
jpolchlo Sep 19, 2019
3349bf5
Use VP functions for timestamp/sequence number conversions
jpolchlo Sep 25, 2019
baca909
[skip ci] Minor formatting update; leave note for future work
jpolchlo Sep 26, 2019
64ae32e
Clean up references
jpolchlo Sep 26, 2019
7dd079e
Minor fixes to address comments
jpolchlo Sep 26, 2019
5a26276
Move to VP 1.1.0
jpolchlo Sep 26, 2019
115ef17
Fix description
jpolchlo Sep 26, 2019
99d812e
Remove cluster configurations
jpolchlo Sep 26, 2019
@@ -0,0 +1,181 @@
package osmesa.analytics.oneoffs

import cats.data.{Validated, ValidatedNel}
import cats.implicits._
import com.monovore.decline._
import io.circe.generic.auto._
import io.circe.{yaml, _}
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import osmesa.analytics.Analytics
import vectorpipe.sources.{ChangesetSource, Source}

import java.net.URI
import java.sql.Timestamp
import java.time.Instant
import scalaj.http.Http

/*
* Usage example:
*
* sbt "project analytics" assembly
*
* spark-submit \
* --class osmesa.analytics.oneoffs.ChangesetORCUpdater \
* ingest/target/scala-2.11/osmesa-analytics.jar \
* --changeset-source http://somewhere/diffs/ \
* --database-url $DATABASE_URL
*/
object ChangesetORCUpdater
extends CommandApp(
name = "osmesa-changeset-orc-updater",
header = "Bring existing changesets ORC file up to date using changeset stream",
main = {

import ChangesetORCUpdaterUtils._

val changesetSourceOpt =
Opts
.option[URI](
"changesets",
short = "c",
metavar = "uri",
help = "Location of replication changesets"
)
.validate("Changeset source must have trailing '/'") { _.getPath.endsWith("/") }

val endTimeOpt =
Opts
.option[Instant]("end-time",
short = "e",
metavar = "timestamp",
help = "Timestamp of stream end (of the form 2016-02-29T13:45:00Z); if absent, the time now will be used")
.orNone

val orcArg = Opts
.argument[URI]("source ORC")
.validate("URI to ORC must have an s3 or file scheme") { _.getScheme != null }
.validate("orc must be an S3 or file Uri") { uri =>
uri.getScheme.startsWith("s3") || uri.getScheme.startsWith("file")
}
.validate("orc must be an .orc file") { _.getPath.endsWith(".orc") }

val outputArg = Opts.argument[URI]("destination ORC")
.validate("Output URI must have a scheme") { _.getScheme != null }
.validate("Output URI must have an S3 or file scheme") { uri =>
uri.getScheme.startsWith("s3") || uri.getScheme.startsWith("file")
}
.validate("orc must be an .orc file") { _.getPath.endsWith(".orc") }

(changesetSourceOpt,
endTimeOpt,
orcArg,
outputArg).mapN {
(changesetSource, endTime, orcUri, outputURI) =>
implicit val spark: SparkSession = Analytics.sparkSession("ChangesetStatsUpdater")

import spark.implicits._

val df = spark.read.orc(orcUri.toString)
val lastModified = df.select(max(coalesce('closed_at, 'created_at))).first.getAs[Timestamp](0)

val startSequence = findSequenceFor(lastModified.toInstant, changesetSource)
val endSequence = endTime.map(findSequenceFor(_, changesetSource)).getOrElse(getCurrentSequence(changesetSource).sequence)

val options = Map(
Source.BaseURI -> changesetSource.toString,
Source.StartSequence -> startSequence.toString,
Source.EndSequence -> (endSequence + 1).toString // Source.EndSequence is exclusive, so add 1 to include endSequence
)

val changesets = spark.read.format(Source.Changesets).options(options).load
changesets
.drop("comments", "sequence")
.union(df.select(
'id,
'tags,
'created_at as 'createdAt,
'open,
'closed_at as 'closedAt,
'comments_count as 'commentsCount,
'min_lat as 'minLat,
'max_lat as 'maxLat,
'min_lon as 'minLon,
'max_lon as 'maxLon,
'num_changes as 'numChanges,
'uid,
'user)
)
.repartition(1)
.write
.orc(outputURI.toString)

spark.stop()
}
}
)

object ChangesetORCUpdaterUtils {
implicit val readInstant: Argument[Instant] = new Argument[Instant] {
override def read(string: String): ValidatedNel[String, Instant] = {
try { Validated.valid(Instant.parse(string)) }
catch { case e: Exception => Validated.invalidNel(s"Invalid time: $string (${ e.getMessage })") }
}

override def defaultMetavar: String = "time"
}

private val formatter = DateTimeFormat.forPattern("y-M-d H:m:s.SSSSSSSSS Z")

private implicit val dateTimeDecoder: Decoder[DateTime] =
Decoder.instance(a => a.as[String].map(DateTime.parse(_, formatter)))

case class Sequence(last_run: DateTime, sequence: Long)

def getCurrentSequence(baseURI: URI): Sequence = {
val response =
Http(baseURI.resolve("state.yaml").toString).asString

val state = yaml.parser
.parse(response.body)
.leftMap(err => err: Error)
.flatMap(_.as[Sequence])
.valueOr(throw _)

state
}

def getSequence(baseURI: URI, sequence: Long): Sequence = {
val s = f"${sequence+1}%09d"
val path = s"${s.slice(0, 3)}/${s.slice(3, 6)}/${s.slice(6, 9)}.state.txt"

val response =
Http(baseURI.resolve(path).toString).asString

val state = yaml.parser
.parse(response.body)
.leftMap(err => err: Error)
.flatMap(_.as[Sequence])
.valueOr(throw _)

state
}

def estimateSequenceNumber(modifiedTime: Instant, baseURI: URI): Long = {
val current = getCurrentSequence(baseURI)
val diffMinutes = (current.last_run.toInstant.getMillis/1000 - modifiedTime.getEpochSecond) / 60
current.sequence - diffMinutes
}

def findSequenceFor(modifiedTime: Instant, baseURI: URI): Long = {
var guess = estimateSequenceNumber(modifiedTime, baseURI)
val target = org.joda.time.Instant.parse(modifiedTime.toString)

while (getSequence(baseURI, guess).last_run.isAfter(target)) { guess -= 1 }
while (getSequence(baseURI, guess).last_run.isBefore(target)) { guess += 1 }

getSequence(baseURI, guess).sequence
}
}
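
For reference, the lookup above estimates a starting sequence by assuming one replication sequence per minute (estimateSequenceNumber), then findSequenceFor walks that guess backward and forward until the state file's last_run first reaches the target time. Note also the path construction in getSequence: the sequence is incremented, zero-padded to nine digits, and split into thirds, so sequence 3372595 resolves to 003/372/596.state.txt. Below is a minimal sketch of the search with the HTTP state lookup stubbed out; SequenceSearchDemo, lastRun, and the exact one-minute cadence are illustrative assumptions, not project code.

import java.time.Instant

object SequenceSearchDemo extends App {
  val epoch = Instant.parse("2019-09-01T00:00:00Z")

  // Stub for the replication state lookup: pretend sequence n was
  // published exactly n minutes after `epoch`.
  def lastRun(sequence: Long): Instant = epoch.plusSeconds(sequence * 60)

  def findSequenceFor(target: Instant, initialGuess: Long): Long = {
    var guess = initialGuess
    // Walk backward past the target, then forward to the first sequence
    // whose last_run is not before it -- mirroring the PR's two loops.
    while (lastRun(guess).isAfter(target)) guess -= 1
    while (lastRun(guess).isBefore(target)) guess += 1
    guess
  }

  // The target falls between sequences 125 and 126; the search returns 126.
  println(findSequenceFor(epoch.plusSeconds(125 * 60 + 30), initialGuess = 200))
}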
@@ -111,17 +111,17 @@ object ChangesetStatsCreator
       'tags.getItem("created_by") as 'editor,
       'uid,
       'user,
-      'created_at,
+      'createdAt,
       'tags.getItem("comment") as 'comment,
-      'tags.getItem("hashtags") as 'hashtag)
-      .agg(first('closed_at, ignoreNulls = true) as 'closed_at)
+      'tags.getItem("hashtags") as 'hashtags)
+      .agg(first('closedAt, ignoreNulls = true) as 'closedAt)
       .select(
         'id,
         'editor,
         'uid,
         'user,
-        'created_at as 'createdAt,
-        'closed_at as 'closedAt,
+        'createdAt,
+        'closedAt,
         merge_sets(hashtags('comment), hashtags('hashtags)) as 'hashtags
       )

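The renames above align the ORC-derived columns with the camelCase names produced by the changesets stream, and the final hashtags column merges tags parsed from the changeset comment with those listed in the hashtags tag. A rough sketch of that merge in plain Scala follows; the PR itself uses the project's hashtags and merge_sets Spark helpers, and the regex and ';' delimiter here are assumptions.

object HashtagMergeDemo extends App {
  // Pull #hashtags out of free text; '-' is kept because tags like
  // #hotosm-project-1 are common in changeset comments.
  def extractHashtags(s: String): Set[String] =
    "#[\\w-]+".r.findAllIn(s).map(_.stripPrefix("#").toLowerCase).toSet

  val comment     = "Add missing rail lines #hotosm-project-1 #railways"
  val hashtagsTag = "hotosm-project-1;mapping" // assumed ';'-delimited

  val merged = extractHashtags(comment) ++
    hashtagsTag.split(";").map(_.trim.toLowerCase).filter(_.nonEmpty)

  println(merged) // Set(hotosm-project-1, railways, mapping)
}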
@@ -23,7 +23,7 @@ class ChangesetStatsForeachWriter(databaseUri: URI,
 | ?::jsonb AS measurements,
 | ?::jsonb AS counts,
 | ? AS total_edits,
-| ? AS augmented_diffs,
+| ?::integer[] AS augmented_diffs,
 | current_timestamp AS updated_at
 |)
 |INSERT INTO changesets AS c (
@@ -41,7 +41,7 @@ class ChangesetStatsForeachWriter(databaseUri: URI,
 | measurements = (
 |   SELECT jsonb_object_agg(key, value)
 |   FROM (
-|     SELECT key, sum(value::numeric) AS value
+|     SELECT key, sum((value->>0)::numeric) AS value
 |     FROM (
 |       SELECT * from jsonb_each(c.measurements)
 |       UNION ALL
@@ -54,7 +54,7 @@ class ChangesetStatsForeachWriter(databaseUri: URI,
 | counts = (
 |   SELECT jsonb_object_agg(key, value)
 |   FROM (
-|     SELECT key, sum(value::numeric) AS value
+|     SELECT key, sum((value->>0)::numeric) AS value
 |     FROM (
 |       SELECT * from jsonb_each(c.counts)
 |       UNION ALL
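The cast change in both aggregates sidesteps a direct jsonb-to-numeric cast: value->>0 first extracts the underlying element as text, which then casts to numeric cleanly, whereas casting the jsonb value itself can fail (this appears to be the JSON casting error fixed in commit d3152ee). A quick JDBC check of the working form, as a sketch; the connection URL, credentials, and database name are placeholders.

import java.sql.DriverManager

object JsonbCastDemo extends App {
  // Placeholder URL/credentials; needs the org.postgresql JDBC driver.
  val conn = DriverManager.getConnection(
    "jdbc:postgresql://localhost/osmesa", "user", "password")
  try {
    val stmt = conn.createStatement()
    // '->> 0' extracts the first element as text, so the numeric cast
    // succeeds; a direct cast of the jsonb value would error out on
    // PostgreSQL versions without a jsonb-to-numeric cast.
    val rs = stmt.executeQuery("SELECT ('[5]'::jsonb ->> 0)::numeric AS v")
    rs.next()
    println(rs.getBigDecimal("v")) // 5
  } finally conn.close()
}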
27 changes: 25 additions & 2 deletions src/analytics/src/main/scala/osmesa/analytics/stats/package.scala
@@ -46,13 +46,27 @@ package object stats {
       isCoastline(tags) or
       isPOI(tags) as 'isInterestingWay

-  def isLinear(tags: Column): Column = isRoad(tags) or isWaterway(tags) or isCoastline(tags) as 'isLinear
+  // Does this feature represent a rail-related site or area (not track)
+  def isRailFeature(tags: Column): Column =
+    array_contains(splitDelimitedValues(tags.getItem("railway")), "station") or
+    array_contains(splitDelimitedValues(tags.getItem("railway")), "yard") or
+    array_contains(splitDelimitedValues(tags.getItem("landuse")), "railway") as 'isRailSite
+
+  // Does this feature represent a section of rail track
+  def isRailLine(tags: Column): Column = not(isRailFeature(tags)) and tags.getItem("railway").isNotNull as 'isRailLine
+
+  // Does this feature represent a rail-related entity
+  def isRailway(tags: Column): Column =
+    tags.getItem("railway").isNotNull or array_contains(splitDelimitedValues(tags.getItem("landuse")), "railway") as 'isRailway
+
+  def isLinear(tags: Column): Column = isRoad(tags) or isWaterway(tags) or isCoastline(tags) or isRailLine(tags) as 'isLinear

   def isOther(tags: Column): Column = isTagged(tags) and
     not(isRoad((tags))) and
     not(isWaterway(tags)) and
     not(isCoastline(tags)) and
     not(isBuilding(tags)) and
+    not(isRailway(tags)) and
     not(isPOI(tags)) as 'isOther

def DefaultMeasurements(implicit sparkSession: SparkSession): Column = {
@@ -67,7 +81,10 @@ package object stats {
lit("waterway_km_deleted"), (isWaterway('tags) and !'visible).cast(IntegerType) * 'delta / 1000,
lit("coastline_km_added"), (isCoastline('tags) and isNew('version, 'minorVersion)).cast(IntegerType) * 'delta / 1000,
lit("coastline_km_modified"), (isCoastline('tags) and not(isNew('version, 'minorVersion)) and 'visible).cast(IntegerType) * 'delta / 1000,
lit("coastline_km_deleted"), (isCoastline('tags) and !'visible).cast(IntegerType) * 'delta / 1000
lit("coastline_km_deleted"), (isCoastline('tags) and !'visible).cast(IntegerType) * 'delta / 1000,
lit("railline_km_added"), (isRailLine('tags) and isNew('version, 'minorVersion)).cast(IntegerType) * 'delta / 1000,
lit("railline_km_modified"), (isRailLine('tags) and not(isNew('version, 'minorVersion)) and 'visible).cast(IntegerType) * 'delta / 1000,
lit("railline_km_deleted"), (isRailLine('tags) and !'visible).cast(IntegerType) * 'delta / 1000
)) as 'measurements
}

@@ -87,6 +104,12 @@ package object stats {
lit("buildings_added"), (isBuilding('tags) and isNew('version, 'minorVersion)).cast(IntegerType),
lit("buildings_modified"), (isBuilding('tags) and not(isNew('version, 'minorVersion)) and 'visible).cast(IntegerType),
lit("buildings_deleted"), (isBuilding('tags) and !'visible).cast(IntegerType),
lit("railway_features_added"), (isRailFeature('tags) and isNew('version, 'minorVersion)).cast(IntegerType),
lit("railway_features_modified"), (isRailFeature('tags) and not(isNew('version, 'minorVersion)) and 'visible).cast(IntegerType),
lit("railway_features_deleted"), (isRailFeature('tags) and !'visible).cast(IntegerType),
lit("raillines_added"), (isRailLine('tags) and isNew('version, 'minorVersion)).cast(IntegerType),
lit("raillines_modified"), (isRailLine('tags) and not(isNew('version, 'minorVersion)) and 'visible).cast(IntegerType),
lit("raillines_deleted"), (isRailLine('tags) and !'visible).cast(IntegerType),
lit("pois_added"), (isPOI('tags) and isNew('version, 'minorVersion)).cast(IntegerType),
lit("pois_modified"), (isPOI('tags) and not(isNew('version, 'minorVersion)) and 'visible).cast(IntegerType),
lit("pois_deleted"), (isPOI('tags) and !'visible).cast(IntegerType),
Expand Down
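
Taken together, the three predicates split rail-related features into sites (stations, yards, railway landuse) and track (anything else tagged railway), with isRailway covering both for the isOther exclusion. A small local check of that classification, as a sketch; it assumes a local SparkSession, the osmesa.analytics.stats package on the classpath, and that splitDelimitedValues treats a missing tag as an empty array.

import org.apache.spark.sql.SparkSession
import osmesa.analytics.stats._

object RailPredicateDemo extends App {
  val spark = SparkSession.builder
    .master("local[*]")
    .appName("rail-predicate-demo")
    .getOrCreate()
  import spark.implicits._

  val df = Seq(
    Map("railway" -> "station"), // rail site/area
    Map("railway" -> "rail"),    // rail track
    Map("landuse" -> "railway")  // rail-related landuse
  ).toDF("tags")

  // Expected: station -> isRailSite; rail -> isRailLine;
  // landuse=railway -> isRailSite; all three -> isRailway.
  df.select('tags, isRailFeature('tags), isRailLine('tags), isRailway('tags))
    .show(truncate = false)

  spark.stop()
}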