aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Lohith Bellad <lohith@cloudflare.com> 2023-10-24 15:45:08 -0700
committerGravatar Lohith Bellad <lohith@cloudflare.com> 2023-10-31 11:57:24 -0700
commit8953b3c9c7de8d6940995d0c114275e4eb13a805 (patch)
treeede3531a3914dd00ef3553abb48f1c90e74e33cf
parent83d9168ab6f76302ae846cb068cc8991f2b06479 (diff)
downloadquiche-lohith/avoid_over_buffering.tar.gz
quiche-lohith/avoid_over_buffering.tar.zst
quiche-lohith/avoid_over_buffering.zip
When using packet pacing at the lower layers there can be less datalohith/avoid_over_buffering
in the network than in_flight data. This is due to buffering the packets at lower layers until the sent time of the packet. When using BBR as the CC algorithm and when we are checking the condition to exit the drain phase we need to consider only the data which is in the network and not the data buffered in the host. If we consider the inflight(host buffered + in the network) data then we exit the drain phase later than expected leading to low throughput.
-rw-r--r--quiche/src/recovery/bbr/mod.rs6
-rw-r--r--quiche/src/recovery/bbr/per_ack.rs9
-rw-r--r--quiche/src/recovery/mod.rs216
3 files changed, 227 insertions, 4 deletions
diff --git a/quiche/src/recovery/bbr/mod.rs b/quiche/src/recovery/bbr/mod.rs
index 07683dc0..74d477f8 100644
--- a/quiche/src/recovery/bbr/mod.rs
+++ b/quiche/src/recovery/bbr/mod.rs
@@ -605,7 +605,7 @@ mod tests {
}
// Stop at right before filled_pipe=true.
- for _ in 0..5 {
+ for _ in 0..6 {
let pkt = Sent {
pkt_num: pn,
frames: smallvec![],
@@ -640,9 +640,9 @@ mod tests {
let mut acked = ranges::RangeSet::default();
- // We sent 5 packets, but ack only one, to stay
+ // We sent 6 packets, but ack only one, to stay
// in Drain state.
- acked.insert(0..pn - 4);
+ acked.insert(0..pn - 5);
assert_eq!(
r.on_ack_received(
diff --git a/quiche/src/recovery/bbr/per_ack.rs b/quiche/src/recovery/bbr/per_ack.rs
index 22f3dd26..38d853b4 100644
--- a/quiche/src/recovery/bbr/per_ack.rs
+++ b/quiche/src/recovery/bbr/per_ack.rs
@@ -257,13 +257,20 @@ fn bbr_check_drain(r: &mut Recovery, now: Instant) {
}
if r.bbr_state.state == BBRStateMachine::Drain &&
- r.bytes_in_flight <= bbr_inflight(r, 1.0)
+ bbr_bytes_in_net(r, now) <= bbr_inflight(r, 1.0)
{
// we estimate queue is drained
bbr_enter_probe_bw(r, now);
}
}
+fn bbr_bytes_in_net(r: &mut Recovery, now: Instant) -> usize {
+ let buffered = r.get_host_buffered(now);
+ let in_flight = r.bytes_in_flight;
+
+ in_flight.saturating_sub(buffered)
+}
+
// 4.3.4.3. Gain Cycling Algorithm
fn bbr_enter_probe_bw(r: &mut Recovery, now: Instant) {
let bbr = &mut r.bbr_state;
diff --git a/quiche/src/recovery/mod.rs b/quiche/src/recovery/mod.rs
index a7f0b35a..128f4039 100644
--- a/quiche/src/recovery/mod.rs
+++ b/quiche/src/recovery/mod.rs
@@ -400,6 +400,24 @@ impl Recovery {
trace!("{} {:?}", trace_id, self);
}
+ // Limit data buffered in host send buffer to avoid bufferbloat
+ // at the sender.
+ pub fn get_host_buffered(&mut self, now: Instant) -> usize {
+ let mut buffered: usize = 0;
+
+ for epoch in 0..packet::Epoch::count() {
+ for bytes in self.sent[epoch].iter()
+ // Skip packets that have already been sent, acked or lost, or that are not in-flight.
+ .filter(|p| p.in_flight && p.time_acked.is_none() && p.time_lost.is_none() && p.time_sent > now)
+ .map(|x| x.size)
+ {
+ buffered += bytes;
+ }
+ }
+
+ buffered
+ }
+
fn on_packet_sent_cc(&mut self, sent_bytes: usize, now: Instant) {
(self.cc_ops.on_packet_sent)(self, sent_bytes, now);
}
@@ -2262,6 +2280,204 @@ mod tests {
now + Duration::from_secs_f64(12000.0 / pacing_rate as f64)
);
}
+
+ #[test]
+ fn test_host_buffered() {
+ let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
+ cfg.set_cc_algorithm(CongestionControlAlgorithm::CUBIC);
+
+ let mut r = Recovery::new(&cfg);
+
+ let mut now = Instant::now();
+
+ assert_eq!(r.sent[packet::Epoch::Application].len(), 0);
+
+ // Send out the first packet (a full initcwnd).
+ let p = Sent {
+ pkt_num: 0,
+ frames: smallvec![],
+ time_sent: now,
+ time_acked: None,
+ time_lost: None,
+ size: 12000,
+ ack_eliciting: true,
+ in_flight: true,
+ delivered: 0,
+ delivered_time: now,
+ first_sent_time: now,
+ is_app_limited: false,
+ tx_in_flight: 0,
+ lost: 0,
+ has_data: false,
+ };
+
+ r.on_packet_sent(
+ p,
+ packet::Epoch::Application,
+ HandshakeStatus::default(),
+ now,
+ "",
+ );
+
+ assert_eq!(r.sent[packet::Epoch::Application].len(), 1);
+ assert_eq!(r.bytes_in_flight, 12000);
+ assert_eq!(r.get_host_buffered(now), 0);
+
+ // First packet will be sent out immediately.
+ assert_eq!(r.pacer.rate(), 0);
+ assert_eq!(r.get_packet_send_time(), now);
+
+ // Wait 50ms for ACK.
+ now += Duration::from_millis(50);
+
+ let mut acked = ranges::RangeSet::default();
+ acked.insert(0..1);
+
+ assert_eq!(
+ r.on_ack_received(
+ &acked,
+ 10,
+ packet::Epoch::Application,
+ HandshakeStatus::default(),
+ now,
+ "",
+ &mut Vec::new(),
+ ),
+ Ok((0, 0))
+ );
+
+ assert_eq!(r.sent[packet::Epoch::Application].len(), 0);
+ assert_eq!(r.bytes_in_flight, 0);
+ assert_eq!(r.smoothed_rtt.unwrap(), Duration::from_millis(50));
+
+ // 1 MSS increased.
+ assert_eq!(r.congestion_window, 12000 + 1200);
+
+ // Send out second packet.
+ let p = Sent {
+ pkt_num: 1,
+ frames: smallvec![],
+ time_sent: now,
+ time_acked: None,
+ time_lost: None,
+ size: 6000,
+ ack_eliciting: true,
+ in_flight: true,
+ delivered: 0,
+ delivered_time: now,
+ first_sent_time: now,
+ is_app_limited: false,
+ tx_in_flight: 0,
+ lost: 0,
+ has_data: false,
+ };
+
+ r.on_packet_sent(
+ p,
+ packet::Epoch::Application,
+ HandshakeStatus::default(),
+ now,
+ "",
+ );
+
+ assert_eq!(r.sent[packet::Epoch::Application].len(), 1);
+ assert_eq!(r.bytes_in_flight, 6000);
+
+ // Pacing is not done during initial phase of connection.
+ assert_eq!(r.get_packet_send_time(), now);
+
+ // Send the third packet out.
+ let p = Sent {
+ pkt_num: 2,
+ frames: smallvec![],
+ time_sent: now,
+ time_acked: None,
+ time_lost: None,
+ size: 6000,
+ ack_eliciting: true,
+ in_flight: true,
+ delivered: 0,
+ delivered_time: now,
+ first_sent_time: now,
+ is_app_limited: false,
+ tx_in_flight: 0,
+ lost: 0,
+ has_data: false,
+ };
+
+ r.on_packet_sent(
+ p,
+ packet::Epoch::Application,
+ HandshakeStatus::default(),
+ now,
+ "",
+ );
+
+ assert_eq!(r.sent[packet::Epoch::Application].len(), 2);
+ assert_eq!(r.bytes_in_flight, 12000);
+ assert_eq!(r.get_host_buffered(now), 0);
+
+ // Send the fourth packet out.
+ let p = Sent {
+ pkt_num: 3,
+ frames: smallvec![],
+ time_sent: now,
+ time_acked: None,
+ time_lost: None,
+ size: 5000,
+ ack_eliciting: true,
+ in_flight: true,
+ delivered: 0,
+ delivered_time: now,
+ first_sent_time: now,
+ is_app_limited: false,
+ tx_in_flight: 0,
+ lost: 0,
+ has_data: false,
+ };
+
+ r.on_packet_sent(
+ p,
+ packet::Epoch::Application,
+ HandshakeStatus::default(),
+ now,
+ "",
+ );
+
+ assert_eq!(r.sent[packet::Epoch::Application].len(), 3);
+ assert_eq!(r.bytes_in_flight, 17000);
+ assert_eq!(r.get_host_buffered(now), 5000);
+
+ let p = Sent {
+ pkt_num: 3,
+ frames: smallvec![],
+ time_sent: now,
+ time_acked: None,
+ time_lost: None,
+ size: 1200,
+ ack_eliciting: true,
+ in_flight: false,
+ delivered: 0,
+ delivered_time: now,
+ first_sent_time: now,
+ is_app_limited: false,
+ tx_in_flight: 0,
+ lost: 0,
+ has_data: false,
+ };
+
+ r.on_packet_sent(
+ p,
+ packet::Epoch::Application,
+ HandshakeStatus::default(),
+ now,
+ "",
+ );
+
+ assert_eq!(r.sent[packet::Epoch::Application].len(), 4);
+ assert_eq!(r.bytes_in_flight, 17000);
+ assert_eq!(r.get_host_buffered(now), 5000);
+ }
}
mod bbr;