# greenthreads.py -- Utility module for querying an ObjectStore with gevent
# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
#
# Author: Fabien Boucher <fabien.boucher@enovance.com>
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as public by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Utility module for querying an ObjectStore with gevent."""

import gevent
from gevent import pool

from dulwich.objects import (
    Commit,
    Tag,
    )
from dulwich.object_store import (
    MissingObjectFinder,
    _collect_filetree_revs,
    ObjectStoreIterator,
    )
39 |
def _split_commits_and_tags(obj_store, lst, |
40 |
ignore_unknown=False, pool=None): |
41 |
"""Split object id list into two list with commit SHA1s and tag SHA1s.
|
42 |
|
43 |
Same implementation as object_store._split_commits_and_tags
|
44 |
except we use gevent to parallelize object retrieval.
|
45 |
"""
|
46 |
commits = set()
|
47 |
tags = set()
|
48 |
|
49 |
def find_commit_type(sha): |
50 |
try:
|
51 |
o = obj_store[sha] |
52 |
except KeyError: |
53 |
if not ignore_unknown: |
54 |
raise
|
55 |
else:
|
56 |
if isinstance(o, Commit): |
57 |
commits.add(sha) |
58 |
elif isinstance(o, Tag): |
59 |
tags.add(sha) |
60 |
commits.add(o.object[1])
|
61 |
else:
|
62 |
raise KeyError('Not a commit or a tag: %s' % sha) |
63 |
jobs = [pool.spawn(find_commit_type, s) for s in lst] |
64 |
gevent.joinall(jobs) |
65 |
return (commits, tags)
|
66 |
|
67 |
|
68 |
class GreenThreadsMissingObjectFinder(MissingObjectFinder):
    """Find the objects missing from another object store.

    Same implementation as object_store.MissingObjectFinder
    except we use gevent to parallelize object retrieval.
    """

    def __init__(self, object_store, haves, wants,
                 progress=None, get_tagged=None,
                 concurrency=1, get_parents=None):
        # NOTE: deliberately does not call super().__init__(); this
        # override recomputes the equivalent state using a gevent pool.

        def collect_tree_sha(sha):
            # Mark the commit itself done, then every tree/blob SHA
            # reachable from its root tree.
            self.sha_done.add(sha)
            cmt = object_store[sha]
            _collect_filetree_revs(object_store, cmt.tree, self.sha_done)

        self.object_store = object_store
        p = pool.Pool(size=concurrency)

        # Classify both sides: unknown "haves" are tolerated (the remote
        # may advertise objects we lack), unknown "wants" raise.
        have_commits, have_tags = \
            _split_commits_and_tags(object_store, haves,
                                    True, p)
        want_commits, want_tags = \
            _split_commits_and_tags(object_store, wants,
                                    False, p)
        all_ancestors = object_store._collect_ancestors(have_commits)[0]
        missing_commits, common_commits = \
            object_store._collect_ancestors(want_commits, all_ancestors)

        # Everything reachable from a common commit is already on the
        # other side, so mark it done (in parallel).
        self.sha_done = set()
        jobs = [p.spawn(collect_tree_sha, c) for c in common_commits]
        gevent.joinall(jobs)
        for t in have_tags:
            self.sha_done.add(t)
        missing_tags = want_tags.difference(have_tags)
        wants = missing_commits.union(missing_tags)
        # Presumably (sha, name, leaf) entries as consumed by
        # MissingObjectFinder.next -- verify against base class.
        self.objects_to_send = {(w, None, False) for w in wants}
        if progress is None:
            self.progress = lambda x: None
        else:
            self.progress = progress
        # Rewritten from the `get_tagged and get_tagged() or {}`
        # anti-pattern; same result: {} when get_tagged is absent or
        # when it returns a falsy value.
        tagged = get_tagged() if get_tagged else None
        self._tagged = tagged or {}
109 |
|
110 |
|
111 |
class GreenThreadsObjectStoreIterator(ObjectStoreIterator):
    """ObjectIterator that works on top of an ObjectStore.

    Same implementation as object_store.ObjectStoreIterator
    except we use gevent to parallelize object retrieval.
    """

    def __init__(self, store, shas, finder, concurrency=1):
        # Keep the finder around so __len__ can drain it lazily.
        self.finder = finder
        self.p = pool.Pool(size=concurrency)
        super(GreenThreadsObjectStoreIterator, self).__init__(store, shas)

    def retrieve(self, args):
        # Unpack a (sha, path) pair and fetch the object for that sha.
        sha, path = args
        obj = self.store[sha]
        return obj, path

    def __iter__(self):
        # imap_unordered yields each (object, path) pair as soon as its
        # greenlet finishes, so results may arrive in any order.
        pairs = self.p.imap_unordered(self.retrieve, self.itershas())
        for obj, path in pairs:
            yield obj, path

    def __len__(self):
        # Fast path: the sha list has already been materialized.
        if self._shas:
            return len(self._shas)
        # Otherwise drain the finder in parallel batches -- one spawn
        # per pending entry -- until nothing is left to send.
        while self.finder.objects_to_send:
            batch = [self.p.spawn(self.finder.next)
                     for _ in range(len(self.finder.objects_to_send))]
            gevent.joinall(batch)
            self._shas.extend(job.value for job in batch
                              if job.value is not None)
        return len(self._shas)