Flesh out a bit of both sides, ready for issuing commands

This commit is contained in:
Olivier 'reivilibre' 2022-01-20 09:08:29 +00:00
parent ac18b830e7
commit a298b42672
9 changed files with 123 additions and 14 deletions

View File

@ -7,8 +7,10 @@ use libc::{ENOSYS, EPERM};
use log::{debug, warn};
use std::ffi::OsStr;
use crate::Requester;
use std::os::raw::c_int;
use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
pub mod encryption;
@ -20,6 +22,8 @@ pub struct OliveFilesystemSettings {}
// TODO support multiple filesystems per FUSE fs so that operations between multiple of them
// can be made efficient.
pub struct OliveFilesystem {
pub requester: Arc<Requester>,
pub tokio_runtime: tokio::runtime::Handle,
pub settings: OliveFilesystemSettings,
}
@ -27,7 +31,7 @@ impl Filesystem for OliveFilesystem {
/// Initialize filesystem.
/// Called before any other filesystem method.
/// The kernel module connection can be configured using the KernelConfig object
fn init(&mut self, _req: &Request<'_>, _config: &mut KernelConfig) -> Result<(), c_int> {
fn init(&mut self, req: &Request<'_>, _config: &mut KernelConfig) -> Result<(), c_int> {
// TODO config has some interesting values.
Ok(())
}

View File

@ -1,11 +1,15 @@
use crate::configuration::OlivefsClientConfiguration;
use crate::filesystem::{OliveFilesystem, OliveFilesystemSettings};
use crate::requester::Requester;
use anyhow::{anyhow, Context};
use clap::Parser;
use env_logger::Env;
use olivefs_common::io::read_file;
use std::net::ToSocketAddrs;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::runtime::Handle;
use tokio::task::spawn_blocking;
pub mod configuration;
pub mod filesystem;
@ -44,7 +48,7 @@ async fn main() -> anyhow::Result<()> {
.next()
.ok_or_else(|| anyhow!("No socket addresses found for connect_to string. Perhaps no DNS resolution?"))?;
requester::RequesterInternal::connect(
let requester_internal = requester::RequesterInternal::connect(
&config.connection.ca_certificate,
&config.connection.client_certificate,
&config.connection.client_key,
@ -52,7 +56,11 @@ async fn main() -> anyhow::Result<()> {
)
.await?;
let requester = Requester::new(requester_internal);
let fs = OliveFilesystem {
requester: Arc::new(requester),
tokio_runtime: Handle::current(),
settings: OliveFilesystemSettings {},
};
@ -66,7 +74,11 @@ async fn main() -> anyhow::Result<()> {
.wait()
.await?;
fuser::mount2(fs, mount_at, &[]).context("Whilst mounting fuse filesystem")?;
spawn_blocking(|| -> anyhow::Result<()> {
fuser::mount2(fs, mount_at, &[]).context("Whilst mounting fuse filesystem")?;
Ok(())
})
.await??;
}
}

View File

@ -12,7 +12,7 @@ use std::str::FromStr;
use std::sync::Arc;
use olivefs_common::io::read_file;
use olivefs_common::messages::{DataCommand, DataResponse};
use olivefs_common::messages::{DataCommand, DataResponse, VnodeId};
use olivefs_common::networking::{
hello_handshake, read_bare_message, send_bare_message, ALPN_PROTOCOL,
};
@ -135,3 +135,13 @@ impl RequesterInternal {
.await
}
}
impl Requester {
pub fn new(internal: RequesterInternal) -> Requester {
Requester { internal }
}
pub async fn getattr(&mut self, _vnode: VnodeId) -> anyhow::Result<DataResponse<()>> {
todo!()
}
}

View File

@ -7,5 +7,7 @@ server_key = "ca/server.key"
server_certificate = "ca/server.crt"
ca_certificate = "ca/ca.crt"
[clients.bread]
[service]
root = "/home/rei/Music"
[clients.bread]

View File

@ -6,6 +6,8 @@ use std::path::PathBuf;
pub struct OlivefsServerConfiguration {
pub listen: ListenConfiguration,
pub service: ServiceConfiguration,
#[serde(default)]
pub clients: BTreeMap<String, ClientConfiguration>,
}
@ -28,5 +30,11 @@ pub struct ListenConfiguration {
pub ca_certificate: PathBuf,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
/// Will be replaced by 'volumes'/multiple filesystems in a future version...?
pub struct ServiceConfiguration {
pub root: PathBuf,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ClientConfiguration {}

View File

@ -6,6 +6,7 @@ use clap::Parser;
use env_logger::Env;
use olivefs_common::io::read_file;
use std::path::PathBuf;
use std::sync::Arc;
pub mod generate_certificates;
@ -41,7 +42,7 @@ async fn main() -> anyhow::Result<()> {
let config: OlivefsServerConfiguration = toml::from_slice(&config_bytes)
.context("Whilst trying to parse configuration file")?;
listen(&config).await?;
listen(Arc::new(config)).await?;
}
Command::GenerateCertificate { ca_dir, name } => {
generate_certificates(ca_dir, name).await?;

View File

@ -1,4 +1,6 @@
use crate::configuration::OlivefsServerConfiguration;
use crate::server::connections::handle_new_streams;
use crate::server::file_access::{ClientInformation, ClientSpecificState, ServerwideState};
use anyhow::{anyhow, Context};
use futures_util::StreamExt;
use log::{error, info, warn};
@ -16,7 +18,7 @@ use x509_parser::traits::FromDer;
pub mod connections;
pub mod file_access;
pub async fn listen(config: &OlivefsServerConfiguration) -> anyhow::Result<()> {
pub async fn listen(config: Arc<OlivefsServerConfiguration>) -> anyhow::Result<()> {
let socket_addr = config
.listen
.listen_to
@ -27,6 +29,8 @@ pub async fn listen(config: &OlivefsServerConfiguration) -> anyhow::Result<()> {
anyhow!("No socket addresses found for connect_to string. Perhaps no DNS resolution?")
})?;
let server_state = Arc::new(ServerwideState::default());
let mut root_cert_store = RootCertStore::empty();
let ca_cert_der = read_file(&config.listen.ca_certificate).await?;
let ca_cert = rustls::Certificate(ca_cert_der);
@ -54,8 +58,10 @@ pub async fn listen(config: &OlivefsServerConfiguration) -> anyhow::Result<()> {
info!("Listening on {:?}.", socket_addr);
while let Some(next_connecting) = incoming.next().await {
let server_state = server_state.clone();
let config = config.clone();
tokio::spawn(async {
if let Err(error) = establish_connection(next_connecting).await {
if let Err(error) = establish_connection(next_connecting, server_state, config).await {
error!("Error in connection: {:?}", error);
}
});
@ -64,7 +70,11 @@ pub async fn listen(config: &OlivefsServerConfiguration) -> anyhow::Result<()> {
Ok(())
}
pub async fn establish_connection(connecting: Connecting) -> anyhow::Result<()> {
pub async fn establish_connection(
connecting: Connecting,
server_state: Arc<ServerwideState>,
config: Arc<OlivefsServerConfiguration>,
) -> anyhow::Result<()> {
let mut new_conn = connecting.await?;
info!(
@ -115,7 +125,7 @@ pub async fn establish_connection(connecting: Connecting) -> anyhow::Result<()>
}
}
info!("Accepted names of the client: {:?}", client_names);
info!("Names of the client: {:?}", client_names);
let (mut incoming_tx, mut incoming_rx) = new_conn
.bi_streams
@ -131,5 +141,28 @@ pub async fn establish_connection(connecting: Connecting) -> anyhow::Result<()>
.await?;
info!("Handshaked client successfully: {:?}", client_version);
// Determine the client's name
let chosen_name = client_names
.iter()
.filter(|k| config.clients.contains_key(*k))
.next()
.ok_or_else(|| anyhow!("No valid client names."))?;
let client_config = &config.clients[chosen_name];
let root = config.service.root.clone();
// Now determine what the client is allowed to do
let client_info = Arc::new(ClientInformation { root });
let client_state = Arc::new(ClientSpecificState::default());
tokio::spawn(async {
if let Err(error) =
handle_new_streams(new_conn, client_info, client_state, server_state).await
{
error!("Error handling new streams: {:?}", error);
}
});
Ok(())
}

View File

@ -1,17 +1,51 @@
use crate::server::file_access::{ClientInformation, ClientSpecificState, ServerwideState};
use anyhow::Context;
use futures_util::StreamExt;
use log::error;
use olivefs_common::messages::DataCommand;
use olivefs_common::networking::read_bare_message;
use quinn::{RecvStream, SendStream};
use quinn::{NewConnection, RecvStream, SendStream};
use std::sync::Arc;
pub async fn handle_command_stream(mut tx: SendStream, mut rx: RecvStream) -> anyhow::Result<()> {
pub async fn handle_command_stream(
_tx: SendStream,
mut rx: RecvStream,
_client_info: Arc<ClientInformation>,
_client_state: Arc<ClientSpecificState>,
_server_state: Arc<ServerwideState>,
) -> anyhow::Result<()> {
while let Some(command) = read_bare_message::<DataCommand>(&mut rx)
.await
.context("Whilst waiting for Data Command")?
{
match command {
DataCommand::GetAttr { vnode } => {}
DataCommand::GetAttr { vnode: _ } => {}
}
}
Ok(())
}
pub async fn handle_new_streams(
mut connection: NewConnection,
client_info: Arc<ClientInformation>,
client_state: Arc<ClientSpecificState>,
server_state: Arc<ServerwideState>,
) -> anyhow::Result<()> {
while let Some(stream) = connection.bi_streams.next().await {
let (tx, rx) = stream?;
let client_info = client_info.clone();
let client_state = client_state.clone();
let server_state = server_state.clone();
tokio::spawn(async {
if let Err(error) =
handle_command_stream(tx, rx, client_info, client_state, server_state).await
{
error!("Command stream handler failed: {:?}", error);
}
});
}
Ok(())
}

View File

@ -4,15 +4,20 @@ use std::sync::Arc;
use tokio::sync::RwLock;
/// Server-wide state that all clients might need to mess with
#[derive(Default)]
pub struct ServerwideState {
// TODO file locks — don't know if we need to manually handle conflicts between clients?
}
/// Static information about the client, including what they can access.
pub struct ClientInformation {}
pub struct ClientInformation {
/// The root path of the volume we are serving to this client.
pub root: PathBuf,
}
/// Client-specific state that we should clean up once the client vanishes.
/// Should not be accessible by other clients.
#[derive(Default)]
pub struct ClientSpecificState {
/// Inodes that we are telling the client about.
/// They can eventually be forgotten.