Skip to content

Commit 7ed6c4d

Browse files
committed
update get_address to use url instead of sockaddr
1 parent 802f727 commit 7ed6c4d

File tree

6 files changed

+52
-20
lines changed

6 files changed

+52
-20
lines changed

server/src/catalog.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,12 @@ pub async fn update_snapshot(
118118
let mut ch = false;
119119
for m in manifests.iter() {
120120
let s = get_address();
121-
let p = format!("{}.{}.{}", s.ip(), s.port(), MANIFEST_FILE);
121+
let p = format!(
122+
"{}.{}.{}",
123+
s.domain().unwrap(),
124+
s.port().unwrap_or_default(),
125+
MANIFEST_FILE
126+
);
122127
if m.manifest_path.contains(&p) {
123128
ch = true;
124129
}
@@ -152,7 +157,12 @@ pub async fn update_snapshot(
152157
};
153158

154159
let addr = get_address();
155-
let mainfest_file_name = format!("{}.{}.{}", addr.ip(), addr.port(), MANIFEST_FILE);
160+
let mainfest_file_name = format!(
161+
"{}.{}.{}",
162+
addr.domain().unwrap(),
163+
addr.port().unwrap_or_default(),
164+
MANIFEST_FILE
165+
);
156166
let path =
157167
partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name);
158168
storage
@@ -186,7 +196,12 @@ pub async fn update_snapshot(
186196
};
187197

188198
let addr = get_address();
189-
let mainfest_file_name = format!("{}.{}.{}", addr.ip(), addr.port(), MANIFEST_FILE);
199+
let mainfest_file_name = format!(
200+
"{}.{}.{}",
201+
addr.domain().unwrap(),
202+
addr.port().unwrap(),
203+
MANIFEST_FILE
204+
);
190205
let path = partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name);
191206
storage
192207
.put_object(&path, serde_json::to_vec(&manifest).unwrap().into())

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,10 @@ impl IngestServer {
183183
let store = CONFIG.storage().get_object_store();
184184

185185
let sock = get_address();
186-
let path = ingestor_metadata_path(sock.ip().to_string(), sock.port().to_string());
186+
let path = ingestor_metadata_path(
187+
sock.domain().unwrap().to_string(),
188+
sock.port().unwrap_or_default().to_string(),
189+
);
187190

188191
if store.get_object(&path).await.is_ok() {
189192
println!("ingestor metadata already exists");
@@ -192,13 +195,19 @@ impl IngestServer {
192195

193196
let scheme = CONFIG.parseable.get_scheme();
194197
let resource = IngestorMetadata::new(
195-
sock.port().to_string(),
198+
sock.port().unwrap_or_default().to_string(),
196199
CONFIG
197200
.parseable
198201
.domain_address
199202
.clone()
200203
.unwrap_or_else(|| {
201-
Url::parse(&format!("{}://{}:{}", scheme, sock.ip(), sock.port())).unwrap()
204+
Url::parse(&format!(
205+
"{}://{}:{}",
206+
scheme,
207+
sock.domain().unwrap(),
208+
sock.port().unwrap_or_default()
209+
))
210+
.unwrap()
202211
})
203212
.to_string(),
204213
DEFAULT_VERSION.to_string(),

server/src/metrics/prom_utils.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,11 @@ struct StorageMetrics {
2323
impl Default for Metrics {
2424
fn default() -> Self {
2525
let socket = get_address();
26-
let address = format!("http://{}:{}", socket.ip(), socket.port());
26+
let address = format!(
27+
"http://{}:{}",
28+
socket.domain().unwrap(),
29+
socket.port().unwrap_or_default()
30+
);
2731
Metrics {
2832
address,
2933
parseable_events_ingested: 0.0,

server/src/storage/object_storage.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -542,8 +542,8 @@ fn schema_path(stream_name: &str) -> RelativePathBuf {
542542
let addr = get_address();
543543
let file_name = format!(
544544
".ingestor.{}.{}{}",
545-
addr.ip(),
546-
addr.port(),
545+
addr.domain().unwrap(),
546+
addr.port().unwrap_or_default(),
547547
SCHEMA_FILE_NAME
548548
);
549549

@@ -562,8 +562,8 @@ pub fn stream_json_path(stream_name: &str) -> RelativePathBuf {
562562
let addr = get_address();
563563
let file_name = format!(
564564
".ingestor.{}.{}{}",
565-
addr.ip(),
566-
addr.port(),
565+
addr.domain().unwrap(),
566+
addr.port().unwrap_or_default(),
567567
STREAM_METADATA_FILE_NAME
568568
);
569569
RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, &file_name])
@@ -590,7 +590,12 @@ fn alert_json_path(stream_name: &str) -> RelativePathBuf {
590590
#[inline(always)]
591591
fn manifest_path(prefix: &str) -> RelativePathBuf {
592592
let addr = get_address();
593-
let mainfest_file_name = format!("{}.{}.{}", addr.ip(), addr.port(), MANIFEST_FILE);
593+
let mainfest_file_name = format!(
594+
"{}.{}.{}",
595+
addr.domain().unwrap(),
596+
addr.port().unwrap_or_default(),
597+
MANIFEST_FILE
598+
);
594599
RelativePathBuf::from_iter([prefix, &mainfest_file_name])
595600
}
596601

server/src/storage/staging.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ impl StorageDir {
6464
+ &utils::minute_to_prefix(time.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap();
6565
let local_uri = str::replace(&uri, "/", ".");
6666
let sock = get_address();
67-
let ip = sock.ip();
68-
let port = sock.port();
67+
let ip = sock.domain().unwrap();
68+
let port = sock.port().unwrap_or_default();
6969
format!("{local_uri}{ip}.{port}.{extention}")
7070
}
7171

server/src/utils.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ pub mod update;
2525
use crate::option::CONFIG;
2626
use chrono::{DateTime, NaiveDate, Timelike, Utc};
2727
use std::env;
28+
#[allow(unused_imports)]
2829
use std::net::SocketAddr;
30+
use url::Url;
2931

3032
#[allow(dead_code)]
3133
pub fn hostname() -> Option<String> {
@@ -224,10 +226,9 @@ impl TimePeriod {
224226
}
225227
}
226228

227-
#[inline(always)]
228-
pub fn get_address() -> SocketAddr {
229+
pub fn get_address() -> Url {
229230
if CONFIG.parseable.ingestor_url.is_empty() {
230-
CONFIG.parseable.address.parse::<SocketAddr>().unwrap()
231+
CONFIG.parseable.address.parse::<Url>().unwrap()
231232
} else {
232233
let addr_from_env = CONFIG
233234
.parseable
@@ -245,9 +246,7 @@ pub fn get_address() -> SocketAddr {
245246
let var_port = port[1..].to_string();
246247
port = get_from_env(&var_port);
247248
}
248-
format!("{}:{}", hostname, port)
249-
.parse::<SocketAddr>()
250-
.unwrap()
249+
format!("{}:{}", hostname, port).parse::<Url>().unwrap()
251250
}
252251
}
253252
fn get_from_env(var_to_fetch: &str) -> String {

0 commit comments

Comments
 (0)