Skip to content

Commit bea02ab

Browse files
authored
Add reward_override_entity_key to subscriber mapping activity (#1003)
- Handle reward_override_entity_key in SubscriberMappingActivityIngestReportV1 report. - Add reward_override_entity_key field to the subscriber_mapping_activity table - Add reward_override_entity_key to SubscriberReward proto - Handle reward_override_entity_key in reward_index (override the key/address in reward_index table)
1 parent 5365699 commit bea02ab

File tree

12 files changed

+289
-27
lines changed

12 files changed

+289
-27
lines changed

.github/workflows/CI.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ jobs:
120120
- 5432:5432
121121
localstack:
122122
image: localstack/localstack:latest
123-
environment:
123+
env:
124124
SERVICES: s3
125125
EAGER_SERVICE_LOADING: 1
126126
ports:

Cargo.lock

Lines changed: 8 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

aws_local/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use tokio::sync::Mutex;
1111
use tonic::transport::Uri;
1212
use uuid::Uuid;
1313

14-
pub const AWSLOCAL_DEFAULT_ENDPOINT: &str = "http://127.0.0.1:4566";
14+
pub const AWSLOCAL_DEFAULT_ENDPOINT: &str = "http://localstack:4566";
1515

1616
pub fn gen_bucket_name() -> String {
1717
format!("mvr-{}-{}", Uuid::new_v4(), Utc::now().timestamp_millis())
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ALTER TABLE IF EXISTS subscriber_mapping_activity ADD COLUMN IF NOT EXISTS reward_override_entity_key TEXT;

mobile_verifier/src/reward_shares.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,9 @@ impl MapperShares {
230230
subscriber_id: mas.subscriber_id,
231231
discovery_location_amount,
232232
verification_mapping_amount,
233+
reward_override_entity_key: mas
234+
.reward_override_entity_key
235+
.unwrap_or_default(),
233236
})),
234237
},
235238
)
@@ -831,6 +834,7 @@ mod test {
831834
subscriber_id: n.encode_to_vec(),
832835
discovery_reward_shares: 30,
833836
verification_reward_shares: 30,
837+
reward_override_entity_key: None,
834838
})
835839
}
836840

mobile_verifier/src/subscriber_mapping_activity.rs

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -189,9 +189,15 @@ where
189189
if !verify_known_carrier_key(authorization_verifier, &activity.carrier_pub_key).await? {
190190
return Ok(SubscriberReportVerificationStatus::InvalidCarrierKey);
191191
};
192-
if !verify_subscriber_id(entity_verifier, &activity.subscriber_id).await? {
192+
if !verify_entity(&entity_verifier, &activity.subscriber_id).await? {
193193
return Ok(SubscriberReportVerificationStatus::InvalidSubscriberId);
194194
};
195+
if let Some(rek) = &activity.reward_override_entity_key {
196+
// use UTF8(key_serialization) as bytea
197+
if !verify_entity(entity_verifier, &rek.clone().into_bytes()).await? {
198+
return Ok(SubscriberReportVerificationStatus::InvalidRewardOverrideEntityKey);
199+
};
200+
}
195201
Ok(SubscriberReportVerificationStatus::Valid)
196202
}
197203

@@ -209,16 +215,16 @@ where
209215
.map_err(anyhow::Error::from)
210216
}
211217

212-
async fn verify_subscriber_id<EV>(
218+
async fn verify_entity<EV>(
213219
entity_verifier: impl AsRef<EV>,
214-
subscriber_id: &[u8],
220+
entity_id: &[u8],
215221
) -> anyhow::Result<bool>
216222
where
217223
EV: EntityVerifier,
218224
{
219225
entity_verifier
220226
.as_ref()
221-
.verify_rewardable_entity(subscriber_id)
227+
.verify_rewardable_entity(entity_id)
222228
.await
223229
.map_err(anyhow::Error::from)
224230
}
@@ -247,6 +253,7 @@ pub struct SubscriberMappingActivity {
247253
pub verification_reward_shares: u64,
248254
pub received_timestamp: DateTime<Utc>,
249255
pub carrier_pub_key: PublicKeyBinary,
256+
pub reward_override_entity_key: Option<String>,
250257
}
251258

252259
impl TryFrom<SubscriberMappingActivityIngestReportV1> for SubscriberMappingActivity {
@@ -257,12 +264,19 @@ impl TryFrom<SubscriberMappingActivityIngestReportV1> for SubscriberMappingActiv
257264
.report
258265
.ok_or_else(|| anyhow::anyhow!("SubscriberMappingActivityReqV1 not found"))?;
259266

267+
let reward_override_entity_key = if report.reward_override_entity_key.is_empty() {
268+
None
269+
} else {
270+
Some(report.reward_override_entity_key)
271+
};
272+
260273
Ok(Self {
261274
subscriber_id: report.subscriber_id,
262275
discovery_reward_shares: report.discovery_reward_shares,
263276
verification_reward_shares: report.verification_reward_shares,
264277
received_timestamp: value.received_timestamp.to_timestamp_millis()?,
265278
carrier_pub_key: PublicKeyBinary::from(report.carrier_pub_key),
279+
reward_override_entity_key,
266280
})
267281
}
268282
}
@@ -274,4 +288,39 @@ pub struct SubscriberMappingShares {
274288
pub discovery_reward_shares: u64,
275289
#[sqlx(try_from = "i64")]
276290
pub verification_reward_shares: u64,
291+
292+
pub reward_override_entity_key: Option<String>,
293+
}
294+
295+
#[cfg(test)]
296+
mod tests {
297+
use super::SubscriberMappingActivity;
298+
use helium_proto::services::poc_mobile::SubscriberMappingActivityIngestReportV1;
299+
300+
#[test]
301+
fn try_from_subscriber_mapping_activity_check_entity_key() {
302+
// Make sure reward_override_entity_key empty string transforms to None
303+
let smair = SubscriberMappingActivityIngestReportV1 {
304+
received_timestamp: 1,
305+
report: Some({
306+
helium_proto::services::poc_mobile::SubscriberMappingActivityReqV1 {
307+
subscriber_id: vec![10],
308+
discovery_reward_shares: 2,
309+
verification_reward_shares: 3,
310+
timestamp: 4,
311+
carrier_pub_key: vec![11],
312+
signature: vec![12],
313+
reward_override_entity_key: "".to_string(),
314+
}
315+
}),
316+
};
317+
let mut smair2 = smair.clone();
318+
smair2.report.as_mut().unwrap().reward_override_entity_key = "key".to_string();
319+
320+
let res = SubscriberMappingActivity::try_from(smair).unwrap();
321+
assert!(res.reward_override_entity_key.is_none());
322+
323+
let res = SubscriberMappingActivity::try_from(smair2).unwrap();
324+
assert_eq!(res.reward_override_entity_key, Some("key".to_string()));
325+
}
277326
}

mobile_verifier/src/subscriber_mapping_activity/db.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,22 @@ pub async fn save(
1212
transaction: &mut Transaction<'_, Postgres>,
1313
ingest_reports: impl Stream<Item = anyhow::Result<SubscriberMappingActivity>>,
1414
) -> anyhow::Result<()> {
15-
const NUM_IN_BATCH: usize = (u16::MAX / 5) as usize;
15+
const NUM_IN_BATCH: usize = (u16::MAX / 6) as usize;
1616

1717
ingest_reports
1818
.try_chunks(NUM_IN_BATCH)
1919
.err_into::<anyhow::Error>()
2020
.try_fold(transaction, |txn, chunk| async move {
21-
QueryBuilder::new("INSERT INTO subscriber_mapping_activity(subscriber_id, discovery_reward_shares, verification_reward_shares, received_timestamp, inserted_at)")
21+
QueryBuilder::new(r#"INSERT INTO subscriber_mapping_activity(
22+
subscriber_id, discovery_reward_shares, verification_reward_shares, received_timestamp, inserted_at, reward_override_entity_key)"#)
2223
.push_values(chunk, |mut b, activity| {
2324

2425
b.push_bind(activity.subscriber_id)
2526
.push_bind(activity.discovery_reward_shares as i64)
2627
.push_bind(activity.verification_reward_shares as i64)
2728
.push_bind(activity.received_timestamp)
28-
.push_bind(Utc::now());
29+
.push_bind(Utc::now())
30+
.push_bind(activity.reward_override_entity_key);
2931
})
3032
.push("ON CONFLICT (subscriber_id, received_timestamp) DO NOTHING")
3133
.build()
@@ -45,7 +47,7 @@ pub async fn rewardable_mapping_activity(
4547
) -> anyhow::Result<Vec<SubscriberMappingShares>> {
4648
sqlx::query_as(
4749
r#"
48-
SELECT DISTINCT ON (subscriber_id) subscriber_id, discovery_reward_shares, verification_reward_shares
50+
SELECT DISTINCT ON (subscriber_id) subscriber_id, discovery_reward_shares, verification_reward_shares, reward_override_entity_key
4951
FROM subscriber_mapping_activity
5052
WHERE received_timestamp >= $1
5153
AND received_timestamp < $2

mobile_verifier/tests/integrations/common/mod.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ use mobile_config::{
1919
sub_dao_epoch_reward_info::EpochRewardInfo,
2020
};
2121
use mobile_verifier::{
22-
boosting_oracles::AssignedCoverageObjects, GatewayResolution, GatewayResolver, PriceInfo,
22+
boosting_oracles::AssignedCoverageObjects,
23+
subscriber_mapping_activity::SubscriberMappingShares, GatewayResolution, GatewayResolver,
24+
PriceInfo,
2325
};
2426
use rust_decimal::{prelude::ToPrimitive, Decimal};
2527
use rust_decimal_macros::dec;
@@ -446,6 +448,12 @@ impl AsStringKeyedMapKey for PromotionReward {
446448
self.entity.to_owned()
447449
}
448450
}
451+
impl AsStringKeyedMapKey for SubscriberMappingShares {
452+
fn key(&self) -> String {
453+
use helium_proto::Message;
454+
String::decode(self.subscriber_id.as_bytes()).expect("decode subscriber id")
455+
}
456+
}
449457

450458
impl<V: AsStringKeyedMapKey + Clone> AsStringKeyedMap<V> for Vec<V> {
451459
fn as_keyed_map(&self) -> HashMap<String, V>

mobile_verifier/tests/integrations/rewarder_mappers.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,34 @@ async fn test_mapper_rewards(pool: PgPool) -> anyhow::Result<()> {
7070
Ok(())
7171
}
7272

73+
#[sqlx::test]
74+
async fn reward_mapper_check_entity_key_db(pool: PgPool) {
75+
let reward_info = reward_info_24_hours();
76+
// seed db
77+
let mut txn = pool.clone().begin().await.unwrap();
78+
seed_mapping_data(reward_info.epoch_period.end, &mut txn)
79+
.await
80+
.unwrap();
81+
txn.commit().await.expect("db txn failed");
82+
83+
let rewardable_mapping_activity = subscriber_mapping_activity::db::rewardable_mapping_activity(
84+
&pool,
85+
&reward_info.epoch_period,
86+
)
87+
.await
88+
.unwrap();
89+
90+
let sub_map = rewardable_mapping_activity.as_keyed_map();
91+
let sub_1 = sub_map.get(SUBSCRIBER_1).expect("sub 1");
92+
let sub_3 = sub_map.get(SUBSCRIBER_3).expect("sub 3");
93+
94+
assert!(sub_1.reward_override_entity_key.is_none());
95+
assert_eq!(
96+
sub_3.reward_override_entity_key,
97+
Some("entity key".to_string())
98+
);
99+
}
100+
73101
async fn seed_mapping_data(
74102
ts: DateTime<Utc>,
75103
txn: &mut Transaction<'_, Postgres>,
@@ -84,27 +112,31 @@ async fn seed_mapping_data(
84112
discovery_reward_shares: 30,
85113
verification_reward_shares: 0,
86114
carrier_pub_key: PublicKeyBinary::from_str(HOTSPOT_1).unwrap(),
115+
reward_override_entity_key: None,
87116
},
88117
SubscriberMappingActivity {
89118
received_timestamp: ts - ChronoDuration::hours(2),
90119
subscriber_id: SUBSCRIBER_1.to_string().encode_to_vec(),
91120
discovery_reward_shares: 30,
92121
verification_reward_shares: 0,
93122
carrier_pub_key: PublicKeyBinary::from_str(HOTSPOT_1).unwrap(),
123+
reward_override_entity_key: None,
94124
},
95125
SubscriberMappingActivity {
96126
received_timestamp: ts - ChronoDuration::hours(1),
97127
subscriber_id: SUBSCRIBER_2.to_string().encode_to_vec(),
98128
discovery_reward_shares: 30,
99129
verification_reward_shares: 0,
100130
carrier_pub_key: PublicKeyBinary::from_str(HOTSPOT_1).unwrap(),
131+
reward_override_entity_key: None,
101132
},
102133
SubscriberMappingActivity {
103134
received_timestamp: ts - ChronoDuration::hours(1),
104135
subscriber_id: SUBSCRIBER_3.to_string().encode_to_vec(),
105136
discovery_reward_shares: 30,
106137
verification_reward_shares: 0,
107138
carrier_pub_key: PublicKeyBinary::from_str(HOTSPOT_1).unwrap(),
139+
reward_override_entity_key: Some("entity key".to_string()),
108140
},
109141
];
110142

0 commit comments

Comments
 (0)