diff options
author | 2023-10-24 15:45:08 -0700 | |
---|---|---|
committer | 2023-10-31 11:57:24 -0700 | |
commit | 8953b3c9c7de8d6940995d0c114275e4eb13a805 (patch) | |
tree | ede3531a3914dd00ef3553abb48f1c90e74e33cf | |
parent | 83d9168ab6f76302ae846cb068cc8991f2b06479 (diff) | |
download | quiche-lohith/avoid_over_buffering.tar.gz quiche-lohith/avoid_over_buffering.tar.zst quiche-lohith/avoid_over_buffering.zip |
When using packet pacing at the lower layers there can be less datalohith/avoid_over_buffering
in the network than in_flight data. This is due to buffering the packets
at lower layers until the sent time of the packet. When using BBR as the
CC algorithm and when we are checking the condition to exit the drain
phase we need to consider only the data which is in the network and not
the data buffered in the host. If we consider the inflight(host buffered
+ in the network) data then we exit the drain phase later than expected
leading to low throughput.
-rw-r--r-- | quiche/src/recovery/bbr/mod.rs | 6 | ||||
-rw-r--r-- | quiche/src/recovery/bbr/per_ack.rs | 9 | ||||
-rw-r--r-- | quiche/src/recovery/mod.rs | 216 |
3 files changed, 227 insertions, 4 deletions
diff --git a/quiche/src/recovery/bbr/mod.rs b/quiche/src/recovery/bbr/mod.rs index 07683dc0..74d477f8 100644 --- a/quiche/src/recovery/bbr/mod.rs +++ b/quiche/src/recovery/bbr/mod.rs @@ -605,7 +605,7 @@ mod tests { } // Stop at right before filled_pipe=true. - for _ in 0..5 { + for _ in 0..6 { let pkt = Sent { pkt_num: pn, frames: smallvec![], @@ -640,9 +640,9 @@ mod tests { let mut acked = ranges::RangeSet::default(); - // We sent 5 packets, but ack only one, to stay + // We sent 6 packets, but ack only one, to stay // in Drain state. - acked.insert(0..pn - 4); + acked.insert(0..pn - 5); assert_eq!( r.on_ack_received( diff --git a/quiche/src/recovery/bbr/per_ack.rs b/quiche/src/recovery/bbr/per_ack.rs index 22f3dd26..38d853b4 100644 --- a/quiche/src/recovery/bbr/per_ack.rs +++ b/quiche/src/recovery/bbr/per_ack.rs @@ -257,13 +257,20 @@ fn bbr_check_drain(r: &mut Recovery, now: Instant) { } if r.bbr_state.state == BBRStateMachine::Drain && - r.bytes_in_flight <= bbr_inflight(r, 1.0) + bbr_bytes_in_net(r, now) <= bbr_inflight(r, 1.0) { // we estimate queue is drained bbr_enter_probe_bw(r, now); } } +fn bbr_bytes_in_net(r: &mut Recovery, now: Instant) -> usize { + let buffered = r.get_host_buffered(now); + let in_flight = r.bytes_in_flight; + + in_flight.saturating_sub(buffered) +} + // 4.3.4.3. Gain Cycling Algorithm fn bbr_enter_probe_bw(r: &mut Recovery, now: Instant) { let bbr = &mut r.bbr_state; diff --git a/quiche/src/recovery/mod.rs b/quiche/src/recovery/mod.rs index a7f0b35a..128f4039 100644 --- a/quiche/src/recovery/mod.rs +++ b/quiche/src/recovery/mod.rs @@ -400,6 +400,24 @@ impl Recovery { trace!("{} {:?}", trace_id, self); } + // Limit data buffered in host send buffer to avoid bufferbloat + // at the sender. + pub fn get_host_buffered(&mut self, now: Instant) -> usize { + let mut buffered: usize = 0; + + for epoch in 0..packet::Epoch::count() { + for bytes in self.sent[epoch].iter() + // Skip packets that have already been sent, acked or lost, or that are not in-flight. + .filter(|p| p.in_flight && p.time_acked.is_none() && p.time_lost.is_none() && p.time_sent > now) + .map(|x| x.size) + { + buffered += bytes; + } + } + + buffered + } + fn on_packet_sent_cc(&mut self, sent_bytes: usize, now: Instant) { (self.cc_ops.on_packet_sent)(self, sent_bytes, now); } @@ -2262,6 +2280,204 @@ mod tests { now + Duration::from_secs_f64(12000.0 / pacing_rate as f64) ); } + + #[test] + fn test_host_buffered() { + let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); + cfg.set_cc_algorithm(CongestionControlAlgorithm::CUBIC); + + let mut r = Recovery::new(&cfg); + + let mut now = Instant::now(); + + assert_eq!(r.sent[packet::Epoch::Application].len(), 0); + + // Send out the first packet (a full initcwnd). + let p = Sent { + pkt_num: 0, + frames: smallvec![], + time_sent: now, + time_acked: None, + time_lost: None, + size: 12000, + ack_eliciting: true, + in_flight: true, + delivered: 0, + delivered_time: now, + first_sent_time: now, + is_app_limited: false, + tx_in_flight: 0, + lost: 0, + has_data: false, + }; + + r.on_packet_sent( + p, + packet::Epoch::Application, + HandshakeStatus::default(), + now, + "", + ); + + assert_eq!(r.sent[packet::Epoch::Application].len(), 1); + assert_eq!(r.bytes_in_flight, 12000); + assert_eq!(r.get_host_buffered(now), 0); + + // First packet will be sent out immediately. + assert_eq!(r.pacer.rate(), 0); + assert_eq!(r.get_packet_send_time(), now); + + // Wait 50ms for ACK. + now += Duration::from_millis(50); + + let mut acked = ranges::RangeSet::default(); + acked.insert(0..1); + + assert_eq!( + r.on_ack_received( + &acked, + 10, + packet::Epoch::Application, + HandshakeStatus::default(), + now, + "", + &mut Vec::new(), + ), + Ok((0, 0)) + ); + + assert_eq!(r.sent[packet::Epoch::Application].len(), 0); + assert_eq!(r.bytes_in_flight, 0); + assert_eq!(r.smoothed_rtt.unwrap(), Duration::from_millis(50)); + + // 1 MSS increased. + assert_eq!(r.congestion_window, 12000 + 1200); + + // Send out second packet. + let p = Sent { + pkt_num: 1, + frames: smallvec![], + time_sent: now, + time_acked: None, + time_lost: None, + size: 6000, + ack_eliciting: true, + in_flight: true, + delivered: 0, + delivered_time: now, + first_sent_time: now, + is_app_limited: false, + tx_in_flight: 0, + lost: 0, + has_data: false, + }; + + r.on_packet_sent( + p, + packet::Epoch::Application, + HandshakeStatus::default(), + now, + "", + ); + + assert_eq!(r.sent[packet::Epoch::Application].len(), 1); + assert_eq!(r.bytes_in_flight, 6000); + + // Pacing is not done during initial phase of connection. + assert_eq!(r.get_packet_send_time(), now); + + // Send the third packet out. + let p = Sent { + pkt_num: 2, + frames: smallvec![], + time_sent: now, + time_acked: None, + time_lost: None, + size: 6000, + ack_eliciting: true, + in_flight: true, + delivered: 0, + delivered_time: now, + first_sent_time: now, + is_app_limited: false, + tx_in_flight: 0, + lost: 0, + has_data: false, + }; + + r.on_packet_sent( + p, + packet::Epoch::Application, + HandshakeStatus::default(), + now, + "", + ); + + assert_eq!(r.sent[packet::Epoch::Application].len(), 2); + assert_eq!(r.bytes_in_flight, 12000); + assert_eq!(r.get_host_buffered(now), 0); + + // Send the fourth packet out. + let p = Sent { + pkt_num: 3, + frames: smallvec![], + time_sent: now, + time_acked: None, + time_lost: None, + size: 5000, + ack_eliciting: true, + in_flight: true, + delivered: 0, + delivered_time: now, + first_sent_time: now, + is_app_limited: false, + tx_in_flight: 0, + lost: 0, + has_data: false, + }; + + r.on_packet_sent( + p, + packet::Epoch::Application, + HandshakeStatus::default(), + now, + "", + ); + + assert_eq!(r.sent[packet::Epoch::Application].len(), 3); + assert_eq!(r.bytes_in_flight, 17000); + assert_eq!(r.get_host_buffered(now), 5000); + + let p = Sent { + pkt_num: 3, + frames: smallvec![], + time_sent: now, + time_acked: None, + time_lost: None, + size: 1200, + ack_eliciting: true, + in_flight: false, + delivered: 0, + delivered_time: now, + first_sent_time: now, + is_app_limited: false, + tx_in_flight: 0, + lost: 0, + has_data: false, + }; + + r.on_packet_sent( + p, + packet::Epoch::Application, + HandshakeStatus::default(), + now, + "", + ); + + assert_eq!(r.sent[packet::Epoch::Application].len(), 4); + assert_eq!(r.bytes_in_flight, 17000); + assert_eq!(r.get_host_buffered(now), 5000); + } } mod bbr; |