mutually exclude submissions of the same chunks
This commit is contained in:
parent
dc9885947c
commit
9fff39d66a
@ -4,6 +4,8 @@ use serde::{Deserialize, Serialize};
|
|||||||
|
|
||||||
use crate::chunking::calculate_chunkid;
|
use crate::chunking::calculate_chunkid;
|
||||||
use crate::definitions::{ChunkId, PointerData};
|
use crate::definitions::{ChunkId, PointerData};
|
||||||
|
use std::collections::HashSet;
|
||||||
|
use std::sync::{Condvar, Mutex};
|
||||||
|
|
||||||
pub mod compression;
|
pub mod compression;
|
||||||
pub mod encryption;
|
pub mod encryption;
|
||||||
@ -99,11 +101,17 @@ impl RawPile for Box<dyn RawPile> {
|
|||||||
|
|
||||||
pub struct Pile<R: RawPile> {
|
pub struct Pile<R: RawPile> {
|
||||||
pub raw_pile: R,
|
pub raw_pile: R,
|
||||||
|
pub racy_submission_mutex: Mutex<HashSet<ChunkId>>,
|
||||||
|
pub racy_submission_condvar: Condvar,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<R: RawPile> Pile<R> {
|
impl<R: RawPile> Pile<R> {
|
||||||
pub fn new(raw_pile: R) -> Self {
|
pub fn new(raw_pile: R) -> Self {
|
||||||
Pile { raw_pile }
|
Pile {
|
||||||
|
raw_pile,
|
||||||
|
racy_submission_mutex: Mutex::new(Default::default()),
|
||||||
|
racy_submission_condvar: Default::default(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(clarity, features): have a special kind of error for verification failures
|
// TODO(clarity, features): have a special kind of error for verification failures
|
||||||
@ -182,8 +190,24 @@ impl<R: RawPile> Pile<R> {
|
|||||||
|
|
||||||
pub fn submit_chunk(&self, chunk_data: &[u8]) -> anyhow::Result<ChunkId> {
|
pub fn submit_chunk(&self, chunk_data: &[u8]) -> anyhow::Result<ChunkId> {
|
||||||
let chunk_id = calculate_chunkid(chunk_data);
|
let chunk_id = calculate_chunkid(chunk_data);
|
||||||
if !self.chunk_exists(&chunk_id)? {
|
|
||||||
self.write_chunk(&chunk_id, chunk_data)?;
|
let mut racy_submissions = self.racy_submission_mutex.lock().unwrap();
|
||||||
|
if racy_submissions.insert(chunk_id) {
|
||||||
|
drop(racy_submissions);
|
||||||
|
if !self.chunk_exists(&chunk_id)? {
|
||||||
|
self.write_chunk(&chunk_id, chunk_data)?;
|
||||||
|
}
|
||||||
|
racy_submissions = self.racy_submission_mutex.lock().unwrap();
|
||||||
|
racy_submissions.remove(&chunk_id);
|
||||||
|
// wake up anyone who might be waiting for this chunk
|
||||||
|
self.racy_submission_condvar.notify_all();
|
||||||
|
} else {
|
||||||
|
loop {
|
||||||
|
racy_submissions = self.racy_submission_condvar.wait(racy_submissions).unwrap();
|
||||||
|
if !racy_submissions.contains(&chunk_id) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Ok(chunk_id)
|
Ok(chunk_id)
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user