Statistics
| Revision:

gvsig-scripting / org.gvsig.scripting / trunk / org.gvsig.scripting / org.gvsig.scripting.app / org.gvsig.scripting.app.mainplugin / src / main / resources-plugin / scripting / lib / dulwich / tests / test_pack.py @ 959

History | View | Annotate | Download (38.3 KB)

1
# test_pack.py -- Tests for the handling of git packs.
2
# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
3
# Copyright (C) 2008 Jelmer Vernooij <jelmer@samba.org>
4
#
5
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6
# General Public License as published by the Free Software Foundation; version 2.0
7
# or (at your option) any later version. You can redistribute it and/or
8
# modify it under the terms of either of these two licenses.
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
#
16
# You should have received a copy of the licenses; if not, see
17
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19
# License, Version 2.0.
20
#
21

    
22
"""Tests for Dulwich packs."""
23

    
24

    
25
from io import BytesIO
26
from hashlib import sha1
27
import os
28
import shutil
29
import tempfile
30
import zlib
31

    
32
from dulwich.errors import (
33
    ApplyDeltaError,
34
    ChecksumMismatch,
35
    )
36
from dulwich.file import (
37
    GitFile,
38
    )
39
from dulwich.object_store import (
40
    MemoryObjectStore,
41
    )
42
from dulwich.objects import (
43
    hex_to_sha,
44
    sha_to_hex,
45
    Commit,
46
    Tree,
47
    Blob,
48
    )
49
from dulwich.pack import (
50
    OFS_DELTA,
51
    REF_DELTA,
52
    MemoryPackIndex,
53
    Pack,
54
    PackData,
55
    apply_delta,
56
    create_delta,
57
    deltify_pack_objects,
58
    load_pack_index,
59
    UnpackedObject,
60
    read_zlib_chunks,
61
    write_pack_header,
62
    write_pack_index_v1,
63
    write_pack_index_v2,
64
    write_pack_object,
65
    write_pack,
66
    unpack_object,
67
    compute_file_sha,
68
    PackStreamReader,
69
    DeltaChainIterator,
70
    _delta_encode_size,
71
    _encode_copy_operation,
72
    )
73
from dulwich.tests import (
74
    TestCase,
75
    )
76
from dulwich.tests.utils import (
77
    make_object,
78
    build_pack,
79
    )
80

    
81
# Hex name of the sample pack; the tests below open the files
# 'pack-<pack1_sha>.pack' / '.idx' from the data/packs directory.
pack1_sha = b'bc63ddad95e7321ee734ea11a7a62d314e0d7481'

# Hex ids of the three objects the tests look up in that pack:
# a blob, a tree and a commit respectively.
a_sha = b'6f670c0fb53f9463760b7295fbb814e965fb20c8'
tree_sha = b'b2a2766a2879c209ab1176e7e778b81ae422eeaa'
commit_sha = b'f18faa16531ac570a3fdc8c7ca16682548dafd12'
86

    
87

    
88
class PackTests(TestCase):
    """Common fixture for tests that read the sample packs in data/packs."""

    # Absolute path of the directory holding the sample pack files.
    datadir = os.path.abspath(
        os.path.join(os.path.dirname(__file__), 'data/packs'))

    def setUp(self):
        super(PackTests, self).setUp()
        # Scratch directory for files written by a test; removed on cleanup.
        self.tempdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tempdir)

    def get_pack_index(self, sha):
        """Load the pack index from the datadir for the given hex sha."""
        idx_name = 'pack-%s.idx' % sha.decode('ascii')
        return load_pack_index(os.path.join(self.datadir, idx_name))

    def get_pack_data(self, sha):
        """Open a PackData object from the datadir for the given hex sha."""
        pack_name = 'pack-%s.pack' % sha.decode('ascii')
        return PackData(os.path.join(self.datadir, pack_name))

    def get_pack(self, sha):
        """Open a Pack from the datadir for the given hex sha."""
        basename = 'pack-%s' % sha.decode('ascii')
        return Pack(os.path.join(self.datadir, basename))

    def assertSucceeds(self, func, *args, **kwargs):
        """Call ``func`` and fail the test if it raises ChecksumMismatch."""
        try:
            func(*args, **kwargs)
        except ChecksumMismatch as exc:
            self.fail(exc)
115

    
116

    
117
class PackIndexTests(PackTests):
    """Tests for the index file that accompanies the sample pack."""

    def test_object_index(self):
        """Tests that the correct object offset is returned from the index."""
        index = self.get_pack_index(pack1_sha)
        # The pack's own name is not one of the objects it contains.
        self.assertRaises(KeyError, index.object_index, pack1_sha)
        expected_offsets = ((a_sha, 178), (tree_sha, 138), (commit_sha, 12))
        for sha, offset in expected_offsets:
            self.assertEqual(index.object_index(sha), offset)

    def test_index_len(self):
        index = self.get_pack_index(pack1_sha)
        self.assertEqual(3, len(index))

    def test_get_stored_checksum(self):
        index = self.get_pack_index(pack1_sha)
        stored = sha_to_hex(index.get_stored_checksum())
        self.assertEqual(b'f2848e2ad16f329ae1c92e3b95e91888daa5bd01', stored)
        packed = sha_to_hex(index.get_pack_checksum())
        self.assertEqual(b'721980e866af9a5f93ad674144e1459b8ba3e7b7', packed)

    def test_index_check(self):
        index = self.get_pack_index(pack1_sha)
        self.assertSucceeds(index.check)

    def test_iterentries(self):
        index = self.get_pack_index(pack1_sha)
        actual = [(sha_to_hex(sha), offset, crc32)
                  for sha, offset, crc32 in index.iterentries()]
        # The expected crc32 slot is None for every entry of this index.
        expected = [
            (b'6f670c0fb53f9463760b7295fbb814e965fb20c8', 178, None),
            (b'b2a2766a2879c209ab1176e7e778b81ae422eeaa', 138, None),
            (b'f18faa16531ac570a3fdc8c7ca16682548dafd12', 12, None),
        ]
        self.assertEqual(expected, actual)

    def test_iter(self):
        index = self.get_pack_index(pack1_sha)
        self.assertEqual({tree_sha, commit_sha, a_sha}, set(index))
155

    
156

    
157
class TestPackDeltas(TestCase):
    """Round-trip tests for create_delta() / apply_delta()."""

    test_string1 = b'The answer was flailing in the wind'
    test_string2 = b'The answer was falling down the pipe'
    test_string3 = b'zzzzz'

    test_string_empty = b''
    test_string_big = b'Z' * 8192
    test_string_huge = b'Z' * 100000

    def _test_roundtrip(self, base, target):
        """Check that applying the base->target delta to base gives target."""
        delta = create_delta(base, target)
        rebuilt = b''.join(apply_delta(base, delta))
        self.assertEqual(target, rebuilt)

    def test_nochange(self):
        unchanged = self.test_string1
        self._test_roundtrip(unchanged, unchanged)

    def test_nochange_huge(self):
        unchanged = self.test_string_huge
        self._test_roundtrip(unchanged, unchanged)

    def test_change(self):
        self._test_roundtrip(self.test_string1, self.test_string2)

    def test_rewrite(self):
        self._test_roundtrip(self.test_string1, self.test_string3)

    def test_empty_to_big(self):
        self._test_roundtrip(self.test_string_empty, self.test_string_big)

    def test_empty_to_huge(self):
        self._test_roundtrip(self.test_string_empty, self.test_string_huge)

    def test_huge_copy(self):
        base = self.test_string_huge + self.test_string1
        target = self.test_string_huge + self.test_string2
        self._test_roundtrip(base, target)

    def test_dest_overflow(self):
        """Both hand-written deltas below must be rejected by apply_delta."""
        big_base = b'a'*0x10000
        big_delta = b'\x80\x80\x04\x80\x80\x04\x80' + b'a'*0x10000
        self.assertRaises(ApplyDeltaError, apply_delta, big_base, big_delta)
        self.assertRaises(
            ApplyDeltaError,
            apply_delta, b'', b'\x00\x80\x02\xb0\x11\x11')
200

    
201

    
202
class TestPackData(PackTests):
    """Tests getting the data from the packfile."""

    def test_create_pack(self):
        pack_data = self.get_pack_data(pack1_sha)
        pack_data.close()

    def test_from_file(self):
        pack_name = 'pack-%s.pack' % pack1_sha.decode('ascii')
        path = os.path.join(self.datadir, pack_name)
        with open(path, 'rb') as f:
            PackData.from_file(f, os.path.getsize(path))

    def test_pack_len(self):
        with self.get_pack_data(pack1_sha) as data:
            self.assertEqual(3, len(data))

    def test_index_check(self):
        with self.get_pack_data(pack1_sha) as data:
            self.assertSucceeds(data.check)

    def test_iterobjects(self):
        with self.get_pack_data(pack1_sha) as data:
            commit_data = (b'tree b2a2766a2879c209ab1176e7e778b81ae422eeaa\n'
                           b'author James Westby <jw+debian@jameswestby.net> '
                           b'1174945067 +0100\n'
                           b'committer James Westby <jw+debian@jameswestby.net> '
                           b'1174945067 +0100\n'
                           b'\n'
                           b'Test commit\n')
            blob_sha = b'6f670c0fb53f9463760b7295fbb814e965fb20c8'
            tree_data = b'100644 a\0' + hex_to_sha(blob_sha)
            # Join the chunks so each object body compares as a single
            # byte string.
            actual = [
                (offset, type_num, b''.join(chunks), crc32)
                for offset, type_num, chunks, crc32 in data.iterobjects()]
            expected = [
                (12, 1, commit_data, 3775879613),
                (138, 2, tree_data, 912998690),
                (178, 3, b'test 1\n', 1373561701),
            ]
            self.assertEqual(expected, actual)

    def test_iterentries(self):
        with self.get_pack_data(pack1_sha) as data:
            entries = {(sha_to_hex(sha), offset, crc32)
                       for sha, offset, crc32 in data.iterentries()}
            expected = {
                (b'6f670c0fb53f9463760b7295fbb814e965fb20c8', 178, 1373561701),
                (b'b2a2766a2879c209ab1176e7e778b81ae422eeaa', 138, 912998690),
                (b'f18faa16531ac570a3fdc8c7ca16682548dafd12', 12, 3775879613),
            }
            self.assertEqual(expected, entries)

    def test_create_index_v1(self):
        with self.get_pack_data(pack1_sha) as data:
            filename = os.path.join(self.tempdir, 'v1test.idx')
            data.create_index_v1(filename)
            written = load_pack_index(filename)
            reference = self.get_pack_index(pack1_sha)
            self.assertEqual(written, reference)

    def test_create_index_v2(self):
        with self.get_pack_data(pack1_sha) as data:
            filename = os.path.join(self.tempdir, 'v2test.idx')
            data.create_index_v2(filename)
            written = load_pack_index(filename)
            reference = self.get_pack_index(pack1_sha)
            self.assertEqual(written, reference)

    def test_compute_file_sha(self):
        # The same file object is reused for every call below.
        f = BytesIO(b'abcd1234wxyz')
        whole = sha1(b'abcd1234wxyz').hexdigest()
        self.assertEqual(whole, compute_file_sha(f).hexdigest())
        self.assertEqual(sha1(b'abcd1234wxyz').hexdigest(),
                         compute_file_sha(f, buffer_size=5).hexdigest())
        # A negative end_ofs leaves out that many trailing bytes.
        self.assertEqual(sha1(b'abcd1234').hexdigest(),
                         compute_file_sha(f, end_ofs=-4).hexdigest())
        self.assertEqual(sha1(b'1234wxyz').hexdigest(),
                         compute_file_sha(f, start_ofs=4).hexdigest())
        middle = sha1(b'1234').hexdigest()
        self.assertEqual(
            middle,
            compute_file_sha(f, start_ofs=4, end_ofs=-4).hexdigest())

    def test_compute_file_sha_short_file(self):
        f = BytesIO(b'abcd1234wxyz')
        # Each of these offset combinations is expected to trip an
        # assertion inside compute_file_sha for this 12-byte file.
        self.assertRaises(AssertionError, compute_file_sha, f, end_ofs=-20)
        self.assertRaises(AssertionError, compute_file_sha, f, end_ofs=20)
        self.assertRaises(AssertionError, compute_file_sha, f,
                          start_ofs=10, end_ofs=-12)
286

    
287

    
288
class TestPack(PackTests):
    """Tests for Pack objects opened from the sample pack in the datadir."""

    def test_len(self):
        with self.get_pack(pack1_sha) as p:
            self.assertEqual(3, len(p))

    def test_contains(self):
        with self.get_pack(pack1_sha) as p:
            # assertIn reports both operands when the check fails.
            self.assertIn(tree_sha, p)

    def test_get(self):
        with self.get_pack(pack1_sha) as p:
            self.assertEqual(type(p[tree_sha]), Tree)

    def test_iter(self):
        with self.get_pack(pack1_sha) as p:
            self.assertEqual({tree_sha, commit_sha, a_sha}, set(p))

    def test_iterobjects(self):
        with self.get_pack(pack1_sha) as p:
            expected = {p[s] for s in [commit_sha, tree_sha, a_sha]}
            self.assertEqual(expected, set(p.iterobjects()))

    def test_pack_tuples(self):
        with self.get_pack(pack1_sha) as p:
            tuples = p.pack_tuples()
            expected = {(p[s], None) for s in [commit_sha, tree_sha, a_sha]}
            # Iterated twice on purpose: presumably this checks that the
            # result of pack_tuples() can be consumed more than once.
            self.assertEqual(expected, set(tuples))
            self.assertEqual(expected, set(tuples))
            self.assertEqual(3, len(tuples))

    def test_get_object_at(self):
        """Tests random access for non-delta objects"""
        with self.get_pack(pack1_sha) as p:
            obj = p[a_sha]
            self.assertEqual(obj.type_name, b'blob')
            self.assertEqual(obj.sha().hexdigest().encode('ascii'), a_sha)
            obj = p[tree_sha]
            self.assertEqual(obj.type_name, b'tree')
            self.assertEqual(obj.sha().hexdigest().encode('ascii'), tree_sha)
            obj = p[commit_sha]
            self.assertEqual(obj.type_name, b'commit')
            self.assertEqual(obj.sha().hexdigest().encode('ascii'), commit_sha)

    def test_copy(self):
        """Rewrite the sample pack with write_pack and compare the result."""
        with self.get_pack(pack1_sha) as origpack:
            self.assertSucceeds(origpack.index.check)
            basename = os.path.join(self.tempdir, 'Elch')
            write_pack(basename, origpack.pack_tuples())

            with Pack(basename) as newpack:
                self.assertEqual(origpack, newpack)
                self.assertSucceeds(newpack.index.check)
                self.assertEqual(origpack.name(), newpack.name())
                self.assertEqual(origpack.index.get_pack_checksum(),
                                 newpack.index.get_pack_checksum())

                # The stored index checksums only have to agree when both
                # indexes use the same format version.
                wrong_version = origpack.index.version != newpack.index.version
                orig_checksum = origpack.index.get_stored_checksum()
                new_checksum = newpack.index.get_stored_checksum()
                self.assertTrue(wrong_version or orig_checksum == new_checksum)

    def test_commit_obj(self):
        with self.get_pack(pack1_sha) as p:
            commit = p[commit_sha]
            self.assertEqual(b'James Westby <jw+debian@jameswestby.net>',
                             commit.author)
            self.assertEqual([], commit.parents)

    def _copy_pack(self, origpack):
        """Write a copy of ``origpack`` into the tempdir and open it.

        The caller is responsible for closing the returned Pack.
        """
        basename = os.path.join(self.tempdir, 'somepack')
        write_pack(basename, origpack.pack_tuples())
        return Pack(basename)

    def test_keep_no_message(self):
        # Work on a copy in the tempdir so keep() does not write into the
        # datadir.
        with self.get_pack(pack1_sha) as origpack:
            p = self._copy_pack(origpack)

        with p:
            keepfile_name = p.keep()

        # file should exist
        self.assertTrue(os.path.exists(keepfile_name))

        # and be empty; read as bytes, consistent with test_keep_message
        with open(keepfile_name, 'rb') as f:
            buf = f.read()
            self.assertEqual(b'', buf)

    def test_keep_message(self):
        with self.get_pack(pack1_sha) as origpack:
            p = self._copy_pack(origpack)

        msg = b'some message'
        with p:
            keepfile_name = p.keep(msg)

        # file should exist
        self.assertTrue(os.path.exists(keepfile_name))

        # and contain the right message, with a linefeed
        with open(keepfile_name, 'rb') as f:
            buf = f.read()
            self.assertEqual(msg + b'\n', buf)

    def test_name(self):
        with self.get_pack(pack1_sha) as p:
            self.assertEqual(pack1_sha, p.name())

    def test_length_mismatch(self):
        with self.get_pack_data(pack1_sha) as data:
            index = self.get_pack_index(pack1_sha)
            # Sanity check: the untouched data and index agree.
            Pack.from_objects(data, index).check_length_and_checksum()

            # Rebuild the pack with a header that claims 9999 objects,
            # followed by everything after the original 12-byte header.
            data._file.seek(12)
            bad_file = BytesIO()
            write_pack_header(bad_file, 9999)
            bad_file.write(data._file.read())
            bad_file = BytesIO(bad_file.getvalue())
            bad_data = PackData('', file=bad_file)
            bad_pack = Pack.from_lazy_objects(lambda: bad_data, lambda: index)
            self.assertRaises(AssertionError, lambda: bad_pack.data)
            self.assertRaises(AssertionError,
                              lambda: bad_pack.check_length_and_checksum())

    def test_checksum_mismatch(self):
        with self.get_pack_data(pack1_sha) as data:
            index = self.get_pack_index(pack1_sha)
            # Sanity check: the untouched data and index agree.
            Pack.from_objects(data, index).check_length_and_checksum()

            # Overwrite the last 20 bytes of the pack file with 0xff bytes.
            data._file.seek(0)
            bad_file = BytesIO(data._file.read()[:-20] + (b'\xff' * 20))
            bad_data = PackData('', file=bad_file)
            bad_pack = Pack.from_lazy_objects(lambda: bad_data, lambda: index)
            self.assertRaises(ChecksumMismatch, lambda: bad_pack.data)
            self.assertRaises(ChecksumMismatch, lambda:
                              bad_pack.check_length_and_checksum())

    def test_iterobjects_2(self):
        with self.get_pack(pack1_sha) as p:
            objs = {o.id: o for o in p.iterobjects()}
            self.assertEqual(3, len(objs))
            self.assertEqual(sorted(objs), sorted(p.index))
            self.assertIsInstance(objs[a_sha], Blob)
            self.assertIsInstance(objs[tree_sha], Tree)
            self.assertIsInstance(objs[commit_sha], Commit)
433

    
434

    
435
class TestThinPack(PackTests):
    """Tests for a pack whose delta bases are partly stored outside it."""

    def setUp(self):
        super(TestThinPack, self).setUp()
        self.store = MemoryObjectStore()
        contents = (b'foo', b'bar', b'foo1234', b'bar2468')
        self.blobs = {data: make_object(Blob, data=data) for data in contents}
        self.store.add_object(self.blobs[b'foo'])
        self.store.add_object(self.blobs[b'bar'])

        # Build a thin pack. 'foo' is an external reference, 'bar' an
        # internal reference.
        self.pack_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.pack_dir)
        self.pack_prefix = os.path.join(self.pack_dir, 'pack')

        pack_spec = [
            (REF_DELTA, (self.blobs[b'foo'].id, b'foo1234')),
            (Blob.type_num, b'bar'),
            (REF_DELTA, (self.blobs[b'bar'].id, b'bar2468')),
        ]
        with open(self.pack_prefix + '.pack', 'wb') as f:
            build_pack(f, pack_spec, store=self.store)

        # Index the new pack.
        with self.make_pack(True) as pack:
            with PackData(pack._data_path) as data:
                data.pack = pack
                data.create_index(self.pack_prefix + '.idx')

        # 'bar' is stored in the pack itself, so drop it from the store.
        del self.store[self.blobs[b'bar'].id]

    def make_pack(self, resolve_ext_ref):
        """Open the thin pack, resolving external refs only if requested."""
        if resolve_ext_ref:
            resolver = self.store.get_raw
        else:
            resolver = None
        return Pack(self.pack_prefix, resolve_ext_ref=resolver)

    def test_get_raw(self):
        target_id = self.blobs[b'foo1234'].id
        with self.make_pack(False) as p:
            self.assertRaises(KeyError, p.get_raw, target_id)
        with self.make_pack(True) as p:
            self.assertEqual((3, b'foo1234'), p.get_raw(target_id))

    def test_iterobjects(self):
        with self.make_pack(False) as p:
            self.assertRaises(KeyError, list, p.iterobjects())
        with self.make_pack(True) as p:
            expected_ids = sorted([
                self.blobs[b'foo1234'].id,
                self.blobs[b'bar'].id,
                self.blobs[b'bar2468'].id,
            ])
            self.assertEqual(
                expected_ids, sorted(o.id for o in p.iterobjects()))
489

    
490

    
491
class WritePackTests(TestCase):
    """Tests for the low-level pack writing helpers."""

    def test_write_pack_header(self):
        out = BytesIO()
        write_pack_header(out, 42)
        # The trailing b'*' is byte 0x2a, i.e. the object count 42.
        self.assertEqual(b'PACK\x00\x00\x00\x02\x00\x00\x00*',
                         out.getvalue())

    def test_write_pack_object(self):
        f = BytesIO()
        f.write(b'header')
        offset = f.tell()
        crc32 = write_pack_object(f, Blob.type_num, b'blob')
        # The returned CRC covers only what was written after b'header'.
        written = f.getvalue()[offset:]
        self.assertEqual(crc32, zlib.crc32(written) & 0xffffffff)

        f.write(b'x')  # unpack_object needs extra trailing data.
        f.seek(offset)
        unpacked, unused = unpack_object(f.read, compute_crc32=True)
        blob_type = Blob.type_num
        self.assertEqual(blob_type, unpacked.pack_type_num)
        self.assertEqual(blob_type, unpacked.obj_type_num)
        self.assertEqual([b'blob'], unpacked.decomp_chunks)
        self.assertEqual(crc32, unpacked.crc32)
        self.assertEqual(b'x', unused)

    def test_write_pack_object_sha(self):
        f = BytesIO()
        f.write(b'header')
        offset = f.tell()
        sha_a = sha1(b'foo')
        sha_b = sha_a.copy()
        write_pack_object(f, Blob.type_num, b'blob', sha=sha_a)
        # sha_a was updated by the write, the untouched copy was not.
        self.assertNotEqual(sha_a.digest(), sha_b.digest())
        # Feeding the written bytes to the copy must give the same digest.
        sha_b.update(f.getvalue()[offset:])
        self.assertEqual(sha_a.digest(), sha_b.digest())
525

    
526

    
527
# Binary pack checksum handed to the index writers in the tests below.
# Passed as a bytes hex string, like every other SHA literal in this file.
pack_checksum = hex_to_sha(b'721980e866af9a5f93ad674144e1459b8ba3e7b7')
528

    
529

    
530
class BaseTestPackIndexWriting(object):
    """Mixin holding the index-writing tests shared by all index flavours.

    Concrete test cases must implement ``index()`` and set the
    ``_has_crc32_checksum`` and ``_supports_large`` attributes in setUp.
    """

    def assertSucceeds(self, func, *args, **kwargs):
        """Call ``func`` and fail the test if it raises ChecksumMismatch."""
        try:
            func(*args, **kwargs)
        except ChecksumMismatch as e:
            self.fail(e)

    def index(self, filename, entries, pack_checksum):
        """Build and return the index under test; provided by subclasses."""
        raise NotImplementedError(self.index)

    def _assert_entries_match(self, expected_entries, idx):
        """Check that ``idx`` yields exactly ``expected_entries``, in order.

        CRC32 values are compared only when the index flavour stores them;
        otherwise every entry must report None.
        """
        actual_entries = list(idx.iterentries())
        self.assertEqual(len(expected_entries), len(actual_entries))
        for mine, actual in zip(expected_entries, actual_entries):
            my_sha, my_offset, my_crc = mine
            actual_sha, actual_offset, actual_crc = actual
            self.assertEqual(my_sha, actual_sha)
            self.assertEqual(my_offset, actual_offset)
            if self._has_crc32_checksum:
                self.assertEqual(my_crc, actual_crc)
            else:
                self.assertIsNone(actual_crc)

    def test_empty(self):
        idx = self.index('empty.idx', [], pack_checksum)
        self.assertEqual(idx.get_pack_checksum(), pack_checksum)
        self.assertEqual(0, len(idx))

    def test_large(self):
        entry1_sha = hex_to_sha(b'4e6388232ec39792661e2e75db8fb117fc869ce6')
        entry2_sha = hex_to_sha(b'e98f071751bd77f59967bfa671cd2caebdccc9a2')
        # Two offsets that each need a full 64 bits.
        entries = [(entry1_sha, 0xf2972d0830529b87, 24),
                   (entry2_sha, (~0xf2972d0830529b87) & (2**64 - 1), 92)]
        if not self._supports_large:
            # Flavours without large-offset support must reject the entries.
            self.assertRaises(TypeError, self.index, 'single.idx',
                              entries, pack_checksum)
            return
        idx = self.index('single.idx', entries, pack_checksum)
        self.assertEqual(idx.get_pack_checksum(), pack_checksum)
        self.assertEqual(2, len(idx))
        self._assert_entries_match(entries, idx)

    def test_single(self):
        entry_sha = hex_to_sha(b'6f670c0fb53f9463760b7295fbb814e965fb20c8')
        my_entries = [(entry_sha, 178, 42)]
        idx = self.index('single.idx', my_entries, pack_checksum)
        self.assertEqual(idx.get_pack_checksum(), pack_checksum)
        self.assertEqual(1, len(idx))
        self._assert_entries_match(my_entries, idx)
587

    
588

    
589
class BaseTestFilePackIndexWriting(BaseTestPackIndexWriting):
    """Index-writing tests for flavours that are written to a file on disk.

    Subclasses set ``_write_fn`` and ``_expected_version``.
    """

    def setUp(self):
        # Directory that receives the index files written by the tests.
        self.tempdir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.tempdir)

    def index(self, filename, entries, pack_checksum):
        """Write the entries to ``filename`` in the tempdir and load it back."""
        index_path = os.path.join(self.tempdir, filename)
        self.writeIndex(index_path, entries, pack_checksum)
        loaded = load_pack_index(index_path)
        self.assertSucceeds(loaded.check)
        self.assertEqual(loaded.version, self._expected_version)
        return loaded

    def writeIndex(self, filename, entries, pack_checksum):
        """Write an index file using the subclass-provided ``_write_fn``."""
        # FIXME: Write to BytesIO instead rather than hitting disk ?
        with GitFile(filename, "wb") as out:
            self._write_fn(out, entries, pack_checksum)
609

    
610

    
611
class TestMemoryIndexWriting(TestCase, BaseTestPackIndexWriting):
    """Runs the shared index tests against MemoryPackIndex."""

    def setUp(self):
        TestCase.setUp(self)
        # Flags read by the shared tests in BaseTestPackIndexWriting.
        self._supports_large = True
        self._has_crc32_checksum = True

    def tearDown(self):
        TestCase.tearDown(self)

    def index(self, filename, entries, pack_checksum):
        """Build the index in memory; ``filename`` is not used."""
        return MemoryPackIndex(entries, pack_checksum)
623

    
624

    
625
class TestPackIndexWritingv1(TestCase, BaseTestFilePackIndexWriting):
    """Runs the shared index tests against write_pack_index_v1."""

    def setUp(self):
        TestCase.setUp(self)
        BaseTestFilePackIndexWriting.setUp(self)
        # Writer under test and the version the loaded index must report.
        self._write_fn = write_pack_index_v1
        self._expected_version = 1
        # For v1 the shared tests expect no CRC32 values, and expect
        # 64-bit offsets to be rejected.
        self._has_crc32_checksum = False
        self._supports_large = False

    def tearDown(self):
        TestCase.tearDown(self)
        BaseTestFilePackIndexWriting.tearDown(self)
638

    
639

    
640
class TestPackIndexWritingv2(TestCase, BaseTestFilePackIndexWriting):
    """Runs the shared index tests against write_pack_index_v2."""

    def setUp(self):
        TestCase.setUp(self)
        BaseTestFilePackIndexWriting.setUp(self)
        # Writer under test and the version the loaded index must report.
        self._write_fn = write_pack_index_v2
        self._expected_version = 2
        # For v2 the shared tests expect CRC32 values to round-trip and
        # 64-bit offsets to be accepted.
        self._has_crc32_checksum = True
        self._supports_large = True

    def tearDown(self):
        TestCase.tearDown(self)
        BaseTestFilePackIndexWriting.tearDown(self)
653

    
654

    
655
class ReadZlibTests(TestCase):
    """Tests for read_zlib_chunks()."""

    # Sample payload, its zlib-compressed form, and the bytes that follow
    # the compressed stream in the simulated input.
    decomp = (
      b'tree 4ada885c9196b6b6fa08744b5862bf92896fc002\n'
      b'parent None\n'
      b'author Jelmer Vernooij <jelmer@samba.org> 1228980214 +0000\n'
      b'committer Jelmer Vernooij <jelmer@samba.org> 1228980214 +0000\n'
      b'\n'
      b"Provide replacement for mmap()'s offset argument.")
    comp = zlib.compress(decomp)
    extra = b'nextobject'

    def setUp(self):
        super(ReadZlibTests, self).setUp()
        stream = BytesIO(self.comp + self.extra)
        self.read = stream.read
        self.unpacked = UnpackedObject(
            Tree.type_num, None, len(self.decomp), 0)

    def test_decompress_size(self):
        good_decomp_len = len(self.decomp)
        self.unpacked.decomp_len = -1
        self.assertRaises(ValueError, read_zlib_chunks, self.read,
                          self.unpacked)
        # A declared length that is off by one in either direction must be
        # reported as a zlib error.
        self.unpacked.decomp_len = good_decomp_len - 1
        self.assertRaises(zlib.error, read_zlib_chunks, self.read,
                          self.unpacked)
        self.unpacked.decomp_len = good_decomp_len + 1
        self.assertRaises(zlib.error, read_zlib_chunks, self.read,
                          self.unpacked)

    def test_decompress_truncated(self):
        truncated_read = BytesIO(self.comp[:10]).read
        self.assertRaises(
            zlib.error, read_zlib_chunks, truncated_read, self.unpacked)

        # The complete stream with nothing after it is rejected as well.
        bare_read = BytesIO(self.comp).read
        self.assertRaises(
            zlib.error, read_zlib_chunks, bare_read, self.unpacked)

    def test_decompress_empty(self):
        unpacked = UnpackedObject(Tree.type_num, None, 0, None)
        empty_comp = zlib.compress(b'')
        read = BytesIO(empty_comp + self.extra).read
        unused = read_zlib_chunks(read, unpacked)
        self.assertEqual(b'', b''.join(unpacked.decomp_chunks))
        self.assertNotEqual(b'', unused)
        self.assertEqual(self.extra, unused + read())

    def test_decompress_no_crc32(self):
        self.unpacked.crc32 = None
        read_zlib_chunks(self.read, self.unpacked)
        self.assertEqual(None, self.unpacked.crc32)

    def _do_decompress_test(self, buffer_size, **kwargs):
        """Decompress the sample with ``buffer_size`` and check the result."""
        unused = read_zlib_chunks(
            self.read, self.unpacked, buffer_size=buffer_size, **kwargs)
        result = b''.join(self.unpacked.decomp_chunks)
        self.assertEqual(self.decomp, result)
        self.assertEqual(zlib.crc32(self.comp), self.unpacked.crc32)
        # The returned leftover plus the rest of the stream must equal
        # exactly the data that followed the compressed object.
        self.assertNotEqual(b'', unused)
        self.assertEqual(self.extra, unused + self.read())

    def test_simple_decompress(self):
        self._do_decompress_test(4096)
        self.assertEqual(None, self.unpacked.comp_chunks)

    # These buffer sizes are not intended to be realistic, but rather simulate
    # larger buffer sizes that may end at various places.
    def test_decompress_buffer_size_1(self):
        self._do_decompress_test(1)

    def test_decompress_buffer_size_2(self):
        self._do_decompress_test(2)

    def test_decompress_buffer_size_3(self):
        self._do_decompress_test(3)

    def test_decompress_buffer_size_4(self):
        self._do_decompress_test(4)

    def test_decompress_include_comp(self):
        self._do_decompress_test(4096, include_comp=True)
        self.assertEqual(self.comp, b''.join(self.unpacked.comp_chunks))
734

    
735

    
736
class DeltifyTests(TestCase):
    """Tests for deltify_pack_objects()."""

    def test_empty(self):
        result = list(deltify_pack_objects([]))
        self.assertEqual([], result)

    def test_single(self):
        blob = Blob.from_string(b"foo")
        # A lone object is expected with None in the delta-base slot.
        expected = [
            (blob.type_num, blob.sha().digest(), None, blob.as_raw_string()),
        ]
        self.assertEqual(
            expected, list(deltify_pack_objects([(blob, b"")])))

    def test_simple_delta(self):
        larger = Blob.from_string(b"a" * 101)
        smaller = Blob.from_string(b"a" * 100)
        delta = create_delta(larger.as_raw_string(), smaller.as_raw_string())
        # The second blob is expected as a delta against the first one.
        expected = [
            (larger.type_num, larger.sha().digest(), None,
             larger.as_raw_string()),
            (smaller.type_num, smaller.sha().digest(),
             larger.sha().digest(), delta),
        ]
        self.assertEqual(
            expected,
            list(deltify_pack_objects([(larger, b""), (smaller, b"")])))
756

    
757

    
758
class TestPackStreamReader(TestCase):
    """Tests for PackStreamReader.read_objects."""

    # NOTE: the misspelled name is kept on purpose; correcting it would
    # collide with test_read_objects_empty further down in this class.
    def test_read_objects_emtpy(self):
        """A pack built from an empty object list yields no objects."""
        f = BytesIO()
        build_pack(f, [])
        reader = PackStreamReader(f.read)
        self.assertEqual(0, len(list(reader.read_objects())))

    def test_read_objects(self):
        """Read a blob and an OFS_DELTA and check every unpacked field.

        The entries returned by build_pack are compared by position: index 0
        against the unpacked offset and index 4 against the unpacked crc32.
        """
        f = BytesIO()
        entries = build_pack(f, [
            (Blob.type_num, b'blob'),
            (OFS_DELTA, (0, b'blob1')),
        ])
        reader = PackStreamReader(f.read)
        objects = list(reader.read_objects(compute_crc32=True))
        self.assertEqual(2, len(objects))

        unpacked_blob, unpacked_delta = objects

        self.assertEqual(entries[0][0], unpacked_blob.offset)
        self.assertEqual(Blob.type_num, unpacked_blob.pack_type_num)
        self.assertEqual(Blob.type_num, unpacked_blob.obj_type_num)
        self.assertIsNone(unpacked_blob.delta_base)
        self.assertEqual(b'blob', b''.join(unpacked_blob.decomp_chunks))
        self.assertEqual(entries[0][4], unpacked_blob.crc32)

        self.assertEqual(entries[1][0], unpacked_delta.offset)
        self.assertEqual(OFS_DELTA, unpacked_delta.pack_type_num)
        # The delta is read but not resolved here, so no object type is set.
        self.assertIsNone(unpacked_delta.obj_type_num)
        # For an OFS_DELTA the base is the distance back to the base object.
        self.assertEqual(unpacked_delta.offset - unpacked_blob.offset,
                         unpacked_delta.delta_base)
        delta = create_delta(b'blob', b'blob1')
        self.assertEqual(delta, b''.join(unpacked_delta.decomp_chunks))
        self.assertEqual(entries[1][4], unpacked_delta.crc32)

    def test_read_objects_buffered(self):
        """Both objects are still read when ``zlib_bufsize`` is only 4."""
        f = BytesIO()
        build_pack(f, [
            (Blob.type_num, b'blob'),
            (OFS_DELTA, (0, b'blob1')),
        ])
        reader = PackStreamReader(f.read, zlib_bufsize=4)
        self.assertEqual(2, len(list(reader.read_objects())))

    def test_read_objects_empty(self):
        """A reader over a completely empty stream yields an empty list."""
        reader = PackStreamReader(BytesIO().read)
        self.assertEqual([], list(reader.read_objects()))
806

    
807

    
808
class TestPackIterator(DeltaChainIterator):
    """DeltaChainIterator used by the tests in this module.

    It yields entries shaped like those returned by build_pack and refuses
    to resolve the same pack offset more than once.
    """

    _compute_crc32 = True

    def __init__(self, *args, **kwargs):
        super(TestPackIterator, self).__init__(*args, **kwargs)
        # Offsets already passed to _resolve_object.
        self._unpacked_offsets = set()

    def _result(self, unpacked):
        """Return entries in the same format as build_pack."""
        return (unpacked.offset, unpacked.obj_type_num,
                b''.join(unpacked.obj_chunks), unpacked.sha(), unpacked.crc32)

    def _resolve_object(self, offset, pack_type_num, base_chunks):
        """Resolve the object at ``offset``, failing on a repeated offset.

        Raises:
          AssertionError: if ``offset`` has already been resolved. This is
            raised explicitly rather than via ``assert`` so the check is
            still active when Python runs with ``-O``.
        """
        if offset in self._unpacked_offsets:
            raise AssertionError(
                'Attempted to re-inflate offset %i' % offset)
        self._unpacked_offsets.add(offset)
        return super(TestPackIterator, self)._resolve_object(
            offset, pack_type_num, base_chunks)
827

    
828

    
829
class DeltaChainIteratorTests(TestCase):
    """Tests for the order in which TestPackIterator walks delta chains.

    Each test builds a pack with build_pack and compares the entries it
    returns, picked by index, with the output of ``_walk_all_chains()``.
    """

    def setUp(self):
        super(DeltaChainIteratorTests, self).setUp()
        self.store = MemoryObjectStore()
        # Hex SHAs already handed out by get_raw_no_repeat.
        self.fetched = set()

    def store_blobs(self, blobs_data):
        """Create one Blob per item, add each to the store, return them."""
        blobs = []
        for data in blobs_data:
            blob = make_object(Blob, data=data)
            blobs.append(blob)
            self.store.add_object(blob)
        return blobs

    def get_raw_no_repeat(self, bin_sha):
        """Wrapper around store.get_raw that doesn't allow repeat lookups."""
        hex_sha = sha_to_hex(bin_sha)
        self.assertNotIn(hex_sha, self.fetched,
                         'Attempted to re-fetch object %s' % hex_sha)
        self.fetched.add(hex_sha)
        return self.store.get_raw(hex_sha)

    def make_pack_iter(self, f, thin=None):
        """Build a TestPackIterator over the pack written to ``f``.

        Args:
          f: file-like object holding the pack data.
          thin: whether external references may be resolved through the
            store. When None, this is true exactly when the store is
            non-empty.
        """
        if thin is None:
            thin = bool(list(self.store))
        resolve_ext_ref = self.get_raw_no_repeat if thin else None
        data = PackData('test.pack', file=f)
        return TestPackIterator.for_pack_data(
            data, resolve_ext_ref=resolve_ext_ref)

    def assertEntriesMatch(self, expected_indexes, entries, pack_iter):
        """Assert the iterator yields ``entries`` in ``expected_indexes`` order."""
        expected = [entries[i] for i in expected_indexes]
        self.assertEqual(expected, list(pack_iter._walk_all_chains()))

    def test_no_deltas(self):
        """Without deltas, entries come out in pack order."""
        f = BytesIO()
        entries = build_pack(f, [
            (Commit.type_num, b'commit'),
            (Blob.type_num, b'blob'),
            (Tree.type_num, b'tree'),
        ])
        self.assertEntriesMatch([0, 1, 2], entries, self.make_pack_iter(f))

    def test_ofs_deltas(self):
        """Two OFS_DELTAs against entry 0 follow their base in pack order."""
        f = BytesIO()
        entries = build_pack(f, [
            (Blob.type_num, b'blob'),
            (OFS_DELTA, (0, b'blob1')),
            (OFS_DELTA, (0, b'blob2')),
        ])
        self.assertEntriesMatch([0, 1, 2], entries, self.make_pack_iter(f))

    def test_ofs_deltas_chain(self):
        """An OFS_DELTA whose base is another OFS_DELTA is resolved last."""
        f = BytesIO()
        entries = build_pack(f, [
            (Blob.type_num, b'blob'),
            (OFS_DELTA, (0, b'blob1')),
            (OFS_DELTA, (1, b'blob2')),
        ])
        self.assertEntriesMatch([0, 1, 2], entries, self.make_pack_iter(f))

    def test_ref_deltas(self):
        """The base blob is yielded before the REF_DELTAs that refer to it."""
        f = BytesIO()
        entries = build_pack(f, [
            (REF_DELTA, (1, b'blob1')),
            (Blob.type_num, (b'blob')),
            (REF_DELTA, (1, b'blob2')),
        ])
        self.assertEntriesMatch([1, 0, 2], entries, self.make_pack_iter(f))

    def test_ref_deltas_chain(self):
        """A REF_DELTA chain is walked base first: entry 1, then 2, then 0."""
        f = BytesIO()
        entries = build_pack(f, [
            (REF_DELTA, (2, b'blob1')),
            (Blob.type_num, (b'blob')),
            (REF_DELTA, (1, b'blob2')),
        ])
        self.assertEntriesMatch([1, 2, 0], entries, self.make_pack_iter(f))

    def test_ofs_and_ref_deltas(self):
        # Deltas pending on this offset are popped before deltas depending on
        # this ref.
        f = BytesIO()
        entries = build_pack(f, [
            (REF_DELTA, (1, b'blob1')),
            (Blob.type_num, (b'blob')),
            (OFS_DELTA, (1, b'blob2')),
        ])
        self.assertEntriesMatch([1, 2, 0], entries, self.make_pack_iter(f))

    def test_mixed_chain(self):
        """A pack mixing OFS and REF deltas is walked as [0, 2, 4, 1, 3]."""
        f = BytesIO()
        entries = build_pack(f, [
            (Blob.type_num, b'blob'),
            (REF_DELTA, (2, b'blob2')),
            (OFS_DELTA, (0, b'blob1')),
            (OFS_DELTA, (1, b'blob3')),
            (OFS_DELTA, (0, b'bob')),
        ])
        self.assertEntriesMatch([0, 2, 4, 1, 3], entries,
                                self.make_pack_iter(f))

    def test_long_chain(self):
        """100 OFS_DELTAs, each based on the previous entry, keep pack order."""
        n = 100
        objects_spec = [(Blob.type_num, b'blob')]
        for i in range(n):
            objects_spec.append(
                (OFS_DELTA, (i, b'blob' + str(i).encode('ascii'))))
        f = BytesIO()
        entries = build_pack(f, objects_spec)
        self.assertEntriesMatch(range(n + 1), entries, self.make_pack_iter(f))

    def test_branchy_chain(self):
        """100 OFS_DELTAs, all based on entry 0, keep pack order."""
        n = 100
        objects_spec = [(Blob.type_num, b'blob')]
        for i in range(n):
            objects_spec.append(
                (OFS_DELTA, (0, b'blob' + str(i).encode('ascii'))))
        f = BytesIO()
        entries = build_pack(f, objects_spec)
        self.assertEntriesMatch(range(n + 1), entries, self.make_pack_iter(f))

    def test_ext_ref(self):
        """A REF_DELTA against a stored blob is reported in ext_refs()."""
        blob, = self.store_blobs([b'blob'])
        f = BytesIO()
        entries = build_pack(f, [(REF_DELTA, (blob.id, b'blob1'))],
                             store=self.store)
        pack_iter = self.make_pack_iter(f)
        self.assertEntriesMatch([0], entries, pack_iter)
        self.assertEqual([hex_to_sha(blob.id)], pack_iter.ext_refs())

    def test_ext_ref_chain(self):
        """A chain rooted at an external blob is walked from that root."""
        blob, = self.store_blobs([b'blob'])
        f = BytesIO()
        entries = build_pack(f, [
            (REF_DELTA, (1, b'blob2')),
            (REF_DELTA, (blob.id, b'blob1')),
        ], store=self.store)
        pack_iter = self.make_pack_iter(f)
        self.assertEntriesMatch([1, 0], entries, pack_iter)
        self.assertEqual([hex_to_sha(blob.id)], pack_iter.ext_refs())

    def test_ext_ref_chain_degenerate(self):
        # Test a degenerate case where the sender is sending a REF_DELTA
        # object that expands to an object already in the repository.
        blob, = self.store_blobs([b'blob'])
        blob2, = self.store_blobs([b'blob2'])
        # Precondition of this scenario; checked with a unittest assertion so
        # that it is not stripped when Python runs with -O.
        self.assertLess(blob.id, blob2.id)

        f = BytesIO()
        entries = build_pack(f, [
            (REF_DELTA, (blob.id, b'blob2')),
            (REF_DELTA, (0, b'blob3')),
        ], store=self.store)
        pack_iter = self.make_pack_iter(f)
        self.assertEntriesMatch([0, 1], entries, pack_iter)
        self.assertEqual([hex_to_sha(blob.id)], pack_iter.ext_refs())

    def test_ext_ref_multiple_times(self):
        """An external base used by two deltas is listed once in ext_refs().

        get_raw_no_repeat additionally fails the test if the base is fetched
        from the store a second time.
        """
        blob, = self.store_blobs([b'blob'])
        f = BytesIO()
        entries = build_pack(f, [
            (REF_DELTA, (blob.id, b'blob1')),
            (REF_DELTA, (blob.id, b'blob2')),
        ], store=self.store)
        pack_iter = self.make_pack_iter(f)
        self.assertEntriesMatch([0, 1], entries, pack_iter)
        self.assertEqual([hex_to_sha(blob.id)], pack_iter.ext_refs())

    def test_multiple_ext_refs(self):
        """Two different external bases are both reported by ext_refs()."""
        b1, b2 = self.store_blobs([b'foo', b'bar'])
        f = BytesIO()
        entries = build_pack(f, [
            (REF_DELTA, (b1.id, b'foo1')),
            (REF_DELTA, (b2.id, b'bar2')),
        ], store=self.store)
        pack_iter = self.make_pack_iter(f)
        self.assertEntriesMatch([0, 1], entries, pack_iter)
        self.assertEqual([hex_to_sha(b1.id), hex_to_sha(b2.id)],
                         pack_iter.ext_refs())

    def test_bad_ext_ref_non_thin_pack(self):
        """With thin=False an external reference raises KeyError.

        The exception's single argument is a list holding the missing id.
        """
        blob, = self.store_blobs([b'blob'])
        f = BytesIO()
        build_pack(f, [(REF_DELTA, (blob.id, b'blob1'))],
                   store=self.store)
        pack_iter = self.make_pack_iter(f, thin=False)
        with self.assertRaises(KeyError) as cm:
            list(pack_iter._walk_all_chains())
        self.assertEqual(([blob.id],), cm.exception.args)

    def test_bad_ext_ref_thin_pack(self):
        """Bases deleted from the store are reported through KeyError.

        Both missing ids must appear in the first exception argument; they
        are sorted before comparison, so their order is not checked.
        """
        b1, b2, b3 = self.store_blobs([b'foo', b'bar', b'baz'])
        f = BytesIO()
        build_pack(f, [
            (REF_DELTA, (1, b'foo99')),
            (REF_DELTA, (b1.id, b'foo1')),
            (REF_DELTA, (b2.id, b'bar2')),
            (REF_DELTA, (b3.id, b'baz3')),
        ], store=self.store)
        del self.store[b2.id]
        del self.store[b3.id]
        pack_iter = self.make_pack_iter(f)
        with self.assertRaises(KeyError) as cm:
            list(pack_iter._walk_all_chains())
        self.assertEqual((sorted([b2.id, b3.id]),),
                         (sorted(cm.exception.args[0]),))
1038

    
1039

    
1040
class DeltaEncodeSizeTests(TestCase):
    """Tests for _delta_encode_size."""

    def test_basic(self):
        """Check the encoded bytes for sizes needing one, two and three bytes."""
        self.assertEqual(b'\x00', _delta_encode_size(0))
        self.assertEqual(b'\x01', _delta_encode_size(1))
        self.assertEqual(b'\xfa\x01', _delta_encode_size(250))
        self.assertEqual(b'\xe8\x07', _delta_encode_size(1000))
        self.assertEqual(b'\xa0\x8d\x06', _delta_encode_size(100000))
1048

    
1049

    
1050
class EncodeCopyOperationTests(TestCase):
1051

    
1052
    def test_basic(self):
1053
        self.assertEqual(b'\x80', _encode_copy_operation(0, 0))
1054
        self.assertEqual(b'\x91\x01\x0a', _encode_copy_operation(1, 10))
1055
        self.assertEqual(b'\xb1\x64\xe8\x03', _encode_copy_operation(100, 1000))
1056
        self.assertEqual(b'\x93\xe8\x03\x01', _encode_copy_operation(1000, 1))