gvsig-scripting / org.gvsig.scripting / trunk / org.gvsig.scripting / org.gvsig.scripting.app / org.gvsig.scripting.app.mainplugin / src / main / resources-plugin / scripting / lib / dulwich / tests / test_file.py @ 959
History | View | Annotate | Download (6.25 KB)
1 |
# test_file.py -- Test for git files
|
---|---|
2 |
# Copyright (C) 2010 Google, Inc.
|
3 |
#
|
4 |
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
|
5 |
# General Public License as public by the Free Software Foundation; version 2.0
|
6 |
# or (at your option) any later version. You can redistribute it and/or
|
7 |
# modify it under the terms of either of these two licenses.
|
8 |
#
|
9 |
# Unless required by applicable law or agreed to in writing, software
|
10 |
# distributed under the License is distributed on an "AS IS" BASIS,
|
11 |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
12 |
# See the License for the specific language governing permissions and
|
13 |
# limitations under the License.
|
14 |
#
|
15 |
# You should have received a copy of the licenses; if not, see
|
16 |
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
|
17 |
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
|
18 |
# License, Version 2.0.
|
19 |
#
|
20 |
|
21 |
import errno |
22 |
import io |
23 |
import os |
24 |
import shutil |
25 |
import sys |
26 |
import tempfile |
27 |
|
28 |
from dulwich.file import GitFile, _fancy_rename |
29 |
from dulwich.tests import ( |
30 |
SkipTest, |
31 |
TestCase, |
32 |
) |
33 |
|
34 |
|
35 |
class FancyRenameTests(TestCase): |
36 |
|
37 |
def setUp(self): |
38 |
super(FancyRenameTests, self).setUp() |
39 |
self._tempdir = tempfile.mkdtemp()
|
40 |
self.foo = self.path('foo') |
41 |
self.bar = self.path('bar') |
42 |
self.create(self.foo, b'foo contents') |
43 |
|
44 |
def tearDown(self): |
45 |
shutil.rmtree(self._tempdir)
|
46 |
super(FancyRenameTests, self).tearDown() |
47 |
|
48 |
def path(self, filename): |
49 |
return os.path.join(self._tempdir, filename) |
50 |
|
51 |
def create(self, path, contents): |
52 |
f = open(path, 'wb') |
53 |
f.write(contents) |
54 |
f.close() |
55 |
|
56 |
def test_no_dest_exists(self): |
57 |
self.assertFalse(os.path.exists(self.bar)) |
58 |
_fancy_rename(self.foo, self.bar) |
59 |
self.assertFalse(os.path.exists(self.foo)) |
60 |
|
61 |
new_f = open(self.bar, 'rb') |
62 |
self.assertEqual(b'foo contents', new_f.read()) |
63 |
new_f.close() |
64 |
|
65 |
def test_dest_exists(self): |
66 |
self.create(self.bar, b'bar contents') |
67 |
_fancy_rename(self.foo, self.bar) |
68 |
self.assertFalse(os.path.exists(self.foo)) |
69 |
|
70 |
new_f = open(self.bar, 'rb') |
71 |
self.assertEqual(b'foo contents', new_f.read()) |
72 |
new_f.close() |
73 |
|
74 |
def test_dest_opened(self): |
75 |
if sys.platform != "win32": |
76 |
raise SkipTest("platform allows overwriting open files") |
77 |
self.create(self.bar, b'bar contents') |
78 |
dest_f = open(self.bar, 'rb') |
79 |
self.assertRaises(OSError, _fancy_rename, self.foo, self.bar) |
80 |
dest_f.close() |
81 |
self.assertTrue(os.path.exists(self.path('foo'))) |
82 |
|
83 |
new_f = open(self.foo, 'rb') |
84 |
self.assertEqual(b'foo contents', new_f.read()) |
85 |
new_f.close() |
86 |
|
87 |
new_f = open(self.bar, 'rb') |
88 |
self.assertEqual(b'bar contents', new_f.read()) |
89 |
new_f.close() |
90 |
|
91 |
|
92 |
class GitFileTests(TestCase): |
93 |
|
94 |
def setUp(self): |
95 |
super(GitFileTests, self).setUp() |
96 |
self._tempdir = tempfile.mkdtemp()
|
97 |
f = open(self.path('foo'), 'wb') |
98 |
f.write(b'foo contents')
|
99 |
f.close() |
100 |
|
101 |
def tearDown(self): |
102 |
shutil.rmtree(self._tempdir)
|
103 |
super(GitFileTests, self).tearDown() |
104 |
|
105 |
def path(self, filename): |
106 |
return os.path.join(self._tempdir, filename) |
107 |
|
108 |
def test_invalid(self): |
109 |
foo = self.path('foo') |
110 |
self.assertRaises(IOError, GitFile, foo, mode='r') |
111 |
self.assertRaises(IOError, GitFile, foo, mode='ab') |
112 |
self.assertRaises(IOError, GitFile, foo, mode='r+b') |
113 |
self.assertRaises(IOError, GitFile, foo, mode='w+b') |
114 |
self.assertRaises(IOError, GitFile, foo, mode='a+bU') |
115 |
|
116 |
def test_readonly(self): |
117 |
f = GitFile(self.path('foo'), 'rb') |
118 |
self.assertTrue(isinstance(f, io.IOBase)) |
119 |
self.assertEqual(b'foo contents', f.read()) |
120 |
self.assertEqual(b'', f.read()) |
121 |
f.seek(4)
|
122 |
self.assertEqual(b'contents', f.read()) |
123 |
f.close() |
124 |
|
125 |
def test_default_mode(self): |
126 |
f = GitFile(self.path('foo')) |
127 |
self.assertEqual(b'foo contents', f.read()) |
128 |
f.close() |
129 |
|
130 |
def test_write(self): |
131 |
foo = self.path('foo') |
132 |
foo_lock = '%s.lock' % foo
|
133 |
|
134 |
orig_f = open(foo, 'rb') |
135 |
self.assertEqual(orig_f.read(), b'foo contents') |
136 |
orig_f.close() |
137 |
|
138 |
self.assertFalse(os.path.exists(foo_lock))
|
139 |
f = GitFile(foo, 'wb')
|
140 |
self.assertFalse(f.closed)
|
141 |
self.assertRaises(AttributeError, getattr, f, 'not_a_file_property') |
142 |
|
143 |
self.assertTrue(os.path.exists(foo_lock))
|
144 |
f.write(b'new stuff')
|
145 |
f.seek(4)
|
146 |
f.write(b'contents')
|
147 |
f.close() |
148 |
self.assertFalse(os.path.exists(foo_lock))
|
149 |
|
150 |
new_f = open(foo, 'rb') |
151 |
self.assertEqual(b'new contents', new_f.read()) |
152 |
new_f.close() |
153 |
|
154 |
def test_open_twice(self): |
155 |
foo = self.path('foo') |
156 |
f1 = GitFile(foo, 'wb')
|
157 |
f1.write(b'new')
|
158 |
try:
|
159 |
f2 = GitFile(foo, 'wb')
|
160 |
self.fail()
|
161 |
except OSError as e: |
162 |
self.assertEqual(errno.EEXIST, e.errno)
|
163 |
else:
|
164 |
f2.close() |
165 |
f1.write(b' contents')
|
166 |
f1.close() |
167 |
|
168 |
# Ensure trying to open twice doesn't affect original.
|
169 |
f = open(foo, 'rb') |
170 |
self.assertEqual(b'new contents', f.read()) |
171 |
f.close() |
172 |
|
173 |
def test_abort(self): |
174 |
foo = self.path('foo') |
175 |
foo_lock = '%s.lock' % foo
|
176 |
|
177 |
orig_f = open(foo, 'rb') |
178 |
self.assertEqual(orig_f.read(), b'foo contents') |
179 |
orig_f.close() |
180 |
|
181 |
f = GitFile(foo, 'wb')
|
182 |
f.write(b'new contents')
|
183 |
f.abort() |
184 |
self.assertTrue(f.closed)
|
185 |
self.assertFalse(os.path.exists(foo_lock))
|
186 |
|
187 |
new_orig_f = open(foo, 'rb') |
188 |
self.assertEqual(new_orig_f.read(), b'foo contents') |
189 |
new_orig_f.close() |
190 |
|
191 |
def test_abort_close(self): |
192 |
foo = self.path('foo') |
193 |
f = GitFile(foo, 'wb')
|
194 |
f.abort() |
195 |
try:
|
196 |
f.close() |
197 |
except (IOError, OSError): |
198 |
self.fail()
|
199 |
|
200 |
f = GitFile(foo, 'wb')
|
201 |
f.close() |
202 |
try:
|
203 |
f.abort() |
204 |
except (IOError, OSError): |
205 |
self.fail()
|
206 |
|
207 |
def test_abort_close_removed(self): |
208 |
foo = self.path('foo') |
209 |
f = GitFile(foo, 'wb')
|
210 |
|
211 |
f._file.close() |
212 |
os.remove(foo+".lock")
|
213 |
|
214 |
f.abort() |
215 |
self.assertTrue(f._closed)
|