Work towards connection establishment (client side)

This commit is contained in:
Olivier 'reivilibre' 2022-01-17 23:22:48 +00:00
parent 0297dbf64f
commit 13c2904f6d
9 changed files with 167 additions and 34 deletions

3
Cargo.lock generated
View File

@ -849,7 +849,9 @@ dependencies = [
"fuser", "fuser",
"libc", "libc",
"log", "log",
"olivefs_common",
"quinn", "quinn",
"rustls",
"serde", "serde",
"serde_bare", "serde_bare",
"sodiumoxide", "sodiumoxide",
@ -886,6 +888,7 @@ dependencies = [
"env_logger", "env_logger",
"futures-util", "futures-util",
"log", "log",
"olivefs_common",
"quinn", "quinn",
"rcgen", "rcgen",
"rustls", "rustls",

View File

@ -29,7 +29,11 @@ quinn = { version = "0.8.0", features = [] }
# Compression and Encryption # Compression and Encryption
zstd = "0.9.2+zstd.1.5.1" zstd = "0.9.2+zstd.1.5.1"
sodiumoxide = "0.2.7" sodiumoxide = "0.2.7"
rustls = "0.20.2"
# Filesystem # Filesystem
fuser = "0.10.0" fuser = "0.10.0"
libc = "0.2.112" libc = "0.2.112"
# Common
olivefs_common = { path = "../olivefs_common" }

View File

@ -1,30 +1,88 @@
use quinn::Endpoint; use anyhow::anyhow;
use log::info;
use quinn::{Endpoint, RecvStream, SendStream};
use rustls::internal::msgs::codec::Codec;
use rustls::{PrivateKey, RootCertStore};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::Path;
use std::str::FromStr; use std::str::FromStr;
use tokio::sync::mpsc; use std::sync::Arc;
pub struct Requester {} use olivefs_common::io::read_file;
use olivefs_common::networking::hello_handshake;
use tokio::sync::Mutex;
pub struct Requester {
internal: RequesterInternal,
}
pub struct RequesterInternal { pub struct RequesterInternal {
rx: mpsc::Receiver<()>, /// The overall connection
connection: quinn::Connection, pub connection: quinn::Connection,
// TODO don't know if this should be here...
// /// The control channel
// pub control_channel: (SendStream, RecvStream),
/// A pool of currently-unused streams
pub stream_pool: Arc<Mutex<Vec<(SendStream, RecvStream)>>>,
} }
impl RequesterInternal { impl RequesterInternal {
//pub async fn connect() pub async fn connect(
pub async fn send(&mut self) -> anyhow::Result<()> { ca_path: &Path,
// TODO use with_roots and only use the desired CA! ... cert_path: &Path,
let _x = quinn::ClientConfig::with_native_roots(); cert_key_path: &Path,
let ep = Endpoint::client(SocketAddr::from_str("0.0.0.0:0").unwrap()).unwrap(); server_endpoint: SocketAddr,
let conn = ep ) -> anyhow::Result<RequesterInternal> {
.connect(SocketAddr::from_str("127.0.0.1:5051").unwrap(), "blah") let ca_cert_der = read_file(ca_path).await?;
let ca_cert = rustls::Certificate::read_bytes(&ca_cert_der).ok_or_else(|| anyhow!(""))?;
let client_cert_der = read_file(cert_path).await?;
let client_cert =
rustls::Certificate::read_bytes(&client_cert_der).ok_or_else(|| anyhow!(""))?;
let client_key_der = read_file(cert_key_path).await?;
let client_key = PrivateKey(client_key_der);
let mut root_cert_store = RootCertStore::empty();
root_cert_store.add(&ca_cert)?;
let client_crypto = rustls::ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(root_cert_store)
.with_single_cert(vec![client_cert], client_key)?;
let client_config = quinn::ClientConfig::new(Arc::new(client_crypto));
let mut client_endpoint =
Endpoint::client(SocketAddr::from_str("0.0.0.0:0").unwrap()).unwrap();
client_endpoint.set_default_client_config(client_config);
let new_connection = client_endpoint
.connect(server_endpoint, "blah")
.unwrap() .unwrap()
.await .await
.unwrap(); .unwrap();
// The current design of Olivefs means we don't have any incoming streams for the client;
// therefore we have no need to handle them here.
let connection = new_connection.connection;
let (mut control_tx, mut control_rx) = connection.open_bi().await?;
let other_side_version = hello_handshake(
format!("olivefs-{}", env!("CARGO_PKG_VERSION")),
&mut control_tx,
&mut control_rx,
)
.await?;
info!("Connected to {:?} successfully.", other_side_version);
Ok(RequesterInternal {
connection,
stream_pool: Arc::new(Default::default()),
})
}
pub async fn send(&mut self) -> anyhow::Result<()> {
// conn.bi_streams and uni_streams are streams needed to listen out for new streams from the server. // conn.bi_streams and uni_streams are streams needed to listen out for new streams from the server.
// conn.datagrams is similar but for unreliable datagrams // conn.datagrams is similar but for unreliable datagrams
let _bi = conn.connection.open_bi().await?; //let _bi = conn.connection.open_bi().await?;
Ok(()) Ok(())
} }

16
olivefs_common/src/io.rs Normal file
View File

@ -0,0 +1,16 @@
use std::path::Path;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
pub async fn write_file(path: &Path, content: &[u8]) -> anyhow::Result<()> {
let mut file = File::create(path).await?;
file.write_all(content).await?;
Ok(())
}
pub async fn read_file(path: &Path) -> anyhow::Result<Vec<u8>> {
let mut file = File::open(path).await?;
let mut bytes = Vec::new();
file.read_to_end(&mut bytes).await?;
Ok(bytes)
}

View File

@ -1,3 +1,4 @@
pub mod io;
pub mod messages; pub mod messages;
pub mod networking; pub mod networking;

View File

@ -1,7 +1,6 @@
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::fmt::Debug; use std::fmt::Debug;
use std::fs::FileType;
use std::time::SystemTime; use std::time::SystemTime;
pub const COMMON_VERSION: &'static str = env!("CARGO_PKG_VERSION"); pub const COMMON_VERSION: &'static str = env!("CARGO_PKG_VERSION");
@ -37,7 +36,7 @@ pub trait DataResponseBase: Serialize + DeserializeOwned + Debug + Clone + 'stat
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone)]
/// Sent by the server in response to a DataCommand on the same data stream. /// Sent by the server in response to a DataCommand on the same data stream.
pub enum DataResponse<R: DataResponseBase> { pub enum DataResponse<R> {
/// The command was successful. /// The command was successful.
Success(R), Success(R),
@ -49,6 +48,25 @@ pub enum DataResponse<R: DataResponseBase> {
}, },
} }
/// Copy of fuser's FileType.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum FileKind {
/// Named pipe (S_IFIFO)
NamedPipe,
/// Character device (S_IFCHR)
CharDevice,
/// Block device (S_IFBLK)
BlockDevice,
/// Directory (S_IFDIR)
Directory,
/// Regular file (S_IFREG)
RegularFile,
/// Symbolic link (S_IFLNK)
Symlink,
/// Unix domain socket (S_IFSOCK)
Socket,
}
/// Copy of fuser's FileAttr. Used to describe a file. /// Copy of fuser's FileAttr. Used to describe a file.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)] #[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct FileMetadata { pub struct FileMetadata {
@ -67,7 +85,7 @@ pub struct FileMetadata {
// /// Time of creation (macOS only) // /// Time of creation (macOS only)
// pub crtime: SystemTime, // pub crtime: SystemTime,
/// Kind of file (directory, file, pipe, etc) /// Kind of file (directory, file, pipe, etc)
pub kind: FileType, pub kind: FileKind,
/// Permissions /// Permissions
pub perm: u16, pub perm: u16,
/// Number of hard links /// Number of hard links

View File

@ -1,11 +1,15 @@
use quinn::{ReadExactError, RecvStream}; use crate::messages::HelloMessage;
use anyhow::{anyhow, bail};
use quinn::{ReadExactError, RecvStream, SendStream};
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio::io::AsyncWriteExt;
pub async fn read_control_message<M: DeserializeOwned>( pub async fn read_control_message<M: DeserializeOwned>(
recv_stream: &mut RecvStream, recv_stream: &mut RecvStream,
) -> anyhow::Result<Option<M>> { ) -> anyhow::Result<Option<M>> {
let mut u16_buf = [0u8; 2]; let mut u32_buf = [0u8; 4];
if let Err(err) = recv_stream.read_exact(&mut u16_buf).await { if let Err(err) = recv_stream.read_exact(&mut u32_buf).await {
return if err == ReadExactError::FinishedEarly { return if err == ReadExactError::FinishedEarly {
Ok(None) Ok(None)
} else { } else {
@ -13,7 +17,7 @@ pub async fn read_control_message<M: DeserializeOwned>(
todo!() todo!()
}; };
} }
let control_message_length = u16::from_be_bytes(u16_buf) as usize; let control_message_length = u32::from_be_bytes(u32_buf) as usize;
let mut control_message_bytes: Vec<u8> = vec![0u8; control_message_length]; let mut control_message_bytes: Vec<u8> = vec![0u8; control_message_length];
@ -21,5 +25,43 @@ pub async fn read_control_message<M: DeserializeOwned>(
.read_exact(&mut control_message_bytes[..]) .read_exact(&mut control_message_bytes[..])
.await?; .await?;
Ok(todo!()) Ok(Some(serde_bare::from_slice(&control_message_bytes[..])?))
}
pub async fn send_control_message<M: Serialize>(
send_stream: &mut SendStream,
message: &M,
) -> anyhow::Result<()> {
let data_buf = serde_bare::to_vec(&message)?;
let data_len: u32 = data_buf.len().try_into()?;
send_stream.write_all(&data_len.to_be_bytes()).await?;
send_stream.write_all(&data_buf).await?;
send_stream.flush().await?;
Ok(())
}
pub async fn hello_handshake(
software_version: String,
tx: &mut SendStream,
rx: &mut RecvStream,
) -> anyhow::Result<String> {
let this_side_message = HelloMessage::new(software_version);
send_control_message(tx, &this_side_message).await?;
let other_side_hello_message: HelloMessage = read_control_message(rx)
.await?
.ok_or_else(|| anyhow!("EOF whilst waiting for hello message"))?;
if this_side_message.protocol_version != other_side_hello_message.protocol_version {
bail!(
"Not compatible with remote. Local ({:?}) protocol: {:?}. Remote ({:?}) protocol: {:?}.",
this_side_message.software_version,
this_side_message.protocol_version,
other_side_hello_message.software_version,
other_side_hello_message.protocol_version
);
}
Ok(other_side_hello_message.software_version)
} }

View File

@ -32,3 +32,6 @@ quinn = { version = "0.8.0", features = [] }
zstd = "0.9.2+zstd.1.5.1" zstd = "0.9.2+zstd.1.5.1"
rustls = "0.20.2" rustls = "0.20.2"
rcgen = { version = "0.8.14", features = ["x509-parser"] } rcgen = { version = "0.8.14", features = ["x509-parser"] }
# Common
olivefs_common = { path = "../olivefs_common" }

View File

@ -1,6 +1,7 @@
use anyhow::anyhow; use anyhow::anyhow;
use clap::Parser; use clap::Parser;
use futures_util::StreamExt; use futures_util::StreamExt;
use olivefs_common::io::{read_file, write_file};
use quinn::Endpoint; use quinn::Endpoint;
use rcgen::{BasicConstraints, Certificate, CertificateParams, IsCa, KeyPair}; use rcgen::{BasicConstraints, Certificate, CertificateParams, IsCa, KeyPair};
use rustls::internal::msgs::codec::Codec; use rustls::internal::msgs::codec::Codec;
@ -28,19 +29,6 @@ pub enum Command {
}, },
} }
pub async fn write_file(path: &Path, content: &[u8]) -> anyhow::Result<()> {
let mut file = File::create(path).await?;
file.write_all(content).await?;
Ok(())
}
pub async fn read_file(path: &Path) -> anyhow::Result<Vec<u8>> {
let mut file = File::open(path).await?;
let mut bytes = Vec::new();
file.read_to_end(&mut bytes).await?;
Ok(bytes)
}
#[tokio::main] #[tokio::main]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
let command: Command = Command::parse(); let command: Command = Command::parse();