diff options
author | 2022-02-06 22:26:21 +0800 | |
---|---|---|
committer | 2022-02-06 14:26:21 +0000 | |
commit | 9d143dab6a4261868e508134ef9944e34086fc57 (patch) | |
tree | 39530707f64a4d71240b55a4e9f24f99812b2d9c /src/server.rs | |
parent | dc5ba42e0a7ddafe5896cfcc693a94764bbafb02 (diff) | |
download | rathole-9d143dab6a4261868e508134ef9944e34086fc57.tar.gz rathole-9d143dab6a4261868e508134ef9944e34086fc57.tar.zst rathole-9d143dab6a4261868e508134ef9944e34086fc57.zip |
fix: reimplement `retry_notify` with signals (#123)
Diffstat (limited to '')
-rw-r--r-- | src/server.rs | 39 |
1 files changed, 11 insertions, 28 deletions
diff --git a/src/server.rs b/src/server.rs index 02a3e37..abd181b 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,7 +1,7 @@ use crate::config::{Config, ServerConfig, ServerServiceConfig, ServiceType, TransportType}; use crate::config_watcher::ServiceChange; use crate::constants::{listen_backoff, UDP_BUFFER_SIZE}; -use crate::helper::retry_notify; +use crate::helper::retry_notify_with_deadline; use crate::multi_map::MultiMap; use crate::protocol::Hello::{ControlChannelHello, DataChannelHello}; use crate::protocol::{ @@ -509,21 +509,15 @@ fn tcp_listen_and_send( let (tx, rx) = mpsc::channel(CHAN_SIZE); tokio::spawn(async move { - let l = retry_notify!(listen_backoff(), { - match shutdown_rx.try_recv() { - Err(broadcast::error::TryRecvError::Closed) => Ok(None), - _ => TcpListener::bind(&addr).await.map(Some) - } + let l = retry_notify_with_deadline(listen_backoff(), || async { + Ok(TcpListener::bind(&addr).await?) }, |e, duration| { error!("{:#}. Retry in {:?}", e, duration); - }) + }, &mut shutdown_rx).await .with_context(|| "Failed to listen for the service"); let l: TcpListener = match l { - Ok(v) => match v { - Some(v) => v, - None => return - }, + Ok(v) => v, Err(e) => { error!("{:#}", e); return; @@ -628,27 +622,16 @@ async fn run_udp_connection_pool<T: Transport>( ) -> Result<()> { // TODO: Load balance - let l = retry_notify!( + let l = retry_notify_with_deadline( listen_backoff(), - { - match shutdown_rx.try_recv() { - Err(broadcast::error::TryRecvError::Closed) => Ok(None), - _ => UdpSocket::bind(&bind_addr).await.map(Some), - } - }, + || async { Ok(UdpSocket::bind(&bind_addr).await?) }, |e, duration| { warn!("{:#}. Retry in {:?}", e, duration); - } - ) - .with_context(|| "Failed to listen for the service"); - - let l = match l { - Ok(v) => match v { - Some(l) => l, - None => return Ok(()), }, - Err(e) => return Err(e), - }; + &mut shutdown_rx, + ) + .await + .with_context(|| "Failed to listen for the service")?; info!("Listening at {}", &bind_addr); |