123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900 |
- #!/usr/bin/env python
- # $Id: ioloop.py 1217 2013-04-18 18:21:44Z g.rodola $
- # ======================================================================
- # Copyright (C) 2007-2013 Giampaolo Rodola' <g.rodola@gmail.com>
- #
- # All Rights Reserved
- #
- # Permission is hereby granted, free of charge, to any person
- # obtaining a copy of this software and associated documentation
- # files (the "Software"), to deal in the Software without
- # restriction, including without limitation the rights to use,
- # copy, modify, merge, publish, distribute, sublicense, and/or sell
- # copies of the Software, and to permit persons to whom the
- # Software is furnished to do so, subject to the following
- # conditions:
- #
- # The above copyright notice and this permission notice shall be
- # included in all copies or substantial portions of the Software.
- #
- # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
- # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
- # OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
- # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
- # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
- # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
- # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
- # OTHER DEALINGS IN THE SOFTWARE.
- #
- # ======================================================================
- """
- A specialized IO loop on top of asyncore adding support for epoll()
- on Linux and kqueue() and OSX/BSD, dramatically increasing performances
- offered by base asyncore module.
- poll() and select() loops are also reimplemented and are an order of
- magnitude faster as they support fd un/registration and modification.
- This module is not supposed to be used directly unless you want to
- include a new dispatcher which runs within the main FTP server loop,
- in which case:
- __________________________________________________________________
- | | |
- | INSTEAD OF | ...USE: |
- |______________________|___________________________________________|
- | | |
- | asyncore.dispacher | Acceptor (for servers) |
- | asyncore.dispacher | Connector (for clients) |
- | asynchat.async_chat | AsyncChat (for a full duplex connection ) |
- | asyncore.loop | FTPServer.server_forever() |
- |______________________|___________________________________________|
- asyncore.dispatcher_with_send is not supported, same for "map" argument
- for asyncore.loop and asyncore.dispatcher and asynchat.async_chat
- constructors.
- Follows a server example:
- import socket
- from pyftpdlib.ioloop import IOLoop, Acceptor, AsyncChat
- class Handler(AsyncChat):
- def __init__(self, sock):
- AsyncChat.__init__(self, sock)
- self.push('200 hello\r\n')
- self.close_when_done()
- class Server(Acceptor):
- def __init__(self, host, port):
- Acceptor.__init__(self)
- self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
- self.set_reuse_addr()
- self.bind((host, port))
- self.listen(5)
- def handle_accepted(self, sock, addr):
- Handler(sock)
- server = Server('localhost', 8021)
- IOLoop.instance().loop()
- """
- import asyncore
- import asynchat
- import errno
- import select
- import os
- import sys
- import traceback
- import time
- import heapq
- import socket
- import logging
- try:
- import threading
- except ImportError:
- import dummy_threading as threading
- from pyftpdlib._compat import MAXSIZE, callable, b
- from pyftpdlib.log import logger, _config_logging
- timer = getattr(time, 'monotonic', time.time)
- _read = asyncore.read
- _write = asyncore.write
- # ===================================================================
- # --- scheduler
- # ===================================================================
- class _Scheduler(object):
- """Run the scheduled functions due to expire soonest (if any)."""
- def __init__(self):
- # the heap used for the scheduled tasks
- self._tasks = []
- self._cancellations = 0
- def poll(self):
- """Run the scheduled functions due to expire soonest and
- return the timeout of the next one (if any, else None).
- """
- now = timer()
- calls = []
- while self._tasks:
- if now < self._tasks[0].timeout:
- break
- call = heapq.heappop(self._tasks)
- if call.cancelled:
- self._cancellations -= 1
- else:
- calls.append(call)
- for call in calls:
- if call._repush:
- heapq.heappush(self._tasks, call)
- call._repush = False
- continue
- try:
- call.call()
- except Exception:
- logger.error(traceback.format_exc())
- # remove cancelled tasks and re-heapify the queue if the
- # number of cancelled tasks is more than the half of the
- # entire queue
- if self._cancellations > 512 \
- and self._cancellations > (len(self._tasks) >> 1):
- self.reheapify()
- try:
- return max(0, self._tasks[0].timeout - now)
- except IndexError:
- pass
- def register(self, what):
- """Register a _CallLater instance."""
- heapq.heappush(self._tasks, what)
- def unregister(self, what):
- """Unregister a _CallLater instance.
- The actual unregistration will happen at a later time though.
- """
- self._cancellations += 1
- def reheapify(self):
- """Get rid of cancelled calls and reinitialize the internal heap."""
- self._cancellations = 0
- self._tasks = [x for x in self._tasks if not x.cancelled]
- heapq.heapify(self._tasks)
- class _CallLater(object):
- """Container object which instance is returned by ioloop.call_later()."""
- __slots__ = ('_delay', '_target', '_args', '_kwargs', '_errback', '_sched',
- '_repush', 'timeout', 'cancelled')
- def __init__(self, seconds, target, *args, **kwargs):
- assert callable(target), "%s is not callable" % target
- assert MAXSIZE >= seconds >= 0, "%s is not greater than or equal " \
- "to 0 seconds" % seconds
- self._delay = seconds
- self._target = target
- self._args = args
- self._kwargs = kwargs
- self._errback = kwargs.pop('_errback', None)
- self._sched = kwargs.pop('_scheduler')
- self._repush = False
- # seconds from the epoch at which to call the function
- if not seconds:
- self.timeout = 0
- else:
- self.timeout = timer() + self._delay
- self.cancelled = False
- self._sched.register(self)
- def __lt__(self, other):
- return self.timeout < other.timeout
- def __le__(self, other):
- return self.timeout <= other.timeout
- def __repr__(self):
- if self._target is None:
- sig = object.__repr__(self)
- else:
- sig = repr(self._target)
- sig += ' args=%s, kwargs=%s, cancelled=%s, secs=%s' \
- % (self._args or '[]', self._kwargs or '{}', self.cancelled,
- self._delay)
- return '<%s>' % sig
- __str__ = __repr__
- def _post_call(self, exc):
- if not self.cancelled:
- self.cancel()
- def call(self):
- """Call this scheduled function."""
- assert not self.cancelled, "already cancelled"
- exc = None
- try:
- try:
- self._target(*self._args, **self._kwargs)
- except Exception:
- exc = sys.exc_info()[1]
- if self._errback is not None:
- self._errback()
- else:
- raise
- finally:
- self._post_call(exc)
- def reset(self):
- """Reschedule this call resetting the current countdown."""
- assert not self.cancelled, "already cancelled"
- self.timeout = timer() + self._delay
- self._repush = True
- def cancel(self):
- """Unschedule this call."""
- assert not self.cancelled, "already cancelled"
- self.cancelled = True
- self._target = self._args = self._kwargs = self._errback = None
- self._sched.unregister(self)
- class _CallEvery(_CallLater):
- """Container object which instance is returned by IOLoop.call_every()."""
- def _post_call(self, exc):
- if not self.cancelled:
- if exc:
- self.cancel()
- else:
- self.timeout = timer() + self._delay
- self._sched.register(self)
- class _IOLoop(object):
- """Base class which will later be referred as IOLoop."""
- READ = 1
- WRITE = 2
- _instance = None
- _lock = threading.Lock()
- _started_once = False
- def __init__(self):
- self.socket_map = {}
- self.sched = _Scheduler()
- @classmethod
- def instance(cls):
- """Return a global IOLoop instance."""
- if cls._instance is None:
- cls._lock.acquire()
- try:
- if cls._instance is None:
- cls._instance = cls()
- finally:
- cls._lock.release()
- return cls._instance
- def register(self, fd, instance, events):
- """Register a fd, handled by instance for the given events."""
- raise NotImplementedError('must be implemented in subclass')
- def unregister(self, fd):
- """Register fd."""
- raise NotImplementedError('must be implemented in subclass')
- def modify(self, fd, events):
- """Changes the events assigned for fd."""
- raise NotImplementedError('must be implemented in subclass')
- def poll(self, timeout):
- """Poll once. The subclass overriding this method is supposed
- to poll over the registered handlers and the scheduled functions
- and then return.
- """
- raise NotImplementedError('must be implemented in subclass')
- def loop(self, timeout=None, blocking=True):
- """Start the asynchronous IO loop.
- - (float) timeout: the timeout passed to the underlying
- multiplex syscall (select(), epoll() etc.).
- - (bool) blocking: if True poll repeatedly, as long as there
- are registered handlers and/or scheduled functions.
- If False poll only once and return the timeout of the next
- scheduled call (if any, else None).
- """
- if not _IOLoop._started_once:
- _IOLoop._started_once = True
- if not logging.getLogger().handlers:
- # If we get to this point it means the user hasn't
- # configured logging. We want to log by default so
- # we configure logging ourselves so that it will
- # print to stderr.
- _config_logging()
- if blocking:
- # localize variable access to minimize overhead
- poll = self.poll
- socket_map = self.socket_map
- tasks = self.sched._tasks
- sched_poll = self.sched.poll
- if timeout is not None:
- while socket_map:
- poll(timeout)
- sched_poll()
- else:
- soonest_timeout = None
- while socket_map:
- poll(soonest_timeout)
- soonest_timeout = sched_poll()
- else:
- sched = self.sched
- if self.socket_map:
- self.poll(timeout)
- if sched._tasks:
- return sched.poll()
- def call_later(self, seconds, target, *args, **kwargs):
- """Calls a function at a later time.
- It can be used to asynchronously schedule a call within the polling
- loop without blocking it. The instance returned is an object that
- can be used to cancel or reschedule the call.
- - (int) seconds: the number of seconds to wait
- - (obj) target: the callable object to call later
- - args: the arguments to call it with
- - kwargs: the keyword arguments to call it with; a special
- '_errback' parameter can be passed: it is a callable
- called in case target function raises an exception.
- """
- kwargs['_scheduler'] = self.sched
- return _CallLater(seconds, target, *args, **kwargs)
- def call_every(self, seconds, target, *args, **kwargs):
- """Schedules the given callback to be called periodically."""
- kwargs['_scheduler'] = self.sched
- return _CallEvery(seconds, target, *args, **kwargs)
- def close(self):
- """Closes the IOLoop, freeing any resources used."""
- self.__class__._instance = None
- # free connections
- instances = sorted(self.socket_map.values(), key=lambda x: x._fileno)
- for inst in instances:
- try:
- inst.close()
- except OSError:
- err = sys.exc_info()[1]
- if err.args[0] != errno.EBADF:
- logger.error(traceback.format_exc())
- except Exception:
- logger.error(traceback.format_exc())
- self.socket_map.clear()
- # free scheduled functions
- for x in self.sched._tasks:
- try:
- if not x.cancelled:
- x.cancel()
- except Exception:
- logger.error(traceback.format_exc())
- del self.sched._tasks[:]
- # ===================================================================
- # --- select() - POSIX / Windows
- # ===================================================================
- class Select(_IOLoop):
- """select()-based poller."""
- def __init__(self):
- _IOLoop.__init__(self)
- self._r = []
- self._w = []
- def register(self, fd, instance, events):
- if fd not in self.socket_map:
- self.socket_map[fd] = instance
- if events & self.READ:
- self._r.append(fd)
- if events & self.WRITE:
- self._w.append(fd)
- def unregister(self, fd):
- try:
- del self.socket_map[fd]
- except KeyError:
- pass
- for l in (self._r, self._w):
- try:
- l.remove(fd)
- except ValueError:
- pass
- def modify(self, fd, events):
- inst = self.socket_map.get(fd)
- if inst is not None:
- self.unregister(fd)
- self.register(fd, inst, events)
- def poll(self, timeout):
- try:
- r, w, e = select.select(self._r, self._w, [], timeout)
- except select.error:
- err = sys.exc_info()[1]
- if err.args[0] == errno.EINTR:
- return
- raise
- smap_get = self.socket_map.get
- for fd in r:
- obj = smap_get(fd)
- if obj is None or not obj.readable():
- continue
- _read(obj)
- for fd in w:
- obj = smap_get(fd)
- if obj is None or not obj.writable():
- continue
- _write(obj)
- # ===================================================================
- # --- poll() / epoll()
- # ===================================================================
- class _BasePollEpoll(_IOLoop):
- """This is common to both poll/epoll implementations which
- almost share the same interface.
- Not supposed to be used directly.
- """
- def __init__(self):
- _IOLoop.__init__(self)
- self._poller = self._poller()
- def register(self, fd, instance, events):
- self._poller.register(fd, events)
- self.socket_map[fd] = instance
- def unregister(self, fd):
- try:
- del self.socket_map[fd]
- except KeyError:
- pass
- else:
- self._poller.unregister(fd)
- def modify(self, fd, events):
- self._poller.modify(fd, events)
- def poll(self, timeout):
- try:
- events = self._poller.poll(timeout or -1) # -1 waits indefinitely
- except (IOError, select.error): # for epoll() and poll() respectively
- err = sys.exc_info()[1]
- if err.args[0] == errno.EINTR:
- return
- raise
- # localize variable access to minimize overhead
- smap_get = self.socket_map.get
- for fd, event in events:
- inst = smap_get(fd)
- if inst is None:
- continue
- if event & self._ERROR and not event & self.READ:
- inst.handle_close()
- else:
- if event & self.READ:
- if inst.readable():
- _read(inst)
- if event & self.WRITE:
- if inst.writable():
- _write(inst)
- # ===================================================================
- # --- poll() - POSIX
- # ===================================================================
- if hasattr(select, 'poll'):
- class Poll(_BasePollEpoll):
- """poll() based poller."""
- READ = select.POLLIN
- WRITE = select.POLLOUT
- _ERROR = select.POLLERR | select.POLLHUP | select.POLLNVAL
- _poller = select.poll
- # select.poll() on py < 2.6 has no 'modify' method
- if not hasattr(select.poll(), 'modify'):
- def modify(self, fd, events):
- inst = self.socket_map[fd]
- self.unregister(fd)
- self.register(fd, inst, events)
- def poll(self, timeout):
- # poll() timeout is expressed in milliseconds
- if timeout is not None:
- timeout = int(timeout * 1000)
- _BasePollEpoll.poll(self, timeout)
- # ===================================================================
- # --- epoll() - Linux
- # ===================================================================
- if hasattr(select, 'epoll'):
- class Epoll(_BasePollEpoll):
- """epoll() based poller."""
- READ = select.EPOLLIN
- WRITE = select.EPOLLOUT
- _ERROR = select.EPOLLERR | select.EPOLLHUP
- _poller = select.epoll
- def fileno(self):
- """Return epoll() fd."""
- return self._poller.fileno()
- def close(self):
- _IOLoop.close(self)
- self._poller.close()
- # ===================================================================
- # --- kqueue() - BSD / OSX
- # ===================================================================
- if hasattr(select, 'kqueue'):
- class Kqueue(_IOLoop):
- """kqueue() based poller."""
- def __init__(self):
- _IOLoop.__init__(self)
- self._kqueue = select.kqueue()
- self._active = {}
- def fileno(self):
- """Return kqueue() fd."""
- return self._poller.fileno()
- def close(self):
- _IOLoop.close(self)
- self._kqueue.close()
- def register(self, fd, instance, events):
- self.socket_map[fd] = instance
- self._control(fd, events, select.KQ_EV_ADD)
- self._active[fd] = events
- def unregister(self, fd):
- try:
- del self.socket_map[fd]
- events = self._active.pop(fd)
- except KeyError:
- pass
- else:
- try:
- self._control(fd, events, select.KQ_EV_DELETE)
- except OSError:
- err = sys.exc_info()[1]
- if err.errno != errno.EBADF:
- raise
- def modify(self, fd, events):
- instance = self.socket_map[fd]
- self.unregister(fd)
- self.register(fd, instance, events)
- def _control(self, fd, events, flags):
- kevents = []
- if events & self.WRITE:
- kevents.append(select.kevent(
- fd, filter=select.KQ_FILTER_WRITE, flags=flags))
- if events & self.READ or not kevents:
- # always read when there is not a write
- kevents.append(select.kevent(
- fd, filter=select.KQ_FILTER_READ, flags=flags))
- # even though control() takes a list, it seems to return
- # EINVAL on Mac OS X (10.6) when there is more than one
- # event in the list
- for kevent in kevents:
- self._kqueue.control([kevent], 0)
- # localize variable access to minimize overhead
- def poll(self, timeout,
- _len=len,
- _READ=select.KQ_FILTER_READ,
- _WRITE=select.KQ_FILTER_WRITE,
- _EOF=select.KQ_EV_EOF,
- _ERROR=select.KQ_EV_ERROR):
- try:
- kevents = self._kqueue.control(None, _len(self.socket_map),
- timeout)
- except OSError:
- err = sys.exc_info()[1]
- if err.args[0] == errno.EINTR:
- return
- raise
- for kevent in kevents:
- inst = self.socket_map.get(kevent.ident)
- if inst is None:
- continue
- if kevent.filter == _READ:
- if inst.readable():
- _read(inst)
- if kevent.filter == _WRITE:
- if kevent.flags & _EOF:
- # If an asynchronous connection is refused,
- # kqueue returns a write event with the EOF
- # flag set.
- # Note that for read events, EOF may be returned
- # before all data has been consumed from the
- # socket buffer, so we only check for EOF on
- # write events.
- inst.handle_close()
- else:
- if inst.writable():
- _write(inst)
- if kevent.flags & _ERROR:
- inst.handle_close()
- # ===================================================================
- # --- choose the better poller for this platform
- # ===================================================================
- if hasattr(select, 'epoll'): # epoll() - Linux only
- IOLoop = Epoll
- elif hasattr(select, 'kqueue'): # kqueue() - BSD / OSX
- IOLoop = Kqueue
- elif hasattr(select, 'poll'): # poll() - POSIX
- IOLoop = Poll
- else: # select() - POSIX and Windows
- IOLoop = Select
- # ===================================================================
- # --- asyncore dispatchers
- # ===================================================================
- # these are overridden in order to register() and unregister()
- # file descriptors against the new pollers
- _DISCONNECTED = frozenset((errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN,
- errno.ECONNABORTED, errno.EPIPE, errno.EBADF))
- class Acceptor(asyncore.dispatcher):
- """Same as base asyncore.dispatcher and supposed to be used to
- accept new connections.
- """
- def __init__(self, ioloop=None):
- self.ioloop = ioloop or IOLoop.instance()
- self._fileno = None # py < 2.6
- asyncore.dispatcher.__init__(self)
- def bind_af_unspecified(self, addr):
- """Same as bind() but guesses address family from addr.
- Return the address family just determined.
- """
- assert self.socket is None
- host, port = addr
- if host == "":
- # When using bind() "" is a symbolic name meaning all
- # available interfaces. People might not know we're
- # using getaddrinfo() internally, which uses None
- # instead of "", so we'll make the conversion for them.
- host = None
- err = "getaddrinfo() returned an empty list"
- info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
- socket.SOCK_STREAM, 0, socket.AI_PASSIVE)
- for res in info:
- self.socket = None
- self.del_channel()
- af, socktype, proto, canonname, sa = res
- try:
- self.create_socket(af, socktype)
- self.set_reuse_addr()
- self.bind(sa)
- except socket.error:
- err = sys.exc_info()[1]
- if self.socket is not None:
- self.socket.close()
- self.del_channel()
- self.socket = None
- continue
- break
- if self.socket is None:
- self.del_channel()
- raise socket.error(err)
- return af
- def add_channel(self, map=None):
- self.ioloop.register(self._fileno, self, self.ioloop.READ)
- def del_channel(self, map=None):
- self.ioloop.unregister(self._fileno)
- def listen(self, num):
- asyncore.dispatcher.listen(self, num)
- # XXX - this seems to be necessary, otherwise kqueue.control()
- # won't return listening fd events
- try:
- if isinstance(self.ioloop, Kqueue):
- self.ioloop.modify(self._fileno, self.ioloop.READ)
- except NameError:
- pass
- def handle_accept(self):
- try:
- sock, addr = self.accept()
- except TypeError:
- # sometimes accept() might return None (see issue 91)
- return
- except socket.error:
- err = sys.exc_info()[1]
- # ECONNABORTED might be thrown on *BSD (see issue 105)
- if err.args[0] != errno.ECONNABORTED:
- raise
- else:
- # sometimes addr == None instead of (ip, port) (see issue 104)
- if addr is not None:
- self.handle_accepted(sock, addr)
- def handle_accepted(self, sock, addr):
- sock.close()
- self.log_info('unhandled accepted event', 'warning')
- # overridden for convenience; avoid to reuse address on Windows
- if (os.name in ('nt', 'ce')) or (sys.platform == 'cygwin'):
- def set_reuse_addr(self):
- pass
- class Connector(Acceptor):
- """Same as base asyncore.dispatcher and supposed to be used for
- clients.
- """
- def connect_af_unspecified(self, addr, source_address=None):
- """Same as connect() but guesses address family from addr.
- Return the address family just determined.
- """
- assert self.socket is None
- host, port = addr
- err = "getaddrinfo() returned an empty list"
- info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
- socket.SOCK_STREAM, 0, socket.AI_PASSIVE)
- for res in info:
- self.socket = None
- af, socktype, proto, canonname, sa = res
- try:
- self.create_socket(af, socktype)
- if source_address:
- if source_address[0].startswith('::ffff:'):
- # In this scenario, the server has an IPv6 socket, but
- # the remote client is using IPv4 and its address is
- # represented as an IPv4-mapped IPv6 address which
- # looks like this ::ffff:151.12.5.65, see:
- # http://en.wikipedia.org/wiki/IPv6#IPv4-mapped_addresses
- # http://tools.ietf.org/html/rfc3493.html#section-3.7
- # We truncate the first bytes to make it look like a
- # common IPv4 address.
- source_address = (source_address[0][7:],
- source_address[1])
- self.bind(source_address)
- self.connect((host, port))
- except socket.error:
- err = sys.exc_info()[1]
- if self.socket is not None:
- self.socket.close()
- self.del_channel()
- self.socket = None
- continue
- break
- if self.socket is None:
- self.del_channel()
- raise socket.error(err)
- return af
- def add_channel(self, map=None):
- self.ioloop.register(self._fileno, self, self.ioloop.WRITE)
- class AsyncChat(asynchat.async_chat):
- """Same as asynchat.async_chat, only working with the new IO poller
- and being more clever in avoid registering for read events when
- it shouldn't.
- """
- def __init__(self, sock, ioloop=None):
- self.ioloop = ioloop or IOLoop.instance()
- self._current_io_events = self.ioloop.READ
- self._closed = False
- self._closing = False
- asynchat.async_chat.__init__(self, sock)
- def add_channel(self, map=None, events=None):
- self.ioloop.register(self._fileno, self, events or self.ioloop.READ)
- def del_channel(self, map=None):
- self.ioloop.unregister(self._fileno)
- # send() and recv() overridden as a fix around various bugs:
- # - http://bugs.python.org/issue1736101
- # - http://code.google.com/p/pyftpdlib/issues/detail?id=104
- # - http://code.google.com/p/pyftpdlib/issues/detail?id=109
- def send(self, data):
- try:
- return self.socket.send(data)
- except socket.error:
- why = sys.exc_info()[1]
- if why.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
- return 0
- elif why.args[0] in _DISCONNECTED:
- self.handle_close()
- return 0
- else:
- raise
- def recv(self, buffer_size):
- try:
- data = self.socket.recv(buffer_size)
- if not data:
- # a closed connection is indicated by signaling
- # a read condition, and having recv() return 0.
- self.handle_close()
- return b('')
- else:
- return data
- except socket.error:
- why = sys.exc_info()[1]
- if why.args[0] in _DISCONNECTED:
- self.handle_close()
- return b('')
- else:
- raise
- def initiate_send(self):
- asynchat.async_chat.initiate_send(self)
- if not self._closed:
- # if there's still data to send we want to be ready
- # for writing, else we're only intereseted in reading
- if not self.producer_fifo:
- wanted = self.ioloop.READ
- else:
- wanted = self.ioloop.READ | self.ioloop.WRITE
- if self._current_io_events != wanted:
- self.ioloop.modify(self._fileno, wanted)
- self._current_io_events = wanted
- def close_when_done(self):
- if len(self.producer_fifo) == 0:
- self.handle_close()
- else:
- self._closing = True
- asynchat.async_chat.close_when_done(self)
|