Skip to content

Commit c30ba58

Browse files
committed
[split] finagle redis client from Tumblr
it's currently unpublished, but fully functional.
1 parent ce6fd00 commit c30ba58

File tree

16 files changed

+3077
-0
lines changed

16 files changed

+3077
-0
lines changed
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package com.twitter.finagle.redis
2+
3+
case class ServerError(message: String) extends Exception(message)
4+
case class ClientError(message: String) extends Exception(message)
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package com.twitter.finagle.redis
2+
3+
import protocol.{Command, CommandCodec, Reply, ReplyCodec}
4+
5+
import com.twitter.finagle.{Codec, CodecFactory, Service}
6+
import com.twitter.finagle.tracing.ClientRequestTracingFilter
7+
import com.twitter.naggati.{Codec => NaggatiCodec}
8+
import com.twitter.util.Future
9+
import org.jboss.netty.channel.{ChannelPipelineFactory, Channels}
10+
11+
object Redis {
12+
def apply() = new Redis
13+
def get() = apply()
14+
}
15+
16+
class Redis extends CodecFactory[Command, Reply] {
17+
def server = Function.const {
18+
new Codec[Command, Reply] {
19+
def pipelineFactory = new ChannelPipelineFactory {
20+
def getPipeline() = {
21+
val pipeline = Channels.pipeline()
22+
val commandCodec = new CommandCodec
23+
val replyCodec = new ReplyCodec
24+
25+
pipeline.addLast("codec", new NaggatiCodec(commandCodec.decode, replyCodec.encode))
26+
27+
pipeline
28+
}
29+
}
30+
}
31+
}
32+
33+
def client = Function.const {
34+
new Codec[Command, Reply] {
35+
36+
def pipelineFactory = new ChannelPipelineFactory {
37+
def getPipeline() = {
38+
val pipeline = Channels.pipeline()
39+
val commandCodec = new CommandCodec
40+
val replyCodec = new ReplyCodec
41+
42+
pipeline.addLast("codec", new NaggatiCodec(replyCodec.decode, commandCodec.encode))
43+
44+
pipeline
45+
}
46+
}
47+
48+
override def prepareService(underlying: Service[Command, Reply]) = {
49+
Future.value((new RedisTracingFilter()) andThen underlying)
50+
}
51+
52+
}
53+
}
54+
}
55+
56+
private class RedisTracingFilter extends ClientRequestTracingFilter[Command, Reply] {
57+
val serviceName = "redis"
58+
def methodName(req: Command): String = req.getClass().getSimpleName()
59+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package com.twitter.finagle.redis
2+
package protocol
3+
4+
import org.jboss.netty.buffer.{ChannelBuffer, ChannelBuffers}
5+
import util.StringToChannelBuffer
6+
import scala.collection.immutable.WrappedString
7+
8+
private[redis] object RedisCodec {
9+
object NilValue extends WrappedString("nil") {
10+
def getBytes(charset: String = "UTF-8") = Array[Byte]()
11+
def getBytes = Array[Byte]()
12+
}
13+
14+
val STATUS_REPLY = '+'
15+
val ERROR_REPLY = '-'
16+
val INTEGER_REPLY = ':'
17+
val BULK_REPLY = '$'
18+
val MBULK_REPLY = '*'
19+
20+
val ARG_COUNT_MARKER = '*'
21+
val ARG_SIZE_MARKER = '$'
22+
23+
val TOKEN_DELIMITER = ' '
24+
val EOL_DELIMITER = "\r\n"
25+
26+
val NIL_VALUE = NilValue
27+
val NIL_VALUE_BA = NilValue.getBytes
28+
29+
def toUnifiedFormat(args: List[Array[Byte]], includeHeader: Boolean = true) = {
30+
val buffer = ChannelBuffers.dynamicBuffer()
31+
includeHeader match {
32+
case true =>
33+
val argHeader = "%c%d%s".format(ARG_COUNT_MARKER, args.length, EOL_DELIMITER)
34+
buffer.writeBytes(argHeader.getBytes)
35+
case false =>
36+
}
37+
args.foreach { arg =>
38+
if (arg.length == 0) {
39+
buffer.writeBytes("%c-1%s".format(ARG_SIZE_MARKER, EOL_DELIMITER).getBytes)
40+
} else {
41+
val sizeHeader = "%c%d%s".format(ARG_SIZE_MARKER, arg.length, EOL_DELIMITER)
42+
buffer.writeBytes(sizeHeader.getBytes)
43+
buffer.writeBytes(arg)
44+
buffer.writeBytes(EOL_DELIMITER.getBytes)
45+
}
46+
}
47+
buffer
48+
}
49+
def toInlineFormat(args: List[String]) = {
50+
StringToChannelBuffer(args.mkString(TOKEN_DELIMITER.toString) + EOL_DELIMITER)
51+
}
52+
}
53+
abstract class RedisMessage {
54+
def toChannelBuffer: ChannelBuffer
55+
def toByteArray: Array[Byte] = toChannelBuffer.array
56+
}
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
package com.twitter.finagle.redis
2+
package protocol
3+
4+
import util._
5+
6+
object RequireClientProtocol extends ErrorConversion {
7+
override def getException(msg: String) = new ClientError(msg)
8+
}
9+
10+
abstract class Command extends RedisMessage
11+
12+
object Commands {
13+
// Key Commands
14+
val DEL = "DEL"
15+
val EXISTS = "EXISTS"
16+
val EXPIRE = "EXPIRE"
17+
val EXPIREAT = "EXPIREAT"
18+
val KEYS = "KEYS"
19+
val PERSIST = "PERSIST"
20+
val RANDOMKEY = "RANDOMKEY"
21+
val RENAME = "RENAME"
22+
val RENAMENX = "RENAMENX"
23+
val TTL = "TTL"
24+
val TYPE = "TYPE"
25+
26+
// String Commands
27+
val APPEND = "APPEND"
28+
val DECR = "DECR"
29+
val DECRBY = "DECRBY"
30+
val GET = "GET"
31+
val GETBIT = "GETBIT"
32+
val GETRANGE = "GETRANGE"
33+
val GETSET = "GETSET"
34+
val INCR = "INCR"
35+
val INCRBY = "INCRBY"
36+
val MGET = "MGET"
37+
val MSET = "MSET"
38+
val MSETNX = "MSETNX"
39+
val SET = "SET"
40+
val SETBIT = "SETBIT"
41+
val SETEX = "SETEX"
42+
val SETNX = "SETNX"
43+
val SETRANGE = "SETRANGE"
44+
val STRLEN = "STRLEN"
45+
46+
// Sorted Sets
47+
val ZADD = "ZADD"
48+
val ZCARD = "ZCARD"
49+
val ZCOUNT = "ZCOUNT"
50+
val ZINCRBY = "ZINCRBY"
51+
val ZINTERSTORE = "ZINTERSTORE"
52+
val ZRANGE = "ZRANGE"
53+
val ZRANGEBYSCORE = "ZRANGEBYSCORE"
54+
val ZRANK = "ZRANK"
55+
val ZREM = "ZREM"
56+
val ZREMRANGEBYRANK = "ZREMRANGEBYRANK"
57+
val ZREMRANGEBYSCORE = "ZREMRANGEBYSCORE"
58+
val ZREVRANGE = "ZREVRANGE"
59+
val ZREVRANGEBYSCORE = "ZREVRANGEBYSCORE"
60+
val ZREVRANK = "ZREVRANK"
61+
val ZSCORE = "ZSCORE"
62+
val ZUNIONSTORE = "ZUNIONSTORE"
63+
64+
val commandMap: Map[String,Function1[List[Array[Byte]],Command]] = Map(
65+
// key commands
66+
DEL -> {args => Del(BytesToString.fromList(args))},
67+
EXISTS -> {Exists(_)},
68+
EXPIRE -> {Expire(_)},
69+
EXPIREAT -> {ExpireAt(_)},
70+
KEYS -> {Keys(_)},
71+
PERSIST -> {Persist(_)},
72+
RANDOMKEY -> {args => Randomkey()},
73+
RENAME -> {Rename(_)},
74+
RENAMENX -> {RenameNx(_)},
75+
TTL -> {Ttl(_)},
76+
TYPE -> {Type(_)},
77+
78+
// string commands
79+
APPEND -> {Append(_)},
80+
DECR -> {Decr(_)},
81+
DECRBY -> {DecrBy(_)},
82+
GET -> {Get(_)},
83+
GETBIT -> {GetBit(_)},
84+
GETRANGE -> {GetRange(_)},
85+
GETSET -> {GetSet(_)},
86+
INCR -> {Incr(_)},
87+
INCRBY -> {IncrBy(_)},
88+
MGET -> {args => MGet(BytesToString.fromList(args))},
89+
MSET -> {MSet(_)},
90+
MSETNX -> {MSetNx(_)},
91+
SET -> {Set(_)},
92+
SETBIT -> {SetBit(_)},
93+
SETEX -> {SetEx(_)},
94+
SETNX -> {SetNx(_)},
95+
SETRANGE -> {SetRange(_)},
96+
STRLEN -> {Strlen(_)},
97+
98+
// sorted sets
99+
ZADD -> {ZAdd(_)},
100+
ZCARD -> {ZCard(_)},
101+
ZCOUNT -> {ZCount(_)},
102+
ZINCRBY -> {ZIncrBy(_)},
103+
ZINTERSTORE -> {ZInterStore(_)},
104+
ZRANGE -> {ZRange(_)},
105+
ZRANGEBYSCORE -> {ZRangeByScore(_)},
106+
ZRANK -> {ZRank(_)},
107+
ZREM -> {ZRem(_)},
108+
ZREMRANGEBYRANK -> {ZRemRangeByRank(_)},
109+
ZREMRANGEBYSCORE -> {ZRemRangeByScore(_)},
110+
ZREVRANGE -> {ZRevRange(_)},
111+
ZREVRANGEBYSCORE -> {ZRevRangeByScore(_)},
112+
ZREVRANK -> {ZRevRank(_)},
113+
ZSCORE -> {ZScore(_)},
114+
ZUNIONSTORE -> {ZUnionStore(_)}
115+
)
116+
117+
def doMatch(cmd: String, args: List[Array[Byte]]) = commandMap.get(cmd).map {
118+
_(args)
119+
}.getOrElse(throw ClientError("Unsupported command: " + cmd))
120+
121+
def trimList(list: List[Array[Byte]], count: Int, from: String = "") = {
122+
RequireClientProtocol(list != null, "%s Empty list found".format(from))
123+
RequireClientProtocol(
124+
list.length == count,
125+
"%s Expected %d elements, found %d".format(from, count, list.length))
126+
val newList = list.take(count)
127+
newList.foreach { item => RequireClientProtocol(item != null, "Found empty item in list") }
128+
newList
129+
}
130+
}
131+
132+
class CommandCodec extends UnifiedProtocolCodec {
133+
import com.twitter.naggati.{Emit, Encoder, NextStep}
134+
import com.twitter.naggati.Stages._
135+
import RedisCodec._
136+
import com.twitter.logging.Logger
137+
138+
val log = Logger(getClass)
139+
140+
val decode = readBytes(1) { bytes =>
141+
bytes(0) match {
142+
case ARG_COUNT_MARKER =>
143+
val doneFn = { lines => commandDecode(lines) }
144+
RequireClientProtocol.safe {
145+
readLine { line => decodeUnifiedFormat(NumberFormat.toLong(line), doneFn) }
146+
}
147+
case b: Byte =>
148+
decodeInlineRequest(b.asInstanceOf[Char])
149+
}
150+
}
151+
152+
val encode = new Encoder[Command] {
153+
def encode(obj: Command) = Some(obj.toChannelBuffer)
154+
}
155+
156+
def decodeInlineRequest(c: Char) = readLine { line =>
157+
val listOfArrays = (c + line).split(' ').toList.map { args => args.getBytes("UTF-8") }
158+
val cmd = commandDecode(listOfArrays)
159+
emit(cmd)
160+
}
161+
162+
def commandDecode(lines: List[Array[Byte]]): Command = {
163+
RequireClientProtocol(lines != null && lines.length > 0, "Invalid client command protocol")
164+
val cmd = new String(lines.head)
165+
val args = lines.tail
166+
try {
167+
Commands.doMatch(cmd, args)
168+
} catch {
169+
case e: ClientError => throw e
170+
case t: Throwable =>
171+
log.warning(t, "Unhandled exception %s(%s)".format(t.getClass.toString, t.getMessage))
172+
throw new ClientError(t.getMessage)
173+
}
174+
}
175+
176+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package com.twitter.finagle.redis
2+
package protocol
3+
4+
import util._
5+
import RedisCodec._
6+
7+
import com.twitter.naggati.{Emit, Encoder, NextStep, ProtocolError}
8+
import com.twitter.naggati.Stages._
9+
10+
trait UnifiedProtocolCodec {
11+
12+
type ByteArrays = List[Array[Byte]]
13+
14+
def decodeUnifiedFormat[T <: AnyRef](argCount: Long, doneFn: ByteArrays => T) =
15+
argCount match {
16+
case n if n < 0 => throw new ProtocolError("Invalid argument count specified")
17+
case n => decodeRequestLines(n, Nil, { lines => doneFn(lines) } )
18+
}
19+
20+
def decodeRequestLines[T <: AnyRef](
21+
i: Long,
22+
lines: ByteArrays,
23+
doneFn: ByteArrays => T): NextStep =
24+
{
25+
if (i <= 0) {
26+
emit(doneFn(lines.reverse))
27+
} else {
28+
readLine { line =>
29+
val header = line(0)
30+
header match {
31+
case ARG_SIZE_MARKER =>
32+
val size = NumberFormat.toInt(line.drop(1))
33+
if (size < 1) {
34+
decodeRequestLines(i - 1, lines.+:(RedisCodec.NIL_VALUE_BA), doneFn)
35+
} else {
36+
readBytes(size) { byteArray =>
37+
readBytes(2) { eol =>
38+
if (eol(0) != '\r' || eol(1) != '\n') {
39+
throw new ProtocolError("Expected EOL after line data and didn't find it")
40+
}
41+
decodeRequestLines(i - 1, lines.+:(byteArray), doneFn)
42+
}
43+
}
44+
}
45+
case b: Char =>
46+
throw new ProtocolError("Expected size marker $, got " + b)
47+
} // header match
48+
} // readLine
49+
} // else
50+
} // decodeRequestLines
51+
}

0 commit comments

Comments
 (0)