1#[cfg(feature = "__tls")]
2use http::header::HeaderValue;
3#[cfg(feature = "__tls")]
4use http::uri::Scheme;
5use http::Uri;
6use hyper::rt::{Read, ReadBufCursor, Write};
7use hyper_util::client::legacy::connect::{Connected, Connection};
8#[cfg(any(feature = "socks", feature = "__tls"))]
9use hyper_util::rt::TokioIo;
10#[cfg(feature = "default-tls")]
11use native_tls_crate::{TlsConnector, TlsConnectorBuilder};
12use pin_project_lite::pin_project;
13use tower::util::{BoxCloneSyncServiceLayer, MapRequestLayer};
14use tower::{timeout::TimeoutLayer, util::BoxCloneSyncService, ServiceBuilder};
15use tower_service::Service;
16
17use std::future::Future;
18use std::io::{self, IoSlice};
19use std::net::IpAddr;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::{Context, Poll};
23use std::time::Duration;
24
25#[cfg(feature = "default-tls")]
26use self::native_tls_conn::NativeTlsConn;
27#[cfg(feature = "__rustls")]
28use self::rustls_tls_conn::RustlsTlsConn;
29use crate::dns::DynResolver;
30use crate::error::{cast_to_internal_error, BoxError};
31use crate::proxy::{Intercepted, Matcher as ProxyMatcher};
32use sealed::{Conn, Unnameable};
33
/// The `hyper_util` HTTP connector, specialized to use this crate's [`DynResolver`].
pub(crate) type HttpConnector = hyper_util::client::legacy::connect::HttpConnector<DynResolver>;
35
/// The connector produced by [`ConnectorBuilder::build`]: either the concrete
/// [`ConnectorService`] or a type-erased service that has extra layers stacked on it.
#[derive(Clone)]
pub(crate) enum Connector {
    /// Built without any layers; the concrete service is called with the `Uri` directly.
    Simple(ConnectorService),
    /// Built with layers; the boxed service takes the [`Unnameable`] wrapper around the `Uri`.
    WithLayers(BoxCloneSyncService<Unnameable, Conn, BoxError>),
}
44
45impl Service<Uri> for Connector {
46 type Response = Conn;
47 type Error = BoxError;
48 type Future = Connecting;
49
50 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
51 match self {
52 Connector::Simple(service) => service.poll_ready(cx),
53 Connector::WithLayers(service) => service.poll_ready(cx),
54 }
55 }
56
57 fn call(&mut self, dst: Uri) -> Self::Future {
58 match self {
59 Connector::Simple(service) => service.call(dst),
60 Connector::WithLayers(service) => service.call(Unnameable(dst)),
61 }
62 }
63}
64
/// A type-erased, cloneable connector service that takes [`Unnameable`] requests and yields
/// [`Conn`] or a [`BoxError`].
pub(crate) type BoxedConnectorService = BoxCloneSyncService<Unnameable, Conn, BoxError>;

/// A type-erased layer that wraps one [`BoxedConnectorService`] and produces another service
/// with the same request, response and error types.
pub(crate) type BoxedConnectorLayer =
    BoxCloneSyncServiceLayer<BoxedConnectorService, Unnameable, Conn, BoxError>;
69
/// Collects connector configuration; turned into a [`Connector`] by `build`.
pub(crate) struct ConnectorBuilder {
    // Backend-specific connector state (plain HTTP, native-tls or rustls).
    inner: Inner,
    // Proxy matchers; the built service asks each one whether it intercepts a destination.
    proxies: Arc<Vec<ProxyMatcher>>,
    // Toggled by `set_verbose`; the built service uses it to wrap every connection.
    verbose: verbose::Wrapper,
    // Connect timeout configured through `set_timeout`; applied in `build`.
    timeout: Option<Duration>,
    // The caller's requested `nodelay` setting, copied onto the built service.
    #[cfg(feature = "__tls")]
    nodelay: bool,
    // Whether TLS connections should expose TLS info on their `Connected` metadata.
    #[cfg(feature = "__tls")]
    tls_info: bool,
    // Optional `User-Agent` value sent with proxy CONNECT tunnels.
    #[cfg(feature = "__tls")]
    user_agent: Option<HeaderValue>,
    // Resolver override for SOCKS connections; `build` falls back to `DynResolver::gai`.
    #[cfg(feature = "socks")]
    resolver: Option<DynResolver>,
}
84
85impl ConnectorBuilder {
86 pub(crate) fn build(self, layers: Vec<BoxedConnectorLayer>) -> Connector
87where {
88 let mut base_service = ConnectorService {
90 inner: self.inner,
91 proxies: self.proxies,
92 verbose: self.verbose,
93 #[cfg(feature = "__tls")]
94 nodelay: self.nodelay,
95 #[cfg(feature = "__tls")]
96 tls_info: self.tls_info,
97 #[cfg(feature = "__tls")]
98 user_agent: self.user_agent,
99 simple_timeout: None,
100 #[cfg(feature = "socks")]
101 resolver: self.resolver.unwrap_or_else(DynResolver::gai),
102 };
103
104 if layers.is_empty() {
105 base_service.simple_timeout = self.timeout;
107 return Connector::Simple(base_service);
108 }
109
110 let unnameable_service = ServiceBuilder::new()
114 .layer(MapRequestLayer::new(|request: Unnameable| request.0))
115 .service(base_service);
116 let mut service = BoxCloneSyncService::new(unnameable_service);
117
118 for layer in layers {
119 service = ServiceBuilder::new().layer(layer).service(service);
120 }
121
122 match self.timeout {
126 Some(timeout) => {
127 let service = ServiceBuilder::new()
128 .layer(TimeoutLayer::new(timeout))
129 .service(service);
130 let service = ServiceBuilder::new()
131 .map_err(|error: BoxError| cast_to_internal_error(error))
132 .service(service);
133 let service = BoxCloneSyncService::new(service);
134
135 Connector::WithLayers(service)
136 }
137 None => {
138 let service = ServiceBuilder::new().service(service);
142 let service = ServiceBuilder::new()
143 .map_err(|error: BoxError| cast_to_internal_error(error))
144 .service(service);
145 let service = BoxCloneSyncService::new(service);
146 Connector::WithLayers(service)
147 }
148 }
149 }
150
151 #[cfg(not(feature = "__tls"))]
152 pub(crate) fn new<T>(
153 mut http: HttpConnector,
154 proxies: Arc<Vec<ProxyMatcher>>,
155 local_addr: T,
156 #[cfg(any(
157 target_os = "android",
158 target_os = "fuchsia",
159 target_os = "illumos",
160 target_os = "ios",
161 target_os = "linux",
162 target_os = "macos",
163 target_os = "solaris",
164 target_os = "tvos",
165 target_os = "visionos",
166 target_os = "watchos",
167 ))]
168 interface: Option<&str>,
169 nodelay: bool,
170 ) -> ConnectorBuilder
171 where
172 T: Into<Option<IpAddr>>,
173 {
174 http.set_local_address(local_addr.into());
175 #[cfg(any(
176 target_os = "android",
177 target_os = "fuchsia",
178 target_os = "illumos",
179 target_os = "ios",
180 target_os = "linux",
181 target_os = "macos",
182 target_os = "solaris",
183 target_os = "tvos",
184 target_os = "visionos",
185 target_os = "watchos",
186 ))]
187 if let Some(interface) = interface {
188 http.set_interface(interface.to_owned());
189 }
190 http.set_nodelay(nodelay);
191
192 ConnectorBuilder {
193 inner: Inner::Http(http),
194 proxies,
195 verbose: verbose::OFF,
196 timeout: None,
197 #[cfg(feature = "socks")]
198 resolver: None,
199 }
200 }
201
202 #[cfg(feature = "default-tls")]
203 pub(crate) fn new_default_tls<T>(
204 http: HttpConnector,
205 tls: TlsConnectorBuilder,
206 proxies: Arc<Vec<ProxyMatcher>>,
207 user_agent: Option<HeaderValue>,
208 local_addr: T,
209 #[cfg(any(
210 target_os = "android",
211 target_os = "fuchsia",
212 target_os = "illumos",
213 target_os = "ios",
214 target_os = "linux",
215 target_os = "macos",
216 target_os = "solaris",
217 target_os = "tvos",
218 target_os = "visionos",
219 target_os = "watchos",
220 ))]
221 interface: Option<&str>,
222 nodelay: bool,
223 tls_info: bool,
224 ) -> crate::Result<ConnectorBuilder>
225 where
226 T: Into<Option<IpAddr>>,
227 {
228 let tls = tls.build().map_err(crate::error::builder)?;
229 Ok(Self::from_built_default_tls(
230 http,
231 tls,
232 proxies,
233 user_agent,
234 local_addr,
235 #[cfg(any(
236 target_os = "android",
237 target_os = "fuchsia",
238 target_os = "illumos",
239 target_os = "ios",
240 target_os = "linux",
241 target_os = "macos",
242 target_os = "solaris",
243 target_os = "tvos",
244 target_os = "visionos",
245 target_os = "watchos",
246 ))]
247 interface,
248 nodelay,
249 tls_info,
250 ))
251 }
252
253 #[cfg(feature = "default-tls")]
254 pub(crate) fn from_built_default_tls<T>(
255 mut http: HttpConnector,
256 tls: TlsConnector,
257 proxies: Arc<Vec<ProxyMatcher>>,
258 user_agent: Option<HeaderValue>,
259 local_addr: T,
260 #[cfg(any(
261 target_os = "android",
262 target_os = "fuchsia",
263 target_os = "illumos",
264 target_os = "ios",
265 target_os = "linux",
266 target_os = "macos",
267 target_os = "solaris",
268 target_os = "tvos",
269 target_os = "visionos",
270 target_os = "watchos",
271 ))]
272 interface: Option<&str>,
273 nodelay: bool,
274 tls_info: bool,
275 ) -> ConnectorBuilder
276 where
277 T: Into<Option<IpAddr>>,
278 {
279 http.set_local_address(local_addr.into());
280 #[cfg(any(
281 target_os = "android",
282 target_os = "fuchsia",
283 target_os = "illumos",
284 target_os = "ios",
285 target_os = "linux",
286 target_os = "macos",
287 target_os = "solaris",
288 target_os = "tvos",
289 target_os = "visionos",
290 target_os = "watchos",
291 ))]
292 if let Some(interface) = interface {
293 http.set_interface(interface);
294 }
295 http.set_nodelay(nodelay);
296 http.enforce_http(false);
297
298 ConnectorBuilder {
299 inner: Inner::DefaultTls(http, tls),
300 proxies,
301 verbose: verbose::OFF,
302 nodelay,
303 tls_info,
304 user_agent,
305 timeout: None,
306 #[cfg(feature = "socks")]
307 resolver: None,
308 }
309 }
310
311 #[cfg(feature = "__rustls")]
312 pub(crate) fn new_rustls_tls<T>(
313 mut http: HttpConnector,
314 tls: rustls::ClientConfig,
315 proxies: Arc<Vec<ProxyMatcher>>,
316 user_agent: Option<HeaderValue>,
317 local_addr: T,
318 #[cfg(any(
319 target_os = "android",
320 target_os = "fuchsia",
321 target_os = "illumos",
322 target_os = "ios",
323 target_os = "linux",
324 target_os = "macos",
325 target_os = "solaris",
326 target_os = "tvos",
327 target_os = "visionos",
328 target_os = "watchos",
329 ))]
330 interface: Option<&str>,
331 nodelay: bool,
332 tls_info: bool,
333 ) -> ConnectorBuilder
334 where
335 T: Into<Option<IpAddr>>,
336 {
337 http.set_local_address(local_addr.into());
338 #[cfg(any(
339 target_os = "android",
340 target_os = "fuchsia",
341 target_os = "illumos",
342 target_os = "ios",
343 target_os = "linux",
344 target_os = "macos",
345 target_os = "solaris",
346 target_os = "tvos",
347 target_os = "visionos",
348 target_os = "watchos",
349 ))]
350 if let Some(interface) = interface {
351 http.set_interface(interface.to_owned());
352 }
353 http.set_nodelay(nodelay);
354 http.enforce_http(false);
355
356 let (tls, tls_proxy) = if proxies.is_empty() {
357 let tls = Arc::new(tls);
358 (tls.clone(), tls)
359 } else {
360 let mut tls_proxy = tls.clone();
361 tls_proxy.alpn_protocols.clear();
362 (Arc::new(tls), Arc::new(tls_proxy))
363 };
364
365 ConnectorBuilder {
366 inner: Inner::RustlsTls {
367 http,
368 tls,
369 tls_proxy,
370 },
371 proxies,
372 verbose: verbose::OFF,
373 nodelay,
374 tls_info,
375 user_agent,
376 timeout: None,
377 #[cfg(feature = "socks")]
378 resolver: None,
379 }
380 }
381
382 pub(crate) fn set_timeout(&mut self, timeout: Option<Duration>) {
383 self.timeout = timeout;
384 }
385
386 pub(crate) fn set_verbose(&mut self, enabled: bool) {
387 self.verbose.0 = enabled;
388 }
389
390 pub(crate) fn set_keepalive(&mut self, dur: Option<Duration>) {
391 match &mut self.inner {
392 #[cfg(feature = "default-tls")]
393 Inner::DefaultTls(http, _tls) => http.set_keepalive(dur),
394 #[cfg(feature = "__rustls")]
395 Inner::RustlsTls { http, .. } => http.set_keepalive(dur),
396 #[cfg(not(feature = "__tls"))]
397 Inner::Http(http) => http.set_keepalive(dur),
398 }
399 }
400
401 pub(crate) fn set_keepalive_interval(&mut self, dur: Option<Duration>) {
402 match &mut self.inner {
403 #[cfg(feature = "default-tls")]
404 Inner::DefaultTls(http, _tls) => http.set_keepalive_interval(dur),
405 #[cfg(feature = "__rustls")]
406 Inner::RustlsTls { http, .. } => http.set_keepalive_interval(dur),
407 #[cfg(not(feature = "__tls"))]
408 Inner::Http(http) => http.set_keepalive_interval(dur),
409 }
410 }
411
412 pub(crate) fn set_keepalive_retries(&mut self, retries: Option<u32>) {
413 match &mut self.inner {
414 #[cfg(feature = "default-tls")]
415 Inner::DefaultTls(http, _tls) => http.set_keepalive_retries(retries),
416 #[cfg(feature = "__rustls")]
417 Inner::RustlsTls { http, .. } => http.set_keepalive_retries(retries),
418 #[cfg(not(feature = "__tls"))]
419 Inner::Http(http) => http.set_keepalive_retries(retries),
420 }
421 }
422
423 #[cfg(feature = "socks")]
424 pub(crate) fn set_socks_resolver(&mut self, resolver: DynResolver) {
425 self.resolver = Some(resolver);
426 }
427
428 #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
429 pub(crate) fn set_tcp_user_timeout(&mut self, dur: Option<Duration>) {
430 match &mut self.inner {
431 #[cfg(feature = "default-tls")]
432 Inner::DefaultTls(http, _tls) => http.set_tcp_user_timeout(dur),
433 #[cfg(feature = "__rustls")]
434 Inner::RustlsTls { http, .. } => http.set_tcp_user_timeout(dur),
435 #[cfg(not(feature = "__tls"))]
436 Inner::Http(http) => http.set_tcp_user_timeout(dur),
437 }
438 }
439}
440
/// The concrete connector service: a `Service<Uri>` that opens a connection directly, through
/// an HTTP(S) proxy, or (with the `socks` feature) through a SOCKS proxy.
#[allow(missing_debug_implementations)]
#[derive(Clone)]
pub(crate) struct ConnectorService {
    // Backend-specific connector state (plain HTTP, native-tls or rustls).
    inner: Inner,
    // Proxy matchers, checked in order for every destination.
    proxies: Arc<Vec<ProxyMatcher>>,
    // Wraps every produced connection (see `verbose::Wrapper::wrap`).
    verbose: verbose::Wrapper,
    // Timeout applied inside `call`; `ConnectorBuilder::build` only sets it when the
    // connector is built without layers.
    simple_timeout: Option<Duration>,
    // The caller's requested `nodelay` setting; consulted around HTTPS connection setup.
    #[cfg(feature = "__tls")]
    nodelay: bool,
    // Whether TLS connections should expose TLS info on their `Connected` metadata.
    #[cfg(feature = "__tls")]
    tls_info: bool,
    // Optional `User-Agent` value sent with proxy CONNECT tunnels.
    #[cfg(feature = "__tls")]
    user_agent: Option<HeaderValue>,
    // Resolver passed to `socks::connect`.
    #[cfg(feature = "socks")]
    resolver: DynResolver,
}
461
/// Backend-specific connector state; the available variants depend on the enabled TLS
/// features.
#[derive(Clone)]
enum Inner {
    /// Plain HTTP only (no TLS backend compiled in).
    #[cfg(not(feature = "__tls"))]
    Http(HttpConnector),
    /// HTTP connector plus a native-tls connector.
    #[cfg(feature = "default-tls")]
    DefaultTls(HttpConnector, TlsConnector),
    /// HTTP connector plus rustls client configs.
    #[cfg(feature = "__rustls")]
    RustlsTls {
        http: HttpConnector,
        // Used for the TLS session with the destination host.
        tls: Arc<rustls::ClientConfig>,
        // Used for the connection to the proxy when tunneling HTTPS.
        tls_proxy: Arc<rustls::ClientConfig>,
    },
}
475
476impl Inner {
477 #[cfg(feature = "socks")]
478 fn get_http_connector(&mut self) -> &mut crate::connect::HttpConnector {
479 match self {
480 #[cfg(feature = "default-tls")]
481 Inner::DefaultTls(http, _) => http,
482 #[cfg(feature = "__rustls")]
483 Inner::RustlsTls { http, .. } => http,
484 #[cfg(not(feature = "__tls"))]
485 Inner::Http(http) => http,
486 }
487 }
488}
489
490impl ConnectorService {
491 #[cfg(feature = "socks")]
492 async fn connect_socks(mut self, dst: Uri, proxy: Intercepted) -> Result<Conn, BoxError> {
493 let dns = match proxy.uri().scheme_str() {
494 Some("socks4") | Some("socks5") => socks::DnsResolve::Local,
495 Some("socks4a") | Some("socks5h") => socks::DnsResolve::Proxy,
496 _ => {
497 unreachable!("connect_socks is only called for socks proxies");
498 }
499 };
500
501 match &mut self.inner {
502 #[cfg(feature = "default-tls")]
503 Inner::DefaultTls(http, tls) => {
504 if dst.scheme() == Some(&Scheme::HTTPS) {
505 let host = dst.host().ok_or("no host in url")?.to_string();
506 let conn = socks::connect(proxy, dst, dns, &self.resolver, http).await?;
507 let conn = TokioIo::new(conn);
508 let conn = TokioIo::new(conn);
509 let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone());
510 let io = tls_connector.connect(&host, conn).await?;
511 let io = TokioIo::new(io);
512 return Ok(Conn {
513 inner: self.verbose.wrap(NativeTlsConn { inner: io }),
514 is_proxy: false,
515 tls_info: self.tls_info,
516 });
517 }
518 }
519 #[cfg(feature = "__rustls")]
520 Inner::RustlsTls { http, tls, .. } => {
521 if dst.scheme() == Some(&Scheme::HTTPS) {
522 use std::convert::TryFrom;
523 use tokio_rustls::TlsConnector as RustlsConnector;
524
525 let tls = tls.clone();
526 let host = dst.host().ok_or("no host in url")?.to_string();
527 let conn = socks::connect(proxy, dst, dns, &self.resolver, http).await?;
528 let conn = TokioIo::new(conn);
529 let conn = TokioIo::new(conn);
530 let server_name =
531 rustls_pki_types::ServerName::try_from(host.as_str().to_owned())
532 .map_err(|_| "Invalid Server Name")?;
533 let io = RustlsConnector::from(tls)
534 .connect(server_name, conn)
535 .await?;
536 let io = TokioIo::new(io);
537 return Ok(Conn {
538 inner: self.verbose.wrap(RustlsTlsConn { inner: io }),
539 is_proxy: false,
540 tls_info: false,
541 });
542 }
543 }
544 #[cfg(not(feature = "__tls"))]
545 Inner::Http(http) => {
546 let conn = socks::connect(proxy, dst, dns, &self.resolver, http).await?;
547 return Ok(Conn {
548 inner: self.verbose.wrap(TokioIo::new(conn)),
549 is_proxy: false,
550 tls_info: false,
551 });
552 }
553 }
554
555 let resolver = &self.resolver;
556 let http = self.inner.get_http_connector();
557 socks::connect(proxy, dst, dns, resolver, http)
558 .await
559 .map(|tcp| Conn {
560 inner: self.verbose.wrap(TokioIo::new(tcp)),
561 is_proxy: false,
562 tls_info: false,
563 })
564 .map_err(Into::into)
565 }
566
567 async fn connect_with_maybe_proxy(self, dst: Uri, is_proxy: bool) -> Result<Conn, BoxError> {
568 match self.inner {
569 #[cfg(not(feature = "__tls"))]
570 Inner::Http(mut http) => {
571 let io = http.call(dst).await?;
572 Ok(Conn {
573 inner: self.verbose.wrap(io),
574 is_proxy,
575 tls_info: false,
576 })
577 }
578 #[cfg(feature = "default-tls")]
579 Inner::DefaultTls(http, tls) => {
580 let mut http = http.clone();
581
582 if !self.nodelay && (dst.scheme() == Some(&Scheme::HTTPS)) {
586 http.set_nodelay(true);
587 }
588
589 let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone());
590 let mut http = hyper_tls::HttpsConnector::from((http, tls_connector));
591 let io = http.call(dst).await?;
592
593 if let hyper_tls::MaybeHttpsStream::Https(stream) = io {
594 if !self.nodelay {
595 stream
596 .inner()
597 .get_ref()
598 .get_ref()
599 .get_ref()
600 .inner()
601 .inner()
602 .set_nodelay(false)?;
603 }
604 Ok(Conn {
605 inner: self.verbose.wrap(NativeTlsConn { inner: stream }),
606 is_proxy,
607 tls_info: self.tls_info,
608 })
609 } else {
610 Ok(Conn {
611 inner: self.verbose.wrap(io),
612 is_proxy,
613 tls_info: false,
614 })
615 }
616 }
617 #[cfg(feature = "__rustls")]
618 Inner::RustlsTls { http, tls, .. } => {
619 let mut http = http.clone();
620
621 if !self.nodelay && (dst.scheme() == Some(&Scheme::HTTPS)) {
625 http.set_nodelay(true);
626 }
627
628 let mut http = hyper_rustls::HttpsConnector::from((http, tls.clone()));
629 let io = http.call(dst).await?;
630
631 if let hyper_rustls::MaybeHttpsStream::Https(stream) = io {
632 if !self.nodelay {
633 let (io, _) = stream.inner().get_ref();
634 io.inner().inner().set_nodelay(false)?;
635 }
636 Ok(Conn {
637 inner: self.verbose.wrap(RustlsTlsConn { inner: stream }),
638 is_proxy,
639 tls_info: self.tls_info,
640 })
641 } else {
642 Ok(Conn {
643 inner: self.verbose.wrap(io),
644 is_proxy,
645 tls_info: false,
646 })
647 }
648 }
649 }
650 }
651
652 async fn connect_via_proxy(self, dst: Uri, proxy: Intercepted) -> Result<Conn, BoxError> {
653 log::debug!("proxy({proxy:?}) intercepts '{dst:?}'");
654
655 #[cfg(feature = "socks")]
656 match proxy.uri().scheme_str().ok_or("proxy scheme expected")? {
657 "socks4" | "socks4a" | "socks5" | "socks5h" => {
658 return self.connect_socks(dst, proxy).await
659 }
660 _ => (),
661 }
662
663 let proxy_dst = proxy.uri().clone();
664 #[cfg(feature = "__tls")]
665 let auth = proxy.basic_auth().cloned();
666
667 #[cfg(feature = "__tls")]
668 let misc = proxy.custom_headers().clone();
669
670 match &self.inner {
671 #[cfg(feature = "default-tls")]
672 Inner::DefaultTls(http, tls) => {
673 if dst.scheme() == Some(&Scheme::HTTPS) {
674 log::trace!("tunneling HTTPS over proxy");
675 let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone());
676 let inner =
677 hyper_tls::HttpsConnector::from((http.clone(), tls_connector.clone()));
678 let mut tunnel =
680 hyper_util::client::legacy::connect::proxy::Tunnel::new(proxy_dst, inner);
681 if let Some(auth) = auth {
682 tunnel = tunnel.with_auth(auth);
683 }
684 if let Some(ua) = self.user_agent {
685 let mut headers = http::HeaderMap::new();
686 headers.insert(http::header::USER_AGENT, ua);
687 tunnel = tunnel.with_headers(headers);
688 }
689 if let Some(custom_headers) = misc {
691 tunnel = tunnel.with_headers(custom_headers.clone());
692 }
693 let tunneled = tunnel.call(dst.clone()).await?;
696 let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone());
697 let io = tls_connector
698 .connect(dst.host().ok_or("no host in url")?, TokioIo::new(tunneled))
699 .await?;
700 return Ok(Conn {
701 inner: self.verbose.wrap(NativeTlsConn {
702 inner: TokioIo::new(io),
703 }),
704 is_proxy: false,
705 tls_info: false,
706 });
707 }
708 }
709 #[cfg(feature = "__rustls")]
710 Inner::RustlsTls {
711 http,
712 tls,
713 tls_proxy,
714 } => {
715 if dst.scheme() == Some(&Scheme::HTTPS) {
716 use rustls_pki_types::ServerName;
717 use std::convert::TryFrom;
718 use tokio_rustls::TlsConnector as RustlsConnector;
719
720 log::trace!("tunneling HTTPS over proxy");
721 let http = http.clone();
722 let inner = hyper_rustls::HttpsConnector::from((http, tls_proxy.clone()));
723 let mut tunnel =
725 hyper_util::client::legacy::connect::proxy::Tunnel::new(proxy_dst, inner);
726 if let Some(auth) = auth {
727 tunnel = tunnel.with_auth(auth);
728 }
729 if let Some(custom_headers) = misc {
730 tunnel = tunnel.with_headers(custom_headers.clone());
731 }
732 if let Some(ua) = self.user_agent {
733 let mut headers = http::HeaderMap::new();
734 headers.insert(http::header::USER_AGENT, ua);
735 tunnel = tunnel.with_headers(headers);
736 }
737 let tunneled = tunnel.call(dst.clone()).await?;
740 let host = dst.host().ok_or("no host in url")?.to_string();
741 let server_name = ServerName::try_from(host.as_str().to_owned())
742 .map_err(|_| "Invalid Server Name")?;
743 let io = RustlsConnector::from(tls.clone())
744 .connect(server_name, TokioIo::new(tunneled))
745 .await?;
746
747 return Ok(Conn {
748 inner: self.verbose.wrap(RustlsTlsConn {
749 inner: TokioIo::new(io),
750 }),
751 is_proxy: false,
752 tls_info: false,
753 });
754 }
755 }
756 #[cfg(not(feature = "__tls"))]
757 Inner::Http(_) => (),
758 }
759
760 self.connect_with_maybe_proxy(proxy_dst, true).await
761 }
762}
763
764async fn with_timeout<T, F>(f: F, timeout: Option<Duration>) -> Result<T, BoxError>
765where
766 F: Future<Output = Result<T, BoxError>>,
767{
768 if let Some(to) = timeout {
769 match tokio::time::timeout(to, f).await {
770 Err(_elapsed) => Err(Box::new(crate::error::TimedOut) as BoxError),
771 Ok(Ok(try_res)) => Ok(try_res),
772 Ok(Err(e)) => Err(e),
773 }
774 } else {
775 f.await
776 }
777}
778
779impl Service<Uri> for ConnectorService {
780 type Response = Conn;
781 type Error = BoxError;
782 type Future = Connecting;
783
784 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
785 Poll::Ready(Ok(()))
786 }
787
788 fn call(&mut self, dst: Uri) -> Self::Future {
789 log::debug!("starting new connection: {dst:?}");
790 let timeout = self.simple_timeout;
791 for prox in self.proxies.iter() {
792 if let Some(intercepted) = prox.intercept(&dst) {
793 return Box::pin(with_timeout(
794 self.clone().connect_via_proxy(dst, intercepted),
795 timeout,
796 ));
797 }
798 }
799
800 Box::pin(with_timeout(
801 self.clone().connect_with_maybe_proxy(dst, false),
802 timeout,
803 ))
804 }
805}
806
/// Implemented by connection types that can report TLS details about themselves.
#[cfg(feature = "__tls")]
trait TlsInfoFactory {
    /// Returns the TLS info for this connection, or `None` when there is none to report.
    fn tls_info(&self) -> Option<crate::tls::TlsInfo>;
}
811
/// A bare TCP stream never has TLS info.
#[cfg(feature = "__tls")]
impl TlsInfoFactory for tokio::net::TcpStream {
    fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
        None
    }
}
818
/// The `TokioIo` adapter reports whatever the wrapped I/O object reports.
#[cfg(feature = "__tls")]
impl<T: TlsInfoFactory> TlsInfoFactory for TokioIo<T> {
    fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
        let wrapped = self.inner();
        TlsInfoFactory::tls_info(wrapped)
    }
}
825
/// native-tls stream layered directly over a (double-wrapped) TCP stream.
#[cfg(feature = "default-tls")]
impl TlsInfoFactory for tokio_native_tls::TlsStream<TokioIo<TokioIo<tokio::net::TcpStream>>> {
    /// Always returns `Some`; the peer certificate inside is the DER encoding of the
    /// certificate reported by the TLS stream, or `None` if it is missing or any step fails.
    fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
        let peer_certificate = match self.get_ref().peer_certificate() {
            Ok(Some(cert)) => cert.to_der().ok(),
            _ => None,
        };
        Some(crate::tls::TlsInfo { peer_certificate })
    }
}
838
/// native-tls stream layered over a `hyper_tls::MaybeHttpsStream`.
#[cfg(feature = "default-tls")]
impl TlsInfoFactory
    for tokio_native_tls::TlsStream<
        TokioIo<hyper_tls::MaybeHttpsStream<TokioIo<tokio::net::TcpStream>>>,
    >
{
    /// Always returns `Some`; the peer certificate inside is the DER encoding of the
    /// certificate reported by the TLS stream, or `None` if it is missing or any step fails.
    fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
        let peer_certificate = match self.get_ref().peer_certificate() {
            Ok(Some(cert)) => cert.to_der().ok(),
            _ => None,
        };
        Some(crate::tls::TlsInfo { peer_certificate })
    }
}
855
/// Only the HTTPS variant can carry TLS info; plain HTTP reports `None`.
#[cfg(feature = "default-tls")]
impl TlsInfoFactory for hyper_tls::MaybeHttpsStream<TokioIo<tokio::net::TcpStream>> {
    fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
        match *self {
            hyper_tls::MaybeHttpsStream::Https(ref stream) => TlsInfoFactory::tls_info(stream),
            hyper_tls::MaybeHttpsStream::Http(_) => None,
        }
    }
}
865
/// rustls stream layered directly over a (double-wrapped) TCP stream.
#[cfg(feature = "__rustls")]
impl TlsInfoFactory for tokio_rustls::client::TlsStream<TokioIo<TokioIo<tokio::net::TcpStream>>> {
    /// Always returns `Some`; the peer certificate inside is a copy of the first certificate
    /// the session reports, or `None` if the session reports none.
    fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
        let (_io, session) = self.get_ref();
        let peer_certificate = session
            .peer_certificates()
            .and_then(|chain| chain.first())
            .map(|leaf| leaf.to_vec());
        Some(crate::tls::TlsInfo { peer_certificate })
    }
}
878
/// rustls stream layered over a `hyper_rustls::MaybeHttpsStream`.
#[cfg(feature = "__rustls")]
impl TlsInfoFactory
    for tokio_rustls::client::TlsStream<
        TokioIo<hyper_rustls::MaybeHttpsStream<TokioIo<tokio::net::TcpStream>>>,
    >
{
    /// Always returns `Some`; the peer certificate inside is a copy of the first certificate
    /// the session reports, or `None` if the session reports none.
    fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
        let (_io, session) = self.get_ref();
        let peer_certificate = session
            .peer_certificates()
            .and_then(|chain| chain.first())
            .map(|leaf| leaf.to_vec());
        Some(crate::tls::TlsInfo { peer_certificate })
    }
}
895
/// Only the HTTPS variant can carry TLS info; plain HTTP reports `None`.
#[cfg(feature = "__rustls")]
impl TlsInfoFactory for hyper_rustls::MaybeHttpsStream<TokioIo<tokio::net::TcpStream>> {
    fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
        match *self {
            hyper_rustls::MaybeHttpsStream::Https(ref stream) => TlsInfoFactory::tls_info(stream),
            hyper_rustls::MaybeHttpsStream::Http(_) => None,
        }
    }
}
905
/// Shorthand for the full set of bounds a connection type must satisfy.
pub(crate) trait AsyncConn:
    Read + Write + Connection + Send + Sync + Unpin + 'static
{
}

// Blanket impl: every type meeting the bounds is an `AsyncConn`.
impl<T: Read + Write + Connection + Send + Sync + Unpin + 'static> AsyncConn for T {}
912
/// An [`AsyncConn`] that, when a TLS backend is compiled in, can also report TLS info.
#[cfg(feature = "__tls")]
trait AsyncConnWithInfo: AsyncConn + TlsInfoFactory {}
/// Without a TLS backend this is just [`AsyncConn`] under another name.
#[cfg(not(feature = "__tls"))]
trait AsyncConnWithInfo: AsyncConn {}

// Blanket impls mirroring the two trait definitions above.
#[cfg(feature = "__tls")]
impl<T: AsyncConn + TlsInfoFactory> AsyncConnWithInfo for T {}
#[cfg(not(feature = "__tls"))]
impl<T: AsyncConn> AsyncConnWithInfo for T {}

/// The boxed, type-erased connection stored inside `Conn`.
type BoxConn = Box<dyn AsyncConnWithInfo>;
924
925pub(crate) mod sealed {
926 use super::*;
927 #[derive(Debug)]
928 pub struct Unnameable(pub(super) Uri);
929
930 pin_project! {
931 #[allow(missing_debug_implementations)]
936 pub struct Conn {
937 #[pin]
938 pub(super)inner: BoxConn,
939 pub(super) is_proxy: bool,
940 pub(super) tls_info: bool,
942 }
943 }
944
945 impl Connection for Conn {
946 fn connected(&self) -> Connected {
947 let connected = self.inner.connected().proxy(self.is_proxy);
948 #[cfg(feature = "__tls")]
949 if self.tls_info {
950 if let Some(tls_info) = self.inner.tls_info() {
951 connected.extra(tls_info)
952 } else {
953 connected
954 }
955 } else {
956 connected
957 }
958 #[cfg(not(feature = "__tls"))]
959 connected
960 }
961 }
962
963 impl Read for Conn {
964 fn poll_read(
965 self: Pin<&mut Self>,
966 cx: &mut Context,
967 buf: ReadBufCursor<'_>,
968 ) -> Poll<io::Result<()>> {
969 let this = self.project();
970 Read::poll_read(this.inner, cx, buf)
971 }
972 }
973
974 impl Write for Conn {
975 fn poll_write(
976 self: Pin<&mut Self>,
977 cx: &mut Context,
978 buf: &[u8],
979 ) -> Poll<Result<usize, io::Error>> {
980 let this = self.project();
981 Write::poll_write(this.inner, cx, buf)
982 }
983
984 fn poll_write_vectored(
985 self: Pin<&mut Self>,
986 cx: &mut Context<'_>,
987 bufs: &[IoSlice<'_>],
988 ) -> Poll<Result<usize, io::Error>> {
989 let this = self.project();
990 Write::poll_write_vectored(this.inner, cx, bufs)
991 }
992
993 fn is_write_vectored(&self) -> bool {
994 self.inner.is_write_vectored()
995 }
996
997 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
998 let this = self.project();
999 Write::poll_flush(this.inner, cx)
1000 }
1001
1002 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
1003 let this = self.project();
1004 Write::poll_shutdown(this.inner, cx)
1005 }
1006 }
1007}
1008
/// The boxed, `Send` future returned by the connector services in this file.
pub(crate) type Connecting = Pin<Box<dyn Future<Output = Result<Conn, BoxError>> + Send>>;
1010
#[cfg(feature = "default-tls")]
mod native_tls_conn {
    use super::TlsInfoFactory;
    use hyper::rt::{Read, ReadBufCursor, Write};
    use hyper_tls::MaybeHttpsStream;
    use hyper_util::client::legacy::connect::{Connected, Connection};
    use hyper_util::rt::TokioIo;
    use pin_project_lite::pin_project;
    use std::{
        io::{self, IoSlice},
        pin::Pin,
        task::{Context, Poll},
    };
    use tokio::io::{AsyncRead, AsyncWrite};
    use tokio::net::TcpStream;
    use tokio_native_tls::TlsStream;

    // A native-tls stream adapted to hyper's I/O traits.
    pin_project! {
        pub(super) struct NativeTlsConn<T> {
            #[pin] pub(super) inner: TokioIo<TlsStream<T>>,
        }
    }

    /// TLS directly over a TCP stream: metadata comes from the underlying stream, plus the
    /// HTTP/2 marker when ALPN (with `native-tls-alpn`) negotiated `h2`.
    impl Connection for NativeTlsConn<TokioIo<TokioIo<TcpStream>>> {
        fn connected(&self) -> Connected {
            let tls_stream = self.inner.inner();
            let connected = tls_stream
                .get_ref()
                .get_ref()
                .get_ref()
                .inner()
                .connected();
            #[cfg(feature = "native-tls-alpn")]
            if let Ok(Some(alpn_protocol)) = tls_stream.get_ref().negotiated_alpn() {
                if alpn_protocol == b"h2" {
                    return connected.negotiated_h2();
                }
            }
            connected
        }
    }

    /// TLS over a `MaybeHttpsStream`: metadata comes from that stream, plus the HTTP/2
    /// marker when ALPN (with `native-tls-alpn`) negotiated `h2`.
    impl Connection for NativeTlsConn<TokioIo<MaybeHttpsStream<TokioIo<TcpStream>>>> {
        fn connected(&self) -> Connected {
            let tls_stream = self.inner.inner();
            let connected = tls_stream
                .get_ref()
                .get_ref()
                .get_ref()
                .inner()
                .connected();
            #[cfg(feature = "native-tls-alpn")]
            if let Ok(Some(alpn_protocol)) = tls_stream.get_ref().negotiated_alpn() {
                if alpn_protocol == b"h2" {
                    return connected.negotiated_h2();
                }
            }
            connected
        }
    }

    // Reads are forwarded to the pinned TLS stream.
    impl<T: AsyncRead + AsyncWrite + Unpin> Read for NativeTlsConn<T> {
        fn poll_read(
            self: Pin<&mut Self>,
            cx: &mut Context,
            buf: ReadBufCursor<'_>,
        ) -> Poll<tokio::io::Result<()>> {
            Read::poll_read(self.project().inner, cx, buf)
        }
    }

    // Writes are forwarded to the pinned TLS stream.
    impl<T: AsyncRead + AsyncWrite + Unpin> Write for NativeTlsConn<T> {
        fn poll_write(
            self: Pin<&mut Self>,
            cx: &mut Context,
            buf: &[u8],
        ) -> Poll<Result<usize, tokio::io::Error>> {
            Write::poll_write(self.project().inner, cx, buf)
        }

        fn poll_write_vectored(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            bufs: &[IoSlice<'_>],
        ) -> Poll<Result<usize, io::Error>> {
            Write::poll_write_vectored(self.project().inner, cx, bufs)
        }

        fn is_write_vectored(&self) -> bool {
            Write::is_write_vectored(&self.inner)
        }

        fn poll_flush(
            self: Pin<&mut Self>,
            cx: &mut Context,
        ) -> Poll<Result<(), tokio::io::Error>> {
            Write::poll_flush(self.project().inner, cx)
        }

        fn poll_shutdown(
            self: Pin<&mut Self>,
            cx: &mut Context,
        ) -> Poll<Result<(), tokio::io::Error>> {
            Write::poll_shutdown(self.project().inner, cx)
        }
    }

    /// TLS info is whatever the wrapped stream reports.
    impl<T> TlsInfoFactory for NativeTlsConn<T>
    where
        TokioIo<TlsStream<T>>: TlsInfoFactory,
    {
        fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
            TlsInfoFactory::tls_info(&self.inner)
        }
    }
}
1134
#[cfg(feature = "__rustls")]
mod rustls_tls_conn {
    use super::TlsInfoFactory;
    use hyper::rt::{Read, ReadBufCursor, Write};
    use hyper_rustls::MaybeHttpsStream;
    use hyper_util::client::legacy::connect::{Connected, Connection};
    use hyper_util::rt::TokioIo;
    use pin_project_lite::pin_project;
    use std::{
        io::{self, IoSlice},
        pin::Pin,
        task::{Context, Poll},
    };
    use tokio::io::{AsyncRead, AsyncWrite};
    use tokio::net::TcpStream;
    use tokio_rustls::client::TlsStream;

    // A rustls client stream adapted to hyper's I/O traits.
    pin_project! {
        pub(super) struct RustlsTlsConn<T> {
            #[pin] pub(super) inner: TokioIo<TlsStream<T>>,
        }
    }

    /// TLS directly over a TCP stream: metadata comes from the underlying stream, plus the
    /// HTTP/2 marker when the session's ALPN protocol is `h2`.
    impl Connection for RustlsTlsConn<TokioIo<TokioIo<TcpStream>>> {
        fn connected(&self) -> Connected {
            let (io, session) = self.inner.inner().get_ref();
            let is_h2 = session.alpn_protocol() == Some(b"h2");
            let connected = io.inner().connected();
            if is_h2 {
                connected.negotiated_h2()
            } else {
                connected
            }
        }
    }

    /// TLS over a `MaybeHttpsStream`: metadata comes from that stream, plus the HTTP/2
    /// marker when the session's ALPN protocol is `h2`.
    impl Connection for RustlsTlsConn<TokioIo<MaybeHttpsStream<TokioIo<TcpStream>>>> {
        fn connected(&self) -> Connected {
            let (io, session) = self.inner.inner().get_ref();
            let is_h2 = session.alpn_protocol() == Some(b"h2");
            let connected = io.inner().connected();
            if is_h2 {
                connected.negotiated_h2()
            } else {
                connected
            }
        }
    }

    // Reads are forwarded to the pinned TLS stream.
    impl<T: AsyncRead + AsyncWrite + Unpin> Read for RustlsTlsConn<T> {
        fn poll_read(
            self: Pin<&mut Self>,
            cx: &mut Context,
            buf: ReadBufCursor<'_>,
        ) -> Poll<tokio::io::Result<()>> {
            Read::poll_read(self.project().inner, cx, buf)
        }
    }

    // Writes are forwarded to the pinned TLS stream.
    impl<T: AsyncRead + AsyncWrite + Unpin> Write for RustlsTlsConn<T> {
        fn poll_write(
            self: Pin<&mut Self>,
            cx: &mut Context,
            buf: &[u8],
        ) -> Poll<Result<usize, tokio::io::Error>> {
            Write::poll_write(self.project().inner, cx, buf)
        }

        fn poll_write_vectored(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            bufs: &[IoSlice<'_>],
        ) -> Poll<Result<usize, io::Error>> {
            Write::poll_write_vectored(self.project().inner, cx, bufs)
        }

        fn is_write_vectored(&self) -> bool {
            Write::is_write_vectored(&self.inner)
        }

        fn poll_flush(
            self: Pin<&mut Self>,
            cx: &mut Context,
        ) -> Poll<Result<(), tokio::io::Error>> {
            Write::poll_flush(self.project().inner, cx)
        }

        fn poll_shutdown(
            self: Pin<&mut Self>,
            cx: &mut Context,
        ) -> Poll<Result<(), tokio::io::Error>> {
            Write::poll_shutdown(self.project().inner, cx)
        }
    }

    /// TLS info is whatever the wrapped stream reports.
    impl<T> TlsInfoFactory for RustlsTlsConn<T>
    where
        TokioIo<TlsStream<T>>: TlsInfoFactory,
    {
        fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
            TlsInfoFactory::tls_info(&self.inner)
        }
    }
}
1248
1249#[cfg(feature = "socks")]
1250mod socks {
1251 use tower_service::Service;
1252
1253 use http::uri::Scheme;
1254 use http::Uri;
1255 use hyper_util::client::legacy::connect::proxy::{SocksV4, SocksV5};
1256 use tokio::net::TcpStream;
1257
1258 use super::BoxError;
1259 use crate::proxy::Intercepted;
1260
    /// Selects where the destination hostname gets resolved when connecting
    /// through a SOCKS proxy (see `connect`).
    pub(super) enum DnsResolve {
        /// Resolve the destination with the client's own resolver first; the
        /// first resolved IP address replaces the hostname sent to the proxy.
        Local,
        /// Skip local resolution and pass the destination host through as-is.
        Proxy,
    }
1265
    /// Failures that `connect` can report while tunnelling through a SOCKS proxy.
    #[derive(Debug)]
    pub(super) enum SocksProxyError {
        /// The destination URI has no host component.
        SocksNoHostInUrl,
        /// Local DNS resolution of the destination failed; carries the
        /// resolver's error as the source.
        SocksLocalResolve(BoxError),
        /// Building the destination URI or establishing the connection through
        /// the proxy failed; carries the underlying error as the source.
        SocksConnect(BoxError),
    }
1272
1273 pub(super) async fn connect(
1274 proxy: Intercepted,
1275 dst: Uri,
1276 dns_mode: DnsResolve,
1277 resolver: &crate::dns::DynResolver,
1278 http_connector: &mut crate::connect::HttpConnector,
1279 ) -> Result<TcpStream, SocksProxyError> {
1280 let https = dst.scheme() == Some(&Scheme::HTTPS);
1281 let original_host = dst.host().ok_or(SocksProxyError::SocksNoHostInUrl)?;
1282 let mut host = original_host.to_owned();
1283 let port = match dst.port() {
1284 Some(p) => p.as_u16(),
1285 None if https => 443u16,
1286 _ => 80u16,
1287 };
1288
1289 if let DnsResolve::Local = dns_mode {
1290 let maybe_new_target = resolver
1291 .http_resolve(&dst)
1292 .await
1293 .map_err(SocksProxyError::SocksLocalResolve)?
1294 .next();
1295 if let Some(new_target) = maybe_new_target {
1296 log::trace!("socks local dns resolved {new_target:?}");
1297 let ip = new_target.ip();
1299 if ip.is_ipv6() {
1300 host = format!("[{}]", ip);
1301 } else {
1302 host = ip.to_string();
1303 }
1304 }
1305 }
1306
1307 let proxy_uri = proxy.uri().clone();
1308 let dst_uri = format!(
1310 "{}://{}:{}",
1311 if https { "https" } else { "http" },
1312 host,
1313 port
1314 )
1315 .parse::<Uri>()
1316 .map_err(|e| SocksProxyError::SocksConnect(e.into()))?;
1317
1318 match proxy.uri().scheme_str() {
1320 Some("socks4") | Some("socks4a") => {
1321 let mut svc = SocksV4::new(proxy_uri, http_connector);
1322 let stream = Service::call(&mut svc, dst_uri)
1323 .await
1324 .map_err(|e| SocksProxyError::SocksConnect(e.into()))?;
1325 Ok(stream.into_inner())
1326 }
1327 Some("socks5") | Some("socks5h") => {
1328 let mut svc = if let Some((username, password)) = proxy.raw_auth() {
1329 SocksV5::new(proxy_uri, http_connector)
1330 .with_auth(username.to_string(), password.to_string())
1331 } else {
1332 SocksV5::new(proxy_uri, http_connector)
1333 };
1334 let stream = Service::call(&mut svc, dst_uri)
1335 .await
1336 .map_err(|e| SocksProxyError::SocksConnect(e.into()))?;
1337 Ok(stream.into_inner())
1338 }
1339 _ => unreachable!(),
1340 }
1341 }
1342
1343 impl std::fmt::Display for SocksProxyError {
1344 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1345 match self {
1346 Self::SocksNoHostInUrl => f.write_str("socks proxy destination has no host"),
1347 Self::SocksLocalResolve(_) => f.write_str("error resolving for socks proxy"),
1348 Self::SocksConnect(_) => f.write_str("error connecting to socks proxy"),
1349 }
1350 }
1351 }
1352
1353 impl std::error::Error for SocksProxyError {
1354 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
1355 match self {
1356 Self::SocksNoHostInUrl => None,
1357 Self::SocksLocalResolve(ref e) => Some(&**e),
1358 Self::SocksConnect(ref e) => Some(&**e),
1359 }
1360 }
1361 }
1362}
1363
1364mod verbose {
1365 use crate::util::Escape;
1366 use hyper::rt::{Read, ReadBufCursor, Write};
1367 use hyper_util::client::legacy::connect::{Connected, Connection};
1368 use std::cmp::min;
1369 use std::fmt;
1370 use std::io::{self, IoSlice};
1371 use std::pin::Pin;
1372 use std::task::{Context, Poll};
1373
    /// A `Wrapper` with verbose logging disabled: `wrap` boxes connections unchanged.
    pub(super) const OFF: Wrapper = Wrapper(false);
1375
    /// Flag deciding whether `wrap` may put connections behind the
    /// trace-logging `Verbose` adapter (`true` = verbose requested).
    #[derive(Clone, Copy)]
    pub(super) struct Wrapper(pub(super) bool);
1378
1379 impl Wrapper {
1380 pub(super) fn wrap<T: super::AsyncConnWithInfo>(&self, conn: T) -> super::BoxConn {
1381 if self.0 && log::log_enabled!(log::Level::Trace) {
1382 Box::new(Verbose {
1383 id: crate::util::fast_random() as u32,
1385 inner: conn,
1386 })
1387 } else {
1388 Box::new(conn)
1389 }
1390 }
1391 }
1392
    /// Connection adapter that trace-logs the bytes read from and written to `inner`.
    struct Verbose<T> {
        // Printed as an 8-digit hex prefix on every log line for this connection.
        id: u32,
        // The wrapped connection all operations are forwarded to.
        inner: T,
    }
1397
1398 impl<T: Connection + Read + Write + Unpin> Connection for Verbose<T> {
1399 fn connected(&self) -> Connected {
1400 self.inner.connected()
1401 }
1402 }
1403
    // Read half of the logging adapter: reads through `inner`, then logs the
    // bytes that were just received.
    impl<T: Read + Write + Unpin> Read for Verbose<T> {
        fn poll_read(
            mut self: Pin<&mut Self>,
            cx: &mut Context,
            mut buf: ReadBufCursor<'_>,
        ) -> Poll<std::io::Result<()>> {
            // A temporary `ReadBuf` is laid over the caller's unfilled region so
            // the freshly read bytes can be inspected via `filled()` afterwards.
            //
            // SAFETY: `ReadBufCursor::as_mut` requires that already-initialized
            // bytes are never de-initialized; the slice is only handed to a
            // `ReadBuf`, which is expected to only write into it.
            // NOTE(review): confirm against hyper's `ReadBufCursor` docs.
            let mut vbuf = hyper::rt::ReadBuf::uninit(unsafe { buf.as_mut() });
            match Pin::new(&mut self.inner).poll_read(cx, vbuf.unfilled()) {
                Poll::Ready(Ok(())) => {
                    log::trace!("{:08x} read: {:?}", self.id, Escape::new(vbuf.filled()));
                    let len = vbuf.filled().len();
                    // SAFETY: `vbuf` was built over this cursor's own memory, so
                    // the `len` bytes it reports as filled are initialized in
                    // `buf` as well.
                    unsafe {
                        buf.advance(len);
                    }
                    Poll::Ready(Ok(()))
                }
                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
                Poll::Pending => Poll::Pending,
            }
        }
    }
1430
1431 impl<T: Read + Write + Unpin> Write for Verbose<T> {
1432 fn poll_write(
1433 mut self: Pin<&mut Self>,
1434 cx: &mut Context,
1435 buf: &[u8],
1436 ) -> Poll<Result<usize, std::io::Error>> {
1437 match Pin::new(&mut self.inner).poll_write(cx, buf) {
1438 Poll::Ready(Ok(n)) => {
1439 log::trace!("{:08x} write: {:?}", self.id, Escape::new(&buf[..n]));
1440 Poll::Ready(Ok(n))
1441 }
1442 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
1443 Poll::Pending => Poll::Pending,
1444 }
1445 }
1446
1447 fn poll_write_vectored(
1448 mut self: Pin<&mut Self>,
1449 cx: &mut Context<'_>,
1450 bufs: &[IoSlice<'_>],
1451 ) -> Poll<Result<usize, io::Error>> {
1452 match Pin::new(&mut self.inner).poll_write_vectored(cx, bufs) {
1453 Poll::Ready(Ok(nwritten)) => {
1454 log::trace!(
1455 "{:08x} write (vectored): {:?}",
1456 self.id,
1457 Vectored { bufs, nwritten }
1458 );
1459 Poll::Ready(Ok(nwritten))
1460 }
1461 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
1462 Poll::Pending => Poll::Pending,
1463 }
1464 }
1465
1466 fn is_write_vectored(&self) -> bool {
1467 self.inner.is_write_vectored()
1468 }
1469
1470 fn poll_flush(
1471 mut self: Pin<&mut Self>,
1472 cx: &mut Context,
1473 ) -> Poll<Result<(), std::io::Error>> {
1474 Pin::new(&mut self.inner).poll_flush(cx)
1475 }
1476
1477 fn poll_shutdown(
1478 mut self: Pin<&mut Self>,
1479 cx: &mut Context,
1480 ) -> Poll<Result<(), std::io::Error>> {
1481 Pin::new(&mut self.inner).poll_shutdown(cx)
1482 }
1483 }
1484
1485 #[cfg(feature = "__tls")]
1486 impl<T: super::TlsInfoFactory> super::TlsInfoFactory for Verbose<T> {
1487 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1488 self.inner.tls_info()
1489 }
1490 }
1491
    /// Debug-formatting helper for a vectored write: pairs the submitted
    /// slices with the number of bytes that were written.
    struct Vectored<'a, 'b> {
        // The buffers that were passed to the vectored write.
        bufs: &'a [IoSlice<'b>],
        // How many bytes, counted from the start of `bufs`, get formatted.
        nwritten: usize,
    }
1496
1497 impl fmt::Debug for Vectored<'_, '_> {
1498 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1499 let mut left = self.nwritten;
1500 for buf in self.bufs.iter() {
1501 if left == 0 {
1502 break;
1503 }
1504 let n = min(left, buf.len());
1505 Escape::new(&buf[..n]).fmt(f)?;
1506 left -= n;
1507 }
1508 Ok(())
1509 }
1510 }
1511}