aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--quiche/src/recovery/bbr/mod.rs6
-rw-r--r--quiche/src/recovery/bbr/per_ack.rs9
-rw-r--r--quiche/src/recovery/mod.rs216
3 files changed, 227 insertions, 4 deletions
diff --git a/quiche/src/recovery/bbr/mod.rs b/quiche/src/recovery/bbr/mod.rs
index 07683dc0..74d477f8 100644
--- a/quiche/src/recovery/bbr/mod.rs
+++ b/quiche/src/recovery/bbr/mod.rs
@@ -605,7 +605,7 @@ mod tests {
}
// Stop at right before filled_pipe=true.
- for _ in 0..5 {
+ for _ in 0..6 {
let pkt = Sent {
pkt_num: pn,
frames: smallvec![],
@@ -640,9 +640,9 @@ mod tests {
let mut acked = ranges::RangeSet::default();
- // We sent 5 packets, but ack only one, to stay
+ // We sent 6 packets, but ack only one, to stay
// in Drain state.
- acked.insert(0..pn - 4);
+ acked.insert(0..pn - 5);
assert_eq!(
r.on_ack_received(
diff --git a/quiche/src/recovery/bbr/per_ack.rs b/quiche/src/recovery/bbr/per_ack.rs
index 22f3dd26..38d853b4 100644
--- a/quiche/src/recovery/bbr/per_ack.rs
+++ b/quiche/src/recovery/bbr/per_ack.rs
@@ -257,13 +257,20 @@ fn bbr_check_drain(r: &mut Recovery, now: Instant) {
}
if r.bbr_state.state == BBRStateMachine::Drain &&
- r.bytes_in_flight <= bbr_inflight(r, 1.0)
+ bbr_bytes_in_net(r, now) <= bbr_inflight(r, 1.0)
{
// we estimate queue is drained
bbr_enter_probe_bw(r, now);
}
}
+fn bbr_bytes_in_net(r: &mut Recovery, now: Instant) -> usize {
+ let buffered = r.get_host_buffered(now);
+ let in_flight = r.bytes_in_flight;
+
+ in_flight.saturating_sub(buffered)
+}
+
// 4.3.4.3. Gain Cycling Algorithm
fn bbr_enter_probe_bw(r: &mut Recovery, now: Instant) {
let bbr = &mut r.bbr_state;
diff --git a/quiche/src/recovery/mod.rs b/quiche/src/recovery/mod.rs
index a7f0b35a..128f4039 100644
--- a/quiche/src/recovery/mod.rs
+++ b/quiche/src/recovery/mod.rs
@@ -400,6 +400,24 @@ impl Recovery {
trace!("{} {:?}", trace_id, self);
}
+ // Limit data buffered in host send buffer to avoid bufferbloat
+ // at the sender.
+ pub fn get_host_buffered(&mut self, now: Instant) -> usize {
+ let mut buffered: usize = 0;
+
+ for epoch in 0..packet::Epoch::count() {
+ for bytes in self.sent[epoch].iter()
+ // Skip packets that have already been sent, acked or lost, or that are not in-flight.
+ .filter(|p| p.in_flight && p.time_acked.is_none() && p.time_lost.is_none() && p.time_sent > now)
+ .map(|x| x.size)
+ {
+ buffered += bytes;
+ }
+ }
+
+ buffered
+ }
+
fn on_packet_sent_cc(&mut self, sent_bytes: usize, now: Instant) {
(self.cc_ops.on_packet_sent)(self, sent_bytes, now);
}
@@ -2262,6 +2280,204 @@ mod tests {
now + Duration::from_secs_f64(12000.0 / pacing_rate as f64)
);
}
+
+ #[test]
+ fn test_host_buffered() {
+ let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
+ cfg.set_cc_algorithm(CongestionControlAlgorithm::CUBIC);
+
+ let mut r = Recovery::new(&cfg);
+
+ let mut now = Instant::now();
+
+ assert_eq!(r.sent[packet::Epoch::Application].len(), 0);
+
+ // Send out the first packet (a full initcwnd).
+ let p = Sent {
+ pkt_num: 0,
+ frames: smallvec![],
+ time_sent: now,
+ time_acked: None,
+ time_lost: None,
+ size: 12000,
+ ack_eliciting: true,
+ in_flight: true,
+ delivered: 0,
+ delivered_time: now,
+ first_sent_time: now,
+ is_app_limited: false,
+ tx_in_flight: 0,
+ lost: 0,
+ has_data: false,
+ };
+
+ r.on_packet_sent(
+ p,
+ packet::Epoch::Application,
+ HandshakeStatus::default(),
+ now,
+ "",
+ );
+
+ assert_eq!(r.sent[packet::Epoch::Application].len(), 1);
+ assert_eq!(r.bytes_in_flight, 12000);
+ assert_eq!(r.get_host_buffered(now), 0);
+
+ // First packet will be sent out immediately.
+ assert_eq!(r.pacer.rate(), 0);
+ assert_eq!(r.get_packet_send_time(), now);
+
+ // Wait 50ms for ACK.
+ now += Duration::from_millis(50);
+
+ let mut acked = ranges::RangeSet::default();
+ acked.insert(0..1);
+
+ assert_eq!(
+ r.on_ack_received(
+ &acked,
+ 10,
+ packet::Epoch::Application,
+ HandshakeStatus::default(),
+ now,
+ "",
+ &mut Vec::new(),
+ ),
+ Ok((0, 0))
+ );
+
+ assert_eq!(r.sent[packet::Epoch::Application].len(), 0);
+ assert_eq!(r.bytes_in_flight, 0);
+ assert_eq!(r.smoothed_rtt.unwrap(), Duration::from_millis(50));
+
+ // 1 MSS increased.
+ assert_eq!(r.congestion_window, 12000 + 1200);
+
+ // Send out second packet.
+ let p = Sent {
+ pkt_num: 1,
+ frames: smallvec![],
+ time_sent: now,
+ time_acked: None,
+ time_lost: None,
+ size: 6000,
+ ack_eliciting: true,
+ in_flight: true,
+ delivered: 0,
+ delivered_time: now,
+ first_sent_time: now,
+ is_app_limited: false,
+ tx_in_flight: 0,
+ lost: 0,
+ has_data: false,
+ };
+
+ r.on_packet_sent(
+ p,
+ packet::Epoch::Application,
+ HandshakeStatus::default(),
+ now,
+ "",
+ );
+
+ assert_eq!(r.sent[packet::Epoch::Application].len(), 1);
+ assert_eq!(r.bytes_in_flight, 6000);
+
+ // Pacing is not done during initial phase of connection.
+ assert_eq!(r.get_packet_send_time(), now);
+
+ // Send the third packet out.
+ let p = Sent {
+ pkt_num: 2,
+ frames: smallvec![],
+ time_sent: now,
+ time_acked: None,
+ time_lost: None,
+ size: 6000,
+ ack_eliciting: true,
+ in_flight: true,
+ delivered: 0,
+ delivered_time: now,
+ first_sent_time: now,
+ is_app_limited: false,
+ tx_in_flight: 0,
+ lost: 0,
+ has_data: false,
+ };
+
+ r.on_packet_sent(
+ p,
+ packet::Epoch::Application,
+ HandshakeStatus::default(),
+ now,
+ "",
+ );
+
+ assert_eq!(r.sent[packet::Epoch::Application].len(), 2);
+ assert_eq!(r.bytes_in_flight, 12000);
+ assert_eq!(r.get_host_buffered(now), 0);
+
+ // Send the fourth packet out.
+ let p = Sent {
+ pkt_num: 3,
+ frames: smallvec![],
+ time_sent: now,
+ time_acked: None,
+ time_lost: None,
+ size: 5000,
+ ack_eliciting: true,
+ in_flight: true,
+ delivered: 0,
+ delivered_time: now,
+ first_sent_time: now,
+ is_app_limited: false,
+ tx_in_flight: 0,
+ lost: 0,
+ has_data: false,
+ };
+
+ r.on_packet_sent(
+ p,
+ packet::Epoch::Application,
+ HandshakeStatus::default(),
+ now,
+ "",
+ );
+
+ assert_eq!(r.sent[packet::Epoch::Application].len(), 3);
+ assert_eq!(r.bytes_in_flight, 17000);
+ assert_eq!(r.get_host_buffered(now), 5000);
+
+ let p = Sent {
+ pkt_num: 3,
+ frames: smallvec![],
+ time_sent: now,
+ time_acked: None,
+ time_lost: None,
+ size: 1200,
+ ack_eliciting: true,
+ in_flight: false,
+ delivered: 0,
+ delivered_time: now,
+ first_sent_time: now,
+ is_app_limited: false,
+ tx_in_flight: 0,
+ lost: 0,
+ has_data: false,
+ };
+
+ r.on_packet_sent(
+ p,
+ packet::Epoch::Application,
+ HandshakeStatus::default(),
+ now,
+ "",
+ );
+
+ assert_eq!(r.sent[packet::Epoch::Application].len(), 4);
+ assert_eq!(r.bytes_in_flight, 17000);
+ assert_eq!(r.get_host_buffered(now), 5000);
+ }
}
mod bbr;