summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar lucas <lucas@cloudflare.com> 2021-06-01 13:31:56 +0100
committerGravatar Alessandro Ghedini <alessandro@ghedini.me> 2021-06-03 13:12:32 +0100
commita9116d8ca6539b02212573352a96867c41afa5c2 (patch)
tree302c7ce2d9251da616d88ce0c17e6536af98ad7d
parentc9e7ab567f95d19b67bf0940833f3c66d478242c (diff)
downloadquiche-a9116d8ca6539b02212573352a96867c41afa5c2.tar.gz
quiche-a9116d8ca6539b02212573352a96867c41afa5c2.tar.zst
quiche-a9116d8ca6539b02212573352a96867c41afa5c2.zip
lib: alternate DATAGRAM and STREAM frame packing
Previously, quiche would always prefer sending pending DATAGRAM frames over STREAM frames. Depending on the size of DATAGRAMs in relation to QUIC packet sizes the available space in packets for STREAM frames can end up being small. Similarly, sometimes STREAM frames may only get sent when the congestion window was smaller than the next DATAGRAM. This commit introduces a preference toggle into the quiche sending logic. This gives equal opportunity to send full-sized DATAGRAM or STREAM frames between calls to `send()`. In cases where there is no pending data of one type, quiche will just fallback to the other.
-rw-r--r--src/lib.rs166
1 files changed, 164 insertions, 2 deletions
diff --git a/src/lib.rs b/src/lib.rs
index f546acd4..fb6875a8 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1097,6 +1097,9 @@ pub struct Connection {
/// DATAGRAM queues.
dgram_recv_queue: dgram::DatagramQueue,
dgram_send_queue: dgram::DatagramQueue,
+
+ /// Whether to emit DATAGRAM frames in the next packet.
+ emit_dgram: bool,
}
/// Creates a new server-side connection.
@@ -1434,6 +1437,8 @@ impl Connection {
dgram_send_queue: dgram::DatagramQueue::new(
config.dgram_send_max_queue_len,
),
+
+ emit_dgram: true,
});
if let Some(odcid) = odcid {
@@ -2897,10 +2902,27 @@ impl Connection {
}
}
+ // The preference of data-bearing frame to include in a packet
+ // is managed by `self.emit_dgram`. However, whether any frames
+ // can be sent depends on the state of their buffers. In the case
+ // where one type is preferred but its buffer is empty, fall back
+ // to the other type in order not to waste this function call.
+ let mut dgram_emitted = false;
+ let dgrams_to_emit = self.dgram_max_writable_len().is_some();
+ let stream_to_emit = self.streams.has_flushable();
+
+ let mut do_dgram = self.emit_dgram && dgrams_to_emit;
+ let do_stream = !self.emit_dgram && stream_to_emit;
+
+ if !do_stream && dgrams_to_emit {
+ do_dgram = true;
+ }
+
// Create DATAGRAM frame.
if (pkt_type == packet::Type::Short || pkt_type == packet::Type::ZeroRTT) &&
left > frame::MAX_DGRAM_OVERHEAD &&
- !is_closing
+ !is_closing &&
+ do_dgram
{
if let Some(max_dgram_payload) = self.dgram_max_writable_len() {
while let Some(len) = self.dgram_send_queue.peek_front_len() {
@@ -2913,6 +2935,7 @@ impl Connection {
if push_frame_to_pkt!(b, frames, frame, left) {
ack_eliciting = true;
in_flight = true;
+ dgram_emitted = true;
}
},
@@ -2931,7 +2954,8 @@ impl Connection {
// Create a single STREAM frame for the first stream that is flushable.
if (pkt_type == packet::Type::Short || pkt_type == packet::Type::ZeroRTT) &&
left > frame::MAX_STREAM_OVERHEAD &&
- !is_closing
+ !is_closing &&
+ !dgram_emitted
{
while let Some(stream_id) = self.streams.pop_flushable() {
let stream = match self.streams.get_mut(stream_id) {
@@ -3034,6 +3058,9 @@ impl Connection {
}
}
+ // Alternate trying to send DATAGRAMs next time.
+ self.emit_dgram = !dgram_emitted;
+
// Create PING for PTO probe if no other ack-elicitng frame is sent.
if self.recovery.loss_probes[epoch] > 0 &&
!ack_eliciting &&
@@ -9211,6 +9238,141 @@ mod tests {
}
#[test]
+ /// Tests that streams and datagrams are correctly scheduled.
+ fn stream_datagram_priority() {
+ // Limit 1-RTT packet size to avoid congestion control interference.
+ const MAX_TEST_PACKET_SIZE: usize = 540;
+
+ let mut buf = [0; 65535];
+
+ let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
+ config
+ .load_cert_chain_from_pem_file("examples/cert.crt")
+ .unwrap();
+ config
+ .load_priv_key_from_pem_file("examples/cert.key")
+ .unwrap();
+ config
+ .set_application_protos(b"\x06proto1\x06proto2")
+ .unwrap();
+ config.set_initial_max_data(1_000_000);
+ config.set_initial_max_stream_data_bidi_local(1_000_000);
+ config.set_initial_max_stream_data_bidi_remote(1_000_000);
+ config.set_initial_max_stream_data_uni(0);
+ config.set_initial_max_streams_bidi(100);
+ config.set_initial_max_streams_uni(0);
+ config.enable_dgram(true, 10, 10);
+ config.verify_peer(false);
+
+ let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
+ assert_eq!(pipe.handshake(), Ok(()));
+
+ assert_eq!(pipe.client.stream_send(0, b"a", false), Ok(1));
+ assert_eq!(pipe.advance(), Ok(()));
+
+ assert_eq!(pipe.client.stream_send(4, b"a", false), Ok(1));
+ assert_eq!(pipe.advance(), Ok(()));
+
+ let mut b = [0; 1];
+
+ let out = [b'b'; 500];
+
+ // Server prioritizes Stream 0 and 4 with the same urgency with
+ // incremental, meaning the frames should be sent in round-robin
+ // fashion. It also sends DATAGRAMS which are always interleaved with
+ // STREAM frames. So we'll expect a mix of frame types regardless
+ // of the order that the application writes things in.
+
+ pipe.server.stream_recv(0, &mut b).unwrap();
+ assert_eq!(pipe.server.stream_priority(0, 255, true), Ok(()));
+ pipe.server.stream_send(0, &out, false).unwrap();
+ pipe.server.stream_send(0, &out, false).unwrap();
+ pipe.server.stream_send(0, &out, false).unwrap();
+
+ assert_eq!(pipe.server.stream_priority(4, 255, true), Ok(()));
+ pipe.server.stream_send(4, &out, false).unwrap();
+ pipe.server.stream_send(4, &out, false).unwrap();
+ pipe.server.stream_send(4, &out, false).unwrap();
+
+ for _ in 1..=6 {
+ assert_eq!(pipe.server.dgram_send(&out), Ok(()));
+ }
+
+ let mut off_0 = 0;
+ let mut off_4 = 0;
+
+ for _ in 1..=3 {
+ // DATAGRAM
+ let (len, _) =
+ pipe.server.send(&mut buf[..MAX_TEST_PACKET_SIZE]).unwrap();
+
+ let frames =
+ testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
+ let mut frame_iter = frames.iter();
+
+ assert_eq!(frame_iter.next().unwrap(), &frame::Frame::Datagram {
+ data: out.into(),
+ });
+ assert_eq!(frame_iter.next(), None);
+
+ // STREAM 0
+ let (len, _) =
+ pipe.server.send(&mut buf[..MAX_TEST_PACKET_SIZE]).unwrap();
+
+ let frames =
+ testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
+ let mut frame_iter = frames.iter();
+ let stream = frame_iter.next().unwrap();
+
+ assert_eq!(stream, &frame::Frame::Stream {
+ stream_id: 0,
+ data: stream::RangeBuf::from(&out, off_0, false),
+ });
+
+ off_0 = match stream {
+ frame::Frame::Stream { data, .. } => data.max_off(),
+
+ _ => unreachable!(),
+ };
+ assert_eq!(frame_iter.next(), None);
+
+ // DATAGRAM
+ let (len, _) =
+ pipe.server.send(&mut buf[..MAX_TEST_PACKET_SIZE]).unwrap();
+
+ let frames =
+ testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
+ let mut frame_iter = frames.iter();
+
+ assert_eq!(frame_iter.next().unwrap(), &frame::Frame::Datagram {
+ data: out.into(),
+ });
+ assert_eq!(frame_iter.next(), None);
+
+ // STREAM 4
+ let (len, _) =
+ pipe.server.send(&mut buf[..MAX_TEST_PACKET_SIZE]).unwrap();
+
+ let frames =
+ testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
+ let mut frame_iter = frames.iter();
+ let stream = frame_iter.next().unwrap();
+
+ assert_eq!(stream, &frame::Frame::Stream {
+ stream_id: 4,
+ data: stream::RangeBuf::from(&out, off_4, false),
+ });
+
+ off_4 = match stream {
+ frame::Frame::Stream { data, .. } => data.max_off(),
+
+ _ => unreachable!(),
+ };
+ assert_eq!(frame_iter.next(), None);
+ }
+ }
+
+ #[test]
/// Tests that old data is retransmitted on PTO.
fn early_retransmit() {
let mut buf = [0; 65535];