ioloop.py 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900
  1. #!/usr/bin/env python
  2. # $Id: ioloop.py 1217 2013-04-18 18:21:44Z g.rodola $
  3. # ======================================================================
  4. # Copyright (C) 2007-2013 Giampaolo Rodola' <g.rodola@gmail.com>
  5. #
  6. # All Rights Reserved
  7. #
  8. # Permission is hereby granted, free of charge, to any person
  9. # obtaining a copy of this software and associated documentation
  10. # files (the "Software"), to deal in the Software without
  11. # restriction, including without limitation the rights to use,
  12. # copy, modify, merge, publish, distribute, sublicense, and/or sell
  13. # copies of the Software, and to permit persons to whom the
  14. # Software is furnished to do so, subject to the following
  15. # conditions:
  16. #
  17. # The above copyright notice and this permission notice shall be
  18. # included in all copies or substantial portions of the Software.
  19. #
  20. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  21. # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
  22. # OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  23. # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  24. # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
  25. # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  26. # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
  27. # OTHER DEALINGS IN THE SOFTWARE.
  28. #
  29. # ======================================================================
  30. """
  31. A specialized IO loop on top of asyncore adding support for epoll()
  32. on Linux and kqueue() on OSX/BSD, dramatically increasing the performance
  33. offered by the base asyncore module.
  34. poll() and select() loops are also reimplemented and are an order of
  35. magnitude faster as they support fd un/registration and modification.
  36. This module is not supposed to be used directly unless you want to
  37. include a new dispatcher which runs within the main FTP server loop,
  38. in which case:
  39. __________________________________________________________________
  40. | | |
  41. | INSTEAD OF | ...USE: |
  42. |______________________|___________________________________________|
  43. | | |
  44. | asyncore.dispatcher | Acceptor (for servers) |
  45. | asyncore.dispatcher | Connector (for clients) |
  46. | asynchat.async_chat | AsyncChat (for a full duplex connection ) |
  47. | asyncore.loop | FTPServer.serve_forever() |
  48. |______________________|___________________________________________|
  49. asyncore.dispatcher_with_send is not supported, same for "map" argument
  50. for asyncore.loop and asyncore.dispatcher and asynchat.async_chat
  51. constructors.
  52. Follows a server example:
  53. import socket
  54. from pyftpdlib.ioloop import IOLoop, Acceptor, AsyncChat
  55. class Handler(AsyncChat):
  56. def __init__(self, sock):
  57. AsyncChat.__init__(self, sock)
  58. self.push('200 hello\r\n')
  59. self.close_when_done()
  60. class Server(Acceptor):
  61. def __init__(self, host, port):
  62. Acceptor.__init__(self)
  63. self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
  64. self.set_reuse_addr()
  65. self.bind((host, port))
  66. self.listen(5)
  67. def handle_accepted(self, sock, addr):
  68. Handler(sock)
  69. server = Server('localhost', 8021)
  70. IOLoop.instance().loop()
  71. """
  72. import asyncore
  73. import asynchat
  74. import errno
  75. import select
  76. import os
  77. import sys
  78. import traceback
  79. import time
  80. import heapq
  81. import socket
  82. import logging
  83. try:
  84. import threading
  85. except ImportError:
  86. import dummy_threading as threading
  87. from pyftpdlib._compat import MAXSIZE, callable, b
  88. from pyftpdlib.log import logger, _config_logging
  89. timer = getattr(time, 'monotonic', time.time)
  90. _read = asyncore.read
  91. _write = asyncore.write
  92. # ===================================================================
  93. # --- scheduler
  94. # ===================================================================
  95. class _Scheduler(object):
  96. """Run the scheduled functions due to expire soonest (if any)."""
  97. def __init__(self):
  98. # the heap used for the scheduled tasks
  99. self._tasks = []
  100. self._cancellations = 0
  101. def poll(self):
  102. """Run the scheduled functions due to expire soonest and
  103. return the timeout of the next one (if any, else None).
  104. """
  105. now = timer()
  106. calls = []
  107. while self._tasks:
  108. if now < self._tasks[0].timeout:
  109. break
  110. call = heapq.heappop(self._tasks)
  111. if call.cancelled:
  112. self._cancellations -= 1
  113. else:
  114. calls.append(call)
  115. for call in calls:
  116. if call._repush:
  117. heapq.heappush(self._tasks, call)
  118. call._repush = False
  119. continue
  120. try:
  121. call.call()
  122. except Exception:
  123. logger.error(traceback.format_exc())
  124. # remove cancelled tasks and re-heapify the queue if the
  125. # number of cancelled tasks is more than the half of the
  126. # entire queue
  127. if self._cancellations > 512 \
  128. and self._cancellations > (len(self._tasks) >> 1):
  129. self.reheapify()
  130. try:
  131. return max(0, self._tasks[0].timeout - now)
  132. except IndexError:
  133. pass
  134. def register(self, what):
  135. """Register a _CallLater instance."""
  136. heapq.heappush(self._tasks, what)
  137. def unregister(self, what):
  138. """Unregister a _CallLater instance.
  139. The actual unregistration will happen at a later time though.
  140. """
  141. self._cancellations += 1
  142. def reheapify(self):
  143. """Get rid of cancelled calls and reinitialize the internal heap."""
  144. self._cancellations = 0
  145. self._tasks = [x for x in self._tasks if not x.cancelled]
  146. heapq.heapify(self._tasks)
  147. class _CallLater(object):
  148. """Container object which instance is returned by ioloop.call_later()."""
  149. __slots__ = ('_delay', '_target', '_args', '_kwargs', '_errback', '_sched',
  150. '_repush', 'timeout', 'cancelled')
  151. def __init__(self, seconds, target, *args, **kwargs):
  152. assert callable(target), "%s is not callable" % target
  153. assert MAXSIZE >= seconds >= 0, "%s is not greater than or equal " \
  154. "to 0 seconds" % seconds
  155. self._delay = seconds
  156. self._target = target
  157. self._args = args
  158. self._kwargs = kwargs
  159. self._errback = kwargs.pop('_errback', None)
  160. self._sched = kwargs.pop('_scheduler')
  161. self._repush = False
  162. # seconds from the epoch at which to call the function
  163. if not seconds:
  164. self.timeout = 0
  165. else:
  166. self.timeout = timer() + self._delay
  167. self.cancelled = False
  168. self._sched.register(self)
  169. def __lt__(self, other):
  170. return self.timeout < other.timeout
  171. def __le__(self, other):
  172. return self.timeout <= other.timeout
  173. def __repr__(self):
  174. if self._target is None:
  175. sig = object.__repr__(self)
  176. else:
  177. sig = repr(self._target)
  178. sig += ' args=%s, kwargs=%s, cancelled=%s, secs=%s' \
  179. % (self._args or '[]', self._kwargs or '{}', self.cancelled,
  180. self._delay)
  181. return '<%s>' % sig
  182. __str__ = __repr__
  183. def _post_call(self, exc):
  184. if not self.cancelled:
  185. self.cancel()
  186. def call(self):
  187. """Call this scheduled function."""
  188. assert not self.cancelled, "already cancelled"
  189. exc = None
  190. try:
  191. try:
  192. self._target(*self._args, **self._kwargs)
  193. except Exception:
  194. exc = sys.exc_info()[1]
  195. if self._errback is not None:
  196. self._errback()
  197. else:
  198. raise
  199. finally:
  200. self._post_call(exc)
  201. def reset(self):
  202. """Reschedule this call resetting the current countdown."""
  203. assert not self.cancelled, "already cancelled"
  204. self.timeout = timer() + self._delay
  205. self._repush = True
  206. def cancel(self):
  207. """Unschedule this call."""
  208. assert not self.cancelled, "already cancelled"
  209. self.cancelled = True
  210. self._target = self._args = self._kwargs = self._errback = None
  211. self._sched.unregister(self)
  212. class _CallEvery(_CallLater):
  213. """Container object which instance is returned by IOLoop.call_every()."""
  214. def _post_call(self, exc):
  215. if not self.cancelled:
  216. if exc:
  217. self.cancel()
  218. else:
  219. self.timeout = timer() + self._delay
  220. self._sched.register(self)
  221. class _IOLoop(object):
  222. """Base class which will later be referred as IOLoop."""
  223. READ = 1
  224. WRITE = 2
  225. _instance = None
  226. _lock = threading.Lock()
  227. _started_once = False
  228. def __init__(self):
  229. self.socket_map = {}
  230. self.sched = _Scheduler()
  231. @classmethod
  232. def instance(cls):
  233. """Return a global IOLoop instance."""
  234. if cls._instance is None:
  235. cls._lock.acquire()
  236. try:
  237. if cls._instance is None:
  238. cls._instance = cls()
  239. finally:
  240. cls._lock.release()
  241. return cls._instance
  242. def register(self, fd, instance, events):
  243. """Register a fd, handled by instance for the given events."""
  244. raise NotImplementedError('must be implemented in subclass')
  245. def unregister(self, fd):
  246. """Register fd."""
  247. raise NotImplementedError('must be implemented in subclass')
  248. def modify(self, fd, events):
  249. """Changes the events assigned for fd."""
  250. raise NotImplementedError('must be implemented in subclass')
  251. def poll(self, timeout):
  252. """Poll once. The subclass overriding this method is supposed
  253. to poll over the registered handlers and the scheduled functions
  254. and then return.
  255. """
  256. raise NotImplementedError('must be implemented in subclass')
  257. def loop(self, timeout=None, blocking=True):
  258. """Start the asynchronous IO loop.
  259. - (float) timeout: the timeout passed to the underlying
  260. multiplex syscall (select(), epoll() etc.).
  261. - (bool) blocking: if True poll repeatedly, as long as there
  262. are registered handlers and/or scheduled functions.
  263. If False poll only once and return the timeout of the next
  264. scheduled call (if any, else None).
  265. """
  266. if not _IOLoop._started_once:
  267. _IOLoop._started_once = True
  268. if not logging.getLogger().handlers:
  269. # If we get to this point it means the user hasn't
  270. # configured logging. We want to log by default so
  271. # we configure logging ourselves so that it will
  272. # print to stderr.
  273. _config_logging()
  274. if blocking:
  275. # localize variable access to minimize overhead
  276. poll = self.poll
  277. socket_map = self.socket_map
  278. tasks = self.sched._tasks
  279. sched_poll = self.sched.poll
  280. if timeout is not None:
  281. while socket_map:
  282. poll(timeout)
  283. sched_poll()
  284. else:
  285. soonest_timeout = None
  286. while socket_map:
  287. poll(soonest_timeout)
  288. soonest_timeout = sched_poll()
  289. else:
  290. sched = self.sched
  291. if self.socket_map:
  292. self.poll(timeout)
  293. if sched._tasks:
  294. return sched.poll()
  295. def call_later(self, seconds, target, *args, **kwargs):
  296. """Calls a function at a later time.
  297. It can be used to asynchronously schedule a call within the polling
  298. loop without blocking it. The instance returned is an object that
  299. can be used to cancel or reschedule the call.
  300. - (int) seconds: the number of seconds to wait
  301. - (obj) target: the callable object to call later
  302. - args: the arguments to call it with
  303. - kwargs: the keyword arguments to call it with; a special
  304. '_errback' parameter can be passed: it is a callable
  305. called in case target function raises an exception.
  306. """
  307. kwargs['_scheduler'] = self.sched
  308. return _CallLater(seconds, target, *args, **kwargs)
  309. def call_every(self, seconds, target, *args, **kwargs):
  310. """Schedules the given callback to be called periodically."""
  311. kwargs['_scheduler'] = self.sched
  312. return _CallEvery(seconds, target, *args, **kwargs)
  313. def close(self):
  314. """Closes the IOLoop, freeing any resources used."""
  315. self.__class__._instance = None
  316. # free connections
  317. instances = sorted(self.socket_map.values(), key=lambda x: x._fileno)
  318. for inst in instances:
  319. try:
  320. inst.close()
  321. except OSError:
  322. err = sys.exc_info()[1]
  323. if err.args[0] != errno.EBADF:
  324. logger.error(traceback.format_exc())
  325. except Exception:
  326. logger.error(traceback.format_exc())
  327. self.socket_map.clear()
  328. # free scheduled functions
  329. for x in self.sched._tasks:
  330. try:
  331. if not x.cancelled:
  332. x.cancel()
  333. except Exception:
  334. logger.error(traceback.format_exc())
  335. del self.sched._tasks[:]
  336. # ===================================================================
  337. # --- select() - POSIX / Windows
  338. # ===================================================================
  339. class Select(_IOLoop):
  340. """select()-based poller."""
  341. def __init__(self):
  342. _IOLoop.__init__(self)
  343. self._r = []
  344. self._w = []
  345. def register(self, fd, instance, events):
  346. if fd not in self.socket_map:
  347. self.socket_map[fd] = instance
  348. if events & self.READ:
  349. self._r.append(fd)
  350. if events & self.WRITE:
  351. self._w.append(fd)
  352. def unregister(self, fd):
  353. try:
  354. del self.socket_map[fd]
  355. except KeyError:
  356. pass
  357. for l in (self._r, self._w):
  358. try:
  359. l.remove(fd)
  360. except ValueError:
  361. pass
  362. def modify(self, fd, events):
  363. inst = self.socket_map.get(fd)
  364. if inst is not None:
  365. self.unregister(fd)
  366. self.register(fd, inst, events)
  367. def poll(self, timeout):
  368. try:
  369. r, w, e = select.select(self._r, self._w, [], timeout)
  370. except select.error:
  371. err = sys.exc_info()[1]
  372. if err.args[0] == errno.EINTR:
  373. return
  374. raise
  375. smap_get = self.socket_map.get
  376. for fd in r:
  377. obj = smap_get(fd)
  378. if obj is None or not obj.readable():
  379. continue
  380. _read(obj)
  381. for fd in w:
  382. obj = smap_get(fd)
  383. if obj is None or not obj.writable():
  384. continue
  385. _write(obj)
  386. # ===================================================================
  387. # --- poll() / epoll()
  388. # ===================================================================
  389. class _BasePollEpoll(_IOLoop):
  390. """This is common to both poll/epoll implementations which
  391. almost share the same interface.
  392. Not supposed to be used directly.
  393. """
  394. def __init__(self):
  395. _IOLoop.__init__(self)
  396. self._poller = self._poller()
  397. def register(self, fd, instance, events):
  398. self._poller.register(fd, events)
  399. self.socket_map[fd] = instance
  400. def unregister(self, fd):
  401. try:
  402. del self.socket_map[fd]
  403. except KeyError:
  404. pass
  405. else:
  406. self._poller.unregister(fd)
  407. def modify(self, fd, events):
  408. self._poller.modify(fd, events)
  409. def poll(self, timeout):
  410. try:
  411. events = self._poller.poll(timeout or -1) # -1 waits indefinitely
  412. except (IOError, select.error): # for epoll() and poll() respectively
  413. err = sys.exc_info()[1]
  414. if err.args[0] == errno.EINTR:
  415. return
  416. raise
  417. # localize variable access to minimize overhead
  418. smap_get = self.socket_map.get
  419. for fd, event in events:
  420. inst = smap_get(fd)
  421. if inst is None:
  422. continue
  423. if event & self._ERROR and not event & self.READ:
  424. inst.handle_close()
  425. else:
  426. if event & self.READ:
  427. if inst.readable():
  428. _read(inst)
  429. if event & self.WRITE:
  430. if inst.writable():
  431. _write(inst)
  432. # ===================================================================
  433. # --- poll() - POSIX
  434. # ===================================================================
  435. if hasattr(select, 'poll'):
  436. class Poll(_BasePollEpoll):
  437. """poll() based poller."""
  438. READ = select.POLLIN
  439. WRITE = select.POLLOUT
  440. _ERROR = select.POLLERR | select.POLLHUP | select.POLLNVAL
  441. _poller = select.poll
  442. # select.poll() on py < 2.6 has no 'modify' method
  443. if not hasattr(select.poll(), 'modify'):
  444. def modify(self, fd, events):
  445. inst = self.socket_map[fd]
  446. self.unregister(fd)
  447. self.register(fd, inst, events)
  448. def poll(self, timeout):
  449. # poll() timeout is expressed in milliseconds
  450. if timeout is not None:
  451. timeout = int(timeout * 1000)
  452. _BasePollEpoll.poll(self, timeout)
  453. # ===================================================================
  454. # --- epoll() - Linux
  455. # ===================================================================
  456. if hasattr(select, 'epoll'):
  457. class Epoll(_BasePollEpoll):
  458. """epoll() based poller."""
  459. READ = select.EPOLLIN
  460. WRITE = select.EPOLLOUT
  461. _ERROR = select.EPOLLERR | select.EPOLLHUP
  462. _poller = select.epoll
  463. def fileno(self):
  464. """Return epoll() fd."""
  465. return self._poller.fileno()
  466. def close(self):
  467. _IOLoop.close(self)
  468. self._poller.close()
  469. # ===================================================================
  470. # --- kqueue() - BSD / OSX
  471. # ===================================================================
  472. if hasattr(select, 'kqueue'):
  473. class Kqueue(_IOLoop):
  474. """kqueue() based poller."""
  475. def __init__(self):
  476. _IOLoop.__init__(self)
  477. self._kqueue = select.kqueue()
  478. self._active = {}
  479. def fileno(self):
  480. """Return kqueue() fd."""
  481. return self._poller.fileno()
  482. def close(self):
  483. _IOLoop.close(self)
  484. self._kqueue.close()
  485. def register(self, fd, instance, events):
  486. self.socket_map[fd] = instance
  487. self._control(fd, events, select.KQ_EV_ADD)
  488. self._active[fd] = events
  489. def unregister(self, fd):
  490. try:
  491. del self.socket_map[fd]
  492. events = self._active.pop(fd)
  493. except KeyError:
  494. pass
  495. else:
  496. try:
  497. self._control(fd, events, select.KQ_EV_DELETE)
  498. except OSError:
  499. err = sys.exc_info()[1]
  500. if err.errno != errno.EBADF:
  501. raise
  502. def modify(self, fd, events):
  503. instance = self.socket_map[fd]
  504. self.unregister(fd)
  505. self.register(fd, instance, events)
  506. def _control(self, fd, events, flags):
  507. kevents = []
  508. if events & self.WRITE:
  509. kevents.append(select.kevent(
  510. fd, filter=select.KQ_FILTER_WRITE, flags=flags))
  511. if events & self.READ or not kevents:
  512. # always read when there is not a write
  513. kevents.append(select.kevent(
  514. fd, filter=select.KQ_FILTER_READ, flags=flags))
  515. # even though control() takes a list, it seems to return
  516. # EINVAL on Mac OS X (10.6) when there is more than one
  517. # event in the list
  518. for kevent in kevents:
  519. self._kqueue.control([kevent], 0)
  520. # localize variable access to minimize overhead
  521. def poll(self, timeout,
  522. _len=len,
  523. _READ=select.KQ_FILTER_READ,
  524. _WRITE=select.KQ_FILTER_WRITE,
  525. _EOF=select.KQ_EV_EOF,
  526. _ERROR=select.KQ_EV_ERROR):
  527. try:
  528. kevents = self._kqueue.control(None, _len(self.socket_map),
  529. timeout)
  530. except OSError:
  531. err = sys.exc_info()[1]
  532. if err.args[0] == errno.EINTR:
  533. return
  534. raise
  535. for kevent in kevents:
  536. inst = self.socket_map.get(kevent.ident)
  537. if inst is None:
  538. continue
  539. if kevent.filter == _READ:
  540. if inst.readable():
  541. _read(inst)
  542. if kevent.filter == _WRITE:
  543. if kevent.flags & _EOF:
  544. # If an asynchronous connection is refused,
  545. # kqueue returns a write event with the EOF
  546. # flag set.
  547. # Note that for read events, EOF may be returned
  548. # before all data has been consumed from the
  549. # socket buffer, so we only check for EOF on
  550. # write events.
  551. inst.handle_close()
  552. else:
  553. if inst.writable():
  554. _write(inst)
  555. if kevent.flags & _ERROR:
  556. inst.handle_close()
  557. # ===================================================================
  558. # --- choose the better poller for this platform
  559. # ===================================================================
  560. if hasattr(select, 'epoll'): # epoll() - Linux only
  561. IOLoop = Epoll
  562. elif hasattr(select, 'kqueue'): # kqueue() - BSD / OSX
  563. IOLoop = Kqueue
  564. elif hasattr(select, 'poll'): # poll() - POSIX
  565. IOLoop = Poll
  566. else: # select() - POSIX and Windows
  567. IOLoop = Select
  568. # ===================================================================
  569. # --- asyncore dispatchers
  570. # ===================================================================
  571. # these are overridden in order to register() and unregister()
  572. # file descriptors against the new pollers
  573. _DISCONNECTED = frozenset((errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN,
  574. errno.ECONNABORTED, errno.EPIPE, errno.EBADF))
  575. class Acceptor(asyncore.dispatcher):
  576. """Same as base asyncore.dispatcher and supposed to be used to
  577. accept new connections.
  578. """
  579. def __init__(self, ioloop=None):
  580. self.ioloop = ioloop or IOLoop.instance()
  581. self._fileno = None # py < 2.6
  582. asyncore.dispatcher.__init__(self)
  583. def bind_af_unspecified(self, addr):
  584. """Same as bind() but guesses address family from addr.
  585. Return the address family just determined.
  586. """
  587. assert self.socket is None
  588. host, port = addr
  589. if host == "":
  590. # When using bind() "" is a symbolic name meaning all
  591. # available interfaces. People might not know we're
  592. # using getaddrinfo() internally, which uses None
  593. # instead of "", so we'll make the conversion for them.
  594. host = None
  595. err = "getaddrinfo() returned an empty list"
  596. info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
  597. socket.SOCK_STREAM, 0, socket.AI_PASSIVE)
  598. for res in info:
  599. self.socket = None
  600. self.del_channel()
  601. af, socktype, proto, canonname, sa = res
  602. try:
  603. self.create_socket(af, socktype)
  604. self.set_reuse_addr()
  605. self.bind(sa)
  606. except socket.error:
  607. err = sys.exc_info()[1]
  608. if self.socket is not None:
  609. self.socket.close()
  610. self.del_channel()
  611. self.socket = None
  612. continue
  613. break
  614. if self.socket is None:
  615. self.del_channel()
  616. raise socket.error(err)
  617. return af
  618. def add_channel(self, map=None):
  619. self.ioloop.register(self._fileno, self, self.ioloop.READ)
  620. def del_channel(self, map=None):
  621. self.ioloop.unregister(self._fileno)
  622. def listen(self, num):
  623. asyncore.dispatcher.listen(self, num)
  624. # XXX - this seems to be necessary, otherwise kqueue.control()
  625. # won't return listening fd events
  626. try:
  627. if isinstance(self.ioloop, Kqueue):
  628. self.ioloop.modify(self._fileno, self.ioloop.READ)
  629. except NameError:
  630. pass
  631. def handle_accept(self):
  632. try:
  633. sock, addr = self.accept()
  634. except TypeError:
  635. # sometimes accept() might return None (see issue 91)
  636. return
  637. except socket.error:
  638. err = sys.exc_info()[1]
  639. # ECONNABORTED might be thrown on *BSD (see issue 105)
  640. if err.args[0] != errno.ECONNABORTED:
  641. raise
  642. else:
  643. # sometimes addr == None instead of (ip, port) (see issue 104)
  644. if addr is not None:
  645. self.handle_accepted(sock, addr)
  646. def handle_accepted(self, sock, addr):
  647. sock.close()
  648. self.log_info('unhandled accepted event', 'warning')
  649. # overridden for convenience; avoid to reuse address on Windows
  650. if (os.name in ('nt', 'ce')) or (sys.platform == 'cygwin'):
  651. def set_reuse_addr(self):
  652. pass
  653. class Connector(Acceptor):
  654. """Same as base asyncore.dispatcher and supposed to be used for
  655. clients.
  656. """
  657. def connect_af_unspecified(self, addr, source_address=None):
  658. """Same as connect() but guesses address family from addr.
  659. Return the address family just determined.
  660. """
  661. assert self.socket is None
  662. host, port = addr
  663. err = "getaddrinfo() returned an empty list"
  664. info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
  665. socket.SOCK_STREAM, 0, socket.AI_PASSIVE)
  666. for res in info:
  667. self.socket = None
  668. af, socktype, proto, canonname, sa = res
  669. try:
  670. self.create_socket(af, socktype)
  671. if source_address:
  672. if source_address[0].startswith('::ffff:'):
  673. # In this scenario, the server has an IPv6 socket, but
  674. # the remote client is using IPv4 and its address is
  675. # represented as an IPv4-mapped IPv6 address which
  676. # looks like this ::ffff:151.12.5.65, see:
  677. # http://en.wikipedia.org/wiki/IPv6#IPv4-mapped_addresses
  678. # http://tools.ietf.org/html/rfc3493.html#section-3.7
  679. # We truncate the first bytes to make it look like a
  680. # common IPv4 address.
  681. source_address = (source_address[0][7:],
  682. source_address[1])
  683. self.bind(source_address)
  684. self.connect((host, port))
  685. except socket.error:
  686. err = sys.exc_info()[1]
  687. if self.socket is not None:
  688. self.socket.close()
  689. self.del_channel()
  690. self.socket = None
  691. continue
  692. break
  693. if self.socket is None:
  694. self.del_channel()
  695. raise socket.error(err)
  696. return af
  697. def add_channel(self, map=None):
  698. self.ioloop.register(self._fileno, self, self.ioloop.WRITE)
  699. class AsyncChat(asynchat.async_chat):
  700. """Same as asynchat.async_chat, only working with the new IO poller
  701. and being more clever in avoid registering for read events when
  702. it shouldn't.
  703. """
  704. def __init__(self, sock, ioloop=None):
  705. self.ioloop = ioloop or IOLoop.instance()
  706. self._current_io_events = self.ioloop.READ
  707. self._closed = False
  708. self._closing = False
  709. asynchat.async_chat.__init__(self, sock)
  710. def add_channel(self, map=None, events=None):
  711. self.ioloop.register(self._fileno, self, events or self.ioloop.READ)
  712. def del_channel(self, map=None):
  713. self.ioloop.unregister(self._fileno)
  714. # send() and recv() overridden as a fix around various bugs:
  715. # - http://bugs.python.org/issue1736101
  716. # - http://code.google.com/p/pyftpdlib/issues/detail?id=104
  717. # - http://code.google.com/p/pyftpdlib/issues/detail?id=109
  718. def send(self, data):
  719. try:
  720. return self.socket.send(data)
  721. except socket.error:
  722. why = sys.exc_info()[1]
  723. if why.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
  724. return 0
  725. elif why.args[0] in _DISCONNECTED:
  726. self.handle_close()
  727. return 0
  728. else:
  729. raise
  730. def recv(self, buffer_size):
  731. try:
  732. data = self.socket.recv(buffer_size)
  733. if not data:
  734. # a closed connection is indicated by signaling
  735. # a read condition, and having recv() return 0.
  736. self.handle_close()
  737. return b('')
  738. else:
  739. return data
  740. except socket.error:
  741. why = sys.exc_info()[1]
  742. if why.args[0] in _DISCONNECTED:
  743. self.handle_close()
  744. return b('')
  745. else:
  746. raise
  747. def initiate_send(self):
  748. asynchat.async_chat.initiate_send(self)
  749. if not self._closed:
  750. # if there's still data to send we want to be ready
  751. # for writing, else we're only intereseted in reading
  752. if not self.producer_fifo:
  753. wanted = self.ioloop.READ
  754. else:
  755. wanted = self.ioloop.READ | self.ioloop.WRITE
  756. if self._current_io_events != wanted:
  757. self.ioloop.modify(self._fileno, wanted)
  758. self._current_io_events = wanted
  759. def close_when_done(self):
  760. if len(self.producer_fifo) == 0:
  761. self.handle_close()
  762. else:
  763. self._closing = True
  764. asynchat.async_chat.close_when_done(self)