diff options
-rw-r--r-- | src/h3/mod.rs | 262 |
1 files changed, 260 insertions, 2 deletions
diff --git a/src/h3/mod.rs b/src/h3/mod.rs index efee14d9..5bf28802 100644 --- a/src/h3/mod.rs +++ b/src/h3/mod.rs @@ -1010,7 +1010,7 @@ impl Connection { return Err(Error::Done); } - let overhead = octets::varint_len(frame::DATA_FRAME_TYPE_ID) + + let overhead = 1 + // DATA frame type is always 1 byte long octets::varint_len(body.len() as u64); let stream_cap = match conn.stream_capacity(stream_id) { @@ -1036,8 +1036,15 @@ impl Connection { return Err(Error::Done); } + let data_frame_capacity = stream_cap - overhead; + + // Mark stream blocked if we can't send anything in the DATA frame. + if !body.is_empty() && data_frame_capacity == 0 { + let _ = conn.stream_writable(stream_id, overhead + body.len()); + } + // Cap the frame payload length to the stream's capacity. - let body_len = std::cmp::min(body.len(), stream_cap - overhead); + let body_len = std::cmp::min(body.len(), data_frame_capacity); // If we can't send the entire body, set the fin flag to false so the // application can try again later. @@ -1072,6 +1079,55 @@ impl Connection { Ok(written) } + /// Returns the stream's body capacity considering framing overhead. + /// + /// If the specified stream doesn't exist (including when it has already + /// been completed and closed), the FrameUnexpected or InvalidStreamState + /// error will be returned. + /// + /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected + /// [`InvalidStreamState`]: enum.Error.html#variant.InvalidStreamState + pub fn body_capacity( + &mut self, conn: &super::Connection, stream_id: u64, + ) -> Result<usize> { + // Validate that it is sane to send data on the stream. + if stream_id % 4 != 0 { + return Err(Error::FrameUnexpected); + } + + match self.streams.get(&stream_id) { + Some(s) => + if !s.local_initialized() { + return Err(Error::FrameUnexpected); + }, + + None => { + return Err(Error::FrameUnexpected); + }, + }; + + let stream_cap = match conn.stream_capacity(stream_id) { + Ok(v) => v, + + Err(e) => { + if conn.stream_finished(stream_id) { + self.streams.remove(&stream_id); + } + + return Err(e.into()); + }, + }; + + let overhead = 1 + // DATA frame type is always 1 byte + octets::varint_len(stream_cap as u64); + + if stream_cap < overhead { + return Err(Error::StreamBlocked); + } + + Ok(stream_cap - overhead) + } + /// Returns whether the peer enabled HTTP/3 DATAGRAM frame support. /// /// Support is signalled by the peer's SETTINGS, so this method always @@ -3697,6 +3753,208 @@ mod tests { } #[test] + /// Tests that stream and body capacity is reported correctly. + fn body_capacity() { + let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); + config + .load_cert_chain_from_pem_file("examples/cert.crt") + .unwrap(); + config + .load_priv_key_from_pem_file("examples/cert.key") + .unwrap(); + config.set_application_protos(b"\x02h3").unwrap(); + config.set_initial_max_data(1000); + config.set_initial_max_stream_data_bidi_local(150); + config.set_initial_max_stream_data_bidi_remote(150); + config.set_initial_max_stream_data_uni(150); + config.set_initial_max_streams_bidi(100); + config.set_initial_max_streams_uni(5); + config.verify_peer(false); + + let mut h3_config = Config::new().unwrap(); + + let mut s = Session::with_configs(&mut config, &mut h3_config).unwrap(); + + s.handshake().unwrap(); + + // Can't query stream or body capacity before sending request. + assert_eq!( + s.pipe.client.stream_capacity(0), + Err(crate::Error::InvalidStreamState(0)) + ); + assert_eq!( + s.client.body_capacity(&s.pipe.client, 0), + Err(Error::FrameUnexpected) + ); + + let req = vec![ + Header::new(b":method", b"GET"), + Header::new(b":scheme", b"https"), + Header::new(b":authority", b"quic.tech"), + Header::new(b":path", b"/test"), + ]; + + assert_eq!( + s.client.send_request(&mut s.pipe.client, &req, false), + Ok(0) + ); + + assert_eq!(s.pipe.client.stream_capacity(0), Ok(93)); + assert_eq!(s.client.body_capacity(&s.pipe.client, 0), Ok(90)); + assert_eq!( + s.client + .send_body(&mut s.pipe.client, 0, b"hellohello", false), + Ok(10) + ); + assert_eq!(s.pipe.client.stream_capacity(0), Ok(81)); + assert_eq!(s.client.body_capacity(&s.pipe.client, 0), Ok(78)); + assert_eq!( + s.client + .send_body(&mut s.pipe.client, 0, b"hellohello", false), + Ok(10) + ); + assert_eq!(s.pipe.client.stream_capacity(0), Ok(69)); + assert_eq!(s.client.body_capacity(&s.pipe.client, 0), Ok(66)); + assert_eq!( + s.client + .send_body(&mut s.pipe.client, 0, b"hellohello", false), + Ok(10) + ); + assert_eq!(s.pipe.client.stream_capacity(0), Ok(57)); + assert_eq!(s.client.body_capacity(&s.pipe.client, 0), Ok(55)); + assert_eq!( + s.client + .send_body(&mut s.pipe.client, 0, b"hellohello", false), + Ok(10) + ); + assert_eq!(s.pipe.client.stream_capacity(0), Ok(45)); + assert_eq!(s.client.body_capacity(&s.pipe.client, 0), Ok(43)); + assert_eq!( + s.client + .send_body(&mut s.pipe.client, 0, b"hellohello", false), + Ok(10) + ); + assert_eq!(s.pipe.client.stream_capacity(0), Ok(33)); + assert_eq!(s.client.body_capacity(&s.pipe.client, 0), Ok(31)); + assert_eq!( + s.client + .send_body(&mut s.pipe.client, 0, b"hellohello", false), + Ok(10) + ); + assert_eq!(s.pipe.client.stream_capacity(0), Ok(21)); + assert_eq!(s.client.body_capacity(&s.pipe.client, 0), Ok(19)); + assert_eq!( + s.client + .send_body(&mut s.pipe.client, 0, b"hellohello", false), + Ok(10) + ); + assert_eq!(s.pipe.client.stream_capacity(0), Ok(9)); + assert_eq!(s.client.body_capacity(&s.pipe.client, 0), Ok(7)); + assert_eq!( + s.client + .send_body(&mut s.pipe.client, 0, b"waytoomuchdata", false), + Ok(7) + ); + + // No remaining body capacity on client. + assert_eq!(s.pipe.client.stream_capacity(0), Ok(0)); + assert_eq!( + s.client.body_capacity(&s.pipe.client, 0), + Err(Error::StreamBlocked) + ); + assert_eq!( + s.client.send_body(&mut s.pipe.client, 0, b"", true), + Err(Error::Done) + ); + + // On the server, can't query stream or body capacity until their open. + assert_eq!( + s.pipe.server.stream_capacity(0), + Err(crate::Error::InvalidStreamState(0)) + ); + assert_eq!( + s.server.body_capacity(&s.pipe.server, 0), + Err(Error::FrameUnexpected) + ); + + s.advance().ok(); + + // Server hasn't consumed stream data, so still no capacity at client. + assert_eq!(s.pipe.client.stream_capacity(0), Ok(0)); + assert_eq!( + s.client.body_capacity(&s.pipe.client, 0), + Err(Error::StreamBlocked) + ); + + // Server has stream capacity but body capacity is unavailable until it + // responds. + assert_eq!(s.pipe.server.stream_capacity(0), Ok(150)); + assert_eq!( + s.server.body_capacity(&s.pipe.server, 0), + Err(Error::FrameUnexpected) + ); + + // Server process request to free capacity. + let ev_headers = Event::Headers { + list: req, + has_body: true, + }; + let mut recv_buf = vec![0; 100]; + + assert_eq!(s.poll_server(), Ok((0, ev_headers))); + assert_eq!(s.poll_server(), Ok((0, Event::Data))); + assert_eq!(s.recv_body_server(0, &mut recv_buf), Ok(77)); + assert_eq!(s.poll_server(), Err(Error::Done)); + + // Server sends a response sized to exactly available capacity. + let resp = s.send_response(0, false).unwrap(); + assert_eq!(s.pipe.server.stream_capacity(0), Ok(98)); + assert_eq!(s.server.body_capacity(&s.pipe.server, 0), Ok(95)); + + assert_eq!( + s.server + .send_body(&mut s.pipe.server, 0, &recv_buf[..95], true), + Ok(95) + ); + assert_eq!(s.pipe.server.stream_capacity(0), Ok(0)); + assert_eq!( + s.server.body_capacity(&s.pipe.server, 0), + Err(Error::StreamBlocked) + ); + + s.advance().ok(); + + // Once the server gives stream flow control credits back, client can send + // the body fin. + assert_eq!(s.pipe.client.stream_capacity(0), Ok(150)); + assert_eq!(s.client.body_capacity(&s.pipe.client, 0), Ok(147)); + assert_eq!(s.client.send_body(&mut s.pipe.client, 0, b"", true), Ok(0)); + + // Client processes response, no credits are given back to the fin'd + // stream. + let ev_headers = Event::Headers { + list: resp, + has_body: true, + }; + + assert_eq!(s.poll_client(), Ok((0, ev_headers))); + + assert_eq!(s.poll_client(), Ok((0, Event::Data))); + assert_eq!(s.recv_body_client(0, &mut recv_buf), Ok(95)); + + assert_eq!(s.poll_client(), Ok((0, Event::Finished))); + assert_eq!(s.poll_client(), Err(Error::Done)); + + s.advance().ok(); + + assert_eq!(s.pipe.server.stream_capacity(0), Ok(0)); + assert_eq!( + s.server.body_capacity(&s.pipe.server, 0), + Err(Error::StreamBlocked) + ); + } + + #[test] /// Tests that receiving a H3_DATAGRAM setting is ok. fn dgram_setting() { let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); |