Statistics
| Revision:

gvsig-scripting / org.gvsig.scripting / trunk / org.gvsig.scripting / org.gvsig.scripting.app / org.gvsig.scripting.app.mainplugin / src / main / resources-plugin / scripting / lib / dulwich / tests / test_file.py @ 959

History | View | Annotate | Download (6.25 KB)

1
# test_file.py -- Test for git files
2
# Copyright (C) 2010 Google, Inc.
3
#
4
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
5
# General Public License as public by the Free Software Foundation; version 2.0
6
# or (at your option) any later version. You can redistribute it and/or
7
# modify it under the terms of either of these two licenses.
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
#
15
# You should have received a copy of the licenses; if not, see
16
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
17
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
18
# License, Version 2.0.
19
#
20

    
21
import errno
22
import io
23
import os
24
import shutil
25
import sys
26
import tempfile
27

    
28
from dulwich.file import GitFile, _fancy_rename
29
from dulwich.tests import (
30
    SkipTest,
31
    TestCase,
32
    )
33

    
34

    
35
class FancyRenameTests(TestCase):
36

    
37
    def setUp(self):
38
        super(FancyRenameTests, self).setUp()
39
        self._tempdir = tempfile.mkdtemp()
40
        self.foo = self.path('foo')
41
        self.bar = self.path('bar')
42
        self.create(self.foo, b'foo contents')
43

    
44
    def tearDown(self):
45
        shutil.rmtree(self._tempdir)
46
        super(FancyRenameTests, self).tearDown()
47

    
48
    def path(self, filename):
49
        return os.path.join(self._tempdir, filename)
50

    
51
    def create(self, path, contents):
52
        f = open(path, 'wb')
53
        f.write(contents)
54
        f.close()
55

    
56
    def test_no_dest_exists(self):
57
        self.assertFalse(os.path.exists(self.bar))
58
        _fancy_rename(self.foo, self.bar)
59
        self.assertFalse(os.path.exists(self.foo))
60

    
61
        new_f = open(self.bar, 'rb')
62
        self.assertEqual(b'foo contents', new_f.read())
63
        new_f.close()
64

    
65
    def test_dest_exists(self):
66
        self.create(self.bar, b'bar contents')
67
        _fancy_rename(self.foo, self.bar)
68
        self.assertFalse(os.path.exists(self.foo))
69

    
70
        new_f = open(self.bar, 'rb')
71
        self.assertEqual(b'foo contents', new_f.read())
72
        new_f.close()
73

    
74
    def test_dest_opened(self):
75
        if sys.platform != "win32":
76
            raise SkipTest("platform allows overwriting open files")
77
        self.create(self.bar, b'bar contents')
78
        dest_f = open(self.bar, 'rb')
79
        self.assertRaises(OSError, _fancy_rename, self.foo, self.bar)
80
        dest_f.close()
81
        self.assertTrue(os.path.exists(self.path('foo')))
82

    
83
        new_f = open(self.foo, 'rb')
84
        self.assertEqual(b'foo contents', new_f.read())
85
        new_f.close()
86

    
87
        new_f = open(self.bar, 'rb')
88
        self.assertEqual(b'bar contents', new_f.read())
89
        new_f.close()
90

    
91

    
92
class GitFileTests(TestCase):
93

    
94
    def setUp(self):
95
        super(GitFileTests, self).setUp()
96
        self._tempdir = tempfile.mkdtemp()
97
        f = open(self.path('foo'), 'wb')
98
        f.write(b'foo contents')
99
        f.close()
100

    
101
    def tearDown(self):
102
        shutil.rmtree(self._tempdir)
103
        super(GitFileTests, self).tearDown()
104

    
105
    def path(self, filename):
106
        return os.path.join(self._tempdir, filename)
107

    
108
    def test_invalid(self):
109
        foo = self.path('foo')
110
        self.assertRaises(IOError, GitFile, foo, mode='r')
111
        self.assertRaises(IOError, GitFile, foo, mode='ab')
112
        self.assertRaises(IOError, GitFile, foo, mode='r+b')
113
        self.assertRaises(IOError, GitFile, foo, mode='w+b')
114
        self.assertRaises(IOError, GitFile, foo, mode='a+bU')
115

    
116
    def test_readonly(self):
117
        f = GitFile(self.path('foo'), 'rb')
118
        self.assertTrue(isinstance(f, io.IOBase))
119
        self.assertEqual(b'foo contents', f.read())
120
        self.assertEqual(b'', f.read())
121
        f.seek(4)
122
        self.assertEqual(b'contents', f.read())
123
        f.close()
124

    
125
    def test_default_mode(self):
126
        f = GitFile(self.path('foo'))
127
        self.assertEqual(b'foo contents', f.read())
128
        f.close()
129

    
130
    def test_write(self):
131
        foo = self.path('foo')
132
        foo_lock = '%s.lock' % foo
133

    
134
        orig_f = open(foo, 'rb')
135
        self.assertEqual(orig_f.read(), b'foo contents')
136
        orig_f.close()
137

    
138
        self.assertFalse(os.path.exists(foo_lock))
139
        f = GitFile(foo, 'wb')
140
        self.assertFalse(f.closed)
141
        self.assertRaises(AttributeError, getattr, f, 'not_a_file_property')
142

    
143
        self.assertTrue(os.path.exists(foo_lock))
144
        f.write(b'new stuff')
145
        f.seek(4)
146
        f.write(b'contents')
147
        f.close()
148
        self.assertFalse(os.path.exists(foo_lock))
149

    
150
        new_f = open(foo, 'rb')
151
        self.assertEqual(b'new contents', new_f.read())
152
        new_f.close()
153

    
154
    def test_open_twice(self):
155
        foo = self.path('foo')
156
        f1 = GitFile(foo, 'wb')
157
        f1.write(b'new')
158
        try:
159
            f2 = GitFile(foo, 'wb')
160
            self.fail()
161
        except OSError as e:
162
            self.assertEqual(errno.EEXIST, e.errno)
163
        else:
164
            f2.close()
165
        f1.write(b' contents')
166
        f1.close()
167

    
168
        # Ensure trying to open twice doesn't affect original.
169
        f = open(foo, 'rb')
170
        self.assertEqual(b'new contents', f.read())
171
        f.close()
172

    
173
    def test_abort(self):
174
        foo = self.path('foo')
175
        foo_lock = '%s.lock' % foo
176

    
177
        orig_f = open(foo, 'rb')
178
        self.assertEqual(orig_f.read(), b'foo contents')
179
        orig_f.close()
180

    
181
        f = GitFile(foo, 'wb')
182
        f.write(b'new contents')
183
        f.abort()
184
        self.assertTrue(f.closed)
185
        self.assertFalse(os.path.exists(foo_lock))
186

    
187
        new_orig_f = open(foo, 'rb')
188
        self.assertEqual(new_orig_f.read(), b'foo contents')
189
        new_orig_f.close()
190

    
191
    def test_abort_close(self):
192
        foo = self.path('foo')
193
        f = GitFile(foo, 'wb')
194
        f.abort()
195
        try:
196
            f.close()
197
        except (IOError, OSError):
198
            self.fail()
199

    
200
        f = GitFile(foo, 'wb')
201
        f.close()
202
        try:
203
            f.abort()
204
        except (IOError, OSError):
205
            self.fail()
206

    
207
    def test_abort_close_removed(self):
208
        foo = self.path('foo')
209
        f = GitFile(foo, 'wb')
210

    
211
        f._file.close()
212
        os.remove(foo+".lock")
213

    
214
        f.abort()
215
        self.assertTrue(f._closed)