Skip to content

Commit 3133040

Browse files
committed
Improvements and fixes
1 parent 455ecae commit 3133040

File tree

6 files changed

+49
-37
lines changed

6 files changed

+49
-37
lines changed

mqtt-tracker/create_es_sink.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
source .env
44

5-
curl -i -X PUT -H "Content-Type:application/json" \
5+
curl -s -X PUT -H "Content-Type:application/json" \
66
http://localhost:8083/connectors/sink-elastic-phone_data-00/config \
77
-d '{
88
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
@@ -18,5 +18,5 @@ curl -i -X PUT -H "Content-Type:application/json" \
1818
"key.ignore": "true",
1919
"schema.ignore": "true",
2020
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
21-
}'
21+
}' | jq '.'
2222

mqtt-tracker/data/ddl/users.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ CREATE TABLE USERS (USERID VARCHAR(255),
1414
SHARE_LOCATION_OPTIN BOOLEAN,
1515
PRIVACY_LOCATION_LAT DOUBLE,
1616
PRIVACY_LOCATION_LON DOUBLE,
17-
PRIVACY_ZONE_KM INT,
17+
PRIVACY_ZONE_KM DOUBLE,
1818
CREATE_TS TIMESTAMP DEFAULT CURRENT_TIMESTAMP ,
1919
UPDATE_TS TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
2020
);

mqtt-tracker/data/kibana.ndjson

Lines changed: 11 additions & 11 deletions
Large diffs are not rendered by default.

mqtt-tracker/docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ services:
147147
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
148148
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
149149
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
150-
CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/,/data/connector-jars'
150+
CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/,/data/connect-jars'
151151
# External secrets config
152152
# See https://docs.confluent.io/current/connect/security.html#externalizing-secrets
153153
CONNECT_CONFIG_PROVIDERS: 'file'

mqtt-tracker/load_sample_data.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
11
#!/bin/bash
22

3+
docker exec kafka-1 /usr/bin/kafka-topics --zookeeper zookeeper:2181 --create --topic data_mqtt --replication-factor 3 --partitions 6
4+
35
kafkacat -b localhost:9092 -t data_mqtt -K: -P -T -l ./data/dummy_data.kcat

mqtt-tracker/mqtt_demo.adoc

Lines changed: 32 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,6 @@ SELECT WHO,
212212
TRACKER_ID,
213213
BATTERY_PCT,
214214
BATTERY_STATUS,
215-
REPORT_TRIGGER,
216215
CONNECTIVITY_STATUS
217216
FROM PHONE_DATA;
218217
----
@@ -251,18 +250,6 @@ curl 'http://localhost:5601/api/telemetry/v2/optIn' -H 'kbn-xsrf: nevergonnagive
251250
252251
echo -e "Import objects"
253252
curl 'http://localhost:5601/api/saved_objects/_import?overwrite=true' -H 'Connection: keep-alive' -H 'Origin: http://localhost:5601' -H 'kbn-version: 7.4.0' --form file=@data/kibana.ndjson
254-
--
255-
-- echo -e "\n--\n+> Create Kibana index patterns"
256-
-- curl -XPOST 'http://localhost:5601/api/saved_objects/index-pattern/phone_data_idx' \
257-
-- -H 'kbn-xsrf: nevergonnagiveyouup' \
258-
-- -H 'Content-Type: application/json' \
259-
-- -d '{"attributes":{"title":"phone_data","timeFieldName":"EVENT_TIME_EPOCH_MS_TS"}}'
260-
--
261-
echo -e "\n--\n+> Set default Kibana index"
262-
curl -XPOST 'http://localhost:5601/api/kibana/settings' \
263-
-H 'kbn-xsrf: nevergonnagiveyouup' \
264-
-H 'content-type: application/json' \
265-
-d '{"changes":{"defaultIndex":"phone_data_idx"}}'
266253
----
267254

268255
* Show Kibana http://localhost:5601/app/kibana#/discover?_g=(refreshInterval:(pause:!f,value:1000),time:(from:now-24h,to:now))&_a=(columns:!(WHO,CONNECTIVITY_STATUS,BATTERY_PCT,BATTERY_STATUS,DISTANCE_FROM_VENUE),index:phone_data_idx,interval:auto,query:(language:lucene,query:''),sort:!(!(EVENT_TIME_EPOCH_MS_TS,desc)))[discovery view] & http://localhost:5601/app/kibana#/visualize/create?type=tile_map&indexPattern=phone_data_idx&_g=(refreshInterval:(pause:!t,value:0),time:(from:now-7d,mode:quick,to:now))&_a=(filters:!(),linked:!f,query:(language:lucene,query:''),uiState:(),vis:(aggs:!((enabled:!t,id:'1',params:(),schema:metric,type:count),(enabled:!t,id:'2',params:(autoPrecision:!t,field:LOCATION,isFilteredByCollar:!t,mapCenter:!(0,0),mapZoom:2,precision:2,useGeocentroid:!t),schema:segment,type:geohash_grid)),params:(addTooltip:!t,colorSchema:'Yellow%20to%20Red',heatClusterSize:1.5,isDesaturated:!t,legendPosition:bottomright,mapCenter:!(0,0),mapType:'Shaded%20Circle%20Markers',mapZoom:2,wms:(enabled:!f,options:(format:image%2Fpng,transparent:!t),selectedTmsLayer:(attribution:'%3Cp%3E%26%23169;%20%3Ca%20href%3D%22https:%2F%2Fwww.openstreetmap.org%2Fcopyright%22%3EOpenStreetMap%20contributors%3C%2Fa%3E%7C%3Ca%20href%3D%22https:%2F%2Fopenmaptiles.org%22%3EOpenMapTiles%3C%2Fa%3E%7C%3Ca%20href%3D%22https:%2F%2Fwww.maptiler.com%22%3EMapTiler%3C%2Fa%3E%7C%3Ca%20href%3D%22https:%2F%2Fwww.elastic.co%2Felastic-maps-service%22%3EElastic%20Maps%20Service%3C%2Fa%3E%3C%2Fp%3E%26%2310;',id:road_map,maxZoom:18,minZoom:0,origin:elastic_maps_service))),title:'New%20Visualization',type:tile_map))[map viz]
@@ -300,10 +287,10 @@ SET 'auto.offset.reset' = 'earliest';
300287
CREATE STREAM USERS_STREAM WITH (KAFKA_TOPIC='mysql-asgard.demo.USERS', VALUE_FORMAT='AVRO');
301288
CREATE STREAM USERS_REKEY_P6 WITH (PARTITIONS=6) AS SELECT * FROM USERS_STREAM PARTITION BY USERID;
302289
CREATE STREAM USERS_REKEY_P1 WITH (PARTITIONS=1) AS SELECT * FROM USERS_STREAM PARTITION BY USERID;
303-
PRINT USERS_REKEY_P1 LIMIT 1;
304-
CREATE TABLE USERS WITH (KAFKA_TOPIC='USERS_REKEY_P1', VALUE_FORMAT='AVRO');
290+
PRINT USERS_REKEY_P6 LIMIT 1;
291+
CREATE TABLE USERS WITH (KAFKA_TOPIC='USERS_REKEY_P6', VALUE_FORMAT='AVRO');
292+
-- CREATE TABLE USERS WITH (KAFKA_TOPIC='USERS_REKEY_P1', VALUE_FORMAT='AVRO');
305293
-- DROP TABLE USERS;
306-
-- CREATE TABLE USERS WITH (KAFKA_TOPIC='USERS_REKEY_P6', VALUE_FORMAT='AVRO');
307294
----
308295

309296
Examine the data:
@@ -316,7 +303,6 @@ SELECT TIMESTAMPTOSTRING(R.ROWTIME, 'MMM-dd HH:mm:ss','Europe/London') AS TS,
316303
R.WHO,
317304
U.EMAIL,
318305
U.SHARE_LOCATION_OPTIN,
319-
R.BATTERY_STATUS,
320306
R.LAT,
321307
R.LON
322308
FROM PHONE_DATA R
@@ -346,17 +332,16 @@ Set datagen running
346332
----
347333
SET 'auto.offset.reset' = 'latest';
348334
349-
SELECT TIMESTAMPTOSTRING(ROWTIME, 'MMM-dd HH:mm:ss','Europe/London') AS TS,
335+
SELECT TIMESTAMPTOSTRING(R.ROWTIME, 'MMM-dd HH:mm:ss','Europe/London') AS TS,
350336
WHO
351337
,U.EMAIL AS EMAIL
352338
,CASE WHEN U.SHARE_LOCATION_OPTIN = 1 THEN LOCATION
353339
ELSE CAST(NULL AS VARCHAR)
354340
END AS LOCATION
355-
,BATTERY_PCT
356-
,BATTERY_STATUS
357341
FROM PHONE_DATA R
358342
LEFT JOIN USERS U
359-
ON R.WHO = U.ROWKEY;
343+
ON R.WHO = U.ROWKEY
344+
WHERE WHO='ivor';
360345
----
361346

362347
In a new terminal, show MySQL with KSQL still visible.
@@ -376,6 +361,29 @@ UPDATE USERS SET SHARE_LOCATION_OPTIN=TRUE WHERE USERID='ivor';
376361
UPDATE USERS SET SHARE_LOCATION_OPTIN=FALSE WHERE USERID='ivor';
377362
----
378363

364+
Looking at the data in MySQL in more detail, we can see each user can optionally specify a _privacy zone_ within which their data won't be shared, but outside of which it can.
365+
366+
[source,sql]
367+
----
368+
SELECT * FROM USERS WHERE USERID='rmoff' \G
369+
----
370+
371+
[source,sql]
372+
----
373+
*************************** 1. row ***************************
374+
USERID: rmoff
375+
376+
SHARE_LOCATION_OPTIN: 1
377+
PRIVACY_LOCATION_LAT: 53.924729
378+
PRIVACY_LOCATION_LON: -1.804453
379+
PRIVACY_ZONE_KM: 1
380+
CREATE_TS: 2019-11-14 06:54:38
381+
UPDATE_TS: 2019-11-14 06:54:38
382+
1 row in set (0.00 sec)
383+
----
384+
385+
We can apply this logic in the SQL as part of the streaming application:
386+
379387
[source,sql]
380388
----
381389
CREATE STREAM PHONE_LOCATION_OPTIN AS
@@ -392,6 +400,7 @@ CREATE STREAM PHONE_LOCATION_OPTIN AS
392400
ELSE '<No user record>'
393401
END AS LOCATION,
394402
GEO_DISTANCE (LAT,LON,PRIVACY_LOCATION_LAT,PRIVACY_LOCATION_LON,'KM') AS DISTANCE_KM_FROM_PRIVACY_ZONE,
403+
PRIVACY_ZONE_KM AS PRIVACY_ZONE_THRESHOLD_KM
395404
BATTERY_PCT,
396405
BATTERY_STATUS,
397406
U.EMAIL AS EMAIL
@@ -405,7 +414,8 @@ CREATE STREAM PHONE_LOCATION_OPTIN AS
405414
SELECT TIMESTAMPTOSTRING(ROWTIME, 'MMM-dd HH:mm:ss','Europe/London') AS TS,
406415
WHO,
407416
LOCATION,
408-
DISTANCE_KM_FROM_PRIVACY_ZONE
417+
DISTANCE_KM_FROM_PRIVACY_ZONE,
418+
PRIVACY_ZONE_THRESHOLD_KM
409419
FROM PHONE_LOCATION_OPTIN
410420
WHERE WHO='rmoff';
411421
----

0 commit comments

Comments
 (0)