diff --git a/src/rmt_redis.c b/src/rmt_redis.c index dfc58e6..0e2aa23 100644 --- a/src/rmt_redis.c +++ b/src/rmt_redis.c @@ -66,6 +66,7 @@ #define REDIS_RDB_TYPE_SET 2 #define REDIS_RDB_TYPE_ZSET 3 #define REDIS_RDB_TYPE_HASH 4 +#define REDIS_RDB_TYPE_ZSET2 5 /* Object types for encoded objects. */ #define REDIS_RDB_TYPE_HASH_ZIPMAP 9 @@ -2989,7 +2990,6 @@ redis_parse_req(struct msg *r) if (str6icmp(m, 's', 'e', 'l', 'e', 'c', 't')) { r->type = MSG_REQ_REDIS_SELECT; - r->noforward = 1; break; } @@ -5469,6 +5469,7 @@ struct msg *redis_generate_msg_with_key_value(rmtContext *ctx, mbuf_base *mb, } msg_owner->nfrag = 0; + log_notice("sub_msg_count is: %u",sub_msg_count); msg_owner->frag_seq = rmt_alloc(sub_msg_count * sizeof(msg)); if(msg_owner->frag_seq == NULL){ goto enomem; @@ -5525,6 +5526,7 @@ struct msg *redis_generate_msg_with_key_value(rmtContext *ctx, mbuf_base *mb, msg->type = MSG_REQ_REDIS_SADD; break; case REDIS_ZSET: + case REDIS_ZSET2: ret = redis_msg_append_bulk_full(msg, REDIS_INSERT_ZSET, rmt_strlen(REDIS_INSERT_ZSET)); msg->type = MSG_REQ_REDIS_ZADD; @@ -5832,6 +5834,12 @@ static sds redis_rdb_file_load_double_str(redis_rdb *rdb) { } } +static sds redis_rdb_file_load_binary_double_str(redis_rdb *rdb) { + char buf[256]; + if (redis_rdb_file_read(rdb,buf,8) != RMT_OK) return NULL; + return sdsnewlen(buf, 8); +} + static sds redis_rdb_file_load_lzf_str(redis_rdb *rdb) { unsigned int len, clen; unsigned char *c = NULL; @@ -5939,7 +5947,7 @@ static struct array *redis_rdb_file_load_value(redis_rdb *rdb, int rdbtype) str = array_push(value); if ((*str = redis_rdb_file_load_str(rdb)) == NULL) goto error; } - }else if (rdbtype == REDIS_RDB_TYPE_ZSET) { + }else if (rdbtype == REDIS_RDB_TYPE_ZSET || rdbtype == REDIS_RDB_TYPE_ZSET2) { if ((len = redis_rdb_file_load_len(rdb,NULL)) == REDIS_RDB_LENERR) goto error; value = redis_value_create((uint32_t)(2*len)); @@ -5950,9 +5958,17 @@ static struct array *redis_rdb_file_load_value(redis_rdb *rdb, int rdbtype) while(len--) { if ((elem1 = redis_rdb_file_load_str(rdb)) == NULL) goto error; - if ((elem2 = redis_rdb_file_load_double_str(rdb)) == NULL) { - sdsfree(elem1); - goto error; + if (rdbtype == REDIS_RDB_TYPE_ZSET) { + if ((elem2 = redis_rdb_file_load_double_str(rdb)) == NULL) { + sdsfree(elem1); + goto error; + } + } else { + if ((elem2 = redis_rdb_file_load_binary_double_str(rdb)) == NULL) { + log_error("the error key name: %s\n", elem1); + sdsfree(elem1); + goto error; + } } str = array_push(value); @@ -6196,6 +6212,10 @@ static int redis_object_type_get_by_rdbtype(int dbtype) return REDIS_STRING; break; + case REDIS_RDB_TYPE_ZSET2: + + return REDIS_ZSET2; + break; case REDIS_RDB_TYPE_LIST: case REDIS_RDB_TYPE_LIST_ZIPLIST: case REDIS_RDB_TYPE_LIST_QUICKLIST: @@ -6382,6 +6402,149 @@ int redis_key_value_send(redis_node *srnode, sds key, return -1; } +int redis_select_cmd_send(redis_node *srnode, const char *dbid, void *data){ + int ret; + rmtContext *ctx = srnode->ctx; + redis_group *srgroup = srnode->owner; + mbuf_base *mb = srgroup->mb; + redis_group *trgroup = data; + redis_node *trnode; + redis_node **rnode; + struct msg *msg = NULL; + uint32_t i; + int mbuf_count = 0; + + //get target node + rnode = array_get(trgroup->route, 0); + trnode = *rnode; + if (trnode == NULL){ + log_error("ERROR: [cmd] backend node is NULL"); + goto error; + } + msg = redis_generate_select_cmd_msg(ctx, mb, dbid); + if (msg == NULL){ + log_error("ERROR: generate msg for select cmd error"); + goto error; + } + if (msg->frag_seq == NULL){ + mbuf_count += listLength(msg->data); + ret = prepare_send_msg(srnode, msg, trnode); + if (ret != RMT_OK){ + log_error("ERROR1: prepare send select msg to node[%s] failed",trnode->addr); + goto error; + } + msg = NULL; + } + else{ + for (i = 0; i < msg->nfrag; i++){ + mbuf_count += listLength(msg->frag_seq[i]->data); + ret = prepare_send_msg(srnode, msg->frag_seq[i], trnode); + if (ret != RMT_OK){ + log_error("ERROR2: prepare send select cmd to node[%s] failed",trnode->addr); + goto error; + } + msg->frag_seq[i] = NULL; + } + msg_put(msg); + msg_free(msg); + msg = NULL; + } + return mbuf_count; + +error: + + if (msg != NULL){ + if (msg->frag_seq != NULL){ + for (i = 0; i < msg->nfrag; i++){ + if (msg->frag_seq[i] != NULL){ + msg_put(msg->frag_seq[i]); + msg_free(msg->frag_seq[i]); + msg->frag_seq[i] = NULL; + } + } + } + msg_put(msg); + msg_free(msg); + } + return -1; +} + +struct msg *redis_generate_select_cmd_msg(rmtContext *ctx, mbuf_base *mb, const char *dbid){ + int ret; + struct msg *msg, *msg_owner; + uint32_t sub_msg_count = 0; + uint32_t field_count, i; + + msg = NULL; + msg_owner = NULL; + /*msg_owner = msg_get(mb, 1, REDIS_DATA_TYPE_CMD); + if (msg_owner == NULL){ + goto enomem; + } + msg_owner->nfrag = 0; + msg_owner->frag_seq = rmt_alloc()*/ +next: + msg = msg_get(mb, 1, REDIS_DATA_TYPE_RDB); + if (msg == NULL){ + goto enomem; + } + field_count = 2; + //add *2 reference aof protocol + ret = redis_msg_append_multi_bulk_len_full(msg, field_count); + if (ret != RMT_OK){ + log_error("[cmd] cmd msg append bulk len error"); + if (ret == RMT_ENOMEM){ + goto enomem; + } + goto error; + } + // append select command to bulk. + ret = redis_msg_append_bulk_full(msg, "SELECT", rmt_strlen("SELECT")); + msg->type = MSG_REQ_REDIS_SELECT; + if (ret != RMT_OK){ + log_error("[cmd] cmd msg append select error: %s",msg_type_string(msg->type)); + if (ret == RMT_ENOMEM){ + goto enomem; + } + goto error; + } + // append dbid to bulk, dbid type must be a string point + ret = redis_msg_append_bulk_full(msg, dbid, rmt_strlen(dbid)); + if (ret != RMT_OK){ + log_error("[cmd] cmd msg append dbid error"); + if (ret == RMT_ENOMEM){ + goto enomem; + } + goto error; + } + // set reply info + if (ctx->noreply){ + msg->noreply = 1; + } + if (msg_owner == NULL){ + return msg; + } + +enomem: + log_error("ERROR: out of memory"); + +error: + if (msg != NULL){ + msg_put(msg); + msg_free(msg); + } + + if (msg_owner != NULL){ + for (i = 0; i < msg_owner->nfrag; i++){ + msg_put(msg_owner->frag_seq[i]); + msg_free(msg_owner->frag_seq[i]); + } + msg_put(msg_owner); + msg_free(msg_owner); + } + return NULL; +} + int redis_parse_rdb_file(redis_node *srnode, int mbuf_count_one_time) { int ret; @@ -6441,7 +6604,7 @@ int redis_parse_rdb_file(redis_node *srnode, int mbuf_count_one_time) } rdb->rdbver = rmt_atoi(buf+len, 4); - if (rdb->rdbver < 1 || rdb->rdbver > REDIS_RDB_VERSION) { + if (rdb->rdbver < 1) { log_error("ERROR: Can't handle RDB format version %d", rdb->fname, rdb->rdbver); goto error; @@ -6501,6 +6664,17 @@ int redis_parse_rdb_file(redis_node *srnode, int mbuf_count_one_time) rdb->fname); goto eoferr; } + else{ + log_notice("fetch select db:[%u]",dbid); + char db[5]; + sprintf(db,"%d",dbid); + ret = redis_select_cmd_send(srnode, db, trgroup); + if (ret < 0){ + log_notice("send %s failed",db); + break; + } + mbuf_count += ret; + } log_debug(LOG_INFO, "dbid: %d", dbid); continue; diff --git a/src/rmt_redis.h b/src/rmt_redis.h index a69bc43..8181d4a 100644 --- a/src/rmt_redis.h +++ b/src/rmt_redis.h @@ -7,6 +7,7 @@ #define REDIS_SET 2 #define REDIS_ZSET 3 #define REDIS_HASH 4 +#define REDIS_ZSET2 5 #define REDIS_REPLY_STATUS_OK "+OK\r\n" #define REDIS_REPLY_STATUS_PONG "+PONG\r\n" @@ -255,6 +256,8 @@ void redis_value_destroy(struct array *value); char *get_redis_type_string(int type); struct array *get_multi_bulk_array_from_mbuf_list(list *mbufs); +struct msg *redis_generate_select_cmd_msg(struct rmtContext *ctx, mbuf_base *mb, const char *dbid); +int redis_select_cmd_send(redis_node *srnode, const char *dbid, void *data); #endif