v0.1.6: Amalthea
This release contains PostgresSink, which can be used like this:
// Turn each incoming message into a `CPostgresRow` carrying an INSERT
// statement, then forward the resulting rows into a `CPostgresSink`.
stream
    .enumerate()
    .map(|(idx, m)| {
        m.map(|e| {
            // Both `unwrap`s panic if the payload cannot be viewed as `str`.
            let doc = SpotifyDocument::new(e.payload_view::<str>().unwrap().unwrap().into());
            println!("Processing document ID: {}", idx);
            // Presumably `$1` is filled from the value vector passed to
            // `CPostgresRow::new` below — confirm against its documentation.
            let query = "INSERT INTO\nspotify (song_url)\nVALUES ($1)";
            CPostgresRow::new(query.into(), vec![doc.song_url])
        })
        // Build the error lazily: nothing is allocated when a message is present.
        .ok_or_else(|| CallystoError::GeneralError("No payload".into()))
    })
    .forward(
        // NOTE(review): the meaning of the `4` and `0` arguments is defined by
        // `CPostgresSink::new` and is not visible here — confirm before reuse.
        CPostgresSink::new("postgres://testuser:testpassword@localhost/testdb", 4, 0).unwrap(),
    )
    .await?;