Statistics
| Revision:

gvsig-scripting / org.gvsig.scripting / trunk / org.gvsig.scripting / org.gvsig.scripting.app / org.gvsig.scripting.app.mainplugin / src / main / resources-plugin / scripting / lib / dulwich / tests / test_repository.py @ 959

History | View | Annotate | Download (33.9 KB)

1
# -*- coding: utf-8 -*-
2
# test_repository.py -- tests for repository.py
3
# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
4
#
5
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6
# General Public License as public by the Free Software Foundation; version 2.0
7
# or (at your option) any later version. You can redistribute it and/or
8
# modify it under the terms of either of these two licenses.
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
#
16
# You should have received a copy of the licenses; if not, see
17
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19
# License, Version 2.0.
20
#
21

    
22
"""Tests for the repository."""
23

    
24
import locale
25
import os
26
import stat
27
import shutil
28
import sys
29
import tempfile
30
import warnings
31

    
32
from dulwich import errors
33
from dulwich.object_store import (
34
    tree_lookup_path,
35
    )
36
from dulwich import objects
37
from dulwich.config import Config
38
from dulwich.errors import NotGitRepository
39
from dulwich.repo import (
40
    Repo,
41
    MemoryRepo,
42
    )
43
from dulwich.tests import (
44
    TestCase,
45
    skipIf,
46
    )
47
from dulwich.tests.utils import (
48
    open_repo,
49
    tear_down_repo,
50
    setup_warning_catcher,
51
    )
52

    
53
missing_sha = b'b91fa4d900e17e99b433218e988c4eb4a3e9a097'
54

    
55

    
56
class CreateRepositoryTests(TestCase):
57

    
58
    def assertFileContentsEqual(self, expected, repo, path):
59
        f = repo.get_named_file(path)
60
        if not f:
61
            self.assertEqual(expected, None)
62
        else:
63
            with f:
64
                self.assertEqual(expected, f.read())
65

    
66
    def _check_repo_contents(self, repo, expect_bare):
67
        self.assertEqual(expect_bare, repo.bare)
68
        self.assertFileContentsEqual(b'Unnamed repository', repo, 'description')
69
        self.assertFileContentsEqual(b'', repo, os.path.join('info', 'exclude'))
70
        self.assertFileContentsEqual(None, repo, 'nonexistent file')
71
        barestr = b'bare = ' + str(expect_bare).lower().encode('ascii')
72
        with repo.get_named_file('config') as f:
73
            config_text = f.read()
74
            self.assertTrue(barestr in config_text, "%r" % config_text)
75
        expect_filemode = sys.platform != 'win32'
76
        barestr = b'filemode = ' + str(expect_filemode).lower().encode('ascii')
77
        with repo.get_named_file('config') as f:
78
            config_text = f.read()
79
            self.assertTrue(barestr in config_text, "%r" % config_text)
80

    
81

    
82
    def test_create_memory(self):
83
        repo = MemoryRepo.init_bare([], {})
84
        self._check_repo_contents(repo, True)
85

    
86
    def test_create_disk_bare(self):
87
        tmp_dir = tempfile.mkdtemp()
88
        self.addCleanup(shutil.rmtree, tmp_dir)
89
        repo = Repo.init_bare(tmp_dir)
90
        self.assertEqual(tmp_dir, repo._controldir)
91
        self._check_repo_contents(repo, True)
92

    
93
    def test_create_disk_non_bare(self):
94
        tmp_dir = tempfile.mkdtemp()
95
        self.addCleanup(shutil.rmtree, tmp_dir)
96
        repo = Repo.init(tmp_dir)
97
        self.assertEqual(os.path.join(tmp_dir, '.git'), repo._controldir)
98
        self._check_repo_contents(repo, False)
99

    
100

    
101
class RepositoryRootTests(TestCase):
102

    
103
    def mkdtemp(self):
104
        return tempfile.mkdtemp()
105

    
106
    def open_repo(self, name):
107
        temp_dir = self.mkdtemp()
108
        repo = open_repo(name, temp_dir)
109
        self.addCleanup(tear_down_repo, repo)
110
        return repo
111

    
112
    def test_simple_props(self):
113
        r = self.open_repo('a.git')
114
        self.assertEqual(r.controldir(), r.path)
115

    
116
    def test_setitem(self):
117
        r = self.open_repo('a.git')
118
        r[b"refs/tags/foo"] = b'a90fa2d900a17e99b433217e988c4eb4a2e9a097'
119
        self.assertEqual(b'a90fa2d900a17e99b433217e988c4eb4a2e9a097',
120
                          r[b"refs/tags/foo"].id)
121

    
122
    def test_getitem_unicode(self):
123
        r = self.open_repo('a.git')
124

    
125
        test_keys = [
126
            (b'refs/heads/master', True),
127
            (b'a90fa2d900a17e99b433217e988c4eb4a2e9a097', True),
128
            (b'11' * 19 + b'--', False),
129
        ]
130

    
131
        for k, contained in test_keys:
132
            self.assertEqual(k in r, contained)
133

    
134
        # Avoid deprecation warning under Py3.2+
135
        if getattr(self, 'assertRaisesRegex', None):
136
            assertRaisesRegexp = self.assertRaisesRegex
137
        else:
138
            assertRaisesRegexp = self.assertRaisesRegexp
139
        for k, _ in test_keys:
140
            assertRaisesRegexp(
141
                TypeError, "'name' must be bytestring, not int",
142
                r.__getitem__, 12
143
            )
144

    
145
    def test_delitem(self):
146
        r = self.open_repo('a.git')
147

    
148
        del r[b'refs/heads/master']
149
        self.assertRaises(KeyError, lambda: r[b'refs/heads/master'])
150

    
151
        del r[b'HEAD']
152
        self.assertRaises(KeyError, lambda: r[b'HEAD'])
153

    
154
        self.assertRaises(ValueError, r.__delitem__, b'notrefs/foo')
155

    
156
    def test_get_refs(self):
157
        r = self.open_repo('a.git')
158
        self.assertEqual({
159
            b'HEAD': b'a90fa2d900a17e99b433217e988c4eb4a2e9a097',
160
            b'refs/heads/master': b'a90fa2d900a17e99b433217e988c4eb4a2e9a097',
161
            b'refs/tags/mytag': b'28237f4dc30d0d462658d6b937b08a0f0b6ef55a',
162
            b'refs/tags/mytag-packed': b'b0931cadc54336e78a1d980420e3268903b57a50',
163
            }, r.get_refs())
164

    
165
    def test_head(self):
166
        r = self.open_repo('a.git')
167
        self.assertEqual(r.head(), b'a90fa2d900a17e99b433217e988c4eb4a2e9a097')
168

    
169
    def test_get_object(self):
170
        r = self.open_repo('a.git')
171
        obj = r.get_object(r.head())
172
        self.assertEqual(obj.type_name, b'commit')
173

    
174
    def test_get_object_non_existant(self):
175
        r = self.open_repo('a.git')
176
        self.assertRaises(KeyError, r.get_object, missing_sha)
177

    
178
    def test_contains_object(self):
179
        r = self.open_repo('a.git')
180
        self.assertTrue(r.head() in r)
181

    
182
    def test_contains_ref(self):
183
        r = self.open_repo('a.git')
184
        self.assertTrue(b"HEAD" in r)
185

    
186
    def test_get_no_description(self):
187
        r = self.open_repo('a.git')
188
        self.assertIs(None, r.get_description())
189

    
190
    def test_get_description(self):
191
        r = self.open_repo('a.git')
192
        with open(os.path.join(r.path, 'description'), 'wb') as f:
193
            f.write(b"Some description")
194
        self.assertEqual(b"Some description", r.get_description())
195

    
196
    def test_set_description(self):
197
        r = self.open_repo('a.git')
198
        description = b"Some description"
199
        r.set_description(description)
200
        self.assertEqual(description, r.get_description())
201

    
202
    def test_contains_missing(self):
203
        r = self.open_repo('a.git')
204
        self.assertFalse(b"bar" in r)
205

    
206
    def test_get_peeled(self):
207
        # unpacked ref
208
        r = self.open_repo('a.git')
209
        tag_sha = b'28237f4dc30d0d462658d6b937b08a0f0b6ef55a'
210
        self.assertNotEqual(r[tag_sha].sha().hexdigest(), r.head())
211
        self.assertEqual(r.get_peeled(b'refs/tags/mytag'), r.head())
212

    
213
        # packed ref with cached peeled value
214
        packed_tag_sha = b'b0931cadc54336e78a1d980420e3268903b57a50'
215
        parent_sha = r[r.head()].parents[0]
216
        self.assertNotEqual(r[packed_tag_sha].sha().hexdigest(), parent_sha)
217
        self.assertEqual(r.get_peeled(b'refs/tags/mytag-packed'), parent_sha)
218

    
219
        # TODO: add more corner cases to test repo
220

    
221
    def test_get_peeled_not_tag(self):
222
        r = self.open_repo('a.git')
223
        self.assertEqual(r.get_peeled(b'HEAD'), r.head())
224

    
225
    def test_get_walker(self):
226
        r = self.open_repo('a.git')
227
        # include defaults to [r.head()]
228
        self.assertEqual([e.commit.id for e in r.get_walker()],
229
                         [r.head(), b'2a72d929692c41d8554c07f6301757ba18a65d91'])
230
        self.assertEqual(
231
            [e.commit.id for e in r.get_walker([b'2a72d929692c41d8554c07f6301757ba18a65d91'])],
232
            [b'2a72d929692c41d8554c07f6301757ba18a65d91'])
233
        self.assertEqual(
234
            [e.commit.id for e in r.get_walker(b'2a72d929692c41d8554c07f6301757ba18a65d91')],
235
            [b'2a72d929692c41d8554c07f6301757ba18a65d91'])
236

    
237
    def test_clone(self):
238
        r = self.open_repo('a.git')
239
        tmp_dir = self.mkdtemp()
240
        self.addCleanup(shutil.rmtree, tmp_dir)
241
        with r.clone(tmp_dir, mkdir=False) as t:
242
            self.assertEqual({
243
                b'HEAD': b'a90fa2d900a17e99b433217e988c4eb4a2e9a097',
244
                b'refs/remotes/origin/master':
245
                    b'a90fa2d900a17e99b433217e988c4eb4a2e9a097',
246
                b'refs/heads/master': b'a90fa2d900a17e99b433217e988c4eb4a2e9a097',
247
                b'refs/tags/mytag': b'28237f4dc30d0d462658d6b937b08a0f0b6ef55a',
248
                b'refs/tags/mytag-packed':
249
                    b'b0931cadc54336e78a1d980420e3268903b57a50',
250
                }, t.refs.as_dict())
251
            shas = [e.commit.id for e in r.get_walker()]
252
            self.assertEqual(shas, [t.head(),
253
                             b'2a72d929692c41d8554c07f6301757ba18a65d91'])
254
            c = t.get_config()
255
            encoded_path = r.path
256
            if not isinstance(encoded_path, bytes):
257
                encoded_path = encoded_path.encode(sys.getfilesystemencoding())
258
            self.assertEqual(encoded_path, c.get((b'remote', b'origin'), b'url'))
259
            self.assertEqual(
260
                b'+refs/heads/*:refs/remotes/origin/*',
261
                c.get((b'remote', b'origin'), b'fetch'))
262

    
263
    def test_clone_no_head(self):
264
        temp_dir = self.mkdtemp()
265
        self.addCleanup(shutil.rmtree, temp_dir)
266
        repo_dir = os.path.join(os.path.dirname(__file__), 'data', 'repos')
267
        dest_dir = os.path.join(temp_dir, 'a.git')
268
        shutil.copytree(os.path.join(repo_dir, 'a.git'),
269
                        dest_dir, symlinks=True)
270
        r = Repo(dest_dir)
271
        del r.refs[b"refs/heads/master"]
272
        del r.refs[b"HEAD"]
273
        t = r.clone(os.path.join(temp_dir, 'b.git'), mkdir=True)
274
        self.assertEqual({
275
            b'refs/tags/mytag': b'28237f4dc30d0d462658d6b937b08a0f0b6ef55a',
276
            b'refs/tags/mytag-packed':
277
                b'b0931cadc54336e78a1d980420e3268903b57a50',
278
            }, t.refs.as_dict())
279

    
280
    def test_clone_empty(self):
281
        """Test clone() doesn't crash if HEAD points to a non-existing ref.
282

283
        This simulates cloning server-side bare repository either when it is
284
        still empty or if user renames master branch and pushes private repo
285
        to the server.
286
        Non-bare repo HEAD always points to an existing ref.
287
        """
288
        r = self.open_repo('empty.git')
289
        tmp_dir = self.mkdtemp()
290
        self.addCleanup(shutil.rmtree, tmp_dir)
291
        r.clone(tmp_dir, mkdir=False, bare=True)
292

    
293
    def test_merge_history(self):
294
        r = self.open_repo('simple_merge.git')
295
        shas = [e.commit.id for e in r.get_walker()]
296
        self.assertEqual(shas, [b'5dac377bdded4c9aeb8dff595f0faeebcc8498cc',
297
                                b'ab64bbdcc51b170d21588e5c5d391ee5c0c96dfd',
298
                                b'4cffe90e0a41ad3f5190079d7c8f036bde29cbe6',
299
                                b'60dacdc733de308bb77bb76ce0fb0f9b44c9769e',
300
                                b'0d89f20333fbb1d2f3a94da77f4981373d8f4310'])
301

    
302
    def test_out_of_order_merge(self):
303
        """Test that revision history is ordered by date, not parent order."""
304
        r = self.open_repo('ooo_merge.git')
305
        shas = [e.commit.id for e in r.get_walker()]
306
        self.assertEqual(shas, [b'7601d7f6231db6a57f7bbb79ee52e4d462fd44d1',
307
                                b'f507291b64138b875c28e03469025b1ea20bc614',
308
                                b'fb5b0425c7ce46959bec94d54b9a157645e114f5',
309
                                b'f9e39b120c68182a4ba35349f832d0e4e61f485c'])
310

    
311
    def test_get_tags_empty(self):
312
        r = self.open_repo('ooo_merge.git')
313
        self.assertEqual({}, r.refs.as_dict(b'refs/tags'))
314

    
315
    def test_get_config(self):
316
        r = self.open_repo('ooo_merge.git')
317
        self.assertIsInstance(r.get_config(), Config)
318

    
319
    def test_get_config_stack(self):
320
        r = self.open_repo('ooo_merge.git')
321
        self.assertIsInstance(r.get_config_stack(), Config)
322

    
323
    @skipIf(not getattr(os, 'symlink', None), 'Requires symlink support')
324
    def test_submodule(self):
325
        temp_dir = self.mkdtemp()
326
        self.addCleanup(shutil.rmtree, temp_dir)
327
        repo_dir = os.path.join(os.path.dirname(__file__), 'data', 'repos')
328
        shutil.copytree(os.path.join(repo_dir, 'a.git'),
329
                        os.path.join(temp_dir, 'a.git'), symlinks=True)
330
        rel = os.path.relpath(os.path.join(repo_dir, 'submodule'), temp_dir)
331
        os.symlink(os.path.join(rel, 'dotgit'), os.path.join(temp_dir, '.git'))
332
        with Repo(temp_dir) as r:
333
            self.assertEqual(r.head(), b'a90fa2d900a17e99b433217e988c4eb4a2e9a097')
334

    
335
    def test_common_revisions(self):
336
        """
337
        This test demonstrates that ``find_common_revisions()`` actually returns
338
        common heads, not revisions; dulwich already uses
339
        ``find_common_revisions()`` in such a manner (see
340
        ``Repo.fetch_objects()``).
341
        """
342

    
343
        expected_shas = set([b'60dacdc733de308bb77bb76ce0fb0f9b44c9769e'])
344

    
345
        # Source for objects.
346
        r_base = self.open_repo('simple_merge.git')
347

    
348
        # Re-create each-side of the merge in simple_merge.git.
349
        #
350
        # Since the trees and blobs are missing, the repository created is
351
        # corrupted, but we're only checking for commits for the purpose of this
352
        # test, so it's immaterial.
353
        r1_dir = self.mkdtemp()
354
        self.addCleanup(shutil.rmtree, r1_dir)
355
        r1_commits = [b'ab64bbdcc51b170d21588e5c5d391ee5c0c96dfd', # HEAD
356
                      b'60dacdc733de308bb77bb76ce0fb0f9b44c9769e',
357
                      b'0d89f20333fbb1d2f3a94da77f4981373d8f4310']
358

    
359
        r2_dir = self.mkdtemp()
360
        self.addCleanup(shutil.rmtree, r2_dir)
361
        r2_commits = [b'4cffe90e0a41ad3f5190079d7c8f036bde29cbe6', # HEAD
362
                      b'60dacdc733de308bb77bb76ce0fb0f9b44c9769e',
363
                      b'0d89f20333fbb1d2f3a94da77f4981373d8f4310']
364

    
365
        r1 = Repo.init_bare(r1_dir)
366
        for c in r1_commits:
367
            r1.object_store.add_object(r_base.get_object(c))
368
        r1.refs[b'HEAD'] = r1_commits[0]
369

    
370
        r2 = Repo.init_bare(r2_dir)
371
        for c in r2_commits:
372
            r2.object_store.add_object(r_base.get_object(c))
373
        r2.refs[b'HEAD'] = r2_commits[0]
374

    
375
        # Finally, the 'real' testing!
376
        shas = r2.object_store.find_common_revisions(r1.get_graph_walker())
377
        self.assertEqual(set(shas), expected_shas)
378

    
379
        shas = r1.object_store.find_common_revisions(r2.get_graph_walker())
380
        self.assertEqual(set(shas), expected_shas)
381

    
382
    def test_shell_hook_pre_commit(self):
383
        if os.name != 'posix':
384
            self.skipTest('shell hook tests requires POSIX shell')
385

    
386
        pre_commit_fail = """#!/bin/sh
387
exit 1
388
"""
389

    
390
        pre_commit_success = """#!/bin/sh
391
exit 0
392
"""
393

    
394
        repo_dir = os.path.join(self.mkdtemp())
395
        r = Repo.init(repo_dir)
396
        self.addCleanup(shutil.rmtree, repo_dir)
397

    
398
        pre_commit = os.path.join(r.controldir(), 'hooks', 'pre-commit')
399

    
400
        with open(pre_commit, 'w') as f:
401
            f.write(pre_commit_fail)
402
        os.chmod(pre_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
403

    
404
        self.assertRaises(errors.CommitError, r.do_commit, 'failed commit',
405
                          committer='Test Committer <test@nodomain.com>',
406
                          author='Test Author <test@nodomain.com>',
407
                          commit_timestamp=12345, commit_timezone=0,
408
                          author_timestamp=12345, author_timezone=0)
409

    
410
        with open(pre_commit, 'w') as f:
411
            f.write(pre_commit_success)
412
        os.chmod(pre_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
413

    
414
        commit_sha = r.do_commit(
415
            b'empty commit',
416
            committer=b'Test Committer <test@nodomain.com>',
417
            author=b'Test Author <test@nodomain.com>',
418
            commit_timestamp=12395, commit_timezone=0,
419
            author_timestamp=12395, author_timezone=0)
420
        self.assertEqual([], r[commit_sha].parents)
421

    
422
    def test_shell_hook_commit_msg(self):
423
        if os.name != 'posix':
424
            self.skipTest('shell hook tests requires POSIX shell')
425

    
426
        commit_msg_fail = """#!/bin/sh
427
exit 1
428
"""
429

    
430
        commit_msg_success = """#!/bin/sh
431
exit 0
432
"""
433

    
434
        repo_dir = self.mkdtemp()
435
        r = Repo.init(repo_dir)
436
        self.addCleanup(shutil.rmtree, repo_dir)
437

    
438
        commit_msg = os.path.join(r.controldir(), 'hooks', 'commit-msg')
439

    
440
        with open(commit_msg, 'w') as f:
441
            f.write(commit_msg_fail)
442
        os.chmod(commit_msg, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
443

    
444
        self.assertRaises(errors.CommitError, r.do_commit, b'failed commit',
445
                          committer=b'Test Committer <test@nodomain.com>',
446
                          author=b'Test Author <test@nodomain.com>',
447
                          commit_timestamp=12345, commit_timezone=0,
448
                          author_timestamp=12345, author_timezone=0)
449

    
450
        with open(commit_msg, 'w') as f:
451
            f.write(commit_msg_success)
452
        os.chmod(commit_msg, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
453

    
454
        commit_sha = r.do_commit(
455
            b'empty commit',
456
            committer=b'Test Committer <test@nodomain.com>',
457
            author=b'Test Author <test@nodomain.com>',
458
            commit_timestamp=12395, commit_timezone=0,
459
            author_timestamp=12395, author_timezone=0)
460
        self.assertEqual([], r[commit_sha].parents)
461

    
462
    def test_shell_hook_post_commit(self):
463
        if os.name != 'posix':
464
            self.skipTest('shell hook tests requires POSIX shell')
465

    
466
        repo_dir = self.mkdtemp()
467

    
468
        r = Repo.init(repo_dir)
469
        self.addCleanup(shutil.rmtree, repo_dir)
470

    
471
        (fd, path) = tempfile.mkstemp(dir=repo_dir)
472
        os.close(fd)
473
        post_commit_msg = """#!/bin/sh
474
rm """ + path + """
475
"""
476

    
477
        root_sha = r.do_commit(
478
            b'empty commit',
479
            committer=b'Test Committer <test@nodomain.com>',
480
            author=b'Test Author <test@nodomain.com>',
481
            commit_timestamp=12345, commit_timezone=0,
482
            author_timestamp=12345, author_timezone=0)
483
        self.assertEqual([], r[root_sha].parents)
484

    
485
        post_commit = os.path.join(r.controldir(), 'hooks', 'post-commit')
486

    
487
        with open(post_commit, 'wb') as f:
488
            f.write(post_commit_msg.encode(locale.getpreferredencoding()))
489
        os.chmod(post_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
490

    
491
        commit_sha = r.do_commit(
492
            b'empty commit',
493
            committer=b'Test Committer <test@nodomain.com>',
494
            author=b'Test Author <test@nodomain.com>',
495
            commit_timestamp=12345, commit_timezone=0,
496
            author_timestamp=12345, author_timezone=0)
497
        self.assertEqual([root_sha], r[commit_sha].parents)
498

    
499
        self.assertFalse(os.path.exists(path))
500

    
501
        post_commit_msg_fail = """#!/bin/sh
502
exit 1
503
"""
504
        with open(post_commit, 'w') as f:
505
            f.write(post_commit_msg_fail)
506
        os.chmod(post_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
507

    
508
        warnings.simplefilter("always", UserWarning)
509
        self.addCleanup(warnings.resetwarnings)
510
        warnings_list, restore_warnings = setup_warning_catcher()
511
        self.addCleanup(restore_warnings)
512

    
513
        commit_sha2 = r.do_commit(
514
            b'empty commit',
515
            committer=b'Test Committer <test@nodomain.com>',
516
            author=b'Test Author <test@nodomain.com>',
517
            commit_timestamp=12345, commit_timezone=0,
518
            author_timestamp=12345, author_timezone=0)
519
        self.assertEqual(len(warnings_list), 1, warnings_list)
520
        self.assertIsInstance(warnings_list[-1], UserWarning)
521
        self.assertTrue("post-commit hook failed: " in str(warnings_list[-1]))
522
        self.assertEqual([commit_sha], r[commit_sha2].parents)
523

    
524
    def test_as_dict(self):
525
        def check(repo):
526
            self.assertEqual(repo.refs.subkeys(b'refs/tags'), repo.refs.subkeys(b'refs/tags/'))
527
            self.assertEqual(repo.refs.as_dict(b'refs/tags'), repo.refs.as_dict(b'refs/tags/'))
528
            self.assertEqual(repo.refs.as_dict(b'refs/heads'), repo.refs.as_dict(b'refs/heads/'))
529

    
530
        bare = self.open_repo('a.git')
531
        tmp_dir = self.mkdtemp()
532
        self.addCleanup(shutil.rmtree, tmp_dir)
533
        with bare.clone(tmp_dir, mkdir=False) as nonbare:
534
            check(nonbare)
535
            check(bare)
536

    
537
    def test_working_tree(self):
538
        temp_dir = tempfile.mkdtemp()
539
        self.addCleanup(shutil.rmtree, temp_dir)
540
        worktree_temp_dir = tempfile.mkdtemp()
541
        self.addCleanup(shutil.rmtree, worktree_temp_dir)
542
        r = Repo.init(temp_dir)
543
        root_sha = r.do_commit(
544
                b'empty commit',
545
                committer=b'Test Committer <test@nodomain.com>',
546
                author=b'Test Author <test@nodomain.com>',
547
                commit_timestamp=12345, commit_timezone=0,
548
                author_timestamp=12345, author_timezone=0)
549
        r.refs[b'refs/heads/master'] = root_sha
550
        w = Repo._init_new_working_directory(worktree_temp_dir, r)
551
        new_sha = w.do_commit(
552
                b'new commit',
553
                committer=b'Test Committer <test@nodomain.com>',
554
                author=b'Test Author <test@nodomain.com>',
555
                commit_timestamp=12345, commit_timezone=0,
556
                author_timestamp=12345, author_timezone=0)
557
        w.refs[b'HEAD'] = new_sha
558
        self.assertEqual(os.path.abspath(r.controldir()),
559
                         os.path.abspath(w.commondir()))
560
        self.assertEqual(r.refs.keys(), w.refs.keys())
561
        self.assertNotEqual(r.head(), w.head())
562

    
563

    
564
class BuildRepoRootTests(TestCase):
565
    """Tests that build on-disk repos from scratch.
566

567
    Repos live in a temp dir and are torn down after each test. They start with
568
    a single commit in master having single file named 'a'.
569
    """
570

    
571
    def get_repo_dir(self):
572
        return os.path.join(tempfile.mkdtemp(), 'test')
573

    
574
    def setUp(self):
575
        super(BuildRepoRootTests, self).setUp()
576
        self._repo_dir = self.get_repo_dir()
577
        os.makedirs(self._repo_dir)
578
        r = self._repo = Repo.init(self._repo_dir)
579
        self.addCleanup(tear_down_repo, r)
580
        self.assertFalse(r.bare)
581
        self.assertEqual(b'ref: refs/heads/master', r.refs.read_ref(b'HEAD'))
582
        self.assertRaises(KeyError, lambda: r.refs[b'refs/heads/master'])
583

    
584
        with open(os.path.join(r.path, 'a'), 'wb') as f:
585
            f.write(b'file contents')
586
        r.stage(['a'])
587
        commit_sha = r.do_commit(b'msg',
588
                                 committer=b'Test Committer <test@nodomain.com>',
589
                                 author=b'Test Author <test@nodomain.com>',
590
                                 commit_timestamp=12345, commit_timezone=0,
591
                                 author_timestamp=12345, author_timezone=0)
592
        self.assertEqual([], r[commit_sha].parents)
593
        self._root_commit = commit_sha
594

    
595
    def test_build_repo(self):
596
        r = self._repo
597
        self.assertEqual(b'ref: refs/heads/master', r.refs.read_ref(b'HEAD'))
598
        self.assertEqual(self._root_commit, r.refs[b'refs/heads/master'])
599
        expected_blob = objects.Blob.from_string(b'file contents')
600
        self.assertEqual(expected_blob.data, r[expected_blob.id].data)
601
        actual_commit = r[self._root_commit]
602
        self.assertEqual(b'msg', actual_commit.message)
603

    
604
    def test_commit_modified(self):
605
        r = self._repo
606
        with open(os.path.join(r.path, 'a'), 'wb') as f:
607
            f.write(b'new contents')
608
        r.stage(['a'])
609
        commit_sha = r.do_commit(b'modified a',
610
                                 committer=b'Test Committer <test@nodomain.com>',
611
                                 author=b'Test Author <test@nodomain.com>',
612
                                 commit_timestamp=12395, commit_timezone=0,
613
                                 author_timestamp=12395, author_timezone=0)
614
        self.assertEqual([self._root_commit], r[commit_sha].parents)
615
        a_mode, a_id = tree_lookup_path(r.get_object, r[commit_sha].tree, b'a')
616
        self.assertEqual(stat.S_IFREG | 0o644, a_mode)
617
        self.assertEqual(b'new contents', r[a_id].data)
618

    
619
    @skipIf(not getattr(os, 'symlink', None), 'Requires symlink support')
620
    def test_commit_symlink(self):
621
        r = self._repo
622
        os.symlink('a', os.path.join(r.path, 'b'))
623
        r.stage(['a', 'b'])
624
        commit_sha = r.do_commit(b'Symlink b',
625
                                 committer=b'Test Committer <test@nodomain.com>',
626
                                 author=b'Test Author <test@nodomain.com>',
627
                                 commit_timestamp=12395, commit_timezone=0,
628
                                 author_timestamp=12395, author_timezone=0)
629
        self.assertEqual([self._root_commit], r[commit_sha].parents)
630
        b_mode, b_id = tree_lookup_path(r.get_object, r[commit_sha].tree, b'b')
631
        self.assertTrue(stat.S_ISLNK(b_mode))
632
        self.assertEqual(b'a', r[b_id].data)
633

    
634
    def test_commit_deleted(self):
635
        r = self._repo
636
        os.remove(os.path.join(r.path, 'a'))
637
        r.stage(['a'])
638
        commit_sha = r.do_commit(b'deleted a',
639
                                 committer=b'Test Committer <test@nodomain.com>',
640
                                 author=b'Test Author <test@nodomain.com>',
641
                                 commit_timestamp=12395, commit_timezone=0,
642
                                 author_timestamp=12395, author_timezone=0)
643
        self.assertEqual([self._root_commit], r[commit_sha].parents)
644
        self.assertEqual([], list(r.open_index()))
645
        tree = r[r[commit_sha].tree]
646
        self.assertEqual([], list(tree.iteritems()))
647

    
648
    def test_commit_follows(self):
649
        r = self._repo
650
        r.refs.set_symbolic_ref(b'HEAD', b'refs/heads/bla')
651
        commit_sha = r.do_commit(b'commit with strange character',
652
             committer=b'Test Committer <test@nodomain.com>',
653
             author=b'Test Author <test@nodomain.com>',
654
             commit_timestamp=12395, commit_timezone=0,
655
             author_timestamp=12395, author_timezone=0,
656
             ref=b'HEAD')
657
        self.assertEqual(commit_sha, r[b'refs/heads/bla'].id)
658

    
659
    def test_commit_encoding(self):
660
        r = self._repo
661
        commit_sha = r.do_commit(b'commit with strange character \xee',
662
             committer=b'Test Committer <test@nodomain.com>',
663
             author=b'Test Author <test@nodomain.com>',
664
             commit_timestamp=12395, commit_timezone=0,
665
             author_timestamp=12395, author_timezone=0,
666
             encoding=b"iso8859-1")
667
        self.assertEqual(b"iso8859-1", r[commit_sha].encoding)
668

    
669
    def test_commit_config_identity(self):
670
        # commit falls back to the users' identity if it wasn't specified
671
        r = self._repo
672
        c = r.get_config()
673
        c.set((b"user", ), b"name", b"Jelmer")
674
        c.set((b"user", ), b"email", b"jelmer@apache.org")
675
        c.write_to_path()
676
        commit_sha = r.do_commit(b'message')
677
        self.assertEqual(
678
            b"Jelmer <jelmer@apache.org>",
679
            r[commit_sha].author)
680
        self.assertEqual(
681
            b"Jelmer <jelmer@apache.org>",
682
            r[commit_sha].committer)
683

    
684
    def test_commit_config_identity_in_memoryrepo(self):
685
        # commit falls back to the users' identity if it wasn't specified
686
        r = MemoryRepo.init_bare([], {})
687
        c = r.get_config()
688
        c.set((b"user", ), b"name", b"Jelmer")
689
        c.set((b"user", ), b"email", b"jelmer@apache.org")
690

    
691
        commit_sha = r.do_commit(b'message', tree=objects.Tree().id)
692
        self.assertEqual(
693
            b"Jelmer <jelmer@apache.org>",
694
            r[commit_sha].author)
695
        self.assertEqual(
696
            b"Jelmer <jelmer@apache.org>",
697
            r[commit_sha].committer)
698

    
699
    def test_commit_fail_ref(self):
700
        r = self._repo
701

    
702
        def set_if_equals(name, old_ref, new_ref):
703
            return False
704
        r.refs.set_if_equals = set_if_equals
705

    
706
        def add_if_new(name, new_ref):
707
            self.fail('Unexpected call to add_if_new')
708
        r.refs.add_if_new = add_if_new
709

    
710
        old_shas = set(r.object_store)
711
        self.assertRaises(errors.CommitError, r.do_commit, b'failed commit',
712
                          committer=b'Test Committer <test@nodomain.com>',
713
                          author=b'Test Author <test@nodomain.com>',
714
                          commit_timestamp=12345, commit_timezone=0,
715
                          author_timestamp=12345, author_timezone=0)
716
        new_shas = set(r.object_store) - old_shas
717
        self.assertEqual(1, len(new_shas))
718
        # Check that the new commit (now garbage) was added.
719
        new_commit = r[new_shas.pop()]
720
        self.assertEqual(r[self._root_commit].tree, new_commit.tree)
721
        self.assertEqual(b'failed commit', new_commit.message)
722

    
723
    def test_commit_branch(self):
724
        r = self._repo
725

    
726
        commit_sha = r.do_commit(b'commit to branch',
727
             committer=b'Test Committer <test@nodomain.com>',
728
             author=b'Test Author <test@nodomain.com>',
729
             commit_timestamp=12395, commit_timezone=0,
730
             author_timestamp=12395, author_timezone=0,
731
             ref=b"refs/heads/new_branch")
732
        self.assertEqual(self._root_commit, r[b"HEAD"].id)
733
        self.assertEqual(commit_sha, r[b"refs/heads/new_branch"].id)
734
        self.assertEqual([], r[commit_sha].parents)
735
        self.assertTrue(b"refs/heads/new_branch" in r)
736

    
737
        new_branch_head = commit_sha
738

    
739
        commit_sha = r.do_commit(b'commit to branch 2',
740
             committer=b'Test Committer <test@nodomain.com>',
741
             author=b'Test Author <test@nodomain.com>',
742
             commit_timestamp=12395, commit_timezone=0,
743
             author_timestamp=12395, author_timezone=0,
744
             ref=b"refs/heads/new_branch")
745
        self.assertEqual(self._root_commit, r[b"HEAD"].id)
746
        self.assertEqual(commit_sha, r[b"refs/heads/new_branch"].id)
747
        self.assertEqual([new_branch_head], r[commit_sha].parents)
748

    
749
    def test_commit_merge_heads(self):
750
        r = self._repo
751
        merge_1 = r.do_commit(b'commit to branch 2',
752
             committer=b'Test Committer <test@nodomain.com>',
753
             author=b'Test Author <test@nodomain.com>',
754
             commit_timestamp=12395, commit_timezone=0,
755
             author_timestamp=12395, author_timezone=0,
756
             ref=b"refs/heads/new_branch")
757
        commit_sha = r.do_commit(b'commit with merge',
758
             committer=b'Test Committer <test@nodomain.com>',
759
             author=b'Test Author <test@nodomain.com>',
760
             commit_timestamp=12395, commit_timezone=0,
761
             author_timestamp=12395, author_timezone=0,
762
             merge_heads=[merge_1])
763
        self.assertEqual(
764
            [self._root_commit, merge_1],
765
            r[commit_sha].parents)
766

    
767
    def test_commit_dangling_commit(self):
768
        r = self._repo
769

    
770
        old_shas = set(r.object_store)
771
        old_refs = r.get_refs()
772
        commit_sha = r.do_commit(b'commit with no ref',
773
             committer=b'Test Committer <test@nodomain.com>',
774
             author=b'Test Author <test@nodomain.com>',
775
             commit_timestamp=12395, commit_timezone=0,
776
             author_timestamp=12395, author_timezone=0,
777
             ref=None)
778
        new_shas = set(r.object_store) - old_shas
779

    
780
        # New sha is added, but no new refs
781
        self.assertEqual(1, len(new_shas))
782
        new_commit = r[new_shas.pop()]
783
        self.assertEqual(r[self._root_commit].tree, new_commit.tree)
784
        self.assertEqual([], r[commit_sha].parents)
785
        self.assertEqual(old_refs, r.get_refs())
786

    
787
    def test_commit_dangling_commit_with_parents(self):
788
        r = self._repo
789

    
790
        old_shas = set(r.object_store)
791
        old_refs = r.get_refs()
792
        commit_sha = r.do_commit(b'commit with no ref',
793
             committer=b'Test Committer <test@nodomain.com>',
794
             author=b'Test Author <test@nodomain.com>',
795
             commit_timestamp=12395, commit_timezone=0,
796
             author_timestamp=12395, author_timezone=0,
797
             ref=None, merge_heads=[self._root_commit])
798
        new_shas = set(r.object_store) - old_shas
799

    
800
        # New sha is added, but no new refs
801
        self.assertEqual(1, len(new_shas))
802
        new_commit = r[new_shas.pop()]
803
        self.assertEqual(r[self._root_commit].tree, new_commit.tree)
804
        self.assertEqual([self._root_commit], r[commit_sha].parents)
805
        self.assertEqual(old_refs, r.get_refs())
806

    
807
    def test_stage_deleted(self):
808
        r = self._repo
809
        os.remove(os.path.join(r.path, 'a'))
810
        r.stage(['a'])
811
        r.stage(['a'])  # double-stage a deleted path
812

    
813
    def test_commit_no_encode_decode(self):
814
        r = self._repo
815
        repo_path_bytes = r.path.encode(sys.getfilesystemencoding())
816
        encodings = ('utf8', 'latin1')
817
        names = [u'À'.encode(encoding) for encoding in encodings]
818
        for name, encoding in zip(names, encodings):
819
            full_path = os.path.join(repo_path_bytes, name)
820
            with open(full_path, 'wb') as f:
821
                f.write(encoding.encode('ascii'))
822
            # These files are break tear_down_repo, so cleanup these files
823
            # ourselves.
824
            self.addCleanup(os.remove, full_path)
825

    
826
        r.stage(names)
827
        commit_sha = r.do_commit(b'Files with different encodings',
828
             committer=b'Test Committer <test@nodomain.com>',
829
             author=b'Test Author <test@nodomain.com>',
830
             commit_timestamp=12395, commit_timezone=0,
831
             author_timestamp=12395, author_timezone=0,
832
             ref=None, merge_heads=[self._root_commit])
833

    
834
        for name, encoding in zip(names, encodings):
835
            mode, id = tree_lookup_path(r.get_object, r[commit_sha].tree, name)
836
            self.assertEqual(stat.S_IFREG | 0o644, mode)
837
            self.assertEqual(encoding.encode('ascii'), r[id].data)
838

    
839
    def test_discover_intended(self):
840
        path = os.path.join(self._repo_dir, 'b/c')
841
        r = Repo.discover(path)
842
        self.assertEqual(r.head(), self._repo.head())
843

    
844
    def test_discover_isrepo(self):
845
        r = Repo.discover(self._repo_dir)
846
        self.assertEqual(r.head(), self._repo.head())
847

    
848
    def test_discover_notrepo(self):
849
        with self.assertRaises(NotGitRepository):
850
            Repo.discover('/')