Source code for wpull.processor.ftp

'''FTP'''
import asyncio
import copy
import fnmatch
import gettext
import logging
import os
import posixpath
import tempfile
import urllib.parse

import namedlist
from typing import cast

from wpull.backport.logging import StyleAdapter
from wpull.body import Body
from wpull.cache import LRUCache
from wpull.errors import ProtocolError
from wpull.application.hook import Actions
from wpull.pipeline.item import LinkType
from wpull.pipeline.session import ItemSession
from wpull.processor.base import BaseProcessor, BaseProcessorSession, \
    REMOTE_ERRORS
from wpull.processor.rule import ResultRule, FetchRule
from wpull.protocol.ftp.client import Client
from wpull.protocol.ftp.request import Request, ListingResponse, Response
from wpull.protocol.ftp.util import FTPServerError
from wpull.scraper.util import urljoin_safe
from wpull.url import parse_url_or_log, URLInfo
from wpull.writer import NullWriter, BaseFileWriter

_logger = StyleAdapter(logging.getLogger(__name__))
_ = gettext.gettext

GLOB_CHARS = frozenset('[]*?')


FTPProcessorFetchParams = namedlist.namedtuple(
    'FTPProcessorFetchParamsType',
    [
        ('remove_listing', True),
        ('glob', True),
        ('preserve_permissions', False),
        ('retr_symlinks', True),
    ]
)
'''FTPProcessorFetchParams

Args:
    remove_listing (bool): Remove `.listing` files after fetching.
    glob (bool): Enable URL globbing.
    preserve_permissions (bool): Preserve file permissions.
    follow_symlinks (bool): Follow symlinks.
'''


[docs]class HookPreResponseBreak(ProtocolError): '''Hook pre-response break.'''
[docs]class FTPProcessor(BaseProcessor): '''FTP processor. Args: ftp_client: The FTP client. fetch_params (:class:`WebProcessorFetchParams`): Parameters for fetching. ''' def __init__(self, ftp_client: Client, fetch_params): super().__init__() self._ftp_client = ftp_client self._fetch_params = fetch_params self._listing_cache = LRUCache(max_items=10, time_to_live=3600) @property def ftp_client(self) -> Client: '''The ftp client.''' return self._ftp_client @property def fetch_params(self) -> FTPProcessorFetchParams: '''The fetch parameters.''' return self._fetch_params @property def listing_cache(self) -> LRUCache: '''Listing cache. Returns: A cache mapping from URL to list of :class:`.ftp.ls.listing.FileEntry`. ''' return self._listing_cache @asyncio.coroutine
[docs] def process(self, item_session: ItemSession): session = FTPProcessorSession(self, item_session) try: return (yield from session.process()) finally: session.close()
[docs] def close(self): '''Close the FTP client.''' self._ftp_client.close()
[docs]class FTPProcessorSession(BaseProcessorSession): '''Fetches FTP files or directory listings.''' def __init__(self, processor: FTPProcessor, item_session: ItemSession): super().__init__() self._processor = processor self._item_session = item_session self._fetch_rule = cast(FetchRule, item_session.app_session.factory['FetchRule']) self._result_rule = cast(ResultRule, item_session.app_session.factory['ResultRule']) file_writer = cast(BaseFileWriter, item_session.app_session.factory['FileWriter']) self._file_writer_session = file_writer.session() self._glob_pattern = None
[docs] def close(self): pass
@asyncio.coroutine
[docs] def process(self): '''Process. Coroutine. ''' self._item_session.request = request = Request(self._item_session.url_record.url) verdict = self._fetch_rule.check_ftp_request(self._item_session)[0] if not verdict: self._item_session.skip() return self._add_request_password(request) dir_name, filename = self._item_session.url_record.url_info.split_path() if self._processor.fetch_params.glob and frozenset(filename) & GLOB_CHARS: request = self._to_directory_request(request) is_file = False self._glob_pattern = urllib.parse.unquote(filename) else: is_file = yield from self._prepare_request_file_vs_dir(request) self._file_writer_session.process_request(request) wait_time = yield from self._fetch(request, is_file) if wait_time: _logger.debug('Sleeping {0}.', wait_time) yield from asyncio.sleep(wait_time)
def _add_request_password(self, request: Request): if self._fetch_rule.ftp_login: request.username, request.password = self._fetch_rule.ftp_login @classmethod def _to_directory_request(cls, request: Request) -> Request: directory_url = to_dir_path_url(request.url_info) directory_request = copy.deepcopy(request) directory_request.url = directory_url return directory_request @asyncio.coroutine def _prepare_request_file_vs_dir(self, request: Request) -> bool: '''Check if file, modify request, and return whether is a file. Coroutine. ''' if self._item_session.url_record.link_type: is_file = self._item_session.url_record.link_type == LinkType.file elif request.url_info.path.endswith('/'): is_file = False else: is_file = 'unknown' if is_file == 'unknown': files = yield from self._fetch_parent_path(request) if not files: return True filename = posixpath.basename(request.file_path) for file_entry in files: if file_entry.name == filename: _logger.debug('Found entry in parent. Type {}', file_entry.type) is_file = file_entry.type != 'dir' break else: _logger.debug('Did not find entry. Assume file.') return True if not is_file: request.url = append_slash_to_path_url(request.url_info) _logger.debug('Request URL changed to {}. Path={}.', request.url, request.file_path) return is_file @asyncio.coroutine def _fetch_parent_path(self, request: Request, use_cache: bool=True): '''Fetch parent directory and return list FileEntry. Coroutine. ''' directory_url = to_dir_path_url(request.url_info) if use_cache: if directory_url in self._processor.listing_cache: return self._processor.listing_cache[directory_url] directory_request = copy.deepcopy(request) directory_request.url = directory_url _logger.debug('Check if URL {} is file with {}.', request.url, directory_url) with self._processor.ftp_client.session() as session: try: yield from session.start_listing(directory_request) except FTPServerError: _logger.debug('Got an error. Assume is file.') if use_cache: self._processor.listing_cache[directory_url] = None return temp_file = tempfile.NamedTemporaryFile( dir=self._item_session.app_session.root_path, prefix='tmp-wpull-list' ) with temp_file as file: directory_response = yield from session.download_listing( file, duration_timeout=self._fetch_rule.duration_timeout) if use_cache: self._processor.listing_cache[directory_url] = \ directory_response.files return directory_response.files @asyncio.coroutine def _fetch(self, request: Request, is_file: bool): '''Fetch the request Coroutine. ''' _logger.info(_('Fetching ‘{url}’.'), url=request.url) self._item_session.request = request response = None try: with self._processor.ftp_client.session() as session: if is_file: response = yield from session.start(request) else: response = yield from session.start_listing(request) self._item_session.response = response action = self._result_rule.handle_pre_response( self._item_session ) if action in (Actions.RETRY, Actions.FINISH): raise HookPreResponseBreak() self._file_writer_session.process_response(response) if not response.body: response.body = Body( directory=self._item_session.app_session.root_path, hint='resp_cb') duration_timeout = self._fetch_rule.duration_timeout if is_file: yield from session.download( response.body, duration_timeout=duration_timeout) else: yield from session.download_listing( response.body, duration_timeout=duration_timeout) except HookPreResponseBreak: if response: response.body.close() except REMOTE_ERRORS as error: self._log_error(request, error) self._result_rule.handle_error(self._item_session, error) wait_time = self._result_rule.get_wait_time( self._item_session, error=error ) if response: response.body.close() return wait_time else: self._log_response(request, response) self._handle_response(request, response) wait_time = self._result_rule.get_wait_time( self._item_session ) if is_file and \ self._processor.fetch_params.preserve_permissions and \ hasattr(response.body, 'name'): yield from self._apply_unix_permissions(request, response) response.body.close() return wait_time def _add_listing_links(self, response: ListingResponse): '''Add links from file listing response.''' base_url = response.request.url_info.url if self._glob_pattern: level = self._item_session.url_record.level else: level = None for file_entry in response.files: if self._glob_pattern and \ not fnmatch.fnmatchcase(file_entry.name, self._glob_pattern): continue if file_entry.type == 'dir': linked_url = urljoin_safe(base_url, file_entry.name + '/') elif file_entry.type in ('file', 'symlink', None): if not self._processor.fetch_params.retr_symlinks and \ file_entry.type == 'symlink': self._make_symlink(file_entry.name, file_entry.dest) linked_url = None else: linked_url = urljoin_safe(base_url, file_entry.name) else: linked_url = None if linked_url: linked_url_info = parse_url_or_log(linked_url) if linked_url_info: verdict = self._fetch_rule.check_ftp_request(self._item_session)[0] if verdict: if linked_url_info.path.endswith('/'): self._item_session.add_child_url(linked_url_info.url, link_type=LinkType.directory) else: self._item_session.add_child_url(linked_url_info.url, link_type=LinkType.file, level=level) def _log_response(self, request: Request, response: Response): '''Log response.''' _logger.info( _('Fetched ‘{url}’: {reply_code} {reply_text}. ' 'Length: {content_length}.'), url=request.url, reply_code=response.reply.code, reply_text=response.reply.text, content_length=response.body.size(), ) def _handle_response(self, request: Request, response: Response): '''Process a response.''' self._item_session.update_record_value(status_code=response.reply.code) is_listing = isinstance(response, ListingResponse) if is_listing and not self._processor.fetch_params.remove_listing or \ not is_listing: filename = self._file_writer_session.save_document(response) action = self._result_rule.handle_document(self._item_session, filename) else: self._file_writer_session.discard_document(response) action = self._result_rule.handle_no_document(self._item_session) if isinstance(response, ListingResponse): self._add_listing_links(response) return action def _make_symlink(self, link_name: str, link_target: str): '''Make a symlink on the system.''' path = self._file_writer_session.extra_resource_path('dummy') if path: dir_path = os.path.dirname(path) symlink_path = os.path.join(dir_path, link_name) _logger.debug('symlink {} -> {}', symlink_path, link_target) os.symlink(link_target, symlink_path) _logger.info( _('Created symbolic link {symlink_path} to target {symlink_target}.'), symlink_path=symlink_path, symlink_target=link_target ) @asyncio.coroutine def _apply_unix_permissions(self, request: Request, response: Response): '''Fetch and apply Unix permissions. Coroutine. ''' files = yield from self._fetch_parent_path(request) if not files: return filename = posixpath.basename(request.file_path) for file_entry in files: if file_entry.name == filename and file_entry.perm: _logger.debug( 'Set chmod {} o{:o}.', response.body.name, file_entry.perm ) os.chmod(response.body.name, file_entry.perm)
[docs]def to_dir_path_url(url_info: URLInfo) -> str: '''Return URL string with the path replaced with directory only.''' dir_name = posixpath.dirname(url_info.path) if not dir_name.endswith('/'): url_template = 'ftp://{}{}/' else: url_template = 'ftp://{}{}' return url_template.format(url_info.hostname_with_port, dir_name)
[docs]def append_slash_to_path_url(url_info: URLInfo) -> str: '''Return URL string with the path suffixed with a slash.''' return 'ftp://{}{}/'.format(url_info.hostname_with_port, url_info.path)