Exception handling

This commit is contained in:
Kloadut 2012-10-08 18:16:43 +02:00
parent 5dbce99e65
commit 4e1dd30c4c
6 changed files with 223 additions and 191 deletions

1
.gitignore vendored
View file

@ -4,6 +4,7 @@
*.egg
*.egg-info
*.swp
*.swo
dist
build
eggs

149
lib/yunohost.py Normal file
View file

@ -0,0 +1,149 @@
# -*- coding: utf-8 -*-
import os
import sys
import ldap
import ldap.modlist as modlist
import re
import getpass
def colorize(astr, color):
color_dict = {
'red' : '31',
'green' : '32',
'yellow': '33',
'cyan' : '34',
'purple': '35'
}
return "\033["+ color_dict[color] +"m\033[1m" + astr + "\033[m"
def win_msg(astr):
if os.isatty(1):
print('\n' + colorize(_("Success: "), 'green') + astr + '\n')
def str_to_func(astr):
"""
Call a function from a string name
Keyword arguments:
astr -- Name of function to call
Returns:
Function
"""
module, _, function = astr.rpartition('.')
if module:
__import__(module)
mod = sys.modules[module]
else:
mod = sys.modules['__main__'] # default module
try:
func = getattr(mod, function)
except NameError:
raise YunoHostError(168, _('Function is not defined'))
else:
return func
class YunoHostError(Exception):
""" Custom exception """
def __init__(self, code, message):
code_dict = {
1 : _('Fail'),
13 : _('Permission denied'),
17 : _('Already exists'),
22 : _('Invalid arguments'),
87 : _('Too many users'),
111 : _('Connection refused'),
122 : _('Quota exceeded'),
125 : _('Operation canceled'),
167 : _('Not found'),
168 : _('Undefined'),
169 : _('LDAP operation error')
}
self.code = code
self.message = message
if code_dict[code]:
self.desc = code_dict[code]
else:
self.desc = code
class YunoHostLDAP:
""" Specific LDAP functions for YunoHost """
def __init__(self):
""" Connect to LDAP base """
self.conn = ldap.initialize('ldap://localhost:389')
self.base = 'dc=yunohost,dc=org'
self.pwd = getpass.getpass(_('LDAP Admin Password: '))
try:
self.conn.simple_bind_s('cn=admin,' + self.base, self.pwd)
except ldap.INVALID_CREDENTIALS:
raise YunoHostError(13, _('Invalid credentials'))
def disconnect(self):
""" Unbind from LDAP """
try:
self.conn.unbind_s()
except:
raise YunoHostError(169, _('An error occured during disconnection'))
else:
return True
def search(self, base=None, filter='(objectClass=*)', attrs=['dn']):
""" Search in LDAP base """
if not base:
base = self.base
try:
result = self.conn.search_s(base, ldap.SCOPE_SUBTREE, filter, attrs)
except:
raise YunoHostError(169, _('An error occured during LDAP search'))
if result:
result_list = []
for dn, entry in result:
if 'dn' in attrs:
entry['dn'] = [dn]
result_list.append(entry)
return result_list
else:
return False
def add(self, rdn, attr_dict):
""" Add LDAP entry """
dn = rdn + ',' + self.base
ldif = modlist.addModlist(attr_dict)
try:
self.conn.add_s(dn, ldif)
except:
raise YunoHostError(169, _('An error occured during LDAP entry creation'))
else:
return True
def validate(self, regex_dict):
for attr, pattern in regex_dict.items():
if re.match(pattern, attr):
continue
else:
raise YunoHostError(22, _('Invalid attribute') + ' ' + attr)
return True
def validate_uniqueness(self, value_dict):
for attr, value in value_dict.items():
if not self.search(filter=attr + '=' + value):
continue
else:
raise YunoHostError(17, _('Attribute already exists') + ' "' + attr + '=' + value + '"')
return True

View file

@ -1,90 +0,0 @@
# -*- coding: utf-8 -*-
import sys
import ldap
import ldap.modlist as modlist
import re
import getpass
import yunohost_messages as msg
class YunoHostLDAP:
""" Specific LDAP functions for YunoHost """
def __init__(self):
""" Connect to LDAP base """
self.conn = ldap.initialize('ldap://localhost:389')
self.base = 'dc=yunohost,dc=org'
self.pwd = getpass.getpass(_('LDAP Admin Password: '))
try:
self.conn.simple_bind_s('cn=admin,' + self.base, self.pwd)
except ldap.INVALID_CREDENTIALS:
print(msg.error + _('Wrong credentials'))
sys.exit(msg.ECONNREFUSED)
def disconnect(self):
""" Unbind from LDAP """
try:
self.conn.unbind_s()
except:
print(msg.error + _('A problem occured during LDAP unbind'))
return False
else:
return True
def search(self, base=None, filter='(objectClass=*)', attrs=['dn']):
""" Search in LDAP base """
if not base:
base = self.base
try:
result = self.conn.search_s(base, ldap.SCOPE_SUBTREE, filter, attrs)
except:
print(msg.error + _('An error occured during LDAP search'))
return False
if result:
result_list = []
for dn, entry in result:
if 'dn' in attrs:
entry['dn'] = [dn]
result_list.append(entry)
return result_list
else:
return False
def add(self, rdn, attr_dict):
""" Add LDAP entry """
dn = rdn + ',' + self.base
ldif = modlist.addModlist(attr_dict)
try:
self.conn.add_s(dn, ldif)
except:
print(msg.error + _('An error occured during LDAP entry creation'))
return False
else:
return True
def validate(self, regex_dict):
for attr, pattern in regex_dict.items():
if re.match(pattern, attr):
continue
else:
print(msg.error + _('Invalid value') + ' "' + attr + '"')
sys.exit(msg.EINVAL)
return True
def validate_uniqueness(self, value_dict):
for attr, value in value_dict.items():
if not self.search(filter=attr + '=' + value):
continue
else:
print(msg.error + _('Attribute already exists') + ' "' + attr + '=' + value + '"')
sys.exit(msg.EEXIST)
return True

View file

@ -1,20 +0,0 @@
# -*- coding: utf-8 -*-
""" Colored status messages """
error = "\033[31m\033[1m" + _("Error:") + "\033[m " # Red
interrupt = "\033[31m\033[1m" + _("Interrupt:") + "\033[m " # Red
notice = "\033[34m\033[1m" + _("Notice:") + "\033[m " # Cyan
success = "\033[32m\033[1m" + _("Success:") + "\033[m " # Green
""" Error codes """
EACCES = 13 # Permission denied
EEXIST = 17 # Exists
EINVAL = 22 # Invalid argument
EUSERS = 87 # Too many users
ECONNREFUSED = 111 # Connection refused
EDQUOTA = 122 # Quota exceeded
ECANCELED = 125 # Operation Canceled
ENOTFOUND = 167 # Not found
EUNDEFINED = 168 # Undefined
ELDAP = 169 # LDAP operation error

View file

@ -1,16 +1,16 @@
# -*- coding: utf-8 -*-
import os
import sys
import ldap
import yunohost_ldap
import yunohost_messages as msg
import crypt
import random
import string
import getpass
from yunohost import YunoHostError, YunoHostLDAP, win_msg
# Initialize LDAP
yldap = yunohost_ldap.YunoHostLDAP()
yldap = YunoHostLDAP()
def user_list(args): # TODO : fix
result = yldap.search()
@ -33,17 +33,21 @@ def user_add(args):
try:
for arg in required_args:
if not args[arg]:
if os.isatty(1):
args[arg] = raw_input(arg.capitalize()+': ')
else:
raise Exception
# Password
if not args['password']:
if os.isatty(1):
args['password'] = getpass.getpass()
pwd2 = getpass.getpass('Retype password:')
if args['password'] != pwd2:
print(msg.error + _("Passwords doesn't match"))
sys.exit(msg.EINVAL)
raise YunoHostError(22, _("Passwords doesn't match"))
else:
raise YunoHostError(22, _("Missing arguments"))
except KeyboardInterrupt, EOFError:
print("\n" + msg.interrupt + _("User not created"))
sys.exit(msg.ECANCELED)
raise YunoHostError(125, _("Interrupted, user not created"))
# Manage values
fullname = args['firstname'] + ' ' + args['lastname']
@ -51,7 +55,7 @@ def user_add(args):
char_set = string.ascii_uppercase + string.digits
salt = ''.join(random.sample(char_set,8))
salt = '$1$' + salt + '$'
pwd = "{CRYPT}" + crypt.crypt(str(args['password']), salt)
pwd = '{CRYPT}' + crypt.crypt(str(args['password']), salt)
attr_dict = {
'objectClass' : ['mailAccount', 'inetOrgPerson'],
'givenName' : args['firstname'],
@ -77,11 +81,7 @@ def user_add(args):
})
if yldap.add(rdn, attr_dict):
print('\n ' + msg.success + _('User successfully created') + '\n')
for attr, value in attr_dict.items():
if attr != 'objectClass':
print('\033[35m\033[1m ' + attr + ': \033[m' + value)
return True
win_msg(_("User successfully created"))
return attr_dict
else:
print(msg.error + _('An error occured during user creation'))
return False
raise YunoHostError(169, _('An error occured during user creation'))

108
yunohost
View file

@ -20,39 +20,44 @@ __credits__ = """
__author__ = 'Kload <kload@kload.fr>'
__version__ = '2.0 beta1'
import os
import sys
import argparse
import gettext
import json
if not __debug__:
import traceback
sys.path.append('lib') # Local temporary hack
gettext.install('YunoHost')
import yunohost_messages as msg
def str_to_func(astr):
"""
Call a function from a string name
Keyword arguments:
astr -- Name of function to call
Returns:
Function
"""
module, _, function = astr.rpartition('.')
if module:
__import__(module)
mod = sys.modules[module]
else:
mod = sys.modules['__main__'] # default module
try:
func = getattr(mod, function)
except NameError:
print(msg.error + _('Function is not defined'))
sys.exit(msg.EUNDEFINED)
else:
return func
from yunohost import YunoHostError, str_to_func, colorize
action_dict = {
'user' : {
'help' : 'Manage users',
'actions' : {
'list' : 'List users',
'add' : 'Add user'
}
},
'domain' : {
'help' : 'Manage domains',
'actions' : {}
},
'app' : {
'help' : 'Manage apps',
'actions' : {}
},
'monitor' : {
'help' : 'Monitoring functions',
'actions' : {}
},
'tools' : {
'help' : 'Specific tools',
'actions' : {}
}
}
def dict_to_parsers(action_dict):
"""
@ -82,7 +87,6 @@ def dict_to_parsers(action_dict):
return parsers
def parse_args(parsers):
"""
Add and parse arguments
@ -136,40 +140,28 @@ def parse_args(parsers):
return args
def main():
""" Main instructions """
action_dict = {
'user' : {
'help' : 'Manage users',
'actions' : {
'list' : 'List users',
'add' : 'Add user'
}
},
'domain' : {
'help' : 'Manage domains',
'actions' : {}
},
'app' : {
'help' : 'Manage apps',
'actions' : {}
},
'monitor' : {
'help' : 'Monitoring functions',
'actions' : {}
},
'tools' : {
'help' : 'Specific tools',
'actions' : {}
}
}
try:
parsers = dict_to_parsers(action_dict)
args = parse_args(parsers)
args.func(vars(args))
result = args.func(vars(args))
except YunoHostError, error:
if not __debug__ :
traceback.print_exc()
if os.isatty(1):
print('\n' + colorize(_('Error: '), 'red') + error.message)
else:
print(json.dumps({ 'error' : error.message }))
return error.code
else:
if os.isatty(1):
for attr, value in result.items():
if type(value) is str:
print(colorize(attr, 'purple') + ': ' + value)
else:
print(json.dumps(result))
return 0
if __name__ == '__main__':
main()
sys.exit(main())