Reworked messaging

master
hheik 2024-08-08 02:10:44 +03:00
parent d3eb72ca31
commit 6f851a0d7b
11 changed files with 292 additions and 159 deletions

View File

@ -1,4 +1,5 @@
use std::{
path::PathBuf,
sync::{Arc, Mutex},
time::Duration,
};
@ -7,6 +8,7 @@ use rmp::{
protocol::{Message, MessageType},
ServerState,
};
use serde::Serialize;
pub struct App {
pub options: AppOptions,
@ -35,32 +37,65 @@ impl App {
self.message_queue.lock().unwrap().push(message);
}
fn map_and_serialize<F, T>(&mut self, f: F) -> Option<Vec<u8>>
where
F: FnOnce(&ServerState) -> Option<T>,
T: Serialize,
{
let data = self.state.lock().unwrap().as_ref().and_then(f);
data.map(|data| bincode::serialize(&data).unwrap())
}
pub fn connected(&self) -> bool {
self.state.lock().unwrap().is_some()
}
pub fn toggle_shuffle(&mut self) {
self.push_message(Message::new(MessageType::ToggleSuffle, None));
let body = self.map_and_serialize(|state| Some(!state.playlist_params.shuffle));
self.push_message(Message {
message_type: MessageType::SetShuffle,
body,
})
}
pub fn toggle_next(&mut self) {
self.push_message(Message::new(MessageType::ToggleNext, None));
let body = self.map_and_serialize(|state| Some(!state.playlist_params.next));
self.push_message(Message {
message_type: MessageType::SetNext,
body,
})
}
pub fn toggle_repeat(&mut self) {
self.push_message(Message::new(MessageType::ToggleRepeat, None));
}
pub fn fetch_state(&mut self) {
self.push_message(Message::new(MessageType::StateFetch, None));
}
pub fn play(&mut self) {
self.push_message(Message::new(MessageType::PlayTrack, None));
let body = self.map_and_serialize(|state| Some(!state.playlist_params.repeat));
self.push_message(Message {
message_type: MessageType::SetRepeat,
body,
})
}
pub fn toggle_pause(&mut self) {
self.push_message(Message::new(MessageType::TogglePause, None));
let body =
self.map_and_serialize(|state| state.player.as_ref().map(|player| !player.is_paused));
self.push_message(Message {
message_type: MessageType::SetPause,
body,
});
}
pub fn fetch_state(&mut self) {
self.push_message(Message {
message_type: MessageType::StateFetch,
body: None,
});
}
pub fn play(&mut self, track: PathBuf) {
let body = self.map_and_serialize(|_state| Some(track));
self.push_message(Message {
message_type: MessageType::PlayTrackFromPath,
body,
});
}
pub fn on_key(&mut self, key: char) {
@ -83,14 +118,17 @@ impl App {
pub fn on_down(&mut self) {}
pub fn on_enter(&mut self) {
self.play();
self.play("".into()); // TODO: Remove hardcoding
}
pub fn on_tab(&mut self) {}
pub fn on_tick(&mut self, _duration: Duration) {
if self.options.sync_state {
self.push_message(Message::new(MessageType::StateFetch, None));
self.push_message(Message {
message_type: MessageType::StateFetch,
body: None,
});
}
}
}

View File

@ -21,10 +21,10 @@ pub fn request_queue_cleaner(
if should_connect {
*state.lock().unwrap() = None;
stream = Some(connect());
queue
.lock()
.unwrap()
.push(Message::new(MessageType::StateFetch, None));
queue.lock().unwrap().push(Message {
message_type: MessageType::StateFetch,
body: None,
});
should_connect = false;
}
match stream.as_mut() {
@ -57,7 +57,7 @@ pub fn request_queue_cleaner(
/// Blocks thread until connected to socket
fn connect() -> LocalSocketStream {
let path = rmp::os::get_socket_path().unwrap();
let path = rmp::os::server::get_socket_path().unwrap();
loop {
match LocalSocketStream::connect(path.clone()) {
Ok(stream) => return stream,

View File

@ -89,7 +89,7 @@ fn draw_player<B: Backend>(f: &mut Frame<B>, state: &ServerState, area: Rect) {
),
(
state.playlist_params.shuffle,
vec![Span::from("S").underlined(), Span::from("UFFLE")],
vec![Span::from("S").underlined(), Span::from("HUFFLE")],
),
(
state.playlist_params.repeat,

View File

@ -10,6 +10,15 @@ pub mod protocol;
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct ServerState {
pub playlist_params: PlaylistParams,
pub player: Option<PlayerState>,
}
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct PlayerState {
pub track: PlaylistElement,
pub is_paused: bool,
pub duration: Option<f32>,
pub current_position: Option<f32>,
}
pub type PlaylistElement = PathBuf;

View File

@ -1,10 +1,19 @@
pub mod client {
use std::path::PathBuf;
pub fn get_music_dir() -> PathBuf {
PathBuf::from(std::env!("HOME")).join("Music")
}
}
pub mod server {
use std::{
fs,
path::{Path, PathBuf},
process::{id, Command, Stdio},
};
use super::server::ServerError;
use crate::server::ServerError;
pub fn reserve_pid() -> Result<(), ServerError> {
let pid_path = get_pid_path()?;
@ -90,3 +99,4 @@ fn get_runtime_dir() -> Result<PathBuf, ServerError> {
fn get_pid_path() -> Result<PathBuf, ServerError> {
Ok(get_runtime_dir()?.join("rmp.pid"))
}
}

View File

@ -3,7 +3,6 @@ use std::{
io::{Read, Write},
};
use interprocess::local_socket::LocalSocketStream;
use serde::{Deserialize, Serialize};
use crate::ServerState;
@ -26,17 +25,20 @@ pub enum MessageError {
pub enum MessageType {
/// Generic acknowledge
Ack = 0,
/// client/server did not know how to handle the request
/// Recipient did not know how to handle the request.
NotImplementedAck,
/// Request was invalid
/// Request was not a valid message. For example if the checksum did not match.
ProtocolError,
/// Request was a valid message, but it was incorrect. For example the body could be missing if
/// it was expected
ApplicationError,
StateFetch,
StateResponse,
ToggleSuffle,
ToggleNext,
ToggleRepeat,
PlayTrack,
TogglePause,
SetShuffle,
SetNext,
SetRepeat,
SetPause,
PlayTrackFromPath,
}
#[derive(Debug, Serialize, Deserialize)]
@ -46,12 +48,9 @@ pub struct Message {
}
impl Message {
pub fn new(message_type: MessageType, body: Option<&[u8]>) -> Self {
Self {
message_type,
body: body.map(|b| Vec::from(b)),
}
}
// pub fn new(message_type: MessageType, body: Option<Vec<u8>>) -> Self {
// Self { message_type, body }
// }
/// Message format (values are in little-endian):
/// offset | size | explanation
@ -89,12 +88,18 @@ impl Display for Message {
}
}
pub fn send(stream: &mut LocalSocketStream, message: &Message) -> Result<(), std::io::Error> {
pub fn send<T>(stream: &mut T, message: &Message) -> Result<(), std::io::Error>
where
T: Write,
{
stream.write_all(&message.as_bytes())?;
Ok(())
}
pub fn receive(stream: &mut LocalSocketStream) -> Result<Message, MessageError> {
pub fn receive<T>(stream: &mut T) -> Result<Message, MessageError>
where
T: Read,
{
let mut magic_buffer = vec![0; HEADER_MAGIC.len()];
if let Err(_) = stream.read_exact(&mut magic_buffer) {
return Err(MessageError::ReadError);

View File

@ -8,6 +8,7 @@ use rmp::{
};
use std::{
fs,
path::PathBuf,
sync::{Arc, Mutex},
};
@ -15,7 +16,6 @@ use crate::CliArgs;
use self::playback::{playback_manager, Playback};
pub mod audio_backend;
pub mod decoder;
pub mod playback;
@ -34,7 +34,7 @@ impl Server {
}
pub fn run(args: CliArgs) -> Result<(), ServerError> {
if let Err(err) = os::reserve_pid() {
if let Err(err) = os::server::reserve_pid() {
let exit_code = handle_error(err);
std::process::exit(exit_code);
}
@ -45,6 +45,7 @@ pub fn run(args: CliArgs) -> Result<(), ServerError> {
next: args.next,
repeat: args.repeat,
},
player: None,
})));
let server_clone = server.clone();
@ -57,14 +58,14 @@ pub fn run(args: CliArgs) -> Result<(), ServerError> {
}
pub fn kill() {
if let Err(err) = os::kill() {
if let Err(err) = os::server::kill() {
let exit_code = handle_error(err);
std::process::exit(exit_code);
}
}
pub fn is_running() -> Result<bool, ServerError> {
match os::is_running() {
match os::server::is_running() {
Ok(is_running) => Ok(is_running),
Err(err) => {
let exit_code = handle_error(err);
@ -74,14 +75,14 @@ pub fn is_running() -> Result<bool, ServerError> {
}
pub fn run_in_background() {
if let Err(err) = os::run_in_background() {
if let Err(err) = os::server::run_in_background() {
let exit_code = handle_error(err);
std::process::exit(exit_code);
}
}
fn serve(server: Arc<Mutex<Server>>) -> Result<(), ServerError> {
let socket_path = os::get_socket_path()?;
let socket_path = os::server::get_socket_path()?;
if socket_path.exists() {
fs::remove_file(&socket_path).map_err(|err| ServerError::Io(err))?;
}
@ -135,7 +136,11 @@ fn session_handler(mut stream: LocalSocketStream, server: Arc<Mutex<Server>>) {
error => {
eprintln!("[{session_id}] rx Error {error:?}");
let body = bincode::serialize(&error).unwrap();
let message = Message::new(MessageType::ProtocolError, Some(&body));
// TODO: Server-side error logging so internal error data is not sent to client
let message = Message {
message_type: MessageType::ProtocolError,
body: Some(body),
};
eprintln!("[{session_id}] tx {message}");
rmp::protocol::send(&mut stream, &message).unwrap();
}
@ -172,22 +177,77 @@ fn handle_error(err: ServerError) -> i32 {
fn route_request(request: &Message, server: &mut Server) -> Result<Message, String> {
match request.message_type {
MessageType::StateFetch => {}
MessageType::ToggleNext => {
server.state.playlist_params.next = !server.state.playlist_params.next;
MessageType::SetNext => {
let body = match request.body.as_ref() {
Some(body) => body,
None => {
return Ok(Message {
message_type: MessageType::ApplicationError,
body: None,
})
}
MessageType::ToggleSuffle => {
server.state.playlist_params.shuffle = !server.state.playlist_params.shuffle;
};
let new_state: bool = bincode::deserialize(&body).map_err(|err| err.to_string())?;
server.state.playlist_params.next = new_state;
}
MessageType::ToggleRepeat => {
server.state.playlist_params.repeat = !server.state.playlist_params.repeat;
MessageType::SetShuffle => {
let body = match request.body.as_ref() {
Some(body) => body,
None => {
return Ok(Message {
message_type: MessageType::ApplicationError,
body: None,
})
}
MessageType::PlayTrack => {
server.playback.play_test();
};
let new_state: bool = bincode::deserialize(&body).map_err(|err| err.to_string())?;
server.state.playlist_params.shuffle = new_state;
}
MessageType::TogglePause => {
server.playback.pause_test();
MessageType::SetRepeat => {
let body = match request.body.as_ref() {
Some(body) => body,
None => {
return Ok(Message {
message_type: MessageType::ApplicationError,
body: None,
})
}
};
let new_state: bool = bincode::deserialize(&body).map_err(|err| err.to_string())?;
server.state.playlist_params.repeat = new_state;
}
MessageType::SetPause => {
let body = match request.body.as_ref() {
Some(body) => body,
None => {
return Ok(Message {
message_type: MessageType::ApplicationError,
body: None,
})
}
};
let new_state: bool = bincode::deserialize(&body).map_err(|err| err.to_string())?;
server.playback.pause(new_state);
}
MessageType::PlayTrackFromPath => {
let body = match request.body.as_ref() {
Some(body) => body,
None => {
return Ok(Message {
message_type: MessageType::ApplicationError,
body: None,
})
}
};
let body: PathBuf = bincode::deserialize(&body).map_err(|err| err.to_string())?;
server.playback.play(body);
}
_ => {
return Ok(Message {
message_type: MessageType::NotImplementedAck,
body: None,
})
}
_ => return Ok(Message::new(MessageType::NotImplementedAck, None)),
}
Message::state_response(&server.state)
}

View File

@ -1,7 +0,0 @@
use std::{path::Path, time::Duration};
pub trait AudioBackend {
fn seek_and_play(&mut self, file_path: &Path, seek: Duration) -> ();
fn pause_playback(&mut self) -> ();
fn continue_playback(&mut self) -> ();
}

View File

@ -1,6 +1,6 @@
// Implement decoders missing from rodio.
// At the moment this means:
// - Opus (Ogg, WebM)
// - Opus (WebM)
// - Mod
// - XM

View File

@ -90,7 +90,7 @@ impl Iterator for OpusDecoder {
}
let item = self.decoded_buffer.get(self.decoded_cursor).cloned();
self.decoded_cursor += 1;
item
item.map(|sample| sample)
}
}

View File

@ -1,9 +1,11 @@
use std::{
borrow::BorrowMut,
fs::File,
path::PathBuf,
sync::{Arc, Mutex},
};
use rmp::PlayerState;
use rodio::{OutputStream, Sink};
use super::{
@ -15,8 +17,8 @@ use super::{
static TRACK: &str = "";
enum PlaybackCommand {
Play(PathBuf),
TogglePause,
PlayPath(PathBuf),
SetPause(bool),
}
pub struct Playback {
@ -32,12 +34,14 @@ impl Default for Playback {
}
impl Playback {
pub fn play_test(&mut self) {
self.command_queue.push(PlaybackCommand::Play(TRACK.into()))
pub fn play(&mut self, path: PathBuf) {
self.command_queue
.push(PlaybackCommand::PlayPath(TRACK.into())) // TODO: Remove hardcoding
}
pub fn pause_test(&mut self) {
self.command_queue.push(PlaybackCommand::TogglePause)
pub fn pause(&mut self, new_state: bool) {
self.command_queue
.push(PlaybackCommand::SetPause(new_state))
}
}
@ -45,15 +49,17 @@ pub fn playback_manager(server: Arc<Mutex<Server>>) {
let (_stream, stream_handle) = OutputStream::try_default().unwrap();
let sink: Sink = Sink::try_new(&stream_handle).unwrap();
loop {
for command in server.lock().unwrap().playback.command_queue.drain(..) {
let mut server = server.lock().unwrap();
let commands: Vec<PlaybackCommand> = server.playback.command_queue.drain(..).collect();
for command in commands {
match command {
PlaybackCommand::Play(track) => {
PlaybackCommand::PlayPath(track) => {
let mut source: Option<Box<dyn DecoderImpl<Item = i16>>> = None;
{
let file = File::open(&track).unwrap();
if let Ok(decoder) = Decoder::rodio(SourceWrapper::from_file(file)) {
println!("rodio:\n\tsample_rate: {:?}\n\ttotal_duration: {:?}\n\tchannels: {:?}\n\tcurrent_frame_len: {:?}", decoder.sample_rate(), decoder.total_duration(), decoder.channels(), decoder.current_frame_len());
println!("playback: rodio\n\tsample_rate: {:?}\n\ttotal_duration: {:?}\n\tchannels: {:?}\n\tcurrent_frame_len: {:?}", decoder.sample_rate(), decoder.total_duration(), decoder.channels(), decoder.current_frame_len());
source = Some(decoder);
};
}
@ -61,13 +67,19 @@ pub fn playback_manager(server: Arc<Mutex<Server>>) {
if source.is_none() {
let file = File::open(&track).unwrap();
if let Ok(decoder) = Decoder::custom(SourceWrapper::from_file(file)) {
println!("custom:\n\tsample_rate: {:?}\n\ttotal_duration: {:?}\n\tchannels: {:?}\n\tcurrent_frame_len: {:?}", decoder.sample_rate(), decoder.total_duration(), decoder.channels(), decoder.current_frame_len());
println!("playback: custom\n\tsample_rate: {:?}\n\ttotal_duration: {:?}\n\tchannels: {:?}\n\tcurrent_frame_len: {:?}", decoder.sample_rate(), decoder.total_duration(), decoder.channels(), decoder.current_frame_len());
source = Some(decoder);
}
}
match source {
Some(source) => {
server.state.player = Some(PlayerState {
track,
is_paused: false,
duration: None,
current_position: None,
});
sink.clear();
sink.append(source);
sink.play();
@ -75,12 +87,18 @@ pub fn playback_manager(server: Arc<Mutex<Server>>) {
None => println!("No handler found for '{track:?}'"),
}
}
PlaybackCommand::TogglePause => {
if !sink.empty() {
PlaybackCommand::SetPause(new_state) => {
if !sink.empty() && new_state != sink.is_paused() {
if sink.is_paused() {
sink.play()
sink.play();
if let Some(player) = server.state.player.borrow_mut() {
player.is_paused = false;
}
} else {
sink.pause()
sink.pause();
if let Some(player) = server.state.player.borrow_mut() {
player.is_paused = true;
}
}
}
}