Skip to content

Commit

Permalink
vicky-worker: chunk log uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
yu-re-ka committed Oct 4, 2023
1 parent ba8d39c commit 51285fb
Showing 1 changed file with 8 additions and 3 deletions.
11 changes: 8 additions & 3 deletions vicky-worker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,14 @@ pub struct Task {
}

fn log_sink(cfg: Arc<Config>, task_id: Uuid) -> impl Sink<String, Error = anyhow::Error> + Send {
futures_util::sink::unfold((), move |_, line| {
futures_util::sink::unfold((), move |_, lines| {
let cfg = cfg.clone();
async move {
api::<_, ()>(
&cfg,
Method::POST,
&format!("api/v1/tasks/{}/logs", task_id),
&serde_json::json!({ "lines": [line] }),
&serde_json::json!({ "lines": lines }),
)
.await
}
Expand Down Expand Up @@ -112,7 +112,12 @@ async fn try_run_task(cfg: Arc<Config>, task: &Task) -> anyhow::Result<()> {
FramedRead::new(child.stderr.take().unwrap(), LinesCodec::new()),
);

lines.map_err(anyhow::Error::from).forward(logger).await?;
lines
.ready_chunks(1024) // TODO switch to try_ready_chunks
.map(|v| v.into_iter().collect::<Result<_, _>>())
.map_err(anyhow::Error::from)
.forward(logger)
.await?;
let exit_status = child.wait().await?;

if exit_status.success() {
Expand Down

0 comments on commit 51285fb

Please sign in to comment.