This repository was archived by the owner on Nov 23, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 159
Expand file tree
/
Copy pathrparser.pyx
More file actions
198 lines (170 loc) · 6.03 KB
/
Copy pathrparser.pyx
File metadata and controls
198 lines (170 loc) · 6.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
from itertools import starmap
import cython
cdef class Task
cdef bytes RESPONSE_INTEGER = b':'
cdef bytes RESPONSE_STRING = b'$'
cdef bytes RESPONSE_ARRAY = b'*'
cdef bytes RESPONSE_STATUS = b'+'
cdef bytes RESPONSE_ERROR = b'-'
cdef bytes nil = b'$-1\r\n'
cdef bytes null_array = b'*-1\r\n'
cdef class RedisParser:
cdef object _protocolError
cdef object _responseError
cdef object _encoding
cdef object _inbuffer
cdef Task _current
def __cinit__(self, object perr, object rerr):
self._protocolError = perr
self._responseError = rerr
self._inbuffer = bytearray()
def on_connect(self, connection):
if connection.decode_responses:
self._encoding = connection.encoding
def on_disconnect(self):
pass
# DECODER
def get(self):
if self._current:
return self._resume(self._current, False)
else:
return self._get(None)
def feed(self, stream):
self._inbuffer.extend(stream)
def buffer(self):
return self._inbuffer
# CLIENT ENCODERS
def pack_command(self, args):
return b''.join(self._pack_command(args))
def pack_pipeline(self, commands):
pack = lambda *args: b''.join(self._pack_command(args))
return b''.join(starmap(pack, (args for args, _ in commands)))
# SERVER ENCODERS
def bulk(self, bytes value=None):
if value is None:
return nil
else:
return ('$%d\r\n' % len(value)).encode('utf-8') + value + b'\r\n'
def multi_bulk_len(self, len):
return ('*%s\r\n' % len).encode('utf-8')
def multi_bulk(self, args):
return null_array if args is None else b''.join(self._pack(args))
# INTERNALS
def _pack_command(self, args):
yield ('*%d\r\n' % len(args)).encode('utf-8')
for value in args:
if isinstance(value, str):
value = value.encode('utf-8')
elif not isinstance(value, bytes):
value = str(value).encode('utf-8')
yield ('$%d\r\n' % len(value)).encode('utf-8')
yield value
yield CRLF
def _pack(self, args):
yield ('*%d\r\n' % len(args)).encode('utf-8')
for value in args:
if value is None:
yield nil
elif isinstance(value, bytes):
yield ('$%d\r\n' % len(value)).encode('utf-8')
yield value
yield CRLF
elif isinstance(value, str):
value = value.encode('utf-8')
yield ('$%d\r\n' % len(value)).encode('utf-8')
yield value
yield CRLF
elif hasattr(value, 'items'):
for value in self._pack(tuple(self._lua_dict(value))):
yield value
elif hasattr(value, '__len__'):
for value in self._pack(value):
yield value
else:
value = str(value).encode('utf-8')
yield ('$%d\r\n' % len(value)).encode('utf-8')
yield value
yield CRLF
def _lua_dict(self, d):
index = 0
while True:
index += 1
v = d.get(index)
if v is None:
break
yield v
cdef object _get(self, Task next):
b = self._inbuffer
cdef int length = b.find(b'\r\n')
if length >= 0:
self._inbuffer, response = b[length+2:], bytes(b[:length])
rtype, response = response[:1], response[1:]
if rtype == RESPONSE_ERROR:
return self._responseError(response.decode('utf-8'))
elif rtype == RESPONSE_INTEGER:
return long(response)
elif rtype == RESPONSE_STATUS:
return response
elif rtype == RESPONSE_STRING:
task = Task(long(response), next)
return task.decode(self, False)
elif rtype == RESPONSE_ARRAY:
task = ArrayTask(long(response), next)
return task.decode(self, False)
else:
# Clear the buffer and raise
self._inbuffer = bytearray()
raise self._protocolError('Protocol Error')
else:
return False
cdef object _resume(self, Task task, object result):
result = task.decode(self, result)
if result is not False and task._next:
return self._resume(task._next, result)
else:
return result
@cython.internal
cdef class Task:
cdef long _length
cdef Task _next
def __cinit__(self, long length, Task next):
self._length = length
self._next = next
cdef object decode(self, RedisParser parser, object result):
cdef long length = self._length
cdef bytes chunk
parser._current = None
if length >= 0:
b = parser._inbuffer
if len(b) >= length+2:
parser._inbuffer, chunk = b[length+2:], bytes(b[:length])
if parser._encoding:
return chunk.decode(parser._encoding)
else:
return chunk
else:
parser._current = self
return False
@cython.internal
cdef class ArrayTask(Task):
cdef list _response
cdef object decode(self, RedisParser parser, object result):
cdef long length = self._length
cdef list response = self._response
parser._current = None
if response is None:
self._response = response = []
if length >= 0:
if result is not False:
response.append(result)
while len(response) < length:
result = parser._get(self)
if result is False:
break
response.append(result)
if len(response) == length:
parser._current = None
return response
elif not parser._current:
parser._current = self
return False