diff --git a/Cargo.lock b/Cargo.lock index 89eb5c1..493af61 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -196,6 +196,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.30" @@ -203,6 +218,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -211,6 +227,34 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.30" @@ -229,10 +273,16 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -392,11 +442,13 @@ dependencies = [ name = "inv_sig_helper_rust" version = "0.1.0" dependencies = [ + "futures", "lazy-regex", "regex", "reqwest", "rquickjs", "tokio", + "tokio-util", "tub", ] @@ -1059,7 +1111,9 @@ checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", + "futures-util", "pin-project-lite", "tokio", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 832c5a5..f51f9e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,9 @@ edition = "2021" [dependencies] regex = "1.10.4" rquickjs = {version = "0.6.0", features=["futures", "parallel"]} -tokio = { version = "1.37.0", features = ["full", "net", "macros", "rt-multi-thread", "io-std", "io-util"] } +tokio = { version = "1.37.0", features = ["full", "net", "macros", "rt-multi-thread", "io-std", "io-util", "mio"] } reqwest = "0.12.4" lazy-regex = "3.1.0" tub = "0.3.7" +tokio-util = { version = "0.7.10", features=["futures-io", "futures-util", "codec"]} +futures = "0.3.30" diff --git a/src/jobs.rs b/src/jobs.rs index 36f6884..187b87b 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -1,3 +1,4 @@ +use futures::{Sink, SinkExt, Stream, StreamExt}; use rquickjs::{async_with, AsyncContext, AsyncRuntime}; use std::{num::NonZeroUsize, sync::Arc, thread::available_parallelism}; use tokio::{io::AsyncWriteExt, runtime::Handle, sync::Mutex, task::block_in_place}; @@ -5,6 +6,7 @@ use tub::Pool; use crate::{ consts::NSIG_FUNCTION_NAME, + opcode::OpcodeResponse, player::{fetch_update, FetchUpdateStatus}, }; @@ -109,44 +111,27 @@ impl GlobalState { } } -macro_rules! write_failure { - ($s:ident, $r:ident) => { - $s.write_u32($r).await; - $s.write_u16(0x0000).await; - }; -} - pub async fn process_fetch_update( state: Arc, stream: Arc>, request_id: u32, ) where - W: tokio::io::AsyncWrite + Unpin + Send, + W: SinkExt + Unpin + Send, { let cloned_writer = stream.clone(); - let mut writer; - let global_state = state.clone(); + let status = fetch_update(global_state).await; - match fetch_update(global_state).await { - Ok(_x) => { - writer = cloned_writer.lock().await; - writer.write_u32(request_id).await; - // sync code to tell the client the player had updated - writer.write_u16(0xF44F).await; - println!("Successfully updated the player"); - } - Err(FetchUpdateStatus::PlayerAlreadyUpdated) => { - writer = cloned_writer.lock().await; - writer.write_u32(request_id).await; - writer.write_u16(0xFFFF).await; - } - Err(_x) => { - writer = cloned_writer.lock().await; - writer.write_u32(request_id).await; - writer.write_u16(0).await; - } - } + let mut writer = cloned_writer.lock().await; + writer + .send(OpcodeResponse { + opcode: JobOpcode::ForceUpdate, + request_id, + update_status: status, + signature: Default::default(), + signature_timestamp: Default::default(), + }) + .await; } pub async fn process_decrypt_n_signature( @@ -155,7 +140,7 @@ pub async fn process_decrypt_n_signature( stream: Arc>, request_id: u32, ) where - W: tokio::io::AsyncWrite + Unpin + Send, + W: SinkExt + Unpin + Send, { let cloned_writer = stream.clone(); let global_state = state.clone(); @@ -179,7 +164,13 @@ pub async fn process_decrypt_n_signature( println!("JavaScript interpreter error (nsig code): {}", n); } writer = cloned_writer.lock().await; - write_failure!(writer, request_id); + writer.send(OpcodeResponse { + opcode: JobOpcode::DecryptNSignature, + request_id, + update_status: Ok(Default::default()), + signature: String::new(), + signature_timestamp: Default::default() + }).await; return; } } @@ -190,7 +181,7 @@ pub async fn process_decrypt_n_signature( let mut call_string: String = String::new(); call_string += NSIG_FUNCTION_NAME; call_string += "(\""; - call_string += &sig; + call_string += &sig.replace("\"", "\\\""); call_string += "\")"; let decrypted_string = match ctx.eval::(call_string) { @@ -202,19 +193,26 @@ pub async fn process_decrypt_n_signature( println!("JavaScript interpreter error (nsig code): {}", n); } writer = cloned_writer.lock().await; - write_failure!(writer, request_id); + writer.send(OpcodeResponse { + opcode: JobOpcode::DecryptNSignature, + request_id, + update_status: Ok(Default::default()), + signature: String::new(), + signature_timestamp: Default::default() + }).await; return; } }; writer = cloned_writer.lock().await; - writer.write_u32(request_id).await; - writer.write_u16(u16::try_from(decrypted_string.len()).unwrap()).await; - writer.write_all(decrypted_string.as_bytes()).await; - - println!("Decrypted signature: {}", decrypted_string); - + writer.send(OpcodeResponse { + opcode: JobOpcode::DecryptNSignature, + request_id, + update_status: Ok(Default::default()), + signature: decrypted_string, + signature_timestamp: Default::default() + }).await; }) .await; } @@ -225,7 +223,7 @@ pub async fn process_decrypt_signature( stream: Arc>, request_id: u32, ) where - W: tokio::io::AsyncWrite + Unpin + Send, + W: SinkExt + Unpin + Send, { let cloned_writer = stream.clone(); let global_state = state.clone(); @@ -248,7 +246,13 @@ pub async fn process_decrypt_signature( println!("JavaScript interpreter error (sig code): {}", n); } writer = cloned_writer.lock().await; - write_failure!(writer, request_id); + writer.send(OpcodeResponse { + opcode: JobOpcode::DecryptSignature, + request_id, + update_status: Ok(Default::default()), + signature: String::new(), + signature_timestamp: Default::default() + }).await; return; } } @@ -260,7 +264,7 @@ pub async fn process_decrypt_signature( let mut call_string: String = String::new(); call_string += sig_function_name; call_string += "(\""; - call_string += &sig; + call_string += &sig.replace("\"", "\\\""); call_string += "\")"; drop(player_info); @@ -274,19 +278,26 @@ pub async fn process_decrypt_signature( println!("JavaScript interpreter error (sig code): {}", n); } writer = cloned_writer.lock().await; - write_failure!(writer, request_id); + writer.send(OpcodeResponse { + opcode: JobOpcode::DecryptSignature, + request_id, + update_status: Ok(Default::default()), + signature: String::new(), + signature_timestamp: Default::default() + }).await; return; } }; writer = cloned_writer.lock().await; - writer.write_u32(request_id).await; - writer.write_u16(u16::try_from(decrypted_string.len()).unwrap()).await; - writer.write_all(decrypted_string.as_bytes()).await; - - println!("Decrypted signature: {}", decrypted_string); - + writer.send(OpcodeResponse { + opcode: JobOpcode::DecryptSignature, + request_id, + update_status: Ok(Default::default()), + signature: decrypted_string, + signature_timestamp: Default::default(), + }).await; }) .await; } @@ -296,7 +307,7 @@ pub async fn process_get_signature_timestamp( stream: Arc>, request_id: u32, ) where - W: tokio::io::AsyncWrite + Unpin + Send, + W: SinkExt + Unpin + Send, { let cloned_writer = stream.clone(); let global_state = state.clone(); @@ -305,7 +316,13 @@ pub async fn process_get_signature_timestamp( let timestamp = player_info.signature_timestamp; let mut writer = cloned_writer.lock().await; - - writer.write_u32(request_id).await; - writer.write_u64(timestamp).await; + writer + .send(OpcodeResponse { + opcode: JobOpcode::GetSignatureTimestamp, + request_id, + update_status: Ok(Default::default()), + signature: String::new(), + signature_timestamp: timestamp, + }) + .await; } diff --git a/src/main.rs b/src/main.rs index 124e3f8..5017405 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,17 +1,25 @@ mod consts; mod jobs; +mod opcode; mod player; +use ::futures::StreamExt; use consts::DEFAULT_SOCK_PATH; use jobs::{process_decrypt_n_signature, process_fetch_update, GlobalState, JobOpcode}; +use opcode::OpcodeDecoder; use player::fetch_update; -use std::{env::args, io::Error, sync::Arc}; +use std::{env::args, future, io::Error, sync::Arc}; use tokio::{ fs::remove_file, - io::{self, AsyncReadExt, BufReader, BufWriter}, - net::{UnixListener, UnixStream}, + io::{ + self, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, BufReader, + BufWriter, Interest, Ready, + }, + net::{TcpListener, UnixListener, UnixStream}, sync::Mutex, + task::{futures, spawn_blocking}, }; +use tokio_util::codec::{Decoder, Framed, FramedRead, FramedWrite}; use crate::jobs::{process_decrypt_signature, process_get_signature_timestamp}; @@ -32,10 +40,6 @@ macro_rules! eof_fail { match $res { Ok(value) => value, Err(e) => { - if (e.kind() == io::ErrorKind::UnexpectedEof) { - $stream.get_ref().readable().await?; - continue; - } println!("An error occurred while parsing the current request: {}", e); break; } @@ -54,11 +58,11 @@ async fn main() { // have to please rust let state: Arc = Arc::new(GlobalState::new()); - let socket = match UnixListener::bind(socket_url) { + let socket: UnixListener = match UnixListener::bind(socket_url) { Ok(x) => x, Err(x) => { if x.kind() == std::io::ErrorKind::AddrInUse { - remove_file(socket_url).await.unwrap(); + remove_file(socket_url); UnixListener::bind(socket_url).unwrap() } else { println!("Error occurred while trying to bind: {}", x); @@ -78,81 +82,82 @@ async fn main() { let (socket, _addr) = socket.accept().await.unwrap(); let cloned_state = state.clone(); - tokio::spawn(async { + tokio::spawn(async move { process_socket(cloned_state, socket).await; }); } } -async fn process_socket(state: Arc, socket: UnixStream) -> Result<(), Error> { +async fn process_socket(state: Arc, socket: UnixStream) { let (rd, wr) = socket.into_split(); - let wrapped_readstream = Arc::new(Mutex::new(BufReader::new(rd))); - let wrapped_writestream = Arc::new(Mutex::new(BufWriter::new(wr))); + let decoder = OpcodeDecoder {}; - let cloned_readstream = wrapped_readstream.clone(); - let mut inside_readstream = cloned_readstream.lock().await; + let sink = FramedWrite::new(wr, decoder); + let mut stream = FramedRead::new(rd, decoder); - loop { - inside_readstream.get_ref().readable().await?; + let arc_sink = Arc::new(Mutex::new(sink)); + while let Some(opcode_res) = stream.next().await { + match opcode_res { + Ok(opcode) => { + println!("Received job: {}", opcode.opcode); - let cloned_writestream = wrapped_writestream.clone(); - - let opcode_byte: u8 = eof_fail!(inside_readstream.read_u8().await, inside_readstream); - let opcode: JobOpcode = opcode_byte.into(); - let request_id: u32 = eof_fail!(inside_readstream.read_u32().await, inside_readstream); - - println!("Received job: {}", opcode); - match opcode { - JobOpcode::ForceUpdate => { - let cloned_state = state.clone(); - let cloned_stream = cloned_writestream.clone(); - tokio::spawn(async move { - process_fetch_update(cloned_state, cloned_stream, request_id).await; - }); + match opcode.opcode { + JobOpcode::ForceUpdate => { + let cloned_state = state.clone(); + let cloned_sink = arc_sink.clone(); + tokio::spawn(async move { + process_fetch_update(cloned_state, cloned_sink, opcode.request_id) + .await; + }); + } + JobOpcode::DecryptNSignature => { + let cloned_state = state.clone(); + let cloned_sink = arc_sink.clone(); + tokio::spawn(async move { + process_decrypt_n_signature( + cloned_state, + opcode.signature, + cloned_sink, + opcode.request_id, + ) + .await; + }); + } + JobOpcode::DecryptSignature => { + let cloned_state = state.clone(); + let cloned_sink = arc_sink.clone(); + tokio::spawn(async move { + process_decrypt_signature( + cloned_state, + opcode.signature, + cloned_sink, + opcode.request_id, + ) + .await; + }); + } + JobOpcode::GetSignatureTimestamp => { + let cloned_state = state.clone(); + let cloned_sink = arc_sink.clone(); + tokio::spawn(async move { + process_get_signature_timestamp( + cloned_state, + cloned_sink, + opcode.request_id, + ) + .await; + }); + } + _ => { + continue; + } + } } - JobOpcode::DecryptNSignature => { - let sig_size: usize = usize::from(eof_fail!( - inside_readstream.read_u16().await, - inside_readstream - )); - let mut buf = vec![0u8; sig_size]; - - break_fail!(inside_readstream.read_exact(&mut buf).await); - - let str = break_fail!(String::from_utf8(buf)); - let cloned_state = state.clone(); - let cloned_stream = cloned_writestream.clone(); - tokio::spawn(async move { - process_decrypt_n_signature(cloned_state, str, cloned_stream, request_id).await; - }); + Err(x) => { + println!("I/O error: {:?}", x); + break; } - JobOpcode::DecryptSignature => { - let sig_size: usize = usize::from(eof_fail!( - inside_readstream.read_u16().await, - inside_readstream - )); - let mut buf = vec![0u8; sig_size]; - - break_fail!(inside_readstream.read_exact(&mut buf).await); - - let str = break_fail!(String::from_utf8(buf)); - let cloned_state = state.clone(); - let cloned_stream = cloned_writestream.clone(); - tokio::spawn(async move { - process_decrypt_signature(cloned_state, str, cloned_stream, request_id).await; - }); - } - JobOpcode::GetSignatureTimestamp => { - let cloned_state = state.clone(); - let cloned_stream = cloned_writestream.clone(); - tokio::spawn(async move { - process_get_signature_timestamp(cloned_state, cloned_stream, request_id).await; - }); - } - _ => {} } } - - Ok(()) } diff --git a/src/opcode.rs b/src/opcode.rs new file mode 100644 index 0000000..69833c6 --- /dev/null +++ b/src/opcode.rs @@ -0,0 +1,116 @@ +use std::io::ErrorKind; + +use tokio_util::{ + bytes::{Buf, BufMut}, + codec::{Decoder, Encoder}, +}; + +use crate::{jobs::JobOpcode, player::FetchUpdateStatus}; + +#[derive(Copy, Clone)] +pub struct OpcodeDecoder {} + +pub struct Opcode { + pub opcode: JobOpcode, + pub request_id: u32, + + pub signature: String, +} + +pub struct OpcodeResponse { + pub opcode: JobOpcode, + pub request_id: u32, + + pub update_status: Result<(), FetchUpdateStatus>, + pub signature: String, + pub signature_timestamp: u64, +} + +impl Decoder for OpcodeDecoder { + type Item = Opcode; + type Error = std::io::Error; + + fn decode( + &mut self, + src: &mut tokio_util::bytes::BytesMut, + ) -> Result, Self::Error> { + if 5 > src.len() { + return Ok(None); + } + + let opcode_byte: u8 = src[0]; + let opcode: JobOpcode = opcode_byte.into(); + let request_id: u32 = u32::from_be_bytes(src[1..5].try_into().unwrap()); + + match opcode { + JobOpcode::ForceUpdate | JobOpcode::GetSignatureTimestamp => { + src.advance(5); + Ok(Some(Opcode { + opcode, + request_id, + signature: Default::default(), + })) + } + JobOpcode::DecryptSignature | JobOpcode::DecryptNSignature => { + if 7 > src.len() { + return Ok(None); + } + + let sig_size: u16 = (src[5] as u16) << 8 | (src[6] as u16); + + if usize::from(sig_size) > src.len() { + return Ok(None); + } + + let sig: String = + match String::from_utf8(src[7..(usize::from(sig_size) + 7)].to_vec()) { + Ok(x) => x, + Err(x) => { + return Err(std::io::Error::new( + ErrorKind::InvalidData, + x.utf8_error(), + )); + } + }; + + src.advance(7 + sig.len()); + + Ok(Some(Opcode { + opcode, + request_id, + signature: sig, + })) + } + _ => Err(std::io::Error::new(ErrorKind::InvalidInput, "")), + } + } +} + +impl Encoder for OpcodeDecoder { + type Error = std::io::Error; + fn encode( + &mut self, + item: OpcodeResponse, + dst: &mut tokio_util::bytes::BytesMut, + ) -> Result<(), Self::Error> { + dst.put_u32(item.request_id); + match item.opcode { + JobOpcode::ForceUpdate => match item.update_status { + Ok(_x) => dst.put_u16(0xF44F), + Err(FetchUpdateStatus::PlayerAlreadyUpdated) => dst.put_u16(0xFFFF), + Err(_x) => dst.put_u16(0x0000), + }, + JobOpcode::DecryptSignature | JobOpcode::DecryptNSignature => { + dst.put_u16(u16::try_from(item.signature.len()).unwrap()); + if !item.signature.is_empty() { + dst.put_slice(item.signature.as_bytes()); + } + } + JobOpcode::GetSignatureTimestamp => { + dst.put_u64(item.signature_timestamp); + } + _ => {} + } + Ok(()) + } +}