import time
import asyncio
import warnings
import ssl as _ssl
from collections import Counter
from .errors import *
from .utils import log, parse_headers
from .resolver import Resolver
from .negotiators import NGTRS
# SOCKS proxies can tunnel both plain HTTP and HTTPS traffic.
_SOCKS_PROTOS = ('SOCKS4', 'SOCKS5')
# Proxy types through which plain-HTTP requests can be made.
_HTTP_PROTOS = {'HTTP', 'CONNECT:80', *_SOCKS_PROTOS}
# Proxy types through which HTTPS requests can be made.
_HTTPS_PROTOS = {'HTTPS', *_SOCKS_PROTOS}
class Proxy:
    """Proxy.

    :param str host: IP address of the proxy
    :param int port: Port of the proxy
    :param tuple types:
        (optional) List of types (protocols) which may be supported
        by the proxy and which can be checked to work with the proxy
    :param int timeout:
        (optional) Timeout in seconds for establishing a connection and
        for receiving a response
    :param bool verify_ssl:
        (optional) Flag indicating whether to check the SSL certificates.
        Set to True to check ssl certifications
    :raises ValueError: If the host is not an IP address, or if the port
        is negative or greater than 65535
    """

    # Types (protocols) this class knows about; anything else passed in
    # ``types`` is silently dropped.
    _KNOWN_TYPES = frozenset({'HTTP', 'HTTPS', 'CONNECT:80', 'CONNECT:25',
                              'SOCKS4', 'SOCKS5'})

    @classmethod
    async def create(cls, host, *args, **kwargs):
        r"""Asynchronously create a :class:`Proxy` object.

        :param str host: A passed host can be a domain or IP address.
            If the host is a domain, try to resolve it
        :param \*args:
            (optional) Positional arguments that :class:`Proxy` takes
        :param \*\*kwargs:
            (optional) Keyword arguments that :class:`Proxy` takes.
            ``loop`` and ``resolver`` are consumed here and not passed on;
            a new :class:`Resolver` is created only if no ``resolver``
            is given
        :return: :class:`Proxy` object
        :rtype: proxybroker.Proxy
        :raises ResolveError: If could not resolve the host
        :raises ValueError: If the port is negative or > 65535
        """
        loop = kwargs.pop('loop', None)
        resolver = kwargs.pop('resolver', None)
        if resolver is None:
            # Built lazily so callers that supply a resolver don't pay for one.
            resolver = Resolver(loop=loop)
        try:
            _host = await resolver.resolve(host)
            proxy = cls(_host, *args, **kwargs)
        except (ResolveError, ValueError) as e:
            # The port may have been passed positionally or as a keyword.
            port = args[0] if args else kwargs.get('port')
            log.error('%s:%s: Error at creating: %s' % (host, port, e))
            raise
        return proxy

    def __init__(self, host=None, port=None, types=(),
                 timeout=8, verify_ssl=False):
        self.host = host
        if not Resolver.host_is_ip(self.host):
            raise ValueError('The host of proxy should be the IP address. '
                             'Try Proxy.create() if the host is a domain')
        self.port = int(port)
        if self.port > 65535:
            raise ValueError('The port of proxy cannot be greater than 65535')
        if self.port < 0:
            raise ValueError('The port of proxy cannot be negative')
        self.expected_types = set(types) & self._KNOWN_TYPES
        self._timeout = timeout
        # ``True`` lets asyncio build its default (verifying) SSL context;
        # otherwise certificate checks are disabled.
        self._ssl_context = (True if verify_ssl else
                             _ssl._create_unverified_context())
        self._types = {}
        self._is_working = False
        # 'requests' counts connect() attempts; 'errors' counts failures
        # per error message (see log()).
        self.stat = {'requests': 0, 'errors': Counter()}
        self._ngtr = None
        self._geo = Resolver.get_ip_info(self.host)
        self._log = []
        self._runtimes = []
        self._schemes = ()
        self._closed = True
        # Plain TCP streams under 'conn'; TLS streams (opened over the
        # same socket by connect(ssl=True)) under 'ssl'.
        self._reader = {'conn': None, 'ssl': None}
        self._writer = {'conn': None, 'ssl': None}

    def _sorted_types(self):
        """Return ``(type, level)`` pairs in display order.

        Ordered by the length of the type name, then by its last character
        (e.g. HTTP, HTTPS, SOCKS4, SOCKS5, CONNECT:25, CONNECT:80).
        """
        return sorted(self.types.items(),
                      key=lambda tp_lvl: (len(tp_lvl[0]), tp_lvl[0][-1]))

    def __repr__(self):
        # <Proxy US 1.12s [HTTP: Anonymous, HTTPS] 10.0.0.1:8080>
        tpinfo = ', '.join('{}: {}'.format(tp, lvl) if lvl else str(tp)
                           for tp, lvl in self._sorted_types())
        return '<Proxy {code} {avg:.2f}s [{types}] {host}:{port}>'.format(
            code=self._geo.code, types=tpinfo, host=self.host,
            port=self.port, avg=self.avg_resp_time)

    @property
    def types(self):
        """Types (protocols) supported by the proxy.

        | Where key is type, value is level of anonymity
          (only for HTTP, for other types level always is None).
        | Available types: HTTP, HTTPS, SOCKS4, SOCKS5, CONNECT:80, CONNECT:25
        | Available levels: Transparent, Anonymous, High.

        :rtype: dict
        """
        return self._types

    @property
    def is_working(self):
        """True if the proxy is working, False otherwise.

        :rtype: bool
        """
        return self._is_working

    @is_working.setter
    def is_working(self, val):
        self._is_working = val

    @property
    def writer(self):
        """The active stream writer: the TLS one if open, else the plain one."""
        return self._writer.get('ssl') or self._writer.get('conn')

    @property
    def reader(self):
        """The active stream reader: the TLS one if open, else the plain one."""
        return self._reader.get('ssl') or self._reader.get('conn')

    @property
    def priority(self):
        """Sort key ``(error_rate, avg_resp_time)``; lower is better."""
        return (self.error_rate, self.avg_resp_time)

    @property
    def error_rate(self):
        """Error rate: from 0 to 1.

        For example: 0.7 = 70% requests ends with error.

        :rtype: float

        .. versionadded:: 0.2.0
        """
        if not self.stat['requests']:
            return 0
        return sum(self.stat['errors'].values()) / self.stat['requests']

    @property
    def schemes(self):
        """Return supported schemes: a tuple of 'HTTP' and/or 'HTTPS'.

        Derived from the current :attr:`types` on every access, so the
        result stays correct when types are added after a first lookup.
        """
        supported = self.types.keys()
        schemes = []
        if supported & _HTTP_PROTOS:
            schemes.append('HTTP')
        if supported & _HTTPS_PROTOS:
            schemes.append('HTTPS')
        self._schemes = tuple(schemes)
        return self._schemes

    @property
    def avg_resp_time(self):
        """The average connection/response time.

        :rtype: float
        """
        if self._runtimes:
            return sum(self._runtimes) / len(self._runtimes)
        return 0.0

    @property
    def avgRespTime(self):
        """
        .. deprecated:: 2.0
            Use :attr:`avg_resp_time` instead.
        """
        # stacklevel=2 attributes the warning to the caller's line.
        warnings.warn('`avgRespTime` property is deprecated, '
                      'use `avg_resp_time` instead.', DeprecationWarning,
                      stacklevel=2)
        return self.avg_resp_time

    @property
    def geo(self):
        """Geo information about IP address of the proxy.

        :return:
            Named tuple with fields:
                * ``code`` - ISO country code
                * ``name`` - Full name of country
        :rtype: collections.namedtuple

        .. versionchanged:: 0.2.0
            In previous versions return a dictionary, now named tuple.
        """
        return self._geo

    @property
    def ngtr(self):
        """The current negotiator, or None."""
        return self._ngtr

    @ngtr.setter
    def ngtr(self, proto):
        # Instantiate the negotiator registered for ``proto`` in NGTRS.
        self._ngtr = NGTRS[proto](self)

    def as_json(self):
        """Return the proxy's properties in JSON format.

        :rtype: dict
        """
        return {
            'host': self.host,
            'port': self.port,
            'geo': {
                'country': {
                    'code': self._geo.code,
                    'name': self._geo.name,
                },
            },
            'types': [{'type': tp, 'level': lvl or ''}
                      for tp, lvl in self._sorted_types()],
            'avg_resp_time': round(self.avg_resp_time, 2),
            'error_rate': round(self.error_rate, 2),
        }

    def log(self, msg, stime=0, err=None):
        """Record an event in the debug log and in the proxy's own log.

        :param str msg: Event description
        :param float stime:
            (optional) Start timestamp (``time.time()``); when given, the
            elapsed time is recorded as the event's runtime
        :param err:
            (optional) Error object; its ``errmsg`` is counted in
            ``stat['errors']``
        """
        ngtr = self.ngtr.name if self.ngtr else 'INFO'
        runtime = time.time() - stime if stime else 0
        log.debug('{h}:{p} [{n}]: {msg}; Runtime: {rt:.2f}'.format(
            h=self.host, p=self.port, n=ngtr, msg=msg, rt=runtime))
        # Keep at most 60 characters, with '...' only when text was cut.
        short_msg = msg[:60] + ('...' if len(msg) > 60 else '')
        self._log.append((ngtr, short_msg, runtime))
        if err:
            self.stat['errors'][err.errmsg] += 1
        # Timed-out operations would skew the average response time.
        if runtime and 'timeout' not in msg:
            self._runtimes.append(runtime)

    def get_log(self):
        """Proxy log.

        :return: The proxy log: a list of ``(negotiator, msg, runtime)``
            tuples
        :rtype: list

        .. versionadded:: 0.2.0
        """
        return self._log

    async def connect(self, ssl=False):
        """Open a TCP connection to the proxy, or start TLS over it.

        :param bool ssl:
            If True, open a TLS stream over the socket of the already
            established plain connection instead of opening a new one
        :raises ProxyTimeoutError: If the connection times out
        :raises ProxyConnError: If the connection (or TLS setup) fails
        """
        err = None
        msg = 'SSL: ' if ssl else ''
        stime = time.time()
        self.log('%sInitial connection' % msg)
        try:
            if ssl:
                _type = 'ssl'
                sock = self._writer['conn'].get_extra_info('socket')
                params = {'ssl': self._ssl_context, 'sock': sock,
                          'server_hostname': self.host}
            else:
                _type = 'conn'
                params = {'host': self.host, 'port': self.port}
            self._reader[_type], self._writer[_type] = await asyncio.wait_for(
                asyncio.open_connection(**params), timeout=self._timeout)
        except asyncio.TimeoutError:
            # Must precede OSError: TimeoutError subclasses it on 3.11+.
            msg += 'Connection: timeout'
            err = ProxyTimeoutError(msg)
            raise err
        except OSError:
            # Covers ConnectionRefusedError and ssl.SSLError too.
            msg += 'Connection: failed'
            err = ProxyConnError(msg)
            raise err
        else:
            msg += 'Connection: success'
            self._closed = False
        finally:
            self.stat['requests'] += 1
            self.log(msg, stime, err=err)

    def close(self):
        """Close the active writer and forget all streams (idempotent)."""
        if self._closed:
            return
        self._closed = True
        if self.writer:
            self.writer.close()
        self._reader = {'conn': None, 'ssl': None}
        self._writer = {'conn': None, 'ssl': None}
        self.log('Connection: closed')
        self._ngtr = None

    async def send(self, req):
        """Write ``req`` (str is encoded, bytes sent as-is) and drain.

        :raises ProxySendError: If the connection is reset or broken
        """
        msg, err = '', None
        _req = req if isinstance(req, bytes) else req.encode()
        try:
            self.writer.write(_req)
            await self.writer.drain()
        except ConnectionError:
            # ConnectionResetError, BrokenPipeError, ...
            msg = '; Sending: failed'
            err = ProxySendError(msg)
            raise err
        finally:
            self.log('Request: %s%s' % (req, msg), err=err)

    async def recv(self, length=0, head_only=False):
        """Receive a response, bounded by the proxy timeout.

        :param int length:
            (optional) If non-zero, read exactly this many bytes (fewer if
            the stream ends first)
        :param bool head_only:
            (optional) Stop after the HTTP headers
        :return: The received bytes
        :raises ProxyTimeoutError: If nothing complete arrives in time
        :raises ProxyRecvError: If the connection fails
        :raises ProxyEmptyRecvError: If nothing was received
        """
        resp, msg, err = b'', '', None
        stime = time.time()
        try:
            resp = await asyncio.wait_for(
                self._recv(length, head_only), timeout=self._timeout)
        except asyncio.TimeoutError:
            # Must precede OSError: TimeoutError subclasses it on 3.11+.
            msg = 'Received: timeout'
            err = ProxyTimeoutError(msg)
            raise err
        except OSError:
            msg = 'Received: failed'  # e.g. connection reset by the peer
            err = ProxyRecvError(msg)
            raise err
        else:
            msg = 'Received: %s bytes' % len(resp)
            if not resp:
                err = ProxyEmptyRecvError(msg)
                raise err
        finally:
            if resp:
                msg += ': %s' % resp[:12]
            self.log(msg, stime, err=err)
        return resp

    @staticmethod
    def _content_length(headers):
        """Return the declared body size, or None if absent/invalid."""
        value = headers.get('Content-Length')
        if value is None:
            return None
        try:
            size = int(value)
        except (TypeError, ValueError):
            return None
        return size if size >= 0 else None

    async def _readexactly(self, n):
        """Read ``n`` bytes; on early EOF return what was received."""
        try:
            return await self.reader.readexactly(n)
        except asyncio.IncompleteReadError as e:
            return e.partial

    async def _recv(self, length=0, head_only=False):
        """Read raw data from the active stream (no timeout, no logging)."""
        if length:
            return await self._readexactly(length)

        # Status line and headers, up to and including the blank line.
        resp = b''
        headers_done = False
        while not self.reader.at_eof():
            line = await self.reader.readline()
            resp += line
            if line == b'\r\n':
                headers_done = True
                break
        if head_only or not headers_done:
            return resp

        headers = parse_headers(resp)
        body_size = self._content_length(headers)
        if body_size is not None:
            # Read exactly the declared body: line-based reading blocks until
            # the timeout when the body has no trailing newline on an open
            # connection, and may read past the body.
            return resp + await self._readexactly(body_size)

        chunked = headers.get('Transfer-Encoding') == 'chunked'
        while not self.reader.at_eof():
            line = await self.reader.readline()
            resp += line
            # Simplified chunked handling: stop at the zero-size last chunk.
            if chunked and line == b'0\r\n':
                break
        return resp