Statistics
| Revision:

gvsig-scripting / org.gvsig.scripting / trunk / org.gvsig.scripting / org.gvsig.scripting.app / org.gvsig.scripting.app.mainplugin / src / main / resources-plugin / scripting / lib / dulwich / tests / test_web.py @ 959

History | View | Annotate | Download (18.7 KB)

1
# test_web.py -- Tests for the git HTTP server
2
# Copyright (C) 2010 Google, Inc.
3
#
4
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
5
# General Public License as public by the Free Software Foundation; version 2.0
6
# or (at your option) any later version. You can redistribute it and/or
7
# modify it under the terms of either of these two licenses.
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
#
15
# You should have received a copy of the licenses; if not, see
16
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
17
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
18
# License, Version 2.0.
19
#
20

    
21
"""Tests for the Git HTTP server."""
22

    
23
from io import BytesIO
24
import gzip
25
import re
26
import os
27

    
28
from dulwich.object_store import (
29
    MemoryObjectStore,
30
    )
31
from dulwich.objects import (
32
    Blob,
33
    )
34
from dulwich.repo import (
35
    BaseRepo,
36
    MemoryRepo,
37
    )
38
from dulwich.server import (
39
    DictBackend,
40
    )
41
from dulwich.tests import (
42
    TestCase,
43
    )
44
from dulwich.web import (
45
    HTTP_OK,
46
    HTTP_NOT_FOUND,
47
    HTTP_FORBIDDEN,
48
    HTTP_ERROR,
49
    GunzipFilter,
50
    send_file,
51
    get_text_file,
52
    get_loose_object,
53
    get_pack_file,
54
    get_idx_file,
55
    get_info_refs,
56
    get_info_packs,
57
    handle_service_request,
58
    _LengthLimitedFile,
59
    HTTPGitRequest,
60
    HTTPGitApplication,
61
    )
62

    
63
from dulwich.tests.utils import (
64
    make_object,
65
    make_tag,
66
    )
67

    
68

    
69
class MinimalistWSGIInputStream(object):
70
    """WSGI input stream with no 'seek()' and 'tell()' methods."""
71
    def __init__(self, data):
72
        self.data = data
73
        self.pos = 0
74

    
75
    def read(self, howmuch):
76
        start = self.pos
77
        end = self.pos + howmuch
78
        if start >= len(self.data):
79
            return ''
80
        self.pos = end
81
        return self.data[start:end]
82

    
83

    
84
class MinimalistWSGIInputStream2(MinimalistWSGIInputStream):
85
    """WSGI input stream with no *working* 'seek()' and 'tell()' methods."""
86
    def seek(self, pos):
87
        raise NotImplementedError
88

    
89
    def tell(self):
90
        raise NotImplementedError
91

    
92

    
93
class TestHTTPGitRequest(HTTPGitRequest):
94
    """HTTPGitRequest with overridden methods to help test caching."""
95

    
96
    def __init__(self, *args, **kwargs):
97
        HTTPGitRequest.__init__(self, *args, **kwargs)
98
        self.cached = None
99

    
100
    def nocache(self):
101
        self.cached = False
102

    
103
    def cache_forever(self):
104
        self.cached = True
105

    
106

    
107
class WebTestCase(TestCase):
108
    """Base TestCase with useful instance vars and utility functions."""
109

    
110
    _req_class = TestHTTPGitRequest
111

    
112
    def setUp(self):
113
        super(WebTestCase, self).setUp()
114
        self._environ = {}
115
        self._req = self._req_class(self._environ, self._start_response,
116
                                    handlers=self._handlers())
117
        self._status = None
118
        self._headers = []
119
        self._output = BytesIO()
120

    
121
    def _start_response(self, status, headers):
122
        self._status = status
123
        self._headers = list(headers)
124
        return self._output.write
125

    
126
    def _handlers(self):
127
        return None
128

    
129
    def assertContentTypeEquals(self, expected):
130
        self.assertTrue(('Content-Type', expected) in self._headers)
131

    
132

    
133
def _test_backend(objects, refs=None, named_files=None):
134
    if not refs:
135
        refs = {}
136
    if not named_files:
137
        named_files = {}
138
    repo = MemoryRepo.init_bare(objects, refs)
139
    for path, contents in named_files.items():
140
        repo._put_named_file(path, contents)
141
    return DictBackend({'/': repo})
142

    
143

    
144
class DumbHandlersTestCase(WebTestCase):
145

    
146
    def test_send_file_not_found(self):
147
        list(send_file(self._req, None, 'text/plain'))
148
        self.assertEqual(HTTP_NOT_FOUND, self._status)
149

    
150
    def test_send_file(self):
151
        f = BytesIO(b'foobar')
152
        output = b''.join(send_file(self._req, f, 'some/thing'))
153
        self.assertEqual(b'foobar', output)
154
        self.assertEqual(HTTP_OK, self._status)
155
        self.assertContentTypeEquals('some/thing')
156
        self.assertTrue(f.closed)
157

    
158
    def test_send_file_buffered(self):
159
        bufsize = 10240
160
        xs = b'x' * bufsize
161
        f = BytesIO(2 * xs)
162
        self.assertEqual([xs, xs],
163
                          list(send_file(self._req, f, 'some/thing')))
164
        self.assertEqual(HTTP_OK, self._status)
165
        self.assertContentTypeEquals('some/thing')
166
        self.assertTrue(f.closed)
167

    
168
    def test_send_file_error(self):
169
        class TestFile(object):
170
            def __init__(self, exc_class):
171
                self.closed = False
172
                self._exc_class = exc_class
173

    
174
            def read(self, size=-1):
175
                raise self._exc_class()
176

    
177
            def close(self):
178
                self.closed = True
179

    
180
        f = TestFile(IOError)
181
        list(send_file(self._req, f, 'some/thing'))
182
        self.assertEqual(HTTP_ERROR, self._status)
183
        self.assertTrue(f.closed)
184
        self.assertFalse(self._req.cached)
185

    
186
        # non-IOErrors are reraised
187
        f = TestFile(AttributeError)
188
        self.assertRaises(AttributeError, list,
189
                          send_file(self._req, f, 'some/thing'))
190
        self.assertTrue(f.closed)
191
        self.assertFalse(self._req.cached)
192

    
193
    def test_get_text_file(self):
194
        backend = _test_backend([], named_files={'description': b'foo'})
195
        mat = re.search('.*', 'description')
196
        output = b''.join(get_text_file(self._req, backend, mat))
197
        self.assertEqual(b'foo', output)
198
        self.assertEqual(HTTP_OK, self._status)
199
        self.assertContentTypeEquals('text/plain')
200
        self.assertFalse(self._req.cached)
201

    
202
    def test_get_loose_object(self):
203
        blob = make_object(Blob, data=b'foo')
204
        backend = _test_backend([blob])
205
        mat = re.search('^(..)(.{38})$', blob.id.decode('ascii'))
206
        output = b''.join(get_loose_object(self._req, backend, mat))
207
        self.assertEqual(blob.as_legacy_object(), output)
208
        self.assertEqual(HTTP_OK, self._status)
209
        self.assertContentTypeEquals('application/x-git-loose-object')
210
        self.assertTrue(self._req.cached)
211

    
212
    def test_get_loose_object_missing(self):
213
        mat = re.search('^(..)(.{38})$', '1' * 40)
214
        list(get_loose_object(self._req, _test_backend([]), mat))
215
        self.assertEqual(HTTP_NOT_FOUND, self._status)
216

    
217
    def test_get_loose_object_error(self):
218
        blob = make_object(Blob, data=b'foo')
219
        backend = _test_backend([blob])
220
        mat = re.search('^(..)(.{38})$', blob.id.decode('ascii'))
221

    
222
        def as_legacy_object_error(self):
223
            raise IOError
224

    
225
        self.addCleanup(
226
            setattr, Blob, 'as_legacy_object', Blob.as_legacy_object)
227
        Blob.as_legacy_object = as_legacy_object_error
228
        list(get_loose_object(self._req, backend, mat))
229
        self.assertEqual(HTTP_ERROR, self._status)
230

    
231
    def test_get_pack_file(self):
232
        pack_name = os.path.join('objects', 'pack', 'pack-%s.pack' % ('1' * 40))
233
        backend = _test_backend([], named_files={pack_name: b'pack contents'})
234
        mat = re.search('.*', pack_name)
235
        output = b''.join(get_pack_file(self._req, backend, mat))
236
        self.assertEqual(b'pack contents', output)
237
        self.assertEqual(HTTP_OK, self._status)
238
        self.assertContentTypeEquals('application/x-git-packed-objects')
239
        self.assertTrue(self._req.cached)
240

    
241
    def test_get_idx_file(self):
242
        idx_name = os.path.join('objects', 'pack', 'pack-%s.idx' % ('1' * 40))
243
        backend = _test_backend([], named_files={idx_name: b'idx contents'})
244
        mat = re.search('.*', idx_name)
245
        output = b''.join(get_idx_file(self._req, backend, mat))
246
        self.assertEqual(b'idx contents', output)
247
        self.assertEqual(HTTP_OK, self._status)
248
        self.assertContentTypeEquals('application/x-git-packed-objects-toc')
249
        self.assertTrue(self._req.cached)
250

    
251
    def test_get_info_refs(self):
252
        self._environ['QUERY_STRING'] = ''
253

    
254
        blob1 = make_object(Blob, data=b'1')
255
        blob2 = make_object(Blob, data=b'2')
256
        blob3 = make_object(Blob, data=b'3')
257

    
258
        tag1 = make_tag(blob2, name=b'tag-tag')
259

    
260
        objects = [blob1, blob2, blob3, tag1]
261
        refs = {
262
          b'HEAD': b'000',
263
          b'refs/heads/master': blob1.id,
264
          b'refs/tags/tag-tag': tag1.id,
265
          b'refs/tags/blob-tag': blob3.id,
266
          }
267
        backend = _test_backend(objects, refs=refs)
268

    
269
        mat = re.search('.*', '//info/refs')
270
        self.assertEqual([blob1.id + b'\trefs/heads/master\n',
271
                           blob3.id + b'\trefs/tags/blob-tag\n',
272
                           tag1.id + b'\trefs/tags/tag-tag\n',
273
                           blob2.id + b'\trefs/tags/tag-tag^{}\n'],
274
                          list(get_info_refs(self._req, backend, mat)))
275
        self.assertEqual(HTTP_OK, self._status)
276
        self.assertContentTypeEquals('text/plain')
277
        self.assertFalse(self._req.cached)
278

    
279
    def test_get_info_packs(self):
280
        class TestPackData(object):
281

    
282
            def __init__(self, sha):
283
                self.filename = "pack-%s.pack" % sha
284

    
285
        class TestPack(object):
286
            def __init__(self, sha):
287
                self.data = TestPackData(sha)
288

    
289
        packs = [TestPack(str(i) * 40) for i in range(1, 4)]
290

    
291
        class TestObjectStore(MemoryObjectStore):
292
            # property must be overridden, can't be assigned
293
            @property
294
            def packs(self):
295
                return packs
296

    
297
        store = TestObjectStore()
298
        repo = BaseRepo(store, None)
299
        backend = DictBackend({'/': repo})
300
        mat = re.search('.*', '//info/packs')
301
        output = b''.join(get_info_packs(self._req, backend, mat))
302
        expected = b''.join(
303
            [(b'P pack-' + s + b'.pack\n') for s in [b'1' * 40, b'2' * 40, b'3' * 40]])
304
        self.assertEqual(expected, output)
305
        self.assertEqual(HTTP_OK, self._status)
306
        self.assertContentTypeEquals('text/plain')
307
        self.assertFalse(self._req.cached)
308

    
309

    
310
class SmartHandlersTestCase(WebTestCase):
311

    
312
    class _TestUploadPackHandler(object):
313
        def __init__(self, backend, args, proto, http_req=None,
314
                     advertise_refs=False):
315
            self.args = args
316
            self.proto = proto
317
            self.http_req = http_req
318
            self.advertise_refs = advertise_refs
319

    
320
        def handle(self):
321
            self.proto.write(b'handled input: ' + self.proto.recv(1024))
322

    
323
    def _make_handler(self, *args, **kwargs):
324
        self._handler = self._TestUploadPackHandler(*args, **kwargs)
325
        return self._handler
326

    
327
    def _handlers(self):
328
        return {b'git-upload-pack': self._make_handler}
329

    
330
    def test_handle_service_request_unknown(self):
331
        mat = re.search('.*', '/git-evil-handler')
332
        content = list(handle_service_request(self._req, 'backend', mat))
333
        self.assertEqual(HTTP_FORBIDDEN, self._status)
334
        self.assertFalse(b'git-evil-handler' in b"".join(content))
335
        self.assertFalse(self._req.cached)
336

    
337
    def _run_handle_service_request(self, content_length=None):
338
        self._environ['wsgi.input'] = BytesIO(b'foo')
339
        if content_length is not None:
340
            self._environ['CONTENT_LENGTH'] = content_length
341
        mat = re.search('.*', '/git-upload-pack')
342
        handler_output = b''.join(
343
          handle_service_request(self._req, 'backend', mat))
344
        write_output = self._output.getvalue()
345
        # Ensure all output was written via the write callback.
346
        self.assertEqual(b'', handler_output)
347
        self.assertEqual(b'handled input: foo', write_output)
348
        self.assertContentTypeEquals('application/x-git-upload-pack-result')
349
        self.assertFalse(self._handler.advertise_refs)
350
        self.assertTrue(self._handler.http_req)
351
        self.assertFalse(self._req.cached)
352

    
353
    def test_handle_service_request(self):
354
        self._run_handle_service_request()
355

    
356
    def test_handle_service_request_with_length(self):
357
        self._run_handle_service_request(content_length='3')
358

    
359
    def test_handle_service_request_empty_length(self):
360
        self._run_handle_service_request(content_length='')
361

    
362
    def test_get_info_refs_unknown(self):
363
        self._environ['QUERY_STRING'] = 'service=git-evil-handler'
364
        content = list(get_info_refs(self._req, b'backend', None))
365
        self.assertFalse(b'git-evil-handler' in b"".join(content))
366
        self.assertEqual(HTTP_FORBIDDEN, self._status)
367
        self.assertFalse(self._req.cached)
368

    
369
    def test_get_info_refs(self):
370
        self._environ['wsgi.input'] = BytesIO(b'foo')
371
        self._environ['QUERY_STRING'] = 'service=git-upload-pack'
372

    
373
        mat = re.search('.*', '/git-upload-pack')
374
        handler_output = b''.join(get_info_refs(self._req, b'backend', mat))
375
        write_output = self._output.getvalue()
376
        self.assertEqual((b'001e# service=git-upload-pack\n'
377
                           b'0000'
378
                           # input is ignored by the handler
379
                           b'handled input: '), write_output)
380
        # Ensure all output was written via the write callback.
381
        self.assertEqual(b'', handler_output)
382
        self.assertTrue(self._handler.advertise_refs)
383
        self.assertTrue(self._handler.http_req)
384
        self.assertFalse(self._req.cached)
385

    
386

    
387
class LengthLimitedFileTestCase(TestCase):
388
    def test_no_cutoff(self):
389
        f = _LengthLimitedFile(BytesIO(b'foobar'), 1024)
390
        self.assertEqual(b'foobar', f.read())
391

    
392
    def test_cutoff(self):
393
        f = _LengthLimitedFile(BytesIO(b'foobar'), 3)
394
        self.assertEqual(b'foo', f.read())
395
        self.assertEqual(b'', f.read())
396

    
397
    def test_multiple_reads(self):
398
        f = _LengthLimitedFile(BytesIO(b'foobar'), 3)
399
        self.assertEqual(b'fo', f.read(2))
400
        self.assertEqual(b'o', f.read(2))
401
        self.assertEqual(b'', f.read())
402

    
403

    
404
class HTTPGitRequestTestCase(WebTestCase):
405

    
406
    # This class tests the contents of the actual cache headers
407
    _req_class = HTTPGitRequest
408

    
409
    def test_not_found(self):
410
        self._req.cache_forever()  # cache headers should be discarded
411
        message = 'Something not found'
412
        self.assertEqual(message.encode('ascii'), self._req.not_found(message))
413
        self.assertEqual(HTTP_NOT_FOUND, self._status)
414
        self.assertEqual(set([('Content-Type', 'text/plain')]),
415
                          set(self._headers))
416

    
417
    def test_forbidden(self):
418
        self._req.cache_forever()  # cache headers should be discarded
419
        message = 'Something not found'
420
        self.assertEqual(message.encode('ascii'), self._req.forbidden(message))
421
        self.assertEqual(HTTP_FORBIDDEN, self._status)
422
        self.assertEqual(set([('Content-Type', 'text/plain')]),
423
                          set(self._headers))
424

    
425
    def test_respond_ok(self):
426
        self._req.respond()
427
        self.assertEqual([], self._headers)
428
        self.assertEqual(HTTP_OK, self._status)
429

    
430
    def test_respond(self):
431
        self._req.nocache()
432
        self._req.respond(status=402, content_type='some/type',
433
                          headers=[('X-Foo', 'foo'), ('X-Bar', 'bar')])
434
        self.assertEqual(set([
435
          ('X-Foo', 'foo'),
436
          ('X-Bar', 'bar'),
437
          ('Content-Type', 'some/type'),
438
          ('Expires', 'Fri, 01 Jan 1980 00:00:00 GMT'),
439
          ('Pragma', 'no-cache'),
440
          ('Cache-Control', 'no-cache, max-age=0, must-revalidate'),
441
          ]), set(self._headers))
442
        self.assertEqual(402, self._status)
443

    
444

    
445
class HTTPGitApplicationTestCase(TestCase):
446

    
447
    def setUp(self):
448
        super(HTTPGitApplicationTestCase, self).setUp()
449
        self._app = HTTPGitApplication('backend')
450

    
451
        self._environ = {
452
            'PATH_INFO': '/foo',
453
            'REQUEST_METHOD': 'GET',
454
        }
455

    
456
    def _test_handler(self, req, backend, mat):
457
        # tests interface used by all handlers
458
        self.assertEqual(self._environ, req.environ)
459
        self.assertEqual('backend', backend)
460
        self.assertEqual('/foo', mat.group(0))
461
        return 'output'
462

    
463
    def _add_handler(self, app):
464
        req = self._environ['REQUEST_METHOD']
465
        app.services = {
466
          (req, re.compile('/foo$')): self._test_handler,
467
        }
468

    
469
    def test_call(self):
470
        self._add_handler(self._app)
471
        self.assertEqual('output', self._app(self._environ, None))
472

    
473
    def test_fallback_app(self):
474
        def test_app(environ, start_response):
475
            return 'output'
476

    
477
        app = HTTPGitApplication('backend', fallback_app=test_app)
478
        self.assertEqual('output', app(self._environ, None))
479

    
480

    
481
class GunzipTestCase(HTTPGitApplicationTestCase):
482
    __doc__ = """TestCase for testing the GunzipFilter, ensuring the wsgi.input
483
    is correctly decompressed and headers are corrected.
484
    """
485
    example_text = __doc__.encode('ascii')
486

    
487
    def setUp(self):
488
        super(GunzipTestCase, self).setUp()
489
        self._app = GunzipFilter(self._app)
490
        self._environ['HTTP_CONTENT_ENCODING'] = 'gzip'
491
        self._environ['REQUEST_METHOD'] = 'POST'
492

    
493
    def _get_zstream(self, text):
494
        zstream = BytesIO()
495
        zfile = gzip.GzipFile(fileobj=zstream, mode='w')
496
        zfile.write(text)
497
        zfile.close()
498
        zlength = zstream.tell()
499
        zstream.seek(0)
500
        return zstream, zlength
501

    
502
    def _test_call(self, orig, zstream, zlength):
503
        self._add_handler(self._app.app)
504
        self.assertLess(zlength, len(orig))
505
        self.assertEqual(self._environ['HTTP_CONTENT_ENCODING'], 'gzip')
506
        self._environ['CONTENT_LENGTH'] = zlength
507
        self._environ['wsgi.input'] = zstream
508
        self._app(self._environ, None)
509
        buf = self._environ['wsgi.input']
510
        self.assertIsNot(buf, zstream)
511
        buf.seek(0)
512
        self.assertEqual(orig, buf.read())
513
        self.assertIs(None, self._environ.get('CONTENT_LENGTH'))
514
        self.assertNotIn('HTTP_CONTENT_ENCODING', self._environ)
515

    
516
    def test_call(self):
517
        self._test_call(
518
            self.example_text,
519
            *self._get_zstream(self.example_text)
520
        )
521

    
522
    def test_call_no_seek(self):
523
        """
524
        This ensures that the gunzipping code doesn't require any methods on
525
        'wsgi.input' except for '.read()'.  (In particular, it shouldn't
526
        require '.seek()'. See https://github.com/jelmer/dulwich/issues/140.)
527
        """
528
        zstream, zlength = self._get_zstream(self.example_text)
529
        self._test_call(self.example_text,
530
            MinimalistWSGIInputStream(zstream.read()), zlength)
531

    
532
    def test_call_no_working_seek(self):
533
        """
534
        Similar to 'test_call_no_seek', but this time the methods are available
535
        (but defunct).  See https://github.com/jonashaag/klaus/issues/154.
536
        """
537
        zstream, zlength = self._get_zstream(self.example_text)
538
        self._test_call(self.example_text,
539
            MinimalistWSGIInputStream2(zstream.read()), zlength)