account: impl exponential backoff when retrying connection

When connection fails to be established because of network issues,
perform an exponential back-off reconnection.

https://cloud.google.com/iot/docs/how-tos/exponential-backoff

This will limit maximum waiting time before next attempt and also
prevent reconnecting without wait when there's no reason to (network or
remote server is down).

The algorithm is really crude:

Instead of uniformly sampling 1..=1000 milliseconds, we sample (4 *
random::<u8>()) which is at most 4 * 255 = 1020 which is good enough.

1. Try to connect immediately
2. If it fails, set retries = 1.
3. Try to reconnect after retries * (4 * random::<u8>())
4. If it fails, set retries *= 2 = 2.
5. Try to reconnect after retries * (4 * random::<u8>())
6. If it fails, set retries *= 2 = 4.
7. Try to reconnect after retries * (4 * random::<u8>())
8. If it fails, set retries *= 2 = 8. Stop increasing retries from now
   on.
9. Try to reconnect in a loop after retries * (4 * random::<u8>())
pull/231/head
Manos Pitsidianakis 2023-06-19 00:13:53 +03:00
parent 5699baecfb
commit b05d929975
Signed by: Manos Pitsidianakis
GPG Key ID: 7729C7707F7E09D0
2 changed files with 126 additions and 88 deletions

View File

@ -35,6 +35,7 @@ use std::{
pin::Pin,
result,
sync::{Arc, RwLock},
time::Duration,
};
use futures::{
@ -166,11 +167,39 @@ impl MailboxEntry {
}
}
#[derive(Debug, Default, Clone)]
pub enum IsOnline {
#[default]
Uninit,
True,
Err {
value: Error,
retries: u64,
},
}
impl IsOnline {
is_variant! { is_uninit, Uninit }
is_variant! { is_true, True }
is_variant! { is_err, Err { .. } }
fn set_err(&mut self, err: Error) {
if let Self::Err { ref mut value, .. } = self {
*value = err;
} else {
*self = Self::Err {
value: err,
retries: 1,
};
}
}
}
#[derive(Debug)]
pub struct Account {
pub name: String,
pub hash: AccountHash,
pub is_online: Result<()>,
pub is_online: IsOnline,
pub mailbox_entries: IndexMap<MailboxHash, MailboxEntry>,
pub mailboxes_order: Vec<MailboxHash>,
pub tree: Vec<MailboxNode>,
@ -355,19 +384,14 @@ impl core::fmt::Display for JobRequest {
}
impl JobRequest {
pub fn is_watch(&self) -> bool {
matches!(self, JobRequest::Watch { .. })
}
is_variant! { is_watch, Watch { .. } }
is_variant! { is_online, IsOnline { .. } }
pub fn is_fetch(&self, mailbox_hash: MailboxHash) -> bool {
matches!(self, JobRequest::Fetch {
mailbox_hash: h, ..
} if *h == mailbox_hash)
}
pub fn is_online(&self) -> bool {
matches!(self, JobRequest::IsOnline { .. })
}
}
impl Drop for Account {
@ -540,9 +564,9 @@ impl Account {
hash,
name,
is_online: if !backend.capabilities().is_remote {
Ok(())
IsOnline::True
} else {
Err(Error::new("Attempting connection."))
IsOnline::Uninit
},
mailbox_entries: Default::default(),
mailboxes_order: Default::default(),
@ -1126,6 +1150,7 @@ impl Account {
pub fn len(&self) -> usize {
self.tree.len()
}
pub fn is_empty(&self) -> bool {
self.tree.is_empty()
}
@ -1573,39 +1598,80 @@ impl Account {
/* Call only in Context::is_online, since only Context can launch the watcher
* threads if an account goes from offline to online. */
pub fn is_online(&mut self) -> Result<()> {
if !self.backend_capabilities.is_remote && !self.backend_capabilities.is_async {
return Ok(());
}
if self.is_online.is_err()
&& self
.is_online
.as_ref()
.unwrap_err()
.kind
.is_authentication()
{
return self.is_online.clone();
}
if self.is_online.is_ok() {
return Ok(());
}
if !self.active_jobs.values().any(JobRequest::is_online) {
let online_job = self.backend.read().unwrap().is_online();
if let Ok(online_job) = online_job {
let handle = if self.backend_capabilities.is_async {
self.main_loop_handler
.job_executor
.spawn_specialized(online_job)
let (ret, wait) = match &mut self.is_online {
IsOnline::Uninit => (Err(Error::new("Attempting connection.")), None),
IsOnline::True => return Ok(()),
IsOnline::Err {
value,
ref mut retries,
} => {
let ret = Err(value.clone());
if value.kind.is_authentication()
|| value.kind.is_bug()
|| value.kind.is_configuration()
|| value.kind.is_external()
|| (value.kind.is_network() && !value.kind.is_network_down())
|| value.kind.is_not_implemented()
|| value.kind.is_not_supported()
|| value.kind.is_protocol_error()
|| value.kind.is_protocol_not_supported()
|| value.kind.is_value_error()
{
return ret;
}
let wait = if value.kind.is_timeout()
|| value.kind.is_network_down()
|| value.kind.is_oserror()
{
let oldval = *retries;
if oldval != 8 {
*retries *= 2;
}
Some(Duration::from_millis(
oldval * (4 * melib::utils::random::random_u8() as u64),
))
} else {
self.main_loop_handler
None
};
(ret, wait)
}
};
if !self.active_jobs.values().any(JobRequest::is_online) {
let online_fut = self.backend.read().unwrap().is_online();
if let Ok(online_fut) = online_fut {
use melib::utils::futures::sleep;
let handle = match (wait, self.backend_capabilities.is_async) {
(Some(wait), true) => {
self.main_loop_handler
.job_executor
.spawn_specialized(async move {
sleep(wait).await;
online_fut.await
})
}
(None, true) => self
.main_loop_handler
.job_executor
.spawn_blocking(online_job)
.spawn_specialized(online_fut),
(Some(wait), false) => {
self.main_loop_handler
.job_executor
.spawn_blocking(async move {
sleep(wait).await;
online_fut.await
})
}
(None, false) => self
.main_loop_handler
.job_executor
.spawn_blocking(online_fut),
};
self.insert_job(handle.job_id, JobRequest::IsOnline { handle });
}
}
self.is_online.clone()
ret
}
pub fn search(
@ -1672,7 +1738,10 @@ impl Account {
Some(crate::types::NotificationType::Error(err.kind)),
),
));
self.is_online = Err(err);
self.is_online.set_err(err);
self.main_loop_handler.send(ThreadEvent::UIEvent(
UIEvent::AccountStatusChange(self.hash, None),
));
return true;
}
let mailboxes_job = self.backend.read().unwrap().mailboxes();
@ -1784,21 +1853,19 @@ impl Account {
self.main_loop_handler.send(ThreadEvent::UIEvent(
UIEvent::AccountStatusChange(self.hash, None),
));
if is_online.is_ok() {
if self.is_online.is_err()
&& !self
.is_online
.as_ref()
.unwrap_err()
.kind
.is_authentication()
{
self.watch();
match is_online {
Ok(()) => {
if matches!(self.is_online, IsOnline::Err { ref value, ..} if !value.kind.is_authentication())
{
self.watch();
}
self.is_online = IsOnline::True;
return true;
}
Err(value) => {
self.is_online = IsOnline::Err { value, retries: 1 };
}
self.is_online = Ok(());
return true;
}
self.is_online = is_online;
}
let online_job = self.backend.read().unwrap().is_online();
if let Ok(online_job) = online_job {
@ -1819,47 +1886,18 @@ impl Account {
Err(_) => { /* canceled */ }
Ok(None) => {}
Ok(Some(Ok(()))) => {
if self.is_online.is_err()
&& !self
.is_online
.as_ref()
.unwrap_err()
.kind
.is_authentication()
if matches!(self.is_online, IsOnline::Err { ref value, ..} if !value.kind.is_authentication())
{
self.watch();
}
if !(self.is_online.is_err()
&& self
.is_online
.as_ref()
.unwrap_err()
.kind
.is_authentication())
{
self.is_online = Ok(());
self.main_loop_handler.send(ThreadEvent::UIEvent(
UIEvent::AccountStatusChange(self.hash, None),
));
}
self.is_online = IsOnline::True;
self.main_loop_handler.send(ThreadEvent::UIEvent(
UIEvent::AccountStatusChange(self.hash, None),
));
}
Ok(Some(Err(err))) => {
if !err.kind.is_authentication() {
let online_job = self.backend.read().unwrap().is_online();
if let Ok(online_job) = online_job {
let handle = if self.backend_capabilities.is_async {
self.main_loop_handler
.job_executor
.spawn_specialized(online_job)
} else {
self.main_loop_handler
.job_executor
.spawn_blocking(online_job)
};
self.insert_job(handle.job_id, JobRequest::IsOnline { handle });
};
}
self.is_online = Err(err);
self.is_online.set_err(err);
_ = self.is_online();
self.main_loop_handler.send(ThreadEvent::UIEvent(
UIEvent::AccountStatusChange(self.hash, None),
));

View File

@ -157,7 +157,7 @@ impl Context {
ref mut replies,
..
} = self;
let was_online = accounts[account_pos].is_online.is_ok();
let was_online = accounts[account_pos].is_online.is_true();
let ret = accounts[account_pos].is_online();
if ret.is_ok() && !was_online {
log::trace!("inserting mailbox hashes:");