diff --git a/Cargo.lock b/Cargo.lock index 401fdc3..3242989 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -849,7 +849,9 @@ dependencies = [ "fuser", "libc", "log", + "olivefs_common", "quinn", + "rustls", "serde", "serde_bare", "sodiumoxide", @@ -886,6 +888,7 @@ dependencies = [ "env_logger", "futures-util", "log", + "olivefs_common", "quinn", "rcgen", "rustls", diff --git a/olivefs/Cargo.toml b/olivefs/Cargo.toml index 463dd04..6131380 100644 --- a/olivefs/Cargo.toml +++ b/olivefs/Cargo.toml @@ -29,7 +29,11 @@ quinn = { version = "0.8.0", features = [] } # Compression and Encryption zstd = "0.9.2+zstd.1.5.1" sodiumoxide = "0.2.7" +rustls = "0.20.2" # Filesystem fuser = "0.10.0" libc = "0.2.112" + +# Common +olivefs_common = { path = "../olivefs_common" } diff --git a/olivefs/src/requester.rs b/olivefs/src/requester.rs index 9a7673b..7d72989 100644 --- a/olivefs/src/requester.rs +++ b/olivefs/src/requester.rs @@ -1,30 +1,88 @@ -use quinn::Endpoint; +use anyhow::anyhow; +use log::info; +use quinn::{Endpoint, RecvStream, SendStream}; +use rustls::internal::msgs::codec::Codec; +use rustls::{PrivateKey, RootCertStore}; use std::net::SocketAddr; +use std::path::Path; use std::str::FromStr; -use tokio::sync::mpsc; +use std::sync::Arc; -pub struct Requester {} +use olivefs_common::io::read_file; +use olivefs_common::networking::hello_handshake; +use tokio::sync::Mutex; + +pub struct Requester { + internal: RequesterInternal, +} pub struct RequesterInternal { - rx: mpsc::Receiver<()>, - connection: quinn::Connection, + /// The overall connection + pub connection: quinn::Connection, + + // TODO don't know if this should be here... + // /// The control channel + // pub control_channel: (SendStream, RecvStream), + /// A pool of currently-unused streams + pub stream_pool: Arc>>, } impl RequesterInternal { - //pub async fn connect() - pub async fn send(&mut self) -> anyhow::Result<()> { - // TODO use with_roots and only use the desired CA! ... - let _x = quinn::ClientConfig::with_native_roots(); - let ep = Endpoint::client(SocketAddr::from_str("0.0.0.0:0").unwrap()).unwrap(); - let conn = ep - .connect(SocketAddr::from_str("127.0.0.1:5051").unwrap(), "blah") + pub async fn connect( + ca_path: &Path, + cert_path: &Path, + cert_key_path: &Path, + server_endpoint: SocketAddr, + ) -> anyhow::Result { + let ca_cert_der = read_file(ca_path).await?; + let ca_cert = rustls::Certificate::read_bytes(&ca_cert_der).ok_or_else(|| anyhow!(""))?; + let client_cert_der = read_file(cert_path).await?; + let client_cert = + rustls::Certificate::read_bytes(&client_cert_der).ok_or_else(|| anyhow!(""))?; + let client_key_der = read_file(cert_key_path).await?; + let client_key = PrivateKey(client_key_der); + + let mut root_cert_store = RootCertStore::empty(); + root_cert_store.add(&ca_cert)?; + + let client_crypto = rustls::ClientConfig::builder() + .with_safe_defaults() + .with_root_certificates(root_cert_store) + .with_single_cert(vec![client_cert], client_key)?; + let client_config = quinn::ClientConfig::new(Arc::new(client_crypto)); + let mut client_endpoint = + Endpoint::client(SocketAddr::from_str("0.0.0.0:0").unwrap()).unwrap(); + client_endpoint.set_default_client_config(client_config); + let new_connection = client_endpoint + .connect(server_endpoint, "blah") .unwrap() .await .unwrap(); + + // The current design of Olivefs means we don't have any incoming streams for the client; + // therefore we have no need to handle them here. + let connection = new_connection.connection; + + let (mut control_tx, mut control_rx) = connection.open_bi().await?; + let other_side_version = hello_handshake( + format!("olivefs-{}", env!("CARGO_PKG_VERSION")), + &mut control_tx, + &mut control_rx, + ) + .await?; + info!("Connected to {:?} successfully.", other_side_version); + + Ok(RequesterInternal { + connection, + stream_pool: Arc::new(Default::default()), + }) + } + + pub async fn send(&mut self) -> anyhow::Result<()> { // conn.bi_streams and uni_streams are streams needed to listen out for new streams from the server. // conn.datagrams is similar but for unreliable datagrams - let _bi = conn.connection.open_bi().await?; + //let _bi = conn.connection.open_bi().await?; Ok(()) } diff --git a/olivefs_common/src/io.rs b/olivefs_common/src/io.rs new file mode 100644 index 0000000..3681011 --- /dev/null +++ b/olivefs_common/src/io.rs @@ -0,0 +1,16 @@ +use std::path::Path; +use tokio::fs::File; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; + +pub async fn write_file(path: &Path, content: &[u8]) -> anyhow::Result<()> { + let mut file = File::create(path).await?; + file.write_all(content).await?; + Ok(()) +} + +pub async fn read_file(path: &Path) -> anyhow::Result> { + let mut file = File::open(path).await?; + let mut bytes = Vec::new(); + file.read_to_end(&mut bytes).await?; + Ok(bytes) +} diff --git a/olivefs_common/src/lib.rs b/olivefs_common/src/lib.rs index 984e0d3..40e7df8 100644 --- a/olivefs_common/src/lib.rs +++ b/olivefs_common/src/lib.rs @@ -1,3 +1,4 @@ +pub mod io; pub mod messages; pub mod networking; diff --git a/olivefs_common/src/messages.rs b/olivefs_common/src/messages.rs index 9ba5205..a41af7c 100644 --- a/olivefs_common/src/messages.rs +++ b/olivefs_common/src/messages.rs @@ -1,7 +1,6 @@ use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use std::fmt::Debug; -use std::fs::FileType; use std::time::SystemTime; pub const COMMON_VERSION: &'static str = env!("CARGO_PKG_VERSION"); @@ -37,7 +36,7 @@ pub trait DataResponseBase: Serialize + DeserializeOwned + Debug + Clone + 'stat #[derive(Serialize, Deserialize, Debug, Clone)] /// Sent by the server in response to a DataCommand on the same data stream. -pub enum DataResponse { +pub enum DataResponse { /// The command was successful. Success(R), @@ -49,6 +48,25 @@ pub enum DataResponse { }, } +/// Copy of fuser's FileType. +#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)] +pub enum FileKind { + /// Named pipe (S_IFIFO) + NamedPipe, + /// Character device (S_IFCHR) + CharDevice, + /// Block device (S_IFBLK) + BlockDevice, + /// Directory (S_IFDIR) + Directory, + /// Regular file (S_IFREG) + RegularFile, + /// Symbolic link (S_IFLNK) + Symlink, + /// Unix domain socket (S_IFSOCK) + Socket, +} + /// Copy of fuser's FileAttr. Used to describe a file. #[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)] pub struct FileMetadata { @@ -67,7 +85,7 @@ pub struct FileMetadata { // /// Time of creation (macOS only) // pub crtime: SystemTime, /// Kind of file (directory, file, pipe, etc) - pub kind: FileType, + pub kind: FileKind, /// Permissions pub perm: u16, /// Number of hard links diff --git a/olivefs_common/src/networking.rs b/olivefs_common/src/networking.rs index 40ac161..720520f 100644 --- a/olivefs_common/src/networking.rs +++ b/olivefs_common/src/networking.rs @@ -1,11 +1,15 @@ -use quinn::{ReadExactError, RecvStream}; +use crate::messages::HelloMessage; +use anyhow::{anyhow, bail}; +use quinn::{ReadExactError, RecvStream, SendStream}; use serde::de::DeserializeOwned; +use serde::Serialize; +use tokio::io::AsyncWriteExt; pub async fn read_control_message( recv_stream: &mut RecvStream, ) -> anyhow::Result> { - let mut u16_buf = [0u8; 2]; - if let Err(err) = recv_stream.read_exact(&mut u16_buf).await { + let mut u32_buf = [0u8; 4]; + if let Err(err) = recv_stream.read_exact(&mut u32_buf).await { return if err == ReadExactError::FinishedEarly { Ok(None) } else { @@ -13,7 +17,7 @@ pub async fn read_control_message( todo!() }; } - let control_message_length = u16::from_be_bytes(u16_buf) as usize; + let control_message_length = u32::from_be_bytes(u32_buf) as usize; let mut control_message_bytes: Vec = vec![0u8; control_message_length]; @@ -21,5 +25,43 @@ pub async fn read_control_message( .read_exact(&mut control_message_bytes[..]) .await?; - Ok(todo!()) + Ok(Some(serde_bare::from_slice(&control_message_bytes[..])?)) +} + +pub async fn send_control_message( + send_stream: &mut SendStream, + message: &M, +) -> anyhow::Result<()> { + let data_buf = serde_bare::to_vec(&message)?; + let data_len: u32 = data_buf.len().try_into()?; + + send_stream.write_all(&data_len.to_be_bytes()).await?; + send_stream.write_all(&data_buf).await?; + send_stream.flush().await?; + + Ok(()) +} + +pub async fn hello_handshake( + software_version: String, + tx: &mut SendStream, + rx: &mut RecvStream, +) -> anyhow::Result { + let this_side_message = HelloMessage::new(software_version); + send_control_message(tx, &this_side_message).await?; + let other_side_hello_message: HelloMessage = read_control_message(rx) + .await? + .ok_or_else(|| anyhow!("EOF whilst waiting for hello message"))?; + + if this_side_message.protocol_version != other_side_hello_message.protocol_version { + bail!( + "Not compatible with remote. Local ({:?}) protocol: {:?}. Remote ({:?}) protocol: {:?}.", + this_side_message.software_version, + this_side_message.protocol_version, + other_side_hello_message.software_version, + other_side_hello_message.protocol_version + ); + } + + Ok(other_side_hello_message.software_version) } diff --git a/olivefsd/Cargo.toml b/olivefsd/Cargo.toml index 3a5318a..8eb0907 100644 --- a/olivefsd/Cargo.toml +++ b/olivefsd/Cargo.toml @@ -32,3 +32,6 @@ quinn = { version = "0.8.0", features = [] } zstd = "0.9.2+zstd.1.5.1" rustls = "0.20.2" rcgen = { version = "0.8.14", features = ["x509-parser"] } + +# Common +olivefs_common = { path = "../olivefs_common" } diff --git a/olivefsd/src/main.rs b/olivefsd/src/main.rs index f7055ba..e62ec91 100644 --- a/olivefsd/src/main.rs +++ b/olivefsd/src/main.rs @@ -1,6 +1,7 @@ use anyhow::anyhow; use clap::Parser; use futures_util::StreamExt; +use olivefs_common::io::{read_file, write_file}; use quinn::Endpoint; use rcgen::{BasicConstraints, Certificate, CertificateParams, IsCa, KeyPair}; use rustls::internal::msgs::codec::Codec; @@ -28,19 +29,6 @@ pub enum Command { }, } -pub async fn write_file(path: &Path, content: &[u8]) -> anyhow::Result<()> { - let mut file = File::create(path).await?; - file.write_all(content).await?; - Ok(()) -} - -pub async fn read_file(path: &Path) -> anyhow::Result> { - let mut file = File::open(path).await?; - let mut bytes = Vec::new(); - file.read_to_end(&mut bytes).await?; - Ok(bytes) -} - #[tokio::main] async fn main() -> anyhow::Result<()> { let command: Command = Command::parse();