123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535 |
- #!/usr/bin/env python
- # $Id: servers.py 1219 2013-04-19 14:35:41Z g.rodola $
- # ======================================================================
- # Copyright (C) 2007-2013 Giampaolo Rodola' <g.rodola@gmail.com>
- #
- # All Rights Reserved
- #
- # Permission is hereby granted, free of charge, to any person
- # obtaining a copy of this software and associated documentation
- # files (the "Software"), to deal in the Software without
- # restriction, including without limitation the rights to use,
- # copy, modify, merge, publish, distribute, sublicense, and/or sell
- # copies of the Software, and to permit persons to whom the
- # Software is furnished to do so, subject to the following
- # conditions:
- #
- # The above copyright notice and this permission notice shall be
- # included in all copies or substantial portions of the Software.
- #
- # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
- # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
- # OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
- # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
- # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
- # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
- # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
- # OTHER DEALINGS IN THE SOFTWARE.
- #
- # ======================================================================
- """
- This module contains the main FTPServer class which listens on a
- host:port and dispatches the incoming connections to a handler.
- The concurrency is handled asynchronously by the main process thread,
- meaning the handler cannot block otherwise the whole server will hang.
- Other than that we have 2 subclasses changing the asynchronous concurrency
- model using multiple threads or processes.
- You might be interested in these in case your code contains blocking
- parts which cannot be adapted to the base async model or if the
- underlying filesystem is particularly slow, see:
- https://code.google.com/p/pyftpdlib/issues/detail?id=197
- https://code.google.com/p/pyftpdlib/issues/detail?id=212
- Two classes are provided:
- - ThreadedFTPServer
- - MultiprocessFTPServer
- ...spawning a new thread or process every time a client connects.
- The main thread will be async-based and be used only to accept new
- connections.
- Every time a new connection comes in that will be dispatched to a
- separate thread/process which internally will run its own IO loop.
- This way the handler handling that connection will be free to block
- without hanging the whole FTP server.
- """
- import os
- import socket
- import traceback
- import sys
- import errno
- import select
- import logging
- import signal
- import time
- from pyftpdlib.log import logger
- from pyftpdlib.ioloop import Acceptor, IOLoop
# Public names of this module. The optional server flavours defined
# further down append their own name to this list when the module they
# rely on (threading / multiprocessing) can be imported.
__all__ = ["FTPServer"]

# True when running on a BSD-derived platform. Consulted when stopping
# worker processes, where SIGKILL is used in place of terminate().
_BSD = "bsd" in sys.platform
- # ===================================================================
- # --- base class
- # ===================================================================
class FTPServer(Acceptor):
    """Creates a socket listening on <address>, dispatching the requests
    to a <handler> (typically FTPHandler class).

    Depending on the type of address specified IPv4 or IPv6 connections
    (or both, depending on the underlying system) will be accepted.

    Connection limits are configured through the class attributes
    described below.

     - (int) max_cons:
        number of maximum simultaneous connections accepted (defaults
        to 512). Can be set to 0 for unlimited but it is recommended
        to always have a limit to avoid running out of file descriptors
        (DoS).

     - (int) max_cons_per_ip:
        number of maximum connections accepted for the same IP address
        (defaults to 0 == unlimited).
    """

    max_cons = 512
    max_cons_per_ip = 0

    def __init__(self, address_or_socket, handler, ioloop=None, backlog=5):
        """Creates a socket listening on 'address' dispatching
        connections to a 'handler'.

         - (tuple) address_or_socket: the (host, port) pair on which
           the command channel will listen for incoming connections or
           an existent socket object.

         - (instance) handler: the handler class to use.

         - (instance) ioloop: a pyftpdlib.ioloop.IOLoop instance

         - (int) backlog: the maximum number of queued connections
           passed to listen(). If a connection request arrives when
           the queue is full the client may raise ECONNRESET.
           Defaults to 5.
        """
        Acceptor.__init__(self, ioloop=ioloop)
        self.handler = handler
        self.backlog = backlog
        # one entry per connected client; the same IP appears once for
        # every connection it currently holds (see max_cons_per_ip)
        self.ip_map = []
        # in case of FTPS class not properly configured we want errors
        # to be raised here rather than later, when client connects
        if hasattr(handler, 'get_ssl_context'):
            handler.get_ssl_context()
        if isinstance(address_or_socket, socket.socket):
            sock = address_or_socket
            sock.setblocking(0)
            self.set_socket(sock)
            if hasattr(sock, 'family'):
                self._af = sock.family
            else:
                # python 2.4: socket objects have no 'family' attribute,
                # so resolve the bound address to find it out
                ip, port = self.socket.getsockname()[:2]
                self._af = socket.getaddrinfo(ip, port, socket.AF_UNSPEC,
                                              socket.SOCK_STREAM)[0][0]
        else:
            self._af = self.bind_af_unspecified(address_or_socket)
        self.listen(backlog)

    @property
    def address(self):
        """The first two items (host, port) of the listening socket's
        own address.
        """
        return self.socket.getsockname()[:2]

    def _map_len(self):
        """Return the number of entries in the IO loop's socket map."""
        return len(self.ioloop.socket_map)

    def _accept_new_cons(self):
        """Return True if the server is willing to accept new connections."""
        # max_cons == 0 means "no limit"
        return not self.max_cons or self._map_len() <= self.max_cons

    def _log_start(self):
        """Log the server configuration (address, pid, poller, NAT
        address, passive ports and, on posix, sendfile usage).
        """
        if not logging.getLogger().handlers:
            # If we get to this point it means the user hasn't
            # configured logger. We want to log by default so
            # we configure logging ourselves so that it will
            # print to stderr.
            from pyftpdlib.ioloop import _config_logging
            _config_logging()

        passive_ports = self.handler.passive_ports
        if passive_ports:
            pasv_ports = "%s->%s" % (passive_ports[0], passive_ports[-1])
        else:
            pasv_ports = None
        addr = self.address
        # arguments are handed to the logger instead of being
        # interpolated up front, like every other call below
        logger.info(">>> starting FTP server on %s:%s, pid=%i <<<",
                    addr[0], addr[1], os.getpid())
        logger.info("poller: %r", self.ioloop.__class__)
        logger.info("masquerade (NAT) address: %s",
                    self.handler.masquerade_address)
        logger.info("passive ports: %s", pasv_ports)
        if os.name == 'posix':
            logger.info("use sendfile(2): %s", self.handler.use_sendfile)

    def serve_forever(self, timeout=None, blocking=True, handle_exit=True):
        """Start serving.

         - (float) timeout: the timeout, expressed in seconds, handed
           to the underlying IO loop. The default (None) is passed
           through unchanged.

         - (bool) blocking: if False loop once and then return.
           Any truthy value is treated as True.

         - (bool) handle_exit: when True catches KeyboardInterrupt and
           SystemExit exceptions (generally caused by SIGTERM / SIGINT
           signals) and gracefully exits after cleaning up resources.
           Also, logs server start and stop (in blocking mode only).

        Nothing is returned.
        """
        if not handle_exit:
            self.ioloop.loop(timeout, blocking)
            return

        # Start / stop messages only make sense when we own the whole
        # serving session, i.e. in blocking mode.
        if blocking:
            self._log_start()
        try:
            self.ioloop.loop(timeout, blocking)
        except (KeyboardInterrupt, SystemExit):
            # deliberate: an interrupt is the normal way to stop the
            # server, clean up happens right below
            pass
        if blocking:
            logger.info(">>> shutting down FTP server (%s active fds) <<<",
                        self._map_len())
            self.close_all()

    def handle_accepted(self, sock, addr):
        """Called when remote client initiates a connection.

        Instantiates the handler class for the new socket and enforces
        the max_cons / max_cons_per_ip limits. Returns the handler
        instance if the connection was accepted and its handle()
        method completed without errors, else None.
        """
        handler = None
        ip = None
        try:
            handler = self.handler(sock, self, ioloop=self.ioloop)
            if not handler.connected:
                return

            ip = addr[0]
            self.ip_map.append(ip)

            # For performance and security reasons we should always set a
            # limit for the number of file descriptors that socket_map
            # should contain.  When we're running out of such limit we'll
            # use the last available channel for sending a 421 response
            # to the client before disconnecting it.
            if not self._accept_new_cons():
                handler.handle_max_cons()
                return

            # accept only a limited number of connections from the same
            # source address.
            if self.max_cons_per_ip:
                if self.ip_map.count(ip) > self.max_cons_per_ip:
                    handler.handle_max_cons_per_ip()
                    return

            try:
                handler.handle()
            except Exception:
                # Only real errors are delegated to the handler;
                # KeyboardInterrupt / SystemExit must travel up to
                # serve_forever() so that the server can shut down.
                handler.handle_error()
            else:
                return handler
        except Exception:
            # This is supposed to be an application bug that should
            # be fixed. We do not want to tear down the server though
            # (DoS). We just log the exception, hoping that someone
            # will eventually file a bug. References:
            # - http://code.google.com/p/pyftpdlib/issues/detail?id=143
            # - http://code.google.com/p/pyftpdlib/issues/detail?id=166
            # - https://groups.google.com/forum/#!topic/pyftpdlib/h7pPybzAx14
            logger.error(traceback.format_exc())
            if handler is not None:
                handler.close()
            elif ip is not None and ip in self.ip_map:
                self.ip_map.remove(ip)

    def handle_error(self):
        """Called to handle any uncaught exceptions.

        The exception being handled is logged with its traceback, then
        the listening socket is closed.
        """
        try:
            # re-raise the exception currently being handled so that
            # only Exception subclasses get logged and suppressed
            raise
        except Exception:
            logger.error(traceback.format_exc())
        self.close()

    def close_all(self):
        """Stop serving and also disconnects all currently connected
        clients, by closing the IO loop. Returns whatever the IO
        loop's close() returns.
        """
        return self.ioloop.close()
- # ===================================================================
- # --- extra implementations
- # ===================================================================
class _SpawnerBase(FTPServer):
    """Base class shared by multiple threads/process dispatcher.
    Not supposed to be used.

    Subclasses must provide the '_lock' and '_exit' class attributes
    (a lock and an event object) and implement _start_task(),
    _current_task() and _map_len().
    """

    # how many seconds to wait when join()ing parent's threads
    # or processes
    join_timeout = 5
    _lock = None
    _exit = None

    def __init__(self, address, handler, ioloop=None, backlog=5):
        """Same as FTPServer.__init__; 'backlog' is forwarded to it
        and defaults to the same value.
        """
        FTPServer.__init__(self, address, handler, ioloop, backlog)
        # thread / process objects serving the connected clients
        self._active_tasks = []

    def _start_task(self, *args, **kwargs):
        raise NotImplementedError('must be implemented in subclass')

    def _current_task(self):
        raise NotImplementedError('must be implemented in subclass')

    def _map_len(self):
        raise NotImplementedError('must be implemented in subclass')

    def _loop(self, handler):
        """Serve handler's IO loop in a separate thread or process."""
        ioloop = IOLoop()
        try:
            handler.ioloop = ioloop
            try:
                handler.add_channel()
            except EnvironmentError:
                err = sys.exc_info()[1]
                if err.errno == errno.EBADF:
                    # we might get here in case the other end quickly
                    # disconnected (see test_quick_connect())
                    return
                else:
                    raise

            # Here we localize variable access to minimize overhead.
            poll = ioloop.poll
            sched_poll = ioloop.sched.poll
            poll_timeout = getattr(self, 'poll_timeout', None)
            soonest_timeout = poll_timeout

            while (ioloop.socket_map or ioloop.sched._tasks) and not \
                    self._exit.is_set():
                try:
                    if ioloop.socket_map:
                        poll(timeout=soonest_timeout)
                    if ioloop.sched._tasks:
                        soonest_timeout = sched_poll()
                        # Handle the case where socket_map is empty but some
                        # cancelled scheduled calls are still around causing
                        # this while loop to hog CPU resources.
                        # In theory this should never happen as all the sched
                        # functions are supposed to be cancel()ed on close()
                        # but by using threads we can incur into
                        # synchronization issues such as this one.
                        # https://code.google.com/p/pyftpdlib/issues/detail?id=245
                        if not ioloop.socket_map:
                            # get rid of cancel()led calls
                            ioloop.sched.reheapify()
                            soonest_timeout = sched_poll()
                            if soonest_timeout:
                                time.sleep(min(soonest_timeout, 1))
                    else:
                        soonest_timeout = None
                except (KeyboardInterrupt, SystemExit):
                    # note: these two exceptions are raised in all sub
                    # processes
                    self._exit.set()
                except select.error:
                    # on Windows we can get WSAENOTSOCK if the client
                    # rapidly connect and disconnects
                    err = sys.exc_info()[1]
                    if os.name == 'nt' and err.args[0] == 10038:
                        # iterate over a copy: entries are deleted below
                        for fd in list(ioloop.socket_map.keys()):
                            try:
                                select.select([fd], [], [], 0)
                            except select.error:
                                try:
                                    logger.info("discarding broken socket %r",
                                                ioloop.socket_map[fd])
                                    del ioloop.socket_map[fd]
                                except KeyError:
                                    # dict changed during iteration
                                    pass
                    else:
                        raise
                else:
                    # never poll for longer than poll_timeout, so that
                    # the exit event is checked periodically
                    if poll_timeout:
                        if soonest_timeout is None \
                                or soonest_timeout > poll_timeout:
                            soonest_timeout = poll_timeout
        finally:
            try:
                self._active_tasks.remove(self._current_task())
            except ValueError:
                pass
            ioloop.close()

    def handle_accepted(self, sock, addr):
        """Accept the connection as FTPServer does, then move the
        resulting handler out of the main IO loop and serve it from a
        newly started task (thread or process) running _loop().
        """
        handler = FTPServer.handle_accepted(self, sock, addr)
        if handler is not None:
            # unregister the handler from the main IOLoop used by the
            # main thread to accept connections
            self.ioloop.unregister(handler._fileno)

            t = self._start_task(target=self._loop, args=(handler,))
            t.name = repr(addr)
            t.start()

            self._lock.acquire()
            try:
                # Forget about tasks which are over. _loop() removes
                # the task from the list it sees, but when the task is
                # a process that list is the child's copy, and a task
                # ending before the append below would stay listed;
                # without this the list would only ever grow.
                self._active_tasks[:] = [task for task in self._active_tasks
                                         if task.is_alive()]
                self._active_tasks.append(t)
            finally:
                self._lock.release()

    def _log_start(self):
        """Log what FTPServer logs plus the dispatcher class in use."""
        FTPServer._log_start(self)
        logger.info("dispatcher: %r", self.__class__)

    def serve_forever(self, timeout=None, blocking=True, handle_exit=True):
        """Start serving; arguments have the same meaning as in
        FTPServer.serve_forever(). The exit event shared with the
        worker tasks is cleared first. Nothing is returned.
        """
        self._exit.clear()
        if not handle_exit:
            self.ioloop.loop(timeout, blocking)
            return

        # Any truthy 'blocking' enables start / stop logging.
        if blocking:
            self._log_start()
        try:
            self.ioloop.loop(timeout, blocking)
        except (KeyboardInterrupt, SystemExit):
            # deliberate: workers get stopped by close_all() below
            pass
        if blocking:
            logger.info(">>> shutting down FTP server (%s active "
                        "workers) <<<", self._map_len())
            self.close_all()

    def close_all(self):
        """Signal every worker task to exit, terminate worker
        processes (if any), wait for the tasks to finish and finally
        close the main IO loop.
        """
        tasks = self._active_tasks[:]
        # this must be set after getting active tasks as it causes
        # thread objects to get out of the list too soon
        self._exit.set()
        if tasks and hasattr(tasks[0], 'terminate'):
            # we're dealing with subprocesses
            for t in tasks:
                try:
                    if not _BSD:
                        t.terminate()
                    else:
                        # XXX - On FreeBSD using SIGTERM doesn't work
                        # as the process hangs on kqueue.control() or
                        # select.select(). Use SIGKILL instead.
                        os.kill(t.pid, signal.SIGKILL)
                except OSError:
                    err = sys.exc_info()[1]
                    # ESRCH: the process is already gone
                    if err.errno != errno.ESRCH:
                        raise

        self._wait_for_tasks(tasks)
        del self._active_tasks[:]
        FTPServer.close_all(self)

    def _wait_for_tasks(self, tasks):
        """Wait for threads or subprocesses to terminate.

        Each task is join()ed for at most 'join_timeout' seconds. As
        soon as one task fails to terminate in time the remaining ones
        are no longer waited for, only checked.
        """
        warn = logger.warning
        timeout = self.join_timeout
        for t in tasks:
            t.join(timeout)
            if t.is_alive():
                # Thread or process is still alive. If it's a process
                # attempt to send SIGKILL as last resort.
                # Stop waiting from now on so that we will exit
                # immediately in case also other threads/processes are
                # hanging. Note that a timeout of None would do the
                # opposite (block forever), hence 0; the 'join_timeout'
                # attribute itself is left untouched.
                timeout = 0
                if hasattr(t, 'terminate'):
                    msg = "could not terminate process %r" % t
                    if not _BSD:
                        warn(msg + "; sending SIGKILL as last resort")
                        try:
                            os.kill(t.pid, signal.SIGKILL)
                        except OSError:
                            err = sys.exc_info()[1]
                            if err.errno != errno.ESRCH:
                                raise
                    else:
                        # on BSD SIGKILL was already sent by close_all()
                        warn(msg)
                else:
                    warn("thread %r didn't terminate; ignoring it", t)
try:
    import threading
except ImportError:
    pass
else:
    __all__ += ['ThreadedFTPServer']

    # compatibility with python < 2.6, where only the camelCase
    # spelling exists
    if not hasattr(threading.Thread, 'is_alive'):
        threading.Thread.is_alive = threading.Thread.isAlive

    class ThreadedFTPServer(_SpawnerBase):
        """A modified version of base FTPServer class which spawns a
        thread every time a new connection is established.
        """
        # The timeout passed to thread's IOLoop.poll() call on every
        # loop. Necessary since threads ignore KeyboardInterrupt.
        poll_timeout = 1.0
        # Defined at class level, hence shared by all instances.
        _lock = threading.Lock()
        _exit = threading.Event()

        # compatibility with python < 2.6
        if not hasattr(_exit, 'is_set'):
            _exit.is_set = _exit.isSet

        def _start_task(self, *args, **kwargs):
            """Return a new (not yet started) thread object."""
            return threading.Thread(*args, **kwargs)

        def _current_task(self):
            """Return the thread object of the calling thread."""
            # currentThread() is deprecated since python 3.10; fall
            # back on it only where current_thread() does not exist.
            try:
                get_current = threading.current_thread
            except AttributeError:
                get_current = threading.currentThread
            return get_current()

        def _map_len(self):
            """Return the number of thread objects currently alive, as
            reported by the threading module.
            """
            # activeCount() is deprecated since python 3.10; fall back
            # on it only where active_count() does not exist.
            try:
                count = threading.active_count
            except AttributeError:
                count = threading.activeCount
            return count()
if os.name == 'posix':
    try:
        import multiprocessing
    except ImportError:
        # no multiprocessing support: the class is simply not defined
        pass
    else:
        __all__.append('MultiprocessFTPServer')

        class MultiprocessFTPServer(_SpawnerBase):
            """FTPServer variant which serves every new connection
            from a dedicated, newly spawned process.
            """
            # Defined at class level, hence shared by all instances.
            _lock = multiprocessing.Lock()
            _exit = multiprocessing.Event()

            def _start_task(self, *args, **kwargs):
                """Return a new (not yet started) process object."""
                process = multiprocessing.Process(*args, **kwargs)
                return process

            def _current_task(self):
                """Return the process object of the calling process."""
                return multiprocessing.current_process()

            def _map_len(self):
                """Return the number of live child processes."""
                children = multiprocessing.active_children()
                return len(children)
|