From 6f851a0d7bcbec69f87d11be21a6fe07f24ffc59 Mon Sep 17 00:00:00 2001 From: hheik <4469778+hheik@users.noreply.github.com> Date: Thu, 8 Aug 2024 02:10:44 +0300 Subject: [PATCH] Reworked messaging --- src/client/app.rs | 66 +++++++++++--- src/client/request_queue.rs | 10 +-- src/client/ui.rs | 2 +- src/lib.rs | 9 ++ src/os_unix.rs | 174 +++++++++++++++++++----------------- src/protocol.rs | 37 ++++---- src/server.rs | 96 ++++++++++++++++---- src/server/audio_backend.rs | 7 -- src/server/decoder.rs | 2 +- src/server/decoder/opus.rs | 2 +- src/server/playback.rs | 46 +++++++--- 11 files changed, 292 insertions(+), 159 deletions(-) delete mode 100644 src/server/audio_backend.rs diff --git a/src/client/app.rs b/src/client/app.rs index 9168229..f5f1985 100644 --- a/src/client/app.rs +++ b/src/client/app.rs @@ -1,4 +1,5 @@ use std::{ + path::PathBuf, sync::{Arc, Mutex}, time::Duration, }; @@ -7,6 +8,7 @@ use rmp::{ protocol::{Message, MessageType}, ServerState, }; +use serde::Serialize; pub struct App { pub options: AppOptions, @@ -35,32 +37,65 @@ impl App { self.message_queue.lock().unwrap().push(message); } + fn map_and_serialize(&mut self, f: F) -> Option> + where + F: FnOnce(&ServerState) -> Option, + T: Serialize, + { + let data = self.state.lock().unwrap().as_ref().and_then(f); + data.map(|data| bincode::serialize(&data).unwrap()) + } + pub fn connected(&self) -> bool { self.state.lock().unwrap().is_some() } pub fn toggle_shuffle(&mut self) { - self.push_message(Message::new(MessageType::ToggleSuffle, None)); + let body = self.map_and_serialize(|state| Some(!state.playlist_params.shuffle)); + self.push_message(Message { + message_type: MessageType::SetShuffle, + body, + }) } pub fn toggle_next(&mut self) { - self.push_message(Message::new(MessageType::ToggleNext, None)); + let body = self.map_and_serialize(|state| Some(!state.playlist_params.next)); + self.push_message(Message { + message_type: MessageType::SetNext, + body, + }) } pub fn toggle_repeat(&mut self) { - self.push_message(Message::new(MessageType::ToggleRepeat, None)); - } - - pub fn fetch_state(&mut self) { - self.push_message(Message::new(MessageType::StateFetch, None)); - } - - pub fn play(&mut self) { - self.push_message(Message::new(MessageType::PlayTrack, None)); + let body = self.map_and_serialize(|state| Some(!state.playlist_params.repeat)); + self.push_message(Message { + message_type: MessageType::SetRepeat, + body, + }) } pub fn toggle_pause(&mut self) { - self.push_message(Message::new(MessageType::TogglePause, None)); + let body = + self.map_and_serialize(|state| state.player.as_ref().map(|player| !player.is_paused)); + self.push_message(Message { + message_type: MessageType::SetPause, + body, + }); + } + + pub fn fetch_state(&mut self) { + self.push_message(Message { + message_type: MessageType::StateFetch, + body: None, + }); + } + + pub fn play(&mut self, track: PathBuf) { + let body = self.map_and_serialize(|_state| Some(track)); + self.push_message(Message { + message_type: MessageType::PlayTrackFromPath, + body, + }); } pub fn on_key(&mut self, key: char) { @@ -83,14 +118,17 @@ impl App { pub fn on_down(&mut self) {} pub fn on_enter(&mut self) { - self.play(); + self.play("".into()); // TODO: Remove hardcoding } pub fn on_tab(&mut self) {} pub fn on_tick(&mut self, _duration: Duration) { if self.options.sync_state { - self.push_message(Message::new(MessageType::StateFetch, None)); + self.push_message(Message { + message_type: MessageType::StateFetch, + body: None, + }); } } } diff --git a/src/client/request_queue.rs b/src/client/request_queue.rs index 3d901ea..e4ac2a3 100644 --- a/src/client/request_queue.rs +++ b/src/client/request_queue.rs @@ -21,10 +21,10 @@ pub fn request_queue_cleaner( if should_connect { *state.lock().unwrap() = None; stream = Some(connect()); - queue - .lock() - .unwrap() - .push(Message::new(MessageType::StateFetch, None)); + queue.lock().unwrap().push(Message { + message_type: MessageType::StateFetch, + body: None, + }); should_connect = false; } match stream.as_mut() { @@ -57,7 +57,7 @@ pub fn request_queue_cleaner( /// Blocks thread until connected to socket fn connect() -> LocalSocketStream { - let path = rmp::os::get_socket_path().unwrap(); + let path = rmp::os::server::get_socket_path().unwrap(); loop { match LocalSocketStream::connect(path.clone()) { Ok(stream) => return stream, diff --git a/src/client/ui.rs b/src/client/ui.rs index 8c4107b..c58f543 100644 --- a/src/client/ui.rs +++ b/src/client/ui.rs @@ -89,7 +89,7 @@ fn draw_player(f: &mut Frame, state: &ServerState, area: Rect) { ), ( state.playlist_params.shuffle, - vec![Span::from("S").underlined(), Span::from("UFFLE")], + vec![Span::from("S").underlined(), Span::from("HUFFLE")], ), ( state.playlist_params.repeat, diff --git a/src/lib.rs b/src/lib.rs index 57e9581..a73abde 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,6 +10,15 @@ pub mod protocol; #[derive(Serialize, Deserialize, Debug, Default)] pub struct ServerState { pub playlist_params: PlaylistParams, + pub player: Option, +} + +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct PlayerState { + pub track: PlaylistElement, + pub is_paused: bool, + pub duration: Option, + pub current_position: Option, } pub type PlaylistElement = PathBuf; diff --git a/src/os_unix.rs b/src/os_unix.rs index 2f0eb7c..face5bb 100644 --- a/src/os_unix.rs +++ b/src/os_unix.rs @@ -1,92 +1,102 @@ -use std::{ - fs, - path::{Path, PathBuf}, - process::{id, Command, Stdio}, -}; +pub mod client { + use std::path::PathBuf; -use super::server::ServerError; - -pub fn reserve_pid() -> Result<(), ServerError> { - let pid_path = get_pid_path()?; - is_running()?; - - fs::write(&pid_path, id().to_string()).map_err(|err| ServerError::Io(err))?; - Command::new("chmod") - .args(&["600", &pid_path.to_string_lossy()]) - .output() - .map_err(|err| ServerError::Io(err))?; - Ok(()) -} - -pub fn is_running() -> Result { - let pid_path = get_pid_path()?; - - match fs::read(&pid_path) { - Ok(old_pid) => { - let old_pid = - String::from_utf8(old_pid).map_err(|err| ServerError::from_debuggable(err))?; - let old_pid = old_pid.trim(); - Ok(Command::new("ps") - .args(&["-p", old_pid]) - .stdout(Stdio::null()) - .stderr(Stdio::null()) - .status() - .map_err(|err| ServerError::Io(err))? - .success()) - } - _ => Ok(false), + pub fn get_music_dir() -> PathBuf { + PathBuf::from(std::env!("HOME")).join("Music") } } -pub fn run_in_background() -> Result<(), ServerError> { - let this = std::env::args().next().unwrap(); - Command::new(this) - .stdout(Stdio::null()) - .stderr(Stdio::null()) - .args(&["-s"]) - .spawn() - .map_err(|err| ServerError::Io(err))?; - Ok(()) -} +pub mod server { + use std::{ + fs, + path::{Path, PathBuf}, + process::{id, Command, Stdio}, + }; -pub fn kill() -> Result<(), ServerError> { - let pid_path = get_pid_path()?; - let socket_path = get_socket_path()?; - let pid = fs::read(&pid_path).map_err(|_| ServerError::NotRunning)?; - let pid = String::from_utf8(pid).map_err(|err| ServerError::from_debuggable(err))?; - let pid = pid.trim(); - Command::new("kill") - .arg(pid) - .spawn() - .map_err(|err| ServerError::Io(err))?; - Command::new("rm") - .args(&[ - "-f", - &pid_path.to_string_lossy(), - &socket_path.to_string_lossy(), - ]) - .spawn() - .map_err(|err| ServerError::Io(err))?; - Ok(()) -} + use crate::server::ServerError; -pub fn get_socket_path() -> Result { - Ok(get_runtime_dir()?.join("rmp.socket")) -} + pub fn reserve_pid() -> Result<(), ServerError> { + let pid_path = get_pid_path()?; + is_running()?; -fn get_runtime_dir() -> Result { - let uid = String::from_utf8( - Command::new("id") - .arg("-u") + fs::write(&pid_path, id().to_string()).map_err(|err| ServerError::Io(err))?; + Command::new("chmod") + .args(&["600", &pid_path.to_string_lossy()]) .output() - .map_err(|err| ServerError::Io(err))? - .stdout, - ) - .map_err(|err| ServerError::from_debuggable(err))?; - let dir = Path::new("/run/user").join(uid.trim().to_string()); - Ok(dir) -} + .map_err(|err| ServerError::Io(err))?; + Ok(()) + } -fn get_pid_path() -> Result { - Ok(get_runtime_dir()?.join("rmp.pid")) + pub fn is_running() -> Result { + let pid_path = get_pid_path()?; + + match fs::read(&pid_path) { + Ok(old_pid) => { + let old_pid = + String::from_utf8(old_pid).map_err(|err| ServerError::from_debuggable(err))?; + let old_pid = old_pid.trim(); + Ok(Command::new("ps") + .args(&["-p", old_pid]) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .status() + .map_err(|err| ServerError::Io(err))? + .success()) + } + _ => Ok(false), + } + } + + pub fn run_in_background() -> Result<(), ServerError> { + let this = std::env::args().next().unwrap(); + Command::new(this) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .args(&["-s"]) + .spawn() + .map_err(|err| ServerError::Io(err))?; + Ok(()) + } + + pub fn kill() -> Result<(), ServerError> { + let pid_path = get_pid_path()?; + let socket_path = get_socket_path()?; + let pid = fs::read(&pid_path).map_err(|_| ServerError::NotRunning)?; + let pid = String::from_utf8(pid).map_err(|err| ServerError::from_debuggable(err))?; + let pid = pid.trim(); + Command::new("kill") + .arg(pid) + .spawn() + .map_err(|err| ServerError::Io(err))?; + Command::new("rm") + .args(&[ + "-f", + &pid_path.to_string_lossy(), + &socket_path.to_string_lossy(), + ]) + .spawn() + .map_err(|err| ServerError::Io(err))?; + Ok(()) + } + + pub fn get_socket_path() -> Result { + Ok(get_runtime_dir()?.join("rmp.socket")) + } + + fn get_runtime_dir() -> Result { + let uid = String::from_utf8( + Command::new("id") + .arg("-u") + .output() + .map_err(|err| ServerError::Io(err))? + .stdout, + ) + .map_err(|err| ServerError::from_debuggable(err))?; + let dir = Path::new("/run/user").join(uid.trim().to_string()); + Ok(dir) + } + + fn get_pid_path() -> Result { + Ok(get_runtime_dir()?.join("rmp.pid")) + } } diff --git a/src/protocol.rs b/src/protocol.rs index 84b1677..d48a066 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -3,7 +3,6 @@ use std::{ io::{Read, Write}, }; -use interprocess::local_socket::LocalSocketStream; use serde::{Deserialize, Serialize}; use crate::ServerState; @@ -26,17 +25,20 @@ pub enum MessageError { pub enum MessageType { /// Generic acknowledge Ack = 0, - /// client/server did not know how to handle the request + /// Recipient did not know how to handle the request. NotImplementedAck, - /// Request was invalid + /// Request was not a valid message. For example if the checksum did not match. ProtocolError, + /// Request was a valid message, but it was incorrect. For example the body could be missing if + /// it was expected + ApplicationError, StateFetch, StateResponse, - ToggleSuffle, - ToggleNext, - ToggleRepeat, - PlayTrack, - TogglePause, + SetShuffle, + SetNext, + SetRepeat, + SetPause, + PlayTrackFromPath, } #[derive(Debug, Serialize, Deserialize)] @@ -46,12 +48,9 @@ pub struct Message { } impl Message { - pub fn new(message_type: MessageType, body: Option<&[u8]>) -> Self { - Self { - message_type, - body: body.map(|b| Vec::from(b)), - } - } + // pub fn new(message_type: MessageType, body: Option>) -> Self { + // Self { message_type, body } + // } /// Message format (values are in little-endian): /// offset | size | explanation @@ -89,12 +88,18 @@ impl Display for Message { } } -pub fn send(stream: &mut LocalSocketStream, message: &Message) -> Result<(), std::io::Error> { +pub fn send(stream: &mut T, message: &Message) -> Result<(), std::io::Error> +where + T: Write, +{ stream.write_all(&message.as_bytes())?; Ok(()) } -pub fn receive(stream: &mut LocalSocketStream) -> Result { +pub fn receive(stream: &mut T) -> Result +where + T: Read, +{ let mut magic_buffer = vec![0; HEADER_MAGIC.len()]; if let Err(_) = stream.read_exact(&mut magic_buffer) { return Err(MessageError::ReadError); diff --git a/src/server.rs b/src/server.rs index 8fab1c2..bcefaa0 100644 --- a/src/server.rs +++ b/src/server.rs @@ -8,6 +8,7 @@ use rmp::{ }; use std::{ fs, + path::PathBuf, sync::{Arc, Mutex}, }; @@ -15,7 +16,6 @@ use crate::CliArgs; use self::playback::{playback_manager, Playback}; -pub mod audio_backend; pub mod decoder; pub mod playback; @@ -34,7 +34,7 @@ impl Server { } pub fn run(args: CliArgs) -> Result<(), ServerError> { - if let Err(err) = os::reserve_pid() { + if let Err(err) = os::server::reserve_pid() { let exit_code = handle_error(err); std::process::exit(exit_code); } @@ -45,6 +45,7 @@ pub fn run(args: CliArgs) -> Result<(), ServerError> { next: args.next, repeat: args.repeat, }, + player: None, }))); let server_clone = server.clone(); @@ -57,14 +58,14 @@ pub fn run(args: CliArgs) -> Result<(), ServerError> { } pub fn kill() { - if let Err(err) = os::kill() { + if let Err(err) = os::server::kill() { let exit_code = handle_error(err); std::process::exit(exit_code); } } pub fn is_running() -> Result { - match os::is_running() { + match os::server::is_running() { Ok(is_running) => Ok(is_running), Err(err) => { let exit_code = handle_error(err); @@ -74,14 +75,14 @@ pub fn is_running() -> Result { } pub fn run_in_background() { - if let Err(err) = os::run_in_background() { + if let Err(err) = os::server::run_in_background() { let exit_code = handle_error(err); std::process::exit(exit_code); } } fn serve(server: Arc>) -> Result<(), ServerError> { - let socket_path = os::get_socket_path()?; + let socket_path = os::server::get_socket_path()?; if socket_path.exists() { fs::remove_file(&socket_path).map_err(|err| ServerError::Io(err))?; } @@ -135,7 +136,11 @@ fn session_handler(mut stream: LocalSocketStream, server: Arc>) { error => { eprintln!("[{session_id}] rx Error {error:?}"); let body = bincode::serialize(&error).unwrap(); - let message = Message::new(MessageType::ProtocolError, Some(&body)); + // TODO: Server-side error logging so internal error data is not sent to client + let message = Message { + message_type: MessageType::ProtocolError, + body: Some(body), + }; eprintln!("[{session_id}] tx {message}"); rmp::protocol::send(&mut stream, &message).unwrap(); } @@ -172,22 +177,77 @@ fn handle_error(err: ServerError) -> i32 { fn route_request(request: &Message, server: &mut Server) -> Result { match request.message_type { MessageType::StateFetch => {} - MessageType::ToggleNext => { - server.state.playlist_params.next = !server.state.playlist_params.next; + MessageType::SetNext => { + let body = match request.body.as_ref() { + Some(body) => body, + None => { + return Ok(Message { + message_type: MessageType::ApplicationError, + body: None, + }) + } + }; + let new_state: bool = bincode::deserialize(&body).map_err(|err| err.to_string())?; + server.state.playlist_params.next = new_state; } - MessageType::ToggleSuffle => { - server.state.playlist_params.shuffle = !server.state.playlist_params.shuffle; + MessageType::SetShuffle => { + let body = match request.body.as_ref() { + Some(body) => body, + None => { + return Ok(Message { + message_type: MessageType::ApplicationError, + body: None, + }) + } + }; + let new_state: bool = bincode::deserialize(&body).map_err(|err| err.to_string())?; + server.state.playlist_params.shuffle = new_state; } - MessageType::ToggleRepeat => { - server.state.playlist_params.repeat = !server.state.playlist_params.repeat; + MessageType::SetRepeat => { + let body = match request.body.as_ref() { + Some(body) => body, + None => { + return Ok(Message { + message_type: MessageType::ApplicationError, + body: None, + }) + } + }; + let new_state: bool = bincode::deserialize(&body).map_err(|err| err.to_string())?; + server.state.playlist_params.repeat = new_state; } - MessageType::PlayTrack => { - server.playback.play_test(); + MessageType::SetPause => { + let body = match request.body.as_ref() { + Some(body) => body, + None => { + return Ok(Message { + message_type: MessageType::ApplicationError, + body: None, + }) + } + }; + let new_state: bool = bincode::deserialize(&body).map_err(|err| err.to_string())?; + server.playback.pause(new_state); } - MessageType::TogglePause => { - server.playback.pause_test(); + MessageType::PlayTrackFromPath => { + let body = match request.body.as_ref() { + Some(body) => body, + None => { + return Ok(Message { + message_type: MessageType::ApplicationError, + body: None, + }) + } + }; + let body: PathBuf = bincode::deserialize(&body).map_err(|err| err.to_string())?; + server.playback.play(body); + } + _ => { + return Ok(Message { + message_type: MessageType::NotImplementedAck, + body: None, + }) } - _ => return Ok(Message::new(MessageType::NotImplementedAck, None)), } Message::state_response(&server.state) } diff --git a/src/server/audio_backend.rs b/src/server/audio_backend.rs deleted file mode 100644 index 95663ae..0000000 --- a/src/server/audio_backend.rs +++ /dev/null @@ -1,7 +0,0 @@ -use std::{path::Path, time::Duration}; - -pub trait AudioBackend { - fn seek_and_play(&mut self, file_path: &Path, seek: Duration) -> (); - fn pause_playback(&mut self) -> (); - fn continue_playback(&mut self) -> (); -} diff --git a/src/server/decoder.rs b/src/server/decoder.rs index 9c71324..1167170 100644 --- a/src/server/decoder.rs +++ b/src/server/decoder.rs @@ -1,6 +1,6 @@ // Implement decoders missing from rodio. // At the moment this means: -// - Opus (Ogg, WebM) +// - Opus (WebM) // - Mod // - XM diff --git a/src/server/decoder/opus.rs b/src/server/decoder/opus.rs index f22f3a6..8470cb8 100644 --- a/src/server/decoder/opus.rs +++ b/src/server/decoder/opus.rs @@ -90,7 +90,7 @@ impl Iterator for OpusDecoder { } let item = self.decoded_buffer.get(self.decoded_cursor).cloned(); self.decoded_cursor += 1; - item + item.map(|sample| sample) } } diff --git a/src/server/playback.rs b/src/server/playback.rs index 3b00cba..815db53 100644 --- a/src/server/playback.rs +++ b/src/server/playback.rs @@ -1,9 +1,11 @@ use std::{ + borrow::BorrowMut, fs::File, path::PathBuf, sync::{Arc, Mutex}, }; +use rmp::PlayerState; use rodio::{OutputStream, Sink}; use super::{ @@ -15,8 +17,8 @@ use super::{ static TRACK: &str = ""; enum PlaybackCommand { - Play(PathBuf), - TogglePause, + PlayPath(PathBuf), + SetPause(bool), } pub struct Playback { @@ -32,12 +34,14 @@ impl Default for Playback { } impl Playback { - pub fn play_test(&mut self) { - self.command_queue.push(PlaybackCommand::Play(TRACK.into())) + pub fn play(&mut self, path: PathBuf) { + self.command_queue + .push(PlaybackCommand::PlayPath(TRACK.into())) // TODO: Remove hardcoding } - pub fn pause_test(&mut self) { - self.command_queue.push(PlaybackCommand::TogglePause) + pub fn pause(&mut self, new_state: bool) { + self.command_queue + .push(PlaybackCommand::SetPause(new_state)) } } @@ -45,15 +49,17 @@ pub fn playback_manager(server: Arc>) { let (_stream, stream_handle) = OutputStream::try_default().unwrap(); let sink: Sink = Sink::try_new(&stream_handle).unwrap(); loop { - for command in server.lock().unwrap().playback.command_queue.drain(..) { + let mut server = server.lock().unwrap(); + let commands: Vec = server.playback.command_queue.drain(..).collect(); + for command in commands { match command { - PlaybackCommand::Play(track) => { + PlaybackCommand::PlayPath(track) => { let mut source: Option>> = None; { let file = File::open(&track).unwrap(); if let Ok(decoder) = Decoder::rodio(SourceWrapper::from_file(file)) { - println!("rodio:\n\tsample_rate: {:?}\n\ttotal_duration: {:?}\n\tchannels: {:?}\n\tcurrent_frame_len: {:?}", decoder.sample_rate(), decoder.total_duration(), decoder.channels(), decoder.current_frame_len()); + println!("playback: rodio\n\tsample_rate: {:?}\n\ttotal_duration: {:?}\n\tchannels: {:?}\n\tcurrent_frame_len: {:?}", decoder.sample_rate(), decoder.total_duration(), decoder.channels(), decoder.current_frame_len()); source = Some(decoder); }; } @@ -61,13 +67,19 @@ pub fn playback_manager(server: Arc>) { if source.is_none() { let file = File::open(&track).unwrap(); if let Ok(decoder) = Decoder::custom(SourceWrapper::from_file(file)) { - println!("custom:\n\tsample_rate: {:?}\n\ttotal_duration: {:?}\n\tchannels: {:?}\n\tcurrent_frame_len: {:?}", decoder.sample_rate(), decoder.total_duration(), decoder.channels(), decoder.current_frame_len()); + println!("playback: custom\n\tsample_rate: {:?}\n\ttotal_duration: {:?}\n\tchannels: {:?}\n\tcurrent_frame_len: {:?}", decoder.sample_rate(), decoder.total_duration(), decoder.channels(), decoder.current_frame_len()); source = Some(decoder); } } match source { Some(source) => { + server.state.player = Some(PlayerState { + track, + is_paused: false, + duration: None, + current_position: None, + }); sink.clear(); sink.append(source); sink.play(); @@ -75,12 +87,18 @@ pub fn playback_manager(server: Arc>) { None => println!("No handler found for '{track:?}'"), } } - PlaybackCommand::TogglePause => { - if !sink.empty() { + PlaybackCommand::SetPause(new_state) => { + if !sink.empty() && new_state != sink.is_paused() { if sink.is_paused() { - sink.play() + sink.play(); + if let Some(player) = server.state.player.borrow_mut() { + player.is_paused = false; + } } else { - sink.pause() + sink.pause(); + if let Some(player) = server.state.player.borrow_mut() { + player.is_paused = true; + } } } }