From d8eaea70ca740eb393fd1858f5ccf5d6a703526f Mon Sep 17 00:00:00 2001 From: hheik <4469778+hheik@users.noreply.github.com> Date: Thu, 22 Aug 2024 01:02:36 +0300 Subject: [PATCH] First working monolith version with audio playback --- src/client.rs | 31 +------ src/client/app.rs | 87 +++--------------- src/client/crossterm.rs | 32 +++---- src/client/ui.rs | 29 +++--- src/lib.rs | 6 +- src/main.rs | 19 +--- src/server.rs | 192 +--------------------------------------- src/server/playback.rs | 149 +++++++++++++------------------ 8 files changed, 107 insertions(+), 438 deletions(-) diff --git a/src/client.rs b/src/client.rs index 4163740..d30800b 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,15 +1,8 @@ -use std::{ - error::Error, - sync::{Arc, Mutex}, - time::Duration, -}; +use std::{error::Error, time::Duration}; use crate::CliArgs; -use self::{ - app::{App, AppOptions}, - request_queue::request_queue_cleaner, -}; +use self::app::{App, AppOptions}; pub mod app; pub mod crossterm; @@ -17,29 +10,11 @@ pub mod request_queue; pub mod ui; pub fn run(args: CliArgs) -> Result<(), Box> { - let message_queue = Arc::new(Mutex::new(vec![])); - let server_state = Arc::new(Mutex::new(None)); let options = AppOptions { title: "rmp - Rust Music Player".into(), enhanced_graphics: args.enhanced_graphics, - sync_state: args.sync_state, }; - let app = App { - options, - should_quit: false, - state: server_state.clone(), - message_queue: message_queue.clone(), - }; - let thread_builder = std::thread::Builder::new().name("request_queue".into()); - thread_builder - .spawn(move || { - request_queue_cleaner( - Duration::from_millis(args.message_rate), - message_queue.clone(), - server_state.clone(), - ) - }) - .unwrap(); + let app = App::new(options); crossterm::run(app, Duration::from_millis(args.tick_rate))?; Ok(()) } diff --git a/src/client/app.rs b/src/client/app.rs index f5f1985..509665f 100644 --- a/src/client/app.rs +++ b/src/client/app.rs @@ -1,25 +1,18 @@ -use std::{ - path::PathBuf, - sync::{Arc, Mutex}, - time::Duration, -}; +use std::{path::PathBuf, time::Duration}; -use rmp::{ - protocol::{Message, MessageType}, - ServerState, -}; -use serde::Serialize; +use crate::server::playback::Playback; + +use rmp::TrackChangeOptions; pub struct App { pub options: AppOptions, pub should_quit: bool, - pub message_queue: Arc>>, - pub state: Arc>>, + pub playback: Playback, + pub track_change_options: TrackChangeOptions, } pub struct AppOptions { pub title: String, - pub sync_state: bool, pub enhanced_graphics: bool, } @@ -28,74 +21,29 @@ impl App { Self { options, should_quit: false, - message_queue: Arc::new(Mutex::new(vec![])), - state: Arc::new(Mutex::new(None)), + playback: Playback::new(), + track_change_options: Default::default(), } } - fn push_message(&mut self, message: Message) { - self.message_queue.lock().unwrap().push(message); - } - - fn map_and_serialize(&mut self, f: F) -> Option> - where - F: FnOnce(&ServerState) -> Option, - T: Serialize, - { - let data = self.state.lock().unwrap().as_ref().and_then(f); - data.map(|data| bincode::serialize(&data).unwrap()) - } - - pub fn connected(&self) -> bool { - self.state.lock().unwrap().is_some() - } - pub fn toggle_shuffle(&mut self) { - let body = self.map_and_serialize(|state| Some(!state.playlist_params.shuffle)); - self.push_message(Message { - message_type: MessageType::SetShuffle, - body, - }) + self.track_change_options.shuffle = !self.track_change_options.shuffle; } pub fn toggle_next(&mut self) { - let body = self.map_and_serialize(|state| Some(!state.playlist_params.next)); - self.push_message(Message { - message_type: MessageType::SetNext, - body, - }) + self.track_change_options.next = !self.track_change_options.next; } pub fn toggle_repeat(&mut self) { - let body = self.map_and_serialize(|state| Some(!state.playlist_params.repeat)); - self.push_message(Message { - message_type: MessageType::SetRepeat, - body, - }) + self.track_change_options.repeat = !self.track_change_options.repeat; } pub fn toggle_pause(&mut self) { - let body = - self.map_and_serialize(|state| state.player.as_ref().map(|player| !player.is_paused)); - self.push_message(Message { - message_type: MessageType::SetPause, - body, - }); - } - - pub fn fetch_state(&mut self) { - self.push_message(Message { - message_type: MessageType::StateFetch, - body: None, - }); + self.playback.toggle_pause(); } pub fn play(&mut self, track: PathBuf) { - let body = self.map_and_serialize(|_state| Some(track)); - self.push_message(Message { - message_type: MessageType::PlayTrackFromPath, - body, - }); + self.playback.play_immediate(track); } pub fn on_key(&mut self, key: char) { @@ -123,12 +71,5 @@ impl App { pub fn on_tab(&mut self) {} - pub fn on_tick(&mut self, _duration: Duration) { - if self.options.sync_state { - self.push_message(Message { - message_type: MessageType::StateFetch, - body: None, - }); - } - } + pub fn on_tick(&mut self, _duration: Duration) {} } diff --git a/src/client/crossterm.rs b/src/client/crossterm.rs index bccb149..17a726a 100644 --- a/src/client/crossterm.rs +++ b/src/client/crossterm.rs @@ -33,7 +33,7 @@ pub fn run(app: App, tick_rate: Duration) -> Result<(), Box> { terminal.show_cursor()?; if let Err(err) = res { - println!("{err:?}"); + eprintln!("{err:?}"); } Ok(()) @@ -54,27 +54,15 @@ fn run_app( if crossterm::event::poll(timeout)? { if let Event::Key(key) = event::read()? { if key.kind == KeyEventKind::Press { - if app.connected() { - match key.code { - KeyCode::Char(c) => app.on_key(c), - KeyCode::Left => app.on_left(), - KeyCode::Up => app.on_up(), - KeyCode::Right => app.on_right(), - KeyCode::Down => app.on_down(), - KeyCode::Enter => app.on_enter(), - KeyCode::Tab => app.on_tab(), - _ => {} - } - } else { - // Allow quitting while in "Not connected" screen - match key.code { - KeyCode::Char(c) => { - if c == 'q' { - app.should_quit = true; - } - } - _ => (), - } + match key.code { + KeyCode::Char(c) => app.on_key(c), + KeyCode::Left => app.on_left(), + KeyCode::Up => app.on_up(), + KeyCode::Right => app.on_right(), + KeyCode::Down => app.on_down(), + KeyCode::Enter => app.on_enter(), + KeyCode::Tab => app.on_tab(), + _ => {} } } } diff --git a/src/client/ui.rs b/src/client/ui.rs index c58f543..a2b3f3b 100644 --- a/src/client/ui.rs +++ b/src/client/ui.rs @@ -2,20 +2,15 @@ use ratatui::{ prelude::*, widgets::{block::Title, *}, }; -use rmp::ServerState; use super::app::App; pub fn draw(f: &mut Frame, app: &mut App) { - if let Some(state) = app.state.lock().unwrap().as_ref() { - let chunks = Layout::default() - .constraints([Constraint::Min(3), Constraint::Length(2)].as_ref()) - .split(f.size()); - draw_playlist(f, state, chunks[0]); - draw_player(f, state, chunks[1]); - } else { - draw_no_connection(f); - } + let chunks = Layout::default() + .constraints([Constraint::Min(3), Constraint::Length(2)].as_ref()) + .split(f.size()); + draw_playlist(f, app, chunks[0]); + draw_player(f, app, chunks[1]); } static PRIMARY_COLOR: Color = Color::Rgb(200, 150, 70); @@ -47,7 +42,7 @@ fn draw_no_connection(f: &mut Frame) { ); } -fn draw_playlist(f: &mut Frame, _state: &ServerState, area: Rect) { +fn draw_playlist(f: &mut Frame, _app: &App, area: Rect) { let playlist = List::new(vec![]) .block( Block::default() @@ -58,11 +53,11 @@ fn draw_playlist(f: &mut Frame, _state: &ServerState, area: Rect) f.render_widget(playlist, area); } -fn draw_player(f: &mut Frame, state: &ServerState, area: Rect) { +fn draw_player(f: &mut Frame, app: &App, area: Rect) { fn decorate_bool(span: Span, value: bool) -> Span { match value { - true => return span.bg(PRIMARY_COLOR).fg(PRIMARY_CONTRAST), - false => return span.fg(CLEAR_CONTRAST), + true => span.bg(PRIMARY_COLOR).fg(PRIMARY_CONTRAST), + false => span.fg(CLEAR_CONTRAST), } } @@ -80,7 +75,7 @@ fn draw_player(f: &mut Frame, state: &ServerState, area: Rect) { // Horrible to look at, worse to write let param_titles: Vec = vec![ ( - state.playlist_params.next, + app.track_change_options.next, vec![ Span::from("NE"), Span::from("X").underlined(), @@ -88,11 +83,11 @@ fn draw_player<B: Backend>(f: &mut Frame<B>, state: &ServerState, area: Rect) { ], ), ( - state.playlist_params.shuffle, + app.track_change_options.shuffle, vec![Span::from("S").underlined(), Span::from("HUFFLE")], ), ( - state.playlist_params.repeat, + app.track_change_options.repeat, vec![Span::from("R").underlined(), Span::from("EPEAT")], ), ] diff --git a/src/lib.rs b/src/lib.rs index 8fe5859..0cd750d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,7 +9,7 @@ pub mod protocol; #[derive(Serialize, Deserialize, Debug, Default)] pub struct ServerState { - pub playlist_params: PlaylistParams, + pub track_change_options: TrackChangeOptions, pub player: Option<PlayerState>, } @@ -48,13 +48,13 @@ pub struct QueuePlaylist { } #[derive(Serialize, Deserialize, Debug)] -pub struct PlaylistParams { +pub struct TrackChangeOptions { pub shuffle: bool, pub next: bool, pub repeat: bool, } -impl Default for PlaylistParams { +impl Default for TrackChangeOptions { fn default() -> Self { Self { shuffle: false, diff --git a/src/main.rs b/src/main.rs index 3040118..1020581 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,7 +4,7 @@ pub mod client; pub mod server; /// rmp: Rust Music Player -#[derive(Debug, FromArgs)] +#[derive(Debug, FromArgs, Clone)] pub struct CliArgs { /// run the server #[argh(switch, short = 's')] @@ -54,22 +54,5 @@ pub struct CliArgs { fn main() { let args: CliArgs = argh::from_env(); - if args.exit { - server::kill(); - return; - } - - if args.server { - if server::is_running().unwrap() { - println!("Server is already running"); - } else { - server::run(args).unwrap(); - } - return; - } - - if !args.client_only && !server::is_running().unwrap() { - server::run_in_background(); - } client::run(args).unwrap(); } diff --git a/src/server.rs b/src/server.rs index f42045a..c15cabd 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,45 +1,14 @@ -use interprocess::local_socket::{LocalSocketListener, LocalSocketStream}; - -use rmp::{ - os, - protocol::{Message, MessageType}, - server::ServerError, - LogLevel, PlaylistParams, ServerState, -}; -use std::{ - fs, - path::PathBuf, - sync::{Arc, Mutex}, -}; +use rmp::{os, server::ServerError, ServerState, TrackChangeOptions}; +use std::sync::{Arc, Mutex}; use crate::CliArgs; -use self::playback::{playback_manager, Playback}; - pub mod decoder; pub mod playback; -#[derive(Debug, Default)] -pub struct ServerConfig { - pub log_level: LogLevel, -} - -impl ServerConfig { - pub fn from_args(args: CliArgs) -> Self { - Self { - log_level: args - .log_level - .and_then(|level| level.parse().ok()) - .unwrap_or_default(), - } - } -} - #[derive(Default)] pub struct Server { pub state: ServerState, - pub config: ServerConfig, - pub playback: Playback, } pub fn run(args: CliArgs) -> Result<(), ServerError> { @@ -50,24 +19,17 @@ pub fn run(args: CliArgs) -> Result<(), ServerError> { let server = Arc::new(Mutex::new(Server { state: ServerState { - playlist_params: PlaylistParams { + track_change_options: TrackChangeOptions { shuffle: args.shuffle, next: args.next, repeat: args.repeat, }, player: None, }, - config: ServerConfig::from_args(args), ..Default::default() })); - let server_clone = server.clone(); - std::thread::Builder::new() - .name("playback_manager".into()) - .spawn(move || playback_manager(server_clone)) - .unwrap(); - - serve(server) + Ok(()) } pub fn kill() { @@ -94,74 +56,6 @@ pub fn run_in_background() { } } -fn serve(server: Arc<Mutex<Server>>) -> Result<(), ServerError> { - let socket_path = os::server::get_socket_path()?; - if socket_path.exists() { - fs::remove_file(&socket_path).map_err(|err| ServerError::Io(err))?; - } - let socket = LocalSocketListener::bind(socket_path).map_err(|err| ServerError::Io(err))?; - println!("Waiting for connections..."); - let mut session_counter = 0; - for message in socket.incoming() { - match message { - Ok(stream) => { - session_counter += 1; - let thread_builder = - std::thread::Builder::new().name(format!("session_{session_counter}")); - let server = server.clone(); - thread_builder - .spawn(move || session_handler(stream, server)) - .unwrap(); - } - Err(err) => { - let exit_code = handle_error(ServerError::Io(err)); - std::process::exit(exit_code); - } - }; - } - println!("Reached the end of an infinite loop?"); - Ok(()) -} - -fn session_handler(mut stream: LocalSocketStream, server: Arc<Mutex<Server>>) { - let thread = std::thread::current(); - let session_id = thread.name().unwrap_or("<unnamed>"); - println!("[{session_id}] session created"); - loop { - match rmp::protocol::receive(&mut stream) { - Ok(message) => { - println!("[{session_id}] rx {message}"); - match route_request(&message, &mut server.lock().unwrap()) { - Err(err) => { - eprintln!("[{session_id}] rx Error: {err}"); - } - Ok(response) => { - println!("[{session_id}] tx {response}"); - rmp::protocol::send(&mut stream, &response).unwrap(); - } - } - } - Err(error) => match error { - rmp::protocol::MessageError::ReadError => { - println!("[{session_id}] session terminated"); - return; - } - error => { - eprintln!("[{session_id}] rx Error {error:?}"); - let body = bincode::serialize(&error).unwrap(); - // TODO: Server-side error logging so internal error data is not sent to client - let message = Message { - message_type: MessageType::ProtocolError, - body: Some(body), - }; - eprintln!("[{session_id}] tx {message}"); - rmp::protocol::send(&mut stream, &message).unwrap(); - } - }, - } - } -} - fn handle_error(err: ServerError) -> i32 { match &err { ServerError::Other(msg) => { @@ -186,81 +80,3 @@ fn handle_error(err: ServerError) -> i32 { } } } - -fn route_request(request: &Message, server: &mut Server) -> Result<Message, String> { - match request.message_type { - MessageType::StateFetch => {} - MessageType::SetNext => { - let body = match request.body.as_ref() { - Some(body) => body, - None => { - return Ok(Message { - message_type: MessageType::ApplicationError, - body: None, - }) - } - }; - let new_state: bool = bincode::deserialize(&body).map_err(|err| err.to_string())?; - server.state.playlist_params.next = new_state; - } - MessageType::SetShuffle => { - let body = match request.body.as_ref() { - Some(body) => body, - None => { - return Ok(Message { - message_type: MessageType::ApplicationError, - body: None, - }) - } - }; - let new_state: bool = bincode::deserialize(&body).map_err(|err| err.to_string())?; - server.state.playlist_params.shuffle = new_state; - } - MessageType::SetRepeat => { - let body = match request.body.as_ref() { - Some(body) => body, - None => { - return Ok(Message { - message_type: MessageType::ApplicationError, - body: None, - }) - } - }; - let new_state: bool = bincode::deserialize(&body).map_err(|err| err.to_string())?; - server.state.playlist_params.repeat = new_state; - } - MessageType::SetPause => { - let body = match request.body.as_ref() { - Some(body) => body, - None => { - return Ok(Message { - message_type: MessageType::ApplicationError, - body: None, - }) - } - }; - let new_state: bool = bincode::deserialize(&body).map_err(|err| err.to_string())?; - server.playback.pause(new_state); - } - MessageType::PlayTrackFromPath => { - let body = match request.body.as_ref() { - Some(body) => body, - None => { - return Ok(Message { - message_type: MessageType::ApplicationError, - body: None, - }) - } - }; - let body: PathBuf = bincode::deserialize(&body).map_err(|err| err.to_string())?; - server.playback.play(body); - } - _ => { - return Ok(Message { - message_type: MessageType::NotImplementedAck, - body: None, - }) - } - } - Message::state_response(&server.state) -} diff --git a/src/server/playback.rs b/src/server/playback.rs index 815db53..38f5c39 100644 --- a/src/server/playback.rs +++ b/src/server/playback.rs @@ -1,108 +1,79 @@ -use std::{ - borrow::BorrowMut, - fs::File, - path::PathBuf, - sync::{Arc, Mutex}, -}; +use std::{fs::File, path::PathBuf}; -use rmp::PlayerState; -use rodio::{OutputStream, Sink}; +use rodio::{OutputStream, OutputStreamHandle, Sink}; -use super::{ - decoder::{Decoder, DecoderImpl, SourceWrapper}, - Server, -}; - -// HACK: hard-coded path to track -static TRACK: &str = ""; - -enum PlaybackCommand { - PlayPath(PathBuf), - SetPause(bool), -} +use super::decoder::{Decoder, DecoderImpl, SourceWrapper}; pub struct Playback { - command_queue: Vec<PlaybackCommand>, + /// These must not be dropped before the sink + _stream: OutputStream, + _stream_handle: OutputStreamHandle, + sink: Sink, } -impl Default for Playback { - fn default() -> Self { - Self { - command_queue: vec![], - } - } +pub enum PlaybackState { + Empty, + Playing, + Paused, } impl Playback { - pub fn play(&mut self, path: PathBuf) { - self.command_queue - .push(PlaybackCommand::PlayPath(TRACK.into())) // TODO: Remove hardcoding + pub fn new() -> Self { + let (_stream, _stream_handle) = OutputStream::try_default().unwrap(); + let sink: Sink = Sink::try_new(&_stream_handle).unwrap(); + + Self { + _stream, + _stream_handle, + sink, + } } - pub fn pause(&mut self, new_state: bool) { - self.command_queue - .push(PlaybackCommand::SetPause(new_state)) + pub fn get_state(&self) -> PlaybackState { + // TODO: We know if it's playing but not what is playing. Figure it out later + if self.sink.empty() { + return PlaybackState::Empty; + } + match self.sink.is_paused() { + true => PlaybackState::Paused, + false => PlaybackState::Playing, + } } -} -pub fn playback_manager(server: Arc<Mutex<Server>>) { - let (_stream, stream_handle) = OutputStream::try_default().unwrap(); - let sink: Sink = Sink::try_new(&stream_handle).unwrap(); - loop { - let mut server = server.lock().unwrap(); - let commands: Vec<PlaybackCommand> = server.playback.command_queue.drain(..).collect(); - for command in commands { - match command { - PlaybackCommand::PlayPath(track) => { - let mut source: Option<Box<dyn DecoderImpl<Item = i16>>> = None; + /// Clear the queue and start playback immediately. + pub fn play_immediate(&mut self, path: PathBuf) { + let mut source: Option<Box<dyn DecoderImpl<Item = i16>>> = None; - { - let file = File::open(&track).unwrap(); - if let Ok(decoder) = Decoder::rodio(SourceWrapper::from_file(file)) { - println!("playback: rodio\n\tsample_rate: {:?}\n\ttotal_duration: {:?}\n\tchannels: {:?}\n\tcurrent_frame_len: {:?}", decoder.sample_rate(), decoder.total_duration(), decoder.channels(), decoder.current_frame_len()); - source = Some(decoder); - }; - } + { + let file = File::open(&path).unwrap(); + if let Ok(decoder) = Decoder::rodio(SourceWrapper::from_file(file)) { + source = Some(decoder); + }; + } - if source.is_none() { - let file = File::open(&track).unwrap(); - if let Ok(decoder) = Decoder::custom(SourceWrapper::from_file(file)) { - println!("playback: custom\n\tsample_rate: {:?}\n\ttotal_duration: {:?}\n\tchannels: {:?}\n\tcurrent_frame_len: {:?}", decoder.sample_rate(), decoder.total_duration(), decoder.channels(), decoder.current_frame_len()); - source = Some(decoder); - } - } - - match source { - Some(source) => { - server.state.player = Some(PlayerState { - track, - is_paused: false, - duration: None, - current_position: None, - }); - sink.clear(); - sink.append(source); - sink.play(); - } - None => println!("No handler found for '{track:?}'"), - } - } - PlaybackCommand::SetPause(new_state) => { - if !sink.empty() && new_state != sink.is_paused() { - if sink.is_paused() { - sink.play(); - if let Some(player) = server.state.player.borrow_mut() { - player.is_paused = false; - } - } else { - sink.pause(); - if let Some(player) = server.state.player.borrow_mut() { - player.is_paused = true; - } - } - } - } + if source.is_none() { + let file = File::open(&path).unwrap(); + if let Ok(decoder) = Decoder::custom(SourceWrapper::from_file(file)) { + source = Some(decoder); } } + + match source { + Some(source) => { + self.sink.clear(); + self.sink.append(source); + self.sink.play(); + } + None => eprintln!("No handler found for '{path:?}'"), + } + } + + /// Toggle playback pause if possible. + pub fn toggle_pause(&mut self) { + match self.get_state() { + PlaybackState::Empty => (), + PlaybackState::Playing => self.sink.pause(), + PlaybackState::Paused => self.sink.play(), + } } }