Rocketmq redis replicator implement Redis Replication protocol written in java. It can parse, filter, broadcast the RDB and AOF events in a real time manner and downstream these event to RocketMQ.
+-------+ PSNC +--------------+
| |<--------------| | event +--------------+
| Redis | | |------------->| |
| |-------------->|Rocketmq-redis| event | |
+-------+ data | (parse data) |------------->| Rocketmq |
| | event | |
| |------------->| |
+--------------+ +--------------+jdk 1.8+
maven-3.3.1+
redis 2.6 - 5.0.x
rocketmq 4.2.0 or higher
$mvn clean install package -Dmaven.test.skip=true
Configure configure = new Configure();
Replicator replicator = new RocketMQRedisReplicator(configure);
final RocketMQRedisProducer producer = new RocketMQRedisProducer(configure);
producer.open();
replicator.addEventListener(new EventListener() {
@Override public void onEvent(Replicator replicator, Event event) {
try {
if (!producer.send(event)) {
LOGGER.error("Failed to send Event");
}
} catch (Exception e) {
LOGGER.error("Failed to send Event", e);
}
}
});
replicator.addCloseListener(new CloseListener() {
@Override public void handle(Replicator replicator) {
producer.close();
}
});
replicator.open();mvn clean package -Dmaven.test.skipsh target/rocketmq-redis-pack/bin/start.sh
Configure configure = new Configure();
RocketMQRedisConsumer consumer = new RocketMQRedisConsumer(configure);
consumer.addEventListener(new EventListener() {
@Override public void onEvent(Event event) {
if (event instanceof PreRdbSyncEvent) {
// pre rdb sync
// your code goes here
} else if (event instanceof AuxField) {
// rdb aux field event
// your code goes here
} else if (event instanceof KeyValuePair) {
// rdb event
// your code goes here
} else if (event instanceof PostRdbSyncEvent) {
// post full sync
// your code goes here
} else if (event instanceof Command) {
// aof command event
// your code goes here
} else if (event instanceof PreCommandSyncEvent) {
// pre command sync
// your code goes here
} else if (event instanceof PostCommandSyncEvent) {
// post command sync
// your code goes here
}
}
});
consumer.open();The config file located at target/rocketmq-redis-pack/conf/replicator.conf
| parameter | default value | detail |
|---|---|---|
| rocketmq.nameserver.address | 127.0.0.1:9876 | rocketmq server address |
| rocketmq.producer.groupname | REDIS_REPLICATOR_PRODUCER_GROUP | rocketmq producer group name |
| rocketmq.consumer.groupname | REDIS_REPLICATOR_CONSUMER_GROUP | rocketmq consumer group name |
| rocketmq.data.topic | redisdata | rocketmq topic name |
| deploy.model | single | single or cluster |
| zookeeper.address | 127.0.0.1:2181 | run on cluster model |
| redis.uri | redis://127.0.0.1:6379 | the uri of redis master which replicate from |
By default the configuration file replicator.conf loaded from your classpath.
But you can specify your own configuration using Configure like following:
Properties properties = new Properties()
properties.setProperty("zookeeper.address", "127.0.0.1:2181");
properties.setProperty("redis.uri", "redis://127.0.0.1:6379");
properties.setProperty("rocketmq.nameserver.address", "localhost:9876");
properties.setProperty("rocketmq.producer.groupname", "REDIS_REPLICATOR_PRODUCER_GROUP");
properties.setProperty("rocketmq.consumer.groupname", "REDIS_REPLICATOR_CONSUMER_GROUP");
properties.setProperty("rocketmq.data.topic", "redisdata");
properties.setProperty("deploy.model", "single");
Configure configure = new Configure(properties);
| commands | commands | commands | commands | commands | commands |
|---|---|---|---|---|---|
| PING | APPEND | SET | SETEX | MSET | DEL |
| SADD | HMSET | HSET | LSET | EXPIRE | EXPIREAT |
| GETSET | HSETNX | MSETNX | PSETEX | SETNX | SETRANGE |
| HDEL | UNLINK | SREM | LPOP | LPUSH | LPUSHX |
| LRem | RPOP | RPUSH | RPUSHX | ZREM | ZINTERSTORE |
| INCR | DECR | INCRBY | PERSIST | SELECT | FLUSHALL |
| FLUSHDB | HINCRBY | ZINCRBY | MOVE | SMOVE | BRPOPLPUSH |
| PFCOUNT | PFMERGE | SDIFFSTORE | RENAMENX | PEXPIREAT | SINTERSTORE |
| ZADD | BITFIELD | SUNIONSTORE | RESTORE | LINSERT | ZREMRANGEBYLEX |
| GEOADD | PEXPIRE | ZUNIONSTORE | EVAL | SCRIPT | ZREMRANGEBYRANK |
| PUBLISH | BITOP | SETBIT | SWAPDB | PFADD | ZREMRANGEBYSCORE |
| RENAME | MULTI | EXEC | LTRIM | RPOPLPUSH | SORT |
| EVALSHA | ZPOPMAX | ZPOPMIN | XACK | XADD | XCLAIM |
| XDEL | XGROUP | XTRIM | XSETID |
- Adjust redis server setting like the following. more details please refer to redis.conf
client-output-buffer-limit slave 0 0 0WARNNING: this setting may run out of memory of redis server in some cases.
- If you are using log4j2, add logger like the following:
<Logger name="com.moilioncircle" level="info">
<AppenderRef ref="YourAppender"/>
</Logger> // redis uri
"redis://127.0.0.1:6379?verbose=yes" // redis uri
"redis://127.0.0.1:6379?authPassword=foobared"- Adjust redis server setting like the following
repl-backlog-size
repl-backlog-ttl
repl-ping-slave-periodsrepl-ping-slave-period MUST less than readTimeout, default readTimeout is 30 seconds