Statistics
| Revision:

gvsig-scripting / org.gvsig.scripting / trunk / org.gvsig.scripting / org.gvsig.scripting.app / org.gvsig.scripting.app.mainplugin / src / main / resources-plugin / scripting / lib / dulwich / tests / test_index.py @ 959

History | View | Annotate | Download (20.2 KB)

1
# -*- coding: utf-8 -*-
2
# test_index.py -- Tests for the git index
3
# encoding: utf-8
4
# Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
5
#
6
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
7
# General Public License as public by the Free Software Foundation; version 2.0
8
# or (at your option) any later version. You can redistribute it and/or
9
# modify it under the terms of either of these two licenses.
10
#
11
# Unless required by applicable law or agreed to in writing, software
12
# distributed under the License is distributed on an "AS IS" BASIS,
13
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
# See the License for the specific language governing permissions and
15
# limitations under the License.
16
#
17
# You should have received a copy of the licenses; if not, see
18
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
19
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
20
# License, Version 2.0.
21
#
22

    
23
"""Tests for the index."""
24

    
25

    
26
from io import BytesIO
27
import os
28
import shutil
29
import stat
30
import struct
31
import sys
32
import tempfile
33

    
34
from dulwich.index import (
35
    Index,
36
    build_index_from_tree,
37
    cleanup_mode,
38
    commit_tree,
39
    get_unstaged_changes,
40
    index_entry_from_stat,
41
    read_index,
42
    read_index_dict,
43
    validate_path_element_default,
44
    validate_path_element_ntfs,
45
    write_cache_time,
46
    write_index,
47
    write_index_dict,
48
    _tree_to_fs_path,
49
    _fs_to_tree_path,
50
    )
51
from dulwich.object_store import (
52
    MemoryObjectStore,
53
    )
54
from dulwich.objects import (
55
    Blob,
56
    Commit,
57
    Tree,
58
    S_IFGITLINK,
59
    )
60
from dulwich.repo import Repo
61
from dulwich.tests import (
62
    TestCase,
63
    skipIf,
64
    )
65

    
66
class IndexTestCase(TestCase):
67

    
68
    datadir = os.path.join(os.path.dirname(__file__), 'data/indexes')
69

    
70
    def get_simple_index(self, name):
71
        return Index(os.path.join(self.datadir, name))
72

    
73

    
74
class SimpleIndexTestCase(IndexTestCase):
75

    
76
    def test_len(self):
77
        self.assertEqual(1, len(self.get_simple_index("index")))
78

    
79
    def test_iter(self):
80
        self.assertEqual([b'bla'], list(self.get_simple_index("index")))
81

    
82
    def test_getitem(self):
83
        self.assertEqual(((1230680220, 0), (1230680220, 0), 2050, 3761020,
84
                           33188, 1000, 1000, 0,
85
                           b'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391', 0),
86
                          self.get_simple_index("index")[b"bla"])
87

    
88
    def test_empty(self):
89
        i = self.get_simple_index("notanindex")
90
        self.assertEqual(0, len(i))
91
        self.assertFalse(os.path.exists(i._filename))
92

    
93
    def test_against_empty_tree(self):
94
        i = self.get_simple_index("index")
95
        changes = list(i.changes_from_tree(MemoryObjectStore(), None))
96
        self.assertEqual(1, len(changes))
97
        (oldname, newname), (oldmode, newmode), (oldsha, newsha) = changes[0]
98
        self.assertEqual(b'bla', newname)
99
        self.assertEqual(b'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391', newsha)
100

    
101
class SimpleIndexWriterTestCase(IndexTestCase):
102

    
103
    def setUp(self):
104
        IndexTestCase.setUp(self)
105
        self.tempdir = tempfile.mkdtemp()
106

    
107
    def tearDown(self):
108
        IndexTestCase.tearDown(self)
109
        shutil.rmtree(self.tempdir)
110

    
111
    def test_simple_write(self):
112
        entries = [(b'barbla', (1230680220, 0), (1230680220, 0), 2050, 3761020,
113
                    33188, 1000, 1000, 0,
114
                    b'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391', 0)]
115
        filename = os.path.join(self.tempdir, 'test-simple-write-index')
116
        with open(filename, 'wb+') as x:
117
            write_index(x, entries)
118

    
119
        with open(filename, 'rb') as x:
120
            self.assertEqual(entries, list(read_index(x)))
121

    
122

    
123
class ReadIndexDictTests(IndexTestCase):
124

    
125
    def setUp(self):
126
        IndexTestCase.setUp(self)
127
        self.tempdir = tempfile.mkdtemp()
128

    
129
    def tearDown(self):
130
        IndexTestCase.tearDown(self)
131
        shutil.rmtree(self.tempdir)
132

    
133
    def test_simple_write(self):
134
        entries = {b'barbla': ((1230680220, 0), (1230680220, 0), 2050, 3761020,
135
                    33188, 1000, 1000, 0,
136
                    b'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391', 0)}
137
        filename = os.path.join(self.tempdir, 'test-simple-write-index')
138
        with open(filename, 'wb+') as x:
139
            write_index_dict(x, entries)
140

    
141
        with open(filename, 'rb') as x:
142
            self.assertEqual(entries, read_index_dict(x))
143

    
144

    
145
class CommitTreeTests(TestCase):
146

    
147
    def setUp(self):
148
        super(CommitTreeTests, self).setUp()
149
        self.store = MemoryObjectStore()
150

    
151
    def test_single_blob(self):
152
        blob = Blob()
153
        blob.data = b"foo"
154
        self.store.add_object(blob)
155
        blobs = [(b"bla", blob.id, stat.S_IFREG)]
156
        rootid = commit_tree(self.store, blobs)
157
        self.assertEqual(rootid, b"1a1e80437220f9312e855c37ac4398b68e5c1d50")
158
        self.assertEqual((stat.S_IFREG, blob.id), self.store[rootid][b"bla"])
159
        self.assertEqual(set([rootid, blob.id]), set(self.store._data.keys()))
160

    
161
    def test_nested(self):
162
        blob = Blob()
163
        blob.data = b"foo"
164
        self.store.add_object(blob)
165
        blobs = [(b"bla/bar", blob.id, stat.S_IFREG)]
166
        rootid = commit_tree(self.store, blobs)
167
        self.assertEqual(rootid, b"d92b959b216ad0d044671981196781b3258fa537")
168
        dirid = self.store[rootid][b"bla"][1]
169
        self.assertEqual(dirid, b"c1a1deb9788150829579a8b4efa6311e7b638650")
170
        self.assertEqual((stat.S_IFDIR, dirid), self.store[rootid][b"bla"])
171
        self.assertEqual((stat.S_IFREG, blob.id), self.store[dirid][b"bar"])
172
        self.assertEqual(set([rootid, dirid, blob.id]),
173
                          set(self.store._data.keys()))
174

    
175

    
176
class CleanupModeTests(TestCase):
177

    
178
    def test_file(self):
179
        self.assertEqual(0o100644, cleanup_mode(0o100000))
180

    
181
    def test_executable(self):
182
        self.assertEqual(0o100755, cleanup_mode(0o100711))
183

    
184
    def test_symlink(self):
185
        self.assertEqual(0o120000, cleanup_mode(0o120711))
186

    
187
    def test_dir(self):
188
        self.assertEqual(0o040000, cleanup_mode(0o40531))
189

    
190
    def test_submodule(self):
191
        self.assertEqual(0o160000, cleanup_mode(0o160744))
192

    
193

    
194
class WriteCacheTimeTests(TestCase):
195

    
196
    def test_write_string(self):
197
        f = BytesIO()
198
        self.assertRaises(TypeError, write_cache_time, f, "foo")
199

    
200
    def test_write_int(self):
201
        f = BytesIO()
202
        write_cache_time(f, 434343)
203
        self.assertEqual(struct.pack(">LL", 434343, 0), f.getvalue())
204

    
205
    def test_write_tuple(self):
206
        f = BytesIO()
207
        write_cache_time(f, (434343, 21))
208
        self.assertEqual(struct.pack(">LL", 434343, 21), f.getvalue())
209

    
210
    def test_write_float(self):
211
        f = BytesIO()
212
        write_cache_time(f, 434343.000000021)
213
        self.assertEqual(struct.pack(">LL", 434343, 21), f.getvalue())
214

    
215

    
216
class IndexEntryFromStatTests(TestCase):
217

    
218
    def test_simple(self):
219
        st = os.stat_result((16877, 131078, 64769,
220
                154, 1000, 1000, 12288,
221
                1323629595, 1324180496, 1324180496))
222
        entry = index_entry_from_stat(st, "22" * 20, 0)
223
        self.assertEqual(entry, (
224
            1324180496,
225
            1324180496,
226
            64769,
227
            131078,
228
            16384,
229
            1000,
230
            1000,
231
            12288,
232
            '2222222222222222222222222222222222222222',
233
            0))
234

    
235
    def test_override_mode(self):
236
        st = os.stat_result((stat.S_IFREG + 0o644, 131078, 64769,
237
                154, 1000, 1000, 12288,
238
                1323629595, 1324180496, 1324180496))
239
        entry = index_entry_from_stat(st, "22" * 20, 0,
240
                mode=stat.S_IFREG + 0o755)
241
        self.assertEqual(entry, (
242
            1324180496,
243
            1324180496,
244
            64769,
245
            131078,
246
            33261,
247
            1000,
248
            1000,
249
            12288,
250
            '2222222222222222222222222222222222222222',
251
            0))
252

    
253

    
254
class BuildIndexTests(TestCase):
255

    
256
    def assertReasonableIndexEntry(self, index_entry, mode, filesize, sha):
257
        self.assertEqual(index_entry[4], mode)  # mode
258
        self.assertEqual(index_entry[7], filesize)  # filesize
259
        self.assertEqual(index_entry[8], sha)  # sha
260

    
261
    def assertFileContents(self, path, contents, symlink=False):
262
        if symlink:
263
            self.assertEqual(os.readlink(path), contents)
264
        else:
265
            with open(path, 'rb') as f:
266
                self.assertEqual(f.read(), contents)
267

    
268
    def test_empty(self):
269
        repo_dir = tempfile.mkdtemp()
270
        self.addCleanup(shutil.rmtree, repo_dir)
271
        with Repo.init(repo_dir) as repo:
272
            tree = Tree()
273
            repo.object_store.add_object(tree)
274

    
275
            build_index_from_tree(repo.path, repo.index_path(),
276
                    repo.object_store, tree.id)
277

    
278
            # Verify index entries
279
            index = repo.open_index()
280
            self.assertEqual(len(index), 0)
281

    
282
            # Verify no files
283
            self.assertEqual(['.git'], os.listdir(repo.path))
284

    
285
    def test_git_dir(self):
286
        repo_dir = tempfile.mkdtemp()
287
        self.addCleanup(shutil.rmtree, repo_dir)
288
        with Repo.init(repo_dir) as repo:
289

    
290
            # Populate repo
291
            filea = Blob.from_string(b'file a')
292
            filee = Blob.from_string(b'd')
293

    
294
            tree = Tree()
295
            tree[b'.git/a'] = (stat.S_IFREG | 0o644, filea.id)
296
            tree[b'c/e'] = (stat.S_IFREG | 0o644, filee.id)
297

    
298
            repo.object_store.add_objects([(o, None)
299
                for o in [filea, filee, tree]])
300

    
301
            build_index_from_tree(repo.path, repo.index_path(),
302
                    repo.object_store, tree.id)
303

    
304
            # Verify index entries
305
            index = repo.open_index()
306
            self.assertEqual(len(index), 1)
307

    
308
            # filea
309
            apath = os.path.join(repo.path, '.git', 'a')
310
            self.assertFalse(os.path.exists(apath))
311

    
312
            # filee
313
            epath = os.path.join(repo.path, 'c', 'e')
314
            self.assertTrue(os.path.exists(epath))
315
            self.assertReasonableIndexEntry(index[b'c/e'],
316
                stat.S_IFREG | 0o644, 1, filee.id)
317
            self.assertFileContents(epath, b'd')
318

    
319
    def test_nonempty(self):
320
        repo_dir = tempfile.mkdtemp()
321
        self.addCleanup(shutil.rmtree, repo_dir)
322
        with Repo.init(repo_dir) as repo:
323

    
324
            # Populate repo
325
            filea = Blob.from_string(b'file a')
326
            fileb = Blob.from_string(b'file b')
327
            filed = Blob.from_string(b'file d')
328

    
329
            tree = Tree()
330
            tree[b'a'] = (stat.S_IFREG | 0o644, filea.id)
331
            tree[b'b'] = (stat.S_IFREG | 0o644, fileb.id)
332
            tree[b'c/d'] = (stat.S_IFREG | 0o644, filed.id)
333

    
334
            repo.object_store.add_objects([(o, None)
335
                for o in [filea, fileb, filed, tree]])
336

    
337
            build_index_from_tree(repo.path, repo.index_path(),
338
                    repo.object_store, tree.id)
339

    
340
            # Verify index entries
341
            index = repo.open_index()
342
            self.assertEqual(len(index), 3)
343

    
344
            # filea
345
            apath = os.path.join(repo.path, 'a')
346
            self.assertTrue(os.path.exists(apath))
347
            self.assertReasonableIndexEntry(index[b'a'],
348
                stat.S_IFREG | 0o644, 6, filea.id)
349
            self.assertFileContents(apath, b'file a')
350

    
351
            # fileb
352
            bpath = os.path.join(repo.path, 'b')
353
            self.assertTrue(os.path.exists(bpath))
354
            self.assertReasonableIndexEntry(index[b'b'],
355
                stat.S_IFREG | 0o644, 6, fileb.id)
356
            self.assertFileContents(bpath, b'file b')
357

    
358
            # filed
359
            dpath = os.path.join(repo.path, 'c', 'd')
360
            self.assertTrue(os.path.exists(dpath))
361
            self.assertReasonableIndexEntry(index[b'c/d'],
362
                stat.S_IFREG | 0o644, 6, filed.id)
363
            self.assertFileContents(dpath, b'file d')
364

    
365
            # Verify no extra files
366
            self.assertEqual(['.git', 'a', 'b', 'c'],
367
                sorted(os.listdir(repo.path)))
368
            self.assertEqual(['d'],
369
                sorted(os.listdir(os.path.join(repo.path, 'c'))))
370

    
371
    def test_norewrite(self):
372
        sync = getattr(os, 'sync', lambda: os.system('sync'))
373
        repo_dir = tempfile.mkdtemp()
374
        self.addCleanup(shutil.rmtree, repo_dir)
375
        with Repo.init(repo_dir) as repo:
376
            # Populate repo
377
            filea = Blob.from_string(b'file a')
378
            filea_path = os.path.join(repo_dir, 'a')
379
            tree = Tree()
380
            tree[b'a'] = (stat.S_IFREG | 0o644, filea.id)
381

    
382
            repo.object_store.add_objects([(o, None)
383
                for o in [filea, tree]])
384

    
385
            # First Write
386
            build_index_from_tree(repo.path, repo.index_path(),
387
                                  repo.object_store, tree.id)
388
            # Use sync as metadata can be cached on some FS
389
            sync()
390
            mtime = os.stat(filea_path).st_mtime
391

    
392
            # Test Rewrite
393
            build_index_from_tree(repo.path, repo.index_path(),
394
                                  repo.object_store, tree.id)
395
            sync()
396
            self.assertEqual(mtime, os.stat(filea_path).st_mtime)
397

    
398
            # Modify content
399
            with open(filea_path, 'wb') as fh:
400
                fh.write(b'test a')
401
            sync()
402
            mtime = os.stat(filea_path).st_mtime
403

    
404
            # Test rewrite
405
            build_index_from_tree(repo.path, repo.index_path(),
406
                                  repo.object_store, tree.id)
407
            sync()
408
            with open(filea_path, 'rb') as fh:
409
                self.assertEqual(b'file a', fh.read())
410

    
411

    
412
    @skipIf(not getattr(os, 'symlink', None), 'Requires symlink support')
413
    def test_symlink(self):
414
        repo_dir = tempfile.mkdtemp()
415
        self.addCleanup(shutil.rmtree, repo_dir)
416
        with Repo.init(repo_dir) as repo:
417

    
418
            # Populate repo
419
            filed = Blob.from_string(b'file d')
420
            filee = Blob.from_string(b'd')
421

    
422
            tree = Tree()
423
            tree[b'c/d'] = (stat.S_IFREG | 0o644, filed.id)
424
            tree[b'c/e'] = (stat.S_IFLNK, filee.id)  # symlink
425

    
426
            repo.object_store.add_objects([(o, None)
427
                for o in [filed, filee, tree]])
428

    
429
            build_index_from_tree(repo.path, repo.index_path(),
430
                    repo.object_store, tree.id)
431

    
432
            # Verify index entries
433
            index = repo.open_index()
434

    
435
            # symlink to d
436
            epath = os.path.join(repo.path, 'c', 'e')
437
            self.assertTrue(os.path.exists(epath))
438
            self.assertReasonableIndexEntry(
439
                index[b'c/e'], stat.S_IFLNK,
440
                0 if sys.platform == 'win32' else 1,
441
                filee.id)
442
            self.assertFileContents(epath, 'd', symlink=True)
443

    
444
    def test_no_decode_encode(self):
445
        repo_dir = tempfile.mkdtemp()
446
        repo_dir_bytes = repo_dir.encode(sys.getfilesystemencoding())
447
        self.addCleanup(shutil.rmtree, repo_dir)
448
        with Repo.init(repo_dir) as repo:
449

    
450
            # Populate repo
451
            file = Blob.from_string(b'foo')
452

    
453
            tree = Tree()
454
            latin1_name = u'À'.encode('latin1')
455
            utf8_name = u'À'.encode('utf8')
456
            tree[latin1_name] = (stat.S_IFREG | 0o644, file.id)
457
            tree[utf8_name] = (stat.S_IFREG | 0o644, file.id)
458

    
459
            repo.object_store.add_objects(
460
                [(o, None) for o in [file, tree]])
461

    
462
            build_index_from_tree(
463
                repo.path, repo.index_path(),
464
                repo.object_store, tree.id)
465

    
466
            # Verify index entries
467
            index = repo.open_index()
468

    
469
            latin1_path = os.path.join(repo_dir_bytes, latin1_name)
470
            self.assertTrue(os.path.exists(latin1_path))
471

    
472
            utf8_path = os.path.join(repo_dir_bytes, utf8_name)
473
            self.assertTrue(os.path.exists(utf8_path))
474

    
475
    def test_git_submodule(self):
476
        repo_dir = tempfile.mkdtemp()
477
        self.addCleanup(shutil.rmtree, repo_dir)
478
        with Repo.init(repo_dir) as repo:
479
            filea = Blob.from_string(b'file alalala')
480

    
481
            subtree = Tree()
482
            subtree[b'a'] = (stat.S_IFREG | 0o644, filea.id)
483

    
484
            c = Commit()
485
            c.tree = subtree.id
486
            c.committer = c.author = b'Somebody <somebody@example.com>'
487
            c.commit_time = c.author_time = 42342
488
            c.commit_timezone = c.author_timezone = 0
489
            c.parents = []
490
            c.message = b'Subcommit'
491

    
492
            tree = Tree()
493
            tree[b'c'] = (S_IFGITLINK, c.id)
494

    
495
            repo.object_store.add_objects(
496
                [(o, None) for o in [tree]])
497

    
498
            build_index_from_tree(repo.path, repo.index_path(),
499
                    repo.object_store, tree.id)
500

    
501
            # Verify index entries
502
            index = repo.open_index()
503
            self.assertEqual(len(index), 1)
504

    
505
            # filea
506
            apath = os.path.join(repo.path, 'c/a')
507
            self.assertFalse(os.path.exists(apath))
508

    
509
            # dir c
510
            cpath = os.path.join(repo.path, 'c')
511
            self.assertTrue(os.path.isdir(cpath))
512
            self.assertEqual(index[b'c'][4], S_IFGITLINK)  # mode
513
            self.assertEqual(index[b'c'][8], c.id)  # sha
514

    
515

    
516
class GetUnstagedChangesTests(TestCase):
517

    
518
    def test_get_unstaged_changes(self):
519
        """Unit test for get_unstaged_changes."""
520

    
521
        repo_dir = tempfile.mkdtemp()
522
        self.addCleanup(shutil.rmtree, repo_dir)
523
        with Repo.init(repo_dir) as repo:
524

    
525
            # Commit a dummy file then modify it
526
            foo1_fullpath = os.path.join(repo_dir, 'foo1')
527
            with open(foo1_fullpath, 'wb') as f:
528
                f.write(b'origstuff')
529

    
530
            foo2_fullpath = os.path.join(repo_dir, 'foo2')
531
            with open(foo2_fullpath, 'wb') as f:
532
                f.write(b'origstuff')
533

    
534
            repo.stage(['foo1', 'foo2'])
535
            repo.do_commit(b'test status', author=b'', committer=b'')
536

    
537
            with open(foo1_fullpath, 'wb') as f:
538
                f.write(b'newstuff')
539

    
540
            # modify access and modify time of path
541
            os.utime(foo1_fullpath, (0, 0))
542

    
543
            changes = get_unstaged_changes(repo.open_index(), repo_dir)
544

    
545
            self.assertEqual(list(changes), [b'foo1'])
546

    
547
    def test_get_unstaged_deleted_changes(self):
548
        """Unit test for get_unstaged_changes."""
549

    
550
        repo_dir = tempfile.mkdtemp()
551
        self.addCleanup(shutil.rmtree, repo_dir)
552
        with Repo.init(repo_dir) as repo:
553

    
554
            # Commit a dummy file then remove it
555
            foo1_fullpath = os.path.join(repo_dir, 'foo1')
556
            with open(foo1_fullpath, 'wb') as f:
557
                f.write(b'origstuff')
558

    
559
            repo.stage(['foo1'])
560
            repo.do_commit(b'test status', author=b'', committer=b'')
561

    
562
            os.unlink(foo1_fullpath)
563

    
564
            changes = get_unstaged_changes(repo.open_index(), repo_dir)
565

    
566
            self.assertEqual(list(changes), [b'foo1'])
567

    
568

    
569
class TestValidatePathElement(TestCase):
570

    
571
    def test_default(self):
572
        self.assertTrue(validate_path_element_default(b"bla"))
573
        self.assertTrue(validate_path_element_default(b".bla"))
574
        self.assertFalse(validate_path_element_default(b".git"))
575
        self.assertFalse(validate_path_element_default(b".giT"))
576
        self.assertFalse(validate_path_element_default(b".."))
577
        self.assertTrue(validate_path_element_default(b"git~1"))
578

    
579
    def test_ntfs(self):
580
        self.assertTrue(validate_path_element_ntfs(b"bla"))
581
        self.assertTrue(validate_path_element_ntfs(b".bla"))
582
        self.assertFalse(validate_path_element_ntfs(b".git"))
583
        self.assertFalse(validate_path_element_ntfs(b".giT"))
584
        self.assertFalse(validate_path_element_ntfs(b".."))
585
        self.assertFalse(validate_path_element_ntfs(b"git~1"))
586

    
587

    
588
class TestTreeFSPathConversion(TestCase):
589

    
590
    def test_tree_to_fs_path(self):
591
        tree_path = u'délwíçh/foo'.encode('utf8')
592
        fs_path = _tree_to_fs_path(b'/prefix/path', tree_path)
593
        self.assertEqual(
594
            fs_path,
595
            os.path.join(u'/prefix/path', u'délwíçh', u'foo').encode('utf8'))
596

    
597
    def test_fs_to_tree_path_str(self):
598
        fs_path = os.path.join(os.path.join(u'délwíçh', u'foo'))
599
        tree_path = _fs_to_tree_path(fs_path, "utf-8")
600
        self.assertEqual(tree_path, u'délwíçh/foo'.encode("utf-8"))
601

    
602
    def test_fs_to_tree_path_bytes(self):
603
        fs_path = os.path.join(os.path.join(u'délwíçh', u'foo').encode('utf8'))
604
        tree_path = _fs_to_tree_path(fs_path, "utf-8")
605
        self.assertEqual(tree_path, u'délwíçh/foo'.encode('utf8'))