hyper_util/client/legacy/connect/proxy/socks/v5/
mod.rs1mod errors;
2pub use errors::*;
3
4mod messages;
5use messages::*;
6
7use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
8use std::task::{Context, Poll};
9
10use http::Uri;
11use hyper::rt::{Read, Write};
12use tower_service::Service;
13
14use bytes::BytesMut;
15
16use super::{Handshaking, SocksError};
17
18#[derive(Debug, Clone)]
24pub struct SocksV5<C> {
25 inner: C,
26 config: SocksConfig,
27}
28
29#[derive(Debug, Clone)]
30pub struct SocksConfig {
31 proxy: Uri,
32 proxy_auth: Option<(String, String)>,
33
34 local_dns: bool,
35 optimistic: bool,
36}
37
38#[derive(Debug)]
39enum State {
40 SendingNegReq,
41 ReadingNegRes,
42 SendingAuthReq,
43 ReadingAuthRes,
44 SendingProxyReq,
45 ReadingProxyRes,
46}
47
48impl<C> SocksV5<C> {
49 pub fn new(proxy_dst: Uri, connector: C) -> Self {
58 Self {
59 inner: connector,
60 config: SocksConfig::new(proxy_dst),
61 }
62 }
63
64 pub fn with_auth(mut self, user: String, pass: String) -> Self {
70 self.config.proxy_auth = Some((user, pass));
71 self
72 }
73
74 pub fn local_dns(mut self, local_dns: bool) -> Self {
79 self.config.local_dns = local_dns;
80 self
81 }
82
83 pub fn send_optimistically(mut self, optimistic: bool) -> Self {
93 self.config.optimistic = optimistic;
94 self
95 }
96}
97
98impl SocksConfig {
99 fn new(proxy: Uri) -> Self {
100 Self {
101 proxy,
102 proxy_auth: None,
103
104 local_dns: false,
105 optimistic: false,
106 }
107 }
108
109 async fn execute<T, E>(self, mut conn: T, host: String, port: u16) -> Result<T, SocksError<E>>
110 where
111 T: Read + Write + Unpin,
112 {
113 let address = match host.parse::<IpAddr>() {
114 Ok(ip) => Address::Socket(SocketAddr::new(ip, port)),
115 Err(_) if host.len() <= 255 => {
116 if self.local_dns {
117 let socket = (host, port)
118 .to_socket_addrs()?
119 .next()
120 .ok_or(SocksError::DnsFailure)?;
121
122 Address::Socket(socket)
123 } else {
124 Address::Domain(host, port)
125 }
126 }
127 Err(_) => return Err(SocksV5Error::HostTooLong.into()),
128 };
129
130 let method = if self.proxy_auth.is_some() {
131 AuthMethod::UserPass
132 } else {
133 AuthMethod::NoAuth
134 };
135
136 let mut recv_buf = BytesMut::with_capacity(513); let mut send_buf = BytesMut::with_capacity(262); let mut state = State::SendingNegReq;
139
140 loop {
141 match state {
142 State::SendingNegReq => {
143 let req = NegotiationReq(&method);
144
145 let start = send_buf.len();
146 req.write_to_buf(&mut send_buf)?;
147 crate::rt::write_all(&mut conn, &send_buf[start..]).await?;
148
149 if self.optimistic {
150 if method == AuthMethod::UserPass {
151 state = State::SendingAuthReq;
152 } else {
153 state = State::SendingProxyReq;
154 }
155 } else {
156 state = State::ReadingNegRes;
157 }
158 }
159
160 State::ReadingNegRes => {
161 let res: NegotiationRes = super::read_message(&mut conn, &mut recv_buf).await?;
162
163 if res.0 == AuthMethod::NoneAcceptable {
164 return Err(SocksV5Error::Auth(AuthError::Unsupported).into());
165 }
166
167 if res.0 != method {
168 return Err(SocksV5Error::Auth(AuthError::MethodMismatch).into());
169 }
170
171 if self.optimistic {
172 if res.0 == AuthMethod::UserPass {
173 state = State::ReadingAuthRes;
174 } else {
175 state = State::ReadingProxyRes;
176 }
177 } else if res.0 == AuthMethod::UserPass {
178 state = State::SendingAuthReq;
179 } else {
180 state = State::SendingProxyReq;
181 }
182 }
183
184 State::SendingAuthReq => {
185 let (user, pass) = self.proxy_auth.as_ref().unwrap();
186 let req = AuthenticationReq(user, pass);
187
188 let start = send_buf.len();
189 req.write_to_buf(&mut send_buf)?;
190 crate::rt::write_all(&mut conn, &send_buf[start..]).await?;
191
192 if self.optimistic {
193 state = State::SendingProxyReq;
194 } else {
195 state = State::ReadingAuthRes;
196 }
197 }
198
199 State::ReadingAuthRes => {
200 let res: AuthenticationRes =
201 super::read_message(&mut conn, &mut recv_buf).await?;
202
203 if !res.0 {
204 return Err(SocksV5Error::Auth(AuthError::Failed).into());
205 }
206
207 if self.optimistic {
208 state = State::ReadingProxyRes;
209 } else {
210 state = State::SendingProxyReq;
211 }
212 }
213
214 State::SendingProxyReq => {
215 let req = ProxyReq(&address);
216
217 let start = send_buf.len();
218 req.write_to_buf(&mut send_buf)?;
219 crate::rt::write_all(&mut conn, &send_buf[start..]).await?;
220
221 if self.optimistic {
222 state = State::ReadingNegRes;
223 } else {
224 state = State::ReadingProxyRes;
225 }
226 }
227
228 State::ReadingProxyRes => {
229 let res: ProxyRes = super::read_message(&mut conn, &mut recv_buf).await?;
230
231 if res.0 == Status::Success {
232 return Ok(conn);
233 } else {
234 return Err(SocksV5Error::Command(res.0).into());
235 }
236 }
237 }
238 }
239 }
240}
241
242impl<C> Service<Uri> for SocksV5<C>
243where
244 C: Service<Uri>,
245 C::Future: Send + 'static,
246 C::Response: Read + Write + Unpin + Send + 'static,
247 C::Error: Send + 'static,
248{
249 type Response = C::Response;
250 type Error = SocksError<C::Error>;
251 type Future = Handshaking<C::Future, C::Response, C::Error>;
252
253 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
254 self.inner.poll_ready(cx).map_err(SocksError::Inner)
255 }
256
257 fn call(&mut self, dst: Uri) -> Self::Future {
258 let config = self.config.clone();
259 let connecting = self.inner.call(config.proxy.clone());
260
261 let fut = async move {
262 let port = dst.port().map(|p| p.as_u16()).unwrap_or(443);
263 let host = dst.host().ok_or(SocksError::MissingHost)?.to_string();
264
265 let conn = connecting.await.map_err(SocksError::Inner)?;
266 config.execute(conn, host, port).await
267 };
268
269 Handshaking {
270 fut: Box::pin(fut),
271 _marker: Default::default(),
272 }
273 }
274}