Commit 683a4557 authored by nednguyen, committed by Commit Bot

Revert of Remove find_dependencies script (patchset #1 id:1 of https://codereview.chromium.org/2932213002/ )

Reason for revert:
find_dependencies is still used.

Original issue's description:
> Remove find_dependencies script
>
> BUG=chromium:728177
>
> Review-Url: https://codereview.chromium.org/2932213002
> Cr-Commit-Position: refs/heads/master@{#478658}
> Committed: https://chromium.googlesource.com/chromium/src/+/e5ee4f9d1a7815c320dadb315675449a710c19aa

TBR=charliea@chromium.org,stevenjb@chromium.org,achuith@chromium.org
# Skipping CQ checks because original CL landed less than 1 day ago.
NOPRESUBMIT=true
NOTREECHECKS=true
NOTRY=true
BUG=chromium:728177

Review-Url: https://codereview.chromium.org/2933163002
Cr-Commit-Position: refs/heads/master@{#478675}
parent e5999cc4
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Bootstrap Chrome Telemetry by downloading all its files from SVN servers.
Requires a DEPS file to specify which directories on which SVN servers are
required to run Telemetry. The format of that DEPS file is a subset of the
normal DEPS file format[1]; currently only the "deps" dictionary is
supported.
Fetches all files in the specified directories using WebDAV (SVN is WebDAV under
the hood).
[1] http://dev.chromium.org/developers/how-tos/depottools#TOC-DEPS-file
"""
import imp
import logging
import os
import urllib
import urlparse
# Dummy module for DAVclient.
davclient = None
# TODO(eakuefner): Switch this link to tools/perf version after verifying.
# Link to file containing the 'davclient' WebDAV client library.
_DAVCLIENT_URL = ('https://src.chromium.org/chrome/trunk/src/tools/'
'telemetry/third_party/davclient/davclient.py')
def _DownloadAndImportDAVClientModule():
"""Dynamically import davclient helper library."""
global davclient
davclient_src = urllib.urlopen(_DAVCLIENT_URL).read()
davclient = imp.new_module('davclient')
exec davclient_src in davclient.__dict__ # pylint: disable=exec-used
class DAVClientWrapper(object):
"""Knows how to retrieve subdirectories and files from WebDAV/SVN servers."""
def __init__(self, root_url):
"""Initialize SVN server root_url, save files to local dest_dir.
Args:
root_url: string url of SVN/WebDAV server
"""
self.root_url = root_url
self.client = davclient.DAVClient(root_url)
@staticmethod
def __norm_path_keys(dict_with_path_keys):
"""Returns a dictionary with os.path.normpath called on every key."""
return dict((os.path.normpath(k), v) for (k, v) in
dict_with_path_keys.items())
def GetDirList(self, path):
"""Returns string names of all files and subdirs of path on the server."""
props = self.__norm_path_keys(self.client.propfind(path, depth=1))
    # Remove the queried path itself so only its children remain.
del props[os.path.normpath(path)]
return [os.path.basename(p) for p in props.keys()]
def IsFile(self, path):
"""Returns True if the path is a file on the server, False if directory."""
props = self.__norm_path_keys(self.client.propfind(path, depth=1))
return props[os.path.normpath(path)]['resourcetype'] is None
def Traverse(self, src_path, dst_path):
"""Walks the directory hierarchy pointed to by src_path download all files.
Recursively walks src_path and saves all files and subfolders into
dst_path.
Args:
src_path: string path on SVN server to save (absolute path on server).
dest_path: string local path (relative or absolute) to save to.
"""
if self.IsFile(src_path):
if not os.path.exists(os.path.dirname(dst_path)):
logging.info('Creating %s', os.path.dirname(dst_path))
os.makedirs(os.path.dirname(dst_path))
if os.path.isfile(dst_path):
logging.info('Skipping %s', dst_path)
else:
logging.info('Saving %s to %s', self.root_url + src_path, dst_path)
urllib.urlretrieve(self.root_url + src_path, dst_path)
return
else:
for subdir in self.GetDirList(src_path):
self.Traverse(os.path.join(src_path, subdir),
os.path.join(dst_path, subdir))
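# A minimal usage sketch for the wrapper above (the server URL and paths are
# hypothetical):
#
#   client = DAVClientWrapper('https://svn.example.org')
#   client.Traverse('/trunk/src/tools/telemetry', '/tmp/telemetry')
#
# This mirrors what DownloadDeps() does below: it splits each deps URL into a
# server root and a path, then recursively mirrors that path locally.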
def ListAllDepsPaths(deps_file):
"""Recursively returns a list of all paths indicated in this deps file.
Note that this discards information about where path dependencies come from,
so this is only useful in the context of a Chromium source checkout that has
already fetched all dependencies.
Args:
deps_file: File containing deps information to be evaluated, in the
format given in the header of this file.
Returns:
A list of string paths starting under src that are required by the
given deps file, and all of its sub-dependencies. This amounts to
the keys of the 'deps' dictionary.
"""
deps = {}
deps_includes = {}
chrome_root = os.path.dirname(__file__)
while os.path.basename(chrome_root) != 'src':
chrome_root = os.path.abspath(os.path.join(chrome_root, '..'))
exec open(deps_file).read() # pylint: disable=exec-used
deps_paths = deps.keys()
for path in deps_includes.keys():
# Need to localize the paths.
path = os.path.join(chrome_root, '..', path)
deps_paths += ListAllDepsPaths(path)
return deps_paths
def DownloadDeps(destination_dir, url):
"""Saves all the dependencies in deps_path.
Opens and reads url, assuming the contents are in the simple DEPS-like file
format specified in the header of this file, then download all
files/directories listed to the destination_dir.
Args:
destination_dir: String path to directory to download files into.
url: URL containing deps information to be evaluated.
"""
logging.warning('Downloading deps from %s...', url)
# TODO(wiltzius): Add a parameter for which revision to pull.
_DownloadAndImportDAVClientModule()
deps = {}
deps_includes = {}
exec urllib.urlopen(url).read() # pylint: disable=exec-used
for dst_path, src_path in deps.iteritems():
full_dst_path = os.path.join(destination_dir, dst_path)
parsed_url = urlparse.urlparse(src_path)
root_url = parsed_url.scheme + '://' + parsed_url.netloc
dav_client = DAVClientWrapper(root_url)
dav_client.Traverse(parsed_url.path, full_dst_path)
for url in deps_includes.values():
DownloadDeps(destination_dir, url)
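# Illustrative entry point (the URL is hypothetical): mirror everything a
# DEPS-style file lists into a local directory.
#
#   DownloadDeps('/tmp/telemetry_checkout',
#                'https://svn.example.org/trunk/src/tools/perf/bootstrap_deps')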
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import fnmatch
import imp
import logging
import optparse
import os
import sys
import zipfile
from telemetry import benchmark
from telemetry.core import discover
from telemetry.internal.util import command_line
from telemetry.internal.util import path
from telemetry.internal.util import path_set
try:
from modulegraph import modulegraph # pylint: disable=import-error
except ImportError as err:
modulegraph = None
import_error = err
from core import bootstrap
from core import path_util
DEPS_FILE = 'bootstrap_deps'
def FindBootstrapDependencies(base_dir):
deps_file = os.path.join(base_dir, DEPS_FILE)
if not os.path.exists(deps_file):
return []
deps_paths = bootstrap.ListAllDepsPaths(deps_file)
return set(os.path.realpath(os.path.join(
path_util.GetChromiumSrcDir(), '..', deps_path))
for deps_path in deps_paths)
def FindPythonDependencies(module_path):
logging.info('Finding Python dependencies of %s', module_path)
if modulegraph is None:
raise import_error
sys_path = sys.path
sys.path = list(sys_path)
try:
# Load the module to inherit its sys.path modifications.
sys.path.insert(0, os.path.abspath(os.path.dirname(module_path)))
imp.load_source(
os.path.splitext(os.path.basename(module_path))[0], module_path)
# Analyze the module for its imports.
graph = modulegraph.ModuleGraph()
graph.run_script(module_path)
# Filter for only imports in Chromium.
for node in graph.nodes():
if not node.filename:
continue
module_path = os.path.realpath(node.filename)
_, incoming_edges = graph.get_edges(node)
message = 'Discovered %s (Imported by: %s)' % (
node.filename, ', '.join(
d.filename for d in incoming_edges
if d is not None and d.filename is not None))
logging.info(message)
# This check is done after the logging/printing above to make sure that
# we also print out the dependency edges that include python packages
# that are not in chromium.
if not path.IsSubpath(module_path, path_util.GetChromiumSrcDir()):
continue
yield module_path
if node.packagepath is not None:
for p in node.packagepath:
yield p
finally:
sys.path = sys_path
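# For example (the path is hypothetical), list(FindPythonDependencies(
# '/src/tools/perf/run_benchmark')) yields the realpath of every module under
# the Chromium source tree that the script imports, directly or transitively,
# plus any package __path__ entries, as discovered by modulegraph.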
def FindPageSetDependencies(base_dir):
logging.info('Finding page sets in %s', base_dir)
# Add base_dir to path so our imports relative to base_dir will work.
sys.path.append(base_dir)
tests = discover.DiscoverClasses(base_dir, base_dir, benchmark.Benchmark,
index_by_class_name=True)
for test_class in tests.itervalues():
test_obj = test_class()
# Ensure the test's default options are set if needed.
parser = optparse.OptionParser()
test_obj.AddCommandLineArgs(parser, None)
options = optparse.Values()
for k, v in parser.get_default_values().__dict__.iteritems():
options.ensure_value(k, v)
# Page set paths are relative to their runner script, not relative to us.
path.GetBaseDir = lambda: base_dir
# TODO: Loading the page set will automatically download its Cloud Storage
# deps. This is really expensive, and we don't want to do this by default.
story_set = test_obj.CreateStorySet(options)
# Add all of its serving_dirs as dependencies.
for serving_dir in story_set.serving_dirs:
yield serving_dir
def FindExcludedFiles(files, options):
# Define some filters for files.
def IsHidden(path_string):
for pathname_component in path_string.split(os.sep):
if pathname_component.startswith('.'):
return True
return False
def IsPyc(path_string):
return os.path.splitext(path_string)[1] == '.pyc'
def IsInCloudStorage(path_string):
return os.path.exists(path_string + '.sha1')
def MatchesExcludeOptions(path_string):
for pattern in options.exclude:
if (fnmatch.fnmatch(path_string, pattern) or
fnmatch.fnmatch(os.path.basename(path_string), pattern)):
return True
return False
# Collect filters we're going to use to exclude files.
exclude_conditions = [
IsHidden,
IsPyc,
IsInCloudStorage,
MatchesExcludeOptions,
]
# Check all the files against the filters.
for file_path in files:
if any(condition(file_path) for condition in exclude_conditions):
yield file_path
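# For instance, with options.exclude == ['*_unittest.py'] (an illustrative
# pattern), this generator yields every hidden file, every .pyc, every file
# mirrored in Cloud Storage via a .sha1 stub, and every path matching the
# glob; FindDependencies() below subtracts these from the dependency set.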
def FindDependencies(target_paths, options):
# Verify arguments.
for target_path in target_paths:
if not os.path.exists(target_path):
raise ValueError('Path does not exist: %s' % target_path)
dependencies = path_set.PathSet()
# Including Telemetry's major entry points will (hopefully) include Telemetry
# and all its dependencies. If the user doesn't pass any arguments, we just
# have Telemetry.
dependencies |= FindPythonDependencies(os.path.realpath(
os.path.join(path_util.GetTelemetryDir(),
'telemetry', 'benchmark_runner.py')))
dependencies |= FindPythonDependencies(os.path.realpath(
os.path.join(path_util.GetTelemetryDir(),
'telemetry', 'testing', 'run_tests.py')))
# Add dependencies.
for target_path in target_paths:
base_dir = os.path.dirname(os.path.realpath(target_path))
dependencies.add(base_dir)
dependencies |= FindBootstrapDependencies(base_dir)
dependencies |= FindPythonDependencies(target_path)
if options.include_page_set_data:
dependencies |= FindPageSetDependencies(base_dir)
# Remove excluded files.
dependencies -= FindExcludedFiles(set(dependencies), options)
return dependencies
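# Sketch of intended use (the target path is hypothetical):
#
#   dependencies = FindDependencies(['tools/perf/run_benchmark'], options)
#
# The resulting PathSet covers Telemetry's entry points, each target's base
# directory, bootstrap deps, transitive Python imports and, optionally, page
# set data, minus anything matched by FindExcludedFiles().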
def ZipDependencies(target_paths, dependencies, options):
base_dir = os.path.dirname(os.path.realpath(path_util.GetChromiumSrcDir()))
with zipfile.ZipFile(options.zip, 'w', zipfile.ZIP_DEFLATED) as zip_file:
# Add dependencies to archive.
for dependency_path in dependencies:
path_in_archive = os.path.join(
'telemetry', os.path.relpath(dependency_path, base_dir))
zip_file.write(dependency_path, path_in_archive)
    # Add an executable stub script for each target path, for ease of use.
for target_path in target_paths:
link_info = zipfile.ZipInfo(
os.path.join('telemetry', os.path.basename(target_path)))
link_info.create_system = 3 # Unix attributes.
      # 0100000 (octal) is the regular-file type; 0777 is the permission bits
      # rwxrwxrwx.
      link_info.external_attr = 0100777 << 16  # Octal.
relative_path = os.path.relpath(target_path, base_dir)
link_script = (
'#!/usr/bin/env python\n\n'
'import os\n'
'import sys\n\n\n'
'script = os.path.join(os.path.dirname(__file__), \'%s\')\n'
'os.execv(sys.executable, [sys.executable, script] + sys.argv[1:])'
% relative_path)
zip_file.writestr(link_info, link_script)
class FindDependenciesCommand(command_line.OptparseCommand):
"""Prints all dependencies"""
@classmethod
def AddCommandLineArgs(cls, parser, _):
parser.add_option(
'-v', '--verbose', action='count', dest='verbosity',
help='Increase verbosity level (repeat as needed).')
parser.add_option(
'-p', '--include-page-set-data', action='store_true', default=False,
help='Scan tests for page set data and include them.')
parser.add_option(
'-e', '--exclude', action='append', default=[],
help='Exclude paths matching EXCLUDE. Can be used multiple times.')
parser.add_option(
'-z', '--zip',
help='Store files in a zip archive at ZIP.')
@classmethod
def ProcessCommandLineArgs(cls, parser, args, _):
if args.verbosity >= 2:
logging.getLogger().setLevel(logging.DEBUG)
elif args.verbosity:
logging.getLogger().setLevel(logging.INFO)
else:
logging.getLogger().setLevel(logging.WARNING)
def Run(self, args):
target_paths = args.positional_args
dependencies = FindDependencies(target_paths, args)
if args.zip:
ZipDependencies(target_paths, dependencies, args)
print 'Zip archive written to %s.' % args.zip
else:
print '\n'.join(sorted(dependencies))
return 0
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import os
import unittest
from telemetry.core import util
from core import find_dependencies
class FindDependenciesTest(unittest.TestCase):
def testFindPythonDependencies(self):
try:
dog_object_path = os.path.join(
util.GetUnittestDataDir(),
'dependency_test_dir', 'dog', 'dog', 'dog_object.py')
cat_module_path = os.path.join(
util.GetUnittestDataDir(),
'dependency_test_dir', 'other_animals', 'cat', 'cat')
cat_module_init_path = os.path.join(cat_module_path, '__init__.py')
cat_object_path = os.path.join(cat_module_path, 'cat_object.py')
self.assertEquals(
set(p for p in
find_dependencies.FindPythonDependencies(dog_object_path)),
{dog_object_path, cat_module_path, cat_module_init_path,
cat_object_path})
except ImportError: # crbug.com/559527
pass
def testFindPythonDependenciesWithNestedImport(self):
try:
moose_module_path = os.path.join(
util.GetUnittestDataDir(),
'dependency_test_dir', 'other_animals', 'moose', 'moose')
moose_object_path = os.path.join(moose_module_path, 'moose_object.py')
horn_module_path = os.path.join(moose_module_path, 'horn')
horn_module_init_path = os.path.join(horn_module_path, '__init__.py')
horn_object_path = os.path.join(horn_module_path, 'horn_object.py')
self.assertEquals(
set(p for p in
find_dependencies.FindPythonDependencies(moose_object_path)),
{moose_object_path,
horn_module_path, horn_module_init_path, horn_object_path})
except ImportError: # crbug.com/559527
pass
#!/usr/bin/env python
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import sys
from core import find_dependencies
if __name__ == '__main__':
find_dependencies.FindDependenciesCommand.main()
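# Illustrative invocation, using the flags defined by FindDependenciesCommand
# above (the target path is hypothetical):
#
#   ./find_dependencies tools/perf/run_benchmark -p -z telemetry_deps.zip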
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
Name: Davclient
Short Name: davclient
URL: http://svn.osafoundation.org/tools/davclient/
Version: 0.2.1
Date: Undated, retrieved Dec 14, 2012
Revision: r1008
License: Apache 2.0
License File: NOT_SHIPPED
Security Critical: no
Description:
A simple Python WebDAV client. Used for the Telemetry bootstrap to easily
fetch files from SVN servers.
Local Modifications:
None. However, the packaging and installation code from the original
repository has not been copied.
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# Copyright (c) 2006-2007 Open Source Applications Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import urlparse, httplib, copy, base64, StringIO
import urllib
try:
from xml.etree import ElementTree
except ImportError:
from elementtree import ElementTree
__all__ = ['DAVClient']
def object_to_etree(parent, obj, namespace=''):
"""This function takes in a python object, traverses it, and adds it to an existing etree object"""
if type(obj) is int or type(obj) is float or type(obj) is str:
# If object is a string, int, or float just add it
obj = str(obj)
if obj.startswith('{') is False:
ElementTree.SubElement(parent, '{%s}%s' % (namespace, obj))
else:
ElementTree.SubElement(parent, obj)
elif type(obj) is dict:
        # If the object is a dictionary, parse it and recurse over each value
for key, value in obj.items():
if key.startswith('{') is False:
key_etree = ElementTree.SubElement(parent, '{%s}%s' % (namespace, key))
object_to_etree(key_etree, value, namespace=namespace)
else:
key_etree = ElementTree.SubElement(parent, key)
object_to_etree(key_etree, value, namespace=namespace)
elif type(obj) is list:
        # If the object is a list, recurse over each item
for item in obj:
object_to_etree(parent, item, namespace=namespace)
else:
        # If it's none of the previous types, raise a TypeError
raise TypeError, '%s is an unsupported type' % type(obj)
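# A small illustrative example of the mapping this helper performs:
#
#   root = ElementTree.Element('{DAV:}lockinfo')
#   object_to_etree(root, {'locktype': 'exclusive'}, namespace='DAV:')
#
# builds <lockinfo><locktype><exclusive/></locktype></lockinfo>, all in the
# DAV: namespace; set_lock() below relies on exactly this conversion.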
class DAVClient(object):
def __init__(self, url='http://localhost:8080'):
"""Initialization"""
self._url = urlparse.urlparse(url)
self.headers = {'Host':self._url[1],
'User-Agent': 'python.davclient.DAVClient/0.1'}
def _request(self, method, path='', body=None, headers=None):
"""Internal request method"""
self.response = None
if headers is None:
headers = copy.copy(self.headers)
else:
new_headers = copy.copy(self.headers)
new_headers.update(headers)
headers = new_headers
if self._url.scheme == 'http':
self._connection = httplib.HTTPConnection(self._url[1])
elif self._url.scheme == 'https':
self._connection = httplib.HTTPSConnection(self._url[1])
else:
raise Exception, 'Unsupported scheme'
self._connection.request(method, path, body, headers)
self.response = self._connection.getresponse()
self.response.body = self.response.read()
# Try to parse and get an etree
try:
self._get_response_tree()
        except Exception:
            pass  # Response body may not be XML; ignore parse failures.
def _get_response_tree(self):
"""Parse the response body into an elementree object"""
self.response.tree = ElementTree.fromstring(self.response.body)
return self.response.tree
def set_basic_auth(self, username, password):
"""Set basic authentication"""
auth = 'Basic %s' % base64.encodestring('%s:%s' % (username, password)).strip()
self._username = username
self._password = password
self.headers['Authorization'] = auth
## HTTP DAV methods ##
def get(self, path, headers=None):
"""Simple get request"""
self._request('GET', path, headers=headers)
return self.response.body
def head(self, path, headers=None):
"""Basic HEAD request"""
self._request('HEAD', path, headers=headers)
def put(self, path, body=None, f=None, headers=None):
"""Put resource with body"""
if f is not None:
body = f.read()
self._request('PUT', path, body=body, headers=headers)
def post(self, path, body=None, headers=None):
"""POST resource with body"""
self._request('POST', path, body=body, headers=headers)
def mkcol(self, path, headers=None):
"""Make DAV collection"""
self._request('MKCOL', path=path, headers=headers)
make_collection = mkcol
def delete(self, path, headers=None):
"""Delete DAV resource"""
self._request('DELETE', path=path, headers=headers)
def copy(self, source, destination, body=None, depth='infinity', overwrite=True, headers=None):
"""Copy DAV resource"""
# Set all proper headers
if headers is None:
headers = {'Destination':destination}
else:
headers['Destination'] = self._url.geturl() + destination
if overwrite is False:
headers['Overwrite'] = 'F'
headers['Depth'] = depth
self._request('COPY', source, body=body, headers=headers)
def copy_collection(self, source, destination, depth='infinity', overwrite=True, headers=None):
"""Copy DAV collection"""
body = '<?xml version="1.0" encoding="utf-8" ?><d:propertybehavior xmlns:d="DAV:"><d:keepalive>*</d:keepalive></d:propertybehavior>'
# Add proper headers
if headers is None:
headers = {}
headers['Content-Type'] = 'text/xml; charset="utf-8"'
self.copy(source, destination, body=unicode(body, 'utf-8'), depth=depth, overwrite=overwrite, headers=headers)
def move(self, source, destination, body=None, depth='infinity', overwrite=True, headers=None):
"""Move DAV resource"""
# Set all proper headers
if headers is None:
headers = {'Destination':destination}
else:
headers['Destination'] = self._url.geturl() + destination
if overwrite is False:
headers['Overwrite'] = 'F'
headers['Depth'] = depth
self._request('MOVE', source, body=body, headers=headers)
def move_collection(self, source, destination, depth='infinity', overwrite=True, headers=None):
"""Move DAV collection and copy all properties"""
body = '<?xml version="1.0" encoding="utf-8" ?><d:propertybehavior xmlns:d="DAV:"><d:keepalive>*</d:keepalive></d:propertybehavior>'
# Add proper headers
if headers is None:
headers = {}
headers['Content-Type'] = 'text/xml; charset="utf-8"'
self.move(source, destination, unicode(body, 'utf-8'), depth=depth, overwrite=overwrite, headers=headers)
def propfind(self, path, properties='allprop', namespace='DAV:', depth=None, headers=None):
"""Property find. If properties arg is unspecified it defaults to 'allprop'"""
# Build propfind xml
root = ElementTree.Element('{DAV:}propfind')
if type(properties) is str:
ElementTree.SubElement(root, '{DAV:}%s' % properties)
else:
props = ElementTree.SubElement(root, '{DAV:}prop')
object_to_etree(props, properties, namespace=namespace)
tree = ElementTree.ElementTree(root)
# Etree won't just return a normal string, so we have to do this
body = StringIO.StringIO()
tree.write(body)
body = body.getvalue()
# Add proper headers
if headers is None:
headers = {}
if depth is not None:
headers['Depth'] = depth
headers['Content-Type'] = 'text/xml; charset="utf-8"'
# Body encoding must be utf-8, 207 is proper response
self._request('PROPFIND', path, body=unicode('<?xml version="1.0" encoding="utf-8" ?>\n'+body, 'utf-8'), headers=headers)
        if self.response is not None and hasattr(self.response, 'tree'):
            property_responses = {}
            for response in self.response.tree:
property_href = response.find('{DAV:}href')
property_stat = response.find('{DAV:}propstat')
                def parse_props(props):
                    property_dict = {}
                    for prop in props:
                        if prop.tag.find('{DAV:}') != -1:
                            name = prop.tag.split('}')[-1]
                        else:
                            name = prop.tag
                        if len(prop) != 0:
                            property_dict[name] = parse_props(prop)
                        else:
                            property_dict[name] = prop.text
                    return property_dict
                if property_href is not None and property_stat is not None:
                    property_dict = parse_props(property_stat.find('{DAV:}prop'))
property_responses[property_href.text] = property_dict
return property_responses
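    # As an illustration (hrefs and values are hypothetical), a PROPFIND with
    # depth=1 against a collection could return a dict shaped like:
    #
    #   {'/svn/trunk/':     {'resourcetype': {'collection': None}, ...},
    #    '/svn/trunk/DEPS': {'resourcetype': None, ...}}
    #
    # Callers such as the Telemetry bootstrap's IsFile() depend on this
    # shape: plain files report a 'resourcetype' of None, collections do not.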
def proppatch(self, path, set_props=None, remove_props=None, namespace='DAV:', headers=None):
"""Patch properties on a DAV resource. If namespace is not specified the DAV namespace is used for all properties"""
root = ElementTree.Element('{DAV:}propertyupdate')
if set_props is not None:
prop_set = ElementTree.SubElement(root, '{DAV:}set')
object_to_etree(prop_set, set_props, namespace=namespace)
if remove_props is not None:
prop_remove = ElementTree.SubElement(root, '{DAV:}remove')
object_to_etree(prop_remove, remove_props, namespace=namespace)
        tree = ElementTree.ElementTree(root)
        # Serialize the tree; etree won't just return a normal string
        body = StringIO.StringIO()
        tree.write(body)
        body = body.getvalue()
        # Add proper headers
        if headers is None:
            headers = {}
        headers['Content-Type'] = 'text/xml; charset="utf-8"'
        self._request('PROPPATCH', path,
                      body=unicode('<?xml version="1.0" encoding="utf-8" ?>\n'
                                   + body, 'utf-8'),
                      headers=headers)
def set_lock(self, path, owner, locktype='exclusive', lockscope='write', depth=None, headers=None):
"""Set a lock on a dav resource"""
root = ElementTree.Element('{DAV:}lockinfo')
object_to_etree(root, {'locktype':locktype, 'lockscope':lockscope, 'owner':{'href':owner}}, namespace='DAV:')
        tree = ElementTree.ElementTree(root)
        # Serialize the tree; etree won't just return a normal string
        body = StringIO.StringIO()
        tree.write(body)
        body = body.getvalue()
        # Add proper headers
        if headers is None:
            headers = {}
        if depth is not None:
            headers['Depth'] = depth
        headers['Content-Type'] = 'text/xml; charset="utf-8"'
        headers['Timeout'] = 'Infinite, Second-4100000000'
        self._request('LOCK', path,
                      body=unicode('<?xml version="1.0" encoding="utf-8" ?>\n'
                                   + body, 'utf-8'),
                      headers=headers)
        locks = self.response.tree.findall('.//{DAV:}locktoken')
        lock_list = []
        for lock in locks:
            lock_list.append(lock.getchildren()[0].text.strip())
        return lock_list
def refresh_lock(self, path, token, headers=None):
"""Refresh lock with token"""
if headers is None:
headers = {}
headers['If'] = '(<%s>)' % token
headers['Timeout'] = 'Infinite, Second-4100000000'
self._request('LOCK', path, body=None, headers=headers)
def unlock(self, path, token, headers=None):
"""Unlock DAV resource with token"""
if headers is None:
headers = {}
        headers['Lock-Token'] = '<%s>' % token
self._request('UNLOCK', path, body=None, headers=headers)