Skip to content

Commit 80905d0

Browse files
committed
feat: indexable file store
1 parent 1e0f74c commit 80905d0

File tree

5 files changed

+63
-43
lines changed

5 files changed

+63
-43
lines changed

agent-control/src/agent_control/defaults.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::agent_type::agent_type_id::AgentTypeID;
2-
use crate::k8s::store::StoreKey;
2+
use crate::opamp::data_store::StoreKey;
33
use crate::opamp::remote_config::signature::SIGNATURE_CUSTOM_CAPABILITY;
44
use crate::sub_agent::identity::AgentIdentity;
55
use opamp_client::capabilities;

agent-control/src/k8s/store.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,9 @@ use super::client::SyncK8sClient;
44
use super::labels::Labels;
55
use crate::agent_control::agent_id::AgentID;
66
use crate::agent_control::defaults::{FOLDER_NAME_FLEET_DATA, FOLDER_NAME_LOCAL_DATA};
7+
use crate::opamp::data_store::StoreKey;
78
use std::sync::{Arc, RwLock};
89

9-
/// The key used to identify the data in the Store.
10-
pub type StoreKey = str;
11-
1210
/// Represents a Kubernetes persistent store of Agents data such as instance id and configs.
1311
/// The store is implemented using one ConfigMap per Agent with all the data.
1412
pub struct K8sStore {

agent-control/src/on_host/file_store.rs

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,9 @@ use tracing::{debug, error, trace};
1515
use crate::{
1616
agent_control::{
1717
agent_id::AgentID,
18-
defaults::{
19-
FOLDER_NAME_FLEET_DATA, FOLDER_NAME_LOCAL_DATA, STORE_KEY_LOCAL_DATA_CONFIG,
20-
STORE_KEY_OPAMP_DATA_CONFIG,
21-
},
18+
defaults::{FOLDER_NAME_FLEET_DATA, FOLDER_NAME_LOCAL_DATA},
2219
},
23-
opamp::instance_id::on_host::storer::build_config_name,
20+
opamp::{data_store::StoreKey, instance_id::on_host::storer::build_config_name},
2421
};
2522

2623
pub struct FileStore<F, D>
@@ -37,22 +34,22 @@ where
3734
pub struct LocalDir(PathBuf);
3835

3936
impl LocalDir {
40-
pub fn get_local_values_file_path(&self, agent_id: &AgentID) -> PathBuf {
37+
pub fn get_local_file_path(&self, agent_id: &AgentID, key: &StoreKey) -> PathBuf {
4138
self.0
4239
.join(FOLDER_NAME_LOCAL_DATA)
4340
.join(agent_id)
44-
.join(build_config_name(STORE_KEY_LOCAL_DATA_CONFIG))
41+
.join(build_config_name(key))
4542
}
4643
}
4744

4845
pub struct RemoteDir(PathBuf);
4946

5047
impl RemoteDir {
51-
pub fn get_remote_values_file_path(&self, agent_id: &AgentID) -> PathBuf {
48+
pub fn get_remote_file_path(&self, agent_id: &AgentID, key: &StoreKey) -> PathBuf {
5249
self.0
5350
.join(FOLDER_NAME_FLEET_DATA)
5451
.join(agent_id)
55-
.join(build_config_name(STORE_KEY_OPAMP_DATA_CONFIG))
52+
.join(build_config_name(key))
5653
}
5754
}
5855

@@ -125,23 +122,28 @@ where
125122
})
126123
}
127124

128-
pub fn get_opamp_data<T>(&self, agent_id: &AgentID) -> Result<Option<T>, Error>
125+
pub fn get_opamp_data<T>(&self, agent_id: &AgentID, key: &StoreKey) -> Result<Option<T>, Error>
129126
where
130127
T: DeserializeOwned,
131128
{
132129
let remote_dir = self.remote_dir.read().unwrap();
133-
self.get(remote_dir.get_remote_values_file_path(agent_id))
130+
self.get(remote_dir.get_remote_file_path(agent_id, key))
134131
}
135132

136-
pub fn get_local_data<T>(&self, agent_id: &AgentID) -> Result<Option<T>, Error>
133+
pub fn get_local_data<T>(&self, agent_id: &AgentID, key: &StoreKey) -> Result<Option<T>, Error>
137134
where
138135
T: DeserializeOwned,
139136
{
140-
self.get(self.local_dir.get_local_values_file_path(agent_id))
137+
self.get(self.local_dir.get_local_file_path(agent_id, key))
141138
}
142139

143140
/// Stores data in the specified StoreKey of an Agent store.
144-
pub fn set_opamp_data<T>(&self, agent_id: &AgentID, data: &T) -> Result<(), Error>
141+
pub fn set_opamp_data<T>(
142+
&self,
143+
agent_id: &AgentID,
144+
key: &StoreKey,
145+
data: &T,
146+
) -> Result<(), Error>
145147
where
146148
T: Serialize,
147149
{
@@ -151,7 +153,7 @@ where
151153
#[allow(clippy::readonly_write_lock)]
152154
let remote_dir = self.remote_dir.write().unwrap();
153155

154-
let remote_values_path = remote_dir.get_remote_values_file_path(agent_id);
156+
let remote_values_path = remote_dir.get_remote_file_path(agent_id, key);
155157

156158
self.ensure_directory_existence(&remote_values_path)
157159
.map_err(|err| {
@@ -176,14 +178,14 @@ where
176178
}
177179

178180
/// Delete data of an Agent store.
179-
pub fn delete_opamp_data(&self, agent_id: &AgentID) -> Result<(), Error> {
181+
pub fn delete_opamp_data(&self, agent_id: &AgentID, key: &StoreKey) -> Result<(), Error> {
180182
// I'm writing (deleting) the locked file, not mutating the path
181183
// I think the OS will handle concurrent write/delete fine from all
182184
// threads/subprocesses of the program, but just in case. We can revisit later.
183185
#[allow(clippy::readonly_write_lock)]
184186
let remote_dir = self.remote_dir.write().unwrap();
185187

186-
let remote_path_file = remote_dir.get_remote_values_file_path(agent_id);
188+
let remote_path_file = remote_dir.get_remote_file_path(agent_id, key);
187189
if remote_path_file.exists() {
188190
debug!("deleting remote config: {:?}", remote_path_file);
189191
std::fs::remove_file(remote_path_file)?;
@@ -199,7 +201,10 @@ pub mod tests {
199201
directory_manager::DirectoryManager, file_reader::FileReader, writer_file::FileWriter,
200202
};
201203

202-
use crate::agent_control::agent_id::AgentID;
204+
use crate::agent_control::{
205+
agent_id::AgentID,
206+
defaults::{STORE_KEY_LOCAL_DATA_CONFIG, STORE_KEY_OPAMP_DATA_CONFIG},
207+
};
203208

204209
use super::*;
205210

@@ -213,9 +218,10 @@ pub mod tests {
213218
self.remote_dir
214219
.read()
215220
.unwrap()
216-
.get_remote_values_file_path(agent_id)
221+
.get_remote_file_path(agent_id, STORE_KEY_OPAMP_DATA_CONFIG)
217222
} else {
218-
self.local_dir.get_local_values_file_path(agent_id)
223+
self.local_dir
224+
.get_local_file_path(agent_id, STORE_KEY_LOCAL_DATA_CONFIG)
219225
}
220226
}
221227
}

agent-control/src/opamp/data_store.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ use serde::{Serialize, de::DeserializeOwned};
22

33
use crate::agent_control::agent_id::AgentID;
44

5+
/// The key used to identify the data in the OpAMP Data Store.
6+
pub type StoreKey = str;
7+
58
/// Implementers of this trait represent data stores for OpAMP-related data.
69
///
710
/// They expose ways to get, set and delete data associated with the management of agent

agent-control/src/values/file.rs

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::sync::Arc;
22

33
use crate::agent_control::agent_id::AgentID;
4+
use crate::agent_control::defaults::{STORE_KEY_LOCAL_DATA_CONFIG, STORE_KEY_OPAMP_DATA_CONFIG};
45
use crate::on_host::file_store::FileStore;
56
use crate::opamp::remote_config::hash::ConfigState;
67
use crate::values::config::{Config, RemoteConfig};
@@ -65,7 +66,7 @@ where
6566
#[tracing::instrument(skip_all, err)]
6667
fn load_local(&self, agent_id: &AgentID) -> Result<Option<Config>, ConfigRepositoryError> {
6768
self.file_store
68-
.get_local_data::<YAMLConfig>(agent_id)
69+
.get_local_data::<YAMLConfig>(agent_id, STORE_KEY_LOCAL_DATA_CONFIG)
6970
.map_err(|err| ConfigRepositoryError::LoadError(format!("loading local config: {err}")))
7071
.map(|opt_yaml| opt_yaml.map(|yc| Config::LocalConfig(yc.into())))
7172
}
@@ -80,7 +81,7 @@ where
8081
Ok(None)
8182
} else {
8283
self.file_store
83-
.get_opamp_data::<RemoteConfig>(agent_id)
84+
.get_opamp_data::<RemoteConfig>(agent_id, STORE_KEY_OPAMP_DATA_CONFIG)
8485
.map_err(|err| {
8586
ConfigRepositoryError::LoadError(format!("loading remote config: {err}"))
8687
})
@@ -97,7 +98,7 @@ where
9798
debug!(agent_id = agent_id.to_string(), "saving remote config");
9899

99100
self.file_store
100-
.set_opamp_data(agent_id, remote_config)
101+
.set_opamp_data(agent_id, STORE_KEY_OPAMP_DATA_CONFIG, remote_config)
101102
.map_err(|e| ConfigRepositoryError::StoreError(format!("storing remote config: {}", e)))
102103
}
103104

@@ -106,7 +107,7 @@ where
106107
agent_id: &AgentID,
107108
) -> Result<Option<RemoteConfig>, ConfigRepositoryError> {
108109
self.file_store
109-
.get_opamp_data::<RemoteConfig>(agent_id)
110+
.get_opamp_data::<RemoteConfig>(agent_id, STORE_KEY_OPAMP_DATA_CONFIG)
110111
.map_err(|e| {
111112
ConfigRepositoryError::LoadError(format!("getting remote config hash: {}", e))
112113
})
@@ -124,15 +125,19 @@ where
124125

125126
let maybe_config = self
126127
.file_store
127-
.get_opamp_data::<RemoteConfig>(agent_id)
128+
.get_opamp_data::<RemoteConfig>(agent_id, STORE_KEY_OPAMP_DATA_CONFIG)
128129
.map_err(|e| {
129130
ConfigRepositoryError::LoadError(format!("updating remote config state: {e}"))
130131
})?;
131132

132133
match maybe_config {
133134
Some(remote_config) => self
134135
.file_store
135-
.set_opamp_data(agent_id, &remote_config.with_state(state))
136+
.set_opamp_data(
137+
agent_id,
138+
STORE_KEY_OPAMP_DATA_CONFIG,
139+
&remote_config.with_state(state),
140+
)
136141
.map_err(|err| {
137142
ConfigRepositoryError::StoreError(format!(
138143
"updating remote config state: {err}"
@@ -151,9 +156,11 @@ where
151156
fn delete_remote(&self, agent_id: &AgentID) -> Result<(), ConfigRepositoryError> {
152157
debug!(agent_id = agent_id.to_string(), "deleting remote config");
153158

154-
self.file_store.delete_opamp_data(agent_id).map_err(|e| {
155-
ConfigRepositoryError::DeleteError(format!("deleting remote config: {}", e))
156-
})
159+
self.file_store
160+
.delete_opamp_data(agent_id, STORE_KEY_OPAMP_DATA_CONFIG)
161+
.map_err(|e| {
162+
ConfigRepositoryError::DeleteError(format!("deleting remote config: {}", e))
163+
})
157164
}
158165
}
159166

@@ -206,9 +213,9 @@ state: applied
206213
let remote_dir_path = RemoteDir::from(PathBuf::from("some/remote/path/"));
207214
let local_dir_path = LocalDir::from(PathBuf::from("some/local/path/"));
208215
let test_path = if remote_enabled {
209-
remote_dir_path.get_remote_values_file_path(&agent_id)
216+
remote_dir_path.get_remote_file_path(&agent_id, STORE_KEY_OPAMP_DATA_CONFIG)
210217
} else {
211-
local_dir_path.get_local_values_file_path(&agent_id)
218+
local_dir_path.get_local_file_path(&agent_id, STORE_KEY_LOCAL_DATA_CONFIG)
212219
};
213220

214221
// Expectations
@@ -247,8 +254,9 @@ state: applied
247254
let dir_manager = MockDirectoryManager::new();
248255
let remote_dir_path = RemoteDir::from(PathBuf::from("some/remote/path/"));
249256
let local_dir_path = LocalDir::from(PathBuf::from("some/local/path/"));
250-
let remote_path = remote_dir_path.get_remote_values_file_path(&agent_id);
251-
let local_path = local_dir_path.get_local_values_file_path(&agent_id);
257+
let remote_path =
258+
remote_dir_path.get_remote_file_path(&agent_id, STORE_KEY_OPAMP_DATA_CONFIG);
259+
let local_path = local_dir_path.get_local_file_path(&agent_id, STORE_KEY_LOCAL_DATA_CONFIG);
252260

253261
// Expectations
254262
file_rw.should_not_read_file_not_found(&remote_path, "some_error_message".to_string());
@@ -285,7 +293,7 @@ state: applied
285293
let dir_manager = MockDirectoryManager::new();
286294
let remote_dir_path = PathBuf::from("some/remote/path/");
287295
let local_dir_path = LocalDir::from(PathBuf::from("some/local/path/"));
288-
let local_path = local_dir_path.get_local_values_file_path(&agent_id);
296+
let local_path = local_dir_path.get_local_file_path(&agent_id, STORE_KEY_LOCAL_DATA_CONFIG);
289297

290298
// Expectations
291299
file_rw.should_not_read_file_not_found(&local_path, "some message".to_string());
@@ -313,8 +321,10 @@ state: applied
313321
let dir_manager = MockDirectoryManager::new();
314322
let remote_dir_path = RemoteDir::from(PathBuf::from("some/remote/path/"));
315323
let local_dir_path = LocalDir::from(PathBuf::from("some/local/path/"));
316-
let remote_test_path = remote_dir_path.get_remote_values_file_path(&agent_id);
317-
let local_test_path = local_dir_path.get_local_values_file_path(&agent_id);
324+
let remote_test_path =
325+
remote_dir_path.get_remote_file_path(&agent_id, STORE_KEY_OPAMP_DATA_CONFIG);
326+
let local_test_path =
327+
local_dir_path.get_local_file_path(&agent_id, STORE_KEY_LOCAL_DATA_CONFIG);
318328

319329
// Expectations
320330
if remote_enabled {
@@ -348,7 +358,8 @@ state: applied
348358
let mut dir_manager = MockDirectoryManager::new();
349359
let remote_dir_path = RemoteDir::from(PathBuf::from("some/remote/path/"));
350360
let local_dir_path = LocalDir::from(PathBuf::from("some/local/path/"));
351-
let remote_path = remote_dir_path.get_remote_values_file_path(&agent_id);
361+
let remote_path =
362+
remote_dir_path.get_remote_file_path(&agent_id, STORE_KEY_OPAMP_DATA_CONFIG);
352363

353364
// Expectations
354365
dir_manager.should_create(remote_path.parent().unwrap());
@@ -381,7 +392,8 @@ state: applied
381392
let mut dir_manager = MockDirectoryManager::new();
382393
let remote_dir_path = RemoteDir::from(PathBuf::from("some/remote/path/"));
383394
let local_dir_path = LocalDir::from(PathBuf::from("some/local/path/"));
384-
let remote_path = remote_dir_path.get_remote_values_file_path(&agent_id);
395+
let remote_path =
396+
remote_dir_path.get_remote_file_path(&agent_id, STORE_KEY_OPAMP_DATA_CONFIG);
385397

386398
// Expectations
387399
dir_manager.should_not_create(
@@ -413,7 +425,8 @@ state: applied
413425
let mut dir_manager = MockDirectoryManager::new();
414426
let remote_dir_path = RemoteDir::from(PathBuf::from("some/remote/path/"));
415427
let local_dir_path = LocalDir::from(PathBuf::from("some/local/path/"));
416-
let remote_path = remote_dir_path.get_remote_values_file_path(&agent_id);
428+
let remote_path =
429+
remote_dir_path.get_remote_file_path(&agent_id, STORE_KEY_OPAMP_DATA_CONFIG);
417430

418431
// Expectations
419432
dir_manager.should_create(remote_path.parent().unwrap());

0 commit comments

Comments
 (0)