Statistics
| Revision:

svn-gvsig-desktop / tags / v1_1_Build_914 / extensions / extScripting / scripts / jython / Lib / xdrlib.py @ 11873

History | View | Annotate | Download (7.09 KB)

1
"""Implements (a subset of) Sun XDR -- eXternal Data Representation.
2

3
See: RFC 1014
4

5
"""
6

    
7
import struct
8

    
9
__all__ = ["Error", "Packer", "Unpacker", "ConversionError"]
10

    
11
# exceptions
12
class Error:
13
    """Exception class for this module. Use:
14

15
    except xdrlib.Error, var:
16
        # var has the Error instance for the exception
17

18
    Public ivars:
19
        msg -- contains the message
20

21
    """
22
    def __init__(self, msg):
23
        self.msg = msg
24
    def __repr__(self):
25
        return repr(self.msg)
26
    def __str__(self):
27
        return str(self.msg)
28

    
29

    
30
class ConversionError(Error):
31
    pass
32

    
33

    
34

    
35
class Packer:
36
    """Pack various data representations into a buffer."""
37

    
38
    def __init__(self):
39
        self.reset()
40

    
41
    def reset(self):
42
        self.__buf = ''
43

    
44
    def get_buffer(self):
45
        return self.__buf
46
    # backwards compatibility
47
    get_buf = get_buffer
48

    
49
    def pack_uint(self, x):
50
        self.__buf = self.__buf + struct.pack('>L', x)
51

    
52
    pack_int = pack_uint
53
    pack_enum = pack_int
54

    
55
    def pack_bool(self, x):
56
        if x: self.__buf = self.__buf + '\0\0\0\1'
57
        else: self.__buf = self.__buf + '\0\0\0\0'
58

    
59
    def pack_uhyper(self, x):
60
        self.pack_uint(x>>32 & 0xffffffffL)
61
        self.pack_uint(x & 0xffffffffL)
62

    
63
    pack_hyper = pack_uhyper
64

    
65
    def pack_float(self, x):
66
        try: self.__buf = self.__buf + struct.pack('>f', x)
67
        except struct.error, msg:
68
            raise ConversionError, msg
69

    
70
    def pack_double(self, x):
71
        try: self.__buf = self.__buf + struct.pack('>d', x)
72
        except struct.error, msg:
73
            raise ConversionError, msg
74

    
75
    def pack_fstring(self, n, s):
76
        if n < 0:
77
            raise ValueError, 'fstring size must be nonnegative'
78
        n = ((n+3)/4)*4
79
        data = s[:n]
80
        data = data + (n - len(data)) * '\0'
81
        self.__buf = self.__buf + data
82

    
83
    pack_fopaque = pack_fstring
84

    
85
    def pack_string(self, s):
86
        n = len(s)
87
        self.pack_uint(n)
88
        self.pack_fstring(n, s)
89

    
90
    pack_opaque = pack_string
91
    pack_bytes = pack_string
92

    
93
    def pack_list(self, list, pack_item):
94
        for item in list:
95
            self.pack_uint(1)
96
            pack_item(item)
97
        self.pack_uint(0)
98

    
99
    def pack_farray(self, n, list, pack_item):
100
        if len(list) != n:
101
            raise ValueError, 'wrong array size'
102
        for item in list:
103
            pack_item(item)
104

    
105
    def pack_array(self, list, pack_item):
106
        n = len(list)
107
        self.pack_uint(n)
108
        self.pack_farray(n, list, pack_item)
109

    
110

    
111

    
112
class Unpacker:
113
    """Unpacks various data representations from the given buffer."""
114

    
115
    def __init__(self, data):
116
        self.reset(data)
117

    
118
    def reset(self, data):
119
        self.__buf = data
120
        self.__pos = 0
121

    
122
    def get_position(self):
123
        return self.__pos
124

    
125
    def set_position(self, position):
126
        self.__pos = position
127

    
128
    def get_buffer(self):
129
        return self.__buf
130

    
131
    def done(self):
132
        if self.__pos < len(self.__buf):
133
            raise Error('unextracted data remains')
134

    
135
    def unpack_uint(self):
136
        i = self.__pos
137
        self.__pos = j = i+4
138
        data = self.__buf[i:j]
139
        if len(data) < 4:
140
            raise EOFError
141
        x = struct.unpack('>L', data)[0]
142
        try:
143
            return int(x)
144
        except OverflowError:
145
            return x
146

    
147
    def unpack_int(self):
148
        i = self.__pos
149
        self.__pos = j = i+4
150
        data = self.__buf[i:j]
151
        if len(data) < 4:
152
            raise EOFError
153
        return struct.unpack('>l', data)[0]
154

    
155
    unpack_enum = unpack_int
156
    unpack_bool = unpack_int
157

    
158
    def unpack_uhyper(self):
159
        hi = self.unpack_uint()
160
        lo = self.unpack_uint()
161
        return long(hi)<<32 | lo
162

    
163
    def unpack_hyper(self):
164
        x = self.unpack_uhyper()
165
        if x >= 0x8000000000000000L:
166
            x = x - 0x10000000000000000L
167
        return x
168

    
169
    def unpack_float(self):
170
        i = self.__pos
171
        self.__pos = j = i+4
172
        data = self.__buf[i:j]
173
        if len(data) < 4:
174
            raise EOFError
175
        return struct.unpack('>f', data)[0]
176

    
177
    def unpack_double(self):
178
        i = self.__pos
179
        self.__pos = j = i+8
180
        data = self.__buf[i:j]
181
        if len(data) < 8:
182
            raise EOFError
183
        return struct.unpack('>d', data)[0]
184

    
185
    def unpack_fstring(self, n):
186
        if n < 0:
187
            raise ValueError, 'fstring size must be nonnegative'
188
        i = self.__pos
189
        j = i + (n+3)/4*4
190
        if j > len(self.__buf):
191
            raise EOFError
192
        self.__pos = j
193
        return self.__buf[i:i+n]
194

    
195
    unpack_fopaque = unpack_fstring
196

    
197
    def unpack_string(self):
198
        n = self.unpack_uint()
199
        return self.unpack_fstring(n)
200

    
201
    unpack_opaque = unpack_string
202
    unpack_bytes = unpack_string
203

    
204
    def unpack_list(self, unpack_item):
205
        list = []
206
        while 1:
207
            x = self.unpack_uint()
208
            if x == 0: break
209
            if x != 1:
210
                raise ConversionError, '0 or 1 expected, got ' + `x`
211
            item = unpack_item()
212
            list.append(item)
213
        return list
214

    
215
    def unpack_farray(self, n, unpack_item):
216
        list = []
217
        for i in range(n):
218
            list.append(unpack_item())
219
        return list
220

    
221
    def unpack_array(self, unpack_item):
222
        n = self.unpack_uint()
223
        return self.unpack_farray(n, unpack_item)
224

    
225

    
226
# test suite
227
def _test():
228
    p = Packer()
229
    packtest = [
230
        (p.pack_uint,    (9,)),
231
        (p.pack_bool,    (None,)),
232
        (p.pack_bool,    ('hello',)),
233
        (p.pack_uhyper,  (45L,)),
234
        (p.pack_float,   (1.9,)),
235
        (p.pack_double,  (1.9,)),
236
        (p.pack_string,  ('hello world',)),
237
        (p.pack_list,    (range(5), p.pack_uint)),
238
        (p.pack_array,   (['what', 'is', 'hapnin', 'doctor'], p.pack_string)),
239
        ]
240
    succeedlist = [1] * len(packtest)
241
    count = 0
242
    for method, args in packtest:
243
        print 'pack test', count,
244
        try:
245
            apply(method, args)
246
            print 'succeeded'
247
        except ConversionError, var:
248
            print 'ConversionError:', var.msg
249
            succeedlist[count] = 0
250
        count = count + 1
251
    data = p.get_buffer()
252
    # now verify
253
    up = Unpacker(data)
254
    unpacktest = [
255
        (up.unpack_uint,   (), lambda x: x == 9),
256
        (up.unpack_bool,   (), lambda x: not x),
257
        (up.unpack_bool,   (), lambda x: x),
258
        (up.unpack_uhyper, (), lambda x: x == 45L),
259
        (up.unpack_float,  (), lambda x: 1.89 < x < 1.91),
260
        (up.unpack_double, (), lambda x: 1.89 < x < 1.91),
261
        (up.unpack_string, (), lambda x: x == 'hello world'),
262
        (up.unpack_list,   (up.unpack_uint,), lambda x: x == range(5)),
263
        (up.unpack_array,  (up.unpack_string,),
264
         lambda x: x == ['what', 'is', 'hapnin', 'doctor']),
265
        ]
266
    count = 0
267
    for method, args, pred in unpacktest:
268
        print 'unpack test', count,
269
        try:
270
            if succeedlist[count]:
271
                x = apply(method, args)
272
                print pred(x) and 'succeeded' or 'failed', ':', x
273
            else:
274
                print 'skipping'
275
        except ConversionError, var:
276
            print 'ConversionError:', var.msg
277
        count = count + 1
278

    
279

    
280
if __name__ == '__main__':
281
    _test()