mirror of
https://github.com/YunoHost-Apps/hubzilla_ynh.git
synced 2024-09-03 19:26:21 +02:00
202 lines
6.9 KiB
Python
202 lines
6.9 KiB
Python
import requests
|
|
import platform
|
|
from numbers import Number
|
|
# xml.etree.cElementTree was a deprecated alias and was removed in Python 3.9;
# fall back to the regular ElementTree module (same API) when it is missing.
try:
    import xml.etree.cElementTree as xml
except ImportError:
    import xml.etree.ElementTree as xml
|
|
from collections import namedtuple
|
|
|
|
# Interpreter version split into its (major, minor, patch) string components.
py_majversion, py_minversion, py_revversion = platform.python_version_tuple()

# The HTTP reason-phrase table and the URL parser live in differently named
# modules on Python 2 and Python 3; bind them to the same names either way.
if py_majversion != '2':
    from http.client import responses as HTTP_CODES
    from urllib.parse import urlparse
else:
    from httplib import responses as HTTP_CODES
    from urlparse import urlparse

# Chunk size (1 MiB) handed to response.iter_content() while downloading.
DOWNLOAD_CHUNK_SIZE_BYTES = 1024 * 1024
|
|
|
|
class WebdavException(Exception):
    """Base class for the WebDAV-specific exceptions defined in this module."""
|
|
|
|
class ConnectionFailed(WebdavException):
    """WebDAV exception type for connection failures.

    NOTE(review): nothing in the visible code raises this; presumably it is
    kept for callers that catch it -- confirm before removing.
    """
|
|
|
|
|
|
def codestr(code):
    """Return the reason phrase for HTTP status *code*, or 'UNKNOWN'.

    The phrase is looked up in ``HTTP_CODES``; codes that are not in that
    table yield the literal string ``'UNKNOWN'``.
    """
    try:
        return HTTP_CODES[code]
    except KeyError:
        return 'UNKNOWN'
|
|
|
|
|
|
# Immutable record for one entry of a directory listing; the fields are filled
# by elem2file() from the DAV properties of a single response element.
File = namedtuple('File', 'name size mtime ctime contenttype')
|
|
|
|
|
|
def prop(elem, name, default=None):
    """Return the text of the first ``{DAV:}name`` descendant of *elem*.

    :param elem: ElementTree element to search below.
    :param name: local name of the DAV property (without namespace).
    :param default: value returned when no such descendant exists.
    :returns: the descendant's ``text`` (which may itself be ``None`` for an
        empty element), or *default* when the element is absent.
    """
    match = elem.find('.//{DAV:}' + name)
    if match is not None:
        return match.text
    return default
|
|
|
|
|
|
def elem2file(elem):
    """Convert one ``{DAV:}response`` element into a ``File`` record.

    :param elem: ElementTree element holding the DAV properties of an entry.
    :returns: ``File(name, size, mtime, ctime, contenttype)`` where ``name``
        is the entry's ``href``, ``size`` is an ``int`` and the remaining
        fields are the raw property strings ('' when the property is absent).
    :raises ValueError: if ``getcontentlength`` holds non-numeric text.
    """
    # prop() returns None (not the default) for an element that is present
    # but empty, e.g. <getcontentlength/>; int(None) would raise TypeError,
    # so treat a missing, empty or blank length as 0.
    raw_size = prop(elem, 'getcontentlength')
    size = int(raw_size) if raw_size and raw_size.strip() else 0
    return File(
        prop(elem, 'href'),
        size,
        prop(elem, 'getlastmodified', ''),
        prop(elem, 'creationdate', ''),
        prop(elem, 'getcontenttype', ''),
    )
|
|
|
|
|
|
class OperationFailed(WebdavException):
    """Raised when a WebDAV request answers with an unexpected status code.

    Attributes set on the instance:
        method        -- HTTP/WebDAV verb that was sent
        path          -- path the request was made for
        expected_code -- accepted status code, or an iterable of codes
        actual_code   -- status code the server actually returned
        reason        -- short description, e.g. 'Failed to upload "x"'
    """

    # Human-readable operation name for each verb this client sends.
    _OPERATIONS = dict(
        HEAD="get header",
        GET="download",
        PUT="upload",
        DELETE="delete",
        MKCOL="create directory",
        PROPFIND="list directory",
    )

    def __init__(self, method, path, expected_code, actual_code):
        """Store the request details and build the multi-line error message.

        :param method: HTTP/WebDAV verb of the failed request.
        :param path: path the request was made for.
        :param expected_code: a single status code or an iterable of codes.
        :param actual_code: status code returned by the server.
        """
        self.method = method
        self.path = path
        self.expected_code = expected_code
        self.actual_code = actual_code
        # Fall back to the verb itself so that an unlisted method cannot raise
        # KeyError here and mask the failure being reported.
        operation_name = self._OPERATIONS.get(method, method)
        self.reason = 'Failed to {0} "{1}"'.format(operation_name, path)
        # Normalise a single code to a 1-tuple so both forms format alike.
        if isinstance(expected_code, Number):
            expected_codes = (expected_code,)
        else:
            expected_codes = expected_code
        expected_codes_str = ", ".join(
            '{0} {1}'.format(code, codestr(code)) for code in expected_codes
        )
        msg = (
            '{reason}.\n'
            'Operation : {method} {path}\n'
            'Expected code : {expected}\n'
            'Actual code : {actual} {actual_str}'
        ).format(
            reason=self.reason,
            method=method,
            path=path,
            expected=expected_codes_str,
            actual=actual_code,
            actual_str=codestr(actual_code),
        )
        super(OperationFailed, self).__init__(msg)
|
|
|
|
class Client(object):
    """Small WebDAV client that issues its requests through a ``requests`` session.

    The client keeps a base URL (``baseurl``) and a current working directory
    (``cwd``, always starting and ending with '/'). Relative paths passed to
    the methods are resolved against ``cwd``; paths starting with '/' are
    resolved against ``baseurl`` directly.
    """

    # Types treated as "local file path" by upload()/download(). ``basestring``
    # only exists on Python 2; on Python 3 referencing it raises NameError.
    try:
        _PATH_TYPES = (basestring,)  # noqa: F821 (Python 2 only)
    except NameError:
        _PATH_TYPES = (str,)

    def __init__(self, host, port=0, auth=None, username=None, password=None,
                 protocol='http', verify_ssl=True, path=None, cert=None, session=None):
        """Build the base URL and configure the HTTP session.

        :param host: server host name.
        :param port: server port; a falsy value selects 443 for 'https' and
            80 otherwise.
        :param auth: value assigned to ``session.auth``; takes precedence over
            *username*/*password*.
        :param username: user name, used with *password* when *auth* is unset.
        :param password: password, used with *username* when *auth* is unset.
        :param protocol: URL scheme, 'http' or 'https'.
        :param verify_ssl: value assigned to ``session.verify``.
        :param path: optional path appended to the base URL.
        :param cert: optional value assigned to ``session.cert``.
        :param session: existing session object to use; a new
            ``requests`` session is created when omitted.
        """
        if not port:
            port = 443 if protocol == 'https' else 80
        self.baseurl = '{0}://{1}:{2}'.format(protocol, host, port)
        if path:
            # cwd always contributes the separating '/', so drop slashes
            # around the base path to avoid '//' in every request URL.
            base_path = path.strip('/')
            if base_path:
                self.baseurl = '{0}/{1}'.format(self.baseurl, base_path)
        self.cwd = '/'
        if session is None:
            self.session = requests.session()
        else:
            self.session = session
        self.session.verify = verify_ssl
        self.session.stream = True

        if cert:
            self.session.cert = cert

        if auth:
            self.session.auth = auth
        elif username and password:
            self.session.auth = (username, password)

    def _send(self, method, path, expected_code, **kwargs):
        """Send one request and check the status code of the response.

        :param method: HTTP/WebDAV verb.
        :param path: path resolved through ``_get_url``.
        :param expected_code: accepted status code, or a collection of codes.
        :param kwargs: passed through to ``session.request``.
        :returns: the response object.
        :raises OperationFailed: if the status code is not an expected one.
        """
        # Dots are stripped from both ends of the URL so that the path '.'
        # addresses the current directory itself. NOTE: this also removes
        # trailing dots of a real file name.
        url = self._get_url(path).strip(".")
        # Redirects are not followed here; callers handle 301 themselves.
        response = self.session.request(method, url, allow_redirects=False, **kwargs)
        if isinstance(expected_code, Number):
            accepted = (expected_code,)
        else:
            accepted = expected_code
        if response.status_code not in accepted:
            raise OperationFailed(method, path, expected_code, response.status_code)
        return response

    def _get_url(self, path):
        """Return the full URL for *path*.

        A path starting with '/' is appended to ``baseurl``; any other path
        is appended to ``baseurl`` + ``cwd``.
        """
        path = str(path).strip()
        if path.startswith('/'):
            return self.baseurl + path
        return "".join((self.baseurl, self.cwd, path))

    def cd(self, path):
        """Change the current working directory.

        Empty segments in *path* are dropped. An absolute path replaces
        ``cwd``, '.' leaves it unchanged, '..' moves one level up, and any
        other relative path is appended to ``cwd``. A blank path is ignored.
        """
        path = path.strip()
        if not path:
            return
        stripped_path = '/'.join(part for part in path.split('/') if part) + '/'

        if stripped_path == '/':
            # Path consisted of slashes only: go to the root.
            self.cwd = stripped_path
        elif path.startswith('/'):
            self.cwd = '/' + stripped_path
        elif stripped_path == "./":
            return
        elif stripped_path == "../":
            # cwd ends with '/', so the last two split items are the current
            # directory name and an empty string; dropping both goes up one.
            self.cwd = '/'.join(self.cwd.split('/')[:-2]) + '/'
        else:
            self.cwd += stripped_path

    def mkdir(self, path, safe=False):
        """Create the directory *path* with MKCOL.

        :param safe: when true, 301 and 405 are accepted in addition to 201.
        :raises OperationFailed: on any other status code.
        """
        expected_codes = 201 if not safe else (201, 301, 405)
        self._send('MKCOL', path, expected_codes)

    def mkdirs(self, path):
        """Create every directory along *path*, one segment at a time.

        Each segment is created with ``mkdir(..., safe=True)`` and then
        entered with ``cd``. The original working directory is restored
        afterwards, even on error.

        :raises OperationFailed: when a segment fails with status 409; other
            unexpected status codes are ignored (best effort).
        """
        dirs = [d for d in path.split('/') if d]
        if not dirs:
            return
        if path.startswith('/'):
            # Keep the first segment absolute so creation starts at the root.
            dirs[0] = '/' + dirs[0]
        old_cwd = self.cwd
        try:
            for dirname in dirs:
                try:
                    self.mkdir(dirname, safe=True)
                except OperationFailed as e:
                    # Only status-code failures are inspected here; anything
                    # else (e.g. a transport error) propagates unchanged.
                    if e.actual_code == 409:
                        raise
                finally:
                    self.cd(dirname)
        finally:
            self.cd(old_cwd)

    def rmdir(self, path, safe=False):
        """Delete the directory *path* (a trailing '/' is enforced).

        :param safe: when true, 404 is accepted in addition to 204.
        :raises OperationFailed: on any other status code.
        """
        path = str(path).rstrip('/') + '/'
        expected_codes = 204 if not safe else (204, 404)
        self._send('DELETE', path, expected_codes)

    def delete(self, path):
        """Delete *path*; raises OperationFailed unless the server answers 204."""
        self._send('DELETE', path, 204)

    def upload(self, local_path_or_fileobj, remote_path):
        """Upload a local file to *remote_path* with PUT.

        :param local_path_or_fileobj: a path string (opened in binary mode
            and closed afterwards) or an object passed to the request as-is.
        :raises OperationFailed: unless the server answers 200, 201 or 204.
        """
        if isinstance(local_path_or_fileobj, self._PATH_TYPES):
            with open(local_path_or_fileobj, 'rb') as f:
                self._upload(f, remote_path)
        else:
            self._upload(local_path_or_fileobj, remote_path)

    def _upload(self, fileobj, remote_path):
        """PUT *fileobj* as the request body to *remote_path*."""
        self._send('PUT', remote_path, (200, 201, 204), data=fileobj)

    def download(self, remote_path, local_path_or_fileobj):
        """Download *remote_path* with GET.

        :param local_path_or_fileobj: a path string (opened for binary
            writing and closed afterwards) or an object with ``write``.
        :raises OperationFailed: unless the server answers 200.
        """
        response = self._send('GET', remote_path, 200, stream=True)
        if isinstance(local_path_or_fileobj, self._PATH_TYPES):
            with open(local_path_or_fileobj, 'wb') as f:
                self._download(f, response)
        else:
            self._download(local_path_or_fileobj, response)

    def _download(self, fileobj, response):
        """Write the response body to *fileobj* chunk by chunk."""
        for chunk in response.iter_content(DOWNLOAD_CHUNK_SIZE_BYTES):
            fileobj.write(chunk)

    def ls(self, remote_path='.'):
        """List *remote_path* with a ``Depth: 1`` PROPFIND request.

        A 301 answer is followed by listing the path component of its
        ``location`` header instead.

        :returns: a list with one ``File`` per ``{DAV:}response`` element.
        :raises OperationFailed: unless the server answers 207 or 301.
        """
        headers = {'Depth': '1'}
        response = self._send('PROPFIND', remote_path, (207, 301), headers=headers)

        # Redirect
        if response.status_code == 301:
            url = urlparse(response.headers['location'])
            return self.ls(url.path)

        tree = xml.fromstring(response.content)
        return [elem2file(elem) for elem in tree.findall('{DAV:}response')]

    def exists(self, remote_path):
        """Return whether a HEAD request for *remote_path* is not answered 404.

        :raises OperationFailed: unless the server answers 200, 301 or 404.
        """
        response = self._send('HEAD', remote_path, (200, 301, 404))
        return response.status_code != 404
|