Skip to content

Commit 45c8844

Browse files
kirisakipyshx
andauthored
feat(websocket): support GCS client (#290)
* feat(websocket): Support snapshot for GCS * fix(websocket): Remove unnecessary mut * feat(websocket): Make GcsClient enable to take a arbitrary value * feat(websocket): support download * feat(websocket): define GCS client * fix(websocket): resolve conflicts * organize imports of domain files --------- Co-authored-by: pyshx <[email protected]>
1 parent aac2aff commit 45c8844

File tree

10 files changed

+1211
-91
lines changed

10 files changed

+1211
-91
lines changed

websocket/Cargo.lock

Lines changed: 925 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

websocket/Cargo.toml

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[workspace]
22
members = [
3-
"crates/*",
3+
"crates/*",
44
]
55

66
resolver = "2"
@@ -38,16 +38,17 @@ strip = true
3838
flow-websocket-domain = {path = "crates/domain"}
3939

4040
async-trait = "0.1.80"
41-
serde = { version = "1.0", features = ["derive"] }
42-
serde_json = {version = "1.0.117", features = ["arbitrary_precision"]}
43-
chrono = { version = "0.4", features = ["serde"] }
44-
redis = { version = "0.25.4", features = ["aio", "tokio-comp"] }
41+
chrono = {version = "0.4", features = ["serde"]}
42+
google-cloud-storage = "0.18"
43+
redis = {version = "0.25.4", features = ["aio", "tokio-comp"]}
4544
rslock = "0.3.0"
45+
serde = {version = "1.0", features = ["derive"]}
46+
serde_json = {version = "1.0.117", features = ["arbitrary_precision"]}
4647
tokio = {version = "1.38.0", features = ["full", "time"]}
4748
uuid = {version = "1.8.0", features = [
4849
"v4",
4950
"fast-rng",
5051
"macro-diagnostics",
5152
"serde",
5253
]}
53-
yrs = "0.18"
54+
yrs = "0.18"

websocket/crates/domain/src/project.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
use serde::{Deserialize, Serialize};
2-
use crate::snapshot::{ProjectSnapshot, ObjectTenant, ObjectDelete};
3-
use crate::repository::ProjectSnapshotRepository;
41
use std::error::Error;
52

6-
use crate::utils::generate_id;
3+
use serde::{Deserialize, Serialize};
74

5+
use crate::repository::ProjectSnapshotRepository;
6+
use crate::snapshot::{ObjectDelete, ObjectTenant, ProjectSnapshot};
7+
use crate::utils::generate_id;
88

99
#[derive(Debug, Serialize, Deserialize, Clone)]
1010
pub struct Project {

websocket/crates/domain/src/repository.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1+
use std::error::Error;
2+
13
use crate::project::{Project, ProjectEditingSession};
24
use crate::snapshot::ProjectSnapshot;
3-
use std::error::Error;
45

56
#[async_trait::async_trait]
67
pub trait ProjectRepository {

websocket/crates/infra/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ version.workspace = true
1414
flow-websocket-domain.workspace = true
1515

1616
async-trait.workspace = true
17+
google-cloud-storage.workspace = true
1718
redis.workspace = true
1819
rslock.workspace = true
1920
serde.workspace = true
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
use google_cloud_storage::client::{Client, ClientConfig};
2+
use google_cloud_storage::http::objects::download::Range;
3+
use google_cloud_storage::http::objects::get::GetObjectRequest;
4+
use google_cloud_storage::http::objects::upload::{Media, UploadObjectRequest, UploadType};
5+
use serde::{Deserialize, Serialize};
6+
7+
#[derive(Clone)]
8+
pub struct GcsClient {
9+
client: Client,
10+
bucket: String,
11+
}
12+
13+
impl GcsClient {
14+
pub async fn new(bucket: String) -> Result<Self, Box<dyn std::error::Error>> {
15+
let config = ClientConfig::default().with_auth().await?;
16+
let client = Client::new(config);
17+
Ok(GcsClient { client, bucket })
18+
}
19+
20+
pub async fn upload<T: Serialize>(
21+
&self,
22+
path: String,
23+
data: &T,
24+
) -> Result<(), Box<dyn std::error::Error>> {
25+
let upload_type = UploadType::Simple(Media::new(path));
26+
let bytes = serde_json::to_string(data)?;
27+
let _uploaded = self
28+
.client
29+
.upload_object(
30+
&UploadObjectRequest {
31+
bucket: self.bucket.clone(),
32+
..Default::default()
33+
},
34+
bytes,
35+
&upload_type,
36+
)
37+
.await?;
38+
Ok(())
39+
}
40+
41+
pub async fn download<T: for<'de> Deserialize<'de>>(
42+
&self,
43+
path: String,
44+
) -> Result<T, Box<dyn std::error::Error>> {
45+
let bytes = self
46+
.client
47+
.download_object(
48+
&GetObjectRequest {
49+
bucket: self.bucket.clone(),
50+
object: path,
51+
..Default::default()
52+
},
53+
&Range::default(),
54+
)
55+
.await?;
56+
let src = String::from_utf8(bytes)?;
57+
let data = serde_json::from_str(&src)?;
58+
Ok(data)
59+
}
60+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod gcs_client;

websocket/crates/infra/src/persistence/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
1+
pub mod gcs;
2+
pub mod project_repository;
13
pub mod redis;
2-
pub mod project_repository;

websocket/crates/infra/src/persistence/project_repository.rs

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,15 @@ use std::sync::Arc;
33

44
use async_trait::async_trait;
55

6-
use flow_websocket_domain::project::{Project, ProjectEditingSession};
7-
use flow_websocket_domain::repository::{ProjectRepository, ProjectEditingSessionRepository};
8-
6+
use crate::persistence::gcs::gcs_client::GcsClient;
97
use crate::persistence::redis::redis_client::RedisClient;
108

9+
use flow_websocket_domain::project::{Project, ProjectEditingSession};
10+
use flow_websocket_domain::repository::{
11+
ProjectEditingSessionRepository, ProjectRepository, ProjectSnapshotRepository,
12+
};
13+
use flow_websocket_domain::snapshot::ProjectSnapshot;
14+
1115
pub struct ProjectRedisRepository {
1216
redis_client: Arc<RedisClient>,
1317
}
@@ -34,7 +38,10 @@ impl ProjectEditingSessionRepository for ProjectRedisRepository {
3438
Ok(())
3539
}
3640

37-
async fn get_active_session(&self, project_id: &str) -> Result<Option<ProjectEditingSession>, Box<dyn Error>> {
41+
async fn get_active_session(
42+
&self,
43+
project_id: &str,
44+
) -> Result<Option<ProjectEditingSession>, Box<dyn Error>> {
3845
let key = format!("project:{}:active_session", project_id);
3946
self.redis_client.get(&key).await
4047
}
@@ -45,3 +52,40 @@ impl ProjectEditingSessionRepository for ProjectRedisRepository {
4552
Ok(())
4653
}
4754
}
55+
56+
/// A `ProjectGcsRepository` is a thin wrapper of `GcsClient`.
57+
pub struct ProjectGcsRepository {
58+
client: GcsClient,
59+
}
60+
61+
impl ProjectGcsRepository {
62+
/// Returns the `ProjectGcsRepository`.
63+
fn new(client: GcsClient) -> Self {
64+
Self { client }
65+
}
66+
}
67+
68+
#[async_trait]
69+
impl ProjectSnapshotRepository for ProjectGcsRepository {
70+
/// Create a snapshot.
71+
async fn create_snapshot(&self, snapshot: ProjectSnapshot) -> Result<(), Box<dyn Error>> {
72+
let path = format!("snapshot/{}", snapshot.id);
73+
self.client.upload(path, &snapshot).await?;
74+
Ok(())
75+
}
76+
77+
/// Get the latest snapshot.
78+
async fn get_latest_snapshot(
79+
&self,
80+
project_id: &str,
81+
) -> Result<Option<ProjectSnapshot>, Box<dyn Error>> {
82+
let path = format!("snapshot/{}:latest_snapshot", project_id);
83+
self.client.download(path).await
84+
}
85+
86+
/// Get the state of the latest snapshot.
87+
async fn get_latest_snapshot_state(&self, project_id: &str) -> Result<Vec<u8>, Box<dyn Error>> {
88+
let path = format!("snapshot/{}:latest_snapshot_state", project_id);
89+
self.client.download(path).await
90+
}
91+
}

0 commit comments

Comments
 (0)