servers.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535
  1. #!/usr/bin/env python
  2. # $Id: servers.py 1219 2013-04-19 14:35:41Z g.rodola $
  3. # ======================================================================
  4. # Copyright (C) 2007-2013 Giampaolo Rodola' <g.rodola@gmail.com>
  5. #
  6. # All Rights Reserved
  7. #
  8. # Permission is hereby granted, free of charge, to any person
  9. # obtaining a copy of this software and associated documentation
  10. # files (the "Software"), to deal in the Software without
  11. # restriction, including without limitation the rights to use,
  12. # copy, modify, merge, publish, distribute, sublicense, and/or sell
  13. # copies of the Software, and to permit persons to whom the
  14. # Software is furnished to do so, subject to the following
  15. # conditions:
  16. #
  17. # The above copyright notice and this permission notice shall be
  18. # included in all copies or substantial portions of the Software.
  19. #
  20. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  21. # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
  22. # OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  23. # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  24. # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
  25. # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  26. # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
  27. # OTHER DEALINGS IN THE SOFTWARE.
  28. #
  29. # ======================================================================
  30. """
  31. This module contains the main FTPServer class which listens on a
  32. host:port and dispatches the incoming connections to a handler.
  33. The concurrency is handled asynchronously by the main process thread,
  34. meaning the handler cannot block otherwise the whole server will hang.
  35. Other than that we have 2 subclasses changing the asynchronous concurrency
  36. model using multiple threads or processes.
  37. You might be interested in these in case your code contains blocking
  38. parts which cannot be adapted to the base async model or if the
  39. underlying filesystem is particularly slow, see:
  40. https://code.google.com/p/pyftpdlib/issues/detail?id=197
  41. https://code.google.com/p/pyftpdlib/issues/detail?id=212
Two classes are provided:

 - ThreadedFTPServer
 - MultiprocessFTPServer

...which spawn a new thread or process every time a client connects.
The main thread is async-based and is used only to accept new
connections.
Every new connection is dispatched to a separate thread/process,
which internally runs its own IO loop.
This way the handler serving that connection is free to block
without hanging the whole FTP server.
  52. """
  53. import os
  54. import socket
  55. import traceback
  56. import sys
  57. import errno
  58. import select
  59. import logging
  60. import signal
  61. import time
  62. from pyftpdlib.log import logger
  63. from pyftpdlib.ioloop import Acceptor, IOLoop
  64. __all__ = ['FTPServer']
  65. _BSD = 'bsd' in sys.platform
  66. # ===================================================================
  67. # --- base class
  68. # ===================================================================
  69. class FTPServer(Acceptor):
  70. """Creates a socket listening on <address>, dispatching the requests
  71. to a <handler> (typically FTPHandler class).
  72. Depending on the type of address specified IPv4 or IPv6 connections
  73. (or both, depending from the underlying system) will be accepted.
  74. All relevant session information is stored in class attributes
  75. described below.
  76. - (int) max_cons:
  77. number of maximum simultaneous connections accepted (defaults
  78. to 512). Can be set to 0 for unlimited but it is recommended
  79. to always have a limit to avoid running out of file descriptors
  80. (DoS).
  81. - (int) max_cons_per_ip:
  82. number of maximum connections accepted for the same IP address
  83. (defaults to 0 == unlimited).
  84. """
  85. max_cons = 512
  86. max_cons_per_ip = 0
  87. def __init__(self, address_or_socket, handler, ioloop=None, backlog=5):
  88. """Creates a socket listening on 'address' dispatching
  89. connections to a 'handler'.
  90. - (tuple) address_or_socket: the (host, port) pair on which
  91. the command channel will listen for incoming connections or
  92. an existent socket object.
  93. - (instance) handler: the handler class to use.
  94. - (instance) ioloop: a pyftpdlib.ioloop.IOLoop instance
  95. - (int) backlog: the maximum number of queued connections
  96. passed to listen(). If a connection request arrives when
  97. the queue is full the client may raise ECONNRESET.
  98. Defaults to 5.
  99. """
  100. Acceptor.__init__(self, ioloop=ioloop)
  101. self.handler = handler
  102. self.backlog = backlog
  103. self.ip_map = []
  104. # in case of FTPS class not properly configured we want errors
  105. # to be raised here rather than later, when client connects
  106. if hasattr(handler, 'get_ssl_context'):
  107. handler.get_ssl_context()
  108. if isinstance(address_or_socket, socket.socket):
  109. sock = address_or_socket
  110. sock.setblocking(0)
  111. self.set_socket(sock)
  112. if hasattr(sock, 'family'):
  113. self._af = sock.family
  114. else:
  115. # python 2.4
  116. ip, port = self.socket.getsockname()[:2]
  117. self._af = socket.getaddrinfo(ip, port, socket.AF_UNSPEC,
  118. socket.SOCK_STREAM)[0][0]
  119. else:
  120. self._af = self.bind_af_unspecified(address_or_socket)
  121. self.listen(backlog)
  122. @property
  123. def address(self):
  124. return self.socket.getsockname()[:2]
  125. def _map_len(self):
  126. return len(self.ioloop.socket_map)
  127. def _accept_new_cons(self):
  128. """Return True if the server is willing to accept new connections."""
  129. if not self.max_cons:
  130. return True
  131. else:
  132. return self._map_len() <= self.max_cons
  133. def _log_start(self):
  134. if not logging.getLogger().handlers:
  135. # If we get to this point it means the user hasn't
  136. # configured logger. We want to log by default so
  137. # we configure logging ourselves so that it will
  138. # print to stderr.
  139. from pyftpdlib.ioloop import _config_logging
  140. _config_logging()
  141. if self.handler.passive_ports:
  142. pasv_ports = "%s->%s" % (self.handler.passive_ports[0],
  143. self.handler.passive_ports[-1])
  144. else:
  145. pasv_ports = None
  146. addr = self.address
  147. logger.info(">>> starting FTP server on %s:%s, pid=%i <<<"
  148. % (addr[0], addr[1], os.getpid()))
  149. logger.info("poller: %r", self.ioloop.__class__)
  150. logger.info("masquerade (NAT) address: %s",
  151. self.handler.masquerade_address)
  152. logger.info("passive ports: %s", pasv_ports)
  153. if os.name == 'posix':
  154. logger.info("use sendfile(2): %s", self.handler.use_sendfile)
  155. def serve_forever(self, timeout=None, blocking=True, handle_exit=True):
  156. """Start serving.
  157. - (float) timeout: the timeout passed to the underlying IO
  158. loop expressed in seconds (default 1.0).
  159. - (bool) blocking: if False loop once and then return the
  160. timeout of the next scheduled call next to expire soonest
  161. (if any).
  162. - (bool) handle_exit: when True catches KeyboardInterrupt and
  163. SystemExit exceptions (generally caused by SIGTERM / SIGINT
  164. signals) and gracefully exits after cleaning up resources.
  165. Also, logs server start and stop.
  166. """
  167. if handle_exit:
  168. log = handle_exit and blocking == True
  169. if log:
  170. self._log_start()
  171. try:
  172. self.ioloop.loop(timeout, blocking)
  173. except (KeyboardInterrupt, SystemExit):
  174. pass
  175. if blocking:
  176. if log:
  177. logger.info(">>> shutting down FTP server (%s active fds) <<<",
  178. self._map_len())
  179. self.close_all()
  180. else:
  181. self.ioloop.loop(timeout, blocking)
  182. def handle_accepted(self, sock, addr):
  183. """Called when remote client initiates a connection."""
  184. handler = None
  185. ip = None
  186. try:
  187. handler = self.handler(sock, self, ioloop=self.ioloop)
  188. if not handler.connected:
  189. return
  190. ip = addr[0]
  191. self.ip_map.append(ip)
  192. # For performance and security reasons we should always set a
  193. # limit for the number of file descriptors that socket_map
  194. # should contain. When we're running out of such limit we'll
  195. # use the last available channel for sending a 421 response
  196. # to the client before disconnecting it.
  197. if not self._accept_new_cons():
  198. handler.handle_max_cons()
  199. return
  200. # accept only a limited number of connections from the same
  201. # source address.
  202. if self.max_cons_per_ip:
  203. if self.ip_map.count(ip) > self.max_cons_per_ip:
  204. handler.handle_max_cons_per_ip()
  205. return
  206. try:
  207. handler.handle()
  208. except:
  209. handler.handle_error()
  210. else:
  211. return handler
  212. except Exception:
  213. # This is supposed to be an application bug that should
  214. # be fixed. We do not want to tear down the server though
  215. # (DoS). We just log the exception, hoping that someone
  216. # will eventually file a bug. References:
  217. # - http://code.google.com/p/pyftpdlib/issues/detail?id=143
  218. # - http://code.google.com/p/pyftpdlib/issues/detail?id=166
  219. # - https://groups.google.com/forum/#!topic/pyftpdlib/h7pPybzAx14
  220. logger.error(traceback.format_exc())
  221. if handler is not None:
  222. handler.close()
  223. else:
  224. if ip is not None and ip in self.ip_map:
  225. self.ip_map.remove(ip)
  226. def handle_error(self):
  227. """Called to handle any uncaught exceptions."""
  228. try:
  229. raise
  230. except Exception:
  231. logger.error(traceback.format_exc())
  232. self.close()
  233. def close_all(self):
  234. """Stop serving and also disconnects all currently connected
  235. clients.
  236. """
  237. return self.ioloop.close()
  238. # ===================================================================
  239. # --- extra implementations
  240. # ===================================================================
  241. class _SpawnerBase(FTPServer):
  242. """Base class shared by multiple threads/process dispatcher.
  243. Not supposed to be used.
  244. """
  245. # how many seconds to wait when join()ing parent's threads
  246. # or processes
  247. join_timeout = 5
  248. _lock = None
  249. _exit = None
  250. def __init__(self, address, handler, ioloop=None):
  251. FTPServer.__init__(self, address, handler, ioloop)
  252. self._active_tasks = []
  253. def _start_task(self, *args, **kwargs):
  254. raise NotImplementedError('must be implemented in subclass')
  255. def _current_task(self):
  256. raise NotImplementedError('must be implemented in subclass')
  257. def _map_len(self):
  258. raise NotImplementedError('must be implemented in subclass')
  259. def _loop(self, handler):
  260. """Serve handler's IO loop in a separate thread or process."""
  261. ioloop = IOLoop()
  262. try:
  263. handler.ioloop = ioloop
  264. try:
  265. handler.add_channel()
  266. except EnvironmentError:
  267. err = sys.exc_info()[1]
  268. if err.errno == errno.EBADF:
  269. # we might get here in case the other end quickly
  270. # disconnected (see test_quick_connect())
  271. return
  272. else:
  273. raise
  274. # Here we localize variable access to minimize overhead.
  275. poll = ioloop.poll
  276. sched_poll = ioloop.sched.poll
  277. poll_timeout = getattr(self, 'poll_timeout', None)
  278. soonest_timeout = poll_timeout
  279. while (ioloop.socket_map or ioloop.sched._tasks) and not \
  280. self._exit.is_set():
  281. try:
  282. if ioloop.socket_map:
  283. poll(timeout=soonest_timeout)
  284. if ioloop.sched._tasks:
  285. soonest_timeout = sched_poll()
  286. # Handle the case where socket_map is emty but some
  287. # cancelled scheduled calls are still around causing
  288. # this while loop to hog CPU resources.
  289. # In theory this should never happen as all the sched
  290. # functions are supposed to be cancel()ed on close()
  291. # but by using threads we can incur into
  292. # synchronization issues such as this one.
  293. # https://code.google.com/p/pyftpdlib/issues/detail?id=245
  294. if not ioloop.socket_map:
  295. ioloop.sched.reheapify() # get rid of cancel()led calls
  296. soonest_timeout = sched_poll()
  297. if soonest_timeout:
  298. time.sleep(min(soonest_timeout, 1))
  299. else:
  300. soonest_timeout = None
  301. except (KeyboardInterrupt, SystemExit):
  302. # note: these two exceptions are raised in all sub
  303. # processes
  304. self._exit.set()
  305. except select.error:
  306. # on Windows we can get WSAENOTSOCK if the client
  307. # rapidly connect and disconnects
  308. err = sys.exc_info()[1]
  309. if os.name == 'nt' and err.args[0] == 10038:
  310. for fd in list(ioloop.socket_map.keys()):
  311. try:
  312. select.select([fd], [], [], 0)
  313. except select.error:
  314. try:
  315. logger.info("discarding broken socket %r",
  316. ioloop.socket_map[fd])
  317. del ioloop.socket_map[fd]
  318. except KeyError:
  319. # dict changed during iteration
  320. pass
  321. else:
  322. raise
  323. else:
  324. if poll_timeout:
  325. if soonest_timeout is None \
  326. or soonest_timeout > poll_timeout:
  327. soonest_timeout = poll_timeout
  328. finally:
  329. try:
  330. self._active_tasks.remove(self._current_task())
  331. except ValueError:
  332. pass
  333. ioloop.close()
  334. def handle_accepted(self, sock, addr):
  335. handler = FTPServer.handle_accepted(self, sock, addr)
  336. if handler is not None:
  337. # unregister the handler from the main IOLoop used by the
  338. # main thread to accept connections
  339. self.ioloop.unregister(handler._fileno)
  340. t = self._start_task(target=self._loop, args=(handler,))
  341. t.name = repr(addr)
  342. t.start()
  343. self._lock.acquire()
  344. try:
  345. self._active_tasks.append(t)
  346. finally:
  347. self._lock.release()
  348. def _log_start(self):
  349. FTPServer._log_start(self)
  350. logger.info("dispatcher: %r", self.__class__)
  351. def serve_forever(self, timeout=None, blocking=True, handle_exit=True):
  352. self._exit.clear()
  353. if handle_exit:
  354. log = handle_exit and blocking == True
  355. if log:
  356. self._log_start()
  357. try:
  358. self.ioloop.loop(timeout, blocking)
  359. except (KeyboardInterrupt, SystemExit):
  360. pass
  361. if blocking:
  362. if log:
  363. logger.info(">>> shutting down FTP server (%s active " \
  364. "workers) <<<", self._map_len())
  365. self.close_all()
  366. else:
  367. self.ioloop.loop(timeout, blocking)
  368. def close_all(self):
  369. tasks = self._active_tasks[:]
  370. # this must be set after getting active tasks as it causes
  371. # thread objects to get out of the list too soon
  372. self._exit.set()
  373. if tasks and hasattr(tasks[0], 'terminate'):
  374. # we're dealing with subprocesses
  375. for t in tasks:
  376. try:
  377. if not _BSD:
  378. t.terminate()
  379. else:
  380. # XXX - On FreeBSD using SIGTERM doesn't work
  381. # as the process hangs on kqueue.control() or
  382. # select.select(). Use SIGKILL instead.
  383. os.kill(t.pid, signal.SIGKILL)
  384. except OSError:
  385. err = sys.exc_info()[1]
  386. if err.errno != errno.ESRCH:
  387. raise
  388. self._wait_for_tasks(tasks)
  389. del self._active_tasks[:]
  390. FTPServer.close_all(self)
  391. def _wait_for_tasks(self, tasks):
  392. """Wait for threads or subprocesses to terminate."""
  393. warn = logger.warning
  394. for t in tasks:
  395. t.join(self.join_timeout)
  396. if t.is_alive():
  397. # Thread or process is still alive. If it's a process
  398. # attempt to send SIGKILL as last resort.
  399. # Set timeout to None so that we will exit immediately
  400. # in case also other threads/processes are hanging.
  401. self.join_timeout = None
  402. if hasattr(t, 'terminate'):
  403. msg = "could not terminate process %r" % t
  404. if not _BSD:
  405. warn(msg + "; sending SIGKILL as last resort")
  406. try:
  407. os.kill(t.pid, signal.SIGKILL)
  408. except OSError:
  409. err = sys.exc_info()[1]
  410. if err.errno != errno.ESRCH:
  411. raise
  412. else:
  413. warn(msg)
  414. else:
  415. warn("thread %r didn't terminate; ignoring it", t)
  416. try:
  417. import threading
  418. except ImportError:
  419. pass
  420. else:
  421. __all__ += ['ThreadedFTPServer']
  422. # compatibility with python <= 2.6
  423. if not hasattr(threading.Thread, 'is_alive'):
  424. threading.Thread.is_alive = threading.Thread.isAlive
  425. class ThreadedFTPServer(_SpawnerBase):
  426. """A modified version of base FTPServer class which spawns a
  427. thread every time a new connection is established.
  428. """
  429. # The timeout passed to thread's IOLoop.poll() call on every
  430. # loop. Necessary since threads ignore KeyboardInterrupt.
  431. poll_timeout = 1.0
  432. _lock = threading.Lock()
  433. _exit = threading.Event()
  434. # compatibility with python <= 2.6
  435. if not hasattr(_exit, 'is_set'):
  436. _exit.is_set = _exit.isSet
  437. def _start_task(self, *args, **kwargs):
  438. return threading.Thread(*args, **kwargs)
  439. def _current_task(self):
  440. return threading.currentThread()
  441. def _map_len(self):
  442. return threading.activeCount()
  443. if os.name == 'posix':
  444. try:
  445. import multiprocessing
  446. except ImportError:
  447. pass
  448. else:
  449. __all__ += ['MultiprocessFTPServer']
  450. class MultiprocessFTPServer(_SpawnerBase):
  451. """A modified version of base FTPServer class which spawns a
  452. process every time a new connection is established.
  453. """
  454. _lock = multiprocessing.Lock()
  455. _exit = multiprocessing.Event()
  456. def _start_task(self, *args, **kwargs):
  457. return multiprocessing.Process(*args, **kwargs)
  458. def _current_task(self):
  459. return multiprocessing.current_process()
  460. def _map_len(self):
  461. return len(multiprocessing.active_children())