1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260
//! Checking of hashes of [`crate::resource`]s.
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::fmt::{self, Debug};
use std::time::Duration;
use crate::{event, resource, utils, Manager, SelectedPier, Uuid};
/// A check to affirm the selected resources contain the same data.
///
/// For this to work, we assume the resulting hash is unique.
///
/// # Eq implementation
///
/// If the [receiver](Self::recipient) is the same, this is considered equal.
/// Only one of these conversations should be communicated at once, therefore the filter doesn't
/// matter.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[must_use]
pub struct Request {
pier: Uuid,
resources: resource::Matcher,
/// The actual timestamp
cutoff_timestamp: Duration,
/// The offset to get `cutoff_timestamp` from `now`.
/// To get when this request was created, take `cutoff_timestamp + offset`.
offset: Duration,
}
impl Request {
/// Creates a new request which tells `pier` to return all resources according to the `filter`.
/// Targets `now() - timestamp` as the rewind position.
///
/// `timestamp` should be before the middle of the log lifetime of the two piers conversing.
pub(crate) fn new(
pier: SelectedPier,
filter: resource::Matcher,
timestamp_offset: Duration,
) -> Self {
Self {
pier: pier.uuid(),
resources: filter,
cutoff_timestamp: utils::dur_now().saturating_sub(timestamp_offset),
offset: timestamp_offset,
}
}
/// Get the receiver's UUID.
#[inline]
pub fn recipient(&self) -> Uuid {
self.pier
}
/// Test if `resource` is included in the requested hash check.
#[inline]
#[must_use]
pub fn matches(&self, resource: &str) -> bool {
self.resources.matches(resource)
}
/// Get the request's cutoff offset.
#[inline]
pub(crate) fn cutoff_offset(&self) -> Duration {
self.offset
}
/// Get an unwinder to unwind the resources you will add to the map before calling
/// [`crate::Manager::apply_hash_check_reply`].
///
/// If you start doing this now, before getting the response, keep in mind you have to check
/// [`Response::different_cutoff`]. If that returns `true`, you have to call
/// [`Response::unwinder`] and start the process over again.
#[inline]
pub fn unwinder<'a>(&self, manager: &'a Manager) -> ResponseHashRewinder<'a> {
ResponseHashRewinder(manager.unwinder_to(utils::dur_to_systime(self.cutoff_timestamp)))
}
}
impl PartialEq for Request {
fn eq(&self, other: &Self) -> bool {
self.pier == other.pier
&& self.cutoff_timestamp == other.cutoff_timestamp
&& self.offset == other.offset
}
}
impl Eq for Request {}
/// The hash data for the response.
#[derive(Clone, Deserialize, Serialize, PartialEq, Eq)]
#[repr(transparent)]
pub struct ResponseHash([u8; 16]);
impl Debug for ResponseHash {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:x}", u128::from_le_bytes(self.0))
}
}
/// A response to [`Request`].
///
/// Contains the [`Self::hashes`] for all the resources the sender wants.
///
/// # Eq implementation
///
/// If the [receiver](Self::recipient) is the same, this is considered equal.
/// Only one of these conversations should be communicated at once, therefore the filter doesn't
/// matter.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[must_use]
pub struct Response {
pier: Uuid,
resources: resource::Matcher,
hashes: BTreeMap<String, ResponseHash>,
requested_cutoff_timestamp: Duration,
cutoff_timestamp: Duration,
}
impl Response {
/// Get the receiver's UUID.
#[inline]
pub fn recipient(&self) -> Uuid {
self.pier
}
/// Get a reference to the response's hashes.
#[must_use]
#[inline]
pub fn hashes(&self) -> &BTreeMap<String, ResponseHash> {
&self.hashes
}
/// Tests if the requested cutoff (in time) is the same as the one reponded with.
///
/// While the pier is handling the [`Request`] we sent, we can process our own response, to
/// be ready once we get this response. This checks if the remote was forced to change the
/// cutoff. If the returned value is true, we cannot rely on the hash check response we created
/// while the pier processed it.
#[must_use]
#[inline]
pub fn different_cutoff(&self) -> bool {
self.requested_cutoff_timestamp == self.cutoff_timestamp
}
/// Get the request's cutoff.
/// Used when constructing our own [hashes](Self::hashes) to then give to
/// [`crate::Manager::apply_hash_check_reply`].
#[inline]
pub(crate) fn cutoff_timestamp(&self) -> Duration {
self.cutoff_timestamp
}
/// Get an unwinder to unwind the resources you will add to the map before calling
/// [`crate::Manager::apply_hash_check_reply`].
///
/// The returned item should be reused for each resource.
pub fn unwinder<'a>(&self, manager: &'a Manager) -> ResponseHashRewinder<'a> {
ResponseHashRewinder(manager.unwinder_to(utils::dur_to_systime(self.cutoff_timestamp())))
}
/// Test if `resource` is included in the requested hash check.
#[inline]
#[must_use]
pub fn matches(&self, resource: &str) -> bool {
self.resources.matches(resource)
}
}
impl PartialEq for Response {
fn eq(&self, other: &Self) -> bool {
self.pier == other.pier && self.hashes == other.hashes
}
}
impl Eq for Response {}
/// A struct used to rewind resources before hashing them to call
/// [`crate::Manager::apply_hash_check_reply`].
#[derive(Debug)]
pub struct ResponseHashRewinder<'a>(event::Unwinder<'a>);
impl<'a> ResponseHashRewinder<'a> {
/// Unwind the resources before creating a `hash` for [`crate::Manager::apply_hash_check_reply`].
///
/// Call this for every resource.
#[inline]
pub fn unwinder(&mut self) -> &mut event::Unwinder<'a> {
self.0.clear_unwound();
&mut self.0
}
}
/// Builder struct for [`Response`].
#[derive(Debug)]
pub struct ResponseBuilder<'a>(Response, event::Unwinder<'a>);
impl<'a> ResponseBuilder<'a> {
pub(crate) fn new(
pier: Uuid,
request: Request,
selected_cutoff_offset: Duration,
unwinder: event::Unwinder<'a>,
) -> Self {
Self(
Response {
pier,
resources: request.resources,
hashes: BTreeMap::new(),
requested_cutoff_timestamp: request.cutoff_timestamp,
cutoff_timestamp: (request.cutoff_timestamp + request.offset)
.saturating_sub(selected_cutoff_offset),
},
unwinder,
)
}
/// It's a logic error to pass a `resource` that isn't included in the [`Request::matches`].
#[allow(clippy::needless_pass_by_value)] // The hasher is consumed for one resource.
#[inline]
pub fn insert(&mut self, resource: String, hash: ResponseHash) {
self.0.hashes.insert(resource, hash);
}
/// Unwind the resources before creating a `hash` for [`Self::insert`].
///
/// Call this for every resource.
#[inline]
pub fn unwinder(&mut self) -> &mut event::Unwinder<'a> {
self.1.clear_unwound();
&mut self.1
}
/// Get the built [`Response`].
#[inline]
pub fn finish(self) -> Response {
self.0
}
/// Test if `resource` is included in the requested hash check.
#[inline]
#[must_use]
pub fn matches(&self, resource: &str) -> bool {
self.0.resources.matches(resource)
}
}
/// A hash builder for adding the hashed signature of a `resource`.
/// Should be [created](Self::new), [written to](Self::write) and the
/// [added](ResponseBuilder::insert) when all the data is written.
#[allow(missing_debug_implementations)]
#[must_use]
pub struct ResponseHasher(xxhash_rust::xxh3::Xxh3);
impl ResponseHasher {
/// Creates a new, empty hasher.
///
/// Add data using [`Self::write`].
#[inline]
pub fn new() -> Self {
Self(xxhash_rust::xxh3::Xxh3::default())
}
/// Write data from resource to the internal hasher.
///
/// After all the data for one resource is written, call [`ResponseBuilder::insert`].
#[allow(clippy::inline_always)]
#[inline(always)]
pub fn write(&mut self, bytes: &[u8]) {
self.0.update(bytes);
}
/// Digest this hash.
#[must_use]
pub fn finish(self) -> ResponseHash {
ResponseHash(self.0.digest128().to_le_bytes())
}
}
impl Default for ResponseHasher {
fn default() -> Self {
Self::new()
}
}