12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897 |
- #!/usr/bin/env python
- # $Id: authorizers.py 1171 2013-02-19 10:13:09Z g.rodola $
- # ======================================================================
- # Copyright (C) 2007-2013 Giampaolo Rodola' <g.rodola@gmail.com>
- #
- # All Rights Reserved
- #
- # Permission is hereby granted, free of charge, to any person
- # obtaining a copy of this software and associated documentation
- # files (the "Software"), to deal in the Software without
- # restriction, including without limitation the rights to use,
- # copy, modify, merge, publish, distribute, sublicense, and/or sell
- # copies of the Software, and to permit persons to whom the
- # Software is furnished to do so, subject to the following
- # conditions:
- #
- # The above copyright notice and this permission notice shall be
- # included in all copies or substantial portions of the Software.
- #
- # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
- # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
- # OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
- # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
- # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
- # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
- # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
- # OTHER DEALINGS IN THE SOFTWARE.
- #
- # ======================================================================
- """An "authorizer" is a class handling authentications and permissions
- of the FTP server. It is used by pyftpdlib.handlers.FTPHandler
- class for:
- - verifying user password
- - getting user home directory
- - checking user permissions when a filesystem read/write event occurs
- - changing user when accessing the filesystem
- DummyAuthorizer is the main class which handles virtual users.
- UnixAuthorizer and WindowsAuthorizer are platform specific and
- interact with UNIX and Windows password database.
- """
- import os
- import warnings
- import errno
- import sys
- from pyftpdlib._compat import PY3, unicode, getcwdu
- __all__ = ['DummyAuthorizer',
- #'BaseUnixAuthorizer', 'UnixAuthorizer',
- #'BaseWindowsAuthorizer', 'WindowsAuthorizer',
- ]
- # ===================================================================
- # --- exceptions
- # ===================================================================
class AuthorizerError(Exception):
    """Root exception for errors raised by authorizer classes,
    typically on invalid configuration.
    """
class AuthenticationFailed(Exception):
    """Raised by validate_authentication() whenever a login attempt
    must be refused, whatever the reason.
    """
- # ===================================================================
- # --- base class
- # ===================================================================
class DummyAuthorizer(object):
    """Basic "dummy" authorizer class, suitable for subclassing to
    create your own custom authorizers.

    An "authorizer" is a class handling authentications and permissions
    of the FTP server. It is used inside FTPHandler class for verifying
    user's password, getting users home directory, checking user
    permissions when a file read/write event occurs and changing user
    before accessing the filesystem.

    DummyAuthorizer is the base authorizer, providing a platform
    independent interface for managing "virtual" FTP users. System
    dependent authorizers can be written by subclassing this base
    class and overriding appropriate methods as necessary.
    """

    read_perms = "elr"
    write_perms = "adfmwM"

    def __init__(self):
        # Maps username -> dict with the keys 'pwd', 'home', 'perm',
        # 'operms', 'msg_login' and 'msg_quit' (filled in by add_user).
        self.user_table = {}

    def add_user(self, username, password, homedir, perm='elr',
                 msg_login="Login successful.", msg_quit="Goodbye."):
        """Add a user to the virtual users table.

        ValueError is raised on error conditions such as invalid
        permissions, missing home directory or duplicate usernames.

        Optional perm argument is a string referencing the user's
        permissions explained below:

        Read permissions:
         - "e" = change directory (CWD command)
         - "l" = list files (LIST, NLST, STAT, MLSD, MLST, SIZE, MDTM commands)
         - "r" = retrieve file from the server (RETR command)

        Write permissions:
         - "a" = append data to an existing file (APPE command)
         - "d" = delete file or directory (DELE, RMD commands)
         - "f" = rename file or directory (RNFR, RNTO commands)
         - "m" = create directory (MKD command)
         - "w" = store a file to the server (STOR, STOU commands)
         - "M" = change file mode (SITE CHMOD command)

        Optional msg_login and msg_quit arguments can be specified to
        provide customized response strings when user log-in and quit.
        """
        if self.has_user(username):
            raise ValueError('user %r already exists' % username)
        # Home directories are stored as unicode strings.
        if not isinstance(homedir, unicode):
            homedir = homedir.decode('utf8')
        if not os.path.isdir(homedir):
            raise ValueError('no such directory: %r' % homedir)
        homedir = os.path.realpath(homedir)
        self._check_permissions(username, perm)
        self.user_table[username] = {
            'pwd': str(password),
            'home': homedir,
            'perm': perm,
            # directory -> (perm, recursive); see override_perm()
            'operms': {},
            'msg_login': str(msg_login),
            'msg_quit': str(msg_quit),
        }

    def add_anonymous(self, homedir, **kwargs):
        """Add an anonymous user to the virtual users table.

        ValueError is raised on error conditions such as invalid
        permissions, missing home directory, or duplicate anonymous
        users.

        The keyword arguments in kwargs are the same expected by
        add_user method: "perm", "msg_login" and "msg_quit".

        The optional "perm" keyword argument is a string defaulting to
        "elr" referencing "read-only" anonymous user's permissions.

        Using write permission values ("adfmwM") results in a
        RuntimeWarning.
        """
        # Called explicitly on DummyAuthorizer so that a subclass
        # overriding add_user() does not change how anonymous is stored.
        DummyAuthorizer.add_user(self, 'anonymous', '', homedir, **kwargs)

    def remove_user(self, username):
        """Remove a user from the virtual users table.

        KeyError is raised if the user does not exist.
        """
        del self.user_table[username]

    def override_perm(self, username, directory, perm, recursive=False):
        """Override permissions for a given directory.

        The directory must exist and be located below (and differ from)
        the user's home directory, otherwise ValueError is raised.
        If recursive is true the override also applies to everything
        below the directory.
        """
        self._check_permissions(username, perm)
        if not os.path.isdir(directory):
            raise ValueError('no such directory: %r' % directory)
        directory = os.path.normcase(os.path.realpath(directory))
        home = os.path.normcase(self.get_home_dir(username))
        if directory == home:
            raise ValueError("can't override home directory permissions")
        if not self._issubpath(directory, home):
            raise ValueError("path escapes user home directory")
        self.user_table[username]['operms'][directory] = perm, recursive

    def validate_authentication(self, username, password, handler):
        """Raises AuthenticationFailed if supplied username and
        password don't match the stored credentials, else return
        None.

        The password of the anonymous user is never checked.
        """
        msg = "Authentication failed."
        if not self.has_user(username):
            if username == 'anonymous':
                msg = "Anonymous access not allowed."
            raise AuthenticationFailed(msg)
        if username != 'anonymous':
            if self.user_table[username]['pwd'] != password:
                raise AuthenticationFailed(msg)

    def get_home_dir(self, username):
        """Return the user's home directory.

        Since this is called during authentication (PASS),
        AuthenticationFailed can be freely raised by subclasses in case
        the provided username no longer exists.
        """
        return self.user_table[username]['home']

    def impersonate_user(self, username, password):
        """Impersonate another user (noop).

        It is always called before accessing the filesystem.
        By default it does nothing. The subclass overriding this
        method is expected to provide a mechanism to change the
        current user.
        """

    def terminate_impersonation(self, username):
        """Terminate impersonation (noop).

        It is always called after having accessed the filesystem.
        By default it does nothing. The subclass overriding this
        method is expected to provide a mechanism to switch back
        to the original user.
        """

    def has_user(self, username):
        """Whether the username exists in the virtual users table."""
        return username in self.user_table

    def has_perm(self, username, perm, path=None):
        """Whether the user has permission over path (an absolute
        pathname of a file or a directory).

        Expected perm argument is one of the following letters:
        "elradfmwM".

        When path is None, or no override set via override_perm()
        applies to it, the user's global permissions are used.
        """
        user = self.user_table[username]
        if path is None:
            return perm in user['perm']

        path = os.path.normcase(path)
        overrides = user['operms']
        # Every override matching `path` is one of its ancestors, so the
        # longest one is the most specific. Checking the deepest first
        # keeps the outcome independent from dict ordering when nested
        # directories are overridden.
        for directory in sorted(overrides, key=len, reverse=True):
            if not self._issubpath(path, directory):
                continue
            operm, recursive = overrides[directory]
            if recursive:
                return perm in operm
            # A non recursive override covers the directory itself and
            # the files (not the sub directories) directly inside it.
            if path == directory or (os.path.dirname(path) == directory
                                     and not os.path.isdir(path)):
                return perm in operm

        return perm in user['perm']

    def get_perms(self, username):
        """Return current user permissions."""
        return self.user_table[username]['perm']

    def get_msg_login(self, username):
        """Return the user's login message."""
        return self.user_table[username]['msg_login']

    def get_msg_quit(self, username):
        """Return the user's quitting message."""
        return self.user_table[username]['msg_quit']

    def _check_permissions(self, username, perm):
        """Raise ValueError if perm contains an unknown permission.

        A single RuntimeWarning is emitted if a write permission is
        being assigned to the anonymous user.
        """
        known_perms = self.read_perms + self.write_perms
        warned = False
        for p in perm:
            if p not in known_perms:
                raise ValueError('no such permission %r' % p)
            if username == 'anonymous' and p in self.write_perms \
                    and not warned:
                warnings.warn("write permissions assigned to anonymous user.",
                              RuntimeWarning)
                warned = True

    def _issubpath(self, a, b):
        """Return True if a is a sub-path of b or if the paths are equal."""
        # Compare whole path components so that "/a/bc" is not taken
        # for a sub-path of "/a/b".
        p1 = a.rstrip(os.sep).split(os.sep)
        p2 = b.rstrip(os.sep).split(os.sep)
        return p1[:len(p2)] == p2
def replace_anonymous(callable):
    """A decorator to replace anonymous user string passed to authorizer
    methods as first argument with the actual user used to handle
    anonymous sessions.

    The decorated method must belong to an object exposing an
    "anonymous_user" attribute; when that attribute is false (e.g.
    None) the 'anonymous' string is passed through unchanged.
    """
    # Imported locally so that the decorator stays self-contained.
    import functools

    # functools.wraps preserves the name and docstring of the decorated
    # method, which would otherwise all show up as "wrapper".
    @functools.wraps(callable)
    def wrapper(self, username, *args, **kwargs):
        if username == 'anonymous':
            username = self.anonymous_user or username
        return callable(self, username, *args, **kwargs)
    return wrapper
- # ===================================================================
- # --- platform specific authorizers
- # ===================================================================
class _Base(object):
    """Methods common to both Unix and Windows authorizers.
    Not supposed to be used directly.

    The concrete class is expected to set the "allowed_users",
    "rejected_users", "anonymous_user", "global_perm", "msg_login",
    "msg_quit" and "_dummy_authorizer" attributes and to provide the
    has_user(), get_home_dir() and _get_system_users() methods.
    """

    msg_no_such_user = "Authentication failed."
    msg_wrong_password = "Authentication failed."
    msg_anon_not_allowed = "Anonymous access not allowed."
    msg_invalid_shell = "User %s doesn't have a valid shell."
    msg_rejected_user = "User %s is not allowed to login."

    def __init__(self):
        """Check for errors in the constructor.

        AuthorizerError is raised if both allowed_users and
        rejected_users are set, if one of them references an unknown
        user or 'anonymous', or if anonymous_user is unknown or has no
        valid home directory.
        """
        if self.rejected_users and self.allowed_users:
            raise AuthorizerError("rejected_users and allowed_users options are "
                                  "mutually exclusive")

        # Fetched once and turned into a set for cheap membership tests.
        system_users = set(self._get_system_users())
        for user in (self.allowed_users or self.rejected_users):
            if user == 'anonymous':
                raise AuthorizerError('invalid username "anonymous"')
            if user not in system_users:
                raise AuthorizerError('unknown user %s' % user)

        if self.anonymous_user is not None:
            if not self.has_user(self.anonymous_user):
                raise AuthorizerError('no such user %s' % self.anonymous_user)
            home = self.get_home_dir(self.anonymous_user)
            if not os.path.isdir(home):
                raise AuthorizerError('no valid home set for user %s'
                                      % self.anonymous_user)

    def override_user(self, username, password=None, homedir=None, perm=None,
                      msg_login=None, msg_quit=None):
        """Overrides the options specified in the class constructor
        for a specific user.

        Options left unspecified (or empty) keep following the values
        given to the class constructor. AuthorizerError is raised if
        no option is specified, if the user is unknown or not allowed
        to log in, or if a password is given for the anonymous user.
        """
        if not password and not homedir and not perm and not msg_login \
                and not msg_quit:
            raise AuthorizerError("at least one keyword argument must be specified")
        if self._is_rejected_user(username):
            raise AuthorizerError('%s is not an allowed user' % username)
        if username == "anonymous" and password:
            raise AuthorizerError("can't assign password to anonymous user")
        if not self.has_user(username):
            raise AuthorizerError('no such user %s' % username)
        if homedir is not None and not isinstance(homedir, unicode):
            homedir = homedir.decode('utf8')

        if username in self._dummy_authorizer.user_table:
            # re-set parameters
            del self._dummy_authorizer.user_table[username]
        # add_user() insists on an existing directory: the current one
        # is used as a placeholder when no home has been specified.
        self._dummy_authorizer.add_user(username, password or "",
                                        homedir or getcwdu(),
                                        perm or "",
                                        msg_login or "",
                                        msg_quit or "")
        if not homedir:
            # Drop the placeholder so that no home override is recorded.
            # An empty string is handled like None, consistently with
            # the "at least one keyword argument" check above.
            self._dummy_authorizer.user_table[username]['home'] = ""

    def get_msg_login(self, username):
        """Return the overridden login message for the user, if any,
        else the one passed to the class constructor.
        """
        return self._get_key(username, 'msg_login') or self.msg_login

    def get_msg_quit(self, username):
        """Return the overridden quit message for the user, if any,
        else the one passed to the class constructor.
        """
        return self._get_key(username, 'msg_quit') or self.msg_quit

    def get_perms(self, username):
        """Return the user permissions: the overridden ones if set,
        'elr' for the anonymous user, else the global permissions.
        """
        overridden_perms = self._get_key(username, 'perm')
        if overridden_perms:
            return overridden_perms
        if username == 'anonymous':
            return 'elr'
        return self.global_perm

    def has_perm(self, username, perm, path=None):
        """Whether the user has the given permission; the path
        argument is ignored.
        """
        return perm in self.get_perms(username)

    def _get_key(self, username, key):
        """Return the value overridden for the user via override_user()
        or None if the user has no overrides.
        """
        if self._dummy_authorizer.has_user(username):
            return self._dummy_authorizer.user_table[username][key]
        return None

    def _is_rejected_user(self, username):
        """Return True if the user has been black listed via
        allowed_users or rejected_users options.
        """
        if self.allowed_users and username not in self.allowed_users:
            return True
        if self.rejected_users and username in self.rejected_users:
            return True
        return False
- # ===================================================================
- # --- UNIX
- # ===================================================================
- # Note: requires python >= 2.5
try:
    import pwd, spwd, crypt
except ImportError:
    pass
else:
    __all__.extend(['BaseUnixAuthorizer', 'UnixAuthorizer'])

    # the uid/gid the server runs under
    PROCESS_UID = os.getuid()
    PROCESS_GID = os.getgid()

    class BaseUnixAuthorizer(object):
        """An authorizer compatible with Unix user account and password
        database.
        This class should not be used directly unless for subclassing.
        Use higher-level UnixAuthorizer class instead.
        """

        # Messages used by the methods below; defined here as well so
        # that this class does not depend on a sibling base class
        # providing them.
        msg_no_such_user = "Authentication failed."
        msg_wrong_password = "Authentication failed."
        msg_anon_not_allowed = "Anonymous access not allowed."

        def __init__(self, anonymous_user=None):
            """Raise AuthorizerError if the process is not run as super
            user, if the shadow password database is empty or if
            anonymous_user is not a system user.
            """
            if os.geteuid() != 0 or not spwd.getspall():
                raise AuthorizerError("super user privileges are required")
            self.anonymous_user = anonymous_user

            if self.anonymous_user is not None:
                try:
                    pwd.getpwnam(self.anonymous_user).pw_dir
                except KeyError:
                    raise AuthorizerError('no such user %s' % anonymous_user)

        # --- overridden / private API

        def validate_authentication(self, username, password, handler):
            """Authenticates against shadow password db; raises
            AuthenticationFailed in case of failed authentication.
            """
            if username == "anonymous":
                if self.anonymous_user is None:
                    raise AuthenticationFailed(self.msg_anon_not_allowed)
                return

            try:
                stored = spwd.getspnam(username).sp_pwd
            except KeyError:  # no such username
                raise AuthenticationFailed(self.msg_no_such_user)
            try:
                # the stored hash also carries the salt to use
                hashed = crypt.crypt(password, stored)
            except OSError:
                # Python >= 3.9 raises instead of returning None when the
                # password can't be hashed against the stored entry
                # (e.g. a locked account): that is a failed login, not a
                # server error.
                raise AuthenticationFailed(self.msg_wrong_password)
            if stored != hashed:
                raise AuthenticationFailed(self.msg_wrong_password)

        @replace_anonymous
        def impersonate_user(self, username, password):
            """Change process effective user/group ids to reflect
            logged in user.
            """
            try:
                pwdstruct = pwd.getpwnam(username)
            except KeyError:
                raise AuthorizerError(self.msg_no_such_user)
            else:
                # the group goes first: once the effective uid has been
                # dropped the process may no longer be allowed to
                # change its gid
                os.setegid(pwdstruct.pw_gid)
                os.seteuid(pwdstruct.pw_uid)

        def terminate_impersonation(self, username):
            """Revert process effective user/group IDs."""
            os.setegid(PROCESS_GID)
            os.seteuid(PROCESS_UID)

        @replace_anonymous
        def has_user(self, username):
            """Return True if user exists on the Unix system."""
            return username in self._get_system_users()

        @replace_anonymous
        def get_home_dir(self, username):
            """Return user home directory.

            AuthorizerError is raised if the user does not exist.
            """
            try:
                home = pwd.getpwnam(username).pw_dir
            except KeyError:
                raise AuthorizerError(self.msg_no_such_user)
            else:
                if not PY3:
                    home = home.decode('utf8')
                return home

        @staticmethod
        def _get_system_users():
            """Return all users defined on the UNIX system."""
            # there should be no need to convert usernames to unicode
            # as UNIX does not allow chars outside of ASCII set
            return [entry.pw_name for entry in pwd.getpwall()]

        def get_msg_login(self, username):
            """Return the login message (the same for every user)."""
            return "Login successful."

        def get_msg_quit(self, username):
            """Return the quit message (the same for every user)."""
            return "Goodbye."

        def get_perms(self, username):
            """Return the permissions (the same for every user)."""
            return "elradfmw"

        def has_perm(self, username, perm, path=None):
            """Whether the user has the given permission; the path
            argument is ignored.
            """
            return perm in self.get_perms(username)

    class UnixAuthorizer(_Base, BaseUnixAuthorizer):
        """A wrapper on top of BaseUnixAuthorizer providing options
        to specify what users should be allowed to login, per-user
        options, etc.

        Example usages:

         >>> from pyftpdlib.contrib.authorizers import UnixAuthorizer
         >>> # accept all except root
         >>> auth = UnixAuthorizer(rejected_users=["root"])
         >>>
         >>> # accept some users only
         >>> auth = UnixAuthorizer(allowed_users=["matt", "jay"])
         >>>
         >>> # accept everybody and don't care if they have not a valid shell
         >>> auth = UnixAuthorizer(require_valid_shell=False)
         >>>
         >>> # set specific options for a user
         >>> auth.override_user("matt", password="foo", perm="elr")
        """

        # --- public API

        def __init__(self, global_perm="elradfmw",
                     allowed_users=None,
                     rejected_users=None,
                     require_valid_shell=True,
                     anonymous_user=None,
                     msg_login="Login successful.",
                     msg_quit="Goodbye."):
            """Parameters:

             - (string) global_perm:
                a series of letters referencing the users permissions;
                defaults to "elradfmw" which means full read and write
                access for everybody (except anonymous).

             - (list) allowed_users:
                a list of users which are accepted for authenticating
                against the FTP server; defaults to [] (no restrictions).

             - (list) rejected_users:
                a list of users which are not accepted for authenticating
                against the FTP server; defaults to [] (no restrictions).

             - (bool) require_valid_shell:
                Deny access for those users which do not have a valid shell
                binary listed in /etc/shells.
                If /etc/shells cannot be found this is a no-op.
                Anonymous user is not subject to this option, and is free
                to not have a valid shell defined.
                Defaults to True (a valid shell is required for login).

             - (string) anonymous_user:
                specify it if you intend to provide anonymous access.
                The value expected is a string representing the system user
                to use for managing anonymous sessions; defaults to None
                (anonymous access disabled).

             - (string) msg_login:
                the string sent when client logs in.

             - (string) msg_quit:
                the string sent when client quits.
            """
            BaseUnixAuthorizer.__init__(self, anonymous_user)
            if allowed_users is None:
                allowed_users = []
            if rejected_users is None:
                rejected_users = []
            self.global_perm = global_perm
            self.allowed_users = allowed_users
            self.rejected_users = rejected_users
            self.anonymous_user = anonymous_user
            self.require_valid_shell = require_valid_shell
            self.msg_login = msg_login
            self.msg_quit = msg_quit

            # keeps track of the per-user options set via override_user()
            self._dummy_authorizer = DummyAuthorizer()
            # only used to validate global_perm (ValueError if invalid)
            self._dummy_authorizer._check_permissions('', global_perm)
            _Base.__init__(self)
            if require_valid_shell:
                for username in self.allowed_users:
                    if not self._has_valid_shell(username):
                        raise AuthorizerError("user %s has not a valid shell"
                                              % username)

        def override_user(self, username, password=None, homedir=None,
                          perm=None, msg_login=None, msg_quit=None):
            """Overrides the options specified in the class constructor
            for a specific user.

            If require_valid_shell is set, AuthorizerError is raised
            for users (other than anonymous) with no valid shell.
            """
            if self.require_valid_shell and username != 'anonymous':
                if not self._has_valid_shell(username):
                    raise AuthorizerError(self.msg_invalid_shell % username)
            _Base.override_user(self, username, password, homedir, perm,
                                msg_login, msg_quit)

        # --- overridden / private API

        def validate_authentication(self, username, password, handler):
            """Raise AuthenticationFailed if the user is not allowed to
            log in, if the password doesn't match (the overridden one,
            if set, else the system one) or if a valid shell is
            required and the user has none.
            """
            if username == "anonymous":
                if self.anonymous_user is None:
                    raise AuthenticationFailed(self.msg_anon_not_allowed)
                return
            if self._is_rejected_user(username):
                raise AuthenticationFailed(self.msg_rejected_user % username)
            overridden_password = self._get_key(username, 'pwd')
            if overridden_password:
                if overridden_password != password:
                    raise AuthenticationFailed(self.msg_wrong_password)
            else:
                BaseUnixAuthorizer.validate_authentication(self, username,
                                                           password, handler)
            # anonymous has already returned above, hence it is never
            # subject to the shell check
            if self.require_valid_shell:
                if not self._has_valid_shell(username):
                    raise AuthenticationFailed(
                        self.msg_invalid_shell % username)

        @replace_anonymous
        def has_user(self, username):
            """Return True if user exists on the Unix system.
            If the user has been black listed via allowed_users or
            rejected_users options always return False.
            """
            if self._is_rejected_user(username):
                return False
            return username in self._get_system_users()

        @replace_anonymous
        def get_home_dir(self, username):
            """Return the overridden home directory of the user, if
            any, else the one defined on the system.
            """
            overridden_home = self._get_key(username, 'home')
            if overridden_home:
                return overridden_home
            return BaseUnixAuthorizer.get_home_dir(self, username)

        @staticmethod
        def _has_valid_shell(username):
            """Return True if the user has a valid shell binary listed
            in /etc/shells. If /etc/shells can't be found return True.
            Return False if the user does not exist.
            """
            try:
                shells = open('/etc/shells', 'r')
            except IOError:
                err = sys.exc_info()[1]
                if err.errno == errno.ENOENT:
                    return True
                raise
            try:
                try:
                    shell = pwd.getpwnam(username).pw_shell
                except KeyError:  # invalid user
                    return False
                for line in shells:
                    if line.startswith('#'):
                        continue
                    if line.strip() == shell:
                        return True
                return False
            finally:
                shells.close()
- # ===================================================================
- # --- Windows
- # ===================================================================
- try:
- import _winreg as winreg
- except ImportError:
- try:
- import winreg # PY3
- except ImportError:
- pass
- # Note: requires pywin32 extension
try:
    import win32security, win32net, pywintypes, win32con, win32api
except ImportError:
    pass
else:
    __all__.extend(['BaseWindowsAuthorizer', 'WindowsAuthorizer'])

    class BaseWindowsAuthorizer(object):
        """An authorizer compatible with Windows user account and
        password database.
        This class should not be used directly unless for subclassing.
        Use higher-level WindowsAuthorizer class instead.
        """

        # Messages used by the methods below; defined here as well so
        # that this class does not depend on a sibling base class
        # providing them.
        msg_wrong_password = "Authentication failed."
        msg_anon_not_allowed = "Anonymous access not allowed."

        def __init__(self, anonymous_user=None, anonymous_password=None):
            # actually try to impersonate the user
            self.anonymous_user = anonymous_user
            self.anonymous_password = anonymous_password
            if self.anonymous_user is not None:
                self.impersonate_user(self.anonymous_user,
                                      self.anonymous_password)
                self.terminate_impersonation(None)

        def validate_authentication(self, username, password, handler):
            """Authenticates against Windows user database; raises
            AuthenticationFailed in case of failed authentication.
            """
            if username == "anonymous":
                if self.anonymous_user is None:
                    raise AuthenticationFailed(self.msg_anon_not_allowed)
                return
            try:
                token = win32security.LogonUser(
                    username, None, password,
                    win32con.LOGON32_LOGON_INTERACTIVE,
                    win32con.LOGON32_PROVIDER_DEFAULT)
            except pywintypes.error:
                raise AuthenticationFailed(self.msg_wrong_password)
            # the logon only serves as a credentials check: release the
            # handle right away
            token.Close()

        @replace_anonymous
        def impersonate_user(self, username, password):
            """Impersonate the security context of another user."""
            handler = win32security.LogonUser(
                username, None, password,
                win32con.LOGON32_LOGON_INTERACTIVE,
                win32con.LOGON32_PROVIDER_DEFAULT)
            try:
                win32security.ImpersonateLoggedOnUser(handler)
            finally:
                # release the handle also if the impersonation fails
                handler.Close()

        def terminate_impersonation(self, username):
            """Terminate the impersonation of another user."""
            win32security.RevertToSelf()

        @replace_anonymous
        def has_user(self, username):
            """Return True if user exists on the Windows system."""
            return username in self._get_system_users()

        @replace_anonymous
        def get_home_dir(self, username):
            """Return the user's profile directory, the closest thing
            to a user home directory we have on Windows.

            AuthorizerError is raised if the account can't be looked up
            or has no profile entry in the registry.
            """
            try:
                sid = win32security.ConvertSidToStringSid(
                    win32security.LookupAccountName(None, username)[0])
            except pywintypes.error:
                err = sys.exc_info()[1]
                raise AuthorizerError(err)
            path = r"SOFTWARE\Microsoft\Windows NT\CurrentVersion\ProfileList" + \
                   "\\" + sid
            try:
                key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, path)
            except WindowsError:
                raise AuthorizerError("No profile directory defined for user %s"
                                      % username)
            try:
                value = winreg.QueryValueEx(key, "ProfileImagePath")[0]
            finally:
                # don't rely on garbage collection to release the key
                winreg.CloseKey(key)
            home = win32api.ExpandEnvironmentStrings(value)
            if not PY3 and not isinstance(home, unicode):
                home = home.decode('utf8')
            return home

        @classmethod
        def _get_system_users(cls):
            """Return all users defined on the Windows system."""
            # XXX - Does Windows allow usernames with chars outside of
            # ASCII set? In that case we need to convert this to unicode.
            return [entry['name'] for entry in
                    win32net.NetUserEnum(None, 0)[0]]

        def get_msg_login(self, username):
            """Return the login message (the same for every user)."""
            return "Login successful."

        def get_msg_quit(self, username):
            """Return the quit message (the same for every user)."""
            return "Goodbye."

        def get_perms(self, username):
            """Return the permissions (the same for every user)."""
            return "elradfmw"

        def has_perm(self, username, perm, path=None):
            """Whether the user has the given permission; the path
            argument is ignored.
            """
            return perm in self.get_perms(username)

    class WindowsAuthorizer(_Base, BaseWindowsAuthorizer):
        """A wrapper on top of BaseWindowsAuthorizer providing options
        to specify what users should be allowed to login, per-user
        options, etc.

        Example usages:

         >>> from pyftpdlib.contrib.authorizers import WindowsAuthorizer
         >>> # accept all except Administrator
         >>> auth = WindowsAuthorizer(rejected_users=["Administrator"])
         >>>
         >>> # accept some users only
         >>> auth = WindowsAuthorizer(allowed_users=["matt", "jay"])
         >>>
         >>> # set specific options for a user
         >>> auth.override_user("matt", password="foo", perm="elr")
        """

        # --- public API

        def __init__(self, global_perm="elradfmw",
                     allowed_users=None,
                     rejected_users=None,
                     anonymous_user=None,
                     anonymous_password=None,
                     msg_login="Login successful.",
                     msg_quit="Goodbye."):
            """Parameters:

             - (string) global_perm:
                a series of letters referencing the users permissions;
                defaults to "elradfmw" which means full read and write
                access for everybody (except anonymous).

             - (list) allowed_users:
                a list of users which are accepted for authenticating
                against the FTP server; defaults to [] (no restrictions).

             - (list) rejected_users:
                a list of users which are not accepted for authenticating
                against the FTP server; defaults to [] (no restrictions).

             - (string) anonymous_user:
                specify it if you intend to provide anonymous access.
                The value expected is a string representing the system user
                to use for managing anonymous sessions.
                As for IIS, it is recommended to use Guest account.
                The common practice is to first enable the Guest user, which
                is disabled by default and then assign an empty password.
                Defaults to None (anonymous access disabled).

             - (string) anonymous_password:
                the password of the user who has been chosen to manage the
                anonymous sessions.  Defaults to None (empty password).

             - (string) msg_login:
                the string sent when client logs in.

             - (string) msg_quit:
                the string sent when client quits.
            """
            if allowed_users is None:
                allowed_users = []
            if rejected_users is None:
                rejected_users = []
            self.global_perm = global_perm
            self.allowed_users = allowed_users
            self.rejected_users = rejected_users
            self.anonymous_user = anonymous_user
            self.anonymous_password = anonymous_password
            self.msg_login = msg_login
            self.msg_quit = msg_quit
            # keeps track of the per-user options set via override_user()
            self._dummy_authorizer = DummyAuthorizer()
            # only used to validate global_perm (ValueError if invalid)
            self._dummy_authorizer._check_permissions('', global_perm)
            _Base.__init__(self)
            # actually try to impersonate the user
            if self.anonymous_user is not None:
                self.impersonate_user(self.anonymous_user,
                                      self.anonymous_password)
                self.terminate_impersonation(None)

        def override_user(self, username, password=None, homedir=None,
                          perm=None, msg_login=None, msg_quit=None):
            """Overrides the options specified in the class constructor
            for a specific user.
            """
            _Base.override_user(self, username, password, homedir, perm,
                                msg_login, msg_quit)

        # --- overridden / private API

        def validate_authentication(self, username, password, handler):
            """Authenticates against Windows user database; raises
            AuthenticationFailed in case of failed authentication,
            else return None.
            """
            if username == "anonymous":
                if self.anonymous_user is None:
                    raise AuthenticationFailed(self.msg_anon_not_allowed)
                return
            if self._is_rejected_user(username):
                raise AuthenticationFailed(self.msg_rejected_user % username)

            overridden_password = self._get_key(username, 'pwd')
            if overridden_password:
                if overridden_password != password:
                    raise AuthenticationFailed(self.msg_wrong_password)
            else:
                BaseWindowsAuthorizer.validate_authentication(
                    self, username, password, handler)

        def impersonate_user(self, username, password):
            """Impersonate the security context of another user."""
            if username == "anonymous":
                # anonymous sessions run as the configured system
                # account, whatever password the client has sent
                username = self.anonymous_user or ""
                password = self.anonymous_password or ""
            BaseWindowsAuthorizer.impersonate_user(self, username, password)

        @replace_anonymous
        def has_user(self, username):
            """Return True if user exists on the Windows system.
            If the user has been black listed via allowed_users or
            rejected_users options always return False.
            """
            if self._is_rejected_user(username):
                return False
            return username in self._get_system_users()

        @replace_anonymous
        def get_home_dir(self, username):
            """Return the overridden home directory of the user, if
            any, else the profile directory defined on the system.
            """
            overridden_home = self._get_key(username, 'home')
            if overridden_home:
                home = overridden_home
            else:
                home = BaseWindowsAuthorizer.get_home_dir(self, username)
            if not PY3 and not isinstance(home, unicode):
                home = home.decode('utf8')
            return home
|