Skip to content

Commit

Permalink
feature: connect batch etl to trieve
Browse files Browse the repository at this point in the history
  • Loading branch information
densumesh committed Jan 8, 2025
1 parent 7dc0755 commit 6ace28c
Show file tree
Hide file tree
Showing 36 changed files with 2,147 additions and 618 deletions.
68 changes: 61 additions & 7 deletions batch-etl/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions batch-etl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,19 @@
name = "batch-etl"
version = "0.1.0"
edition = "2021"
default-run = "batch-etl-server"

[[bin]]
name = "batch-etl-server"
path = "src/main.rs"

[[bin]]
name = "create-job"
path = "src/workers/create-job-worker.rs"

[[bin]]
name = "check-completion"
path = "src/cron-jobs/check-completion.rs"

[dependencies]
utoipa-redoc = { version = "5.0.0", features = ["actix-web"] }
Expand All @@ -26,6 +39,7 @@ time = "0.3.37"
rust-s3 = "0.35.1"
reqwest = "0.12.9"
jsonl = "4.0.1"
broccoli_queue = "0.1.2"

[features]
default = []
Expand Down
3 changes: 2 additions & 1 deletion batch-etl/ch_migrations/1734595053_initial_tables/down.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
DROP TABLE IF EXISTS inputs;
DROP TABLE IF EXISTS jobs;
DROP TABLE IF EXISTS schemas;
DROP TABLE IF EXISTS schemas;
DROP TABLE IF EXISTS batches;
19 changes: 15 additions & 4 deletions batch-etl/ch_migrations/1734595053_initial_tables/up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@ CREATE TABLE IF NOT EXISTS jobs (
id String,
schema_id String,
input_id String,
status String,
batch_id String,
output_id String,
webhook_url String,
created_at DateTime,
updated_at DateTime
) ENGINE = ReplacingMergeTree(updated_at)
) ENGINE = MergeTree()
ORDER BY (schema_id, id)
PARTITION BY
(schema_id);
Expand All @@ -29,3 +27,16 @@ CREATE TABLE IF NOT EXISTS inputs (
updated_at DateTime
) ENGINE = MergeTree()
ORDER BY (id);

CREATE TABLE IF NOT EXISTS batches (
batch_id String,
job_id String,
output_id String,
status String,
created_at DateTime,
updated_at DateTime
) ENGINE = ReplacingMergeTree(updated_at)
ORDER BY (job_id, batch_id)
PARTITION BY
(job_id);

2 changes: 1 addition & 1 deletion batch-etl/ch_migrations/chm.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
url = "http://localhost:8123"
url = "http://localhost:8124"
user = "clickhouse"
password = "password"
database = "default"
12 changes: 6 additions & 6 deletions batch-etl/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ services:
timeout: 5s
retries: 10
ports:
- "6379:6379"
- "6380:6379"
volumes:
- redis-data:/data
networks:
Expand All @@ -18,8 +18,8 @@ services:
s3:
image: minio/minio:RELEASE.2023-09-27T15-22-50Z
ports:
- 9000:9000
- 42625:42625
- 9004:9000
- 42626:42625
environment:
- MINIO_ROOT_USER=${MINIO_ROOT_USER}
- MINIO_ROOT_PASSWORD=${MINIO_ROOT_PASSWORD}
Expand Down Expand Up @@ -64,9 +64,9 @@ services:
volumes:
- clickhouse-data:/var/lib/clickhouse
ports:
- "8123:8123"
- "9001:9000"
- "9009:9009"
- "8124:8123"
- "9002:9000"
- "9010:9009"
networks:
- app-network

Expand Down
38 changes: 38 additions & 0 deletions batch-etl/src/cron-jobs/check-completion.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use batch_etl::operators::{
batch::{get_batch_output, get_pending_batches},
job::send_webhook,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Cron entry point: load pending batches from ClickHouse, resolve each
    // batch's output URL, and notify the owning job's webhook when one is set.
    dotenvy::dotenv().ok();
    env_logger::builder()
        .target(env_logger::Target::Stdout)
        .filter_level(log::LevelFilter::Info)
        .init();

    // Build the ClickHouse client from env vars, falling back to local-dev
    // defaults. async_insert=1 with wait_for_async_insert=0 makes inserts
    // fire-and-forget (acknowledged before they are flushed to disk).
    let clickhouse_client = clickhouse::Client::default()
        .with_url(std::env::var("CLICKHOUSE_URL").unwrap_or("http://localhost:8123".to_string()))
        .with_user(std::env::var("CLICKHOUSE_USER").unwrap_or("default".to_string()))
        .with_password(std::env::var("CLICKHOUSE_PASSWORD").unwrap_or("".to_string()))
        .with_database(std::env::var("CLICKHOUSE_DATABASE").unwrap_or("default".to_string()))
        .with_option("async_insert", "1")
        .with_option("wait_for_async_insert", "0");

    // Was `.unwrap()`: this fn already returns Result, so propagate errors
    // with `?` instead of panicking the whole cron run on a transient
    // ClickHouse/network failure.
    // NOTE(review): assumes the operator error types implement
    // std::error::Error so they convert into Box<dyn Error> — confirm.
    let pending_batches = get_pending_batches(&clickhouse_client).await?;
    log::info!("Pending batches: {:?}", pending_batches);

    for (batch, id, webhook_url) in pending_batches {
        log::info!("Processing batch: {:?}", batch);
        let batch_url = get_batch_output(&clickhouse_client, batch).await?;
        log::info!("Batch URL: {:?}", batch_url);
        // Notify only when the job registered a webhook AND the batch has
        // produced an output URL; otherwise skip this batch silently.
        if let (Some(webhook_url), Some(batch_url)) = (webhook_url, batch_url) {
            log::info!("Sending webhook to: {:?}", webhook_url);
            send_webhook(webhook_url, id, batch_url).await?;
        }
    }

    Ok(())
}
21 changes: 21 additions & 0 deletions batch-etl/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use actix_web::{
middleware::Logger,
web::{self, PayloadConfig},
App, HttpServer,
};
use broccoli_queue::queue::BroccoliQueue;
use chm::tools::migrations::{run_pending_migrations, SetupArgs};
use errors::custom_json_error_handler;
use routes::{
Expand Down Expand Up @@ -131,6 +133,17 @@ pub async fn main() -> std::io::Result<()> {
.limit(134200000)
.error_handler(custom_json_error_handler);

let broccoli_queue = BroccoliQueue::builder(redis_url)
.pool_connections(redis_connections.try_into().unwrap())
.build()
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to create broccoli queue {:?}", e),
)
})?;

HttpServer::new(move || {
App::new()
.into_utoipa_app()
Expand All @@ -139,6 +152,7 @@ pub async fn main() -> std::io::Result<()> {
.app_data(json_cfg.clone())
.app_data(web::Data::new(redis_pool.clone()))
.app_data(web::Data::new(clickhouse_client.clone()))
.app_data(web::Data::new(broccoli_queue.clone()))
.service(utoipa_actix_web::scope("/api/schema").configure(|config| {
config.service(create_schema).service(get_schema);
}))
Expand All @@ -154,6 +168,13 @@ pub async fn main() -> std::io::Result<()> {
}))
.openapi_service(|api| Redoc::with_url("/redoc", api))
.into_app()
.wrap(
// Set up logger, but avoid logging hot status endpoints
Logger::new("%r %s %b %{Referer}i %{User-Agent}i %T %{TR-Dataset}i")
.exclude("/")
.exclude("/api/health")
.exclude("/metrics"),
)
})
.bind(("0.0.0.0", 8082))?
.run()
Expand Down
Loading

0 comments on commit 6ace28c

Please sign in to comment.