1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
//! Periodic events.

use crate::*;

/// Returns `true` on average once every `denominator` calls.
///
/// A random `f64` is drawn from `rng` and scaled by `denominator`; the result is `true` when the
/// scaled value is below `1.0`. Assuming the draw is uniform in `[0, 1)` (TODO confirm against
/// the `rand` version in use), this gives a probability of `1 / denominator`, and any
/// `denominator` of `1.0` or less always yields `true`.
fn one_in(denominator: f64, rng: &mut impl rand::Rng) -> bool {
    rng.gen::<f64>() * denominator < 1.0
}
/// Runs the periodic tasks forever; this function never returns.
///
/// Each iteration flushes `options`, then sleeps for `options.periodic_interval`.
/// Every 10th iteration the flush is a `flush_out` and an event log check is run;
/// every 30th iteration a hash check is run as well.
pub async fn start<P: Platform>(
    options: &Options<P>,
    mgr: &Arc<Mutex<Manager>>,
    platform: &PlatformExt<P>,
) {
    // Count from 1, so that none of the modulo checks below match on the very first iteration.
    let mut tick: u64 = 1;
    loop {
        let every_tenth = tick % 10 == 0;
        let every_thirtieth = tick % 30 == 0;

        // flushing happens on every tick, the clearing variant only on every tenth
        flush(options, every_tenth).await;

        // event log
        if every_tenth {
            event_log_check(mgr, platform).await;
        }

        // hash check
        if every_thirtieth {
            hash_check(mgr, platform).await;
        }

        P::Rt::sleep(options.periodic_interval).await;
        tick += 1;
    }
}
/// Flushes `options`, logging (not propagating) any error.
///
/// When `clear` is `true`, `Options::flush_out` is used; otherwise `Options::flush`.
async fn flush<P: Platform>(options: &Options<P>, clear: bool) {
    let outcome = match clear {
        true => options.flush_out().await,
        false => options.flush().await,
    };
    if let Err(err) = outcome {
        error!("Error while flushing data: {err:?}");
    }
}
/// Cleans up stale log checks and, with some probability, starts a new event log check.
///
/// A check is only started when the manager isn't fast forwarding, there is at least one pier,
/// and a random draw of one in `client_count` succeeds. The check covers a quarter of the
/// manager's event log limit. If the message is sent successfully,
/// [`assure_event_log_check`] is called to follow up on the conversation.
pub async fn event_log_check(mgr: &Arc<Mutex<Manager>>, platform: &PlatformExt<impl Platform>) {
    let mut guard = mgr.lock().await;
    guard.clean_log_checks();

    // The random draw comes last, so the RNG is only consulted when a check is possible at all.
    let due = !guard.is_fast_forwarding()
        && guard.pier_count() > 0
        && one_in(guard.client_count() as f64, &mut *guard.rng());
    if !due {
        return;
    }

    let count = guard.event_log_limit() / 4;
    let request = guard.process_event_log_check(count);
    // Release the lock before doing any network I/O.
    drop(guard);

    if let Some((msg, conversation_uuid)) = request {
        match platform.send(&msg).await {
            Err(err) => {
                error!("Error when trying to send event log check message: {err:?}");
            }
            Ok(_) => {
                assure_event_log_check(Arc::clone(mgr), platform.clone(), conversation_uuid).await;
            }
        }
    }
}
/// Spawns a background task that follows up on an event log check after 8 seconds.
///
/// The task asks the manager to `assure_log_check` the conversation `conversation_uuid`. If that
/// yields a pier, a hash check is sent to it, and on a successful send a second task is spawned
/// to watch that hash check (see `watch_hash_check`).
///
/// This function only spawns the task; it does not wait for it.
///
/// # Panics
///
/// The spawned task panics if `process_hash_check` returns `None` for the selected pier.
pub async fn assure_event_log_check<P: Platform>(
    mgr: Arc<Mutex<Manager>>,
    platform: PlatformExt<P>,
    conversation_uuid: agde::Uuid,
) {
    P::Rt::spawn(async move {
        P::Rt::sleep(Duration::from_secs(8)).await;

        let mut guard = mgr.lock().await;
        let pier = match guard.assure_log_check(conversation_uuid) {
            Some(pier) => pier,
            // nothing to follow up on
            None => return,
        };
        let msg = guard.process_hash_check(pier).expect(
            "BUG: Internal agde state error, trying to send a \
            hash check after a event log check while fast forwarding.",
        );
        // Release the lock before doing any network I/O.
        drop(guard);

        if let Err(err) = platform.send(&msg).await {
            error!("Error when trying to send hash check message: {err:?}");
            return;
        }

        // check that the pier responded.
        P::Rt::spawn(async move {
            watch_hash_check(pier.uuid(), &mgr, &platform).await;
        });
    });
}
/// With some probability, starts a hash check with a persistent pier.
///
/// A check is only started when the manager isn't fast forwarding, we are persistent ourselves,
/// at least one other pier is persistent, and a random draw of one in
/// `other persistent piers + 1` succeeds. On a successful send, a task is spawned to watch the
/// hash check (see `watch_hash_check`).
///
/// # Panics
///
/// Panics if `process_hash_check` returns `None` for the selected pier.
async fn hash_check<P: Platform>(mgr: &Arc<Mutex<Manager>>, platform: &PlatformExt<P>) {
    let mut guard = mgr.lock().await;
    let persistent_piers = guard
        .filter_piers(|_, caps| caps.persistent())
        .count();

    // The random draw comes last, so the RNG is only consulted when a check is possible at all.
    let due = !guard.is_fast_forwarding()
        && guard.capabilities().persistent()
        && persistent_piers > 0
        && one_in((persistent_piers + 1) as f64, &mut *guard.rng());
    if !due {
        return;
    }

    let pier = match guard.hash_check_persistent() {
        Some(pier) => pier,
        None => return,
    };
    info!("Sending hash check to {}", pier.uuid());
    let msg = guard
        .process_hash_check(pier)
        .expect("BUG: Internal agde state error when trying to send a hash check");
    // Release the lock before doing any network I/O.
    drop(guard);

    match platform.send(&msg).await {
        Err(err) => {
            error!("Error when trying to send hash check message: {err:?}");
        }
        Ok(_) => {
            let mgr = Arc::clone(mgr);
            let platform = platform.clone();
            // check that the pier responded.
            P::Rt::spawn(async move {
                watch_hash_check(pier.uuid(), &mgr, &platform).await;
            });
        }
    }
}
/// Watches an outstanding hash check and retries it with another pier if it stalls.
///
/// Every 30 seconds the manager is inspected. If it is still hash checking with `pier`, the
/// check is cancelled through `apply_cancelled`. When that yields
/// [`agde::CancelAction::HashCheck`], a new hash check is sent to the pier it names and the
/// watch continues with that pier. In every other case the function returns.
///
/// A failed send is only logged; the loop keeps watching the new pier.
///
/// # Panics
///
/// Panics if `process_hash_check` returns `None` for the replacement pier.
async fn watch_hash_check<P: Platform>(
    mut pier: agde::Uuid,
    mgr: &Mutex<Manager>,
    platform: &PlatformExt<P>,
) {
    loop {
        P::Rt::sleep(Duration::from_secs(30)).await;
        let mut manager = mgr.lock().await;

        // The manager is no longer hash checking with the watched pier: nothing left to watch.
        if manager.hash_checking() != Some(pier) {
            break;
        }

        // we're still hash checking with the same pier
        info!("Hash check with {pier} failed. Retrying.");
        let hc_pier = match manager.apply_cancelled(pier) {
            agde::CancelAction::HashCheck(hc_pier) => hc_pier,
            // no other pier to retry the hash check with
            _ => break,
        };
        let hc = manager
            .process_hash_check(hc_pier)
            .expect("BUG: Internal agde state error when recovering from a failed hash check");

        // Release the lock before doing any network I/O.
        drop(manager);

        // From now on, watch the pier we retried with.
        pier = hc_pier.uuid();

        if let Err(err) = platform.send(&hc).await {
            error!(
                "Error when trying to send hash check to \
                other piers after one failed: {err:?}"
            );
        }
    }
}