From ac17dea69e0b404491c75589f296b91754828708 Mon Sep 17 00:00:00 2001 From: Ryan Daum Date: Sat, 23 Sep 2023 06:40:56 -0400 Subject: [PATCH] More connectivity stuff: * Add `console-host`, a readline type interface direct from the console, sans network. * Fix 'create' action for websockets login. * Fix idle time / activity. --- Cargo.lock | 26 +++ Cargo.toml | 1 + crates/README.md | 2 + crates/console-host/Cargo.toml | 40 ++++ crates/console-host/src/main.rs | 320 +++++++++++++++++++++++++++++++ crates/daemon/src/connections.rs | 17 ++ crates/daemon/src/rpc_server.rs | 39 +++- crates/web-host/src/ws_host.rs | 2 +- 8 files changed, 442 insertions(+), 5 deletions(-) create mode 100644 crates/console-host/Cargo.toml create mode 100644 crates/console-host/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index c0dd9cb1..da537cef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1458,6 +1458,32 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "moor-console-host" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "bincode", + "clap", + "clap_derive", + "futures", + "futures-util", + "itertools", + "moor-kernel", + "moor-values", + "rpc-common", + "rustyline", + "strum", + "tmq", + "tokio", + "tokio-util", + "tracing", + "tracing-chrome", + "tracing-subscriber", + "uuid", +] + [[package]] name = "moor-daemon" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 85a67940..942c5630 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "crates/daemon", "crates/telnet-host", "crates/web-host", + "crates/console-host", "crates/server", "crates/regexpr-binding", ] diff --git a/crates/README.md b/crates/README.md index 07e6c635..32b5a8ad 100644 --- a/crates/README.md +++ b/crates/README.md @@ -12,6 +12,8 @@ Directory layout for `crates/` network `host`s * `web-host` - like the above, but hosts an HTTP server which provides a websocket interface to the system. as well as various web APIs. + * `console-host` - console host which connects as a user to the `daemon` and provides a readline-type interface to the + system. * `server` - a "monolithic" server which links kernel and provides telnet and websocket and repl hosts. * `rpc-common` - crate providing types used by both `daemon` and `host`, for the RPC interface diff --git a/crates/console-host/Cargo.toml b/crates/console-host/Cargo.toml new file mode 100644 index 00000000..ab041f63 --- /dev/null +++ b/crates/console-host/Cargo.toml @@ -0,0 +1,40 @@ +[package] +name = "moor-console-host" +version = "0.1.0" +edition.workspace = true +repository.workspace = true +license.workspace = true + +[dependencies] +moor-kernel = { path = "../kernel" } +moor-values = { path = "../values" } +rpc-common = { path = "../rpc-common" } + +## Command line arguments parsing. +clap.workspace = true +clap_derive.workspace = true +strum.workspace = true + +# General. +anyhow.workspace = true +futures.workspace = true +futures-util.workspace = true +async-trait.workspace = true +bincode.workspace = true + +## Asynchronous transaction processing & networking +tokio.workspace = true +tokio-util.workspace = true + +## Logging & tracing +tracing.workspace = true +tracing-subscriber.workspace = true +tracing-chrome.workspace = true + +## ZMQ +tmq.workspace = true +uuid.workspace = true +itertools.workspace = true + +## For console +rustyline.workspace = true diff --git a/crates/console-host/src/main.rs b/crates/console-host/src/main.rs new file mode 100644 index 00000000..40b504b9 --- /dev/null +++ b/crates/console-host/src/main.rs @@ -0,0 +1,320 @@ +use anyhow::Error; +use std::process::exit; +use std::time::SystemTime; + +use clap::Parser; +use clap_derive::Parser; +use moor_values::var::objid::Objid; +use rustyline::error::ReadlineError; +use rustyline::DefaultEditor; +use tmq::request; +use tokio::select; +use tokio::signal::unix::{signal, SignalKind}; +use tokio::task::block_in_place; +use tracing::{debug, error, info, trace, warn}; +use uuid::Uuid; + +use rpc_common::pubsub_client::{broadcast_recv, narrative_recv}; +use rpc_common::rpc_client::RpcSendClient; +use rpc_common::{ + BroadcastEvent, ConnectionEvent, RpcRequest, RpcResponse, RpcResult, BROADCAST_TOPIC, +}; + +#[derive(Parser, Debug)] +struct Args { + #[arg( + long, + value_name = "rpc-server", + help = "RPC server address", + default_value = "tcp://0.0.0.0:7899" + )] + rpc_server: String, + + #[arg( + long, + value_name = "narrative-server", + help = "Narrative server address", + default_value = "tcp://0.0.0.0:7898" + )] + narrative_server: String, + + #[arg( + long, + value_name = "username", + help = "Username to use for authentication", + default_value = "Wizard" + )] + username: String, + + #[arg( + long, + value_name = "password", + help = "Password to use for authentication", + default_value = "" + )] + password: String, +} + +async fn establish_connection( + client_id: Uuid, + rpc_client: &mut RpcSendClient, +) -> Result { + return match rpc_client + .make_rpc_call( + client_id, + RpcRequest::ConnectionEstablish("console".to_string()), + ) + .await + { + Ok(RpcResult::Success(RpcResponse::NewConnection(conn_id))) => Ok(conn_id), + Ok(RpcResult::Success(other)) => { + error!("Unexpected response: {:?}", other); + Err(Error::msg("Unexpected response")) + } + Ok(RpcResult::Failure(e)) => { + error!("Failure connecting: {:?}", e); + Err(Error::msg("Failure connecting")) + } + Err(e) => { + error!("Error connecting: {:?}", e); + Err(Error::msg("Error connecting")) + } + }; +} + +async fn perform_auth( + client_id: Uuid, + rpc_client: &mut RpcSendClient, + username: &str, + password: &str, +) -> Result { + // Need to first authenticate with the server. + return match rpc_client + .make_rpc_call( + client_id, + RpcRequest::LoginCommand(vec![ + "connect".to_string(), + username.to_string(), + password.to_string(), + ]), + ) + .await + { + Ok(RpcResult::Success(RpcResponse::LoginResult(Some((connect_type, player))))) => { + info!("Authenticated as {:?} with id {:?}", connect_type, player); + Ok(player) + } + Ok(RpcResult::Success(RpcResponse::LoginResult(None))) => { + error!("Authentication failed"); + Err(Error::msg("Authentication failed")) + } + Ok(RpcResult::Success(other)) => { + error!("Unexpected response: {:?}", other); + Err(Error::msg("Unexpected response")) + } + Ok(RpcResult::Failure(e)) => { + error!("Failure authenticating: {:?}", e); + Err(Error::msg("Failure authenticating")) + } + Err(e) => { + error!("Error authenticating: {:?}", e); + Err(Error::msg("Error authenticating")) + } + }; +} + +async fn handle_console_line(client_id: Uuid, line: &str, rpc_client: &mut RpcSendClient) { + // Lines are either 'eval' or 'command', depending on the mode we're in. + // TODO: The intent here is to do something like Julia's repl interface where they have a pkg + // mode (initiated by initial ] keystroke) and default repl mode. + // For us, our initial keystroke will provoke evaluation through `Eval` but default will be + // to send standard MOO commands. + // But For now, we'll just act as if we're a telnet connection. User can do eval with ; via + // the core. + let line = line.trim(); + match rpc_client + .make_rpc_call(client_id, RpcRequest::Command(line.to_string())) + .await + { + Ok(RpcResult::Success(RpcResponse::CommandComplete)) => { + trace!("Command complete"); + } + Ok(RpcResult::Success(other)) => { + warn!("Unexpected command response: {:?}", other); + } + Ok(RpcResult::Failure(e)) => { + error!("Failure executing command: {:?}", e); + } + Err(e) => { + error!("Error executing command: {:?}", e); + } + } +} + +async fn console_loop( + rpc_server: &str, + narrative_server: &str, + username: &str, + password: &str, +) -> Result<(), anyhow::Error> { + let zmq_ctx = tmq::Context::new(); + + // Establish a connection to the RPC server + let client_id = Uuid::new_v4(); + + let rcp_request_sock = request(&zmq_ctx) + .set_rcvtimeo(100) + .set_sndtimeo(100) + .connect(rpc_server) + .expect("Unable to bind RPC server for connection"); + + let mut rpc_client = RpcSendClient::new(rcp_request_sock); + + let conn_obj_id = establish_connection(client_id, &mut rpc_client).await?; + debug!("Transitional connection ID before auth: {:?}", conn_obj_id); + + // Now authenticate with the server. + let player = perform_auth(client_id, &mut rpc_client, username, password).await?; + + info!("Authenticated as {:?} / {}", username, player); + + // Spawn a thread to listen for events on the narrative pubsub channel, and send them to the + // console. + let narrative_subscriber = tmq::subscribe(&zmq_ctx) + .connect(narrative_server) + .expect("Unable to connect to narrative pubsub server"); + let mut narrative_subscriber = narrative_subscriber + .subscribe(client_id.as_bytes()) + .expect("Unable to subscribe to narrative pubsub server"); + let output_loop = tokio::spawn(async move { + loop { + match narrative_recv(client_id, &mut narrative_subscriber).await { + Ok(ConnectionEvent::Narrative(_, msg)) => { + println!("{}", msg.event()); + } + Ok(ConnectionEvent::SystemMessage(o, msg)) => { + eprintln!("SYSMSG: {}: {}", o, msg); + } + Ok(ConnectionEvent::Disconnect()) => { + error!("Received disconnect event; Session ending."); + return; + } + Err(e) => { + error!("Error receiving narrative event: {:?}; Session ending.", e); + return; + } + } + } + }); + + let broadcast_subscriber = tmq::subscribe(&zmq_ctx) + .connect(narrative_server) + .expect("Unable to connect to narrative pubsub server"); + let mut broadcast_subscriber = broadcast_subscriber + .subscribe(BROADCAST_TOPIC) + .expect("Unable to subscribe to narrative pubsub server"); + let broadcast_rcp_request_sock = request(&zmq_ctx) + .set_rcvtimeo(100) + .set_sndtimeo(100) + .connect(rpc_server) + .expect("Unable to bind RPC server for connection"); + let mut broadcast_rpc_client = RpcSendClient::new(broadcast_rcp_request_sock); + let broadcast_loop = tokio::spawn(async move { + loop { + match broadcast_recv(&mut broadcast_subscriber).await { + Ok(BroadcastEvent::PingPong(_)) => { + if let Err(e) = broadcast_rpc_client + .make_rpc_call(client_id, RpcRequest::Pong(SystemTime::now())) + .await + { + error!("Error sending pong: {:?}", e); + return; + } + } + Err(e) => { + error!("Error receiving broadcast event: {:?}; Session ending.", e); + return; + } + } + } + }); + + let edit_loop = tokio::spawn(async move { + let mut rl = DefaultEditor::new().unwrap(); + loop { + // TODO: unprovoked output from the narrative stream screws up the prompt midstream, + // but we have no real way to signal to this loop that it should newline for + // cleanliness. Need to figure out something for this. + let output = block_in_place(|| rl.readline("> ")); + match output { + Ok(line) => { + rl.add_history_entry(line.clone()) + .expect("Could not add history"); + handle_console_line(client_id, &line, &mut rpc_client).await; + } + Err(ReadlineError::Eof) => { + println!(""); + break; + } + Err(ReadlineError::Interrupted) => { + println!("^C"); + continue; + } + Err(e) => { + println!("Error: {e:?}"); + break; + } + } + } + }); + + select! { + _ = output_loop => { + info!("ZMQ client loop exited, stopping..."); + } + _ = broadcast_loop => { + info!("Broadcast loop exited, stopping..."); + } + _ = edit_loop => { + info!("Edit loop exited, stopping..."); + } + } + Ok(()) +} + +#[tokio::main(flavor = "multi_thread")] +async fn main() -> Result<(), anyhow::Error> { + let args: Args = Args::parse(); + + let main_subscriber = tracing_subscriber::fmt() + .compact() + .with_file(true) + .with_line_number(true) + .with_thread_ids(true) + .with_target(false) + .with_max_level(tracing::Level::INFO) + .finish(); + tracing::subscriber::set_global_default(main_subscriber) + .expect("Unable to set configure logging"); + + let mut hup_signal = + signal(SignalKind::hangup()).expect("Unable to register HUP signal handler"); + let mut stop_signal = + signal(SignalKind::interrupt()).expect("Unable to register STOP signal handler"); + + select! { + _ = console_loop(&args.rpc_server, args.narrative_server.as_str(), + &args.username, &args.password) => { + info!("console session exited, quitting..."); + exit(0); + } + _ = hup_signal.recv() => { + info!("HUP received, quitting..."); + exit(0); + }, + _ = stop_signal.recv() => { + info!("STOP received, quitting..."); + exit(0); + } + } +} diff --git a/crates/daemon/src/connections.rs b/crates/daemon/src/connections.rs index 2b6dba60..eefa6d81 100644 --- a/crates/daemon/src/connections.rs +++ b/crates/daemon/src/connections.rs @@ -184,6 +184,23 @@ impl Connections { }); } + pub(crate) async fn activity_for_client( + &self, + client_id: Uuid, + connobj: Objid, + ) -> Result<(), anyhow::Error> { + let mut inner = self.connections_list.write().unwrap(); + let connection_record = inner + .connections_client + .get_mut(&connobj) + .ok_or_else(|| anyhow::anyhow!("No connection for player: {}", connobj))? + .iter_mut() + .find(|cr| cr.client_id == client_id) + .ok_or_else(|| anyhow::anyhow!("No connection record for client: {}", client_id))?; + connection_record.last_activity = SystemTime::now(); + Ok(()) + } + /// Update the last ping time for a client / connection. pub(crate) async fn notify_is_alive( &self, diff --git a/crates/daemon/src/rpc_server.rs b/crates/daemon/src/rpc_server.rs index 8d22ac44..54d4fb5f 100644 --- a/crates/daemon/src/rpc_server.rs +++ b/crates/daemon/src/rpc_server.rs @@ -3,7 +3,7 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::{Instant, SystemTime}; -use anyhow::{Context, Error}; +use anyhow::{bail, Context, Error}; use futures_util::SinkExt; use itertools::Itertools; use metrics_macros::increment_counter; @@ -228,6 +228,9 @@ impl RpcServer { async fn last_activity_for(&self, player: Objid) -> Result { let connections = self.connections.connection_records_for(player).await?; + if connections.is_empty() { + bail!("No connections for player: {}", player); + } // Grab the most recent connection record (they are sorted by last_activity, so last item). Ok(connections.last().unwrap().last_activity) } @@ -321,7 +324,20 @@ impl RpcServer { connection: Objid, args: Vec, ) -> Result { - debug!("Performing login for client: {}", client_id); + increment_counter!("rpc_server.perform_login"); + + // TODO: change result of login to return this information, rather than just Objid, so + // we're not dependent on this. + let connect_type = if args.get(0) == Some(&"create".to_string()) { + ConnectType::Created + } else { + ConnectType::Connected + }; + + debug!( + "Performing {:?} login for client: {}", + connect_type, client_id + ); let Ok(session) = self.clone().new_session(client_id, connection).await else { increment_counter!("rpc_server.perform_login.create_session_failed"); return Err(RpcError::CreateSessionFailed); @@ -401,7 +417,7 @@ impl RpcServer { trace!(?player, "Submitting user_connected task"); if let Err(e) = self .clone() - .submit_connected_task(client_id, player, ConnectType::Connected) + .submit_connected_task(client_id, player, connect_type) .await { error!(error = ?e, "Error submitting user_connected task"); @@ -411,7 +427,7 @@ impl RpcServer { } increment_counter!("rpc_server.perform_login.success"); - Ok(LoginResult(Some((ConnectType::Connected, player)))) + Ok(LoginResult(Some((connect_type, player)))) } async fn submit_connected_task( @@ -426,6 +442,8 @@ impl RpcServer { .await .with_context(|| "could not create 'connected' task session for player")?; + increment_counter!("rpc_server.submit_connected_task"); + let connected_verb = match initiation_type { ConnectType::Connected => "user_connected".to_string(), ConnectType::Reconnected => "user_reconnected".to_string(), @@ -456,6 +474,16 @@ impl RpcServer { return Err(RpcError::CreateSessionFailed); }; + if let Err(e) = self + .connections + .activity_for_client(client_id, connection) + .await + { + warn!("Unable to update client connection activity: {}", e); + }; + + increment_counter!("rpc_server.perform_command"); + // TODO: try to submit to do_command first and only parse_command after that fails. debug!(command, ?client_id, ?connection, "Submitting command task"); let task_id = match self @@ -535,6 +563,8 @@ impl RpcServer { increment_counter!("rpc_server.perform_command.create_session_failed"); return Err(RpcError::CreateSessionFailed); }; + increment_counter!("rpc_server.perform_out_of_band"); + let command_components = parse_into_words(command.as_str()); let _ = match self .clone() @@ -570,6 +600,7 @@ impl RpcServer { return Err(RpcError::CreateSessionFailed); }; + increment_counter!("rpc_server.eval"); let task_id = match self .clone() .scheduler diff --git a/crates/web-host/src/ws_host.rs b/crates/web-host/src/ws_host.rs index 03aabde3..3d0612ec 100644 --- a/crates/web-host/src/ws_host.rs +++ b/crates/web-host/src/ws_host.rs @@ -259,7 +259,7 @@ pub async fn ws_create_handler( ConnectInfo(addr): ConnectInfo, State(ws_host): State, ) -> impl IntoResponse { - increment_counter!("ws_host.new_connection"); + increment_counter!("ws_host.new_creation"); info!("Connection from {}", addr); ws.on_upgrade(move |socket| async move { ws_host