Compare commits

...

2 Commits

Author SHA1 Message Date
hheik d8eaea70ca First working monolith version with audio playback 2024-08-22 01:02:36 +03:00
hheik 160c44e6ef Added (unimplemented) log level option 2024-08-19 15:40:58 +03:00
8 changed files with 142 additions and 436 deletions

View File

@ -1,15 +1,8 @@
use std::{
error::Error,
sync::{Arc, Mutex},
time::Duration,
};
use std::{error::Error, time::Duration};
use crate::CliArgs;
use self::{
app::{App, AppOptions},
request_queue::request_queue_cleaner,
};
use self::app::{App, AppOptions};
pub mod app;
pub mod crossterm;
@ -17,29 +10,11 @@ pub mod request_queue;
pub mod ui;
pub fn run(args: CliArgs) -> Result<(), Box<dyn Error>> {
let message_queue = Arc::new(Mutex::new(vec![]));
let server_state = Arc::new(Mutex::new(None));
let options = AppOptions {
title: "rmp - Rust Music Player".into(),
enhanced_graphics: args.enhanced_graphics,
sync_state: args.sync_state,
};
let app = App {
options,
should_quit: false,
state: server_state.clone(),
message_queue: message_queue.clone(),
};
let thread_builder = std::thread::Builder::new().name("request_queue".into());
thread_builder
.spawn(move || {
request_queue_cleaner(
Duration::from_millis(args.message_rate),
message_queue.clone(),
server_state.clone(),
)
})
.unwrap();
let app = App::new(options);
crossterm::run(app, Duration::from_millis(args.tick_rate))?;
Ok(())
}

View File

@ -1,25 +1,18 @@
use std::{
path::PathBuf,
sync::{Arc, Mutex},
time::Duration,
};
use std::{path::PathBuf, time::Duration};
use rmp::{
protocol::{Message, MessageType},
ServerState,
};
use serde::Serialize;
use crate::server::playback::Playback;
use rmp::TrackChangeOptions;
pub struct App {
pub options: AppOptions,
pub should_quit: bool,
pub message_queue: Arc<Mutex<Vec<Message>>>,
pub state: Arc<Mutex<Option<ServerState>>>,
pub playback: Playback,
pub track_change_options: TrackChangeOptions,
}
pub struct AppOptions {
pub title: String,
pub sync_state: bool,
pub enhanced_graphics: bool,
}
@ -28,74 +21,29 @@ impl App {
Self {
options,
should_quit: false,
message_queue: Arc::new(Mutex::new(vec![])),
state: Arc::new(Mutex::new(None)),
playback: Playback::new(),
track_change_options: Default::default(),
}
}
fn push_message(&mut self, message: Message) {
self.message_queue.lock().unwrap().push(message);
}
fn map_and_serialize<F, T>(&mut self, f: F) -> Option<Vec<u8>>
where
F: FnOnce(&ServerState) -> Option<T>,
T: Serialize,
{
let data = self.state.lock().unwrap().as_ref().and_then(f);
data.map(|data| bincode::serialize(&data).unwrap())
}
pub fn connected(&self) -> bool {
self.state.lock().unwrap().is_some()
}
pub fn toggle_shuffle(&mut self) {
let body = self.map_and_serialize(|state| Some(!state.playlist_params.shuffle));
self.push_message(Message {
message_type: MessageType::SetShuffle,
body,
})
self.track_change_options.shuffle = !self.track_change_options.shuffle;
}
pub fn toggle_next(&mut self) {
let body = self.map_and_serialize(|state| Some(!state.playlist_params.next));
self.push_message(Message {
message_type: MessageType::SetNext,
body,
})
self.track_change_options.next = !self.track_change_options.next;
}
pub fn toggle_repeat(&mut self) {
let body = self.map_and_serialize(|state| Some(!state.playlist_params.repeat));
self.push_message(Message {
message_type: MessageType::SetRepeat,
body,
})
self.track_change_options.repeat = !self.track_change_options.repeat;
}
pub fn toggle_pause(&mut self) {
let body =
self.map_and_serialize(|state| state.player.as_ref().map(|player| !player.is_paused));
self.push_message(Message {
message_type: MessageType::SetPause,
body,
});
}
pub fn fetch_state(&mut self) {
self.push_message(Message {
message_type: MessageType::StateFetch,
body: None,
});
self.playback.toggle_pause();
}
pub fn play(&mut self, track: PathBuf) {
let body = self.map_and_serialize(|_state| Some(track));
self.push_message(Message {
message_type: MessageType::PlayTrackFromPath,
body,
});
self.playback.play_immediate(track);
}
pub fn on_key(&mut self, key: char) {
@ -123,12 +71,5 @@ impl App {
pub fn on_tab(&mut self) {}
pub fn on_tick(&mut self, _duration: Duration) {
if self.options.sync_state {
self.push_message(Message {
message_type: MessageType::StateFetch,
body: None,
});
}
}
pub fn on_tick(&mut self, _duration: Duration) {}
}

View File

@ -33,7 +33,7 @@ pub fn run(app: App, tick_rate: Duration) -> Result<(), Box<dyn Error>> {
terminal.show_cursor()?;
if let Err(err) = res {
println!("{err:?}");
eprintln!("{err:?}");
}
Ok(())
@ -54,7 +54,6 @@ fn run_app<B: Backend>(
if crossterm::event::poll(timeout)? {
if let Event::Key(key) = event::read()? {
if key.kind == KeyEventKind::Press {
if app.connected() {
match key.code {
KeyCode::Char(c) => app.on_key(c),
KeyCode::Left => app.on_left(),
@ -65,17 +64,6 @@ fn run_app<B: Backend>(
KeyCode::Tab => app.on_tab(),
_ => {}
}
} else {
// Allow quitting while in "Not connected" screen
match key.code {
KeyCode::Char(c) => {
if c == 'q' {
app.should_quit = true;
}
}
_ => (),
}
}
}
}
}

View File

@ -2,20 +2,15 @@ use ratatui::{
prelude::*,
widgets::{block::Title, *},
};
use rmp::ServerState;
use super::app::App;
pub fn draw<B: Backend>(f: &mut Frame<B>, app: &mut App) {
if let Some(state) = app.state.lock().unwrap().as_ref() {
let chunks = Layout::default()
.constraints([Constraint::Min(3), Constraint::Length(2)].as_ref())
.split(f.size());
draw_playlist(f, state, chunks[0]);
draw_player(f, state, chunks[1]);
} else {
draw_no_connection(f);
}
draw_playlist(f, app, chunks[0]);
draw_player(f, app, chunks[1]);
}
static PRIMARY_COLOR: Color = Color::Rgb(200, 150, 70);
@ -47,7 +42,7 @@ fn draw_no_connection<B: Backend>(f: &mut Frame<B>) {
);
}
fn draw_playlist<B: Backend>(f: &mut Frame<B>, _state: &ServerState, area: Rect) {
fn draw_playlist<B: Backend>(f: &mut Frame<B>, _app: &App, area: Rect) {
let playlist = List::new(vec![])
.block(
Block::default()
@ -58,11 +53,11 @@ fn draw_playlist<B: Backend>(f: &mut Frame<B>, _state: &ServerState, area: Rect)
f.render_widget(playlist, area);
}
fn draw_player<B: Backend>(f: &mut Frame<B>, state: &ServerState, area: Rect) {
fn draw_player<B: Backend>(f: &mut Frame<B>, app: &App, area: Rect) {
fn decorate_bool(span: Span, value: bool) -> Span {
match value {
true => return span.bg(PRIMARY_COLOR).fg(PRIMARY_CONTRAST),
false => return span.fg(CLEAR_CONTRAST),
true => span.bg(PRIMARY_COLOR).fg(PRIMARY_CONTRAST),
false => span.fg(CLEAR_CONTRAST),
}
}
@ -80,7 +75,7 @@ fn draw_player<B: Backend>(f: &mut Frame<B>, state: &ServerState, area: Rect) {
// Horrible to look at, worse to write
let param_titles: Vec<Title> = vec![
(
state.playlist_params.next,
app.track_change_options.next,
vec![
Span::from("NE"),
Span::from("X").underlined(),
@ -88,11 +83,11 @@ fn draw_player<B: Backend>(f: &mut Frame<B>, state: &ServerState, area: Rect) {
],
),
(
state.playlist_params.shuffle,
app.track_change_options.shuffle,
vec![Span::from("S").underlined(), Span::from("HUFFLE")],
),
(
state.playlist_params.repeat,
app.track_change_options.repeat,
vec![Span::from("R").underlined(), Span::from("EPEAT")],
),
]

View File

@ -1,4 +1,4 @@
use std::path::PathBuf;
use std::{path::PathBuf, str::FromStr};
use serde::{Deserialize, Serialize};
@ -9,7 +9,7 @@ pub mod protocol;
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct ServerState {
pub playlist_params: PlaylistParams,
pub track_change_options: TrackChangeOptions,
pub player: Option<PlayerState>,
}
@ -48,13 +48,13 @@ pub struct QueuePlaylist {
}
#[derive(Serialize, Deserialize, Debug)]
pub struct PlaylistParams {
pub struct TrackChangeOptions {
pub shuffle: bool,
pub next: bool,
pub repeat: bool,
}
impl Default for PlaylistParams {
impl Default for TrackChangeOptions {
fn default() -> Self {
Self {
shuffle: false,
@ -64,6 +64,26 @@ impl Default for PlaylistParams {
}
}
#[derive(Clone, Copy, Debug, Default)]
pub enum LogLevel {
Quiet = 0,
Error = 1,
#[default]
All = 2,
}
impl FromStr for LogLevel {
type Err = ();
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"quiet" => Ok(Self::Quiet),
"error" => Ok(Self::Error),
"all" => Ok(Self::All),
_ => Err(()),
}
}
}
pub mod server {
use std::{fmt::Debug, path::PathBuf};

View File

@ -4,7 +4,7 @@ pub mod client;
pub mod server;
/// rmp: Rust Music Player
#[derive(Debug, FromArgs)]
#[derive(Debug, FromArgs, Clone)]
pub struct CliArgs {
/// run the server
#[argh(switch, short = 's')]
@ -45,27 +45,14 @@ pub struct CliArgs {
/// whether unicode symbols are used to improve the overall look of the app
#[argh(option, default = "true")]
enhanced_graphics: bool,
/// server-side log level (quiet, error, all)
#[argh(option)]
log_level: Option<String>,
}
fn main() {
let args: CliArgs = argh::from_env();
if args.exit {
server::kill();
return;
}
if args.server {
if server::is_running().unwrap() {
println!("Server is already running");
} else {
server::run(args).unwrap();
}
return;
}
if !args.client_only && !server::is_running().unwrap() {
server::run_in_background();
}
client::run(args).unwrap();
}

View File

@ -1,36 +1,14 @@
use interprocess::local_socket::{LocalSocketListener, LocalSocketStream};
use rmp::{
os,
protocol::{Message, MessageType},
server::ServerError,
PlaylistParams, ServerState,
};
use std::{
fs,
path::PathBuf,
sync::{Arc, Mutex},
};
use rmp::{os, server::ServerError, ServerState, TrackChangeOptions};
use std::sync::{Arc, Mutex};
use crate::CliArgs;
use self::playback::{playback_manager, Playback};
pub mod decoder;
pub mod playback;
#[derive(Default)]
pub struct Server {
pub state: ServerState,
pub playback: Playback,
}
impl Server {
pub fn from_state(state: ServerState) -> Self {
Self {
state,
playback: Default::default(),
}
}
}
pub fn run(args: CliArgs) -> Result<(), ServerError> {
@ -39,22 +17,19 @@ pub fn run(args: CliArgs) -> Result<(), ServerError> {
std::process::exit(exit_code);
}
let server = Arc::new(Mutex::new(Server::from_state(ServerState {
playlist_params: PlaylistParams {
let server = Arc::new(Mutex::new(Server {
state: ServerState {
track_change_options: TrackChangeOptions {
shuffle: args.shuffle,
next: args.next,
repeat: args.repeat,
},
player: None,
})));
},
..Default::default()
}));
let server_clone = server.clone();
std::thread::Builder::new()
.name("playback_manager".into())
.spawn(move || playback_manager(server_clone))
.unwrap();
serve(server)
Ok(())
}
pub fn kill() {
@ -81,74 +56,6 @@ pub fn run_in_background() {
}
}
fn serve(server: Arc<Mutex<Server>>) -> Result<(), ServerError> {
let socket_path = os::server::get_socket_path()?;
if socket_path.exists() {
fs::remove_file(&socket_path).map_err(|err| ServerError::Io(err))?;
}
let socket = LocalSocketListener::bind(socket_path).map_err(|err| ServerError::Io(err))?;
println!("Waiting for connections...");
let mut session_counter = 0;
for message in socket.incoming() {
match message {
Ok(stream) => {
session_counter += 1;
let thread_builder =
std::thread::Builder::new().name(format!("session_{session_counter}"));
let server = server.clone();
thread_builder
.spawn(move || session_handler(stream, server))
.unwrap();
}
Err(err) => {
let exit_code = handle_error(ServerError::Io(err));
std::process::exit(exit_code);
}
};
}
println!("Reached the end of an infinite loop?");
Ok(())
}
fn session_handler(mut stream: LocalSocketStream, server: Arc<Mutex<Server>>) {
let thread = std::thread::current();
let session_id = thread.name().unwrap_or("<unnamed>");
println!("[{session_id}] session created");
loop {
match rmp::protocol::receive(&mut stream) {
Ok(message) => {
println!("[{session_id}] rx {message}");
match route_request(&message, &mut server.lock().unwrap()) {
Err(err) => {
eprintln!("[{session_id}] rx Error: {err}");
}
Ok(response) => {
println!("[{session_id}] tx {response}");
rmp::protocol::send(&mut stream, &response).unwrap();
}
}
}
Err(error) => match error {
rmp::protocol::MessageError::ReadError => {
println!("[{session_id}] session terminated");
return;
}
error => {
eprintln!("[{session_id}] rx Error {error:?}");
let body = bincode::serialize(&error).unwrap();
// TODO: Server-side error logging so internal error data is not sent to client
let message = Message {
message_type: MessageType::ProtocolError,
body: Some(body),
};
eprintln!("[{session_id}] tx {message}");
rmp::protocol::send(&mut stream, &message).unwrap();
}
},
}
}
}
fn handle_error(err: ServerError) -> i32 {
match &err {
ServerError::Other(msg) => {
@ -173,81 +80,3 @@ fn handle_error(err: ServerError) -> i32 {
}
}
}
fn route_request(request: &Message, server: &mut Server) -> Result<Message, String> {
match request.message_type {
MessageType::StateFetch => {}
MessageType::SetNext => {
let body = match request.body.as_ref() {
Some(body) => body,
None => {
return Ok(Message {
message_type: MessageType::ApplicationError,
body: None,
})
}
};
let new_state: bool = bincode::deserialize(&body).map_err(|err| err.to_string())?;
server.state.playlist_params.next = new_state;
}
MessageType::SetShuffle => {
let body = match request.body.as_ref() {
Some(body) => body,
None => {
return Ok(Message {
message_type: MessageType::ApplicationError,
body: None,
})
}
};
let new_state: bool = bincode::deserialize(&body).map_err(|err| err.to_string())?;
server.state.playlist_params.shuffle = new_state;
}
MessageType::SetRepeat => {
let body = match request.body.as_ref() {
Some(body) => body,
None => {
return Ok(Message {
message_type: MessageType::ApplicationError,
body: None,
})
}
};
let new_state: bool = bincode::deserialize(&body).map_err(|err| err.to_string())?;
server.state.playlist_params.repeat = new_state;
}
MessageType::SetPause => {
let body = match request.body.as_ref() {
Some(body) => body,
None => {
return Ok(Message {
message_type: MessageType::ApplicationError,
body: None,
})
}
};
let new_state: bool = bincode::deserialize(&body).map_err(|err| err.to_string())?;
server.playback.pause(new_state);
}
MessageType::PlayTrackFromPath => {
let body = match request.body.as_ref() {
Some(body) => body,
None => {
return Ok(Message {
message_type: MessageType::ApplicationError,
body: None,
})
}
};
let body: PathBuf = bincode::deserialize(&body).map_err(|err| err.to_string())?;
server.playback.play(body);
}
_ => {
return Ok(Message {
message_type: MessageType::NotImplementedAck,
body: None,
})
}
}
Message::state_response(&server.state)
}

View File

@ -1,108 +1,79 @@
use std::{
borrow::BorrowMut,
fs::File,
path::PathBuf,
sync::{Arc, Mutex},
};
use std::{fs::File, path::PathBuf};
use rmp::PlayerState;
use rodio::{OutputStream, Sink};
use rodio::{OutputStream, OutputStreamHandle, Sink};
use super::{
decoder::{Decoder, DecoderImpl, SourceWrapper},
Server,
};
// HACK: hard-coded path to track
static TRACK: &str = "";
enum PlaybackCommand {
PlayPath(PathBuf),
SetPause(bool),
}
use super::decoder::{Decoder, DecoderImpl, SourceWrapper};
pub struct Playback {
command_queue: Vec<PlaybackCommand>,
/// These must not be dropped before the sink
_stream: OutputStream,
_stream_handle: OutputStreamHandle,
sink: Sink,
}
impl Default for Playback {
fn default() -> Self {
Self {
command_queue: vec![],
}
}
pub enum PlaybackState {
Empty,
Playing,
Paused,
}
impl Playback {
pub fn play(&mut self, path: PathBuf) {
self.command_queue
.push(PlaybackCommand::PlayPath(TRACK.into())) // TODO: Remove hardcoding
pub fn new() -> Self {
let (_stream, _stream_handle) = OutputStream::try_default().unwrap();
let sink: Sink = Sink::try_new(&_stream_handle).unwrap();
Self {
_stream,
_stream_handle,
sink,
}
}
pub fn pause(&mut self, new_state: bool) {
self.command_queue
.push(PlaybackCommand::SetPause(new_state))
pub fn get_state(&self) -> PlaybackState {
// TODO: We know if it's playing but not what is playing. Figure it out later
if self.sink.empty() {
return PlaybackState::Empty;
}
match self.sink.is_paused() {
true => PlaybackState::Paused,
false => PlaybackState::Playing,
}
}
}
pub fn playback_manager(server: Arc<Mutex<Server>>) {
let (_stream, stream_handle) = OutputStream::try_default().unwrap();
let sink: Sink = Sink::try_new(&stream_handle).unwrap();
loop {
let mut server = server.lock().unwrap();
let commands: Vec<PlaybackCommand> = server.playback.command_queue.drain(..).collect();
for command in commands {
match command {
PlaybackCommand::PlayPath(track) => {
/// Clear the queue and start playback immediately.
pub fn play_immediate(&mut self, path: PathBuf) {
let mut source: Option<Box<dyn DecoderImpl<Item = i16>>> = None;
{
let file = File::open(&track).unwrap();
let file = File::open(&path).unwrap();
if let Ok(decoder) = Decoder::rodio(SourceWrapper::from_file(file)) {
println!("playback: rodio\n\tsample_rate: {:?}\n\ttotal_duration: {:?}\n\tchannels: {:?}\n\tcurrent_frame_len: {:?}", decoder.sample_rate(), decoder.total_duration(), decoder.channels(), decoder.current_frame_len());
source = Some(decoder);
};
}
if source.is_none() {
let file = File::open(&track).unwrap();
let file = File::open(&path).unwrap();
if let Ok(decoder) = Decoder::custom(SourceWrapper::from_file(file)) {
println!("playback: custom\n\tsample_rate: {:?}\n\ttotal_duration: {:?}\n\tchannels: {:?}\n\tcurrent_frame_len: {:?}", decoder.sample_rate(), decoder.total_duration(), decoder.channels(), decoder.current_frame_len());
source = Some(decoder);
}
}
match source {
Some(source) => {
server.state.player = Some(PlayerState {
track,
is_paused: false,
duration: None,
current_position: None,
});
sink.clear();
sink.append(source);
sink.play();
}
None => println!("No handler found for '{track:?}'"),
}
}
PlaybackCommand::SetPause(new_state) => {
if !sink.empty() && new_state != sink.is_paused() {
if sink.is_paused() {
sink.play();
if let Some(player) = server.state.player.borrow_mut() {
player.is_paused = false;
}
} else {
sink.pause();
if let Some(player) = server.state.player.borrow_mut() {
player.is_paused = true;
}
}
self.sink.clear();
self.sink.append(source);
self.sink.play();
}
None => eprintln!("No handler found for '{path:?}'"),
}
}
/// Toggle playback pause if possible.
pub fn toggle_pause(&mut self) {
match self.get_state() {
PlaybackState::Empty => (),
PlaybackState::Playing => self.sink.pause(),
PlaybackState::Paused => self.sink.play(),
}
}
}