Back to home page

Redis cross reference

 
 

    


0001 #include "redis.h"
0002 #include "endianconv.h"
0003 
0004 /* -----------------------------------------------------------------------------
0005  * DUMP, RESTORE and MIGRATE commands
0006  * -------------------------------------------------------------------------- */
0007 
0008 /* Generates a DUMP-format representation of the object 'o', adding it to the
0009  * io stream pointed by 'rio'. This function can't fail. */
0010 void createDumpPayload(rio *payload, robj *o) {
0011     unsigned char buf[2];
0012     uint64_t crc;
0013 
0014     /* Serialize the object in a RDB-like format. It consist of an object type
0015      * byte followed by the serialized object. This is understood by RESTORE. */
0016     rioInitWithBuffer(payload,sdsempty());
0017     redisAssert(rdbSaveObjectType(payload,o));
0018     redisAssert(rdbSaveObject(payload,o));
0019 
0020     /* Write the footer, this is how it looks like:
0021      * ----------------+---------------------+---------------+
0022      * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
0023      * ----------------+---------------------+---------------+
0024      * RDB version and CRC are both in little endian.
0025      */
0026 
0027     /* RDB version */
0028     buf[0] = REDIS_RDB_VERSION & 0xff;
0029     buf[1] = (REDIS_RDB_VERSION >> 8) & 0xff;
0030     payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2);
0031 
0032     /* CRC64 */
0033     crc = crc64(0,(unsigned char*)payload->io.buffer.ptr,
0034                 sdslen(payload->io.buffer.ptr));
0035     memrev64ifbe(&crc);
0036     payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8);
0037 }
0038 
0039 /* Verify that the RDB version of the dump payload matches the one of this Redis
0040  * instance and that the checksum is ok.
0041  * If the DUMP payload looks valid REDIS_OK is returned, otherwise REDIS_ERR
0042  * is returned. */
0043 int verifyDumpPayload(unsigned char *p, size_t len) {
0044     unsigned char *footer;
0045     uint16_t rdbver;
0046     uint64_t crc;
0047 
0048     /* At least 2 bytes of RDB version and 8 of CRC64 should be present. */
0049     if (len < 10) return REDIS_ERR;
0050     footer = p+(len-10);
0051 
0052     /* Verify RDB version */
0053     rdbver = (footer[1] << 8) | footer[0];
0054     if (rdbver != REDIS_RDB_VERSION) return REDIS_ERR;
0055 
0056     /* Verify CRC64 */
0057     crc = crc64(0,p,len-8);
0058     memrev64ifbe(&crc);
0059     return (memcmp(&crc,footer+2,8) == 0) ? REDIS_OK : REDIS_ERR;
0060 }
0061 
0062 /* DUMP keyname
0063  * DUMP is actually not used by Redis Cluster but it is the obvious
0064  * complement of RESTORE and can be useful for different applications. */
0065 void dumpCommand(redisClient *c) {
0066     robj *o, *dumpobj;
0067     rio payload;
0068 
0069     /* Check if the key is here. */
0070     if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
0071         addReply(c,shared.nullbulk);
0072         return;
0073     }
0074 
0075     /* Create the DUMP encoded representation. */
0076     createDumpPayload(&payload,o);
0077 
0078     /* Transfer to the client */
0079     dumpobj = createObject(REDIS_STRING,payload.io.buffer.ptr);
0080     addReplyBulk(c,dumpobj);
0081     decrRefCount(dumpobj);
0082     return;
0083 }
0084 
0085 /* RESTORE key ttl serialized-value */
0086 void restoreCommand(redisClient *c) {
0087     long ttl;
0088     rio payload;
0089     int type;
0090     robj *obj;
0091 
0092     /* Make sure this key does not already exist here... */
0093     if (lookupKeyWrite(c->db,c->argv[1]) != NULL) {
0094         addReplyError(c,"Target key name is busy.");
0095         return;
0096     }
0097 
0098     /* Check if the TTL value makes sense */
0099     if (getLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != REDIS_OK) {
0100         return;
0101     } else if (ttl < 0) {
0102         addReplyError(c,"Invalid TTL value, must be >= 0");
0103         return;
0104     }
0105 
0106     /* Verify RDB version and data checksum. */
0107     if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr)) == REDIS_ERR) {
0108         addReplyError(c,"DUMP payload version or checksum are wrong");
0109         return;
0110     }
0111 
0112     rioInitWithBuffer(&payload,c->argv[3]->ptr);
0113     if (((type = rdbLoadObjectType(&payload)) == -1) ||
0114         ((obj = rdbLoadObject(type,&payload)) == NULL))
0115     {
0116         addReplyError(c,"Bad data format");
0117         return;
0118     }
0119 
0120     /* Create the key and set the TTL if any */
0121     dbAdd(c->db,c->argv[1],obj);
0122     if (ttl) setExpire(c->db,c->argv[1],mstime()+ttl);
0123     signalModifiedKey(c->db,c->argv[1]);
0124     addReply(c,shared.ok);
0125     server.dirty++;
0126 }
0127 
0128 /* MIGRATE host port key dbid timeout */
0129 void migrateCommand(redisClient *c) {
0130     int fd;
0131     long timeout;
0132     long dbid;
0133     long long ttl = 0, expireat;
0134     robj *o;
0135     rio cmd, payload;
0136 
0137     /* Sanity check */
0138     if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK)
0139         return;
0140     if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK)
0141         return;
0142     if (timeout <= 0) timeout = 1000;
0143 
0144     /* Check if the key is here. If not we reply with success as there is
0145      * nothing to migrate (for instance the key expired in the meantime), but
0146      * we include such information in the reply string. */
0147     if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) {
0148         addReplySds(c,sdsnew("+NOKEY\r\n"));
0149         return;
0150     }
0151     
0152     /* Connect */
0153     fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
0154                 atoi(c->argv[2]->ptr));
0155     if (fd == -1) {
0156         addReplyErrorFormat(c,"Can't connect to target node: %s",
0157             server.neterr);
0158         return;
0159     }
0160     if ((aeWait(fd,AE_WRITABLE,timeout) & AE_WRITABLE) == 0) {
0161         close(fd);
0162         addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n"));
0163         return;
0164     }
0165 
0166     /* Create RESTORE payload and generate the protocol to call the command. */
0167     rioInitWithBuffer(&cmd,sdsempty());
0168     redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
0169     redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
0170     redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
0171 
0172     expireat = getExpire(c->db,c->argv[3]);
0173     if (expireat != -1) {
0174         ttl = expireat-mstime();
0175         if (ttl < 1) ttl = 1;
0176     }
0177     redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',4));
0178     redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
0179     redisAssertWithInfo(c,NULL,c->argv[3]->encoding == REDIS_ENCODING_RAW);
0180     redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr)));
0181     redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));
0182 
0183     /* Finally the last argument that is the serailized object payload
0184      * in the DUMP format. */
0185     createDumpPayload(&payload,o);
0186     redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,
0187                                 sdslen(payload.io.buffer.ptr)));
0188     sdsfree(payload.io.buffer.ptr);
0189 
0190     /* Tranfer the query to the other node in 64K chunks. */
0191     {
0192         sds buf = cmd.io.buffer.ptr;
0193         size_t pos = 0, towrite;
0194         int nwritten = 0;
0195 
0196         while ((towrite = sdslen(buf)-pos) > 0) {
0197             towrite = (towrite > (64*1024) ? (64*1024) : towrite);
0198             nwritten = syncWrite(fd,buf+pos,towrite,timeout);
0199             if (nwritten != (signed)towrite) goto socket_wr_err;
0200             pos += nwritten;
0201         }
0202     }
0203 
0204     /* Read back the reply. */
0205     {
0206         char buf1[1024];
0207         char buf2[1024];
0208 
0209         /* Read the two replies */
0210         if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0)
0211             goto socket_rd_err;
0212         if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0)
0213             goto socket_rd_err;
0214         if (buf1[0] == '-' || buf2[0] == '-') {
0215             addReplyErrorFormat(c,"Target instance replied with error: %s",
0216                 (buf1[0] == '-') ? buf1+1 : buf2+1);
0217         } else {
0218             robj *aux;
0219 
0220             dbDelete(c->db,c->argv[3]);
0221             signalModifiedKey(c->db,c->argv[3]);
0222             addReply(c,shared.ok);
0223             server.dirty++;
0224 
0225             /* Translate MIGRATE as DEL for replication/AOF. */
0226             aux = createStringObject("DEL",3);
0227             rewriteClientCommandVector(c,2,aux,c->argv[3]);
0228             decrRefCount(aux);
0229         }
0230     }
0231 
0232     sdsfree(cmd.io.buffer.ptr);
0233     close(fd);
0234     return;
0235 
0236 socket_wr_err:
0237     addReplySds(c,sdsnew("-IOERR error or timeout writing to target instance\r\n"));
0238     sdsfree(cmd.io.buffer.ptr);
0239     close(fd);
0240     return;
0241 
0242 socket_rd_err:
0243     addReplySds(c,sdsnew("-IOERR error or timeout reading from target node\r\n"));
0244     sdsfree(cmd.io.buffer.ptr);
0245     close(fd);
0246     return;
0247 }