为了实现冷热数据分离,热数据在内存,冷数据会置换到持久化存储,但是为了保证内存检索高效,会将所有key和频率统计信息保存在内存。冷数据的选择采用的选择算法和redis本身的数据淘汰选择算法一致,使用采样最优的方式。
为了将冷数据和热数据区分,方便冷数据采样,所以在每个db里增加了一个名为index的dict,用于保存key的index和频率统计信息。
换出策略
什么时候会触发冷数据的持久化来释放内存?需要根据应用场景定义不同的置换策略。
置换策略可以采用多种可配置方式:
- maxhotmemory
增加一项配置项,在内存数据达到阈值,触发数据强制置换,策略类似maxmemory删除策略,为了不影响原有数据淘汰逻辑,maxhotmemory要小于maxmemory,当达阈值后从db->dict采样key,根据访问的频率,选择最适合置换的key,然后持久化,逻辑如下:
- 将对于的redis object decode成rawstring,调用RocksDB PUT接口,将数据存储到磁盘;
- 将key和初始的频率访问信息添加到db->index;
- 将key从db->dict清理掉,释放内存。
- 冷热数据百分比
根据冷热数据比例,在内存使用率到一定比例的前提下,由定时任务周期性的选择合适的key进行持久化,定时任务只是生成置换任务,交给IO线程异步处理,IO操作不阻塞主线程。持久化逻辑同上。
冷数据加载
冷数据加载采用lazy的处理方式,在读写key之前,会先去db->index中进行查询,如果在index中,则阻塞客户端,将key加入阻塞任务中,异步IO线程进行数据加载。加载逻辑:
- 读取RockDB,生成redis object对象;
- 从db->index中删除key;
- 将新生成的value对象加入db->dict。
其中存在的问题就是当采用maxhotmemory策略,当usedmemory大于maxhotmemory,在进行写入操作会产生频繁的swap操作,所以还需要增加定时任务策略,尽量保证使usedmemory小于maxhotmemory; 当内存中全部存放的是key的index时,持续写入会使usedmemory达到maxmemory,触发数据淘汰逻辑,当配置了allkeys淘汰逻辑,server会按概率从dict或index中获取要淘汰的key。
RocksDB API
/*
* decode robj from key and value to rawstring
* call rocksdb_put to save key-value pair
*/
int rocksAdd(rocksdb_t *rdb, rocksdb_writeoptions_t *options, robj *key, robj *val) {
robj *dec;
char *err = NULL;
sds key_ptr, val_ptr;
dec = getDecodedObject(val);
key_ptr = key->ptr;
val_ptr = dec->ptr;
rocksdb_put(rdb, options, key_ptr, sdslen(key_ptr), val_ptr, sdslen(val_ptr), &err);
decrRefCount(dec);
if (err != NULL) {
serverLog(LL_WARNING, "rocksAdd failed, %s", err);
return C_ERR;
}
return C_OK;
}
/*
* decode robj from key to rawstring
* call rocksdb_get to get value from rocksdb of the key
* encode the value to robj
* */
robj *rocksGet(rocksdb_t *rdb, rocksdb_readoptions_t *options, robj *key) {
robj *val;
char *err = NULL;
size_t vallen;
sds key_ptr;
char *val_raw;
key_ptr = key->ptr;
val_raw = rocksdb_get(rdb, options, key_ptr, sdslen(key_ptr), &vallen, &err);
if (err != NULL) {
serverLog(LL_WARNING, "rocksGet failed, %s", err);
return NULL;
}
if (val_raw == NULL) {
// not found
return NULL;
} else {
val = createRawStringObject(val_raw, vallen);
// try encoding obj
val = tryObjectEncoding(val);
free(val_raw);
}
return val;
}
int rocksDel(rocksdb_t *rdb, rocksdb_writeoptions_t *options, robj *key) {
char *err = NULL;
sds key_ptr;
key_ptr = key->ptr;
rocksdb_delete(rdb, options, key_ptr, sdslen(key_ptr), &err);
if (err != NULL) {
serverLog(LL_WARNING, "rocksDel failed, %s", err);
return C_ERR;
}
return C_OK;
}
void rocksOpen() {
if (server.rdb) return;
server.r_options = rocksdb_options_create();
server.r_read_options = rocksdb_readoptions_create();
server.r_write_options = rocksdb_writeoptions_create();
// default config
.....
/* rocksdb block based options */
rocksdb_block_based_table_options_t *b_options = rocksdb_block_based_options_create();
rocksdb_block_based_options_set_cache_index_and_filter_blocks(b_options, 0);
rocksdb_options_set_block_based_table_factory(server.r_options, b_options);
char *err = NULL;
server.rdb = rocksdb_open(server.r_options, "/tmp/rocksdb_redis", &err);
if (err != NULL) {
serverLog(LL_WARNING, "open rocksdb failed, %s", err);
}
}
void rocksClose() {
if (!server.r_options) {
rocksdb_options_destroy(server.r_options);
}
if (!server.r_read_options) {
rocksdb_readoptions_destroy(server.r_read_options);
}
if (!server.r_write_options) {
rocksdb_writeoptions_destroy(server.r_write_options);
}
if (!server.rdb) {
rocksdb_close(server.rdb);
}
}