Skip to content

Commit f642d86

Browse files
authored
Update mobile-verifier to process new subscriber mapping activity reports (#981)
* Add receiving subscriber mapper activity requests to mobile ingestor * update mobile verifier to process subscriber mapper activity messages * update rewarder to use new subscriber mapping activity * making saving actvity idempotent and fix tests * remove unused subscriber_location and subscriber_verified mapping from mobile verifier * remove unused tests * Change to spawn tokio task and fix lifetime issues * remove commented out code * refactoring of subscriber mapping activity processing * move send/sync to trait definition * remove associated type for AuthorizationVerifier and EntityVerifier * update proto dep back to master * update beacon
1 parent e810975 commit f642d86

File tree

23 files changed

+534
-1079
lines changed

23 files changed

+534
-1079
lines changed

Cargo.lock

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,3 +135,7 @@ sqlx = { git = "https://github.com/launchbadge/sqlx.git", rev = "42dd78fe931df65
135135
# [patch.'https://github.com/helium/proto']
136136
# helium-proto = { path = "../../proto" }
137137
# beacon = { path = "../../proto" }
138+
139+
# [patch.'https://github.com/helium/proto']
140+
# helium-proto = { git = "https://www.github.com/helium/proto.git", branch = "jg/disco-shares-v2" }
141+

file_store/src/file_info.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,10 @@ pub const VERIFIED_PROMOTION_REWARD: &str = "verified_promotion_reward";
175175
pub const SERVICE_PROVIDER_PROMOTION_FUND: &str = "service_provider_promotion_fund";
176176
pub const UNIQUE_CONNECTIONS_REPORT: &str = "unique_connections_report";
177177
pub const VERIFIED_UNIQUE_CONNECTIONS_REPORT: &str = "verified_unique_connections_report";
178+
pub const SUBSCRIBER_MAPPING_ACTIVITY_INGEST_REPORT: &str =
179+
"subscriber_mapping_activity_ingest_report";
180+
pub const VERIFIED_SUBSCRIBER_MAPPING_ACTIVITY_REPORT: &str =
181+
"verified_subscriber_mapping_activity_report";
178182

179183
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Copy, strum::EnumCount)]
180184
#[serde(rename_all = "snake_case")]
@@ -241,6 +245,8 @@ pub enum FileType {
241245
RadioUsageStatsReq,
242246
UniqueConnectionsReport,
243247
VerifiedUniqueConnectionsReport,
248+
SubscriberMappingActivityIngestReport,
249+
VerifiedSubscriberMappingActivityReport,
244250
}
245251

246252
impl fmt::Display for FileType {
@@ -328,6 +334,12 @@ impl FileType {
328334
Self::RadioUsageStatsReq => RADIO_USAGE_STATS_REQ,
329335
Self::UniqueConnectionsReport => UNIQUE_CONNECTIONS_REPORT,
330336
Self::VerifiedUniqueConnectionsReport => VERIFIED_UNIQUE_CONNECTIONS_REPORT,
337+
Self::SubscriberMappingActivityIngestReport => {
338+
SUBSCRIBER_MAPPING_ACTIVITY_INGEST_REPORT
339+
}
340+
Self::VerifiedSubscriberMappingActivityReport => {
341+
VERIFIED_SUBSCRIBER_MAPPING_ACTIVITY_REPORT
342+
}
331343
}
332344
}
333345
}
@@ -411,6 +423,12 @@ impl FromStr for FileType {
411423
RADIO_USAGE_STATS_REQ => Self::RadioUsageStatsReq,
412424
UNIQUE_CONNECTIONS_REPORT => Self::UniqueConnectionsReport,
413425
VERIFIED_UNIQUE_CONNECTIONS_REPORT => Self::VerifiedUniqueConnectionsReport,
426+
SUBSCRIBER_MAPPING_ACTIVITY_INGEST_REPORT => {
427+
Self::SubscriberMappingActivityIngestReport
428+
}
429+
VERIFIED_SUBSCRIBER_MAPPING_ACTIVITY_REPORT => {
430+
Self::VerifiedSubscriberMappingActivityReport
431+
}
414432
_ => return Err(Error::from(io::Error::from(io::ErrorKind::InvalidInput))),
415433
};
416434
Ok(result)

file_store/src/traits/file_sink_write.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,3 +293,14 @@ impl_file_sink!(
293293
FileType::RewardManifest.to_str(),
294294
"reward_manifest"
295295
);
296+
impl_file_sink!(
297+
poc_mobile::SubscriberMappingActivityIngestReportV1,
298+
FileType::SubscriberMappingActivityIngestReport.to_str(),
299+
"subscriber_mapping_activity_ingest_report"
300+
);
301+
302+
impl_file_sink!(
303+
poc_mobile::VerifiedSubscriberMappingActivityReportV1,
304+
FileType::VerifiedSubscriberMappingActivityReport.to_str(),
305+
"verified_subscriber_mapping_activity_report"
306+
);

file_store/src/traits/msg_verify.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ impl_msg_verify!(poc_mobile::SubscriberVerifiedMappingEventReqV1, signature);
106106
impl_msg_verify!(poc_mobile::HexUsageStatsReqV1, signature);
107107
impl_msg_verify!(poc_mobile::RadioUsageStatsReqV1, signature);
108108
impl_msg_verify!(poc_mobile::UniqueConnectionsReqV1, signature);
109+
impl_msg_verify!(poc_mobile::SubscriberMappingActivityReqV1, signature);
109110

110111
#[cfg(test)]
111112
mod test {

ingest/src/server_mobile.rs

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ use helium_proto::services::poc_mobile::{
2020
RadioUsageStatsResV1, ServiceProviderBoostedRewardsBannedRadioIngestReportV1,
2121
ServiceProviderBoostedRewardsBannedRadioReqV1, ServiceProviderBoostedRewardsBannedRadioRespV1,
2222
SpeedtestIngestReportV1, SpeedtestReqV1, SpeedtestRespV1, SubscriberLocationIngestReportV1,
23-
SubscriberLocationReqV1, SubscriberLocationRespV1,
23+
SubscriberLocationReqV1, SubscriberLocationRespV1, SubscriberMappingActivityIngestReportV1,
24+
SubscriberMappingActivityReqV1, SubscriberMappingActivityResV1,
2425
SubscriberVerifiedMappingEventIngestReportV1, SubscriberVerifiedMappingEventReqV1,
2526
SubscriberVerifiedMappingEventResV1, UniqueConnectionsIngestReportV1,
2627
WifiHeartbeatIngestReportV1, WifiHeartbeatReqV1, WifiHeartbeatRespV1,
@@ -55,6 +56,7 @@ pub struct GrpcServer<AV> {
5556
hex_usage_stats_event_sink: FileSinkClient<HexUsageStatsIngestReportV1>,
5657
radio_usage_stats_event_sink: FileSinkClient<RadioUsageStatsIngestReportV1>,
5758
unique_connections_sink: FileSinkClient<UniqueConnectionsIngestReportV1>,
59+
subscriber_mapping_activity_sink: FileSinkClient<SubscriberMappingActivityIngestReportV1>,
5860
required_network: Network,
5961
address: SocketAddr,
6062
api_token: MetadataValue<Ascii>,
@@ -103,6 +105,7 @@ where
103105
hex_usage_stats_event_sink: FileSinkClient<HexUsageStatsIngestReportV1>,
104106
radio_usage_stats_event_sink: FileSinkClient<RadioUsageStatsIngestReportV1>,
105107
unique_connections_sink: FileSinkClient<UniqueConnectionsIngestReportV1>,
108+
subscriber_mapping_activity_sink: FileSinkClient<SubscriberMappingActivityIngestReportV1>,
106109
required_network: Network,
107110
address: SocketAddr,
108111
api_token: MetadataValue<Ascii>,
@@ -121,6 +124,7 @@ where
121124
hex_usage_stats_event_sink,
122125
radio_usage_stats_event_sink,
123126
unique_connections_sink,
127+
subscriber_mapping_activity_sink,
124128
required_network,
125129
address,
126130
api_token,
@@ -478,6 +482,33 @@ where
478482
Ok(Response::new(SubscriberVerifiedMappingEventResV1 { id }))
479483
}
480484

485+
async fn submit_subscriber_mapping_activity(
486+
&self,
487+
request: Request<SubscriberMappingActivityReqV1>,
488+
) -> GrpcResult<SubscriberMappingActivityResV1> {
489+
let timestamp = Utc::now().timestamp_millis() as u64;
490+
let event = request.into_inner();
491+
492+
custom_tracing::record_b58("subscriber_id", &event.subscriber_id);
493+
custom_tracing::record_b58("pub_key", &event.carrier_pub_key);
494+
495+
let report = self
496+
.verify_public_key(&event.carrier_pub_key)
497+
.and_then(|public_key| self.verify_network(public_key))
498+
.and_then(|public_key| self.verify_signature(public_key, event))
499+
.map(|(_, event)| SubscriberMappingActivityIngestReportV1 {
500+
received_timestamp: timestamp,
501+
report: Some(event),
502+
})?;
503+
504+
_ = self
505+
.subscriber_mapping_activity_sink
506+
.write(report, [])
507+
.await;
508+
509+
Ok(Response::new(SubscriberMappingActivityResV1 { timestamp }))
510+
}
511+
481512
async fn submit_hex_usage_stats_report(
482513
&self,
483514
request: Request<HexUsageStatsReqV1>,
@@ -690,6 +721,16 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
690721
)
691722
.await?;
692723

724+
let (subscriber_mapping_activity_sink, subscriber_mapping_activity_server) =
725+
SubscriberMappingActivityIngestReportV1::file_sink(
726+
store_base_path,
727+
file_upload.clone(),
728+
FileSinkCommitStrategy::Automatic,
729+
FileSinkRollTime::Duration(settings.roll_time),
730+
env!("CARGO_PKG_NAME"),
731+
)
732+
.await?;
733+
693734
let Some(api_token) = settings
694735
.token
695736
.as_ref()
@@ -715,6 +756,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
715756
hex_usage_stats_event_sink,
716757
radio_usage_stats_event_sink,
717758
unique_connections_sink,
759+
subscriber_mapping_activity_sink,
718760
settings.network,
719761
settings.listen_addr,
720762
api_token,
@@ -741,6 +783,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
741783
.add_task(hex_usage_stats_event_server)
742784
.add_task(radio_usage_stats_event_server)
743785
.add_task(unique_connections_server)
786+
.add_task(subscriber_mapping_activity_server)
744787
.add_task(grpc_server)
745788
.build()
746789
.start()

ingest/tests/common/mod.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use helium_proto::services::{
1919
};
2020
use ingest::server_mobile::GrpcServer;
2121
use mobile_config::client::authorization_client::AuthorizationVerifier;
22+
use mobile_config::client::ClientError;
2223
use prost::Message;
2324
use rand::rngs::OsRng;
2425
use std::{net::SocketAddr, sync::Arc, time::Duration};
@@ -42,13 +43,11 @@ impl MockAuthorizationClient {
4243

4344
#[async_trait]
4445
impl AuthorizationVerifier for MockAuthorizationClient {
45-
type Error = anyhow::Error;
46-
4746
async fn verify_authorized_key(
4847
&self,
4948
_pubkey: &PublicKeyBinary,
5049
_role: NetworkKeyRole,
51-
) -> anyhow::Result<bool> {
50+
) -> Result<bool, ClientError> {
5251
Ok(true)
5352
}
5453
}
@@ -80,6 +79,8 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> {
8079
let (hex_usage_stat_tx, hex_usage_stat_rx) = tokio::sync::mpsc::channel(10);
8180
let (radio_usage_stat_tx, radio_usage_stat_rx) = tokio::sync::mpsc::channel(10);
8281
let (unique_connections_tx, unique_connections_rx) = tokio::sync::mpsc::channel(10);
82+
let (subscriber_mapping_activity_tx, _subscriber_mapping_activity_rx) =
83+
tokio::sync::mpsc::channel(10);
8384

8485
let auth_client = MockAuthorizationClient::new();
8586

@@ -97,6 +98,7 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> {
9798
FileSinkClient::new(hex_usage_stat_tx, "hex_usage_test_file_sink"),
9899
FileSinkClient::new(radio_usage_stat_tx, "radio_usage_test_file_sink"),
99100
FileSinkClient::new(unique_connections_tx, "noop"),
101+
FileSinkClient::new(subscriber_mapping_activity_tx, "noop"),
100102
Network::MainNet,
101103
socket_addr,
102104
api_token,

mobile_config/src/client/authorization_client.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,11 @@ use std::{sync::Arc, time::Duration};
1111

1212
#[async_trait]
1313
pub trait AuthorizationVerifier: Send + Sync + 'static {
14-
type Error;
15-
1614
async fn verify_authorized_key(
1715
&self,
1816
pubkey: &PublicKeyBinary,
1917
role: mobile_config::NetworkKeyRole,
20-
) -> Result<bool, Self::Error>;
18+
) -> Result<bool, ClientError>;
2119
}
2220

2321
#[derive(Clone)]
@@ -51,8 +49,6 @@ impl AuthorizationClient {
5149

5250
#[async_trait]
5351
impl AuthorizationVerifier for AuthorizationClient {
54-
type Error = ClientError;
55-
5652
async fn verify_authorized_key(
5753
&self,
5854
pubkey: &PublicKeyBinary,

mobile_config/src/client/entity_client.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,8 @@ use retainer::Cache;
1010
use std::{sync::Arc, time::Duration};
1111

1212
#[async_trait]
13-
pub trait EntityVerifier {
14-
type Error;
15-
16-
async fn verify_rewardable_entity(&self, entity_id: &[u8]) -> Result<bool, Self::Error>;
13+
pub trait EntityVerifier: Send + Sync + 'static {
14+
async fn verify_rewardable_entity(&self, entity_id: &[u8]) -> Result<bool, ClientError>;
1715
}
1816

1917
#[derive(Clone)]
@@ -27,8 +25,6 @@ pub struct EntityClient {
2725

2826
#[async_trait]
2927
impl EntityVerifier for EntityClient {
30-
type Error = ClientError;
31-
3228
async fn verify_rewardable_entity(&self, entity_id: &[u8]) -> Result<bool, ClientError> {
3329
let entity_id = entity_id.to_vec();
3430
if let Some(entity_found) = self.cache.get(&entity_id).await {
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
CREATE TABLE IF NOT EXISTS subscriber_mapping_activity (
2+
subscriber_id BYTEA NOT NULL,
3+
discovery_reward_shares BIGINT NOT NULL,
4+
verification_reward_shares BIGINT NOT NULL,
5+
received_timestamp TIMESTAMPTZ NOT NULL,
6+
inserted_at TIMESTAMPTZ NOT NULL DEFAULT now(),
7+
PRIMARY KEY (subscriber_id, received_timestamp)
8+
);
9+
10+
INSERT INTO subscriber_mapping_activity(subscriber_id, discovery_reward_shares, verification_reward_shares, received_timestamp, inserted_at)
11+
SELECT subscriber_id, 30, 0, received_timestamp, created_at AS inserted_at
12+
FROM subscriber_loc_verified;
13+
14+
UPDATE subscriber_mapping_activity sma
15+
SET verification_reward_shares = svme.total_reward_points
16+
FROM subscriber_verified_mapping_event svme
17+
WHERE sma.subscriber_id = svme.subscriber_id
18+
AND sma.received_timestamp::date = svme.received_timestamp::date;

0 commit comments

Comments
 (0)