gvsig-scripting / org.gvsig.scripting / trunk / org.gvsig.scripting / org.gvsig.scripting.app / org.gvsig.scripting.app.mainplugin / src / main / resources-plugin / scripting / lib / dulwich / tests / test_porcelain.py @ 959
History | View | Annotate | Download (31.9 KB)
1 | 959 | jjdelcerro | # test_porcelain.py -- porcelain tests
|
---|---|---|---|
2 | # Copyright (C) 2013 Jelmer Vernooij <jelmer@samba.org>
|
||
3 | #
|
||
4 | # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
|
||
5 | # General Public License as public by the Free Software Foundation; version 2.0
|
||
6 | # or (at your option) any later version. You can redistribute it and/or
|
||
7 | # modify it under the terms of either of these two licenses.
|
||
8 | #
|
||
9 | # Unless required by applicable law or agreed to in writing, software
|
||
10 | # distributed under the License is distributed on an "AS IS" BASIS,
|
||
11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
12 | # See the License for the specific language governing permissions and
|
||
13 | # limitations under the License.
|
||
14 | #
|
||
15 | # You should have received a copy of the licenses; if not, see
|
||
16 | # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
|
||
17 | # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
|
||
18 | # License, Version 2.0.
|
||
19 | #
|
||
20 | |||
21 | """Tests for dulwich.porcelain."""
|
||
22 | |||
23 | from io import BytesIO |
||
24 | try:
|
||
25 | from StringIO import StringIO |
||
26 | except ImportError: |
||
27 | from io import StringIO |
||
28 | import os |
||
29 | import shutil |
||
30 | import tarfile |
||
31 | import tempfile |
||
32 | import time |
||
33 | |||
34 | from dulwich import porcelain |
||
35 | from dulwich.diff_tree import tree_changes |
||
36 | from dulwich.objects import ( |
||
37 | Blob, |
||
38 | Tag, |
||
39 | Tree, |
||
40 | ZERO_SHA, |
||
41 | ) |
||
42 | from dulwich.repo import Repo |
||
43 | from dulwich.tests import ( |
||
44 | TestCase, |
||
45 | ) |
||
46 | from dulwich.tests.utils import ( |
||
47 | build_commit_graph, |
||
48 | make_object, |
||
49 | ) |
||
50 | |||
51 | |||
52 | class PorcelainTestCase(TestCase): |
||
53 | |||
54 | def setUp(self): |
||
55 | super(PorcelainTestCase, self).setUp() |
||
56 | repo_dir = tempfile.mkdtemp() |
||
57 | self.addCleanup(shutil.rmtree, repo_dir)
|
||
58 | self.repo = Repo.init(repo_dir)
|
||
59 | |||
60 | def tearDown(self): |
||
61 | super(PorcelainTestCase, self).tearDown() |
||
62 | self.repo.close()
|
||
63 | |||
64 | |||
65 | class ArchiveTests(PorcelainTestCase): |
||
66 | """Tests for the archive command."""
|
||
67 | |||
68 | def test_simple(self): |
||
69 | c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) |
||
70 | self.repo.refs[b"refs/heads/master"] = c3.id |
||
71 | out = BytesIO() |
||
72 | err = BytesIO() |
||
73 | porcelain.archive(self.repo.path, b"refs/heads/master", outstream=out, |
||
74 | errstream=err) |
||
75 | self.assertEqual(b"", err.getvalue()) |
||
76 | tf = tarfile.TarFile(fileobj=out) |
||
77 | self.addCleanup(tf.close)
|
||
78 | self.assertEqual([], tf.getnames())
|
||
79 | |||
80 | |||
81 | class UpdateServerInfoTests(PorcelainTestCase): |
||
82 | |||
83 | def test_simple(self): |
||
84 | c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], |
||
85 | [3, 1, 2]]) |
||
86 | self.repo.refs[b"refs/heads/foo"] = c3.id |
||
87 | porcelain.update_server_info(self.repo.path)
|
||
88 | self.assertTrue(os.path.exists(os.path.join(self.repo.controldir(), |
||
89 | 'info', 'refs'))) |
||
90 | |||
91 | |||
92 | class CommitTests(PorcelainTestCase): |
||
93 | |||
94 | def test_custom_author(self): |
||
95 | c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], |
||
96 | [3, 1, 2]]) |
||
97 | self.repo.refs[b"refs/heads/foo"] = c3.id |
||
98 | sha = porcelain.commit(self.repo.path, message=b"Some message", |
||
99 | author=b"Joe <joe@example.com>", committer=b"Bob <bob@example.com>") |
||
100 | self.assertTrue(isinstance(sha, bytes)) |
||
101 | self.assertEqual(len(sha), 40) |
||
102 | |||
103 | |||
104 | class CloneTests(PorcelainTestCase): |
||
105 | |||
106 | def test_simple_local(self): |
||
107 | f1_1 = make_object(Blob, data=b'f1')
|
||
108 | commit_spec = [[1], [2, 1], [3, 1, 2]] |
||
109 | trees = {1: [(b'f1', f1_1), (b'f2', f1_1)], |
||
110 | 2: [(b'f1', f1_1), (b'f2', f1_1)], |
||
111 | 3: [(b'f1', f1_1), (b'f2', f1_1)], } |
||
112 | |||
113 | c1, c2, c3 = build_commit_graph(self.repo.object_store,
|
||
114 | commit_spec, trees) |
||
115 | self.repo.refs[b"refs/heads/master"] = c3.id |
||
116 | self.repo.refs[b"refs/tags/foo"] = c3.id |
||
117 | target_path = tempfile.mkdtemp() |
||
118 | errstream = BytesIO() |
||
119 | self.addCleanup(shutil.rmtree, target_path)
|
||
120 | r = porcelain.clone(self.repo.path, target_path,
|
||
121 | checkout=False, errstream=errstream)
|
||
122 | self.assertEqual(r.path, target_path)
|
||
123 | target_repo = Repo(target_path) |
||
124 | self.assertEqual(target_repo.head(), c3.id)
|
||
125 | self.assertEqual(c3.id, target_repo.refs[b'refs/tags/foo']) |
||
126 | self.assertTrue(b'f1' not in os.listdir(target_path)) |
||
127 | self.assertTrue(b'f2' not in os.listdir(target_path)) |
||
128 | c = r.get_config() |
||
129 | encoded_path = self.repo.path
|
||
130 | if not isinstance(encoded_path, bytes): |
||
131 | encoded_path = encoded_path.encode('utf-8')
|
||
132 | self.assertEqual(encoded_path, c.get((b'remote', b'origin'), b'url')) |
||
133 | self.assertEqual(
|
||
134 | b'+refs/heads/*:refs/remotes/origin/*',
|
||
135 | c.get((b'remote', b'origin'), b'fetch')) |
||
136 | |||
137 | def test_simple_local_with_checkout(self): |
||
138 | f1_1 = make_object(Blob, data=b'f1')
|
||
139 | commit_spec = [[1], [2, 1], [3, 1, 2]] |
||
140 | trees = {1: [(b'f1', f1_1), (b'f2', f1_1)], |
||
141 | 2: [(b'f1', f1_1), (b'f2', f1_1)], |
||
142 | 3: [(b'f1', f1_1), (b'f2', f1_1)], } |
||
143 | |||
144 | c1, c2, c3 = build_commit_graph(self.repo.object_store,
|
||
145 | commit_spec, trees) |
||
146 | self.repo.refs[b"refs/heads/master"] = c3.id |
||
147 | target_path = tempfile.mkdtemp() |
||
148 | errstream = BytesIO() |
||
149 | self.addCleanup(shutil.rmtree, target_path)
|
||
150 | with porcelain.clone(self.repo.path, target_path, |
||
151 | checkout=True,
|
||
152 | errstream=errstream) as r:
|
||
153 | self.assertEqual(r.path, target_path)
|
||
154 | with Repo(target_path) as r: |
||
155 | self.assertEqual(r.head(), c3.id)
|
||
156 | self.assertTrue('f1' in os.listdir(target_path)) |
||
157 | self.assertTrue('f2' in os.listdir(target_path)) |
||
158 | |||
159 | def test_bare_local_with_checkout(self): |
||
160 | f1_1 = make_object(Blob, data=b'f1')
|
||
161 | commit_spec = [[1], [2, 1], [3, 1, 2]] |
||
162 | trees = {1: [(b'f1', f1_1), (b'f2', f1_1)], |
||
163 | 2: [(b'f1', f1_1), (b'f2', f1_1)], |
||
164 | 3: [(b'f1', f1_1), (b'f2', f1_1)], } |
||
165 | |||
166 | c1, c2, c3 = build_commit_graph(self.repo.object_store,
|
||
167 | commit_spec, trees) |
||
168 | self.repo.refs[b"refs/heads/master"] = c3.id |
||
169 | target_path = tempfile.mkdtemp() |
||
170 | errstream = BytesIO() |
||
171 | self.addCleanup(shutil.rmtree, target_path)
|
||
172 | r = porcelain.clone(self.repo.path, target_path,
|
||
173 | bare=True, errstream=errstream)
|
||
174 | self.assertEqual(r.path, target_path)
|
||
175 | self.assertEqual(Repo(target_path).head(), c3.id)
|
||
176 | self.assertFalse(b'f1' in os.listdir(target_path)) |
||
177 | self.assertFalse(b'f2' in os.listdir(target_path)) |
||
178 | |||
179 | def test_no_checkout_with_bare(self): |
||
180 | f1_1 = make_object(Blob, data=b'f1')
|
||
181 | commit_spec = [[1]]
|
||
182 | trees = {1: [(b'f1', f1_1), (b'f2', f1_1)]} |
||
183 | |||
184 | (c1, ) = build_commit_graph(self.repo.object_store, commit_spec, trees)
|
||
185 | self.repo.refs[b"refs/heads/master"] = c1.id |
||
186 | target_path = tempfile.mkdtemp() |
||
187 | errstream = BytesIO() |
||
188 | self.addCleanup(shutil.rmtree, target_path)
|
||
189 | self.assertRaises(ValueError, porcelain.clone, self.repo.path, |
||
190 | target_path, checkout=True, bare=True, errstream=errstream) |
||
191 | |||
192 | |||
193 | class InitTests(TestCase): |
||
194 | |||
195 | def test_non_bare(self): |
||
196 | repo_dir = tempfile.mkdtemp() |
||
197 | self.addCleanup(shutil.rmtree, repo_dir)
|
||
198 | porcelain.init(repo_dir) |
||
199 | |||
200 | def test_bare(self): |
||
201 | repo_dir = tempfile.mkdtemp() |
||
202 | self.addCleanup(shutil.rmtree, repo_dir)
|
||
203 | porcelain.init(repo_dir, bare=True)
|
||
204 | |||
205 | |||
206 | class AddTests(PorcelainTestCase): |
||
207 | |||
208 | def test_add_default_paths(self): |
||
209 | |||
210 | # create a file for initial commit
|
||
211 | with open(os.path.join(self.repo.path, 'blah'), 'w') as f: |
||
212 | f.write("\n")
|
||
213 | porcelain.add(repo=self.repo.path, paths=['blah']) |
||
214 | porcelain.commit(repo=self.repo.path, message=b'test', |
||
215 | author=b'test', committer=b'test') |
||
216 | |||
217 | # Add a second test file and a file in a directory
|
||
218 | with open(os.path.join(self.repo.path, 'foo'), 'w') as f: |
||
219 | f.write("\n")
|
||
220 | os.mkdir(os.path.join(self.repo.path, 'adir')) |
||
221 | with open(os.path.join(self.repo.path, 'adir', 'afile'), 'w') as f: |
||
222 | f.write("\n")
|
||
223 | porcelain.add(self.repo.path)
|
||
224 | |||
225 | # Check that foo was added and nothing in .git was modified
|
||
226 | index = self.repo.open_index()
|
||
227 | self.assertEqual(sorted(index), [b'adir/afile', b'blah', b'foo']) |
||
228 | |||
229 | def test_add_file(self): |
||
230 | with open(os.path.join(self.repo.path, 'foo'), 'w') as f: |
||
231 | f.write("BAR")
|
||
232 | porcelain.add(self.repo.path, paths=["foo"]) |
||
233 | |||
234 | |||
235 | class RemoveTests(PorcelainTestCase): |
||
236 | |||
237 | def test_remove_file(self): |
||
238 | with open(os.path.join(self.repo.path, 'foo'), 'w') as f: |
||
239 | f.write("BAR")
|
||
240 | porcelain.add(self.repo.path, paths=["foo"]) |
||
241 | porcelain.rm(self.repo.path, paths=["foo"]) |
||
242 | |||
243 | |||
244 | class LogTests(PorcelainTestCase): |
||
245 | |||
246 | def test_simple(self): |
||
247 | c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], |
||
248 | [3, 1, 2]]) |
||
249 | self.repo.refs[b"HEAD"] = c3.id |
||
250 | outstream = StringIO() |
||
251 | porcelain.log(self.repo.path, outstream=outstream)
|
||
252 | self.assertEqual(3, outstream.getvalue().count("-" * 50)) |
||
253 | |||
254 | def test_max_entries(self): |
||
255 | c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], |
||
256 | [3, 1, 2]]) |
||
257 | self.repo.refs[b"HEAD"] = c3.id |
||
258 | outstream = StringIO() |
||
259 | porcelain.log(self.repo.path, outstream=outstream, max_entries=1) |
||
260 | self.assertEqual(1, outstream.getvalue().count("-" * 50)) |
||
261 | |||
262 | |||
263 | class ShowTests(PorcelainTestCase): |
||
264 | |||
265 | def test_nolist(self): |
||
266 | c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], |
||
267 | [3, 1, 2]]) |
||
268 | self.repo.refs[b"HEAD"] = c3.id |
||
269 | outstream = StringIO() |
||
270 | porcelain.show(self.repo.path, objects=c3.id, outstream=outstream)
|
||
271 | self.assertTrue(outstream.getvalue().startswith("-" * 50)) |
||
272 | |||
273 | def test_simple(self): |
||
274 | c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], |
||
275 | [3, 1, 2]]) |
||
276 | self.repo.refs[b"HEAD"] = c3.id |
||
277 | outstream = StringIO() |
||
278 | porcelain.show(self.repo.path, objects=[c3.id], outstream=outstream)
|
||
279 | self.assertTrue(outstream.getvalue().startswith("-" * 50)) |
||
280 | |||
281 | def test_blob(self): |
||
282 | b = Blob.from_string(b"The Foo\n")
|
||
283 | self.repo.object_store.add_object(b)
|
||
284 | outstream = StringIO() |
||
285 | porcelain.show(self.repo.path, objects=[b.id], outstream=outstream)
|
||
286 | self.assertEqual(outstream.getvalue(), "The Foo\n") |
||
287 | |||
288 | |||
289 | class SymbolicRefTests(PorcelainTestCase): |
||
290 | |||
291 | def test_set_wrong_symbolic_ref(self): |
||
292 | c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], |
||
293 | [3, 1, 2]]) |
||
294 | self.repo.refs[b"HEAD"] = c3.id |
||
295 | |||
296 | self.assertRaises(ValueError, porcelain.symbolic_ref, self.repo.path, b'foobar') |
||
297 | |||
298 | def test_set_force_wrong_symbolic_ref(self): |
||
299 | c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], |
||
300 | [3, 1, 2]]) |
||
301 | self.repo.refs[b"HEAD"] = c3.id |
||
302 | |||
303 | porcelain.symbolic_ref(self.repo.path, b'force_foobar', force=True) |
||
304 | |||
305 | #test if we actually changed the file
|
||
306 | with self.repo.get_named_file('HEAD') as f: |
||
307 | new_ref = f.read() |
||
308 | self.assertEqual(new_ref, b'ref: refs/heads/force_foobar\n') |
||
309 | |||
310 | def test_set_symbolic_ref(self): |
||
311 | c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], |
||
312 | [3, 1, 2]]) |
||
313 | self.repo.refs[b"HEAD"] = c3.id |
||
314 | |||
315 | porcelain.symbolic_ref(self.repo.path, b'master') |
||
316 | |||
317 | def test_set_symbolic_ref_other_than_master(self): |
||
318 | c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], |
||
319 | [3, 1, 2]], attrs=dict(refs='develop')) |
||
320 | self.repo.refs[b"HEAD"] = c3.id |
||
321 | self.repo.refs[b"refs/heads/develop"] = c3.id |
||
322 | |||
323 | porcelain.symbolic_ref(self.repo.path, b'develop') |
||
324 | |||
325 | #test if we actually changed the file
|
||
326 | with self.repo.get_named_file('HEAD') as f: |
||
327 | new_ref = f.read() |
||
328 | self.assertEqual(new_ref, b'ref: refs/heads/develop\n') |
||
329 | |||
330 | |||
331 | class DiffTreeTests(PorcelainTestCase): |
||
332 | |||
333 | def test_empty(self): |
||
334 | c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], |
||
335 | [3, 1, 2]]) |
||
336 | self.repo.refs[b"HEAD"] = c3.id |
||
337 | outstream = BytesIO() |
||
338 | porcelain.diff_tree(self.repo.path, c2.tree, c3.tree, outstream=outstream)
|
||
339 | self.assertEqual(outstream.getvalue(), b"") |
||
340 | |||
341 | |||
342 | class CommitTreeTests(PorcelainTestCase): |
||
343 | |||
344 | def test_simple(self): |
||
345 | c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], |
||
346 | [3, 1, 2]]) |
||
347 | b = Blob() |
||
348 | b.data = b"foo the bar"
|
||
349 | t = Tree() |
||
350 | t.add(b"somename", 0o100644, b.id) |
||
351 | self.repo.object_store.add_object(t)
|
||
352 | self.repo.object_store.add_object(b)
|
||
353 | sha = porcelain.commit_tree( |
||
354 | self.repo.path, t.id, message=b"Withcommit.", |
||
355 | author=b"Joe <joe@example.com>",
|
||
356 | committer=b"Jane <jane@example.com>")
|
||
357 | self.assertTrue(isinstance(sha, bytes)) |
||
358 | self.assertEqual(len(sha), 40) |
||
359 | |||
360 | |||
361 | class RevListTests(PorcelainTestCase): |
||
362 | |||
363 | def test_simple(self): |
||
364 | c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], |
||
365 | [3, 1, 2]]) |
||
366 | outstream = BytesIO() |
||
367 | porcelain.rev_list( |
||
368 | self.repo.path, [c3.id], outstream=outstream)
|
||
369 | self.assertEqual(
|
||
370 | c3.id + b"\n" +
|
||
371 | c2.id + b"\n" +
|
||
372 | c1.id + b"\n",
|
||
373 | outstream.getvalue()) |
||
374 | |||
375 | |||
376 | class TagCreateTests(PorcelainTestCase): |
||
377 | |||
378 | def test_annotated(self): |
||
379 | c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], |
||
380 | [3, 1, 2]]) |
||
381 | self.repo.refs[b"HEAD"] = c3.id |
||
382 | |||
383 | porcelain.tag_create(self.repo.path, b"tryme", b'foo <foo@bar.com>', |
||
384 | b'bar', annotated=True) |
||
385 | |||
386 | tags = self.repo.refs.as_dict(b"refs/tags") |
||
387 | self.assertEqual(list(tags.keys()), [b"tryme"]) |
||
388 | tag = self.repo[b'refs/tags/tryme'] |
||
389 | self.assertTrue(isinstance(tag, Tag)) |
||
390 | self.assertEqual(b"foo <foo@bar.com>", tag.tagger) |
||
391 | self.assertEqual(b"bar", tag.message) |
||
392 | self.assertLess(time.time() - tag.tag_time, 5) |
||
393 | |||
394 | def test_unannotated(self): |
||
395 | c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], |
||
396 | [3, 1, 2]]) |
||
397 | self.repo.refs[b"HEAD"] = c3.id |
||
398 | |||
399 | porcelain.tag_create(self.repo.path, b"tryme", annotated=False) |
||
400 | |||
401 | tags = self.repo.refs.as_dict(b"refs/tags") |
||
402 | self.assertEqual(list(tags.keys()), [b"tryme"]) |
||
403 | self.repo[b'refs/tags/tryme'] |
||
404 | self.assertEqual(list(tags.values()), [self.repo.head()]) |
||
405 | |||
406 | |||
407 | class TagListTests(PorcelainTestCase): |
||
408 | |||
409 | def test_empty(self): |
||
410 | tags = porcelain.tag_list(self.repo.path)
|
||
411 | self.assertEqual([], tags)
|
||
412 | |||
413 | def test_simple(self): |
||
414 | self.repo.refs[b"refs/tags/foo"] = b"aa" * 20 |
||
415 | self.repo.refs[b"refs/tags/bar/bla"] = b"bb" * 20 |
||
416 | tags = porcelain.tag_list(self.repo.path)
|
||
417 | |||
418 | self.assertEqual([b"bar/bla", b"foo"], tags) |
||
419 | |||
420 | |||
421 | class TagDeleteTests(PorcelainTestCase): |
||
422 | |||
423 | def test_simple(self): |
||
424 | [c1] = build_commit_graph(self.repo.object_store, [[1]]) |
||
425 | self.repo[b"HEAD"] = c1.id |
||
426 | porcelain.tag_create(self.repo, b'foo') |
||
427 | self.assertTrue(b"foo" in porcelain.tag_list(self.repo)) |
||
428 | porcelain.tag_delete(self.repo, b'foo') |
||
429 | self.assertFalse(b"foo" in porcelain.tag_list(self.repo)) |
||
430 | |||
431 | |||
432 | class ResetTests(PorcelainTestCase): |
||
433 | |||
434 | def test_hard_head(self): |
||
435 | with open(os.path.join(self.repo.path, 'foo'), 'w') as f: |
||
436 | f.write("BAR")
|
||
437 | porcelain.add(self.repo.path, paths=["foo"]) |
||
438 | porcelain.commit(self.repo.path, message=b"Some message", |
||
439 | committer=b"Jane <jane@example.com>",
|
||
440 | author=b"John <john@example.com>")
|
||
441 | |||
442 | with open(os.path.join(self.repo.path, 'foo'), 'wb') as f: |
||
443 | f.write(b"OOH")
|
||
444 | |||
445 | porcelain.reset(self.repo, "hard", b"HEAD") |
||
446 | |||
447 | index = self.repo.open_index()
|
||
448 | changes = list(tree_changes(self.repo, |
||
449 | index.commit(self.repo.object_store),
|
||
450 | self.repo[b'HEAD'].tree)) |
||
451 | |||
452 | self.assertEqual([], changes)
|
||
453 | |||
454 | def test_hard_commit(self): |
||
455 | with open(os.path.join(self.repo.path, 'foo'), 'w') as f: |
||
456 | f.write("BAR")
|
||
457 | porcelain.add(self.repo.path, paths=["foo"]) |
||
458 | sha = porcelain.commit(self.repo.path, message=b"Some message", |
||
459 | committer=b"Jane <jane@example.com>",
|
||
460 | author=b"John <john@example.com>")
|
||
461 | |||
462 | with open(os.path.join(self.repo.path, 'foo'), 'wb') as f: |
||
463 | f.write(b"BAZ")
|
||
464 | porcelain.add(self.repo.path, paths=["foo"]) |
||
465 | porcelain.commit(self.repo.path, message=b"Some other message", |
||
466 | committer=b"Jane <jane@example.com>",
|
||
467 | author=b"John <john@example.com>")
|
||
468 | |||
469 | porcelain.reset(self.repo, "hard", sha) |
||
470 | |||
471 | index = self.repo.open_index()
|
||
472 | changes = list(tree_changes(self.repo, |
||
473 | index.commit(self.repo.object_store),
|
||
474 | self.repo[sha].tree))
|
||
475 | |||
476 | self.assertEqual([], changes)
|
||
477 | |||
478 | |||
479 | class PushTests(PorcelainTestCase): |
||
480 | |||
481 | def test_simple(self): |
||
482 | """
|
||
483 | Basic test of porcelain push where self.repo is the remote. First
|
||
484 | clone the remote, commit a file to the clone, then push the changes
|
||
485 | back to the remote.
|
||
486 | """
|
||
487 | outstream = BytesIO() |
||
488 | errstream = BytesIO() |
||
489 | |||
490 | porcelain.commit(repo=self.repo.path, message=b'init', |
||
491 | author=b'', committer=b'') |
||
492 | |||
493 | # Setup target repo cloned from temp test repo
|
||
494 | clone_path = tempfile.mkdtemp() |
||
495 | self.addCleanup(shutil.rmtree, clone_path)
|
||
496 | target_repo = porcelain.clone(self.repo.path, target=clone_path,
|
||
497 | errstream=errstream) |
||
498 | try:
|
||
499 | self.assertEqual(target_repo[b'HEAD'], self.repo[b'HEAD']) |
||
500 | finally:
|
||
501 | target_repo.close() |
||
502 | |||
503 | # create a second file to be pushed back to origin
|
||
504 | handle, fullpath = tempfile.mkstemp(dir=clone_path) |
||
505 | os.close(handle) |
||
506 | porcelain.add(repo=clone_path, paths=[os.path.basename(fullpath)]) |
||
507 | porcelain.commit(repo=clone_path, message=b'push',
|
||
508 | author=b'', committer=b'') |
||
509 | |||
510 | # Setup a non-checked out branch in the remote
|
||
511 | refs_path = b"refs/heads/foo"
|
||
512 | new_id = self.repo[b'HEAD'].id |
||
513 | self.assertNotEqual(new_id, ZERO_SHA)
|
||
514 | self.repo.refs[refs_path] = new_id
|
||
515 | |||
516 | # Push to the remote
|
||
517 | porcelain.push(clone_path, self.repo.path, b"HEAD:" + refs_path, outstream=outstream, |
||
518 | errstream=errstream) |
||
519 | |||
520 | # Check that the target and source
|
||
521 | with Repo(clone_path) as r_clone: |
||
522 | self.assertEqual({
|
||
523 | b'HEAD': new_id,
|
||
524 | b'refs/heads/foo': r_clone[b'HEAD'].id, |
||
525 | b'refs/heads/master': new_id,
|
||
526 | }, self.repo.get_refs())
|
||
527 | self.assertEqual(r_clone[b'HEAD'].id, self.repo.refs[refs_path]) |
||
528 | |||
529 | # Get the change in the target repo corresponding to the add
|
||
530 | # this will be in the foo branch.
|
||
531 | change = list(tree_changes(self.repo, self.repo[b'HEAD'].tree, |
||
532 | self.repo[b'refs/heads/foo'].tree))[0] |
||
533 | self.assertEqual(os.path.basename(fullpath),
|
||
534 | change.new.path.decode('ascii'))
|
||
535 | |||
536 | def test_delete(self): |
||
537 | """Basic test of porcelain push, removing a branch.
|
||
538 | """
|
||
539 | outstream = BytesIO() |
||
540 | errstream = BytesIO() |
||
541 | |||
542 | porcelain.commit(repo=self.repo.path, message=b'init', |
||
543 | author=b'', committer=b'') |
||
544 | |||
545 | # Setup target repo cloned from temp test repo
|
||
546 | clone_path = tempfile.mkdtemp() |
||
547 | self.addCleanup(shutil.rmtree, clone_path)
|
||
548 | target_repo = porcelain.clone(self.repo.path, target=clone_path,
|
||
549 | errstream=errstream) |
||
550 | target_repo.close() |
||
551 | |||
552 | # Setup a non-checked out branch in the remote
|
||
553 | refs_path = b"refs/heads/foo"
|
||
554 | new_id = self.repo[b'HEAD'].id |
||
555 | self.assertNotEqual(new_id, ZERO_SHA)
|
||
556 | self.repo.refs[refs_path] = new_id
|
||
557 | |||
558 | # Push to the remote
|
||
559 | porcelain.push(clone_path, self.repo.path, b":" + refs_path, outstream=outstream, |
||
560 | errstream=errstream) |
||
561 | |||
562 | self.assertEqual({
|
||
563 | b'HEAD': new_id,
|
||
564 | b'refs/heads/master': new_id,
|
||
565 | }, self.repo.get_refs())
|
||
566 | |||
567 | |||
568 | |||
569 | class PullTests(PorcelainTestCase): |
||
570 | |||
571 | def setUp(self): |
||
572 | super(PullTests, self).setUp() |
||
573 | # create a file for initial commit
|
||
574 | handle, fullpath = tempfile.mkstemp(dir=self.repo.path)
|
||
575 | os.close(handle) |
||
576 | filename = os.path.basename(fullpath) |
||
577 | porcelain.add(repo=self.repo.path, paths=filename)
|
||
578 | porcelain.commit(repo=self.repo.path, message=b'test', |
||
579 | author=b'test', committer=b'test') |
||
580 | |||
581 | # Setup target repo
|
||
582 | self.target_path = tempfile.mkdtemp()
|
||
583 | self.addCleanup(shutil.rmtree, self.target_path) |
||
584 | target_repo = porcelain.clone(self.repo.path, target=self.target_path, |
||
585 | errstream=BytesIO()) |
||
586 | target_repo.close() |
||
587 | |||
588 | # create a second file to be pushed
|
||
589 | handle, fullpath = tempfile.mkstemp(dir=self.repo.path)
|
||
590 | os.close(handle) |
||
591 | filename = os.path.basename(fullpath) |
||
592 | porcelain.add(repo=self.repo.path, paths=filename)
|
||
593 | porcelain.commit(repo=self.repo.path, message=b'test2', |
||
594 | author=b'test2', committer=b'test2') |
||
595 | |||
596 | self.assertTrue(b'refs/heads/master' in self.repo.refs) |
||
597 | self.assertTrue(b'refs/heads/master' in target_repo.refs) |
||
598 | |||
599 | def test_simple(self): |
||
600 | outstream = BytesIO() |
||
601 | errstream = BytesIO() |
||
602 | |||
603 | # Pull changes into the cloned repo
|
||
604 | porcelain.pull(self.target_path, self.repo.path, b'refs/heads/master', |
||
605 | outstream=outstream, errstream=errstream) |
||
606 | |||
607 | # Check the target repo for pushed changes
|
||
608 | with Repo(self.target_path) as r: |
||
609 | self.assertEqual(r[b'HEAD'].id, self.repo[b'HEAD'].id) |
||
610 | |||
611 | def test_no_refspec(self): |
||
612 | outstream = BytesIO() |
||
613 | errstream = BytesIO() |
||
614 | |||
615 | # Pull changes into the cloned repo
|
||
616 | porcelain.pull(self.target_path, self.repo.path, outstream=outstream, |
||
617 | errstream=errstream) |
||
618 | |||
619 | # Check the target repo for pushed changes
|
||
620 | with Repo(self.target_path) as r: |
||
621 | self.assertEqual(r[b'HEAD'].id, self.repo[b'HEAD'].id) |
||
622 | |||
623 | |||
624 | class StatusTests(PorcelainTestCase): |
||
625 | |||
626 | def test_empty(self): |
||
627 | results = porcelain.status(self.repo)
|
||
628 | self.assertEqual(
|
||
629 | {'add': [], 'delete': [], 'modify': []}, |
||
630 | results.staged) |
||
631 | self.assertEqual([], results.unstaged)
|
||
632 | |||
633 | def test_status(self): |
||
634 | """Integration test for `status` functionality."""
|
||
635 | |||
636 | # Commit a dummy file then modify it
|
||
637 | fullpath = os.path.join(self.repo.path, 'foo') |
||
638 | with open(fullpath, 'w') as f: |
||
639 | f.write('origstuff')
|
||
640 | |||
641 | porcelain.add(repo=self.repo.path, paths=['foo']) |
||
642 | porcelain.commit(repo=self.repo.path, message=b'test status', |
||
643 | author=b'', committer=b'') |
||
644 | |||
645 | # modify access and modify time of path
|
||
646 | os.utime(fullpath, (0, 0)) |
||
647 | |||
648 | with open(fullpath, 'wb') as f: |
||
649 | f.write(b'stuff')
|
||
650 | |||
651 | # Make a dummy file and stage it
|
||
652 | filename_add = 'bar'
|
||
653 | fullpath = os.path.join(self.repo.path, filename_add)
|
||
654 | with open(fullpath, 'w') as f: |
||
655 | f.write('stuff')
|
||
656 | porcelain.add(repo=self.repo.path, paths=filename_add)
|
||
657 | |||
658 | results = porcelain.status(self.repo)
|
||
659 | |||
660 | self.assertEqual(results.staged['add'][0], filename_add.encode('ascii')) |
||
661 | self.assertEqual(results.unstaged, [b'foo']) |
||
662 | |||
663 | def test_get_tree_changes_add(self): |
||
664 | """Unit test for get_tree_changes add."""
|
||
665 | |||
666 | # Make a dummy file, stage
|
||
667 | filename = 'bar'
|
||
668 | with open(os.path.join(self.repo.path, filename), 'w') as f: |
||
669 | f.write('stuff')
|
||
670 | porcelain.add(repo=self.repo.path, paths=filename)
|
||
671 | porcelain.commit(repo=self.repo.path, message=b'test status', |
||
672 | author=b'', committer=b'') |
||
673 | |||
674 | filename = 'foo'
|
||
675 | with open(os.path.join(self.repo.path, filename), 'w') as f: |
||
676 | f.write('stuff')
|
||
677 | porcelain.add(repo=self.repo.path, paths=filename)
|
||
678 | changes = porcelain.get_tree_changes(self.repo.path)
|
||
679 | |||
680 | self.assertEqual(changes['add'][0], filename.encode('ascii')) |
||
681 | self.assertEqual(len(changes['add']), 1) |
||
682 | self.assertEqual(len(changes['modify']), 0) |
||
683 | self.assertEqual(len(changes['delete']), 0) |
||
684 | |||
685 | def test_get_tree_changes_modify(self): |
||
686 | """Unit test for get_tree_changes modify."""
|
||
687 | |||
688 | # Make a dummy file, stage, commit, modify
|
||
689 | filename = 'foo'
|
||
690 | fullpath = os.path.join(self.repo.path, filename)
|
||
691 | with open(fullpath, 'w') as f: |
||
692 | f.write('stuff')
|
||
693 | porcelain.add(repo=self.repo.path, paths=filename)
|
||
694 | porcelain.commit(repo=self.repo.path, message=b'test status', |
||
695 | author=b'', committer=b'') |
||
696 | with open(fullpath, 'w') as f: |
||
697 | f.write('otherstuff')
|
||
698 | porcelain.add(repo=self.repo.path, paths=filename)
|
||
699 | changes = porcelain.get_tree_changes(self.repo.path)
|
||
700 | |||
701 | self.assertEqual(changes['modify'][0], filename.encode('ascii')) |
||
702 | self.assertEqual(len(changes['add']), 0) |
||
703 | self.assertEqual(len(changes['modify']), 1) |
||
704 | self.assertEqual(len(changes['delete']), 0) |
||
705 | |||
706 | def test_get_tree_changes_delete(self): |
||
707 | """Unit test for get_tree_changes delete."""
|
||
708 | |||
709 | # Make a dummy file, stage, commit, remove
|
||
710 | filename = 'foo'
|
||
711 | with open(os.path.join(self.repo.path, filename), 'w') as f: |
||
712 | f.write('stuff')
|
||
713 | porcelain.add(repo=self.repo.path, paths=filename)
|
||
714 | porcelain.commit(repo=self.repo.path, message=b'test status', |
||
715 | author=b'', committer=b'') |
||
716 | porcelain.rm(repo=self.repo.path, paths=[filename])
|
||
717 | changes = porcelain.get_tree_changes(self.repo.path)
|
||
718 | |||
719 | self.assertEqual(changes['delete'][0], filename.encode('ascii')) |
||
720 | self.assertEqual(len(changes['add']), 0) |
||
721 | self.assertEqual(len(changes['modify']), 0) |
||
722 | self.assertEqual(len(changes['delete']), 1) |
||
723 | |||
724 | |||
725 | # TODO(jelmer): Add test for dulwich.porcelain.daemon
|
||
726 | |||
727 | |||
728 | class UploadPackTests(PorcelainTestCase): |
||
729 | """Tests for upload_pack."""
|
||
730 | |||
731 | def test_upload_pack(self): |
||
732 | outf = BytesIO() |
||
733 | exitcode = porcelain.upload_pack(self.repo.path, BytesIO(b"0000"), outf) |
||
734 | outlines = outf.getvalue().splitlines() |
||
735 | self.assertEqual([b"0000"], outlines) |
||
736 | self.assertEqual(0, exitcode) |
||
737 | |||
738 | |||
739 | class ReceivePackTests(PorcelainTestCase): |
||
740 | """Tests for receive_pack."""
|
||
741 | |||
742 | def test_receive_pack(self): |
||
743 | filename = 'foo'
|
||
744 | with open(os.path.join(self.repo.path, filename), 'w') as f: |
||
745 | f.write('stuff')
|
||
746 | porcelain.add(repo=self.repo.path, paths=filename)
|
||
747 | self.repo.do_commit(message=b'test status', |
||
748 | author=b'', committer=b'', author_timestamp=1402354300, |
||
749 | commit_timestamp=1402354300, author_timezone=0, commit_timezone=0) |
||
750 | outf = BytesIO() |
||
751 | exitcode = porcelain.receive_pack(self.repo.path, BytesIO(b"0000"), outf) |
||
752 | outlines = outf.getvalue().splitlines() |
||
753 | self.assertEqual([
|
||
754 | b'00739e65bdcf4a22cdd4f3700604a275cd2aaf146b23 HEAD\x00 report-status '
|
||
755 | b'delete-refs quiet ofs-delta side-band-64k no-done',
|
||
756 | b'003f9e65bdcf4a22cdd4f3700604a275cd2aaf146b23 refs/heads/master',
|
||
757 | b'0000'], outlines)
|
||
758 | self.assertEqual(0, exitcode) |
||
759 | |||
760 | |||
761 | class BranchListTests(PorcelainTestCase): |
||
762 | |||
763 | def test_standard(self): |
||
764 | self.assertEqual(set([]), set(porcelain.branch_list(self.repo))) |
||
765 | |||
766 | def test_new_branch(self): |
||
767 | [c1] = build_commit_graph(self.repo.object_store, [[1]]) |
||
768 | self.repo[b"HEAD"] = c1.id |
||
769 | porcelain.branch_create(self.repo, b"foo") |
||
770 | self.assertEqual(
|
||
771 | set([b"master", b"foo"]), |
||
772 | set(porcelain.branch_list(self.repo))) |
||
773 | |||
774 | |||
775 | class BranchCreateTests(PorcelainTestCase): |
||
776 | |||
777 | def test_branch_exists(self): |
||
778 | [c1] = build_commit_graph(self.repo.object_store, [[1]]) |
||
779 | self.repo[b"HEAD"] = c1.id |
||
780 | porcelain.branch_create(self.repo, b"foo") |
||
781 | self.assertRaises(KeyError, porcelain.branch_create, self.repo, b"foo") |
||
782 | porcelain.branch_create(self.repo, b"foo", force=True) |
||
783 | |||
784 | def test_new_branch(self): |
||
785 | [c1] = build_commit_graph(self.repo.object_store, [[1]]) |
||
786 | self.repo[b"HEAD"] = c1.id |
||
787 | porcelain.branch_create(self.repo, b"foo") |
||
788 | self.assertEqual(
|
||
789 | set([b"master", b"foo"]), |
||
790 | set(porcelain.branch_list(self.repo))) |
||
791 | |||
792 | |||
793 | class BranchDeleteTests(PorcelainTestCase): |
||
794 | |||
795 | def test_simple(self): |
||
796 | [c1] = build_commit_graph(self.repo.object_store, [[1]]) |
||
797 | self.repo[b"HEAD"] = c1.id |
||
798 | porcelain.branch_create(self.repo, b'foo') |
||
799 | self.assertTrue(b"foo" in porcelain.branch_list(self.repo)) |
||
800 | porcelain.branch_delete(self.repo, b'foo') |
||
801 | self.assertFalse(b"foo" in porcelain.branch_list(self.repo)) |
||
802 | |||
803 | |||
804 | class FetchTests(PorcelainTestCase): |
||
805 | |||
806 | def test_simple(self): |
||
807 | outstream = BytesIO() |
||
808 | errstream = BytesIO() |
||
809 | |||
810 | # create a file for initial commit
|
||
811 | handle, fullpath = tempfile.mkstemp(dir=self.repo.path)
|
||
812 | os.close(handle) |
||
813 | filename = os.path.basename(fullpath) |
||
814 | porcelain.add(repo=self.repo.path, paths=filename)
|
||
815 | porcelain.commit(repo=self.repo.path, message=b'test', |
||
816 | author=b'test', committer=b'test') |
||
817 | |||
818 | # Setup target repo
|
||
819 | target_path = tempfile.mkdtemp() |
||
820 | self.addCleanup(shutil.rmtree, target_path)
|
||
821 | target_repo = porcelain.clone(self.repo.path, target=target_path,
|
||
822 | errstream=errstream) |
||
823 | |||
824 | # create a second file to be pushed
|
||
825 | handle, fullpath = tempfile.mkstemp(dir=self.repo.path)
|
||
826 | os.close(handle) |
||
827 | filename = os.path.basename(fullpath) |
||
828 | porcelain.add(repo=self.repo.path, paths=filename)
|
||
829 | porcelain.commit(repo=self.repo.path, message=b'test2', |
||
830 | author=b'test2', committer=b'test2') |
||
831 | |||
832 | self.assertFalse(self.repo[b'HEAD'].id in target_repo) |
||
833 | target_repo.close() |
||
834 | |||
835 | # Fetch changes into the cloned repo
|
||
836 | porcelain.fetch(target_path, self.repo.path, outstream=outstream,
|
||
837 | errstream=errstream) |
||
838 | |||
839 | # Check the target repo for pushed changes
|
||
840 | with Repo(target_path) as r: |
||
841 | self.assertTrue(self.repo[b'HEAD'].id in r) |
||
842 | |||
843 | |||
844 | class RepackTests(PorcelainTestCase): |
||
845 | |||
846 | def test_empty(self): |
||
847 | porcelain.repack(self.repo)
|
||
848 | |||
849 | def test_simple(self): |
||
850 | handle, fullpath = tempfile.mkstemp(dir=self.repo.path)
|
||
851 | os.close(handle) |
||
852 | filename = os.path.basename(fullpath) |
||
853 | porcelain.add(repo=self.repo.path, paths=filename)
|
||
854 | porcelain.repack(self.repo)
|
||
855 | |||
856 | |||
857 | class LsTreeTests(PorcelainTestCase): |
||
858 | |||
859 | def test_empty(self): |
||
860 | porcelain.commit(repo=self.repo.path, message=b'test status', |
||
861 | author=b'', committer=b'') |
||
862 | |||
863 | f = StringIO() |
||
864 | porcelain.ls_tree(self.repo, b"HEAD", outstream=f) |
||
865 | self.assertEqual(f.getvalue(), "") |
||
866 | |||
867 | def test_simple(self): |
||
868 | # Commit a dummy file then modify it
|
||
869 | fullpath = os.path.join(self.repo.path, 'foo') |
||
870 | with open(fullpath, 'w') as f: |
||
871 | f.write('origstuff')
|
||
872 | |||
873 | porcelain.add(repo=self.repo.path, paths=['foo']) |
||
874 | porcelain.commit(repo=self.repo.path, message=b'test status', |
||
875 | author=b'', committer=b'') |
||
876 | |||
877 | f = StringIO() |
||
878 | porcelain.ls_tree(self.repo, b"HEAD", outstream=f) |
||
879 | self.assertEqual(
|
||
880 | f.getvalue(), |
||
881 | '100644 blob 8b82634d7eae019850bb883f06abf428c58bc9aa\tfoo\n')
|
||
882 | |||
883 | |||
884 | class LsRemoteTests(PorcelainTestCase): |
||
885 | |||
886 | def test_empty(self): |
||
887 | self.assertEqual({}, porcelain.ls_remote(self.repo.path)) |
||
888 | |||
889 | def test_some(self): |
||
890 | cid = porcelain.commit(repo=self.repo.path, message=b'test status', |
||
891 | author=b'', committer=b'') |
||
892 | |||
893 | self.assertEqual({
|
||
894 | b'refs/heads/master': cid,
|
||
895 | b'HEAD': cid},
|
||
896 | porcelain.ls_remote(self.repo.path)) |