Source code for wpull.url

'''URL parsing based on WHATWG URL living standard.'''
import collections
import fnmatch
import functools
import gettext
import ipaddress
import logging
import re
import string
import urllib.parse
import posixpath

from wpull.backport.logging import BraceMessage as __
import wpull.string


_logger = logging.getLogger(__name__)
_ = gettext.gettext


# Schemes that this module parses into authority/path/query/fragment parts,
# mapped to the port used when the URL does not specify one.
RELATIVE_SCHEME_DEFAULT_PORTS = {
    'ftp': 21,
    'gopher': 70,
    'http': 80,
    'https': 443,
    'ws': 80,
    'wss': 443,
}

C0_CONTROL_SET = frozenset(chr(i) for i in range(0, 0x1f + 1))
'''Characters from 0x00 to 0x1f inclusive'''

DEFAULT_ENCODE_SET = frozenset(b' "#<>?`')
'''Percent encoding set as defined by WHATWG URL living standard.

Does not include U+0000 to U+001F nor U+007F or above; bytes in those
ranges are always percent-encoded by the encoder itself.
'''

PASSWORD_ENCODE_SET = DEFAULT_ENCODE_SET | frozenset(b'/@\\')
'''Encoding set for passwords.'''

USERNAME_ENCODE_SET = PASSWORD_ENCODE_SET | frozenset(b':')
'''Encoding set for usernames.'''

QUERY_ENCODE_SET = frozenset(b'"#<>`')
'''Encoding set for query strings.

This set does not include U+0020 (space) so it can be replaced with
U+002B (plus sign) later.
'''

FRAGMENT_ENCODE_SET = frozenset(b' "<>`')
'''Encoding set for fragment.'''

QUERY_VALUE_ENCODE_SET = QUERY_ENCODE_SET | frozenset(b'&+%')
'''Encoding set for a query value.'''

FORBIDDEN_HOSTNAME_CHARS = frozenset('#%/:?@[\\] ')
'''Forbidden hostname characters.

Does not include non-printing characters. Meant for ASCII.
'''


class URLInfo(object):
    '''Represent parts of a URL.

    Attributes:
        raw (str): Original string.
        scheme (str): Protocol (for example, HTTP, FTP).
        authority (str): Raw userinfo and host.
        path (str): Location of resource. This value always
            begins with a slash (``/``).
        query (str): Additional request parameters.
        fragment (str): Named anchor of a document.
        userinfo (str): Raw username and password.
        username (str): Username.
        password (str): Password.
        host (str): Raw hostname and port.
        hostname (str): Hostname or IP address.
        port (int): IP address port number.
        resource (str): Raw path, query, and fragment. This value always
            begins with a slash (``/``).
        query_map (dict): Mapping of the query. Values are lists.
        url (str): A normalized URL without the fragment. Username and
            password are included when present.
        encoding (str): Codec name for IRI support.

    If scheme is not something like HTTP or FTP, the remaining attributes
    are None.

    All attributes are read only.

    For more information about how the URL parts are derived, see
    https://medialize.github.io/URI.js/about-uris.html
    '''

    __slots__ = ('raw', 'scheme', 'authority', 'path', 'query', 'fragment',
                 'userinfo', 'username', 'password',
                 'host', 'hostname', 'port',
                 'resource',
                 '_query_map', '_url', 'encoding',
                 )

    def __init__(self):
        self.raw = None
        self.scheme = None
        self.authority = None
        self.path = None
        self.query = None
        self.fragment = None
        self.userinfo = None
        self.username = None
        self.password = None
        self.host = None
        self.hostname = None
        self.port = None
        self.resource = None
        self._query_map = None
        self._url = None
        self.encoding = None

    @classmethod
    @functools.lru_cache()
    def parse(cls, url, default_scheme='http', encoding='utf-8'):
        '''Parse a URL and return a URLInfo.

        Args:
            url (str): The URL text. Surrounding whitespace is stripped.
            default_scheme (str): Scheme assumed when the URL has none
                (``example.com/stuff``) or when the text before the first
                colon looks like a host (``example.com:8080/stuff``,
                ``localhost:8080/stuff``). A false value disables this.
            encoding (str): Codec name used when percent-encoding.

        Returns:
            URLInfo: Results are cached, so the returned instance may be
            shared between callers and must be treated as read only.

        Raises:
            ValueError: If the URL contains control codes, has no scheme,
                has no colon while ``default_scheme`` is false, or has an
                empty or invalid hostname or port.
        '''
        url = url.strip()

        if frozenset(url) & C0_CONTROL_SET:
            raise ValueError(
                'URL contains control codes: {}'.format(ascii(url)))

        scheme, sep, remaining = url.partition(':')

        if not scheme:
            raise ValueError('URL missing scheme: {}'.format(ascii(url)))

        scheme = scheme.lower()

        if not sep and default_scheme:
            # Likely something like example.com/mystuff
            remaining = url
            scheme = default_scheme
        elif not sep:
            raise ValueError('URI missing colon: {}'.format(ascii(url)))

        # The parentheses matter: without them a false ``default_scheme``
        # combined with a ``localhost:`` URL replaced the scheme with None.
        if default_scheme and ('.' in scheme or scheme == 'localhost'):
            # Maybe something like example.com:8080/mystuff or
            # maybe localhost:8080/mystuff
            # Reuse the original text rather than the lowercased scheme so
            # that the case of any path or userinfo before the first colon
            # is preserved.
            remaining = url
            scheme = default_scheme

        info = URLInfo()
        info.encoding = encoding

        if scheme not in RELATIVE_SCHEME_DEFAULT_PORTS:
            # Schemes such as mailto: are not split any further.
            info.raw = url
            info.scheme = scheme
            info.path = remaining

            return info

        if remaining.startswith('//'):
            remaining = remaining[2:]

        path_index = remaining.find('/')
        query_index = remaining.find('?')
        fragment_index = remaining.find('#')

        # The authority ends at whichever delimiter comes first.
        try:
            index_tuple = (path_index, query_index, fragment_index)
            authority_index = min(num for num in index_tuple if num >= 0)
        except ValueError:
            authority_index = len(remaining)

        authority = remaining[:authority_index]
        resource = remaining[authority_index:]

        # The path ends at the first query or fragment delimiter.
        try:
            index_tuple = (query_index, fragment_index)
            path_index = min(num for num in index_tuple if num >= 0)
        except ValueError:
            path_index = len(remaining)

        path = remaining[authority_index + 1:path_index] or '/'

        # From here on ``query_index`` marks where the query ends.
        if fragment_index >= 0:
            query_index = fragment_index
        else:
            query_index = len(remaining)

        query = remaining[path_index + 1:query_index]
        fragment = remaining[query_index + 1:]

        userinfo, host = cls.parse_authority(authority)
        hostname, port = cls.parse_host(host)
        username, password = cls.parse_userinfo(userinfo)

        if not hostname:
            raise ValueError('Hostname is empty: {}'.format(ascii(url)))

        info.raw = url
        info.scheme = scheme
        info.authority = authority
        info.path = normalize_path(path, encoding=encoding)
        info.query = normalize_query(query, encoding=encoding)
        info.fragment = normalize_fragment(fragment, encoding=encoding)

        info.userinfo = userinfo
        info.username = percent_decode(username, encoding=encoding)
        info.password = percent_decode(password, encoding=encoding)
        info.host = host
        info.hostname = hostname
        info.port = port or RELATIVE_SCHEME_DEFAULT_PORTS[scheme]

        info.resource = resource

        return info

    @classmethod
    def parse_authority(cls, authority):
        '''Parse the authority part and return userinfo and host.

        The split happens at the first ``@``. Userinfo is an empty string
        when there is no ``@``.
        '''
        userinfo, sep, host = authority.partition('@')

        if not sep:
            return '', userinfo
        else:
            return userinfo, host

    @classmethod
    def parse_userinfo(cls, userinfo):
        '''Parse the userinfo and return username and password.

        The split happens at the first ``:``. The password is an empty
        string when there is no ``:``.
        '''
        username, sep, password = userinfo.partition(':')

        return username, password

    @classmethod
    def parse_host(cls, host):
        '''Parse the host and return hostname and port.

        The port is None when the host does not specify one.

        Raises:
            ValueError: If the port is not an integer from 0 to 65535 or
                the hostname is invalid.
        '''
        if host.endswith(']'):
            # Bracketed IPv6 literal without a port.
            return cls.parse_hostname(host), None
        else:
            hostname, sep, port = host.rpartition(':')

        if sep:
            port = int(port)
            if port < 0 or port > 65535:
                raise ValueError('Port number invalid')
        else:
            # rpartition() puts the whole text in the last element when
            # the separator is absent.
            hostname = port
            port = None

        return cls.parse_hostname(hostname), port

    @classmethod
    def parse_hostname(cls, hostname):
        '''Parse the hostname and normalize.

        Raises:
            ValueError: If the hostname contains forbidden characters or
                is an invalid IPv6 literal.
        '''
        if hostname.startswith('['):
            return cls.parse_ipv6_hostname(hostname)
        else:
            try:
                new_hostname = normalize_ipv4_address(hostname)
            except ValueError:
                # Not an IPv4 address; treat it as a domain name.
                new_hostname = hostname

            new_hostname = normalize_hostname(new_hostname)

            if any(char in new_hostname for char in FORBIDDEN_HOSTNAME_CHARS):
                raise ValueError('Invalid hostname: {}'
                                 .format(ascii(hostname)))

            return new_hostname

    @classmethod
    def parse_ipv6_hostname(cls, hostname):
        '''Parse and normalize a IPv6 address.

        The hostname must be wrapped in square brackets. The compressed
        address is returned without the brackets.
        '''
        if not hostname.startswith('[') or not hostname.endswith(']'):
            raise ValueError('Invalid IPv6 address: {}'
                             .format(ascii(hostname)))

        hostname = ipaddress.IPv6Address(hostname[1:-1]).compressed

        return hostname

    @property
    def query_map(self):
        '''Mapping of the query, computed on first access.'''
        if self._query_map is None:
            self._query_map = query_to_map(self.query)

        return self._query_map

    @property
    def url(self):
        '''Normalized URL without the fragment, computed on first access.'''
        if self._url is None:
            if self.scheme not in RELATIVE_SCHEME_DEFAULT_PORTS:
                self._url = self.raw
                return self._url

            parts = [self.scheme, '://']

            if self.username:
                parts.append(normalize_username(self.username))

            if self.password:
                parts.append(':')
                parts.append(normalize_password(self.password))

            if self.username or self.password:
                parts.append('@')

            if self.is_ipv6():
                parts.append('[{}]'.format(self.hostname))
            else:
                parts.append(self.hostname)

            if RELATIVE_SCHEME_DEFAULT_PORTS[self.scheme] != self.port:
                parts.append(':{}'.format(self.port))

            parts.append(self.path)

            if self.query:
                parts.append('?')
                parts.append(self.query)

            self._url = ''.join(parts)

        return self._url

    def to_dict(self):
        '''Return a dict of the attributes.'''
        return dict(
            raw=self.raw,
            scheme=self.scheme,
            authority=self.authority,
            netloc=self.authority,
            path=self.path,
            query=self.query,
            fragment=self.fragment,
            userinfo=self.userinfo,
            username=self.username,
            password=self.password,
            host=self.host,
            hostname=self.hostname,
            port=self.port,
            resource=self.resource,
            url=self.url,
            encoding=self.encoding,
        )

    def is_port_default(self):
        '''Return whether the URL is using the default port.

        Returns None if the scheme has no known default port.
        '''
        if self.scheme in RELATIVE_SCHEME_DEFAULT_PORTS:
            return RELATIVE_SCHEME_DEFAULT_PORTS[self.scheme] == self.port

    def is_ipv6(self):
        '''Return whether the URL is IPv6.

        Returns None if there is no host.
        '''
        if self.host:
            return self.host.startswith('[')

    @property
    def hostname_with_port(self):
        '''Return the host portion but omit default port if needed.

        An empty string is returned if the scheme has no default port.
        '''
        default_port = RELATIVE_SCHEME_DEFAULT_PORTS.get(self.scheme)

        if not default_port:
            return ''

        assert '[' not in self.hostname
        assert ']' not in self.hostname

        if self.is_ipv6():
            hostname = '[{}]'.format(self.hostname)
        else:
            hostname = self.hostname

        if default_port != self.port:
            return '{}:{}'.format(hostname, self.port)
        else:
            return hostname

    def split_path(self):
        '''Return the directory and filename from the path.

        The results are not percent-decoded.
        '''
        return posixpath.split(self.path)

    def __repr__(self):
        return '<URLInfo at 0x{:x} url={} raw={}>'.format(
            id(self), self.url, self.raw)

    def __hash__(self):
        return hash(self.raw)

    def __eq__(self, other):
        # Objects without ``raw`` are simply not equal instead of raising
        # AttributeError.
        try:
            return self.raw == other.raw
        except AttributeError:
            return NotImplemented

    def __ne__(self, other):
        try:
            return self.raw != other.raw
        except AttributeError:
            return NotImplemented
def parse_url_or_log(url, encoding='utf-8'):
    '''Parse a URL, logging instead of raising on failure.

    Returns:
        URLInfo: The parsed URL, or None if :meth:`URLInfo.parse` raised
        ValueError. In that case a warning is logged.
    '''
    try:
        return URLInfo.parse(url, encoding=encoding)
    except ValueError as error:
        message = __(
            _('Unable to parse URL ‘{url}’: {error}.'),
            url=wpull.string.printable_str(url), error=error)
        _logger.warning(message)
        return None
def normalize(url, **kwargs):
    '''Normalize a URL.

    This is a convenience function equivalent to
    ``URLInfo.parse(url, **kwargs).url``.

    :seealso: :func:`URLInfo.parse`.
    '''
    url_info = URLInfo.parse(url, **kwargs)
    return url_info.url
@functools.lru_cache()
def normalize_hostname(hostname):
    '''Return the hostname as a lowercase ASCII (IDNA) domain name.

    May raise UnicodeError if the hostname cannot be encoded.
    '''
    ascii_hostname = hostname.encode('idna').decode('ascii')
    normalized = ascii_hostname.lower()

    if normalized != hostname:
        # Round-trip check: encoding the result again may raise
        # UnicodeError.
        normalized.encode('idna')

    return normalized
def parse_ipv4_int(text):
    '''Parse one numeric part of an IPv4 address.

    A ``0x`` or ``0X`` prefix selects hexadecimal, any other leading ``0``
    selects octal, and everything else is decimal.

    Raises:
        ValueError: If the text is not a number in the selected base.
    '''
    # str.startswith() accepts a tuple, so both prefix spellings are
    # recognized; previously ``0X`` fell through to the octal branch and
    # failed to parse.
    if text.startswith(('0x', '0X')):
        base = 16
    elif text.startswith('0'):
        base = 8
    else:
        base = 10

    return int(text, base)
def normalize_ipv4_address(address):
    '''Return an IPv4 address in dotted-decimal form.

    Accepts either a single number (no dots) or four dot-separated parts.
    Each number is parsed by :func:`parse_ipv4_int`.

    Raises:
        ValueError: If the text is not an IPv4 address, including when
            one of the four dotted parts is outside 0 to 255.
    '''
    num_decimals = address.count('.')

    if num_decimals == 0:
        return ipaddress.IPv4Address(parse_ipv4_int(address)).compressed
    elif num_decimals == 3:
        value = 0

        for part in address.split('.'):
            octet = parse_ipv4_int(part)

            # Without this check an oversized part carried into the
            # neighbouring octet (``1.2.3.256`` became ``1.2.4.0``).
            if not 0 <= octet <= 255:
                raise ValueError('IPv4 address part out of range')

            value = (value << 8) | octet

        return ipaddress.IPv4Address(value).compressed
    else:
        raise ValueError('Not an IPv4 address')
def normalize_path(path, encoding='utf-8'):
    '''Normalize a path string.

    A leading slash is added if missing, dot parts and repeated slashes
    are flattened, unacceptable characters are percent-encoded and
    percent-encoding is made uppercase.
    '''
    absolute_path = path if path.startswith('/') else '/' + path
    flattened = flatten_path(absolute_path, flatten_slashes=True)
    encoded = percent_encode(flattened, encoding=encoding)

    return uppercase_percent_encoding(encoded)
def normalize_query(text, encoding='utf-8'):
    '''Normalize a query string.

    Unacceptable characters are percent-encoded (spaces become plus
    signs) and percent-encoding is made uppercase.
    '''
    encoded_query = percent_encode_plus(text, encoding=encoding)

    return uppercase_percent_encoding(encoded_query)
def normalize_fragment(text, encoding='utf-8'):
    '''Normalize a fragment.

    Characters in ``FRAGMENT_ENCODE_SET`` are percent-encoded and
    percent-encoding is made uppercase.
    '''
    encoded_fragment = percent_encode(
        text, encode_set=FRAGMENT_ENCODE_SET, encoding=encoding)

    return uppercase_percent_encoding(encoded_fragment)
def normalize_username(text, encoding='utf-8'):
    '''Normalize a username.

    Characters in ``USERNAME_ENCODE_SET`` are percent-encoded and
    percent-encoding is made uppercase.
    '''
    encoded_username = percent_encode(
        text, encode_set=USERNAME_ENCODE_SET, encoding=encoding)

    return uppercase_percent_encoding(encoded_username)
def normalize_password(text, encoding='utf-8'):
    '''Normalize a password.

    Characters in ``PASSWORD_ENCODE_SET`` are percent-encoded and
    percent-encoding is made uppercase.
    '''
    encoded_password = percent_encode(
        text, encode_set=PASSWORD_ENCODE_SET, encoding=encoding)

    return uppercase_percent_encoding(encoded_password)
class PercentEncoderMap(collections.defaultdict):
    '''Lazily filled mapping from a byte value to its encoded text.

    Bytes outside 0x20 to 0x7E, and bytes contained in the encode set,
    map to ``%XX``. Every other byte maps to its own character.
    '''

    # This class is based on urllib.parse.Quoter
    def __init__(self, encode_set):
        super().__init__()
        self.encode_set = encode_set

    def __missing__(self, char):
        is_printable_ascii = 0x20 <= char <= 0x7E

        if is_printable_ascii and char not in self.encode_set:
            encoded = chr(char)
        else:
            encoded = '%{:02X}'.format(char)

        # Remember the answer so later lookups skip this method.
        self[char] = encoded
        return encoded
_percent_encoder_map_cache = {}
'''Cache of :class:`PercentEncoderMap`.'''


def percent_encode(text, encode_set=DEFAULT_ENCODE_SET, encoding='utf-8'):
    '''Percent encode text.

    The text is encoded to bytes using ``encoding`` and each byte is
    passed through a :class:`PercentEncoderMap` built for ``encode_set``.

    Unlike Python's ``quote``, this function accepts a blacklist instead
    of a whitelist of safe characters.
    '''
    encoded_bytes = text.encode(encoding)

    # One encoder lookup function is kept per encode set.
    encode_byte = _percent_encoder_map_cache.get(encode_set)

    if encode_byte is None:
        encode_byte = PercentEncoderMap(encode_set).__getitem__
        _percent_encoder_map_cache[encode_set] = encode_byte

    return ''.join(map(encode_byte, encoded_bytes))
def percent_encode_plus(text, encode_set=QUERY_ENCODE_SET,
                        encoding='utf-8'):
    '''Percent encode text for query strings.

    Spaces left unencoded by ``encode_set`` are replaced with plus signs.

    Unlike Python's ``quote_plus``, this function accepts a blacklist
    instead of a whitelist of safe characters.
    '''
    encoded = percent_encode(text, encode_set, encoding)

    # Only touch the result when the input actually had a space.
    if ' ' in text:
        encoded = encoded.replace(' ', '+')

    return encoded
def percent_encode_query_value(text, encoding='utf-8'):
    '''Percent encode a query value using ``QUERY_VALUE_ENCODE_SET``.'''
    return percent_encode_plus(text, QUERY_VALUE_ENCODE_SET, encoding)
# Percent-decoding is delegated to the standard library.
percent_decode = urllib.parse.unquote
# Same as above, but plus signs are also decoded to spaces.
percent_decode_plus = urllib.parse.unquote_plus
def schemes_similar(scheme1, scheme2):
    '''Return whether URL schemes are similar.

    Identical schemes are similar. In addition, the following schemes
    are considered similar to each other:

    * HTTP and HTTPS
    '''
    web_schemes = ('http', 'https')

    if scheme1 == scheme2:
        return True

    return scheme1 in web_schemes and scheme2 in web_schemes
def is_subdir(base_path, test_path, trailing_slash=False, wildcards=False):
    '''Return whether a path is a subpath of another.

    Args:
        base_path: The base path
        test_path: The path which we are testing
        trailing_slash: If True, the trailing slash is treated with
            importance. For example, ``/images/`` is a directory while
            ``/images`` is a file. Everything after the last slash is
            dropped from both paths before comparing.
        wildcards: If True, globbing wildcards are matched against paths
    '''
    if trailing_slash:
        base_dir = base_path.rsplit('/', 1)[0] + '/'
        test_dir = test_path.rsplit('/', 1)[0] + '/'
    else:
        # Both paths are treated as directories.
        base_dir = base_path if base_path.endswith('/') else base_path + '/'
        test_dir = test_path if test_path.endswith('/') else test_path + '/'

    if wildcards:
        return fnmatch.fnmatchcase(test_dir, base_dir)

    return test_dir.startswith(base_dir)
def uppercase_percent_encoding(text):
    '''Uppercases percent-encoded sequences.

    Every ``%`` followed by two hexadecimal digits has its digits
    uppercased, including mixed-case sequences such as ``%aF``. All other
    text is returned unchanged.
    '''
    # Fast path: nothing to rewrite.
    if '%' not in text:
        return text

    # Both digit cases must be matched; a lowercase-only pattern skipped
    # sequences such as ``%aB`` and left them unnormalized.
    return re.sub(
        r'%[0-9a-fA-F]{2}',
        lambda match: match.group(0).upper(),
        text)
def split_query(qs, keep_blank_values=False):
    '''Split the query string into a list of name-value pairs.

    Pairs are separated by ``&`` and each pair is split at its first
    ``=``. Unless ``keep_blank_values`` is true, pairs with an empty value
    are dropped.

    Note for empty values: If an equal sign (``=``) is present, the value
    will be an empty string (``''``). Otherwise, the value will be
    ``None``::

        >>> list(split_query('a=&b', keep_blank_values=True))
        [('a', ''), ('b', None)]

    No processing is done on the actual values.
    '''
    pairs = []

    for field in qs.split('&'):
        name, separator, value = field.partition('=')

        if keep_blank_values:
            pairs.append((name, value if separator else None))
        elif value:
            pairs.append((name, value))

    return pairs
def query_to_map(text):
    '''Return a key-values mapping from a query string.

    The query is split with :func:`split_query` keeping blank values.
    Each key maps to a list of its values in order of appearance. Missing
    or empty values are stored as empty strings.

    Plus symbols are replaced with spaces.
    '''
    dict_obj = {}

    for key, value in split_query(text, True):
        values = dict_obj.setdefault(key, [])
        values.append(value.replace('+', ' ') if value else '')

    # Return the mapping that was built; calling this function again here
    # recursed without end.
    return dict_obj
@functools.lru_cache()
def urljoin(base_url, url, allow_fragments=True):
    '''Join URLs like ``urllib.parse.urljoin`` but allow scheme-relative URL.

    A URL starting with ``//`` is given the text before the first colon
    of ``base_url`` as its scheme before being joined.
    '''
    is_scheme_relative = url.startswith('//') and len(url) > 2

    if is_scheme_relative:
        scheme, _sep, _rest = base_url.partition(':')

        if scheme:
            url = '{0}:{1}'.format(scheme, url)

    return urllib.parse.urljoin(
        base_url, url, allow_fragments=allow_fragments)
def flatten_path(path, flatten_slashes=False):
    '''Flatten an absolute URL path by removing the dot segments.

    :func:`urllib.parse.urljoin` has some support for removing dot
    segments, but it is conservative and only removes them as needed.

    Arguments:
        path (str): The URL path.
        flatten_slashes (bool): If True, consecutive slashes are removed.

    The path returned will always have a leading slash.
    '''
    # Based on posixpath.normpath

    # Fast path
    if not path or path == '/':
        return '/'

    # Work on the path without its leading slash.
    trimmed = path[1:] if path.startswith('/') else path

    segments = []

    for segment in trimmed.split('/'):
        if segment == '..':
            # Step up one level; extra ``..`` at the root are ignored.
            if segments:
                segments.pop()
        elif segment == '.' or (flatten_slashes and not segment):
            continue
        else:
            segments.append(segment)

    # Keep an empty filename so a trailing slash (or the root) survives.
    if not segments or (flatten_slashes and trimmed.endswith('/')):
        segments.append('')

    return '/' + '/'.join(segments)