Back to home page

Redis cross reference

 
 

    


0001 /*
0002  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
0003  * All rights reserved.
0004  *
0005  * Redistribution and use in source and binary forms, with or without
0006  * modification, are permitted provided that the following conditions are met:
0007  *
0008  *   * Redistributions of source code must retain the above copyright notice,
0009  *     this list of conditions and the following disclaimer.
0010  *   * Redistributions in binary form must reproduce the above copyright
0011  *     notice, this list of conditions and the following disclaimer in the
0012  *     documentation and/or other materials provided with the distribution.
0013  *   * Neither the name of Redis nor the names of its contributors may be used
0014  *     to endorse or promote products derived from this software without
0015  *     specific prior written permission.
0016  *
0017  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
0018  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
0019  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
0020  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
0021  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
0022  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
0023  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
0024  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
0025  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
0026  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
0027  * POSSIBILITY OF SUCH DAMAGE.
0028  */
0029 
0030 #include "redis.h"
0031 
0032 /*-----------------------------------------------------------------------------
0033  * Pubsub low level API
0034  *----------------------------------------------------------------------------*/
0035 
0036 void freePubsubPattern(void *p) {
0037     pubsubPattern *pat = p;
0038 
0039     decrRefCount(pat->pattern);
0040     zfree(pat);
0041 }
0042 
0043 int listMatchPubsubPattern(void *a, void *b) {
0044     pubsubPattern *pa = a, *pb = b;
0045 
0046     return (pa->client == pb->client) &&
0047            (equalStringObjects(pa->pattern,pb->pattern));
0048 }
0049 
0050 /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
0051  * 0 if the client was already subscribed to that channel. */
0052 int pubsubSubscribeChannel(redisClient *c, robj *channel) {
0053     struct dictEntry *de;
0054     list *clients = NULL;
0055     int retval = 0;
0056 
0057     /* Add the channel to the client -> channels hash table */
0058     if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
0059         retval = 1;
0060         incrRefCount(channel);
0061         /* Add the client to the channel -> list of clients hash table */
0062         de = dictFind(server.pubsub_channels,channel);
0063         if (de == NULL) {
0064             clients = listCreate();
0065             dictAdd(server.pubsub_channels,channel,clients);
0066             incrRefCount(channel);
0067         } else {
0068             clients = dictGetVal(de);
0069         }
0070         listAddNodeTail(clients,c);
0071     }
0072     /* Notify the client */
0073     addReply(c,shared.mbulkhdr[3]);
0074     addReply(c,shared.subscribebulk);
0075     addReplyBulk(c,channel);
0076     addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
0077     return retval;
0078 }
0079 
0080 /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
0081  * 0 if the client was not subscribed to the specified channel. */
0082 int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {
0083     struct dictEntry *de;
0084     list *clients;
0085     listNode *ln;
0086     int retval = 0;
0087 
0088     /* Remove the channel from the client -> channels hash table */
0089     incrRefCount(channel); /* channel may be just a pointer to the same object
0090                             we have in the hash tables. Protect it... */
0091     if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
0092         retval = 1;
0093         /* Remove the client from the channel -> clients list hash table */
0094         de = dictFind(server.pubsub_channels,channel);
0095         redisAssertWithInfo(c,NULL,de != NULL);
0096         clients = dictGetVal(de);
0097         ln = listSearchKey(clients,c);
0098         redisAssertWithInfo(c,NULL,ln != NULL);
0099         listDelNode(clients,ln);
0100         if (listLength(clients) == 0) {
0101             /* Free the list and associated hash entry at all if this was
0102              * the latest client, so that it will be possible to abuse
0103              * Redis PUBSUB creating millions of channels. */
0104             dictDelete(server.pubsub_channels,channel);
0105         }
0106     }
0107     /* Notify the client */
0108     if (notify) {
0109         addReply(c,shared.mbulkhdr[3]);
0110         addReply(c,shared.unsubscribebulk);
0111         addReplyBulk(c,channel);
0112         addReplyLongLong(c,dictSize(c->pubsub_channels)+
0113                        listLength(c->pubsub_patterns));
0114 
0115     }
0116     decrRefCount(channel); /* it is finally safe to release it */
0117     return retval;
0118 }
0119 
0120 /* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */
0121 int pubsubSubscribePattern(redisClient *c, robj *pattern) {
0122     int retval = 0;
0123 
0124     if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
0125         retval = 1;
0126         pubsubPattern *pat;
0127         listAddNodeTail(c->pubsub_patterns,pattern);
0128         incrRefCount(pattern);
0129         pat = zmalloc(sizeof(*pat));
0130         pat->pattern = getDecodedObject(pattern);
0131         pat->client = c;
0132         listAddNodeTail(server.pubsub_patterns,pat);
0133     }
0134     /* Notify the client */
0135     addReply(c,shared.mbulkhdr[3]);
0136     addReply(c,shared.psubscribebulk);
0137     addReplyBulk(c,pattern);
0138     addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
0139     return retval;
0140 }
0141 
0142 /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
0143  * 0 if the client was not subscribed to the specified channel. */
0144 int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) {
0145     listNode *ln;
0146     pubsubPattern pat;
0147     int retval = 0;
0148 
0149     incrRefCount(pattern); /* Protect the object. May be the same we remove */
0150     if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) {
0151         retval = 1;
0152         listDelNode(c->pubsub_patterns,ln);
0153         pat.client = c;
0154         pat.pattern = pattern;
0155         ln = listSearchKey(server.pubsub_patterns,&pat);
0156         listDelNode(server.pubsub_patterns,ln);
0157     }
0158     /* Notify the client */
0159     if (notify) {
0160         addReply(c,shared.mbulkhdr[3]);
0161         addReply(c,shared.punsubscribebulk);
0162         addReplyBulk(c,pattern);
0163         addReplyLongLong(c,dictSize(c->pubsub_channels)+
0164                        listLength(c->pubsub_patterns));
0165     }
0166     decrRefCount(pattern);
0167     return retval;
0168 }
0169 
0170 /* Unsubscribe from all the channels. Return the number of channels the
0171  * client was subscribed from. */
0172 int pubsubUnsubscribeAllChannels(redisClient *c, int notify) {
0173     dictIterator *di = dictGetSafeIterator(c->pubsub_channels);
0174     dictEntry *de;
0175     int count = 0;
0176 
0177     while((de = dictNext(di)) != NULL) {
0178         robj *channel = dictGetKey(de);
0179 
0180         count += pubsubUnsubscribeChannel(c,channel,notify);
0181     }
0182     /* We were subscribed to nothing? Still reply to the client. */
0183     if (notify && count == 0) {
0184         addReply(c,shared.mbulkhdr[3]);
0185         addReply(c,shared.unsubscribebulk);
0186         addReply(c,shared.nullbulk);
0187         addReplyLongLong(c,dictSize(c->pubsub_channels)+
0188                        listLength(c->pubsub_patterns));
0189     }
0190     dictReleaseIterator(di);
0191     return count;
0192 }
0193 
0194 /* Unsubscribe from all the patterns. Return the number of patterns the
0195  * client was subscribed from. */
0196 int pubsubUnsubscribeAllPatterns(redisClient *c, int notify) {
0197     listNode *ln;
0198     listIter li;
0199     int count = 0;
0200 
0201     listRewind(c->pubsub_patterns,&li);
0202     while ((ln = listNext(&li)) != NULL) {
0203         robj *pattern = ln->value;
0204 
0205         count += pubsubUnsubscribePattern(c,pattern,notify);
0206     }
0207     if (notify && count == 0) {
0208         /* We were subscribed to nothing? Still reply to the client. */
0209         addReply(c,shared.mbulkhdr[3]);
0210         addReply(c,shared.punsubscribebulk);
0211         addReply(c,shared.nullbulk);
0212         addReplyLongLong(c,dictSize(c->pubsub_channels)+
0213                        listLength(c->pubsub_patterns));
0214     }
0215     return count;
0216 }
0217 
0218 /* Publish a message */
0219 int pubsubPublishMessage(robj *channel, robj *message) {
0220     int receivers = 0;
0221     struct dictEntry *de;
0222     listNode *ln;
0223     listIter li;
0224 
0225     /* Send to clients listening for that channel */
0226     de = dictFind(server.pubsub_channels,channel);
0227     if (de) {
0228         list *list = dictGetVal(de);
0229         listNode *ln;
0230         listIter li;
0231 
0232         listRewind(list,&li);
0233         while ((ln = listNext(&li)) != NULL) {
0234             redisClient *c = ln->value;
0235 
0236             addReply(c,shared.mbulkhdr[3]);
0237             addReply(c,shared.messagebulk);
0238             addReplyBulk(c,channel);
0239             addReplyBulk(c,message);
0240             receivers++;
0241         }
0242     }
0243     /* Send to clients listening to matching channels */
0244     if (listLength(server.pubsub_patterns)) {
0245         listRewind(server.pubsub_patterns,&li);
0246         channel = getDecodedObject(channel);
0247         while ((ln = listNext(&li)) != NULL) {
0248             pubsubPattern *pat = ln->value;
0249 
0250             if (stringmatchlen((char*)pat->pattern->ptr,
0251                                 sdslen(pat->pattern->ptr),
0252                                 (char*)channel->ptr,
0253                                 sdslen(channel->ptr),0)) {
0254                 addReply(pat->client,shared.mbulkhdr[4]);
0255                 addReply(pat->client,shared.pmessagebulk);
0256                 addReplyBulk(pat->client,pat->pattern);
0257                 addReplyBulk(pat->client,channel);
0258                 addReplyBulk(pat->client,message);
0259                 receivers++;
0260             }
0261         }
0262         decrRefCount(channel);
0263     }
0264     return receivers;
0265 }
0266 
0267 /*-----------------------------------------------------------------------------
0268  * Pubsub commands implementation
0269  *----------------------------------------------------------------------------*/
0270 
0271 void subscribeCommand(redisClient *c) {
0272     int j;
0273 
0274     for (j = 1; j < c->argc; j++)
0275         pubsubSubscribeChannel(c,c->argv[j]);
0276 }
0277 
0278 void unsubscribeCommand(redisClient *c) {
0279     if (c->argc == 1) {
0280         pubsubUnsubscribeAllChannels(c,1);
0281     } else {
0282         int j;
0283 
0284         for (j = 1; j < c->argc; j++)
0285             pubsubUnsubscribeChannel(c,c->argv[j],1);
0286     }
0287 }
0288 
0289 void psubscribeCommand(redisClient *c) {
0290     int j;
0291 
0292     for (j = 1; j < c->argc; j++)
0293         pubsubSubscribePattern(c,c->argv[j]);
0294 }
0295 
0296 void punsubscribeCommand(redisClient *c) {
0297     if (c->argc == 1) {
0298         pubsubUnsubscribeAllPatterns(c,1);
0299     } else {
0300         int j;
0301 
0302         for (j = 1; j < c->argc; j++)
0303             pubsubUnsubscribePattern(c,c->argv[j],1);
0304     }
0305 }
0306 
0307 void publishCommand(redisClient *c) {
0308     int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
0309     addReplyLongLong(c,receivers);
0310 }