Serialization :)

This commit is contained in:
FyloZ 2023-03-22 23:27:53 -04:00
parent def10a7912
commit 2f739c3118
Signed by: william
GPG Key ID: 835378AE9AF4AE97
10 changed files with 116 additions and 64 deletions

4
Cargo.lock generated
View File

@ -5,6 +5,9 @@ version = 3
[[package]] [[package]]
name = "client" name = "client"
version = "0.1.0" version = "0.1.0"
dependencies = [
"messages",
]
[[package]] [[package]]
name = "libc" name = "libc"
@ -21,4 +24,5 @@ name = "server"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"libc", "libc",
"messages",
] ]

View File

@ -6,3 +6,4 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
messages = { path = "../messages" }

View File

@ -1,25 +1,24 @@
use std::io; use std::io;
use std::net::{SocketAddr, UdpSocket}; use std::io::Write;
use std::thread::sleep; use std::net::{SocketAddr, TcpStream};
use std::time::Duration;
use messages::{any_as_u8_slice, Serializable};
use messages::client_registration::ClientRegistration;
fn main() -> io::Result<()> { fn main() -> io::Result<()> {
let bind_addr = SocketAddr::from(([127, 0, 0, 1], 4432)); let addr = SocketAddr::from(([127, 0, 0, 1], 4433));
let server_addr = SocketAddr::from(([127, 0, 0, 1], 4433)); let mut stream = TcpStream::connect(addr)?;
let socket = UdpSocket::bind(bind_addr)?; let registration = ClientRegistration {
socket.connect(server_addr)?; major_version: 0,
minor_version: 0,
name: "My new client :)".to_string(),
};
let mut buffer: [u8; 128] = [0; 128]; let mut buf = [0u8; 1024];
for i in 1..10 { registration.serialize(&mut buf);
for j in 1..128 {
buffer[j] = j as u8;
}
socket.send(&buffer)?; stream.write(&buf)?;
println!("{}", i);
sleep(Duration::from_millis(1000));
}
Ok(()) Ok(())
} }

View File

@ -1,19 +1,37 @@
use crate::Serializable; use crate::{DeserializationError, Serializable};
struct ClientRegistration { const CR_BUFFER_MIN_CAPACITY: usize = 3;
name: str
// Contains the version using semantic versioning
// The patch version is omitted, because it should not affect the communication between the server and the client
// Therefore, a client and a server with a different patch version should be completely compatible
pub struct ClientRegistration {
pub major_version: u8,
pub minor_version: u8,
pub name: String,
} }
impl Serializable for ClientRegistration { impl Serializable for ClientRegistration {
fn serialize(&self) -> Vec<u8> { fn serialize(&self, buf: &mut [u8]) {
let mut buf: Vec<u8> = Vec::new(); let mut vec_buf: Vec<u8> = Vec::with_capacity(buf.len());
vec_buf.insert(0, self.major_version);
vec_buf.insert(1, self.minor_version);
buf.extend_from_slice(self.name.as_bytes()); let name_bytes = self.name.as_bytes();
vec_buf.extend_from_slice(name_bytes);
buf buf[..vec_buf.len()].copy_from_slice(&vec_buf);
} }
fn deserialize(buf: &[u8]) -> Self { fn deserialize(buf: &[u8]) -> Result<Self, DeserializationError> {
String::from_utf8() if buf.len() < CR_BUFFER_MIN_CAPACITY {
return Err(DeserializationError::MissingData);
}
Ok(ClientRegistration {
major_version: buf[0],
minor_version: buf[1],
name: String::from_utf8_lossy(&buf[2..]).into_owned(),
})
} }
} }

View File

@ -1,6 +1,18 @@
mod client_registration; pub mod client_registration;
trait Serializable { pub trait Serializable where Self: Sized {
fn serialize(&self) -> Vec<u8>; fn serialize(&self, buf: &mut [u8]);
fn deserialize(buf: Vec<u8>) -> Self; fn deserialize(buf: &[u8]) -> Result<Self, DeserializationError>;
}
// From: https://stackoverflow.com/questions/28127165/how-to-convert-struct-to-u8
pub unsafe fn any_as_u8_slice<T: Sized>(p: &T) -> &[u8] {
::core::slice::from_raw_parts(
(p as *const T) as *const u8,
::core::mem::size_of::<T>(),
)
}
pub enum DeserializationError {
MissingData
} }

View File

@ -5,3 +5,4 @@ edition = "2021"
[dependencies] [dependencies]
libc = "0.2.140" libc = "0.2.140"
messages = { path = "../messages" }

22
server/src/client.rs Normal file
View File

@ -0,0 +1,22 @@
use std::net::TcpStream;
static mut NEXT_CLIENT_ID: u8 = 0;
pub struct Client {
pub id: u8,
pub stream: TcpStream
}
impl Client {
pub fn new(stream: TcpStream) -> Self {
unsafe {
let id = NEXT_CLIENT_ID;
NEXT_CLIENT_ID += 1;
Client {
id,
stream,
}
}
}
}

View File

@ -26,11 +26,11 @@ impl Epoll {
} }
} }
pub fn add_read_interest(&self, fd: RawFd, key: u64) -> io::Result<()> { pub fn add_read_interest(&self, fd: RawFd, key: u16) -> io::Result<()> {
add_interest(self.fd, fd, listener_read_event(key)) add_interest(self.fd, fd, listener_read_event(key))
} }
pub fn modify_read_interest(&self, fd: RawFd, key: u64) -> io::Result<()> { pub fn modify_read_interest(&self, fd: RawFd, key: u16) -> io::Result<()> {
modify_interest(self.fd, fd, listener_read_event(key)) modify_interest(self.fd, fd, listener_read_event(key))
} }
@ -97,9 +97,9 @@ fn modify_interest(epoll_fd: RawFd, fd: RawFd, mut event: epoll_event) -> io::Re
Ok(()) Ok(())
} }
fn listener_read_event(key: u64) -> epoll_event { fn listener_read_event(key: u16) -> epoll_event {
epoll_event { epoll_event {
events: READ_FLAGS as u32, events: READ_FLAGS as u32,
u64: key, u64: key as u64,
} }
} }

View File

@ -5,23 +5,7 @@ use crate::tcp_server::TcpServer;
mod epoll; mod epoll;
mod tcp_server; mod tcp_server;
mod client;
#[derive(Debug)]
pub struct RequestContext {
pub stream: TcpStream,
pub content_length: usize,
pub buf: Vec<u8>,
}
impl RequestContext {
fn new(stream: TcpStream) -> Self {
Self {
stream,
buf: Vec::new(),
content_length: 0,
}
}
}
fn main() -> io::Result<()> { fn main() -> io::Result<()> {
let addr = SocketAddr::from(([127, 0, 0, 1], 4433)); let addr = SocketAddr::from(([127, 0, 0, 1], 4433));

View File

@ -1,20 +1,24 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::io; use std::io;
use std::io::Read;
use std::net::{SocketAddr, TcpListener}; use std::net::{SocketAddr, TcpListener};
use std::os::fd::{AsRawFd, RawFd}; use std::os::fd::{AsRawFd, RawFd};
use messages::client_registration::ClientRegistration;
use messages::Serializable;
use crate::client::Client;
use crate::epoll::{Epoll, is_read_event, is_write_event}; use crate::epoll::{Epoll, is_read_event, is_write_event};
use crate::RequestContext;
const KEY_NEW_CONNECTION: u64 = 100; // Based on: https://www.zupzup.org/epoll-with-rust/index.html
const KEY_NEW_CONNECTION: u16 = u16::MAX;
pub struct TcpServer { pub struct TcpServer {
addr: SocketAddr, addr: SocketAddr,
listener: TcpListener, listener: TcpListener,
listener_fd: RawFd, listener_fd: RawFd,
epoll: Epoll, epoll: Epoll,
request_contexts: HashMap<u64, RequestContext>, request_contexts: HashMap<u8, Client>,
key: u64,
} }
impl TcpServer { impl TcpServer {
@ -33,7 +37,6 @@ impl TcpServer {
listener_fd, listener_fd,
epoll, epoll,
request_contexts: HashMap::new(), request_contexts: HashMap::new(),
key: KEY_NEW_CONNECTION,
}) })
} }
@ -48,9 +51,9 @@ impl TcpServer {
.collect::<Vec<(u32, u64)>>(); .collect::<Vec<(u32, u64)>>();
for (events, u64) in events { for (events, u64) in events {
match *u64 { match *u64 as u16 {
KEY_NEW_CONNECTION => self.accept_connection()?, KEY_NEW_CONNECTION => self.accept_connection()?,
key => self.handle_event(*events, key) key => self.handle_event(*events, key as u8)
} }
} }
} }
@ -60,10 +63,13 @@ impl TcpServer {
match self.listener.accept() { match self.listener.accept() {
Ok((stream, addr)) => { Ok((stream, addr)) => {
stream.set_nonblocking(true)?; stream.set_nonblocking(true)?;
println!("New client: {addr}");
self.key += 1; let fd = stream.as_raw_fd();
self.epoll.add_read_interest(stream.as_raw_fd(), self.key)?; let client = Client::new(stream);
self.request_contexts.insert(self.key, RequestContext::new(stream)); println!("New client: {addr} ({})", client.id);
self.epoll.add_read_interest(fd, client.id as u16)?;
self.request_contexts.insert(client.id, client);
} }
Err(e) => eprintln!("Couldn't accept: {e}") Err(e) => eprintln!("Couldn't accept: {e}")
}; };
@ -71,14 +77,19 @@ impl TcpServer {
self.epoll.modify_read_interest(self.listener_fd, KEY_NEW_CONNECTION) self.epoll.modify_read_interest(self.listener_fd, KEY_NEW_CONNECTION)
} }
fn handle_event(&mut self, events: u32, key: u64) { fn handle_event(&mut self, events: u32, key: u8) {
let mut to_delete = None; let mut to_delete = None;
if let Some(context) = self.request_contexts.get_mut(&key) { if let Some(client) = self.request_contexts.get_mut(&key) {
match events { match events {
v if is_read_event(v) => { v if is_read_event(v) => {
// context.read_cb(key, epoll_fd)?; let mut buf = [0u8; 1024];
let read_length = client.stream.read(&mut buf).expect("Failed to read stream");
let registration = ClientRegistration::deserialize(&buf[..read_length]);
println!("Test");
} }
v if is_write_event(v) => { v if is_write_event(v) => {
println!("Write Event");
// context.write_cb(key, epoll_fd)?; // context.write_cb(key, epoll_fd)?;
to_delete = Some(key); to_delete = Some(key);
} }