reqwest/
connect.rs

1#[cfg(feature = "__tls")]
2use http::header::HeaderValue;
3#[cfg(feature = "__tls")]
4use http::uri::Scheme;
5use http::Uri;
6use hyper::rt::{Read, ReadBufCursor, Write};
7use hyper_util::client::legacy::connect::{Connected, Connection};
8#[cfg(any(feature = "socks", feature = "__tls"))]
9use hyper_util::rt::TokioIo;
10#[cfg(feature = "default-tls")]
11use native_tls_crate::{TlsConnector, TlsConnectorBuilder};
12use pin_project_lite::pin_project;
13use tower::util::{BoxCloneSyncServiceLayer, MapRequestLayer};
14use tower::{timeout::TimeoutLayer, util::BoxCloneSyncService, ServiceBuilder};
15use tower_service::Service;
16
17use std::future::Future;
18use std::io::{self, IoSlice};
19use std::net::IpAddr;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::{Context, Poll};
23use std::time::Duration;
24
25#[cfg(feature = "default-tls")]
26use self::native_tls_conn::NativeTlsConn;
27#[cfg(feature = "__rustls")]
28use self::rustls_tls_conn::RustlsTlsConn;
29use crate::dns::DynResolver;
30use crate::error::{cast_to_internal_error, BoxError};
31use crate::proxy::{Intercepted, Matcher as ProxyMatcher};
32use sealed::{Conn, Unnameable};
33
/// `hyper_util`'s `HttpConnector`, specialised to resolve names through this
/// crate's [`DynResolver`].
pub(crate) type HttpConnector = hyper_util::client::legacy::connect::HttpConnector<DynResolver>;
35
/// A `Service<Uri>` that yields a [`Conn`] for a destination.
///
/// `ConnectorBuilder::build` picks the variant: `Simple` when no custom
/// layers were supplied, `WithLayers` otherwise.
#[derive(Clone)]
pub(crate) enum Connector {
    // base service, with or without an embedded timeout
    Simple(ConnectorService),
    // at least one custom layer along with maybe an outer timeout layer
    // from `builder.connect_timeout()`
    WithLayers(BoxCloneSyncService<Unnameable, Conn, BoxError>),
}
44
45impl Service<Uri> for Connector {
46    type Response = Conn;
47    type Error = BoxError;
48    type Future = Connecting;
49
50    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
51        match self {
52            Connector::Simple(service) => service.poll_ready(cx),
53            Connector::WithLayers(service) => service.poll_ready(cx),
54        }
55    }
56
57    fn call(&mut self, dst: Uri) -> Self::Future {
58        match self {
59            Connector::Simple(service) => service.call(dst),
60            Connector::WithLayers(service) => service.call(Unnameable(dst)),
61        }
62    }
63}
64
/// Type-erased connector service: takes an `Unnameable` request and yields a
/// [`Conn`] or a [`BoxError`].
pub(crate) type BoxedConnectorService = BoxCloneSyncService<Unnameable, Conn, BoxError>;

/// Type-erased layer that wraps one [`BoxedConnectorService`] and produces a
/// service with the same request, response and error types.
pub(crate) type BoxedConnectorLayer =
    BoxCloneSyncServiceLayer<BoxedConnectorService, Unnameable, Conn, BoxError>;
69
/// Collects connector settings and turns them into a [`Connector`] via `build`.
pub(crate) struct ConnectorBuilder {
    // Transport: plain HTTP or one of the TLS backends.
    inner: Inner,
    // Proxy matchers handed to the resulting service.
    proxies: Arc<Vec<ProxyMatcher>>,
    // Verbose wrapper; toggled through `set_verbose`.
    verbose: verbose::Wrapper,
    // Connect timeout; set through `set_timeout` and applied in `build`.
    timeout: Option<Duration>,
    #[cfg(feature = "__tls")]
    nodelay: bool,
    #[cfg(feature = "__tls")]
    tls_info: bool,
    #[cfg(feature = "__tls")]
    user_agent: Option<HeaderValue>,
    // Resolver for SOCKS connections; `build` falls back to `DynResolver::gai`
    // when none was set.
    #[cfg(feature = "socks")]
    resolver: Option<DynResolver>,
}
84
85impl ConnectorBuilder {
86    pub(crate) fn build(self, layers: Vec<BoxedConnectorLayer>) -> Connector
87where {
88        // construct the inner tower service
89        let mut base_service = ConnectorService {
90            inner: self.inner,
91            proxies: self.proxies,
92            verbose: self.verbose,
93            #[cfg(feature = "__tls")]
94            nodelay: self.nodelay,
95            #[cfg(feature = "__tls")]
96            tls_info: self.tls_info,
97            #[cfg(feature = "__tls")]
98            user_agent: self.user_agent,
99            simple_timeout: None,
100            #[cfg(feature = "socks")]
101            resolver: self.resolver.unwrap_or_else(DynResolver::gai),
102        };
103
104        if layers.is_empty() {
105            // we have no user-provided layers, only use concrete types
106            base_service.simple_timeout = self.timeout;
107            return Connector::Simple(base_service);
108        }
109
110        // otherwise we have user provided layers
111        // so we need type erasure all the way through
112        // as well as mapping the unnameable type of the layers back to Uri for the inner service
113        let unnameable_service = ServiceBuilder::new()
114            .layer(MapRequestLayer::new(|request: Unnameable| request.0))
115            .service(base_service);
116        let mut service = BoxCloneSyncService::new(unnameable_service);
117
118        for layer in layers {
119            service = ServiceBuilder::new().layer(layer).service(service);
120        }
121
122        // now we handle the concrete stuff - any `connect_timeout`,
123        // plus a final map_err layer we can use to cast default tower layer
124        // errors to internal errors
125        match self.timeout {
126            Some(timeout) => {
127                let service = ServiceBuilder::new()
128                    .layer(TimeoutLayer::new(timeout))
129                    .service(service);
130                let service = ServiceBuilder::new()
131                    .map_err(|error: BoxError| cast_to_internal_error(error))
132                    .service(service);
133                let service = BoxCloneSyncService::new(service);
134
135                Connector::WithLayers(service)
136            }
137            None => {
138                // no timeout, but still map err
139                // no named timeout layer but we still map errors since
140                // we might have user-provided timeout layer
141                let service = ServiceBuilder::new().service(service);
142                let service = ServiceBuilder::new()
143                    .map_err(|error: BoxError| cast_to_internal_error(error))
144                    .service(service);
145                let service = BoxCloneSyncService::new(service);
146                Connector::WithLayers(service)
147            }
148        }
149    }
150
151    #[cfg(not(feature = "__tls"))]
152    pub(crate) fn new<T>(
153        mut http: HttpConnector,
154        proxies: Arc<Vec<ProxyMatcher>>,
155        local_addr: T,
156        #[cfg(any(
157            target_os = "android",
158            target_os = "fuchsia",
159            target_os = "illumos",
160            target_os = "ios",
161            target_os = "linux",
162            target_os = "macos",
163            target_os = "solaris",
164            target_os = "tvos",
165            target_os = "visionos",
166            target_os = "watchos",
167        ))]
168        interface: Option<&str>,
169        nodelay: bool,
170    ) -> ConnectorBuilder
171    where
172        T: Into<Option<IpAddr>>,
173    {
174        http.set_local_address(local_addr.into());
175        #[cfg(any(
176            target_os = "android",
177            target_os = "fuchsia",
178            target_os = "illumos",
179            target_os = "ios",
180            target_os = "linux",
181            target_os = "macos",
182            target_os = "solaris",
183            target_os = "tvos",
184            target_os = "visionos",
185            target_os = "watchos",
186        ))]
187        if let Some(interface) = interface {
188            http.set_interface(interface.to_owned());
189        }
190        http.set_nodelay(nodelay);
191
192        ConnectorBuilder {
193            inner: Inner::Http(http),
194            proxies,
195            verbose: verbose::OFF,
196            timeout: None,
197            #[cfg(feature = "socks")]
198            resolver: None,
199        }
200    }
201
202    #[cfg(feature = "default-tls")]
203    pub(crate) fn new_default_tls<T>(
204        http: HttpConnector,
205        tls: TlsConnectorBuilder,
206        proxies: Arc<Vec<ProxyMatcher>>,
207        user_agent: Option<HeaderValue>,
208        local_addr: T,
209        #[cfg(any(
210            target_os = "android",
211            target_os = "fuchsia",
212            target_os = "illumos",
213            target_os = "ios",
214            target_os = "linux",
215            target_os = "macos",
216            target_os = "solaris",
217            target_os = "tvos",
218            target_os = "visionos",
219            target_os = "watchos",
220        ))]
221        interface: Option<&str>,
222        nodelay: bool,
223        tls_info: bool,
224    ) -> crate::Result<ConnectorBuilder>
225    where
226        T: Into<Option<IpAddr>>,
227    {
228        let tls = tls.build().map_err(crate::error::builder)?;
229        Ok(Self::from_built_default_tls(
230            http,
231            tls,
232            proxies,
233            user_agent,
234            local_addr,
235            #[cfg(any(
236                target_os = "android",
237                target_os = "fuchsia",
238                target_os = "illumos",
239                target_os = "ios",
240                target_os = "linux",
241                target_os = "macos",
242                target_os = "solaris",
243                target_os = "tvos",
244                target_os = "visionos",
245                target_os = "watchos",
246            ))]
247            interface,
248            nodelay,
249            tls_info,
250        ))
251    }
252
253    #[cfg(feature = "default-tls")]
254    pub(crate) fn from_built_default_tls<T>(
255        mut http: HttpConnector,
256        tls: TlsConnector,
257        proxies: Arc<Vec<ProxyMatcher>>,
258        user_agent: Option<HeaderValue>,
259        local_addr: T,
260        #[cfg(any(
261            target_os = "android",
262            target_os = "fuchsia",
263            target_os = "illumos",
264            target_os = "ios",
265            target_os = "linux",
266            target_os = "macos",
267            target_os = "solaris",
268            target_os = "tvos",
269            target_os = "visionos",
270            target_os = "watchos",
271        ))]
272        interface: Option<&str>,
273        nodelay: bool,
274        tls_info: bool,
275    ) -> ConnectorBuilder
276    where
277        T: Into<Option<IpAddr>>,
278    {
279        http.set_local_address(local_addr.into());
280        #[cfg(any(
281            target_os = "android",
282            target_os = "fuchsia",
283            target_os = "illumos",
284            target_os = "ios",
285            target_os = "linux",
286            target_os = "macos",
287            target_os = "solaris",
288            target_os = "tvos",
289            target_os = "visionos",
290            target_os = "watchos",
291        ))]
292        if let Some(interface) = interface {
293            http.set_interface(interface);
294        }
295        http.set_nodelay(nodelay);
296        http.enforce_http(false);
297
298        ConnectorBuilder {
299            inner: Inner::DefaultTls(http, tls),
300            proxies,
301            verbose: verbose::OFF,
302            nodelay,
303            tls_info,
304            user_agent,
305            timeout: None,
306            #[cfg(feature = "socks")]
307            resolver: None,
308        }
309    }
310
311    #[cfg(feature = "__rustls")]
312    pub(crate) fn new_rustls_tls<T>(
313        mut http: HttpConnector,
314        tls: rustls::ClientConfig,
315        proxies: Arc<Vec<ProxyMatcher>>,
316        user_agent: Option<HeaderValue>,
317        local_addr: T,
318        #[cfg(any(
319            target_os = "android",
320            target_os = "fuchsia",
321            target_os = "illumos",
322            target_os = "ios",
323            target_os = "linux",
324            target_os = "macos",
325            target_os = "solaris",
326            target_os = "tvos",
327            target_os = "visionos",
328            target_os = "watchos",
329        ))]
330        interface: Option<&str>,
331        nodelay: bool,
332        tls_info: bool,
333    ) -> ConnectorBuilder
334    where
335        T: Into<Option<IpAddr>>,
336    {
337        http.set_local_address(local_addr.into());
338        #[cfg(any(
339            target_os = "android",
340            target_os = "fuchsia",
341            target_os = "illumos",
342            target_os = "ios",
343            target_os = "linux",
344            target_os = "macos",
345            target_os = "solaris",
346            target_os = "tvos",
347            target_os = "visionos",
348            target_os = "watchos",
349        ))]
350        if let Some(interface) = interface {
351            http.set_interface(interface.to_owned());
352        }
353        http.set_nodelay(nodelay);
354        http.enforce_http(false);
355
356        let (tls, tls_proxy) = if proxies.is_empty() {
357            let tls = Arc::new(tls);
358            (tls.clone(), tls)
359        } else {
360            let mut tls_proxy = tls.clone();
361            tls_proxy.alpn_protocols.clear();
362            (Arc::new(tls), Arc::new(tls_proxy))
363        };
364
365        ConnectorBuilder {
366            inner: Inner::RustlsTls {
367                http,
368                tls,
369                tls_proxy,
370            },
371            proxies,
372            verbose: verbose::OFF,
373            nodelay,
374            tls_info,
375            user_agent,
376            timeout: None,
377            #[cfg(feature = "socks")]
378            resolver: None,
379        }
380    }
381
382    pub(crate) fn set_timeout(&mut self, timeout: Option<Duration>) {
383        self.timeout = timeout;
384    }
385
386    pub(crate) fn set_verbose(&mut self, enabled: bool) {
387        self.verbose.0 = enabled;
388    }
389
390    pub(crate) fn set_keepalive(&mut self, dur: Option<Duration>) {
391        match &mut self.inner {
392            #[cfg(feature = "default-tls")]
393            Inner::DefaultTls(http, _tls) => http.set_keepalive(dur),
394            #[cfg(feature = "__rustls")]
395            Inner::RustlsTls { http, .. } => http.set_keepalive(dur),
396            #[cfg(not(feature = "__tls"))]
397            Inner::Http(http) => http.set_keepalive(dur),
398        }
399    }
400
401    pub(crate) fn set_keepalive_interval(&mut self, dur: Option<Duration>) {
402        match &mut self.inner {
403            #[cfg(feature = "default-tls")]
404            Inner::DefaultTls(http, _tls) => http.set_keepalive_interval(dur),
405            #[cfg(feature = "__rustls")]
406            Inner::RustlsTls { http, .. } => http.set_keepalive_interval(dur),
407            #[cfg(not(feature = "__tls"))]
408            Inner::Http(http) => http.set_keepalive_interval(dur),
409        }
410    }
411
412    pub(crate) fn set_keepalive_retries(&mut self, retries: Option<u32>) {
413        match &mut self.inner {
414            #[cfg(feature = "default-tls")]
415            Inner::DefaultTls(http, _tls) => http.set_keepalive_retries(retries),
416            #[cfg(feature = "__rustls")]
417            Inner::RustlsTls { http, .. } => http.set_keepalive_retries(retries),
418            #[cfg(not(feature = "__tls"))]
419            Inner::Http(http) => http.set_keepalive_retries(retries),
420        }
421    }
422
423    #[cfg(feature = "socks")]
424    pub(crate) fn set_socks_resolver(&mut self, resolver: DynResolver) {
425        self.resolver = Some(resolver);
426    }
427
428    #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
429    pub(crate) fn set_tcp_user_timeout(&mut self, dur: Option<Duration>) {
430        match &mut self.inner {
431            #[cfg(feature = "default-tls")]
432            Inner::DefaultTls(http, _tls) => http.set_tcp_user_timeout(dur),
433            #[cfg(feature = "__rustls")]
434            Inner::RustlsTls { http, .. } => http.set_tcp_user_timeout(dur),
435            #[cfg(not(feature = "__tls"))]
436            Inner::Http(http) => http.set_tcp_user_timeout(dur),
437        }
438    }
439}
440
/// The base connector service: resolves which proxy (if any) applies to a
/// destination and opens the connection, producing a [`Conn`].
#[allow(missing_debug_implementations)]
#[derive(Clone)]
pub(crate) struct ConnectorService {
    // Transport: plain HTTP or one of the TLS backends.
    inner: Inner,
    // Proxy matchers tried in order for every destination.
    proxies: Arc<Vec<ProxyMatcher>>,
    // Wraps every produced connection via `verbose.wrap(..)`.
    verbose: verbose::Wrapper,
    /// When there is a single timeout layer and no other layers,
    /// we embed it directly inside our base Service::call().
    /// This lets us avoid an extra `Box::pin` indirection layer
    /// since `tokio::time::Timeout` is `Unpin`
    simple_timeout: Option<Duration>,
    // User's nodelay choice; when false, nodelay is enabled only for the
    // duration of the TLS handshake.
    #[cfg(feature = "__tls")]
    nodelay: bool,
    // Copied onto `Conn::tls_info` for directly established TLS connections.
    #[cfg(feature = "__tls")]
    tls_info: bool,
    // Sent as the User-Agent header when tunneling HTTPS through a proxy.
    #[cfg(feature = "__tls")]
    user_agent: Option<HeaderValue>,
    // Resolver handed to `socks::connect`.
    #[cfg(feature = "socks")]
    resolver: DynResolver,
}
461
/// The transport behind a connector; which variants exist depends on the
/// enabled TLS features.
#[derive(Clone)]
enum Inner {
    // Plain HTTP; only present when no TLS backend is compiled in.
    #[cfg(not(feature = "__tls"))]
    Http(HttpConnector),
    // native-tls backend.
    #[cfg(feature = "default-tls")]
    DefaultTls(HttpConnector, TlsConnector),
    // rustls backend; `tls` is used for destinations, `tls_proxy` when
    // connecting to the proxy for an HTTPS tunnel.
    #[cfg(feature = "__rustls")]
    RustlsTls {
        http: HttpConnector,
        tls: Arc<rustls::ClientConfig>,
        tls_proxy: Arc<rustls::ClientConfig>,
    },
}
475
476impl Inner {
477    #[cfg(feature = "socks")]
478    fn get_http_connector(&mut self) -> &mut crate::connect::HttpConnector {
479        match self {
480            #[cfg(feature = "default-tls")]
481            Inner::DefaultTls(http, _) => http,
482            #[cfg(feature = "__rustls")]
483            Inner::RustlsTls { http, .. } => http,
484            #[cfg(not(feature = "__tls"))]
485            Inner::Http(http) => http,
486        }
487    }
488}
489
490impl ConnectorService {
491    #[cfg(feature = "socks")]
492    async fn connect_socks(mut self, dst: Uri, proxy: Intercepted) -> Result<Conn, BoxError> {
493        let dns = match proxy.uri().scheme_str() {
494            Some("socks4") | Some("socks5") => socks::DnsResolve::Local,
495            Some("socks4a") | Some("socks5h") => socks::DnsResolve::Proxy,
496            _ => {
497                unreachable!("connect_socks is only called for socks proxies");
498            }
499        };
500
501        match &mut self.inner {
502            #[cfg(feature = "default-tls")]
503            Inner::DefaultTls(http, tls) => {
504                if dst.scheme() == Some(&Scheme::HTTPS) {
505                    let host = dst.host().ok_or("no host in url")?.to_string();
506                    let conn = socks::connect(proxy, dst, dns, &self.resolver, http).await?;
507                    let conn = TokioIo::new(conn);
508                    let conn = TokioIo::new(conn);
509                    let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone());
510                    let io = tls_connector.connect(&host, conn).await?;
511                    let io = TokioIo::new(io);
512                    return Ok(Conn {
513                        inner: self.verbose.wrap(NativeTlsConn { inner: io }),
514                        is_proxy: false,
515                        tls_info: self.tls_info,
516                    });
517                }
518            }
519            #[cfg(feature = "__rustls")]
520            Inner::RustlsTls { http, tls, .. } => {
521                if dst.scheme() == Some(&Scheme::HTTPS) {
522                    use std::convert::TryFrom;
523                    use tokio_rustls::TlsConnector as RustlsConnector;
524
525                    let tls = tls.clone();
526                    let host = dst.host().ok_or("no host in url")?.to_string();
527                    let conn = socks::connect(proxy, dst, dns, &self.resolver, http).await?;
528                    let conn = TokioIo::new(conn);
529                    let conn = TokioIo::new(conn);
530                    let server_name =
531                        rustls_pki_types::ServerName::try_from(host.as_str().to_owned())
532                            .map_err(|_| "Invalid Server Name")?;
533                    let io = RustlsConnector::from(tls)
534                        .connect(server_name, conn)
535                        .await?;
536                    let io = TokioIo::new(io);
537                    return Ok(Conn {
538                        inner: self.verbose.wrap(RustlsTlsConn { inner: io }),
539                        is_proxy: false,
540                        tls_info: false,
541                    });
542                }
543            }
544            #[cfg(not(feature = "__tls"))]
545            Inner::Http(http) => {
546                let conn = socks::connect(proxy, dst, dns, &self.resolver, http).await?;
547                return Ok(Conn {
548                    inner: self.verbose.wrap(TokioIo::new(conn)),
549                    is_proxy: false,
550                    tls_info: false,
551                });
552            }
553        }
554
555        let resolver = &self.resolver;
556        let http = self.inner.get_http_connector();
557        socks::connect(proxy, dst, dns, resolver, http)
558            .await
559            .map(|tcp| Conn {
560                inner: self.verbose.wrap(TokioIo::new(tcp)),
561                is_proxy: false,
562                tls_info: false,
563            })
564            .map_err(Into::into)
565    }
566
567    async fn connect_with_maybe_proxy(self, dst: Uri, is_proxy: bool) -> Result<Conn, BoxError> {
568        match self.inner {
569            #[cfg(not(feature = "__tls"))]
570            Inner::Http(mut http) => {
571                let io = http.call(dst).await?;
572                Ok(Conn {
573                    inner: self.verbose.wrap(io),
574                    is_proxy,
575                    tls_info: false,
576                })
577            }
578            #[cfg(feature = "default-tls")]
579            Inner::DefaultTls(http, tls) => {
580                let mut http = http.clone();
581
582                // Disable Nagle's algorithm for TLS handshake
583                //
584                // https://www.openssl.org/docs/man1.1.1/man3/SSL_connect.html#NOTES
585                if !self.nodelay && (dst.scheme() == Some(&Scheme::HTTPS)) {
586                    http.set_nodelay(true);
587                }
588
589                let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone());
590                let mut http = hyper_tls::HttpsConnector::from((http, tls_connector));
591                let io = http.call(dst).await?;
592
593                if let hyper_tls::MaybeHttpsStream::Https(stream) = io {
594                    if !self.nodelay {
595                        stream
596                            .inner()
597                            .get_ref()
598                            .get_ref()
599                            .get_ref()
600                            .inner()
601                            .inner()
602                            .set_nodelay(false)?;
603                    }
604                    Ok(Conn {
605                        inner: self.verbose.wrap(NativeTlsConn { inner: stream }),
606                        is_proxy,
607                        tls_info: self.tls_info,
608                    })
609                } else {
610                    Ok(Conn {
611                        inner: self.verbose.wrap(io),
612                        is_proxy,
613                        tls_info: false,
614                    })
615                }
616            }
617            #[cfg(feature = "__rustls")]
618            Inner::RustlsTls { http, tls, .. } => {
619                let mut http = http.clone();
620
621                // Disable Nagle's algorithm for TLS handshake
622                //
623                // https://www.openssl.org/docs/man1.1.1/man3/SSL_connect.html#NOTES
624                if !self.nodelay && (dst.scheme() == Some(&Scheme::HTTPS)) {
625                    http.set_nodelay(true);
626                }
627
628                let mut http = hyper_rustls::HttpsConnector::from((http, tls.clone()));
629                let io = http.call(dst).await?;
630
631                if let hyper_rustls::MaybeHttpsStream::Https(stream) = io {
632                    if !self.nodelay {
633                        let (io, _) = stream.inner().get_ref();
634                        io.inner().inner().set_nodelay(false)?;
635                    }
636                    Ok(Conn {
637                        inner: self.verbose.wrap(RustlsTlsConn { inner: stream }),
638                        is_proxy,
639                        tls_info: self.tls_info,
640                    })
641                } else {
642                    Ok(Conn {
643                        inner: self.verbose.wrap(io),
644                        is_proxy,
645                        tls_info: false,
646                    })
647                }
648            }
649        }
650    }
651
652    async fn connect_via_proxy(self, dst: Uri, proxy: Intercepted) -> Result<Conn, BoxError> {
653        log::debug!("proxy({proxy:?}) intercepts '{dst:?}'");
654
655        #[cfg(feature = "socks")]
656        match proxy.uri().scheme_str().ok_or("proxy scheme expected")? {
657            "socks4" | "socks4a" | "socks5" | "socks5h" => {
658                return self.connect_socks(dst, proxy).await
659            }
660            _ => (),
661        }
662
663        let proxy_dst = proxy.uri().clone();
664        #[cfg(feature = "__tls")]
665        let auth = proxy.basic_auth().cloned();
666
667        #[cfg(feature = "__tls")]
668        let misc = proxy.custom_headers().clone();
669
670        match &self.inner {
671            #[cfg(feature = "default-tls")]
672            Inner::DefaultTls(http, tls) => {
673                if dst.scheme() == Some(&Scheme::HTTPS) {
674                    log::trace!("tunneling HTTPS over proxy");
675                    let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone());
676                    let inner =
677                        hyper_tls::HttpsConnector::from((http.clone(), tls_connector.clone()));
678                    // TODO: we could cache constructing this
679                    let mut tunnel =
680                        hyper_util::client::legacy::connect::proxy::Tunnel::new(proxy_dst, inner);
681                    if let Some(auth) = auth {
682                        tunnel = tunnel.with_auth(auth);
683                    }
684                    if let Some(ua) = self.user_agent {
685                        let mut headers = http::HeaderMap::new();
686                        headers.insert(http::header::USER_AGENT, ua);
687                        tunnel = tunnel.with_headers(headers);
688                    }
689                    // Note that custom headers may override the user agent header.
690                    if let Some(custom_headers) = misc {
691                        tunnel = tunnel.with_headers(custom_headers.clone());
692                    }
693                    // We don't wrap this again in an HttpsConnector since that uses Maybe,
694                    // and we know this is definitely HTTPS.
695                    let tunneled = tunnel.call(dst.clone()).await?;
696                    let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone());
697                    let io = tls_connector
698                        .connect(dst.host().ok_or("no host in url")?, TokioIo::new(tunneled))
699                        .await?;
700                    return Ok(Conn {
701                        inner: self.verbose.wrap(NativeTlsConn {
702                            inner: TokioIo::new(io),
703                        }),
704                        is_proxy: false,
705                        tls_info: false,
706                    });
707                }
708            }
709            #[cfg(feature = "__rustls")]
710            Inner::RustlsTls {
711                http,
712                tls,
713                tls_proxy,
714            } => {
715                if dst.scheme() == Some(&Scheme::HTTPS) {
716                    use rustls_pki_types::ServerName;
717                    use std::convert::TryFrom;
718                    use tokio_rustls::TlsConnector as RustlsConnector;
719
720                    log::trace!("tunneling HTTPS over proxy");
721                    let http = http.clone();
722                    let inner = hyper_rustls::HttpsConnector::from((http, tls_proxy.clone()));
723                    // TODO: we could cache constructing this
724                    let mut tunnel =
725                        hyper_util::client::legacy::connect::proxy::Tunnel::new(proxy_dst, inner);
726                    if let Some(auth) = auth {
727                        tunnel = tunnel.with_auth(auth);
728                    }
729                    if let Some(custom_headers) = misc {
730                        tunnel = tunnel.with_headers(custom_headers.clone());
731                    }
732                    if let Some(ua) = self.user_agent {
733                        let mut headers = http::HeaderMap::new();
734                        headers.insert(http::header::USER_AGENT, ua);
735                        tunnel = tunnel.with_headers(headers);
736                    }
737                    // We don't wrap this again in an HttpsConnector since that uses Maybe,
738                    // and we know this is definitely HTTPS.
739                    let tunneled = tunnel.call(dst.clone()).await?;
740                    let host = dst.host().ok_or("no host in url")?.to_string();
741                    let server_name = ServerName::try_from(host.as_str().to_owned())
742                        .map_err(|_| "Invalid Server Name")?;
743                    let io = RustlsConnector::from(tls.clone())
744                        .connect(server_name, TokioIo::new(tunneled))
745                        .await?;
746
747                    return Ok(Conn {
748                        inner: self.verbose.wrap(RustlsTlsConn {
749                            inner: TokioIo::new(io),
750                        }),
751                        is_proxy: false,
752                        tls_info: false,
753                    });
754                }
755            }
756            #[cfg(not(feature = "__tls"))]
757            Inner::Http(_) => (),
758        }
759
760        self.connect_with_maybe_proxy(proxy_dst, true).await
761    }
762}
763
764async fn with_timeout<T, F>(f: F, timeout: Option<Duration>) -> Result<T, BoxError>
765where
766    F: Future<Output = Result<T, BoxError>>,
767{
768    if let Some(to) = timeout {
769        match tokio::time::timeout(to, f).await {
770            Err(_elapsed) => Err(Box::new(crate::error::TimedOut) as BoxError),
771            Ok(Ok(try_res)) => Ok(try_res),
772            Ok(Err(e)) => Err(e),
773        }
774    } else {
775        f.await
776    }
777}
778
779impl Service<Uri> for ConnectorService {
780    type Response = Conn;
781    type Error = BoxError;
782    type Future = Connecting;
783
784    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
785        Poll::Ready(Ok(()))
786    }
787
788    fn call(&mut self, dst: Uri) -> Self::Future {
789        log::debug!("starting new connection: {dst:?}");
790        let timeout = self.simple_timeout;
791        for prox in self.proxies.iter() {
792            if let Some(intercepted) = prox.intercept(&dst) {
793                return Box::pin(with_timeout(
794                    self.clone().connect_via_proxy(dst, intercepted),
795                    timeout,
796                ));
797            }
798        }
799
800        Box::pin(with_timeout(
801            self.clone().connect_with_maybe_proxy(dst, false),
802            timeout,
803        ))
804    }
805}
806
/// Implemented by stream types that may be able to report TLS details about
/// the connection they carry.
#[cfg(feature = "__tls")]
trait TlsInfoFactory {
    /// Returns the TLS information for this stream, or `None` if it has none.
    fn tls_info(&self) -> Option<crate::tls::TlsInfo>;
}
811
#[cfg(feature = "__tls")]
impl TlsInfoFactory for tokio::net::TcpStream {
    /// A bare TCP stream never reports TLS information.
    fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
        None
    }
}
818
#[cfg(feature = "__tls")]
impl<T: TlsInfoFactory> TlsInfoFactory for TokioIo<T> {
    /// Defers to the stream wrapped by this `TokioIo`.
    fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
        let wrapped = self.inner();
        wrapped.tls_info()
    }
}
825
#[cfg(feature = "default-tls")]
impl TlsInfoFactory for tokio_native_tls::TlsStream<TokioIo<TokioIo<tokio::net::TcpStream>>> {
    /// Reports the peer certificate in DER form; the field is `None` when the
    /// certificate is unavailable or cannot be encoded.
    fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
        let peer_certificate = match self.get_ref().peer_certificate() {
            Ok(Some(cert)) => cert.to_der().ok(),
            Ok(None) | Err(_) => None,
        };
        Some(crate::tls::TlsInfo { peer_certificate })
    }
}
838
/// TLS info for a native-tls stream layered over a `hyper_tls::MaybeHttpsStream`.
#[cfg(feature = "default-tls")]
impl TlsInfoFactory
    for tokio_native_tls::TlsStream<
        TokioIo<hyper_tls::MaybeHttpsStream<TokioIo<tokio::net::TcpStream>>>,
    >
{
    /// Always returns `Some`; `peer_certificate` holds the DER encoding of
    /// the peer certificate, or `None` if it is absent or any lookup or
    /// encoding step fails.
    fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
        let peer_certificate = match self.get_ref().peer_certificate() {
            Ok(Some(cert)) => cert.to_der().ok(),
            Ok(None) | Err(_) => None,
        };
        Some(crate::tls::TlsInfo { peer_certificate })
    }
}
855
/// TLS info for a stream that may or may not be encrypted (native-tls backend).
#[cfg(feature = "default-tls")]
impl TlsInfoFactory for hyper_tls::MaybeHttpsStream<TokioIo<tokio::net::TcpStream>> {
    /// Delegates to the inner TLS stream for the `Https` variant and
    /// returns `None` for the `Http` variant.
    fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
        if let hyper_tls::MaybeHttpsStream::Https(tls) = self {
            tls.tls_info()
        } else {
            None
        }
    }
}
865
/// TLS info for a rustls stream layered directly over TCP.
#[cfg(feature = "__rustls")]
impl TlsInfoFactory for tokio_rustls::client::TlsStream<TokioIo<TokioIo<tokio::net::TcpStream>>> {
    /// Always returns `Some`; `peer_certificate` is a copy of the first
    /// entry of the session's peer certificate list, or `None` when the
    /// list is missing or empty.
    fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
        let (_io, session) = self.get_ref();
        let first_cert = session.peer_certificates().and_then(|chain| chain.first());
        Some(crate::tls::TlsInfo {
            peer_certificate: first_cert.map(|der| der.to_vec()),
        })
    }
}
878
/// TLS info for a rustls stream layered over a `hyper_rustls::MaybeHttpsStream`.
#[cfg(feature = "__rustls")]
impl TlsInfoFactory
    for tokio_rustls::client::TlsStream<
        TokioIo<hyper_rustls::MaybeHttpsStream<TokioIo<tokio::net::TcpStream>>>,
    >
{
    /// Always returns `Some`; `peer_certificate` is a copy of the first
    /// entry of the session's peer certificate list, or `None` when the
    /// list is missing or empty.
    fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
        let (_io, session) = self.get_ref();
        let first_cert = session.peer_certificates().and_then(|chain| chain.first());
        Some(crate::tls::TlsInfo {
            peer_certificate: first_cert.map(|der| der.to_vec()),
        })
    }
}
895
/// TLS info for a stream that may or may not be encrypted (rustls backend).
#[cfg(feature = "__rustls")]
impl TlsInfoFactory for hyper_rustls::MaybeHttpsStream<TokioIo<tokio::net::TcpStream>> {
    /// Delegates to the inner TLS stream for the `Https` variant and
    /// returns `None` for the `Http` variant.
    fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
        if let hyper_rustls::MaybeHttpsStream::Https(tls) = self {
            tls.tls_info()
        } else {
            None
        }
    }
}
905
906pub(crate) trait AsyncConn:
907    Read + Write + Connection + Send + Sync + Unpin + 'static
908{
909}
910
911impl<T: Read + Write + Connection + Send + Sync + Unpin + 'static> AsyncConn for T {}
912
913#[cfg(feature = "__tls")]
914trait AsyncConnWithInfo: AsyncConn + TlsInfoFactory {}
915#[cfg(not(feature = "__tls"))]
916trait AsyncConnWithInfo: AsyncConn {}
917
918#[cfg(feature = "__tls")]
919impl<T: AsyncConn + TlsInfoFactory> AsyncConnWithInfo for T {}
920#[cfg(not(feature = "__tls"))]
921impl<T: AsyncConn> AsyncConnWithInfo for T {}
922
/// Type-erased, heap-allocated connection; the concrete transport is hidden
/// behind the `AsyncConnWithInfo` trait object.
type BoxConn = Box<dyn AsyncConnWithInfo>;
924
925pub(crate) mod sealed {
926    use super::*;
    /// Newtype around a `Uri` whose field is visible only to the parent
    /// module. It is the request type of `Connector::WithLayers`.
    #[derive(Debug)]
    pub struct Unnameable(pub(super) Uri);
929
    pin_project! {
        /// Note: the `is_proxy` member means *is plain text HTTP proxy*.
        /// This tells hyper whether the URI should be written in
        /// * origin-form (`GET /just/a/path HTTP/1.1`), when `is_proxy == false`, or
        /// * absolute-form (`GET http://foo.bar/and/a/path HTTP/1.1`), otherwise.
        #[allow(missing_debug_implementations)]
        pub struct Conn {
            // The boxed, type-erased transport; pinned so I/O polls can be
            // forwarded through `project()`.
            #[pin]
            pub(super)inner: BoxConn,
            // Passed to `Connected::proxy` in `connected()`.
            pub(super) is_proxy: bool,
            // Only needed for __tls, but #[cfg()] on fields breaks pin_project!
            // When true, `connected()` attaches the inner connection's TLS info.
            pub(super) tls_info: bool,
        }
    }
944
945    impl Connection for Conn {
946        fn connected(&self) -> Connected {
947            let connected = self.inner.connected().proxy(self.is_proxy);
948            #[cfg(feature = "__tls")]
949            if self.tls_info {
950                if let Some(tls_info) = self.inner.tls_info() {
951                    connected.extra(tls_info)
952                } else {
953                    connected
954                }
955            } else {
956                connected
957            }
958            #[cfg(not(feature = "__tls"))]
959            connected
960        }
961    }
962
963    impl Read for Conn {
964        fn poll_read(
965            self: Pin<&mut Self>,
966            cx: &mut Context,
967            buf: ReadBufCursor<'_>,
968        ) -> Poll<io::Result<()>> {
969            let this = self.project();
970            Read::poll_read(this.inner, cx, buf)
971        }
972    }
973
974    impl Write for Conn {
975        fn poll_write(
976            self: Pin<&mut Self>,
977            cx: &mut Context,
978            buf: &[u8],
979        ) -> Poll<Result<usize, io::Error>> {
980            let this = self.project();
981            Write::poll_write(this.inner, cx, buf)
982        }
983
984        fn poll_write_vectored(
985            self: Pin<&mut Self>,
986            cx: &mut Context<'_>,
987            bufs: &[IoSlice<'_>],
988        ) -> Poll<Result<usize, io::Error>> {
989            let this = self.project();
990            Write::poll_write_vectored(this.inner, cx, bufs)
991        }
992
993        fn is_write_vectored(&self) -> bool {
994            self.inner.is_write_vectored()
995        }
996
997        fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
998            let this = self.project();
999            Write::poll_flush(this.inner, cx)
1000        }
1001
1002        fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
1003            let this = self.project();
1004            Write::poll_shutdown(this.inner, cx)
1005        }
1006    }
1007}
1008
/// Boxed, `Send` future returned by the connector services; resolves to a
/// `Conn` on success or a `BoxError` on failure.
pub(crate) type Connecting = Pin<Box<dyn Future<Output = Result<Conn, BoxError>> + Send>>;
1010
1011#[cfg(feature = "default-tls")]
1012mod native_tls_conn {
1013    use super::TlsInfoFactory;
1014    use hyper::rt::{Read, ReadBufCursor, Write};
1015    use hyper_tls::MaybeHttpsStream;
1016    use hyper_util::client::legacy::connect::{Connected, Connection};
1017    use hyper_util::rt::TokioIo;
1018    use pin_project_lite::pin_project;
1019    use std::{
1020        io::{self, IoSlice},
1021        pin::Pin,
1022        task::{Context, Poll},
1023    };
1024    use tokio::io::{AsyncRead, AsyncWrite};
1025    use tokio::net::TcpStream;
1026    use tokio_native_tls::TlsStream;
1027
    pin_project! {
        /// Wrapper around a native-tls stream over transport `T`, adapted to
        /// hyper's I/O traits through `TokioIo`.
        pub(super) struct NativeTlsConn<T> {
            // Pinned so reads and writes can be forwarded via `project()`.
            #[pin] pub(super) inner: TokioIo<TlsStream<T>>,
        }
    }
1033
1034    impl Connection for NativeTlsConn<TokioIo<TokioIo<TcpStream>>> {
1035        fn connected(&self) -> Connected {
1036            let connected = self
1037                .inner
1038                .inner()
1039                .get_ref()
1040                .get_ref()
1041                .get_ref()
1042                .inner()
1043                .connected();
1044            #[cfg(feature = "native-tls-alpn")]
1045            match self.inner.inner().get_ref().negotiated_alpn().ok() {
1046                Some(Some(alpn_protocol)) if alpn_protocol == b"h2" => connected.negotiated_h2(),
1047                _ => connected,
1048            }
1049            #[cfg(not(feature = "native-tls-alpn"))]
1050            connected
1051        }
1052    }
1053
1054    impl Connection for NativeTlsConn<TokioIo<MaybeHttpsStream<TokioIo<TcpStream>>>> {
1055        fn connected(&self) -> Connected {
1056            let connected = self
1057                .inner
1058                .inner()
1059                .get_ref()
1060                .get_ref()
1061                .get_ref()
1062                .inner()
1063                .connected();
1064            #[cfg(feature = "native-tls-alpn")]
1065            match self.inner.inner().get_ref().negotiated_alpn().ok() {
1066                Some(Some(alpn_protocol)) if alpn_protocol == b"h2" => connected.negotiated_h2(),
1067                _ => connected,
1068            }
1069            #[cfg(not(feature = "native-tls-alpn"))]
1070            connected
1071        }
1072    }
1073
1074    impl<T: AsyncRead + AsyncWrite + Unpin> Read for NativeTlsConn<T> {
1075        fn poll_read(
1076            self: Pin<&mut Self>,
1077            cx: &mut Context,
1078            buf: ReadBufCursor<'_>,
1079        ) -> Poll<tokio::io::Result<()>> {
1080            let this = self.project();
1081            Read::poll_read(this.inner, cx, buf)
1082        }
1083    }
1084
1085    impl<T: AsyncRead + AsyncWrite + Unpin> Write for NativeTlsConn<T> {
1086        fn poll_write(
1087            self: Pin<&mut Self>,
1088            cx: &mut Context,
1089            buf: &[u8],
1090        ) -> Poll<Result<usize, tokio::io::Error>> {
1091            let this = self.project();
1092            Write::poll_write(this.inner, cx, buf)
1093        }
1094
1095        fn poll_write_vectored(
1096            self: Pin<&mut Self>,
1097            cx: &mut Context<'_>,
1098            bufs: &[IoSlice<'_>],
1099        ) -> Poll<Result<usize, io::Error>> {
1100            let this = self.project();
1101            Write::poll_write_vectored(this.inner, cx, bufs)
1102        }
1103
1104        fn is_write_vectored(&self) -> bool {
1105            self.inner.is_write_vectored()
1106        }
1107
1108        fn poll_flush(
1109            self: Pin<&mut Self>,
1110            cx: &mut Context,
1111        ) -> Poll<Result<(), tokio::io::Error>> {
1112            let this = self.project();
1113            Write::poll_flush(this.inner, cx)
1114        }
1115
1116        fn poll_shutdown(
1117            self: Pin<&mut Self>,
1118            cx: &mut Context,
1119        ) -> Poll<Result<(), tokio::io::Error>> {
1120            let this = self.project();
1121            Write::poll_shutdown(this.inner, cx)
1122        }
1123    }
1124
1125    impl<T> TlsInfoFactory for NativeTlsConn<T>
1126    where
1127        TokioIo<TlsStream<T>>: TlsInfoFactory,
1128    {
1129        fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1130            self.inner.tls_info()
1131        }
1132    }
1133}
1134
1135#[cfg(feature = "__rustls")]
1136mod rustls_tls_conn {
1137    use super::TlsInfoFactory;
1138    use hyper::rt::{Read, ReadBufCursor, Write};
1139    use hyper_rustls::MaybeHttpsStream;
1140    use hyper_util::client::legacy::connect::{Connected, Connection};
1141    use hyper_util::rt::TokioIo;
1142    use pin_project_lite::pin_project;
1143    use std::{
1144        io::{self, IoSlice},
1145        pin::Pin,
1146        task::{Context, Poll},
1147    };
1148    use tokio::io::{AsyncRead, AsyncWrite};
1149    use tokio::net::TcpStream;
1150    use tokio_rustls::client::TlsStream;
1151
    pin_project! {
        /// Wrapper around a rustls client stream over transport `T`, adapted
        /// to hyper's I/O traits through `TokioIo`.
        pub(super) struct RustlsTlsConn<T> {
            // Pinned so reads and writes can be forwarded via `project()`.
            #[pin] pub(super) inner: TokioIo<TlsStream<T>>,
        }
    }
1157
1158    impl Connection for RustlsTlsConn<TokioIo<TokioIo<TcpStream>>> {
1159        fn connected(&self) -> Connected {
1160            if self.inner.inner().get_ref().1.alpn_protocol() == Some(b"h2") {
1161                self.inner
1162                    .inner()
1163                    .get_ref()
1164                    .0
1165                    .inner()
1166                    .connected()
1167                    .negotiated_h2()
1168            } else {
1169                self.inner.inner().get_ref().0.inner().connected()
1170            }
1171        }
1172    }
1173    impl Connection for RustlsTlsConn<TokioIo<MaybeHttpsStream<TokioIo<TcpStream>>>> {
1174        fn connected(&self) -> Connected {
1175            if self.inner.inner().get_ref().1.alpn_protocol() == Some(b"h2") {
1176                self.inner
1177                    .inner()
1178                    .get_ref()
1179                    .0
1180                    .inner()
1181                    .connected()
1182                    .negotiated_h2()
1183            } else {
1184                self.inner.inner().get_ref().0.inner().connected()
1185            }
1186        }
1187    }
1188
1189    impl<T: AsyncRead + AsyncWrite + Unpin> Read for RustlsTlsConn<T> {
1190        fn poll_read(
1191            self: Pin<&mut Self>,
1192            cx: &mut Context,
1193            buf: ReadBufCursor<'_>,
1194        ) -> Poll<tokio::io::Result<()>> {
1195            let this = self.project();
1196            Read::poll_read(this.inner, cx, buf)
1197        }
1198    }
1199
1200    impl<T: AsyncRead + AsyncWrite + Unpin> Write for RustlsTlsConn<T> {
1201        fn poll_write(
1202            self: Pin<&mut Self>,
1203            cx: &mut Context,
1204            buf: &[u8],
1205        ) -> Poll<Result<usize, tokio::io::Error>> {
1206            let this = self.project();
1207            Write::poll_write(this.inner, cx, buf)
1208        }
1209
1210        fn poll_write_vectored(
1211            self: Pin<&mut Self>,
1212            cx: &mut Context<'_>,
1213            bufs: &[IoSlice<'_>],
1214        ) -> Poll<Result<usize, io::Error>> {
1215            let this = self.project();
1216            Write::poll_write_vectored(this.inner, cx, bufs)
1217        }
1218
1219        fn is_write_vectored(&self) -> bool {
1220            self.inner.is_write_vectored()
1221        }
1222
1223        fn poll_flush(
1224            self: Pin<&mut Self>,
1225            cx: &mut Context,
1226        ) -> Poll<Result<(), tokio::io::Error>> {
1227            let this = self.project();
1228            Write::poll_flush(this.inner, cx)
1229        }
1230
1231        fn poll_shutdown(
1232            self: Pin<&mut Self>,
1233            cx: &mut Context,
1234        ) -> Poll<Result<(), tokio::io::Error>> {
1235            let this = self.project();
1236            Write::poll_shutdown(this.inner, cx)
1237        }
1238    }
1239    impl<T> TlsInfoFactory for RustlsTlsConn<T>
1240    where
1241        TokioIo<TlsStream<T>>: TlsInfoFactory,
1242    {
1243        fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1244            self.inner.tls_info()
1245        }
1246    }
1247}
1248
1249#[cfg(feature = "socks")]
1250mod socks {
1251    use tower_service::Service;
1252
1253    use http::uri::Scheme;
1254    use http::Uri;
1255    use hyper_util::client::legacy::connect::proxy::{SocksV4, SocksV5};
1256    use tokio::net::TcpStream;
1257
1258    use super::BoxError;
1259    use crate::proxy::Intercepted;
1260
    /// Selects where the destination host name is resolved for a SOCKS
    /// connection.
    pub(super) enum DnsResolve {
        /// `connect` resolves the destination with the client's resolver and
        /// hands the resulting IP address to the proxy.
        Local,
        /// `connect` passes the destination host through unchanged.
        Proxy,
    }
1265
    /// Failures that `connect` can report.
    #[derive(Debug)]
    pub(super) enum SocksProxyError {
        /// The destination URI has no host component.
        SocksNoHostInUrl,
        /// Local DNS resolution of the destination failed; wraps the cause.
        SocksLocalResolve(BoxError),
        /// Building the destination URI or connecting through the proxy
        /// failed; wraps the cause.
        SocksConnect(BoxError),
    }
1272
1273    pub(super) async fn connect(
1274        proxy: Intercepted,
1275        dst: Uri,
1276        dns_mode: DnsResolve,
1277        resolver: &crate::dns::DynResolver,
1278        http_connector: &mut crate::connect::HttpConnector,
1279    ) -> Result<TcpStream, SocksProxyError> {
1280        let https = dst.scheme() == Some(&Scheme::HTTPS);
1281        let original_host = dst.host().ok_or(SocksProxyError::SocksNoHostInUrl)?;
1282        let mut host = original_host.to_owned();
1283        let port = match dst.port() {
1284            Some(p) => p.as_u16(),
1285            None if https => 443u16,
1286            _ => 80u16,
1287        };
1288
1289        if let DnsResolve::Local = dns_mode {
1290            let maybe_new_target = resolver
1291                .http_resolve(&dst)
1292                .await
1293                .map_err(SocksProxyError::SocksLocalResolve)?
1294                .next();
1295            if let Some(new_target) = maybe_new_target {
1296                log::trace!("socks local dns resolved {new_target:?}");
1297                // If the resolved IP is IPv6, wrap it in brackets for URI formatting
1298                let ip = new_target.ip();
1299                if ip.is_ipv6() {
1300                    host = format!("[{}]", ip);
1301                } else {
1302                    host = ip.to_string();
1303                }
1304            }
1305        }
1306
1307        let proxy_uri = proxy.uri().clone();
1308        // Build a Uri for the destination
1309        let dst_uri = format!(
1310            "{}://{}:{}",
1311            if https { "https" } else { "http" },
1312            host,
1313            port
1314        )
1315        .parse::<Uri>()
1316        .map_err(|e| SocksProxyError::SocksConnect(e.into()))?;
1317
1318        // TODO: can `Scheme::from_static()` be const fn, compare with a SOCKS5 constant?
1319        match proxy.uri().scheme_str() {
1320            Some("socks4") | Some("socks4a") => {
1321                let mut svc = SocksV4::new(proxy_uri, http_connector);
1322                let stream = Service::call(&mut svc, dst_uri)
1323                    .await
1324                    .map_err(|e| SocksProxyError::SocksConnect(e.into()))?;
1325                Ok(stream.into_inner())
1326            }
1327            Some("socks5") | Some("socks5h") => {
1328                let mut svc = if let Some((username, password)) = proxy.raw_auth() {
1329                    SocksV5::new(proxy_uri, http_connector)
1330                        .with_auth(username.to_string(), password.to_string())
1331                } else {
1332                    SocksV5::new(proxy_uri, http_connector)
1333                };
1334                let stream = Service::call(&mut svc, dst_uri)
1335                    .await
1336                    .map_err(|e| SocksProxyError::SocksConnect(e.into()))?;
1337                Ok(stream.into_inner())
1338            }
1339            _ => unreachable!(),
1340        }
1341    }
1342
1343    impl std::fmt::Display for SocksProxyError {
1344        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1345            match self {
1346                Self::SocksNoHostInUrl => f.write_str("socks proxy destination has no host"),
1347                Self::SocksLocalResolve(_) => f.write_str("error resolving for socks proxy"),
1348                Self::SocksConnect(_) => f.write_str("error connecting to socks proxy"),
1349            }
1350        }
1351    }
1352
1353    impl std::error::Error for SocksProxyError {
1354        fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
1355            match self {
1356                Self::SocksNoHostInUrl => None,
1357                Self::SocksLocalResolve(ref e) => Some(&**e),
1358                Self::SocksConnect(ref e) => Some(&**e),
1359            }
1360        }
1361    }
1362}
1363
1364mod verbose {
1365    use crate::util::Escape;
1366    use hyper::rt::{Read, ReadBufCursor, Write};
1367    use hyper_util::client::legacy::connect::{Connected, Connection};
1368    use std::cmp::min;
1369    use std::fmt;
1370    use std::io::{self, IoSlice};
1371    use std::pin::Pin;
1372    use std::task::{Context, Poll};
1373
    /// A `Wrapper` with verbose logging disabled: `wrap` just boxes the
    /// connection.
    pub(super) const OFF: Wrapper = Wrapper(false);
1375
    /// Flag deciding whether `wrap` may add trace logging of connection
    /// traffic; `true` enables it (subject to the log level check in `wrap`).
    #[derive(Clone, Copy)]
    pub(super) struct Wrapper(pub(super) bool);
1378
1379    impl Wrapper {
1380        pub(super) fn wrap<T: super::AsyncConnWithInfo>(&self, conn: T) -> super::BoxConn {
1381            if self.0 && log::log_enabled!(log::Level::Trace) {
1382                Box::new(Verbose {
1383                    // truncate is fine
1384                    id: crate::util::fast_random() as u32,
1385                    inner: conn,
1386                })
1387            } else {
1388                Box::new(conn)
1389            }
1390        }
1391    }
1392
    /// Connection wrapper that logs the bytes read from and written to
    /// `inner` at trace level.
    struct Verbose<T> {
        // Printed as a hex prefix on every log line of this connection.
        id: u32,
        // The wrapped connection that performs the actual I/O.
        inner: T,
    }
1397
1398    impl<T: Connection + Read + Write + Unpin> Connection for Verbose<T> {
1399        fn connected(&self) -> Connected {
1400            self.inner.connected()
1401        }
1402    }
1403
    impl<T: Read + Write + Unpin> Read for Verbose<T> {
        /// Reads from the inner connection and trace-logs the bytes received.
        ///
        /// The read goes into a temporary `ReadBuf` built over the caller's
        /// cursor, so the filled bytes can be inspected for logging; on
        /// success the caller's cursor is advanced by the same amount.
        /// Errors and `Pending` pass through without logging.
        fn poll_read(
            mut self: Pin<&mut Self>,
            cx: &mut Context,
            mut buf: ReadBufCursor<'_>,
        ) -> Poll<std::io::Result<()>> {
            // TODO: This _does_ forget the `init` len, so it could result in
            // re-initializing twice. Needs upstream support, perhaps.
            // SAFETY: Passing to a ReadBuf will never de-initialize any bytes.
            let mut vbuf = hyper::rt::ReadBuf::uninit(unsafe { buf.as_mut() });
            match Pin::new(&mut self.inner).poll_read(cx, vbuf.unfilled()) {
                Poll::Ready(Ok(())) => {
                    // Log exactly what this call produced.
                    log::trace!("{:08x} read: {:?}", self.id, Escape::new(vbuf.filled()));
                    let len = vbuf.filled().len();
                    // SAFETY: The two cursors were for the same buffer. What was
                    // filled in one is safe in the other.
                    unsafe {
                        buf.advance(len);
                    }
                    Poll::Ready(Ok(()))
                }
                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
                Poll::Pending => Poll::Pending,
            }
        }
    }
1430
1431    impl<T: Read + Write + Unpin> Write for Verbose<T> {
1432        fn poll_write(
1433            mut self: Pin<&mut Self>,
1434            cx: &mut Context,
1435            buf: &[u8],
1436        ) -> Poll<Result<usize, std::io::Error>> {
1437            match Pin::new(&mut self.inner).poll_write(cx, buf) {
1438                Poll::Ready(Ok(n)) => {
1439                    log::trace!("{:08x} write: {:?}", self.id, Escape::new(&buf[..n]));
1440                    Poll::Ready(Ok(n))
1441                }
1442                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
1443                Poll::Pending => Poll::Pending,
1444            }
1445        }
1446
1447        fn poll_write_vectored(
1448            mut self: Pin<&mut Self>,
1449            cx: &mut Context<'_>,
1450            bufs: &[IoSlice<'_>],
1451        ) -> Poll<Result<usize, io::Error>> {
1452            match Pin::new(&mut self.inner).poll_write_vectored(cx, bufs) {
1453                Poll::Ready(Ok(nwritten)) => {
1454                    log::trace!(
1455                        "{:08x} write (vectored): {:?}",
1456                        self.id,
1457                        Vectored { bufs, nwritten }
1458                    );
1459                    Poll::Ready(Ok(nwritten))
1460                }
1461                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
1462                Poll::Pending => Poll::Pending,
1463            }
1464        }
1465
1466        fn is_write_vectored(&self) -> bool {
1467            self.inner.is_write_vectored()
1468        }
1469
1470        fn poll_flush(
1471            mut self: Pin<&mut Self>,
1472            cx: &mut Context,
1473        ) -> Poll<Result<(), std::io::Error>> {
1474            Pin::new(&mut self.inner).poll_flush(cx)
1475        }
1476
1477        fn poll_shutdown(
1478            mut self: Pin<&mut Self>,
1479            cx: &mut Context,
1480        ) -> Poll<Result<(), std::io::Error>> {
1481            Pin::new(&mut self.inner).poll_shutdown(cx)
1482        }
1483    }
1484
1485    #[cfg(feature = "__tls")]
1486    impl<T: super::TlsInfoFactory> super::TlsInfoFactory for Verbose<T> {
1487        fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1488            self.inner.tls_info()
1489        }
1490    }
1491
    /// Debug-formatting helper for a vectored write: pairs the buffers that
    /// were offered with the number of bytes actually written.
    struct Vectored<'a, 'b> {
        // The buffers passed to `poll_write_vectored`.
        bufs: &'a [IoSlice<'b>],
        // Byte count reported by the write; only this many bytes are printed.
        nwritten: usize,
    }
1496
1497    impl fmt::Debug for Vectored<'_, '_> {
1498        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1499            let mut left = self.nwritten;
1500            for buf in self.bufs.iter() {
1501                if left == 0 {
1502                    break;
1503                }
1504                let n = min(left, buf.len());
1505                Escape::new(&buf[..n]).fmt(f)?;
1506                left -= n;
1507            }
1508            Ok(())
1509        }
1510    }
1511}