mirror of
https://github.com/mozilla/gecko-dev.git
synced 2024-10-12 21:05:36 +00:00
cd63971121
--HG-- rename : testing/mozbase/mozfile/tests/test_remove.py => testing/mozbase/mozfile/tests/test_move_remove.py extra : commitid : 9JROGo4tYHS extra : rebase_source : f19d3b4eaf2e181f35f0bdbdb74e009246d8d1ce extra : amend_source : 388c870bdf7aa06956b8d8641fdf55303453835e
231 lines
7.9 KiB
Python
231 lines
7.9 KiB
Python
#!/usr/bin/env python
|
|
|
|
import os
|
|
import stat
|
|
import shutil
|
|
import threading
|
|
import time
|
|
import unittest
|
|
import errno
|
|
from contextlib import contextmanager
|
|
|
|
import mozfile
|
|
import mozinfo
|
|
|
|
import stubs
|
|
|
|
|
|
def mark_readonly(path):
|
|
"""Removes all write permissions from given file/directory.
|
|
|
|
:param path: path of directory/file of which modes must be changed
|
|
"""
|
|
mode = os.stat(path)[stat.ST_MODE]
|
|
os.chmod(path, mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH)
|
|
|
|
|
|
class FileOpenCloseThread(threading.Thread):
|
|
"""Helper thread for asynchronous file handling"""
|
|
def __init__(self, path, delay, delete=False):
|
|
threading.Thread.__init__(self)
|
|
self.file_opened = threading.Event()
|
|
self.delay = delay
|
|
self.path = path
|
|
self.delete = delete
|
|
|
|
def run(self):
|
|
with open(self.path):
|
|
self.file_opened.set()
|
|
time.sleep(self.delay)
|
|
if self.delete:
|
|
try:
|
|
os.remove(self.path)
|
|
except:
|
|
pass
|
|
|
|
|
|
@contextmanager
|
|
def wait_file_opened_in_thread(*args, **kwargs):
|
|
thread = FileOpenCloseThread(*args, **kwargs)
|
|
thread.start()
|
|
thread.file_opened.wait()
|
|
try:
|
|
yield thread
|
|
finally:
|
|
thread.join()
|
|
|
|
|
|
class MozfileRemoveTestCase(unittest.TestCase):
|
|
"""Test our ability to remove directories and files"""
|
|
|
|
def setUp(self):
|
|
# Generate a stub
|
|
self.tempdir = stubs.create_stub()
|
|
|
|
def tearDown(self):
|
|
if os.path.isdir(self.tempdir):
|
|
shutil.rmtree(self.tempdir)
|
|
|
|
def test_remove_directory(self):
|
|
"""Test the removal of a directory"""
|
|
self.assertTrue(os.path.isdir(self.tempdir))
|
|
mozfile.remove(self.tempdir)
|
|
self.assertFalse(os.path.exists(self.tempdir))
|
|
|
|
def test_remove_directory_with_open_file(self):
|
|
"""Test removing a directory with an open file"""
|
|
# Open a file in the generated stub
|
|
filepath = os.path.join(self.tempdir, *stubs.files[1])
|
|
f = file(filepath, 'w')
|
|
f.write('foo-bar')
|
|
|
|
# keep file open and then try removing the dir-tree
|
|
if mozinfo.isWin:
|
|
# On the Windows family WindowsError should be raised.
|
|
self.assertRaises(OSError, mozfile.remove, self.tempdir)
|
|
self.assertTrue(os.path.exists(self.tempdir))
|
|
else:
|
|
# Folder should be deleted on all other platforms
|
|
mozfile.remove(self.tempdir)
|
|
self.assertFalse(os.path.exists(self.tempdir))
|
|
|
|
def test_remove_closed_file(self):
|
|
"""Test removing a closed file"""
|
|
# Open a file in the generated stub
|
|
filepath = os.path.join(self.tempdir, *stubs.files[1])
|
|
with open(filepath, 'w') as f:
|
|
f.write('foo-bar')
|
|
|
|
# Folder should be deleted on all platforms
|
|
mozfile.remove(self.tempdir)
|
|
self.assertFalse(os.path.exists(self.tempdir))
|
|
|
|
def test_removing_open_file_with_retry(self):
|
|
"""Test removing a file in use with retry"""
|
|
filepath = os.path.join(self.tempdir, *stubs.files[1])
|
|
|
|
with wait_file_opened_in_thread(filepath, 0.2):
|
|
# on windows first attempt will fail,
|
|
# and it will be retried until the thread leave the handle
|
|
mozfile.remove(filepath)
|
|
|
|
# Check deletion was successful
|
|
self.assertFalse(os.path.exists(filepath))
|
|
|
|
def test_removing_already_deleted_file_with_retry(self):
|
|
"""Test removing a meanwhile removed file with retry"""
|
|
filepath = os.path.join(self.tempdir, *stubs.files[1])
|
|
|
|
with wait_file_opened_in_thread(filepath, 0.2, True):
|
|
# on windows first attempt will fail, and before
|
|
# the retry the opened file will be deleted in the thread
|
|
mozfile.remove(filepath)
|
|
|
|
# Check deletion was successful
|
|
self.assertFalse(os.path.exists(filepath))
|
|
|
|
def test_remove_readonly_tree(self):
|
|
"""Test removing a read-only directory"""
|
|
|
|
dirpath = os.path.join(self.tempdir, "nested_tree")
|
|
mark_readonly(dirpath)
|
|
|
|
# However, mozfile should change write permissions and remove dir.
|
|
mozfile.remove(dirpath)
|
|
|
|
self.assertFalse(os.path.exists(dirpath))
|
|
|
|
def test_remove_readonly_file(self):
|
|
"""Test removing read-only files"""
|
|
filepath = os.path.join(self.tempdir, *stubs.files[1])
|
|
mark_readonly(filepath)
|
|
|
|
# However, mozfile should change write permission and then remove file.
|
|
mozfile.remove(filepath)
|
|
|
|
self.assertFalse(os.path.exists(filepath))
|
|
|
|
@unittest.skipIf(mozinfo.isWin, "Symlinks are not supported on Windows")
|
|
def test_remove_symlink(self):
|
|
"""Test removing a symlink"""
|
|
file_path = os.path.join(self.tempdir, *stubs.files[1])
|
|
symlink_path = os.path.join(self.tempdir, 'symlink')
|
|
|
|
os.symlink(file_path, symlink_path)
|
|
self.assertTrue(os.path.islink(symlink_path))
|
|
|
|
# The linked folder and files should not be deleted
|
|
mozfile.remove(symlink_path)
|
|
self.assertFalse(os.path.exists(symlink_path))
|
|
self.assertTrue(os.path.exists(file_path))
|
|
|
|
@unittest.skipIf(mozinfo.isWin, "Symlinks are not supported on Windows")
|
|
def test_remove_symlink_in_subfolder(self):
|
|
"""Test removing a folder with an contained symlink"""
|
|
file_path = os.path.join(self.tempdir, *stubs.files[0])
|
|
dir_path = os.path.dirname(os.path.join(self.tempdir, *stubs.files[1]))
|
|
symlink_path = os.path.join(dir_path, 'symlink')
|
|
|
|
os.symlink(file_path, symlink_path)
|
|
self.assertTrue(os.path.islink(symlink_path))
|
|
|
|
# The folder with the contained symlink will be deleted but not the
|
|
# original linked file
|
|
mozfile.remove(dir_path)
|
|
self.assertFalse(os.path.exists(dir_path))
|
|
self.assertFalse(os.path.exists(symlink_path))
|
|
self.assertTrue(os.path.exists(file_path))
|
|
|
|
@unittest.skipIf(mozinfo.isWin or not os.geteuid(),
|
|
"Symlinks are not supported on Windows and cannot run test as root")
|
|
def test_remove_symlink_for_system_path(self):
|
|
"""Test removing a symlink which points to a system folder"""
|
|
symlink_path = os.path.join(self.tempdir, 'symlink')
|
|
|
|
os.symlink(os.path.dirname(self.tempdir), symlink_path)
|
|
self.assertTrue(os.path.islink(symlink_path))
|
|
|
|
# The folder with the contained symlink will be deleted but not the
|
|
# original linked file
|
|
mozfile.remove(symlink_path)
|
|
self.assertFalse(os.path.exists(symlink_path))
|
|
|
|
def test_remove_path_that_does_not_exists(self):
|
|
not_existing_path = os.path.join(self.tempdir, 'I_do_not_not_exists')
|
|
try:
|
|
mozfile.remove(not_existing_path)
|
|
except OSError, exc:
|
|
if exc.errno == errno.ENOENT:
|
|
self.fail("removing non existing path must not raise error")
|
|
raise
|
|
|
|
|
|
class MozFileMoveTestCase(unittest.TestCase):
|
|
def setUp(self):
|
|
# Generate a stub
|
|
self.tempdir = stubs.create_stub()
|
|
self.addCleanup(mozfile.rmtree, self.tempdir)
|
|
|
|
def test_move_file(self):
|
|
file_path = os.path.join(self.tempdir, *stubs.files[1])
|
|
moved_path = file_path + '.moved'
|
|
self.assertTrue(os.path.isfile(file_path))
|
|
self.assertFalse(os.path.exists(moved_path))
|
|
mozfile.move(file_path, moved_path)
|
|
self.assertFalse(os.path.exists(file_path))
|
|
self.assertTrue(os.path.isfile(moved_path))
|
|
|
|
def test_move_file_with_retry(self):
|
|
file_path = os.path.join(self.tempdir, *stubs.files[1])
|
|
moved_path = file_path + '.moved'
|
|
|
|
with wait_file_opened_in_thread(file_path, 0.2):
|
|
# first move attempt should fail on windows and be retried
|
|
mozfile.move(file_path, moved_path)
|
|
self.assertFalse(os.path.exists(file_path))
|
|
self.assertTrue(os.path.isfile(moved_path))
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|