gvsig-scripting / org.gvsig.scripting / trunk / org.gvsig.scripting / org.gvsig.scripting.app / org.gvsig.scripting.app.mainplugin / src / main / resources-plugin / scripting / lib / dulwich / tests / test_pack.py @ 959
History | View | Annotate | Download (38.3 KB)
1 |
# test_pack.py -- Tests for the handling of git packs.
|
---|---|
2 |
# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
|
3 |
# Copyright (C) 2008 Jelmer Vernooij <jelmer@samba.org>
|
4 |
#
|
5 |
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
|
6 |
# General Public License as public by the Free Software Foundation; version 2.0
|
7 |
# or (at your option) any later version. You can redistribute it and/or
|
8 |
# modify it under the terms of either of these two licenses.
|
9 |
#
|
10 |
# Unless required by applicable law or agreed to in writing, software
|
11 |
# distributed under the License is distributed on an "AS IS" BASIS,
|
12 |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
13 |
# See the License for the specific language governing permissions and
|
14 |
# limitations under the License.
|
15 |
#
|
16 |
# You should have received a copy of the licenses; if not, see
|
17 |
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
|
18 |
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
|
19 |
# License, Version 2.0.
|
20 |
#
|
21 |
|
22 |
"""Tests for Dulwich packs."""
|
23 |
|
24 |
|
25 |
from io import BytesIO |
26 |
from hashlib import sha1 |
27 |
import os |
28 |
import shutil |
29 |
import tempfile |
30 |
import zlib |
31 |
|
32 |
from dulwich.errors import ( |
33 |
ApplyDeltaError, |
34 |
ChecksumMismatch, |
35 |
) |
36 |
from dulwich.file import ( |
37 |
GitFile, |
38 |
) |
39 |
from dulwich.object_store import ( |
40 |
MemoryObjectStore, |
41 |
) |
42 |
from dulwich.objects import ( |
43 |
hex_to_sha, |
44 |
sha_to_hex, |
45 |
Commit, |
46 |
Tree, |
47 |
Blob, |
48 |
) |
49 |
from dulwich.pack import ( |
50 |
OFS_DELTA, |
51 |
REF_DELTA, |
52 |
MemoryPackIndex, |
53 |
Pack, |
54 |
PackData, |
55 |
apply_delta, |
56 |
create_delta, |
57 |
deltify_pack_objects, |
58 |
load_pack_index, |
59 |
UnpackedObject, |
60 |
read_zlib_chunks, |
61 |
write_pack_header, |
62 |
write_pack_index_v1, |
63 |
write_pack_index_v2, |
64 |
write_pack_object, |
65 |
write_pack, |
66 |
unpack_object, |
67 |
compute_file_sha, |
68 |
PackStreamReader, |
69 |
DeltaChainIterator, |
70 |
_delta_encode_size, |
71 |
_encode_copy_operation, |
72 |
) |
73 |
from dulwich.tests import ( |
74 |
TestCase, |
75 |
) |
76 |
from dulwich.tests.utils import ( |
77 |
make_object, |
78 |
build_pack, |
79 |
) |
80 |
|
81 |
pack1_sha = b'bc63ddad95e7321ee734ea11a7a62d314e0d7481'
|
82 |
|
83 |
a_sha = b'6f670c0fb53f9463760b7295fbb814e965fb20c8'
|
84 |
tree_sha = b'b2a2766a2879c209ab1176e7e778b81ae422eeaa'
|
85 |
commit_sha = b'f18faa16531ac570a3fdc8c7ca16682548dafd12'
|
86 |
|
87 |
|
88 |
class PackTests(TestCase):
    """Base class for pack tests.

    Provides a per-test temporary directory and helpers for loading the
    sample pack fixtures shipped under ``data/packs``.
    """

    # Directory holding the sample pack/idx fixture files.
    datadir = os.path.abspath(
        os.path.join(os.path.dirname(__file__), 'data/packs'))

    def setUp(self):
        super(PackTests, self).setUp()
        self.tempdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tempdir)

    def get_pack_index(self, sha):
        """Return a PackIndex from the datadir with the given sha."""
        idx_path = os.path.join(
            self.datadir, 'pack-%s.idx' % sha.decode('ascii'))
        return load_pack_index(idx_path)

    def get_pack_data(self, sha):
        """Return a PackData object from the datadir with the given sha."""
        data_path = os.path.join(
            self.datadir, 'pack-%s.pack' % sha.decode('ascii'))
        return PackData(data_path)

    def get_pack(self, sha):
        """Return a full Pack (index + data) from the datadir for *sha*."""
        return Pack(
            os.path.join(self.datadir, 'pack-%s' % sha.decode('ascii')))

    def assertSucceeds(self, func, *args, **kwargs):
        """Assert that calling *func* does not raise ChecksumMismatch."""
        try:
            func(*args, **kwargs)
        except ChecksumMismatch as e:
            self.fail(e)
|
116 |
|
117 |
class PackIndexTests(PackTests):
    """Class that tests the index of packfiles."""

    def test_object_index(self):
        """Tests that the correct object offset is returned from the index."""
        idx = self.get_pack_index(pack1_sha)
        # The pack's own checksum sha is not an object inside the pack.
        self.assertRaises(KeyError, idx.object_index, pack1_sha)
        self.assertEqual(178, idx.object_index(a_sha))
        self.assertEqual(138, idx.object_index(tree_sha))
        self.assertEqual(12, idx.object_index(commit_sha))

    def test_index_len(self):
        idx = self.get_pack_index(pack1_sha)
        self.assertEqual(3, len(idx))

    def test_get_stored_checksum(self):
        idx = self.get_pack_index(pack1_sha)
        # Checksum of the index file itself.
        self.assertEqual(b'f2848e2ad16f329ae1c92e3b95e91888daa5bd01',
                         sha_to_hex(idx.get_stored_checksum()))
        # Checksum of the pack the index describes.
        self.assertEqual(b'721980e866af9a5f93ad674144e1459b8ba3e7b7',
                         sha_to_hex(idx.get_pack_checksum()))

    def test_index_check(self):
        idx = self.get_pack_index(pack1_sha)
        self.assertSucceeds(idx.check)

    def test_iterentries(self):
        idx = self.get_pack_index(pack1_sha)
        entries = [(sha_to_hex(s), o, c) for s, o, c in idx.iterentries()]
        self.assertEqual([
            (b'6f670c0fb53f9463760b7295fbb814e965fb20c8', 178, None),
            (b'b2a2766a2879c209ab1176e7e778b81ae422eeaa', 138, None),
            (b'f18faa16531ac570a3fdc8c7ca16682548dafd12', 12, None)
            ], entries)

    def test_iter(self):
        idx = self.get_pack_index(pack1_sha)
        self.assertEqual(set([tree_sha, commit_sha, a_sha]), set(idx))
156 |
|
157 |
class TestPackDeltas(TestCase):
    """Round-trip and failure tests for create_delta/apply_delta."""

    test_string1 = b'The answer was flailing in the wind'
    test_string2 = b'The answer was falling down the pipe'
    test_string3 = b'zzzzz'

    test_string_empty = b''
    test_string_big = b'Z' * 8192
    test_string_huge = b'Z' * 100000

    def _test_roundtrip(self, base, target):
        # A delta from base to target, applied to base, must rebuild target.
        delta = create_delta(base, target)
        self.assertEqual(target, b''.join(apply_delta(base, delta)))

    def test_nochange(self):
        self._test_roundtrip(self.test_string1, self.test_string1)

    def test_nochange_huge(self):
        self._test_roundtrip(self.test_string_huge, self.test_string_huge)

    def test_change(self):
        self._test_roundtrip(self.test_string1, self.test_string2)

    def test_rewrite(self):
        self._test_roundtrip(self.test_string1, self.test_string3)

    def test_empty_to_big(self):
        self._test_roundtrip(self.test_string_empty, self.test_string_big)

    def test_empty_to_huge(self):
        self._test_roundtrip(self.test_string_empty, self.test_string_huge)

    def test_huge_copy(self):
        self._test_roundtrip(self.test_string_huge + self.test_string1,
                             self.test_string_huge + self.test_string2)

    def test_dest_overflow(self):
        # Deltas advertising more output than they can produce must raise.
        self.assertRaises(
            ApplyDeltaError,
            apply_delta,
            b'a' * 0x10000,
            b'\x80\x80\x04\x80\x80\x04\x80' + b'a' * 0x10000)
        self.assertRaises(
            ApplyDeltaError,
            apply_delta,
            b'',
            b'\x00\x80\x02\xb0\x11\x11')
201 |
|
202 |
class TestPackData(PackTests):
    """Tests getting the data from the packfile."""

    def test_create_pack(self):
        self.get_pack_data(pack1_sha).close()

    def test_from_file(self):
        path = os.path.join(
            self.datadir, 'pack-%s.pack' % pack1_sha.decode('ascii'))
        with open(path, 'rb') as f:
            PackData.from_file(f, os.path.getsize(path))

    def test_pack_len(self):
        with self.get_pack_data(pack1_sha) as p:
            self.assertEqual(3, len(p))

    def test_index_check(self):
        with self.get_pack_data(pack1_sha) as p:
            self.assertSucceeds(p.check)

    def test_iterobjects(self):
        with self.get_pack_data(pack1_sha) as p:
            commit_data = (b'tree b2a2766a2879c209ab1176e7e778b81ae422eeaa\n'
                           b'author James Westby <jw+debian@jameswestby.net> '
                           b'1174945067 +0100\n'
                           b'committer James Westby <jw+debian@jameswestby.net> '
                           b'1174945067 +0100\n'
                           b'\n'
                           b'Test commit\n')
            blob_sha = b'6f670c0fb53f9463760b7295fbb814e965fb20c8'
            tree_data = b'100644 a\0' + hex_to_sha(blob_sha)
            actual = [(offset, type_num, b''.join(chunks), crc32)
                      for offset, type_num, chunks, crc32 in p.iterobjects()]
            self.assertEqual([
                (12, 1, commit_data, 3775879613),
                (138, 2, tree_data, 912998690),
                (178, 3, b'test 1\n', 1373561701)
                ], actual)

    def test_iterentries(self):
        with self.get_pack_data(pack1_sha) as p:
            entries = set(
                (sha_to_hex(s), o, c) for s, o, c in p.iterentries())
            self.assertEqual(set([
                (b'6f670c0fb53f9463760b7295fbb814e965fb20c8', 178, 1373561701),
                (b'b2a2766a2879c209ab1176e7e778b81ae422eeaa', 138, 912998690),
                (b'f18faa16531ac570a3fdc8c7ca16682548dafd12', 12, 3775879613),
                ]), entries)

    def test_create_index_v1(self):
        with self.get_pack_data(pack1_sha) as p:
            filename = os.path.join(self.tempdir, 'v1test.idx')
            p.create_index_v1(filename)
            # The freshly written index must equal the shipped fixture index.
            idx1 = load_pack_index(filename)
            idx2 = self.get_pack_index(pack1_sha)
            self.assertEqual(idx1, idx2)

    def test_create_index_v2(self):
        with self.get_pack_data(pack1_sha) as p:
            filename = os.path.join(self.tempdir, 'v2test.idx')
            p.create_index_v2(filename)
            idx1 = load_pack_index(filename)
            idx2 = self.get_pack_index(pack1_sha)
            self.assertEqual(idx1, idx2)

    def test_compute_file_sha(self):
        f = BytesIO(b'abcd1234wxyz')
        self.assertEqual(sha1(b'abcd1234wxyz').hexdigest(),
                         compute_file_sha(f).hexdigest())
        self.assertEqual(sha1(b'abcd1234wxyz').hexdigest(),
                         compute_file_sha(f, buffer_size=5).hexdigest())
        self.assertEqual(sha1(b'abcd1234').hexdigest(),
                         compute_file_sha(f, end_ofs=-4).hexdigest())
        self.assertEqual(sha1(b'1234wxyz').hexdigest(),
                         compute_file_sha(f, start_ofs=4).hexdigest())
        self.assertEqual(
            sha1(b'1234').hexdigest(),
            compute_file_sha(f, start_ofs=4, end_ofs=-4).hexdigest())

    def test_compute_file_sha_short_file(self):
        # Offsets that fall outside the file must trigger an assertion.
        f = BytesIO(b'abcd1234wxyz')
        self.assertRaises(AssertionError, compute_file_sha, f, end_ofs=-20)
        self.assertRaises(AssertionError, compute_file_sha, f, end_ofs=20)
        self.assertRaises(AssertionError, compute_file_sha, f,
                          start_ofs=10, end_ofs=-12)
|
287 |
|
288 |
class TestPack(PackTests):
    """Tests for the combined Pack (index + data) object."""

    def test_len(self):
        with self.get_pack(pack1_sha) as p:
            self.assertEqual(3, len(p))

    def test_contains(self):
        with self.get_pack(pack1_sha) as p:
            self.assertTrue(tree_sha in p)

    def test_get(self):
        with self.get_pack(pack1_sha) as p:
            self.assertEqual(type(p[tree_sha]), Tree)

    def test_iter(self):
        with self.get_pack(pack1_sha) as p:
            self.assertEqual(set([tree_sha, commit_sha, a_sha]), set(p))

    def test_iterobjects(self):
        with self.get_pack(pack1_sha) as p:
            expected = set([p[s] for s in [commit_sha, tree_sha, a_sha]])
            self.assertEqual(expected, set(list(p.iterobjects())))

    def test_pack_tuples(self):
        with self.get_pack(pack1_sha) as p:
            tuples = p.pack_tuples()
            expected = set(
                [(p[s], None) for s in [commit_sha, tree_sha, a_sha]])
            # pack_tuples() must be re-iterable without losing entries.
            self.assertEqual(expected, set(list(tuples)))
            self.assertEqual(expected, set(list(tuples)))
            self.assertEqual(3, len(tuples))

    def test_get_object_at(self):
        """Tests random access for non-delta objects"""
        with self.get_pack(pack1_sha) as p:
            obj = p[a_sha]
            self.assertEqual(obj.type_name, b'blob')
            self.assertEqual(obj.sha().hexdigest().encode('ascii'), a_sha)
            obj = p[tree_sha]
            self.assertEqual(obj.type_name, b'tree')
            self.assertEqual(obj.sha().hexdigest().encode('ascii'), tree_sha)
            obj = p[commit_sha]
            self.assertEqual(obj.type_name, b'commit')
            self.assertEqual(obj.sha().hexdigest().encode('ascii'), commit_sha)

    def test_copy(self):
        with self.get_pack(pack1_sha) as origpack:
            self.assertSucceeds(origpack.index.check)
            basename = os.path.join(self.tempdir, 'Elch')
            write_pack(basename, origpack.pack_tuples())

            with Pack(basename) as newpack:
                self.assertEqual(origpack, newpack)
                self.assertSucceeds(newpack.index.check)
                self.assertEqual(origpack.name(), newpack.name())
                self.assertEqual(origpack.index.get_pack_checksum(),
                                 newpack.index.get_pack_checksum())

                # Stored index checksums only need to match when both packs
                # were written with the same index version.
                wrong_version = origpack.index.version != newpack.index.version
                orig_checksum = origpack.index.get_stored_checksum()
                new_checksum = newpack.index.get_stored_checksum()
                self.assertTrue(wrong_version or orig_checksum == new_checksum)

    def test_commit_obj(self):
        with self.get_pack(pack1_sha) as p:
            commit = p[commit_sha]
            self.assertEqual(b'James Westby <jw+debian@jameswestby.net>',
                             commit.author)
            self.assertEqual([], commit.parents)

    def _copy_pack(self, origpack):
        # Write a copy of origpack into the temp dir and reopen it.
        basename = os.path.join(self.tempdir, 'somepack')
        write_pack(basename, origpack.pack_tuples())
        return Pack(basename)

    def test_keep_no_message(self):
        with self.get_pack(pack1_sha) as p:
            p = self._copy_pack(p)

        with p:
            keepfile_name = p.keep()

        # file should exist
        self.assertTrue(os.path.exists(keepfile_name))

        # FIX: read in binary mode ('rb') for consistency with
        # test_keep_message below; a keep file written without a
        # message must be empty.
        with open(keepfile_name, 'rb') as f:
            buf = f.read()
        self.assertEqual(b'', buf)

    def test_keep_message(self):
        with self.get_pack(pack1_sha) as p:
            p = self._copy_pack(p)

        msg = b'some message'
        with p:
            keepfile_name = p.keep(msg)

        # file should exist
        self.assertTrue(os.path.exists(keepfile_name))

        # and contain the right message, with a linefeed
        with open(keepfile_name, 'rb') as f:
            buf = f.read()
        self.assertEqual(msg + b'\n', buf)

    def test_name(self):
        with self.get_pack(pack1_sha) as p:
            self.assertEqual(pack1_sha, p.name())

    def test_length_mismatch(self):
        with self.get_pack_data(pack1_sha) as data:
            index = self.get_pack_index(pack1_sha)
            Pack.from_objects(data, index).check_length_and_checksum()

            # Rewrite the header to claim 9999 objects; the check must fail.
            data._file.seek(12)
            bad_file = BytesIO()
            write_pack_header(bad_file, 9999)
            bad_file.write(data._file.read())
            bad_file = BytesIO(bad_file.getvalue())
            bad_data = PackData('', file=bad_file)
            bad_pack = Pack.from_lazy_objects(lambda: bad_data, lambda: index)
            self.assertRaises(AssertionError, lambda: bad_pack.data)
            self.assertRaises(AssertionError,
                              lambda: bad_pack.check_length_and_checksum())

    def test_checksum_mismatch(self):
        with self.get_pack_data(pack1_sha) as data:
            index = self.get_pack_index(pack1_sha)
            Pack.from_objects(data, index).check_length_and_checksum()

            # Corrupt the trailing pack checksum; the check must fail.
            data._file.seek(0)
            bad_file = BytesIO(data._file.read()[:-20] + (b'\xff' * 20))
            bad_data = PackData('', file=bad_file)
            bad_pack = Pack.from_lazy_objects(lambda: bad_data, lambda: index)
            self.assertRaises(ChecksumMismatch, lambda: bad_pack.data)
            self.assertRaises(ChecksumMismatch, lambda:
                              bad_pack.check_length_and_checksum())

    def test_iterobjects_2(self):
        with self.get_pack(pack1_sha) as p:
            objs = dict((o.id, o) for o in p.iterobjects())
            self.assertEqual(3, len(objs))
            self.assertEqual(sorted(objs), sorted(p.index))
            self.assertTrue(isinstance(objs[a_sha], Blob))
            self.assertTrue(isinstance(objs[tree_sha], Tree))
            self.assertTrue(isinstance(objs[commit_sha], Commit))
434 |
|
435 |
class TestThinPack(PackTests):
    """Tests for thin packs, whose deltas may reference external objects."""

    def setUp(self):
        super(TestThinPack, self).setUp()
        self.store = MemoryObjectStore()
        self.blobs = {}
        for blob in (b'foo', b'bar', b'foo1234', b'bar2468'):
            self.blobs[blob] = make_object(Blob, data=blob)
        self.store.add_object(self.blobs[b'foo'])
        self.store.add_object(self.blobs[b'bar'])

        # Build a thin pack. 'foo' is as an external reference, 'bar' an
        # internal reference.
        self.pack_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.pack_dir)
        self.pack_prefix = os.path.join(self.pack_dir, 'pack')

        with open(self.pack_prefix + '.pack', 'wb') as f:
            build_pack(f, [
                (REF_DELTA, (self.blobs[b'foo'].id, b'foo1234')),
                (Blob.type_num, b'bar'),
                (REF_DELTA, (self.blobs[b'bar'].id, b'bar2468'))],
                store=self.store)

        # Index the new pack.
        with self.make_pack(True) as pack:
            with PackData(pack._data_path) as data:
                data.pack = pack
                data.create_index(self.pack_prefix + '.idx')

        # Drop 'bar' so only 'foo' remains resolvable externally.
        del self.store[self.blobs[b'bar'].id]

    def make_pack(self, resolve_ext_ref):
        """Open the thin pack, optionally resolving external refs."""
        ext_ref = self.store.get_raw if resolve_ext_ref else None
        return Pack(self.pack_prefix, resolve_ext_ref=ext_ref)

    def test_get_raw(self):
        # Without external resolution the delta base cannot be found.
        with self.make_pack(False) as p:
            self.assertRaises(
                KeyError, p.get_raw, self.blobs[b'foo1234'].id)
        with self.make_pack(True) as p:
            self.assertEqual(
                (3, b'foo1234'),
                p.get_raw(self.blobs[b'foo1234'].id))

    def test_iterobjects(self):
        with self.make_pack(False) as p:
            self.assertRaises(KeyError, list, p.iterobjects())
        with self.make_pack(True) as p:
            self.assertEqual(
                sorted([self.blobs[b'foo1234'].id, self.blobs[b'bar'].id,
                        self.blobs[b'bar2468'].id]),
                sorted(o.id for o in p.iterobjects()))
|
490 |
|
491 |
class WritePackTests(TestCase):
    """Tests for the low-level pack-writing helpers."""

    def test_write_pack_header(self):
        buf = BytesIO()
        write_pack_header(buf, 42)
        # 'PACK', version 2, then the object count (42 == 0x2a == b'*').
        self.assertEqual(b'PACK\x00\x00\x00\x02\x00\x00\x00*',
                         buf.getvalue())

    def test_write_pack_object(self):
        buf = BytesIO()
        buf.write(b'header')
        offset = buf.tell()
        crc32 = write_pack_object(buf, Blob.type_num, b'blob')
        # The returned crc32 covers everything written after the prefix.
        self.assertEqual(crc32, zlib.crc32(buf.getvalue()[6:]) & 0xffffffff)

        buf.write(b'x')  # unpack_object needs extra trailing data.
        buf.seek(offset)
        unpacked, unused = unpack_object(buf.read, compute_crc32=True)
        self.assertEqual(Blob.type_num, unpacked.pack_type_num)
        self.assertEqual(Blob.type_num, unpacked.obj_type_num)
        self.assertEqual([b'blob'], unpacked.decomp_chunks)
        self.assertEqual(crc32, unpacked.crc32)
        self.assertEqual(b'x', unused)

    def test_write_pack_object_sha(self):
        buf = BytesIO()
        buf.write(b'header')
        offset = buf.tell()
        sha_a = sha1(b'foo')
        sha_b = sha_a.copy()
        # The passed-in sha must be updated with exactly the bytes written.
        write_pack_object(buf, Blob.type_num, b'blob', sha=sha_a)
        self.assertNotEqual(sha_a.digest(), sha_b.digest())
        sha_b.update(buf.getvalue()[offset:])
        self.assertEqual(sha_a.digest(), sha_b.digest())
|
526 |
|
527 |
# Binary SHA-1 of the sample pack, used by the index-writing tests below.
pack_checksum = hex_to_sha('721980e866af9a5f93ad674144e1459b8ba3e7b7')
|
528 |
|
529 |
|
530 |
class BaseTestPackIndexWriting(object):
    """Mixin with tests shared by all pack-index writer implementations."""

    def assertSucceeds(self, func, *args, **kwargs):
        """Assert that calling *func* does not raise ChecksumMismatch."""
        try:
            func(*args, **kwargs)
        except ChecksumMismatch as e:
            self.fail(e)

    def index(self, filename, entries, pack_checksum):
        # Subclasses build an index out of entries and return it.
        raise NotImplementedError(self.index)

    def _check_entries(self, expected_entries, idx):
        # Compare written entries with what the index reads back; crc32
        # is only round-tripped by formats that store it.
        actual_entries = list(idx.iterentries())
        self.assertEqual(len(expected_entries), len(actual_entries))
        for mine, actual in zip(expected_entries, actual_entries):
            my_sha, my_offset, my_crc = mine
            actual_sha, actual_offset, actual_crc = actual
            self.assertEqual(my_sha, actual_sha)
            self.assertEqual(my_offset, actual_offset)
            if self._has_crc32_checksum:
                self.assertEqual(my_crc, actual_crc)
            else:
                self.assertTrue(actual_crc is None)

    def test_empty(self):
        idx = self.index('empty.idx', [], pack_checksum)
        self.assertEqual(idx.get_pack_checksum(), pack_checksum)
        self.assertEqual(0, len(idx))

    def test_large(self):
        # Offsets above 2**32 only fit in formats that support large packs.
        entry1_sha = hex_to_sha('4e6388232ec39792661e2e75db8fb117fc869ce6')
        entry2_sha = hex_to_sha('e98f071751bd77f59967bfa671cd2caebdccc9a2')
        entries = [(entry1_sha, 0xf2972d0830529b87, 24),
                   (entry2_sha, (~0xf2972d0830529b87) & (2**64 - 1), 92)]
        if not self._supports_large:
            self.assertRaises(TypeError, self.index, 'single.idx',
                              entries, pack_checksum)
            return
        idx = self.index('single.idx', entries, pack_checksum)
        self.assertEqual(idx.get_pack_checksum(), pack_checksum)
        self.assertEqual(2, len(idx))
        self._check_entries(entries, idx)

    def test_single(self):
        entry_sha = hex_to_sha('6f670c0fb53f9463760b7295fbb814e965fb20c8')
        my_entries = [(entry_sha, 178, 42)]
        idx = self.index('single.idx', my_entries, pack_checksum)
        self.assertEqual(idx.get_pack_checksum(), pack_checksum)
        self.assertEqual(1, len(idx))
        self._check_entries(my_entries, idx)
588 |
|
589 |
class BaseTestFilePackIndexWriting(BaseTestPackIndexWriting):
    """Mixin for index writers that write an on-disk index file."""

    def setUp(self):
        self.tempdir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.tempdir)

    def index(self, filename, entries, pack_checksum):
        """Write an index file, reload it, sanity-check it, and return it."""
        path = os.path.join(self.tempdir, filename)
        self.writeIndex(path, entries, pack_checksum)
        idx = load_pack_index(path)
        self.assertSucceeds(idx.check)
        self.assertEqual(idx.version, self._expected_version)
        return idx

    def writeIndex(self, filename, entries, pack_checksum):
        # FIXME: Write to BytesIO instead rather than hitting disk ?
        with GitFile(filename, "wb") as f:
            self._write_fn(f, entries, pack_checksum)
|
610 |
|
611 |
class TestMemoryIndexWriting(TestCase, BaseTestPackIndexWriting):
    """Runs the shared index-writing tests against MemoryPackIndex."""

    def setUp(self):
        TestCase.setUp(self)
        # In-memory indexes keep crc32s and have no offset-size limit.
        self._has_crc32_checksum = True
        self._supports_large = True

    def index(self, filename, entries, pack_checksum):
        # filename is ignored: the index lives purely in memory.
        return MemoryPackIndex(entries, pack_checksum)

    def tearDown(self):
        TestCase.tearDown(self)
|
624 |
|
625 |
class TestPackIndexWritingv1(TestCase, BaseTestFilePackIndexWriting):
    """Runs the shared index-writing tests against the v1 file format."""

    def setUp(self):
        TestCase.setUp(self)
        BaseTestFilePackIndexWriting.setUp(self)
        # v1 indexes store no crc32 and cannot hold 64-bit offsets.
        self._has_crc32_checksum = False
        self._expected_version = 1
        self._supports_large = False
        self._write_fn = write_pack_index_v1

    def tearDown(self):
        TestCase.tearDown(self)
        BaseTestFilePackIndexWriting.tearDown(self)
|
639 |
|
640 |
class TestPackIndexWritingv2(TestCase, BaseTestFilePackIndexWriting):
    """Runs the shared index-writing tests against the v2 file format."""

    def setUp(self):
        TestCase.setUp(self)
        BaseTestFilePackIndexWriting.setUp(self)
        # v2 indexes carry crc32s and support 64-bit offsets.
        self._has_crc32_checksum = True
        self._supports_large = True
        self._expected_version = 2
        self._write_fn = write_pack_index_v2

    def tearDown(self):
        TestCase.tearDown(self)
        BaseTestFilePackIndexWriting.tearDown(self)
|
654 |
|
655 |
class ReadZlibTests(TestCase):
    """Tests for read_zlib_chunks, the streaming zlib reader."""

    decomp = (
        b'tree 4ada885c9196b6b6fa08744b5862bf92896fc002\n'
        b'parent None\n'
        b'author Jelmer Vernooij <jelmer@samba.org> 1228980214 +0000\n'
        b'committer Jelmer Vernooij <jelmer@samba.org> 1228980214 +0000\n'
        b'\n'
        b"Provide replacement for mmap()'s offset argument.")
    comp = zlib.compress(decomp)
    extra = b'nextobject'

    def setUp(self):
        super(ReadZlibTests, self).setUp()
        self.read = BytesIO(self.comp + self.extra).read
        self.unpacked = UnpackedObject(Tree.type_num, None, len(self.decomp), 0)

    def test_decompress_size(self):
        # A wrong advertised decompressed length must be rejected.
        good_decomp_len = len(self.decomp)
        self.unpacked.decomp_len = -1
        self.assertRaises(ValueError, read_zlib_chunks, self.read,
                          self.unpacked)
        self.unpacked.decomp_len = good_decomp_len - 1
        self.assertRaises(zlib.error, read_zlib_chunks, self.read,
                          self.unpacked)
        self.unpacked.decomp_len = good_decomp_len + 1
        self.assertRaises(zlib.error, read_zlib_chunks, self.read,
                          self.unpacked)

    def test_decompress_truncated(self):
        read = BytesIO(self.comp[:10]).read
        self.assertRaises(zlib.error, read_zlib_chunks, read, self.unpacked)

        # Even the full stream fails without the trailing extra byte.
        read = BytesIO(self.comp).read
        self.assertRaises(zlib.error, read_zlib_chunks, read, self.unpacked)

    def test_decompress_empty(self):
        unpacked = UnpackedObject(Tree.type_num, None, 0, None)
        comp = zlib.compress(b'')
        read = BytesIO(comp + self.extra).read
        unused = read_zlib_chunks(read, unpacked)
        self.assertEqual(b'', b''.join(unpacked.decomp_chunks))
        self.assertNotEqual(b'', unused)
        self.assertEqual(self.extra, unused + read())

    def test_decompress_no_crc32(self):
        self.unpacked.crc32 = None
        read_zlib_chunks(self.read, self.unpacked)
        self.assertEqual(None, self.unpacked.crc32)

    def _do_decompress_test(self, buffer_size, **kwargs):
        unused = read_zlib_chunks(self.read, self.unpacked,
                                  buffer_size=buffer_size, **kwargs)
        self.assertEqual(self.decomp, b''.join(self.unpacked.decomp_chunks))
        self.assertEqual(zlib.crc32(self.comp), self.unpacked.crc32)
        self.assertNotEqual(b'', unused)
        self.assertEqual(self.extra, unused + self.read())

    def test_simple_decompress(self):
        self._do_decompress_test(4096)
        self.assertEqual(None, self.unpacked.comp_chunks)

    # These buffer sizes are not intended to be realistic, but rather simulate
    # larger buffer sizes that may end at various places.
    def test_decompress_buffer_size_1(self):
        self._do_decompress_test(1)

    def test_decompress_buffer_size_2(self):
        self._do_decompress_test(2)

    def test_decompress_buffer_size_3(self):
        self._do_decompress_test(3)

    def test_decompress_buffer_size_4(self):
        self._do_decompress_test(4)

    def test_decompress_include_comp(self):
        self._do_decompress_test(4096, include_comp=True)
        self.assertEqual(self.comp, b''.join(self.unpacked.comp_chunks))
735 |
|
736 |
class DeltifyTests(TestCase):
    """Tests for deltify_pack_objects."""

    def test_empty(self):
        self.assertEqual([], list(deltify_pack_objects([])))

    def test_single(self):
        # A lone object is emitted whole, with no delta base.
        b = Blob.from_string(b"foo")
        self.assertEqual(
            [(b.type_num, b.sha().digest(), None, b.as_raw_string())],
            list(deltify_pack_objects([(b, b"")])))

    def test_simple_delta(self):
        # The second, similar blob should be emitted as a delta of the first.
        b1 = Blob.from_string(b"a" * 101)
        b2 = Blob.from_string(b"a" * 100)
        delta = create_delta(b1.as_raw_string(), b2.as_raw_string())
        self.assertEqual([
            (b1.type_num, b1.sha().digest(), None, b1.as_raw_string()),
            (b2.type_num, b2.sha().digest(), b1.sha().digest(), delta)
            ],
            list(deltify_pack_objects([(b1, b""), (b2, b"")])))
|
757 |
|
758 |
class TestPackStreamReader(TestCase):
    """Tests for PackStreamReader.read_objects."""

    # NOTE: the misspelled name 'emtpy' is kept so the set of discovered
    # tests is unchanged; test_read_objects_empty below covers no-data input.
    def test_read_objects_emtpy(self):
        f = BytesIO()
        build_pack(f, [])
        reader = PackStreamReader(f.read)
        self.assertEqual(0, len(list(reader.read_objects())))

    def test_read_objects(self):
        f = BytesIO()
        entries = build_pack(f, [
            (Blob.type_num, b'blob'),
            (OFS_DELTA, (0, b'blob1')),
            ])
        reader = PackStreamReader(f.read)
        objects = list(reader.read_objects(compute_crc32=True))
        self.assertEqual(2, len(objects))

        unpacked_blob, unpacked_delta = objects

        # Full object: both the pack and object type are 'blob'.
        self.assertEqual(entries[0][0], unpacked_blob.offset)
        self.assertEqual(Blob.type_num, unpacked_blob.pack_type_num)
        self.assertEqual(Blob.type_num, unpacked_blob.obj_type_num)
        self.assertEqual(None, unpacked_blob.delta_base)
        self.assertEqual(b'blob', b''.join(unpacked_blob.decomp_chunks))
        self.assertEqual(entries[0][4], unpacked_blob.crc32)

        # Delta object: the object type is unresolved; the base is stored
        # as a backwards offset to the first entry.
        self.assertEqual(entries[1][0], unpacked_delta.offset)
        self.assertEqual(OFS_DELTA, unpacked_delta.pack_type_num)
        self.assertEqual(None, unpacked_delta.obj_type_num)
        self.assertEqual(unpacked_delta.offset - unpacked_blob.offset,
                         unpacked_delta.delta_base)
        delta = create_delta(b'blob', b'blob1')
        self.assertEqual(delta, b''.join(unpacked_delta.decomp_chunks))
        self.assertEqual(entries[1][4], unpacked_delta.crc32)

    def test_read_objects_buffered(self):
        f = BytesIO()
        build_pack(f, [
            (Blob.type_num, b'blob'),
            (OFS_DELTA, (0, b'blob1')),
            ])
        # A tiny zlib buffer forces multiple decompression rounds.
        reader = PackStreamReader(f.read, zlib_bufsize=4)
        self.assertEqual(2, len(list(reader.read_objects())))

    def test_read_objects_empty(self):
        reader = PackStreamReader(BytesIO().read)
        self.assertEqual([], list(reader.read_objects()))
807 |
|
808 |
class TestPackIterator(DeltaChainIterator):
    """DeltaChainIterator that asserts each offset is inflated only once."""

    _compute_crc32 = True

    def __init__(self, *args, **kwargs):
        super(TestPackIterator, self).__init__(*args, **kwargs)
        # Offsets already resolved; re-inflating one indicates wasted work.
        self._unpacked_offsets = set()

    def _result(self, unpacked):
        """Return entries in the same format as build_pack."""
        return (unpacked.offset, unpacked.obj_type_num,
                b''.join(unpacked.obj_chunks), unpacked.sha(), unpacked.crc32)

    def _resolve_object(self, offset, pack_type_num, base_chunks):
        assert offset not in self._unpacked_offsets, (
            'Attempted to re-inflate offset %i' % offset)
        self._unpacked_offsets.add(offset)
        return super(TestPackIterator, self)._resolve_object(
            offset, pack_type_num, base_chunks)
828 |
|
829 |
class DeltaChainIteratorTests(TestCase):
    """Tests for DeltaChainIterator, exercised via TestPackIterator.

    Each test builds an in-memory pack with build_pack and checks that
    the iterator yields objects in a valid order: every delta is
    resolved only after its base object.
    """

    def setUp(self):
        super(DeltaChainIteratorTests, self).setUp()
        self.store = MemoryObjectStore()
        # Hex SHAs already fetched through get_raw_no_repeat.
        self.fetched = set()

    def store_blobs(self, blobs_data):
        """Create and store a blob for each chunk of data.

        :param blobs_data: Iterable of byte strings, one per blob.
        :return: List of the stored Blob objects, in the same order.
        """
        blobs = []
        for data in blobs_data:
            blob = make_object(Blob, data=data)
            blobs.append(blob)
            self.store.add_object(blob)
        return blobs

    def get_raw_no_repeat(self, bin_sha):
        """Wrapper around store.get_raw that doesn't allow repeat lookups."""
        hex_sha = sha_to_hex(bin_sha)
        self.assertNotIn(hex_sha, self.fetched,
                         'Attempted to re-fetch object %s' % hex_sha)
        self.fetched.add(hex_sha)
        return self.store.get_raw(hex_sha)

    def make_pack_iter(self, f, thin=None):
        """Build a TestPackIterator over the pack data in f.

        :param f: File-like object positioned at the start of pack data.
        :param thin: Whether to treat the pack as thin (it may reference
            bases outside the pack). Defaults to True iff the test store
            already contains objects.
        """
        if thin is None:
            thin = bool(list(self.store))
        # Conditional expression instead of the brittle `x and y or z` idiom.
        resolve_ext_ref = self.get_raw_no_repeat if thin else None
        data = PackData('test.pack', file=f)
        return TestPackIterator.for_pack_data(
            data, resolve_ext_ref=resolve_ext_ref)

    def assertEntriesMatch(self, expected_indexes, entries, pack_iter):
        """Assert the iterator yields entries[i] for each expected index."""
        expected = [entries[i] for i in expected_indexes]
        self.assertEqual(expected, list(pack_iter._walk_all_chains()))

    def test_no_deltas(self):
        f = BytesIO()
        entries = build_pack(f, [
            (Commit.type_num, b'commit'),
            (Blob.type_num, b'blob'),
            (Tree.type_num, b'tree'),
        ])
        self.assertEntriesMatch([0, 1, 2], entries, self.make_pack_iter(f))

    def test_ofs_deltas(self):
        f = BytesIO()
        entries = build_pack(f, [
            (Blob.type_num, b'blob'),
            (OFS_DELTA, (0, b'blob1')),
            (OFS_DELTA, (0, b'blob2')),
        ])
        self.assertEntriesMatch([0, 1, 2], entries, self.make_pack_iter(f))

    def test_ofs_deltas_chain(self):
        f = BytesIO()
        entries = build_pack(f, [
            (Blob.type_num, b'blob'),
            (OFS_DELTA, (0, b'blob1')),
            (OFS_DELTA, (1, b'blob2')),
        ])
        self.assertEntriesMatch([0, 1, 2], entries, self.make_pack_iter(f))

    def test_ref_deltas(self):
        f = BytesIO()
        entries = build_pack(f, [
            (REF_DELTA, (1, b'blob1')),
            (Blob.type_num, b'blob'),
            (REF_DELTA, (1, b'blob2')),
        ])
        # The base blob comes first, then the deltas that depend on it.
        self.assertEntriesMatch([1, 0, 2], entries, self.make_pack_iter(f))

    def test_ref_deltas_chain(self):
        f = BytesIO()
        entries = build_pack(f, [
            (REF_DELTA, (2, b'blob1')),
            (Blob.type_num, b'blob'),
            (REF_DELTA, (1, b'blob2')),
        ])
        self.assertEntriesMatch([1, 2, 0], entries, self.make_pack_iter(f))

    def test_ofs_and_ref_deltas(self):
        # Deltas pending on this offset are popped before deltas depending on
        # this ref.
        f = BytesIO()
        entries = build_pack(f, [
            (REF_DELTA, (1, b'blob1')),
            (Blob.type_num, b'blob'),
            (OFS_DELTA, (1, b'blob2')),
        ])
        self.assertEntriesMatch([1, 2, 0], entries, self.make_pack_iter(f))

    def test_mixed_chain(self):
        f = BytesIO()
        entries = build_pack(f, [
            (Blob.type_num, b'blob'),
            (REF_DELTA, (2, b'blob2')),
            (OFS_DELTA, (0, b'blob1')),
            (OFS_DELTA, (1, b'blob3')),
            (OFS_DELTA, (0, b'bob')),
        ])
        self.assertEntriesMatch([0, 2, 4, 1, 3], entries,
                                self.make_pack_iter(f))

    def test_long_chain(self):
        # A chain of 100 deltas, each based on the previous object.
        n = 100
        objects_spec = [(Blob.type_num, b'blob')]
        for i in range(n):
            objects_spec.append(
                (OFS_DELTA, (i, b'blob' + str(i).encode('ascii'))))
        f = BytesIO()
        entries = build_pack(f, objects_spec)
        self.assertEntriesMatch(range(n + 1), entries, self.make_pack_iter(f))

    def test_branchy_chain(self):
        # 100 deltas all based directly on the same root object.
        n = 100
        objects_spec = [(Blob.type_num, b'blob')]
        for i in range(n):
            objects_spec.append(
                (OFS_DELTA, (0, b'blob' + str(i).encode('ascii'))))
        f = BytesIO()
        entries = build_pack(f, objects_spec)
        self.assertEntriesMatch(range(n + 1), entries, self.make_pack_iter(f))

    def test_ext_ref(self):
        blob, = self.store_blobs([b'blob'])
        f = BytesIO()
        entries = build_pack(f, [(REF_DELTA, (blob.id, b'blob1'))],
                             store=self.store)
        pack_iter = self.make_pack_iter(f)
        self.assertEntriesMatch([0], entries, pack_iter)
        self.assertEqual([hex_to_sha(blob.id)], pack_iter.ext_refs())

    def test_ext_ref_chain(self):
        blob, = self.store_blobs([b'blob'])
        f = BytesIO()
        entries = build_pack(f, [
            (REF_DELTA, (1, b'blob2')),
            (REF_DELTA, (blob.id, b'blob1')),
        ], store=self.store)
        pack_iter = self.make_pack_iter(f)
        self.assertEntriesMatch([1, 0], entries, pack_iter)
        self.assertEqual([hex_to_sha(blob.id)], pack_iter.ext_refs())

    def test_ext_ref_chain_degenerate(self):
        # Test a degenerate case where the sender is sending a REF_DELTA
        # object that expands to an object already in the repository.
        blob, = self.store_blobs([b'blob'])
        blob2, = self.store_blobs([b'blob2'])
        assert blob.id < blob2.id

        f = BytesIO()
        entries = build_pack(f, [
            (REF_DELTA, (blob.id, b'blob2')),
            (REF_DELTA, (0, b'blob3')),
        ], store=self.store)
        pack_iter = self.make_pack_iter(f)
        self.assertEntriesMatch([0, 1], entries, pack_iter)
        self.assertEqual([hex_to_sha(blob.id)], pack_iter.ext_refs())

    def test_ext_ref_multiple_times(self):
        blob, = self.store_blobs([b'blob'])
        f = BytesIO()
        entries = build_pack(f, [
            (REF_DELTA, (blob.id, b'blob1')),
            (REF_DELTA, (blob.id, b'blob2')),
        ], store=self.store)
        pack_iter = self.make_pack_iter(f)
        self.assertEntriesMatch([0, 1], entries, pack_iter)
        # The shared external base is reported (and fetched) only once.
        self.assertEqual([hex_to_sha(blob.id)], pack_iter.ext_refs())

    def test_multiple_ext_refs(self):
        b1, b2 = self.store_blobs([b'foo', b'bar'])
        f = BytesIO()
        entries = build_pack(f, [
            (REF_DELTA, (b1.id, b'foo1')),
            (REF_DELTA, (b2.id, b'bar2')),
        ], store=self.store)
        pack_iter = self.make_pack_iter(f)
        self.assertEntriesMatch([0, 1], entries, pack_iter)
        self.assertEqual([hex_to_sha(b1.id), hex_to_sha(b2.id)],
                         pack_iter.ext_refs())

    def test_bad_ext_ref_non_thin_pack(self):
        blob, = self.store_blobs([b'blob'])
        f = BytesIO()
        entries = build_pack(f, [(REF_DELTA, (blob.id, b'blob1'))],
                             store=self.store)
        # A non-thin pack must not resolve refs outside the pack itself.
        pack_iter = self.make_pack_iter(f, thin=False)
        try:
            list(pack_iter._walk_all_chains())
            self.fail()
        except KeyError as e:
            self.assertEqual(([blob.id],), e.args)

    def test_bad_ext_ref_thin_pack(self):
        b1, b2, b3 = self.store_blobs([b'foo', b'bar', b'baz'])
        f = BytesIO()
        build_pack(f, [
            (REF_DELTA, (1, b'foo99')),
            (REF_DELTA, (b1.id, b'foo1')),
            (REF_DELTA, (b2.id, b'bar2')),
            (REF_DELTA, (b3.id, b'baz3')),
        ], store=self.store)
        # Remove two of the external bases so resolution must fail on both.
        del self.store[b2.id]
        del self.store[b3.id]
        pack_iter = self.make_pack_iter(f)
        try:
            list(pack_iter._walk_all_chains())
            self.fail()
        except KeyError as e:
            self.assertEqual((sorted([b2.id, b3.id]),), (sorted(e.args[0]),))
class DeltaEncodeSizeTests(TestCase):
    """Tests for the variable-length size header used in delta encoding."""

    def test_basic(self):
        # (input size, expected encoded bytes) pairs covering one-, two-
        # and three-byte encodings.
        cases = [
            (0, b'\x00'),
            (1, b'\x01'),
            (250, b'\xfa\x01'),
            (1000, b'\xe8\x07'),
            (100000, b'\xa0\x8d\x06'),
        ]
        for size, encoded in cases:
            self.assertEqual(encoded, _delta_encode_size(size))
1050 |
class EncodeCopyOperationTests(TestCase): |
1051 |
|
1052 |
def test_basic(self): |
1053 |
self.assertEqual(b'\x80', _encode_copy_operation(0, 0)) |
1054 |
self.assertEqual(b'\x91\x01\x0a', _encode_copy_operation(1, 10)) |
1055 |
self.assertEqual(b'\xb1\x64\xe8\x03', _encode_copy_operation(100, 1000)) |
1056 |
self.assertEqual(b'\x93\xe8\x03\x01', _encode_copy_operation(1000, 1)) |