Skip to content

Commit f5b7c46

Browse files
authored
Add localstack integration and DataSetDownloaderDaemon test (#1000)
* Add AwsLocal rust module * Add dataset downloader test * Add localstack service to GithubCI
1 parent 2f730ed commit f5b7c46

File tree

16 files changed

+368
-10
lines changed

16 files changed

+368
-10
lines changed

.github/workflows/CI.yml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,13 @@ jobs:
118118
--health-retries 5
119119
ports:
120120
- 5432:5432
121+
localstack:
122+
image: localstack/localstack:latest
123+
environment:
124+
SERVICES: s3
125+
EAGER_SERVICE_LOADING: 1
126+
ports:
127+
- 4566:4566
121128
steps:
122129
- name: Install Dependencies
123130
run: |
@@ -229,4 +236,5 @@ jobs:
229236
RELEASE_VERSION: ${{ github.event.inputs.release_version }}
230237
run: |
231238
chmod +x ./.github/scripts/make_debian.sh
232-
./.github/scripts/make_debian.sh
239+
./.github/scripts/make_debian.sh
240+

.github/workflows/DockerCI.yml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,13 @@ jobs:
176176
--health-retries 5
177177
ports:
178178
- 5432:5432
179+
localstack:
180+
image: localstack/localstack:latest
181+
env:
182+
SERVICES: s3
183+
EAGER_SERVICE_LOADING: 1
184+
ports:
185+
- 4566:4566
179186
steps:
180187
- name: Checkout Repository
181188
uses: actions/checkout@v4
@@ -218,4 +225,4 @@ jobs:
218225
cargo test -p ${{ matrix.package }} -- --include-ignored
219226
220227
- name: Fix Permissions for Caching
221-
run: sudo chown -R $(whoami):$(whoami) target
228+
run: sudo chown -R $(whoami):$(whoami) target

Cargo.lock

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ members = [
2626
"solana",
2727
"task_manager",
2828
"hex_assignments",
29+
"aws_local"
2930
]
3031
resolver = "2"
3132

@@ -113,6 +114,10 @@ tokio-util = "0"
113114
uuid = { version = "1", features = ["v4", "serde"] }
114115
tower-http = { version = "0", features = ["trace"] }
115116
derive_builder = "0"
117+
aws-config = "0.51"
118+
aws-sdk-s3 = "0.21"
119+
aws-types = { version = "0.51", features = ["hardcoded-credentials"]}
120+
tempfile = "3"
116121

117122
[patch.crates-io]
118123
anchor-lang = { git = "https://github.com/madninja/anchor.git", branch = "madninja/const_pubkey" }

aws_local/Cargo.toml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
[package]
2+
name = "aws-local"
3+
version = "0.1.0"
4+
authors.workspace = true
5+
license.workspace = true
6+
edition.workspace = true
7+
8+
[dependencies]
9+
aws-config = {workspace = true}
10+
aws-sdk-s3 = {workspace = true}
11+
aws-types = {workspace = true, features = ["hardcoded-credentials"]}
12+
tokio = {workspace = true}
13+
triggered = {workspace = true}
14+
tonic = {workspace = true}
15+
chrono = {workspace = true}
16+
prost = {workspace = true}
17+
anyhow = {workspace = true}
18+
uuid = {workspace = true}
19+
tempfile = {workspace = true}
20+
file-store = { path = "../file_store", features = ["local"] }

aws_local/src/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
It helps to run tests with [localstack](https://www.localstack.cloud/)

aws_local/src/lib.rs

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
use anyhow::{anyhow, Result};
2+
use aws_config::meta::region::RegionProviderChain;
3+
use aws_sdk_s3::{Client, Endpoint, Region};
4+
use chrono::Utc;
5+
use file_store::traits::MsgBytes;
6+
use file_store::{file_sink, file_upload, FileStore, FileType, Settings};
7+
use std::path::Path;
8+
use std::{str::FromStr, sync::Arc};
9+
use tempfile::TempDir;
10+
use tokio::sync::Mutex;
11+
use tonic::transport::Uri;
12+
use uuid::Uuid;
13+
14+
pub const AWSLOCAL_DEFAULT_ENDPOINT: &str = "http://127.0.0.1:4566";
15+
16+
pub fn gen_bucket_name() -> String {
17+
format!("mvr-{}-{}", Uuid::new_v4(), Utc::now().timestamp_millis())
18+
}
19+
20+
// Interacts with the locastack.
21+
// Used to create mocked aws buckets and files.
22+
pub struct AwsLocal {
23+
pub fs_settings: Settings,
24+
pub file_store: FileStore,
25+
pub aws_client: aws_sdk_s3::Client,
26+
}
27+
28+
impl AwsLocal {
29+
async fn create_aws_client(settings: &Settings) -> aws_sdk_s3::Client {
30+
let endpoint: Option<Endpoint> = match &settings.endpoint {
31+
Some(endpoint) => Uri::from_str(endpoint)
32+
.map(Endpoint::immutable)
33+
.map(Some)
34+
.unwrap(),
35+
_ => None,
36+
};
37+
let region = Region::new(settings.region.clone());
38+
let region_provider = RegionProviderChain::first_try(region).or_default_provider();
39+
40+
let mut config = aws_config::from_env().region(region_provider);
41+
config = config.endpoint_resolver(endpoint.unwrap());
42+
43+
let creds = aws_types::credentials::Credentials::from_keys(
44+
settings.access_key_id.as_ref().unwrap(),
45+
settings.secret_access_key.as_ref().unwrap(),
46+
None,
47+
);
48+
config = config.credentials_provider(creds);
49+
50+
let config = config.load().await;
51+
52+
Client::new(&config)
53+
}
54+
55+
pub async fn new(endpoint: &str, bucket: &str) -> AwsLocal {
56+
let settings = Settings {
57+
bucket: bucket.into(),
58+
endpoint: Some(endpoint.into()),
59+
region: "us-east-1".into(),
60+
access_key_id: Some("random".into()),
61+
secret_access_key: Some("random2".into()),
62+
};
63+
let client = Self::create_aws_client(&settings).await;
64+
client.create_bucket().bucket(bucket).send().await.unwrap();
65+
AwsLocal {
66+
aws_client: client,
67+
fs_settings: settings.clone(),
68+
file_store: file_store::FileStore::from_settings(&settings)
69+
.await
70+
.unwrap(),
71+
}
72+
}
73+
74+
pub fn fs_settings(&self) -> Settings {
75+
self.fs_settings.clone()
76+
}
77+
78+
pub async fn put_proto_to_aws<T: prost::Message + MsgBytes>(
79+
&self,
80+
items: Vec<T>,
81+
file_type: FileType,
82+
metric_name: &'static str,
83+
) -> Result<String> {
84+
let tmp_dir = TempDir::new()?;
85+
let tmp_dir_path = tmp_dir.path().to_owned();
86+
87+
let (shutdown_trigger, shutdown_listener) = triggered::trigger();
88+
89+
let (file_upload, file_upload_server) =
90+
file_upload::FileUpload::from_settings_tm(&self.fs_settings)
91+
.await
92+
.unwrap();
93+
94+
let (item_sink, item_server) =
95+
file_sink::FileSinkBuilder::new(file_type, &tmp_dir_path, file_upload, metric_name)
96+
.auto_commit(false)
97+
.roll_time(std::time::Duration::new(15, 0))
98+
.create::<T>()
99+
.await
100+
.unwrap();
101+
102+
for item in items {
103+
item_sink.write(item, &[]).await.unwrap();
104+
}
105+
let item_recv = item_sink.commit().await.unwrap();
106+
107+
let uploaded_file = Arc::new(Mutex::new(String::default()));
108+
let up_2 = uploaded_file.clone();
109+
let mut timeout = std::time::Duration::new(5, 0);
110+
111+
tokio::spawn(async move {
112+
let uploaded_files = item_recv.await.unwrap().unwrap();
113+
assert!(uploaded_files.len() == 1);
114+
let mut val = up_2.lock().await;
115+
*val = uploaded_files.first().unwrap().to_string();
116+
117+
// After files uploaded to aws the must be removed.
118+
// So we wait when dir will be empty.
119+
// It means all files are uploaded to aws
120+
loop {
121+
if is_dir_has_files(&tmp_dir_path) {
122+
let dur = std::time::Duration::from_millis(10);
123+
tokio::time::sleep(dur).await;
124+
timeout -= dur;
125+
continue;
126+
}
127+
break;
128+
}
129+
130+
shutdown_trigger.trigger();
131+
});
132+
133+
tokio::try_join!(
134+
file_upload_server.run(shutdown_listener.clone()),
135+
item_server.run(shutdown_listener.clone())
136+
)
137+
.unwrap();
138+
139+
tmp_dir.close()?;
140+
141+
let res = uploaded_file.lock().await;
142+
Ok(res.clone())
143+
}
144+
145+
pub async fn put_file_to_aws(&self, file_path: &Path) -> Result<()> {
146+
let path_str = file_path.display();
147+
if !file_path.exists() {
148+
return Err(anyhow!("File {path_str} is absent"));
149+
}
150+
if !file_path.is_file() {
151+
return Err(anyhow!("File {path_str} is not a file"));
152+
}
153+
self.file_store.put(file_path).await?;
154+
155+
Ok(())
156+
}
157+
}
158+
159+
fn is_dir_has_files(dir_path: &Path) -> bool {
160+
let entries = std::fs::read_dir(dir_path)
161+
.unwrap()
162+
.map(|res| res.map(|e| e.path().is_dir()))
163+
.collect::<Result<Vec<_>, std::io::Error>>()
164+
.unwrap();
165+
entries.contains(&false)
166+
}

file_store/Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ helium-proto = {workspace = true}
3030
helium-crypto = {workspace = true}
3131
csv = "*"
3232
http = {workspace = true}
33-
aws-config = "0.51"
34-
aws-sdk-s3 = "0.21"
35-
aws-types = { version = "0.51", features = ["hardcoded-credentials"], optional = true}
33+
aws-config = {workspace = true}
34+
aws-sdk-s3 = {workspace = true}
35+
aws-types = {workspace = true, optional = true}
3636
strum = {version = "0", features = ["derive"]}
3737
strum_macros = "0"
3838
sha2 = {workspace = true}
@@ -53,7 +53,7 @@ task-manager = { path = "../task_manager" }
5353

5454
[dev-dependencies]
5555
hex-literal = "0"
56-
tempfile = "3"
56+
tempfile = { workspace = true }
5757

5858
[features]
5959
default = ["sqlx-postgres"]

mobile_verifier/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,3 +65,6 @@ task-manager = { path = "../task_manager" }
6565

6666
[dev-dependencies]
6767
proptest = "1.5.0"
68+
aws-local = { path = "../aws_local" }
69+
tempfile = "3"
70+

mobile_verifier/src/boosting_oracles/data_sets.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ impl DataSetDownloaderDaemon {
315315
}
316316
}
317317

318-
async fn check_for_new_data_sets(&mut self) -> anyhow::Result<()> {
318+
pub async fn check_for_new_data_sets(&mut self) -> anyhow::Result<()> {
319319
let new_urbanized = self
320320
.data_sets
321321
.urbanization
@@ -392,8 +392,7 @@ impl DataSetDownloaderDaemon {
392392
Ok(())
393393
}
394394

395-
pub async fn run(mut self) -> anyhow::Result<()> {
396-
tracing::info!("Starting data set downloader task");
395+
pub async fn fetch_first_datasets(&mut self) -> anyhow::Result<()> {
397396
self.data_sets
398397
.urbanization
399398
.fetch_first_data_set(&self.pool, &self.data_set_directory)
@@ -410,6 +409,12 @@ impl DataSetDownloaderDaemon {
410409
.service_provider_override
411410
.fetch_first_data_set(&self.pool, &self.data_set_directory)
412411
.await?;
412+
Ok(())
413+
}
414+
415+
pub async fn run(mut self) -> anyhow::Result<()> {
416+
tracing::info!("Starting data set downloader task");
417+
self.fetch_first_datasets().await?;
413418
// Attempt to fill in any unassigned hexes. This is for the edge case in
414419
// which we shutdown before a coverage object updates.
415420
if is_hex_boost_data_ready(&self.data_sets) {

0 commit comments

Comments
 (0)