Compare commits

...

4 Commits

Author SHA1 Message Date
hheik 484d0a0bf2 Implemented client/server protocol 2024-02-21 19:26:26 +02:00
hheik fdb279d431 Reworking application structure 2024-02-19 23:48:17 +02:00
hheik 3dc5b89ee5 Added automatically starting server with local socket communication 2023-09-29 22:51:23 +03:00
hheik 0c3abd32a3 Started implementing server-client split 2023-09-28 15:56:47 +03:00
13 changed files with 771 additions and 323 deletions

View File

@ -7,5 +7,9 @@ edition = "2021"
[dependencies] [dependencies]
argh = "0.1.12" argh = "0.1.12"
bincode = "1.3.3"
crc32fast = "1.3.2"
crossterm = "0.27.0" crossterm = "0.27.0"
interprocess = "1.2.1"
ratatui = "0.23.0" ratatui = "0.23.0"
serde = "1.0.196"

View File

@ -1,3 +0,0 @@
fn main() {
rmp::client::run().unwrap();
}

View File

@ -1,24 +1,38 @@
use argh::FromArgs; use std::{
use std::{error::Error, time::Duration}; error::Error,
sync::{Arc, Mutex},
time::Duration,
};
use crate::CliArgs;
use self::{app::App, request_queue::request_queue_cleaner};
pub mod app; pub mod app;
pub mod crossterm; pub mod crossterm;
pub mod request_queue;
pub mod ui; pub mod ui;
/// Demo pub fn run(args: CliArgs) -> Result<(), Box<dyn Error>> {
#[derive(Debug, FromArgs)] let message_queue = Arc::new(Mutex::new(vec![]));
struct Cli { let server_state = Arc::new(Mutex::new(None));
/// time in ms between two ticks. let app = App {
#[argh(option, default = "250")] title: "rmp - Rust Music Player".into(),
tick_rate: u64, enhanced_graphics: args.enhanced_graphics,
/// whether unicode symbols are used to improve the overall look of the app should_quit: false,
#[argh(option, default = "true")] state: server_state.clone(),
enhanced_graphics: bool, message_queue: message_queue.clone(),
} };
let thread_builder = std::thread::Builder::new().name("request_queue".into());
pub fn run() -> Result<(), Box<dyn Error>> { thread_builder
let cli: Cli = argh::from_env(); .spawn(move || {
let tick_rate = Duration::from_millis(cli.tick_rate); request_queue_cleaner(
crossterm::run(tick_rate, cli.enhanced_graphics)?; Duration::from_millis(args.message_rate),
message_queue.clone(),
server_state.clone(),
)
})
.unwrap();
crossterm::run(app, Duration::from_millis(args.tick_rate))?;
Ok(()) Ok(())
} }

View File

@ -1,226 +1,62 @@
use std::{ops::Deref, time::Duration}; use std::{
sync::{Arc, Mutex},
time::Duration,
};
const FILE_PATHS: [&str; 0] = []; use rmp::{
protocol::{Message, MessageType},
#[derive(Debug, Default, Clone, Eq, PartialEq, Hash)] ServerState,
pub struct ListState { };
offset: usize,
selected: Option<usize>,
}
impl ListState {
pub fn offset(&self) -> usize {
self.offset
}
pub fn offset_mut(&mut self) -> &mut usize {
&mut self.offset
}
pub fn with_selected(mut self, selected: Option<usize>) -> Self {
self.selected = selected;
self
}
pub fn with_offset(mut self, offset: usize) -> Self {
self.offset = offset;
self
}
pub fn selected(&self) -> Option<usize> {
self.selected
}
pub fn select(&mut self, index: Option<usize>) {
self.selected = index;
if index.is_none() {
self.offset = 0;
}
}
}
pub struct StatefulList<T> {
pub state: ListState,
pub items: Vec<T>,
}
impl<T> StatefulList<T>
where
T: Deref + Eq,
{
pub fn with_items(items: Vec<T>) -> StatefulList<T> {
StatefulList {
state: ListState::default(),
items,
}
}
pub fn next(&mut self) {
let i = match self.state.selected() {
Some(i) => {
if i >= self.items.len() - 1 {
0
} else {
i + 1
}
}
None => 0,
};
self.state.select(Some(i));
}
pub fn previous(&mut self) {
let i = match self.state.selected() {
Some(i) => {
if i == 0 {
self.items.len() - 1
} else {
i - 1
}
}
None => 0,
};
self.state.select(Some(i));
}
pub fn index_of(&self, item: T) -> Option<usize> {
self.items.iter().position(|element| *element == item)
}
pub fn current(&self) -> Option<&T> {
self.state
.selected
.map_or(None, |index| self.items.get(index))
}
pub fn current_mut(&mut self) -> Option<&mut T> {
self.state
.selected
.map_or(None, |index| self.items.get_mut(index))
}
}
// TODO: Implement this
#[derive(Default, Clone)]
pub struct TrackMetadata {
pub duration: Option<Duration>,
}
pub struct PlayerState {
pub currently_playing: Option<String>,
pub is_playing: bool,
pub time_ratio: f64,
}
impl Default for PlayerState {
fn default() -> Self {
Self {
currently_playing: None,
is_playing: false,
time_ratio: 0.0,
}
}
}
impl PlayerState {
pub fn play(&mut self, path: &str) {
self.currently_playing = Some(path.to_string());
self.is_playing = true;
self.time_ratio = 0.0;
// TODO: Handle audio
}
pub fn toggle_pause(&mut self) {
self.is_playing = !self.is_playing;
}
}
pub struct App { pub struct App {
pub title: String, pub title: String,
pub playlist: StatefulList<String>,
pub player_state: PlayerState,
pub should_quit: bool, pub should_quit: bool,
pub enhanced_graphics: bool, pub enhanced_graphics: bool,
pub message_queue: Arc<Mutex<Vec<Message>>>,
pub state: Arc<Mutex<Option<ServerState>>>,
} }
impl App { impl App {
pub fn new(title: &str, enhanced_graphics: bool) -> Self { pub fn new(title: &str, enhanced_graphics: bool) -> Self {
let mut playlist = StatefulList::<String> {
state: ListState::default(),
items: FILE_PATHS.iter().map(|path| path.to_string()).collect(),
};
playlist.state.selected = if playlist.items.len() > 0 {
Some(0)
} else {
None
};
Self { Self {
title: title.to_string(), title: title.to_string(),
playlist,
player_state: PlayerState::default(),
should_quit: false, should_quit: false,
enhanced_graphics, enhanced_graphics,
message_queue: Arc::new(Mutex::new(vec![])),
state: Arc::new(Mutex::new(None)),
} }
} }
pub fn play_next(&mut self) { fn push_message(&mut self, message: Message) {
let current = if let Some(currently_playing) = self.player_state.currently_playing.clone() { self.message_queue.lock().unwrap().push(message);
if let Some(current_index) = self
.playlist
.items
.iter()
.position(|path| *path == currently_playing)
{
current_index
} else {
return;
}
} else {
return;
};
let track = if current >= self.playlist.items.len() - 1 {
self.playlist.items[0].as_str()
} else {
self.playlist.items[current + 1].as_str()
};
self.player_state.play(track);
} }
pub fn play_previous(&mut self) { pub fn connected(&self) -> bool {
let current = if let Some(currently_playing) = self.player_state.currently_playing.clone() { self.state.lock().unwrap().is_some()
if let Some(current_index) = self
.playlist
.items
.iter()
.position(|path| *path == currently_playing)
{
current_index
} else {
return;
}
} else {
return;
};
let track = if current <= 0 {
self.playlist.items[self.playlist.items.len() - 1].as_str()
} else {
self.playlist.items[current - 1].as_str()
};
self.player_state.play(track);
} }
pub fn replay_current(&mut self) { pub fn toggle_shuffle(&mut self) {
if let Some(current) = self.playlist.current() { self.push_message(Message::new(MessageType::ToggleSuffle, None));
self.player_state.play(current) }
}
pub fn toggle_next(&mut self) {
self.push_message(Message::new(MessageType::ToggleNext, None));
}
pub fn toggle_repeat(&mut self) {
self.push_message(Message::new(MessageType::ToggleRepeat, None));
}
pub fn fetch_state(&mut self) {
self.push_message(Message::new(MessageType::StateFetch, None));
} }
pub fn on_key(&mut self, key: char) { pub fn on_key(&mut self, key: char) {
match key { match key {
'b' => self.play_previous(), 'S' => self.toggle_shuffle(),
'n' => self.play_next(), 'X' => self.toggle_next(),
'R' => self.toggle_repeat(),
'q' => self.should_quit = true, 'q' => self.should_quit = true,
' ' => self.player_state.toggle_pause(),
_ => (), _ => (),
} }
} }
@ -229,33 +65,13 @@ impl App {
pub fn on_right(&mut self) {} pub fn on_right(&mut self) {}
pub fn on_up(&mut self) { pub fn on_up(&mut self) {}
if self.playlist.items.len() > 0 {
self.playlist.previous()
}
}
pub fn on_down(&mut self) { pub fn on_down(&mut self) {}
if self.playlist.items.len() > 0 {
self.playlist.next()
}
}
pub fn on_enter(&mut self) { pub fn on_enter(&mut self) {}
if let Some(path) = self.playlist.current() {
self.player_state.play(path)
}
}
pub fn on_tab(&mut self) {} pub fn on_tab(&mut self) {}
pub fn on_tick(&mut self, duration: Duration) { pub fn on_tick(&mut self, duration: Duration) {}
let time_mod = if self.player_state.is_playing {
0.25
} else {
0.0
};
self.player_state.time_ratio =
(self.player_state.time_ratio + duration.as_secs_f64() * time_mod) % 1.0
}
} }

View File

@ -13,7 +13,7 @@ use ratatui::prelude::*;
use super::{app::App, ui}; use super::{app::App, ui};
pub fn run(tick_rate: Duration, enhanced_graphics: bool) -> Result<(), Box<dyn Error>> { pub fn run(app: App, tick_rate: Duration) -> Result<(), Box<dyn Error>> {
// setup terminal // setup terminal
enable_raw_mode()?; enable_raw_mode()?;
let mut stdout = io::stdout(); let mut stdout = io::stdout();
@ -21,8 +21,6 @@ pub fn run(tick_rate: Duration, enhanced_graphics: bool) -> Result<(), Box<dyn E
let backend = CrosstermBackend::new(stdout); let backend = CrosstermBackend::new(stdout);
let mut terminal = Terminal::new(backend)?; let mut terminal = Terminal::new(backend)?;
// create app and run it
let app = App::new("rmp - Rust Music Player", enhanced_graphics);
let res = run_app(&mut terminal, app, tick_rate); let res = run_app(&mut terminal, app, tick_rate);
// restore terminal // restore terminal
@ -56,19 +54,32 @@ fn run_app<B: Backend>(
if crossterm::event::poll(timeout)? { if crossterm::event::poll(timeout)? {
if let Event::Key(key) = event::read()? { if let Event::Key(key) = event::read()? {
if key.kind == KeyEventKind::Press { if key.kind == KeyEventKind::Press {
match key.code { if app.connected() {
KeyCode::Char(c) => app.on_key(c), match key.code {
KeyCode::Left => app.on_left(), KeyCode::Char(c) => app.on_key(c),
KeyCode::Up => app.on_up(), KeyCode::Left => app.on_left(),
KeyCode::Right => app.on_right(), KeyCode::Up => app.on_up(),
KeyCode::Down => app.on_down(), KeyCode::Right => app.on_right(),
KeyCode::Enter => app.on_enter(), KeyCode::Down => app.on_down(),
KeyCode::Tab => app.on_tab(), KeyCode::Enter => app.on_enter(),
_ => {} KeyCode::Tab => app.on_tab(),
_ => {}
}
} else {
// Allow quitting while in "Not connected" screen
match key.code {
KeyCode::Char(c) => {
if c == 'q' {
app.should_quit = true;
}
}
_ => (),
}
} }
} }
} }
} }
if last_tick.elapsed() >= tick_rate { if last_tick.elapsed() >= tick_rate {
app.on_tick(last_tick.elapsed()); app.on_tick(last_tick.elapsed());
last_tick = Instant::now(); last_tick = Instant::now();

View File

@ -0,0 +1,94 @@
use std::{
sync::{Arc, Mutex},
time::{Duration, Instant},
};
use interprocess::local_socket::LocalSocketStream;
use rmp::{
protocol::{self, Message, MessageError, MessageType},
ServerState,
};
pub fn request_queue_cleaner(
message_rate: Duration,
queue: Arc<Mutex<Vec<Message>>>,
state: Arc<Mutex<Option<ServerState>>>,
) {
let mut last_tick = Instant::now();
let mut should_connect = true;
let mut stream: Option<LocalSocketStream> = None;
loop {
if should_connect {
*state.lock().unwrap() = None;
stream = Some(connect());
should_connect = false;
}
match stream.as_mut() {
Some(mut stream) => {
for request in queue.lock().unwrap().drain(..) {
if let Err(_) = protocol::send(&mut stream, &request) {
should_connect = true;
continue;
}
if let Ok(response) = protocol::receive(&mut stream) {
match route_response(&response, &mut state.lock().unwrap()) {
Err(error) => {
eprintln!("{error:?}")
}
_ => {}
}
}
}
// HACK: keep updating state
queue
.lock()
.unwrap()
.push(Message::new(MessageType::StateFetch, None));
let sleep_duration = message_rate
.checked_sub(last_tick.elapsed())
.unwrap_or_else(|| Duration::from_secs(0));
std::thread::sleep(sleep_duration);
last_tick = Instant::now();
}
None => should_connect = true,
}
}
}
/// Blocks thread until connected to socket
fn connect() -> LocalSocketStream {
let path = rmp::os::get_socket_path().unwrap();
loop {
match LocalSocketStream::connect(path.clone()) {
Ok(stream) => return stream,
Err(_) => {}
}
std::thread::sleep(Duration::from_millis(100));
}
}
fn route_response(response: &Message, state: &mut Option<ServerState>) -> Result<(), String> {
match response.message_type {
MessageType::StateResponse => {
let body = response.body.as_ref().ok_or("Missing response body")?;
let response: ServerState =
bincode::deserialize(&body).map_err(|err| err.to_string())?;
*state = Some(response);
}
MessageType::NotImplementedAck => {
eprintln!("Server doesn't implement message")
}
MessageType::ProtocolError => {
let body = response.body.as_ref().ok_or("Missing response body")?;
let response: MessageError =
bincode::deserialize(&body).map_err(|err| err.to_string())?;
eprintln!("Server claims protocol error: {response:?}");
}
message_type => {
eprintln!("Message handling not implemented for client: {message_type:?}");
}
}
Ok(())
}

View File

@ -1,54 +1,47 @@
use ratatui::{prelude::*, widgets::*}; use ratatui::{prelude::*, widgets::*};
use std::path::Path;
use super::app::App; use super::app::App;
pub fn draw<B: Backend>(f: &mut Frame<B>, app: &mut App) { pub fn draw<B: Backend>(f: &mut Frame<B>, app: &mut App) {
let chunks = Layout::default() if app.connected() {
.constraints([Constraint::Min(3), Constraint::Length(2)].as_ref()) let chunks = Layout::default()
.split(f.size()); .constraints([Constraint::Min(3), Constraint::Length(2)].as_ref())
draw_playlist(f, app, chunks[0]); .split(f.size());
draw_player(f, app, chunks[1]); draw_playlist(f, app, chunks[0]);
draw_player(f, app, chunks[1]);
} else {
draw_no_connection(f);
}
} }
static PRIMARY_COLOR: Color = Color::Rgb(200, 150, 70); static PRIMARY_COLOR: Color = Color::Rgb(200, 150, 70);
static SECONDARY_COLOR: Color = Color::Rgb(200, 200, 200); static SECONDARY_COLOR: Color = Color::Rgb(200, 200, 200);
fn draw_no_connection<B: Backend>(f: &mut Frame<B>) {
let message = "Not connected";
let width = message.len() as u16 + 4;
let height = 3;
let x = (f.size().width as i16 - width as i16).max(0) as u16 / 2;
let y = (f.size().height as i16 - height as i16).max(0) as u16 / 2;
let area = Rect {
x,
y,
width: u16::min(width, f.size().width - x),
height: u16::min(height, f.size().height - y),
};
f.render_widget(
Paragraph::new(message)
.fg(SECONDARY_COLOR)
.block(Block::default().borders(Borders::ALL).fg(PRIMARY_COLOR))
.alignment(Alignment::Center),
area,
);
}
fn draw_playlist<B: Backend>(f: &mut Frame<B>, app: &App, area: Rect) { fn draw_playlist<B: Backend>(f: &mut Frame<B>, app: &App, area: Rect) {
let tracks: Vec<_> = app let playlist = List::new(vec![])
.playlist
.items
.iter()
.enumerate()
.map(|(index, path)| {
let selected = app
.playlist
.state
.selected()
.map_or(false, |selected| index == selected);
let playing = app
.player_state
.currently_playing
.clone()
.map_or(false, |currently_playing| currently_playing == *path);
let mut style = Style::default();
match (selected, playing) {
(true, false) => {
style.fg = Some(Color::Black);
style.bg = Some(PRIMARY_COLOR);
}
(false, true) => style.fg = Some(PRIMARY_COLOR),
(true, true) => {
style.fg = None;
style.bg = Some(PRIMARY_COLOR);
}
(_, _) => (),
}
let content = Span::from(format_track_name(path));
ListItem::new(content).set_style(style)
})
.collect();
let playlist = List::new(tracks)
.block( .block(
Block::default() Block::default()
.borders(Borders::ALL) .borders(Borders::ALL)
@ -59,22 +52,11 @@ fn draw_playlist<B: Backend>(f: &mut Frame<B>, app: &App, area: Rect) {
} }
fn draw_player<B: Backend>(f: &mut Frame<B>, app: &App, area: Rect) { fn draw_player<B: Backend>(f: &mut Frame<B>, app: &App, area: Rect) {
let title_content = match app.player_state.currently_playing.as_ref() { let title_content = vec![
Some(playing) => { Span::from("[ "),
let symbol = match app.player_state.is_playing { Span::from("??").fg(PRIMARY_COLOR),
true => Span::from("||").fg(PRIMARY_COLOR), Span::from(" ]"),
false => Span::from("|>").fg(Color::Black).bg(PRIMARY_COLOR), ];
};
vec![
Span::from("[ "),
symbol,
Span::from(" "),
Span::from(format_track_name(playing.as_str())),
Span::from(" ]"),
]
}
None => vec![Span::from(" Nothing selected ")],
};
let player = Gauge::default() let player = Gauge::default()
.block( .block(
@ -85,19 +67,8 @@ fn draw_player<B: Backend>(f: &mut Frame<B>, app: &App, area: Rect) {
.title_position(block::Position::Top), .title_position(block::Position::Top),
) )
.gauge_style(Style::default().fg(PRIMARY_COLOR)) .gauge_style(Style::default().fg(PRIMARY_COLOR))
.ratio(app.player_state.time_ratio) .ratio(0.25)
.label(if app.player_state.is_playing { .label("[ PAUSED ]");
""
} else {
"[ PAUSED ]"
});
f.render_widget(player, area) f.render_widget(player, area)
} }
fn format_track_name(path: &str) -> String {
match Path::new(path).file_stem() {
Some(file_name) => file_name.to_string_lossy().to_string(),
None => path.to_string(),
}
}

View File

@ -1 +1,76 @@
pub mod client; use std::path::PathBuf;
use serde::{Deserialize, Serialize};
#[cfg(target_family = "unix")]
#[path = "os_unix.rs"]
pub mod os;
pub mod protocol;
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct ServerState {
pub playlist_params: PlaylistParams,
pub directory_playlist: Option<DirectoryPlaylist>,
pub queue_playlist: QueuePlaylist,
}
pub type PlaylistElement = PathBuf;
pub enum PlaylistType {
Directory,
Queue,
}
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct Playlist {
pub items: Vec<PlaylistElement>,
pub current: Option<usize>,
}
impl Playlist {}
#[derive(Serialize, Deserialize, Debug)]
pub struct DirectoryPlaylist {
pub directory: PathBuf,
pub playlist: Playlist,
}
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct QueuePlaylist {
pub playlist: Playlist,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct PlaylistParams {
pub shuffle: bool,
pub next: bool,
pub repeat: bool,
}
impl Default for PlaylistParams {
fn default() -> Self {
Self {
shuffle: false,
next: true,
repeat: true,
}
}
}
pub mod server {
use std::{fmt::Debug, path::PathBuf};
#[derive(Debug)]
pub enum ServerError {
Other(String),
Io(std::io::Error),
AlreadyStarted,
MissingRuntimeDir(PathBuf),
}
impl ServerError {
pub fn from_debuggable(err: impl Debug) -> Self {
Self::Other(format!("Unexpected error: {err:?}").to_string())
}
}
}

63
src/main.rs Normal file
View File

@ -0,0 +1,63 @@
use argh::FromArgs;
pub mod client;
pub mod server;
/// rmp: Rust Music Player
#[derive(Debug, FromArgs)]
pub struct CliArgs {
/// run the server
#[argh(switch, short = 's')]
server: bool,
/// randomize next track?
#[argh(option, default = "false")]
shuffle: bool,
/// change track after the current one is finished?
#[argh(option, default = "true")]
next: bool,
/// repeat the playlist (or track if 'next' is disabled) after it's finished
#[argh(option, default = "true")]
repeat: bool,
/// kill server
#[argh(switch, short = 'x')]
exit: bool,
/// don't start server even if it's not running
#[argh(switch, short = 'c')]
client_only: bool,
/// time in ms between two ticks.
#[argh(option, default = "250")]
tick_rate: u64,
/// interval in ms for clearing the request queue.
#[argh(option, default = "50")]
message_rate: u64,
/// whether unicode symbols are used to improve the overall look of the app
#[argh(option, default = "true")]
enhanced_graphics: bool,
}
fn main() {
let args: CliArgs = argh::from_env();
if args.exit {
server::kill();
return;
}
if args.server {
server::run(args).unwrap();
return;
}
if !args.client_only && !server::is_running().unwrap() {
server::run_in_background();
}
client::run(args).unwrap();
}

92
src/os_unix.rs Normal file
View File

@ -0,0 +1,92 @@
use std::{
fs,
path::{Path, PathBuf},
process::{id, Command, Stdio},
};
use super::server::ServerError;
pub fn reserve_pid() -> Result<(), ServerError> {
let pid_path = get_pid_path()?;
is_running()?;
fs::write(&pid_path, id().to_string()).map_err(|err| ServerError::Io(err))?;
Command::new("chmod")
.args(&["600", &pid_path.to_string_lossy()])
.output()
.map_err(|err| ServerError::Io(err))?;
Ok(())
}
pub fn is_running() -> Result<bool, ServerError> {
let pid_path = get_pid_path()?;
match fs::read(&pid_path) {
Ok(old_pid) => {
let old_pid =
String::from_utf8(old_pid).map_err(|err| ServerError::from_debuggable(err))?;
let old_pid = old_pid.trim();
Ok(Command::new("ps")
.args(&["-p", old_pid])
.stdout(Stdio::null())
.stderr(Stdio::null())
.status()
.map_err(|err| ServerError::Io(err))?
.success())
}
_ => Ok(false),
}
}
pub fn run_in_background() -> Result<(), ServerError> {
let this = std::env::args().next().unwrap();
Command::new(this)
.stdout(Stdio::null())
.stderr(Stdio::null())
.args(&["-s"])
.spawn()
.map_err(|err| ServerError::Io(err))?;
Ok(())
}
pub fn kill() -> Result<(), ServerError> {
let pid_path = get_pid_path()?;
let socket_path = get_socket_path()?;
let pid = String::from_utf8(fs::read(&pid_path).map_err(|err| ServerError::Io(err))?)
.map_err(|err| ServerError::from_debuggable(err))?;
let pid = pid.trim();
Command::new("kill")
.arg(pid)
.spawn()
.map_err(|err| ServerError::Io(err))?;
Command::new("rm")
.args(&[
"-f",
&pid_path.to_string_lossy(),
&socket_path.to_string_lossy(),
])
.spawn()
.map_err(|err| ServerError::Io(err))?;
Ok(())
}
pub fn get_socket_path() -> Result<PathBuf, ServerError> {
Ok(get_runtime_dir()?.join("rmp.socket"))
}
fn get_runtime_dir() -> Result<PathBuf, ServerError> {
let uid = String::from_utf8(
Command::new("id")
.arg("-u")
.output()
.map_err(|err| ServerError::Io(err))?
.stdout,
)
.map_err(|err| ServerError::from_debuggable(err))?;
let dir = Path::new("/run/user").join(uid.trim().to_string());
Ok(dir)
}
fn get_pid_path() -> Result<PathBuf, ServerError> {
Ok(get_runtime_dir()?.join("rmp.pid"))
}

130
src/protocol.rs Normal file
View File

@ -0,0 +1,130 @@
use std::{
fmt::Display,
io::{Read, Write},
};
use interprocess::local_socket::LocalSocketStream;
use serde::{Deserialize, Serialize};
use crate::ServerState;
/// Prefix messages with this header
pub const HEADER_MAGIC: [u8; 4] = [0xCA, 0xFE, 0xBA, 0xBE];
/// Maximum allowed body size
pub const MAX_BODY_LENGTH: usize = 10 * 1024 * 1024;
#[derive(Debug, Serialize, Deserialize)]
pub enum MessageError {
HeaderMismatch,
BodySizeLimit,
ChecksumMismatch,
ReadError,
DeserializationError,
}
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Deserialize, Serialize)]
pub enum MessageType {
/// Generic acknowledge
Ack = 0,
/// client/server did not know how to handle the request
NotImplementedAck,
/// Request was invalid
ProtocolError,
StateFetch,
StateResponse,
ToggleSuffle,
ToggleNext,
ToggleRepeat,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Message {
pub message_type: MessageType,
pub body: Option<Vec<u8>>,
}
impl Message {
pub fn new(message_type: MessageType, body: Option<&[u8]>) -> Self {
Self {
message_type,
body: body.map(|b| Vec::from(b)),
}
}
/// Message format (values are in little-endian):
/// offset | size | explanation
/// -------+------+-----------
/// 0x00 | u32 | HEADER_MAGIC
/// 0x04 | u32 | Body checksum
/// 0x08 | u32 | Body length
/// 0x12 | ? | Body
fn as_bytes(&self) -> Vec<u8> {
let magic = &HEADER_MAGIC[..];
let body = &bincode::serialize(self).unwrap();
let checksum = &crc32fast::hash(&body).to_le_bytes();
let body_length = &(body.len() as u32).to_le_bytes();
[magic, checksum, body_length, body].concat()
}
pub fn state_response(server_state: &ServerState) -> Result<Self, String> {
Ok(Self {
message_type: MessageType::StateResponse,
body: Some(bincode::serialize(server_state).map_err(|err| err.to_string())?),
})
}
}
impl Display for Message {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{:?}\t{}",
self.message_type,
self.body
.as_ref()
.map_or("(no body)".into(), |body| format!("{} B", body.len()))
)
}
}
pub fn send(stream: &mut LocalSocketStream, message: &Message) -> Result<(), std::io::Error> {
stream.write_all(&message.as_bytes())?;
Ok(())
}
pub fn receive(stream: &mut LocalSocketStream) -> Result<Message, MessageError> {
let mut magic_buffer = vec![0; HEADER_MAGIC.len()];
if let Err(_) = stream.read_exact(&mut magic_buffer) {
return Err(MessageError::ReadError);
}
if magic_buffer != HEADER_MAGIC {
return Err(MessageError::HeaderMismatch);
}
let mut checksum_buffer = [0; 4];
if let Err(_) = stream.read_exact(&mut checksum_buffer) {
return Err(MessageError::ReadError);
}
let expected_checksum = u32::from_le_bytes(checksum_buffer);
let mut body_length_buffer = [0; 4];
if let Err(_) = stream.read_exact(&mut body_length_buffer) {
return Err(MessageError::ReadError);
}
let expected_body_length = u32::from_le_bytes(body_length_buffer) as usize;
if expected_body_length > MAX_BODY_LENGTH {
return Err(MessageError::BodySizeLimit);
}
let mut body_buffer = vec![0; expected_body_length];
if let Err(_) = stream.read_exact(&mut body_buffer) {
return Err(MessageError::ReadError);
}
if crc32fast::hash(&body_buffer) != expected_checksum {
return Err(MessageError::ChecksumMismatch);
}
bincode::deserialize(&body_buffer).map_err(|_| MessageError::DeserializationError)
}

174
src/server.rs Normal file
View File

@ -0,0 +1,174 @@
use interprocess::local_socket::{LocalSocketListener, LocalSocketStream};
use rmp::{
os,
protocol::{Message, MessageType},
server::ServerError,
PlaylistParams, ServerState,
};
use std::{
fs,
sync::{Arc, Mutex},
};
use crate::CliArgs;
pub mod audio_backend;
#[derive(Debug)]
pub struct Server {
pub state: ServerState,
}
impl Server {
pub fn from_state(state: ServerState) -> Self {
Self { state }
}
}
pub fn run(args: CliArgs) -> Result<(), ServerError> {
if let Err(err) = os::reserve_pid() {
let exit_code = handle_error(err);
std::process::exit(exit_code);
}
serve(ServerState {
playlist_params: PlaylistParams {
shuffle: args.shuffle,
next: args.next,
repeat: args.repeat,
},
..Default::default()
})
}
pub fn kill() {
if let Err(err) = os::kill() {
let exit_code = handle_error(err);
std::process::exit(exit_code);
}
}
pub fn is_running() -> Result<bool, ServerError> {
match os::is_running() {
Ok(is_running) => Ok(is_running),
Err(err) => {
let exit_code = handle_error(err);
std::process::exit(exit_code);
}
}
}
pub fn run_in_background() {
if let Err(err) = os::run_in_background() {
let exit_code = handle_error(err);
std::process::exit(exit_code);
}
}
fn serve(state: ServerState) -> Result<(), ServerError> {
let socket_path = os::get_socket_path()?;
if socket_path.exists() {
fs::remove_file(&socket_path).map_err(|err| ServerError::Io(err))?;
}
let socket = LocalSocketListener::bind(socket_path).map_err(|err| ServerError::Io(err))?;
let server = Arc::new(Mutex::new(Server::from_state(state)));
println!("Waiting for connections...");
let mut session_counter = 0;
for message in socket.incoming() {
match message {
Ok(stream) => {
session_counter += 1;
let thread_builder =
std::thread::Builder::new().name(format!("session_{session_counter}"));
let server = server.clone();
thread_builder
.spawn(move || session_handler(stream, server))
.unwrap();
}
Err(err) => {
let exit_code = handle_error(ServerError::Io(err));
std::process::exit(exit_code);
}
};
}
println!("Reached the end of an infinite loop?");
Ok(())
}
fn session_handler(mut stream: LocalSocketStream, server: Arc<Mutex<Server>>) {
let thread = std::thread::current();
let session_id = thread.name().unwrap_or("<unnamed>");
println!("[{session_id}] session created");
loop {
match rmp::protocol::receive(&mut stream) {
Ok(message) => {
println!("[{session_id}] rx {message}");
match route_request(&message, &mut server.lock().unwrap()) {
Err(err) => {
eprintln!("[{session_id}] rx Error: {err}");
}
Ok(response) => {
println!("[{session_id}] tx {response}");
rmp::protocol::send(&mut stream, &response).unwrap();
}
}
}
Err(error) => match error {
rmp::protocol::MessageError::ReadError => {
println!("[{session_id}] session terminated");
return;
}
error => {
eprintln!("[{session_id}] rx {error:?}");
let body = bincode::serialize(&error).unwrap();
let message = Message::new(MessageType::ProtocolError, Some(&body));
println!("[{session_id}] tx {message}");
rmp::protocol::send(&mut stream, &message).unwrap();
}
},
}
}
}
fn handle_error(err: ServerError) -> i32 {
match &err {
ServerError::Other(msg) => {
eprintln!("Unknown error: {msg}");
1
}
ServerError::Io(err) => {
eprintln!("IO error: {err}");
2
}
ServerError::AlreadyStarted => {
eprintln!("Server already running");
100
}
ServerError::MissingRuntimeDir(path) => {
eprintln!("Missing runtime directory: {}", path.to_string_lossy());
101
}
}
}
fn route_request(request: &Message, server: &mut Server) -> Result<Message, String> {
match request.message_type {
MessageType::StateFetch => {
return Message::state_response(&server.state);
}
MessageType::ToggleNext => {
server.state.playlist_params.next = !server.state.playlist_params.next;
return Message::state_response(&server.state);
}
MessageType::ToggleSuffle => {
server.state.playlist_params.shuffle = !server.state.playlist_params.shuffle;
return Message::state_response(&server.state);
}
MessageType::ToggleRepeat => {
server.state.playlist_params.repeat = !server.state.playlist_params.repeat;
return Message::state_response(&server.state);
}
_ => {}
}
Ok(Message::new(MessageType::NotImplementedAck, None))
}

View File

@ -0,0 +1,7 @@
use std::{path::Path, time::Duration};
pub trait AudioBackend {
fn seek_and_play(&mut self, file_path: &Path, seek: Duration) -> ();
fn pause_playback(&mut self) -> ();
fn continue_playback(&mut self) -> ();
}