gvsig-scripting / org.gvsig.scripting / trunk / org.gvsig.scripting / org.gvsig.scripting.app / org.gvsig.scripting.app.mainplugin / src / main / resources-plugin / scripting / lib / dulwich / tests / test_porcelain.py @ 959
History | View | Annotate | Download (31.9 KB)
1 |
# test_porcelain.py -- porcelain tests
|
---|---|
2 |
# Copyright (C) 2013 Jelmer Vernooij <jelmer@samba.org>
|
3 |
#
|
4 |
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
|
5 |
# General Public License as public by the Free Software Foundation; version 2.0
|
6 |
# or (at your option) any later version. You can redistribute it and/or
|
7 |
# modify it under the terms of either of these two licenses.
|
8 |
#
|
9 |
# Unless required by applicable law or agreed to in writing, software
|
10 |
# distributed under the License is distributed on an "AS IS" BASIS,
|
11 |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
12 |
# See the License for the specific language governing permissions and
|
13 |
# limitations under the License.
|
14 |
#
|
15 |
# You should have received a copy of the licenses; if not, see
|
16 |
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
|
17 |
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
|
18 |
# License, Version 2.0.
|
19 |
#
|
20 |
|
21 |
"""Tests for dulwich.porcelain."""
|
22 |
|
23 |
from io import BytesIO |
24 |
try:
|
25 |
from StringIO import StringIO |
26 |
except ImportError: |
27 |
from io import StringIO |
28 |
import os |
29 |
import shutil |
30 |
import tarfile |
31 |
import tempfile |
32 |
import time |
33 |
|
34 |
from dulwich import porcelain |
35 |
from dulwich.diff_tree import tree_changes |
36 |
from dulwich.objects import ( |
37 |
Blob, |
38 |
Tag, |
39 |
Tree, |
40 |
ZERO_SHA, |
41 |
) |
42 |
from dulwich.repo import Repo |
43 |
from dulwich.tests import ( |
44 |
TestCase, |
45 |
) |
46 |
from dulwich.tests.utils import ( |
47 |
build_commit_graph, |
48 |
make_object, |
49 |
) |
50 |
|
51 |
|
52 |
class PorcelainTestCase(TestCase): |
53 |
|
54 |
def setUp(self): |
55 |
super(PorcelainTestCase, self).setUp() |
56 |
repo_dir = tempfile.mkdtemp() |
57 |
self.addCleanup(shutil.rmtree, repo_dir)
|
58 |
self.repo = Repo.init(repo_dir)
|
59 |
|
60 |
def tearDown(self): |
61 |
super(PorcelainTestCase, self).tearDown() |
62 |
self.repo.close()
|
63 |
|
64 |
|
65 |
class ArchiveTests(PorcelainTestCase): |
66 |
"""Tests for the archive command."""
|
67 |
|
68 |
def test_simple(self): |
69 |
c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) |
70 |
self.repo.refs[b"refs/heads/master"] = c3.id |
71 |
out = BytesIO() |
72 |
err = BytesIO() |
73 |
porcelain.archive(self.repo.path, b"refs/heads/master", outstream=out, |
74 |
errstream=err) |
75 |
self.assertEqual(b"", err.getvalue()) |
76 |
tf = tarfile.TarFile(fileobj=out) |
77 |
self.addCleanup(tf.close)
|
78 |
self.assertEqual([], tf.getnames())
|
79 |
|
80 |
|
81 |
class UpdateServerInfoTests(PorcelainTestCase): |
82 |
|
83 |
def test_simple(self): |
84 |
c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], |
85 |
[3, 1, 2]]) |
86 |
self.repo.refs[b"refs/heads/foo"] = c3.id |
87 |
porcelain.update_server_info(self.repo.path)
|
88 |
self.assertTrue(os.path.exists(os.path.join(self.repo.controldir(), |
89 |
'info', 'refs'))) |
90 |
|
91 |
|
92 |
class CommitTests(PorcelainTestCase): |
93 |
|
94 |
def test_custom_author(self): |
95 |
c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], |
96 |
[3, 1, 2]]) |
97 |
self.repo.refs[b"refs/heads/foo"] = c3.id |
98 |
sha = porcelain.commit(self.repo.path, message=b"Some message", |
99 |
author=b"Joe <joe@example.com>", committer=b"Bob <bob@example.com>") |
100 |
self.assertTrue(isinstance(sha, bytes)) |
101 |
self.assertEqual(len(sha), 40) |
102 |
|
103 |
|
104 |
class CloneTests(PorcelainTestCase): |
105 |
|
106 |
def test_simple_local(self): |
107 |
f1_1 = make_object(Blob, data=b'f1')
|
108 |
commit_spec = [[1], [2, 1], [3, 1, 2]] |
109 |
trees = {1: [(b'f1', f1_1), (b'f2', f1_1)], |
110 |
2: [(b'f1', f1_1), (b'f2', f1_1)], |
111 |
3: [(b'f1', f1_1), (b'f2', f1_1)], } |
112 |
|
113 |
c1, c2, c3 = build_commit_graph(self.repo.object_store,
|
114 |
commit_spec, trees) |
115 |
self.repo.refs[b"refs/heads/master"] = c3.id |
116 |
self.repo.refs[b"refs/tags/foo"] = c3.id |
117 |
target_path = tempfile.mkdtemp() |
118 |
errstream = BytesIO() |
119 |
self.addCleanup(shutil.rmtree, target_path)
|
120 |
r = porcelain.clone(self.repo.path, target_path,
|
121 |
checkout=False, errstream=errstream)
|
122 |
self.assertEqual(r.path, target_path)
|
123 |
target_repo = Repo(target_path) |
124 |
self.assertEqual(target_repo.head(), c3.id)
|
125 |
self.assertEqual(c3.id, target_repo.refs[b'refs/tags/foo']) |
126 |
self.assertTrue(b'f1' not in os.listdir(target_path)) |
127 |
self.assertTrue(b'f2' not in os.listdir(target_path)) |
128 |
c = r.get_config() |
129 |
encoded_path = self.repo.path
|
130 |
if not isinstance(encoded_path, bytes): |
131 |
encoded_path = encoded_path.encode('utf-8')
|
132 |
self.assertEqual(encoded_path, c.get((b'remote', b'origin'), b'url')) |
133 |
self.assertEqual(
|
134 |
b'+refs/heads/*:refs/remotes/origin/*',
|
135 |
c.get((b'remote', b'origin'), b'fetch')) |
136 |
|
137 |
def test_simple_local_with_checkout(self): |
138 |
f1_1 = make_object(Blob, data=b'f1')
|
139 |
commit_spec = [[1], [2, 1], [3, 1, 2]] |
140 |
trees = {1: [(b'f1', f1_1), (b'f2', f1_1)], |
141 |
2: [(b'f1', f1_1), (b'f2', f1_1)], |
142 |
3: [(b'f1', f1_1), (b'f2', f1_1)], } |
143 |
|
144 |
c1, c2, c3 = build_commit_graph(self.repo.object_store,
|
145 |
commit_spec, trees) |
146 |
self.repo.refs[b"refs/heads/master"] = c3.id |
147 |
target_path = tempfile.mkdtemp() |
148 |
errstream = BytesIO() |
149 |
self.addCleanup(shutil.rmtree, target_path)
|
150 |
with porcelain.clone(self.repo.path, target_path, |
151 |
checkout=True,
|
152 |
errstream=errstream) as r:
|
153 |
self.assertEqual(r.path, target_path)
|
154 |
with Repo(target_path) as r: |
155 |
self.assertEqual(r.head(), c3.id)
|
156 |
self.assertTrue('f1' in os.listdir(target_path)) |
157 |
self.assertTrue('f2' in os.listdir(target_path)) |
158 |
|
159 |
def test_bare_local_with_checkout(self): |
160 |
f1_1 = make_object(Blob, data=b'f1')
|
161 |
commit_spec = [[1], [2, 1], [3, 1, 2]] |
162 |
trees = {1: [(b'f1', f1_1), (b'f2', f1_1)], |
163 |
2: [(b'f1', f1_1), (b'f2', f1_1)], |
164 |
3: [(b'f1', f1_1), (b'f2', f1_1)], } |
165 |
|
166 |
c1, c2, c3 = build_commit_graph(self.repo.object_store,
|
167 |
commit_spec, trees) |
168 |
self.repo.refs[b"refs/heads/master"] = c3.id |
169 |
target_path = tempfile.mkdtemp() |
170 |
errstream = BytesIO() |
171 |
self.addCleanup(shutil.rmtree, target_path)
|
172 |
r = porcelain.clone(self.repo.path, target_path,
|
173 |
bare=True, errstream=errstream)
|
174 |
self.assertEqual(r.path, target_path)
|
175 |
self.assertEqual(Repo(target_path).head(), c3.id)
|
176 |
self.assertFalse(b'f1' in os.listdir(target_path)) |
177 |
self.assertFalse(b'f2' in os.listdir(target_path)) |
178 |
|
179 |
def test_no_checkout_with_bare(self): |
180 |
f1_1 = make_object(Blob, data=b'f1')
|
181 |
commit_spec = [[1]]
|
182 |
trees = {1: [(b'f1', f1_1), (b'f2', f1_1)]} |
183 |
|
184 |
(c1, ) = build_commit_graph(self.repo.object_store, commit_spec, trees)
|
185 |
self.repo.refs[b"refs/heads/master"] = c1.id |
186 |
target_path = tempfile.mkdtemp() |
187 |
errstream = BytesIO() |
188 |
self.addCleanup(shutil.rmtree, target_path)
|
189 |
self.assertRaises(ValueError, porcelain.clone, self.repo.path, |
190 |
target_path, checkout=True, bare=True, errstream=errstream) |
191 |
|
192 |
|
193 |
class InitTests(TestCase): |
194 |
|
195 |
def test_non_bare(self): |
196 |
repo_dir = tempfile.mkdtemp() |
197 |
self.addCleanup(shutil.rmtree, repo_dir)
|
198 |
porcelain.init(repo_dir) |
199 |
|
200 |
def test_bare(self): |
201 |
repo_dir = tempfile.mkdtemp() |
202 |
self.addCleanup(shutil.rmtree, repo_dir)
|
203 |
porcelain.init(repo_dir, bare=True)
|
204 |
|
205 |
|
206 |
class AddTests(PorcelainTestCase): |
207 |
|
208 |
def test_add_default_paths(self): |
209 |
|
210 |
# create a file for initial commit
|
211 |
with open(os.path.join(self.repo.path, 'blah'), 'w') as f: |
212 |
f.write("\n")
|
213 |
porcelain.add(repo=self.repo.path, paths=['blah']) |
214 |
porcelain.commit(repo=self.repo.path, message=b'test', |
215 |
author=b'test', committer=b'test') |
216 |
|
217 |
# Add a second test file and a file in a directory
|
218 |
with open(os.path.join(self.repo.path, 'foo'), 'w') as f: |
219 |
f.write("\n")
|
220 |
os.mkdir(os.path.join(self.repo.path, 'adir')) |
221 |
with open(os.path.join(self.repo.path, 'adir', 'afile'), 'w') as f: |
222 |
f.write("\n")
|
223 |
porcelain.add(self.repo.path)
|
224 |
|
225 |
# Check that foo was added and nothing in .git was modified
|
226 |
index = self.repo.open_index()
|
227 |
self.assertEqual(sorted(index), [b'adir/afile', b'blah', b'foo']) |
228 |
|
229 |
def test_add_file(self): |
230 |
with open(os.path.join(self.repo.path, 'foo'), 'w') as f: |
231 |
f.write("BAR")
|
232 |
porcelain.add(self.repo.path, paths=["foo"]) |
233 |
|
234 |
|
235 |
class RemoveTests(PorcelainTestCase): |
236 |
|
237 |
def test_remove_file(self): |
238 |
with open(os.path.join(self.repo.path, 'foo'), 'w') as f: |
239 |
f.write("BAR")
|
240 |
porcelain.add(self.repo.path, paths=["foo"]) |
241 |
porcelain.rm(self.repo.path, paths=["foo"]) |
242 |
|
243 |
|
244 |
class LogTests(PorcelainTestCase): |
245 |
|
246 |
def test_simple(self): |
247 |
c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], |
248 |
[3, 1, 2]]) |
249 |
self.repo.refs[b"HEAD"] = c3.id |
250 |
outstream = StringIO() |
251 |
porcelain.log(self.repo.path, outstream=outstream)
|
252 |
self.assertEqual(3, outstream.getvalue().count("-" * 50)) |
253 |
|
254 |
def test_max_entries(self): |
255 |
c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], |
256 |
[3, 1, 2]]) |
257 |
self.repo.refs[b"HEAD"] = c3.id |
258 |
outstream = StringIO() |
259 |
porcelain.log(self.repo.path, outstream=outstream, max_entries=1) |
260 |
self.assertEqual(1, outstream.getvalue().count("-" * 50)) |
261 |
|
262 |
|
263 |
class ShowTests(PorcelainTestCase): |
264 |
|
265 |
def test_nolist(self): |
266 |
c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], |
267 |
[3, 1, 2]]) |
268 |
self.repo.refs[b"HEAD"] = c3.id |
269 |
outstream = StringIO() |
270 |
porcelain.show(self.repo.path, objects=c3.id, outstream=outstream)
|
271 |
self.assertTrue(outstream.getvalue().startswith("-" * 50)) |
272 |
|
273 |
def test_simple(self): |
274 |
c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], |
275 |
[3, 1, 2]]) |
276 |
self.repo.refs[b"HEAD"] = c3.id |
277 |
outstream = StringIO() |
278 |
porcelain.show(self.repo.path, objects=[c3.id], outstream=outstream)
|
279 |
self.assertTrue(outstream.getvalue().startswith("-" * 50)) |
280 |
|
281 |
def test_blob(self): |
282 |
b = Blob.from_string(b"The Foo\n")
|
283 |
self.repo.object_store.add_object(b)
|
284 |
outstream = StringIO() |
285 |
porcelain.show(self.repo.path, objects=[b.id], outstream=outstream)
|
286 |
self.assertEqual(outstream.getvalue(), "The Foo\n") |
287 |
|
288 |
|
289 |
class SymbolicRefTests(PorcelainTestCase): |
290 |
|
291 |
def test_set_wrong_symbolic_ref(self): |
292 |
c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], |
293 |
[3, 1, 2]]) |
294 |
self.repo.refs[b"HEAD"] = c3.id |
295 |
|
296 |
self.assertRaises(ValueError, porcelain.symbolic_ref, self.repo.path, b'foobar') |
297 |
|
298 |
def test_set_force_wrong_symbolic_ref(self): |
299 |
c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], |
300 |
[3, 1, 2]]) |
301 |
self.repo.refs[b"HEAD"] = c3.id |
302 |
|
303 |
porcelain.symbolic_ref(self.repo.path, b'force_foobar', force=True) |
304 |
|
305 |
#test if we actually changed the file
|
306 |
with self.repo.get_named_file('HEAD') as f: |
307 |
new_ref = f.read() |
308 |
self.assertEqual(new_ref, b'ref: refs/heads/force_foobar\n') |
309 |
|
310 |
def test_set_symbolic_ref(self): |
311 |
c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], |
312 |
[3, 1, 2]]) |
313 |
self.repo.refs[b"HEAD"] = c3.id |
314 |
|
315 |
porcelain.symbolic_ref(self.repo.path, b'master') |
316 |
|
317 |
def test_set_symbolic_ref_other_than_master(self): |
318 |
c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], |
319 |
[3, 1, 2]], attrs=dict(refs='develop')) |
320 |
self.repo.refs[b"HEAD"] = c3.id |
321 |
self.repo.refs[b"refs/heads/develop"] = c3.id |
322 |
|
323 |
porcelain.symbolic_ref(self.repo.path, b'develop') |
324 |
|
325 |
#test if we actually changed the file
|
326 |
with self.repo.get_named_file('HEAD') as f: |
327 |
new_ref = f.read() |
328 |
self.assertEqual(new_ref, b'ref: refs/heads/develop\n') |
329 |
|
330 |
|
331 |
class DiffTreeTests(PorcelainTestCase): |
332 |
|
333 |
def test_empty(self): |
334 |
c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], |
335 |
[3, 1, 2]]) |
336 |
self.repo.refs[b"HEAD"] = c3.id |
337 |
outstream = BytesIO() |
338 |
porcelain.diff_tree(self.repo.path, c2.tree, c3.tree, outstream=outstream)
|
339 |
self.assertEqual(outstream.getvalue(), b"") |
340 |
|
341 |
|
342 |
class CommitTreeTests(PorcelainTestCase): |
343 |
|
344 |
def test_simple(self): |
345 |
c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], |
346 |
[3, 1, 2]]) |
347 |
b = Blob() |
348 |
b.data = b"foo the bar"
|
349 |
t = Tree() |
350 |
t.add(b"somename", 0o100644, b.id) |
351 |
self.repo.object_store.add_object(t)
|
352 |
self.repo.object_store.add_object(b)
|
353 |
sha = porcelain.commit_tree( |
354 |
self.repo.path, t.id, message=b"Withcommit.", |
355 |
author=b"Joe <joe@example.com>",
|
356 |
committer=b"Jane <jane@example.com>")
|
357 |
self.assertTrue(isinstance(sha, bytes)) |
358 |
self.assertEqual(len(sha), 40) |
359 |
|
360 |
|
361 |
class RevListTests(PorcelainTestCase): |
362 |
|
363 |
def test_simple(self): |
364 |
c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], |
365 |
[3, 1, 2]]) |
366 |
outstream = BytesIO() |
367 |
porcelain.rev_list( |
368 |
self.repo.path, [c3.id], outstream=outstream)
|
369 |
self.assertEqual(
|
370 |
c3.id + b"\n" +
|
371 |
c2.id + b"\n" +
|
372 |
c1.id + b"\n",
|
373 |
outstream.getvalue()) |
374 |
|
375 |
|
376 |
class TagCreateTests(PorcelainTestCase): |
377 |
|
378 |
def test_annotated(self): |
379 |
c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], |
380 |
[3, 1, 2]]) |
381 |
self.repo.refs[b"HEAD"] = c3.id |
382 |
|
383 |
porcelain.tag_create(self.repo.path, b"tryme", b'foo <foo@bar.com>', |
384 |
b'bar', annotated=True) |
385 |
|
386 |
tags = self.repo.refs.as_dict(b"refs/tags") |
387 |
self.assertEqual(list(tags.keys()), [b"tryme"]) |
388 |
tag = self.repo[b'refs/tags/tryme'] |
389 |
self.assertTrue(isinstance(tag, Tag)) |
390 |
self.assertEqual(b"foo <foo@bar.com>", tag.tagger) |
391 |
self.assertEqual(b"bar", tag.message) |
392 |
self.assertLess(time.time() - tag.tag_time, 5) |
393 |
|
394 |
def test_unannotated(self): |
395 |
c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], |
396 |
[3, 1, 2]]) |
397 |
self.repo.refs[b"HEAD"] = c3.id |
398 |
|
399 |
porcelain.tag_create(self.repo.path, b"tryme", annotated=False) |
400 |
|
401 |
tags = self.repo.refs.as_dict(b"refs/tags") |
402 |
self.assertEqual(list(tags.keys()), [b"tryme"]) |
403 |
self.repo[b'refs/tags/tryme'] |
404 |
self.assertEqual(list(tags.values()), [self.repo.head()]) |
405 |
|
406 |
|
407 |
class TagListTests(PorcelainTestCase): |
408 |
|
409 |
def test_empty(self): |
410 |
tags = porcelain.tag_list(self.repo.path)
|
411 |
self.assertEqual([], tags)
|
412 |
|
413 |
def test_simple(self): |
414 |
self.repo.refs[b"refs/tags/foo"] = b"aa" * 20 |
415 |
self.repo.refs[b"refs/tags/bar/bla"] = b"bb" * 20 |
416 |
tags = porcelain.tag_list(self.repo.path)
|
417 |
|
418 |
self.assertEqual([b"bar/bla", b"foo"], tags) |
419 |
|
420 |
|
421 |
class TagDeleteTests(PorcelainTestCase): |
422 |
|
423 |
def test_simple(self): |
424 |
[c1] = build_commit_graph(self.repo.object_store, [[1]]) |
425 |
self.repo[b"HEAD"] = c1.id |
426 |
porcelain.tag_create(self.repo, b'foo') |
427 |
self.assertTrue(b"foo" in porcelain.tag_list(self.repo)) |
428 |
porcelain.tag_delete(self.repo, b'foo') |
429 |
self.assertFalse(b"foo" in porcelain.tag_list(self.repo)) |
430 |
|
431 |
|
432 |
class ResetTests(PorcelainTestCase): |
433 |
|
434 |
def test_hard_head(self): |
435 |
with open(os.path.join(self.repo.path, 'foo'), 'w') as f: |
436 |
f.write("BAR")
|
437 |
porcelain.add(self.repo.path, paths=["foo"]) |
438 |
porcelain.commit(self.repo.path, message=b"Some message", |
439 |
committer=b"Jane <jane@example.com>",
|
440 |
author=b"John <john@example.com>")
|
441 |
|
442 |
with open(os.path.join(self.repo.path, 'foo'), 'wb') as f: |
443 |
f.write(b"OOH")
|
444 |
|
445 |
porcelain.reset(self.repo, "hard", b"HEAD") |
446 |
|
447 |
index = self.repo.open_index()
|
448 |
changes = list(tree_changes(self.repo, |
449 |
index.commit(self.repo.object_store),
|
450 |
self.repo[b'HEAD'].tree)) |
451 |
|
452 |
self.assertEqual([], changes)
|
453 |
|
454 |
def test_hard_commit(self): |
455 |
with open(os.path.join(self.repo.path, 'foo'), 'w') as f: |
456 |
f.write("BAR")
|
457 |
porcelain.add(self.repo.path, paths=["foo"]) |
458 |
sha = porcelain.commit(self.repo.path, message=b"Some message", |
459 |
committer=b"Jane <jane@example.com>",
|
460 |
author=b"John <john@example.com>")
|
461 |
|
462 |
with open(os.path.join(self.repo.path, 'foo'), 'wb') as f: |
463 |
f.write(b"BAZ")
|
464 |
porcelain.add(self.repo.path, paths=["foo"]) |
465 |
porcelain.commit(self.repo.path, message=b"Some other message", |
466 |
committer=b"Jane <jane@example.com>",
|
467 |
author=b"John <john@example.com>")
|
468 |
|
469 |
porcelain.reset(self.repo, "hard", sha) |
470 |
|
471 |
index = self.repo.open_index()
|
472 |
changes = list(tree_changes(self.repo, |
473 |
index.commit(self.repo.object_store),
|
474 |
self.repo[sha].tree))
|
475 |
|
476 |
self.assertEqual([], changes)
|
477 |
|
478 |
|
479 |
class PushTests(PorcelainTestCase): |
480 |
|
481 |
def test_simple(self): |
482 |
"""
|
483 |
Basic test of porcelain push where self.repo is the remote. First
|
484 |
clone the remote, commit a file to the clone, then push the changes
|
485 |
back to the remote.
|
486 |
"""
|
487 |
outstream = BytesIO() |
488 |
errstream = BytesIO() |
489 |
|
490 |
porcelain.commit(repo=self.repo.path, message=b'init', |
491 |
author=b'', committer=b'') |
492 |
|
493 |
# Setup target repo cloned from temp test repo
|
494 |
clone_path = tempfile.mkdtemp() |
495 |
self.addCleanup(shutil.rmtree, clone_path)
|
496 |
target_repo = porcelain.clone(self.repo.path, target=clone_path,
|
497 |
errstream=errstream) |
498 |
try:
|
499 |
self.assertEqual(target_repo[b'HEAD'], self.repo[b'HEAD']) |
500 |
finally:
|
501 |
target_repo.close() |
502 |
|
503 |
# create a second file to be pushed back to origin
|
504 |
handle, fullpath = tempfile.mkstemp(dir=clone_path) |
505 |
os.close(handle) |
506 |
porcelain.add(repo=clone_path, paths=[os.path.basename(fullpath)]) |
507 |
porcelain.commit(repo=clone_path, message=b'push',
|
508 |
author=b'', committer=b'') |
509 |
|
510 |
# Setup a non-checked out branch in the remote
|
511 |
refs_path = b"refs/heads/foo"
|
512 |
new_id = self.repo[b'HEAD'].id |
513 |
self.assertNotEqual(new_id, ZERO_SHA)
|
514 |
self.repo.refs[refs_path] = new_id
|
515 |
|
516 |
# Push to the remote
|
517 |
porcelain.push(clone_path, self.repo.path, b"HEAD:" + refs_path, outstream=outstream, |
518 |
errstream=errstream) |
519 |
|
520 |
# Check that the target and source
|
521 |
with Repo(clone_path) as r_clone: |
522 |
self.assertEqual({
|
523 |
b'HEAD': new_id,
|
524 |
b'refs/heads/foo': r_clone[b'HEAD'].id, |
525 |
b'refs/heads/master': new_id,
|
526 |
}, self.repo.get_refs())
|
527 |
self.assertEqual(r_clone[b'HEAD'].id, self.repo.refs[refs_path]) |
528 |
|
529 |
# Get the change in the target repo corresponding to the add
|
530 |
# this will be in the foo branch.
|
531 |
change = list(tree_changes(self.repo, self.repo[b'HEAD'].tree, |
532 |
self.repo[b'refs/heads/foo'].tree))[0] |
533 |
self.assertEqual(os.path.basename(fullpath),
|
534 |
change.new.path.decode('ascii'))
|
535 |
|
536 |
def test_delete(self): |
537 |
"""Basic test of porcelain push, removing a branch.
|
538 |
"""
|
539 |
outstream = BytesIO() |
540 |
errstream = BytesIO() |
541 |
|
542 |
porcelain.commit(repo=self.repo.path, message=b'init', |
543 |
author=b'', committer=b'') |
544 |
|
545 |
# Setup target repo cloned from temp test repo
|
546 |
clone_path = tempfile.mkdtemp() |
547 |
self.addCleanup(shutil.rmtree, clone_path)
|
548 |
target_repo = porcelain.clone(self.repo.path, target=clone_path,
|
549 |
errstream=errstream) |
550 |
target_repo.close() |
551 |
|
552 |
# Setup a non-checked out branch in the remote
|
553 |
refs_path = b"refs/heads/foo"
|
554 |
new_id = self.repo[b'HEAD'].id |
555 |
self.assertNotEqual(new_id, ZERO_SHA)
|
556 |
self.repo.refs[refs_path] = new_id
|
557 |
|
558 |
# Push to the remote
|
559 |
porcelain.push(clone_path, self.repo.path, b":" + refs_path, outstream=outstream, |
560 |
errstream=errstream) |
561 |
|
562 |
self.assertEqual({
|
563 |
b'HEAD': new_id,
|
564 |
b'refs/heads/master': new_id,
|
565 |
}, self.repo.get_refs())
|
566 |
|
567 |
|
568 |
|
569 |
class PullTests(PorcelainTestCase): |
570 |
|
571 |
def setUp(self): |
572 |
super(PullTests, self).setUp() |
573 |
# create a file for initial commit
|
574 |
handle, fullpath = tempfile.mkstemp(dir=self.repo.path)
|
575 |
os.close(handle) |
576 |
filename = os.path.basename(fullpath) |
577 |
porcelain.add(repo=self.repo.path, paths=filename)
|
578 |
porcelain.commit(repo=self.repo.path, message=b'test', |
579 |
author=b'test', committer=b'test') |
580 |
|
581 |
# Setup target repo
|
582 |
self.target_path = tempfile.mkdtemp()
|
583 |
self.addCleanup(shutil.rmtree, self.target_path) |
584 |
target_repo = porcelain.clone(self.repo.path, target=self.target_path, |
585 |
errstream=BytesIO()) |
586 |
target_repo.close() |
587 |
|
588 |
# create a second file to be pushed
|
589 |
handle, fullpath = tempfile.mkstemp(dir=self.repo.path)
|
590 |
os.close(handle) |
591 |
filename = os.path.basename(fullpath) |
592 |
porcelain.add(repo=self.repo.path, paths=filename)
|
593 |
porcelain.commit(repo=self.repo.path, message=b'test2', |
594 |
author=b'test2', committer=b'test2') |
595 |
|
596 |
self.assertTrue(b'refs/heads/master' in self.repo.refs) |
597 |
self.assertTrue(b'refs/heads/master' in target_repo.refs) |
598 |
|
599 |
def test_simple(self): |
600 |
outstream = BytesIO() |
601 |
errstream = BytesIO() |
602 |
|
603 |
# Pull changes into the cloned repo
|
604 |
porcelain.pull(self.target_path, self.repo.path, b'refs/heads/master', |
605 |
outstream=outstream, errstream=errstream) |
606 |
|
607 |
# Check the target repo for pushed changes
|
608 |
with Repo(self.target_path) as r: |
609 |
self.assertEqual(r[b'HEAD'].id, self.repo[b'HEAD'].id) |
610 |
|
611 |
def test_no_refspec(self): |
612 |
outstream = BytesIO() |
613 |
errstream = BytesIO() |
614 |
|
615 |
# Pull changes into the cloned repo
|
616 |
porcelain.pull(self.target_path, self.repo.path, outstream=outstream, |
617 |
errstream=errstream) |
618 |
|
619 |
# Check the target repo for pushed changes
|
620 |
with Repo(self.target_path) as r: |
621 |
self.assertEqual(r[b'HEAD'].id, self.repo[b'HEAD'].id) |
622 |
|
623 |
|
624 |
class StatusTests(PorcelainTestCase): |
625 |
|
626 |
def test_empty(self): |
627 |
results = porcelain.status(self.repo)
|
628 |
self.assertEqual(
|
629 |
{'add': [], 'delete': [], 'modify': []}, |
630 |
results.staged) |
631 |
self.assertEqual([], results.unstaged)
|
632 |
|
633 |
def test_status(self): |
634 |
"""Integration test for `status` functionality."""
|
635 |
|
636 |
# Commit a dummy file then modify it
|
637 |
fullpath = os.path.join(self.repo.path, 'foo') |
638 |
with open(fullpath, 'w') as f: |
639 |
f.write('origstuff')
|
640 |
|
641 |
porcelain.add(repo=self.repo.path, paths=['foo']) |
642 |
porcelain.commit(repo=self.repo.path, message=b'test status', |
643 |
author=b'', committer=b'') |
644 |
|
645 |
# modify access and modify time of path
|
646 |
os.utime(fullpath, (0, 0)) |
647 |
|
648 |
with open(fullpath, 'wb') as f: |
649 |
f.write(b'stuff')
|
650 |
|
651 |
# Make a dummy file and stage it
|
652 |
filename_add = 'bar'
|
653 |
fullpath = os.path.join(self.repo.path, filename_add)
|
654 |
with open(fullpath, 'w') as f: |
655 |
f.write('stuff')
|
656 |
porcelain.add(repo=self.repo.path, paths=filename_add)
|
657 |
|
658 |
results = porcelain.status(self.repo)
|
659 |
|
660 |
self.assertEqual(results.staged['add'][0], filename_add.encode('ascii')) |
661 |
self.assertEqual(results.unstaged, [b'foo']) |
662 |
|
663 |
def test_get_tree_changes_add(self): |
664 |
"""Unit test for get_tree_changes add."""
|
665 |
|
666 |
# Make a dummy file, stage
|
667 |
filename = 'bar'
|
668 |
with open(os.path.join(self.repo.path, filename), 'w') as f: |
669 |
f.write('stuff')
|
670 |
porcelain.add(repo=self.repo.path, paths=filename)
|
671 |
porcelain.commit(repo=self.repo.path, message=b'test status', |
672 |
author=b'', committer=b'') |
673 |
|
674 |
filename = 'foo'
|
675 |
with open(os.path.join(self.repo.path, filename), 'w') as f: |
676 |
f.write('stuff')
|
677 |
porcelain.add(repo=self.repo.path, paths=filename)
|
678 |
changes = porcelain.get_tree_changes(self.repo.path)
|
679 |
|
680 |
self.assertEqual(changes['add'][0], filename.encode('ascii')) |
681 |
self.assertEqual(len(changes['add']), 1) |
682 |
self.assertEqual(len(changes['modify']), 0) |
683 |
self.assertEqual(len(changes['delete']), 0) |
684 |
|
685 |
def test_get_tree_changes_modify(self): |
686 |
"""Unit test for get_tree_changes modify."""
|
687 |
|
688 |
# Make a dummy file, stage, commit, modify
|
689 |
filename = 'foo'
|
690 |
fullpath = os.path.join(self.repo.path, filename)
|
691 |
with open(fullpath, 'w') as f: |
692 |
f.write('stuff')
|
693 |
porcelain.add(repo=self.repo.path, paths=filename)
|
694 |
porcelain.commit(repo=self.repo.path, message=b'test status', |
695 |
author=b'', committer=b'') |
696 |
with open(fullpath, 'w') as f: |
697 |
f.write('otherstuff')
|
698 |
porcelain.add(repo=self.repo.path, paths=filename)
|
699 |
changes = porcelain.get_tree_changes(self.repo.path)
|
700 |
|
701 |
self.assertEqual(changes['modify'][0], filename.encode('ascii')) |
702 |
self.assertEqual(len(changes['add']), 0) |
703 |
self.assertEqual(len(changes['modify']), 1) |
704 |
self.assertEqual(len(changes['delete']), 0) |
705 |
|
706 |
def test_get_tree_changes_delete(self): |
707 |
"""Unit test for get_tree_changes delete."""
|
708 |
|
709 |
# Make a dummy file, stage, commit, remove
|
710 |
filename = 'foo'
|
711 |
with open(os.path.join(self.repo.path, filename), 'w') as f: |
712 |
f.write('stuff')
|
713 |
porcelain.add(repo=self.repo.path, paths=filename)
|
714 |
porcelain.commit(repo=self.repo.path, message=b'test status', |
715 |
author=b'', committer=b'') |
716 |
porcelain.rm(repo=self.repo.path, paths=[filename])
|
717 |
changes = porcelain.get_tree_changes(self.repo.path)
|
718 |
|
719 |
self.assertEqual(changes['delete'][0], filename.encode('ascii')) |
720 |
self.assertEqual(len(changes['add']), 0) |
721 |
self.assertEqual(len(changes['modify']), 0) |
722 |
self.assertEqual(len(changes['delete']), 1) |
723 |
|
724 |
|
725 |
# TODO(jelmer): Add test for dulwich.porcelain.daemon
|
726 |
|
727 |
|
728 |
class UploadPackTests(PorcelainTestCase): |
729 |
"""Tests for upload_pack."""
|
730 |
|
731 |
def test_upload_pack(self): |
732 |
outf = BytesIO() |
733 |
exitcode = porcelain.upload_pack(self.repo.path, BytesIO(b"0000"), outf) |
734 |
outlines = outf.getvalue().splitlines() |
735 |
self.assertEqual([b"0000"], outlines) |
736 |
self.assertEqual(0, exitcode) |
737 |
|
738 |
|
739 |
class ReceivePackTests(PorcelainTestCase): |
740 |
"""Tests for receive_pack."""
|
741 |
|
742 |
def test_receive_pack(self): |
743 |
filename = 'foo'
|
744 |
with open(os.path.join(self.repo.path, filename), 'w') as f: |
745 |
f.write('stuff')
|
746 |
porcelain.add(repo=self.repo.path, paths=filename)
|
747 |
self.repo.do_commit(message=b'test status', |
748 |
author=b'', committer=b'', author_timestamp=1402354300, |
749 |
commit_timestamp=1402354300, author_timezone=0, commit_timezone=0) |
750 |
outf = BytesIO() |
751 |
exitcode = porcelain.receive_pack(self.repo.path, BytesIO(b"0000"), outf) |
752 |
outlines = outf.getvalue().splitlines() |
753 |
self.assertEqual([
|
754 |
b'00739e65bdcf4a22cdd4f3700604a275cd2aaf146b23 HEAD\x00 report-status '
|
755 |
b'delete-refs quiet ofs-delta side-band-64k no-done',
|
756 |
b'003f9e65bdcf4a22cdd4f3700604a275cd2aaf146b23 refs/heads/master',
|
757 |
b'0000'], outlines)
|
758 |
self.assertEqual(0, exitcode) |
759 |
|
760 |
|
761 |
class BranchListTests(PorcelainTestCase): |
762 |
|
763 |
def test_standard(self): |
764 |
self.assertEqual(set([]), set(porcelain.branch_list(self.repo))) |
765 |
|
766 |
def test_new_branch(self): |
767 |
[c1] = build_commit_graph(self.repo.object_store, [[1]]) |
768 |
self.repo[b"HEAD"] = c1.id |
769 |
porcelain.branch_create(self.repo, b"foo") |
770 |
self.assertEqual(
|
771 |
set([b"master", b"foo"]), |
772 |
set(porcelain.branch_list(self.repo))) |
773 |
|
774 |
|
775 |
class BranchCreateTests(PorcelainTestCase): |
776 |
|
777 |
def test_branch_exists(self): |
778 |
[c1] = build_commit_graph(self.repo.object_store, [[1]]) |
779 |
self.repo[b"HEAD"] = c1.id |
780 |
porcelain.branch_create(self.repo, b"foo") |
781 |
self.assertRaises(KeyError, porcelain.branch_create, self.repo, b"foo") |
782 |
porcelain.branch_create(self.repo, b"foo", force=True) |
783 |
|
784 |
def test_new_branch(self): |
785 |
[c1] = build_commit_graph(self.repo.object_store, [[1]]) |
786 |
self.repo[b"HEAD"] = c1.id |
787 |
porcelain.branch_create(self.repo, b"foo") |
788 |
self.assertEqual(
|
789 |
set([b"master", b"foo"]), |
790 |
set(porcelain.branch_list(self.repo))) |
791 |
|
792 |
|
793 |
class BranchDeleteTests(PorcelainTestCase): |
794 |
|
795 |
def test_simple(self): |
796 |
[c1] = build_commit_graph(self.repo.object_store, [[1]]) |
797 |
self.repo[b"HEAD"] = c1.id |
798 |
porcelain.branch_create(self.repo, b'foo') |
799 |
self.assertTrue(b"foo" in porcelain.branch_list(self.repo)) |
800 |
porcelain.branch_delete(self.repo, b'foo') |
801 |
self.assertFalse(b"foo" in porcelain.branch_list(self.repo)) |
802 |
|
803 |
|
804 |
class FetchTests(PorcelainTestCase): |
805 |
|
806 |
def test_simple(self): |
807 |
outstream = BytesIO() |
808 |
errstream = BytesIO() |
809 |
|
810 |
# create a file for initial commit
|
811 |
handle, fullpath = tempfile.mkstemp(dir=self.repo.path)
|
812 |
os.close(handle) |
813 |
filename = os.path.basename(fullpath) |
814 |
porcelain.add(repo=self.repo.path, paths=filename)
|
815 |
porcelain.commit(repo=self.repo.path, message=b'test', |
816 |
author=b'test', committer=b'test') |
817 |
|
818 |
# Setup target repo
|
819 |
target_path = tempfile.mkdtemp() |
820 |
self.addCleanup(shutil.rmtree, target_path)
|
821 |
target_repo = porcelain.clone(self.repo.path, target=target_path,
|
822 |
errstream=errstream) |
823 |
|
824 |
# create a second file to be pushed
|
825 |
handle, fullpath = tempfile.mkstemp(dir=self.repo.path)
|
826 |
os.close(handle) |
827 |
filename = os.path.basename(fullpath) |
828 |
porcelain.add(repo=self.repo.path, paths=filename)
|
829 |
porcelain.commit(repo=self.repo.path, message=b'test2', |
830 |
author=b'test2', committer=b'test2') |
831 |
|
832 |
self.assertFalse(self.repo[b'HEAD'].id in target_repo) |
833 |
target_repo.close() |
834 |
|
835 |
# Fetch changes into the cloned repo
|
836 |
porcelain.fetch(target_path, self.repo.path, outstream=outstream,
|
837 |
errstream=errstream) |
838 |
|
839 |
# Check the target repo for pushed changes
|
840 |
with Repo(target_path) as r: |
841 |
self.assertTrue(self.repo[b'HEAD'].id in r) |
842 |
|
843 |
|
844 |
class RepackTests(PorcelainTestCase): |
845 |
|
846 |
def test_empty(self): |
847 |
porcelain.repack(self.repo)
|
848 |
|
849 |
def test_simple(self): |
850 |
handle, fullpath = tempfile.mkstemp(dir=self.repo.path)
|
851 |
os.close(handle) |
852 |
filename = os.path.basename(fullpath) |
853 |
porcelain.add(repo=self.repo.path, paths=filename)
|
854 |
porcelain.repack(self.repo)
|
855 |
|
856 |
|
857 |
class LsTreeTests(PorcelainTestCase): |
858 |
|
859 |
def test_empty(self): |
860 |
porcelain.commit(repo=self.repo.path, message=b'test status', |
861 |
author=b'', committer=b'') |
862 |
|
863 |
f = StringIO() |
864 |
porcelain.ls_tree(self.repo, b"HEAD", outstream=f) |
865 |
self.assertEqual(f.getvalue(), "") |
866 |
|
867 |
def test_simple(self): |
868 |
# Commit a dummy file then modify it
|
869 |
fullpath = os.path.join(self.repo.path, 'foo') |
870 |
with open(fullpath, 'w') as f: |
871 |
f.write('origstuff')
|
872 |
|
873 |
porcelain.add(repo=self.repo.path, paths=['foo']) |
874 |
porcelain.commit(repo=self.repo.path, message=b'test status', |
875 |
author=b'', committer=b'') |
876 |
|
877 |
f = StringIO() |
878 |
porcelain.ls_tree(self.repo, b"HEAD", outstream=f) |
879 |
self.assertEqual(
|
880 |
f.getvalue(), |
881 |
'100644 blob 8b82634d7eae019850bb883f06abf428c58bc9aa\tfoo\n')
|
882 |
|
883 |
|
884 |
class LsRemoteTests(PorcelainTestCase): |
885 |
|
886 |
def test_empty(self): |
887 |
self.assertEqual({}, porcelain.ls_remote(self.repo.path)) |
888 |
|
889 |
def test_some(self): |
890 |
cid = porcelain.commit(repo=self.repo.path, message=b'test status', |
891 |
author=b'', committer=b'') |
892 |
|
893 |
self.assertEqual({
|
894 |
b'refs/heads/master': cid,
|
895 |
b'HEAD': cid},
|
896 |
porcelain.ls_remote(self.repo.path))
|