diff options
-rw-r--r-- | quiche/src/recovery/bbr/mod.rs | 6 | ||||
-rw-r--r-- | quiche/src/recovery/bbr/per_ack.rs | 9 | ||||
-rw-r--r-- | quiche/src/recovery/mod.rs | 216 |
3 files changed, 227 insertions, 4 deletions
diff --git a/quiche/src/recovery/bbr/mod.rs b/quiche/src/recovery/bbr/mod.rs index 07683dc0..74d477f8 100644 --- a/quiche/src/recovery/bbr/mod.rs +++ b/quiche/src/recovery/bbr/mod.rs @@ -605,7 +605,7 @@ mod tests { } // Stop at right before filled_pipe=true. - for _ in 0..5 { + for _ in 0..6 { let pkt = Sent { pkt_num: pn, frames: smallvec![], @@ -640,9 +640,9 @@ mod tests { let mut acked = ranges::RangeSet::default(); - // We sent 5 packets, but ack only one, to stay + // We sent 6 packets, but ack only one, to stay // in Drain state. - acked.insert(0..pn - 4); + acked.insert(0..pn - 5); assert_eq!( r.on_ack_received( diff --git a/quiche/src/recovery/bbr/per_ack.rs b/quiche/src/recovery/bbr/per_ack.rs index 22f3dd26..38d853b4 100644 --- a/quiche/src/recovery/bbr/per_ack.rs +++ b/quiche/src/recovery/bbr/per_ack.rs @@ -257,13 +257,20 @@ fn bbr_check_drain(r: &mut Recovery, now: Instant) { } if r.bbr_state.state == BBRStateMachine::Drain && - r.bytes_in_flight <= bbr_inflight(r, 1.0) + bbr_bytes_in_net(r, now) <= bbr_inflight(r, 1.0) { // we estimate queue is drained bbr_enter_probe_bw(r, now); } } +fn bbr_bytes_in_net(r: &mut Recovery, now: Instant) -> usize { + let buffered = r.get_host_buffered(now); + let in_flight = r.bytes_in_flight; + + in_flight.saturating_sub(buffered) +} + // 4.3.4.3. Gain Cycling Algorithm fn bbr_enter_probe_bw(r: &mut Recovery, now: Instant) { let bbr = &mut r.bbr_state; diff --git a/quiche/src/recovery/mod.rs b/quiche/src/recovery/mod.rs index a7f0b35a..128f4039 100644 --- a/quiche/src/recovery/mod.rs +++ b/quiche/src/recovery/mod.rs @@ -400,6 +400,24 @@ impl Recovery { trace!("{} {:?}", trace_id, self); } + // Limit data buffered in host send buffer to avoid bufferbloat + // at the sender. + pub fn get_host_buffered(&mut self, now: Instant) -> usize { + let mut buffered: usize = 0; + + for epoch in 0..packet::Epoch::count() { + for bytes in self.sent[epoch].iter() + // Skip packets that have already been sent, acked or lost, or that are not in-flight. + .filter(|p| p.in_flight && p.time_acked.is_none() && p.time_lost.is_none() && p.time_sent > now) + .map(|x| x.size) + { + buffered += bytes; + } + } + + buffered + } + fn on_packet_sent_cc(&mut self, sent_bytes: usize, now: Instant) { (self.cc_ops.on_packet_sent)(self, sent_bytes, now); } @@ -2262,6 +2280,204 @@ mod tests { now + Duration::from_secs_f64(12000.0 / pacing_rate as f64) ); } + + #[test] + fn test_host_buffered() { + let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); + cfg.set_cc_algorithm(CongestionControlAlgorithm::CUBIC); + + let mut r = Recovery::new(&cfg); + + let mut now = Instant::now(); + + assert_eq!(r.sent[packet::Epoch::Application].len(), 0); + + // Send out the first packet (a full initcwnd). + let p = Sent { + pkt_num: 0, + frames: smallvec![], + time_sent: now, + time_acked: None, + time_lost: None, + size: 12000, + ack_eliciting: true, + in_flight: true, + delivered: 0, + delivered_time: now, + first_sent_time: now, + is_app_limited: false, + tx_in_flight: 0, + lost: 0, + has_data: false, + }; + + r.on_packet_sent( + p, + packet::Epoch::Application, + HandshakeStatus::default(), + now, + "", + ); + + assert_eq!(r.sent[packet::Epoch::Application].len(), 1); + assert_eq!(r.bytes_in_flight, 12000); + assert_eq!(r.get_host_buffered(now), 0); + + // First packet will be sent out immediately. + assert_eq!(r.pacer.rate(), 0); + assert_eq!(r.get_packet_send_time(), now); + + // Wait 50ms for ACK. + now += Duration::from_millis(50); + + let mut acked = ranges::RangeSet::default(); + acked.insert(0..1); + + assert_eq!( + r.on_ack_received( + &acked, + 10, + packet::Epoch::Application, + HandshakeStatus::default(), + now, + "", + &mut Vec::new(), + ), + Ok((0, 0)) + ); + + assert_eq!(r.sent[packet::Epoch::Application].len(), 0); + assert_eq!(r.bytes_in_flight, 0); + assert_eq!(r.smoothed_rtt.unwrap(), Duration::from_millis(50)); + + // 1 MSS increased. + assert_eq!(r.congestion_window, 12000 + 1200); + + // Send out second packet. + let p = Sent { + pkt_num: 1, + frames: smallvec![], + time_sent: now, + time_acked: None, + time_lost: None, + size: 6000, + ack_eliciting: true, + in_flight: true, + delivered: 0, + delivered_time: now, + first_sent_time: now, + is_app_limited: false, + tx_in_flight: 0, + lost: 0, + has_data: false, + }; + + r.on_packet_sent( + p, + packet::Epoch::Application, + HandshakeStatus::default(), + now, + "", + ); + + assert_eq!(r.sent[packet::Epoch::Application].len(), 1); + assert_eq!(r.bytes_in_flight, 6000); + + // Pacing is not done during initial phase of connection. + assert_eq!(r.get_packet_send_time(), now); + + // Send the third packet out. + let p = Sent { + pkt_num: 2, + frames: smallvec![], + time_sent: now, + time_acked: None, + time_lost: None, + size: 6000, + ack_eliciting: true, + in_flight: true, + delivered: 0, + delivered_time: now, + first_sent_time: now, + is_app_limited: false, + tx_in_flight: 0, + lost: 0, + has_data: false, + }; + + r.on_packet_sent( + p, + packet::Epoch::Application, + HandshakeStatus::default(), + now, + "", + ); + + assert_eq!(r.sent[packet::Epoch::Application].len(), 2); + assert_eq!(r.bytes_in_flight, 12000); + assert_eq!(r.get_host_buffered(now), 0); + + // Send the fourth packet out. + let p = Sent { + pkt_num: 3, + frames: smallvec![], + time_sent: now, + time_acked: None, + time_lost: None, + size: 5000, + ack_eliciting: true, + in_flight: true, + delivered: 0, + delivered_time: now, + first_sent_time: now, + is_app_limited: false, + tx_in_flight: 0, + lost: 0, + has_data: false, + }; + + r.on_packet_sent( + p, + packet::Epoch::Application, + HandshakeStatus::default(), + now, + "", + ); + + assert_eq!(r.sent[packet::Epoch::Application].len(), 3); + assert_eq!(r.bytes_in_flight, 17000); + assert_eq!(r.get_host_buffered(now), 5000); + + let p = Sent { + pkt_num: 3, + frames: smallvec![], + time_sent: now, + time_acked: None, + time_lost: None, + size: 1200, + ack_eliciting: true, + in_flight: false, + delivered: 0, + delivered_time: now, + first_sent_time: now, + is_app_limited: false, + tx_in_flight: 0, + lost: 0, + has_data: false, + }; + + r.on_packet_sent( + p, + packet::Epoch::Application, + HandshakeStatus::default(), + now, + "", + ); + + assert_eq!(r.sent[packet::Epoch::Application].len(), 4); + assert_eq!(r.bytes_in_flight, 17000); + assert_eq!(r.get_host_buffered(now), 5000); + } } mod bbr; |