Back to home page

Redis cross reference

 
 

    


0001 /* anet.c -- Basic TCP socket stuff made a bit less boring
0002  *
0003  * Copyright (c) 2006-2012, Salvatore Sanfilippo <antirez at gmail dot com>
0004  * All rights reserved.
0005  *
0006  * Redistribution and use in source and binary forms, with or without
0007  * modification, are permitted provided that the following conditions are met:
0008  *
0009  *   * Redistributions of source code must retain the above copyright notice,
0010  *     this list of conditions and the following disclaimer.
0011  *   * Redistributions in binary form must reproduce the above copyright
0012  *     notice, this list of conditions and the following disclaimer in the
0013  *     documentation and/or other materials provided with the distribution.
0014  *   * Neither the name of Redis nor the names of its contributors may be used
0015  *     to endorse or promote products derived from this software without
0016  *     specific prior written permission.
0017  *
0018  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
0019  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
0020  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
0021  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
0022  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
0023  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
0024  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
0025  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
0026  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
0027  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
0028  * POSSIBILITY OF SUCH DAMAGE.
0029  */
0030 
0031 #include "fmacros.h"
0032 
0033 #include <sys/types.h>
0034 #include <sys/socket.h>
0035 #include <sys/stat.h>
0036 #include <sys/un.h>
0037 #include <netinet/in.h>
0038 #include <netinet/tcp.h>
0039 #include <arpa/inet.h>
0040 #include <unistd.h>
0041 #include <fcntl.h>
0042 #include <string.h>
0043 #include <netdb.h>
0044 #include <errno.h>
0045 #include <stdarg.h>
0046 #include <stdio.h>
0047 
0048 #include "anet.h"
0049 
0050 static void anetSetError(char *err, const char *fmt, ...)
0051 {
0052     va_list ap;
0053 
0054     if (!err) return;
0055     va_start(ap, fmt);
0056     vsnprintf(err, ANET_ERR_LEN, fmt, ap);
0057     va_end(ap);
0058 }
0059 
0060 int anetNonBlock(char *err, int fd)
0061 {
0062     int flags;
0063 
0064     /* Set the socket non-blocking.
0065      * Note that fcntl(2) for F_GETFL and F_SETFL can't be
0066      * interrupted by a signal. */
0067     if ((flags = fcntl(fd, F_GETFL)) == -1) {
0068         anetSetError(err, "fcntl(F_GETFL): %s", strerror(errno));
0069         return ANET_ERR;
0070     }
0071     if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
0072         anetSetError(err, "fcntl(F_SETFL,O_NONBLOCK): %s", strerror(errno));
0073         return ANET_ERR;
0074     }
0075     return ANET_OK;
0076 }
0077 
0078 /* Set TCP keep alive option to detect dead peers. The interval option
0079  * is only used for Linux as we are using Linux-specific APIs to set
0080  * the probe send time, interval, and count. */
0081 int anetKeepAlive(char *err, int fd, int interval)
0082 {
0083     int val = 1;
0084 
0085     if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) == -1)
0086     {
0087         anetSetError(err, "setsockopt SO_KEEPALIVE: %s", strerror(errno));
0088         return ANET_ERR;
0089     }
0090 
0091 #ifdef __linux__
0092     /* Default settings are more or less garbage, with the keepalive time
0093      * set to 7200 by default on Linux. Modify settings to make the feature
0094      * actually useful. */
0095 
0096     /* Send first probe after interval. */
0097     val = interval;
0098     if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &val, sizeof(val)) < 0) {
0099         anetSetError(err, "setsockopt TCP_KEEPIDLE: %s\n", strerror(errno));
0100         return ANET_ERR;
0101     }
0102 
0103     /* Send next probes after the specified interval. Note that we set the
0104      * delay as interval / 3, as we send three probes before detecting
0105      * an error (see the next setsockopt call). */
0106     val = interval/3;
0107     if (val == 0) val = 1;
0108     if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &val, sizeof(val)) < 0) {
0109         anetSetError(err, "setsockopt TCP_KEEPINTVL: %s\n", strerror(errno));
0110         return ANET_ERR;
0111     }
0112 
0113     /* Consider the socket in error state after three we send three ACK
0114      * probes without getting a reply. */
0115     val = 3;
0116     if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &val, sizeof(val)) < 0) {
0117         anetSetError(err, "setsockopt TCP_KEEPCNT: %s\n", strerror(errno));
0118         return ANET_ERR;
0119     }
0120 #endif
0121 
0122     return ANET_OK;
0123 }
0124 
0125 static int anetSetTcpNoDelay(char *err, int fd, int val)
0126 {
0127     if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) == -1)
0128     {
0129         anetSetError(err, "setsockopt TCP_NODELAY: %s", strerror(errno));
0130         return ANET_ERR;
0131     }
0132     return ANET_OK;
0133 }
0134 
0135 int anetEnableTcpNoDelay(char *err, int fd)
0136 {
0137     return anetSetTcpNoDelay(err, fd, 1);
0138 }
0139 
0140 int anetDisableTcpNoDelay(char *err, int fd) 
0141 {
0142     return anetSetTcpNoDelay(err, fd, 0);
0143 }
0144 
0145 
0146 int anetSetSendBuffer(char *err, int fd, int buffsize)
0147 {
0148     if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buffsize, sizeof(buffsize)) == -1)
0149     {
0150         anetSetError(err, "setsockopt SO_SNDBUF: %s", strerror(errno));
0151         return ANET_ERR;
0152     }
0153     return ANET_OK;
0154 }
0155 
0156 int anetTcpKeepAlive(char *err, int fd)
0157 {
0158     int yes = 1;
0159     if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(yes)) == -1) {
0160         anetSetError(err, "setsockopt SO_KEEPALIVE: %s", strerror(errno));
0161         return ANET_ERR;
0162     }
0163     return ANET_OK;
0164 }
0165 
0166 int anetResolve(char *err, char *host, char *ipbuf)
0167 {
0168     struct sockaddr_in sa;
0169 
0170     sa.sin_family = AF_INET;
0171     if (inet_aton(host, &sa.sin_addr) == 0) {
0172         struct hostent *he;
0173 
0174         he = gethostbyname(host);
0175         if (he == NULL) {
0176             anetSetError(err, "can't resolve: %s", host);
0177             return ANET_ERR;
0178         }
0179         memcpy(&sa.sin_addr, he->h_addr, sizeof(struct in_addr));
0180     }
0181     strcpy(ipbuf,inet_ntoa(sa.sin_addr));
0182     return ANET_OK;
0183 }
0184 
0185 static int anetCreateSocket(char *err, int domain) {
0186     int s, on = 1;
0187     if ((s = socket(domain, SOCK_STREAM, 0)) == -1) {
0188         anetSetError(err, "creating socket: %s", strerror(errno));
0189         return ANET_ERR;
0190     }
0191 
0192     /* Make sure connection-intensive things like the redis benchmark
0193      * will be able to close/open sockets a zillion of times */
0194     if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) {
0195         anetSetError(err, "setsockopt SO_REUSEADDR: %s", strerror(errno));
0196         return ANET_ERR;
0197     }
0198     return s;
0199 }
0200 
0201 #define ANET_CONNECT_NONE 0
0202 #define ANET_CONNECT_NONBLOCK 1
0203 static int anetTcpGenericConnect(char *err, char *addr, int port, int flags)
0204 {
0205     int s;
0206     struct sockaddr_in sa;
0207 
0208     if ((s = anetCreateSocket(err,AF_INET)) == ANET_ERR)
0209         return ANET_ERR;
0210 
0211     sa.sin_family = AF_INET;
0212     sa.sin_port = htons(port);
0213     if (inet_aton(addr, &sa.sin_addr) == 0) {
0214         struct hostent *he;
0215 
0216         he = gethostbyname(addr);
0217         if (he == NULL) {
0218             anetSetError(err, "can't resolve: %s", addr);
0219             close(s);
0220             return ANET_ERR;
0221         }
0222         memcpy(&sa.sin_addr, he->h_addr, sizeof(struct in_addr));
0223     }
0224     if (flags & ANET_CONNECT_NONBLOCK) {
0225         if (anetNonBlock(err,s) != ANET_OK)
0226             return ANET_ERR;
0227     }
0228     if (connect(s, (struct sockaddr*)&sa, sizeof(sa)) == -1) {
0229         if (errno == EINPROGRESS &&
0230             flags & ANET_CONNECT_NONBLOCK)
0231             return s;
0232 
0233         anetSetError(err, "connect: %s", strerror(errno));
0234         close(s);
0235         return ANET_ERR;
0236     }
0237     return s;
0238 }
0239 
0240 int anetTcpConnect(char *err, char *addr, int port)
0241 {
0242     return anetTcpGenericConnect(err,addr,port,ANET_CONNECT_NONE);
0243 }
0244 
0245 int anetTcpNonBlockConnect(char *err, char *addr, int port)
0246 {
0247     return anetTcpGenericConnect(err,addr,port,ANET_CONNECT_NONBLOCK);
0248 }
0249 
0250 int anetUnixGenericConnect(char *err, char *path, int flags)
0251 {
0252     int s;
0253     struct sockaddr_un sa;
0254 
0255     if ((s = anetCreateSocket(err,AF_LOCAL)) == ANET_ERR)
0256         return ANET_ERR;
0257 
0258     sa.sun_family = AF_LOCAL;
0259     strncpy(sa.sun_path,path,sizeof(sa.sun_path)-1);
0260     if (flags & ANET_CONNECT_NONBLOCK) {
0261         if (anetNonBlock(err,s) != ANET_OK)
0262             return ANET_ERR;
0263     }
0264     if (connect(s,(struct sockaddr*)&sa,sizeof(sa)) == -1) {
0265         if (errno == EINPROGRESS &&
0266             flags & ANET_CONNECT_NONBLOCK)
0267             return s;
0268 
0269         anetSetError(err, "connect: %s", strerror(errno));
0270         close(s);
0271         return ANET_ERR;
0272     }
0273     return s;
0274 }
0275 
0276 int anetUnixConnect(char *err, char *path)
0277 {
0278     return anetUnixGenericConnect(err,path,ANET_CONNECT_NONE);
0279 }
0280 
0281 int anetUnixNonBlockConnect(char *err, char *path)
0282 {
0283     return anetUnixGenericConnect(err,path,ANET_CONNECT_NONBLOCK);
0284 }
0285 
0286 /* Like read(2) but make sure 'count' is read before to return
0287  * (unless error or EOF condition is encountered) */
0288 int anetRead(int fd, char *buf, int count)
0289 {
0290     int nread, totlen = 0;
0291     while(totlen != count) {
0292         nread = read(fd,buf,count-totlen);
0293         if (nread == 0) return totlen;
0294         if (nread == -1) return -1;
0295         totlen += nread;
0296         buf += nread;
0297     }
0298     return totlen;
0299 }
0300 
0301 /* Like write(2) but make sure 'count' is read before to return
0302  * (unless error is encountered) */
0303 int anetWrite(int fd, char *buf, int count)
0304 {
0305     int nwritten, totlen = 0;
0306     while(totlen != count) {
0307         nwritten = write(fd,buf,count-totlen);
0308         if (nwritten == 0) return totlen;
0309         if (nwritten == -1) return -1;
0310         totlen += nwritten;
0311         buf += nwritten;
0312     }
0313     return totlen;
0314 }
0315 
0316 static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len) {
0317     if (bind(s,sa,len) == -1) {
0318         anetSetError(err, "bind: %s", strerror(errno));
0319         close(s);
0320         return ANET_ERR;
0321     }
0322 
0323     /* Use a backlog of 512 entries. We pass 511 to the listen() call because
0324      * the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1);
0325      * which will thus give us a backlog of 512 entries */
0326     if (listen(s, 511) == -1) {
0327         anetSetError(err, "listen: %s", strerror(errno));
0328         close(s);
0329         return ANET_ERR;
0330     }
0331     return ANET_OK;
0332 }
0333 
0334 int anetTcpServer(char *err, int port, char *bindaddr)
0335 {
0336     int s;
0337     struct sockaddr_in sa;
0338 
0339     if ((s = anetCreateSocket(err,AF_INET)) == ANET_ERR)
0340         return ANET_ERR;
0341 
0342     memset(&sa,0,sizeof(sa));
0343     sa.sin_family = AF_INET;
0344     sa.sin_port = htons(port);
0345     sa.sin_addr.s_addr = htonl(INADDR_ANY);
0346     if (bindaddr && inet_aton(bindaddr, &sa.sin_addr) == 0) {
0347         anetSetError(err, "invalid bind address");
0348         close(s);
0349         return ANET_ERR;
0350     }
0351     if (anetListen(err,s,(struct sockaddr*)&sa,sizeof(sa)) == ANET_ERR)
0352         return ANET_ERR;
0353     return s;
0354 }
0355 
0356 int anetUnixServer(char *err, char *path, mode_t perm)
0357 {
0358     int s;
0359     struct sockaddr_un sa;
0360 
0361     if ((s = anetCreateSocket(err,AF_LOCAL)) == ANET_ERR)
0362         return ANET_ERR;
0363 
0364     memset(&sa,0,sizeof(sa));
0365     sa.sun_family = AF_LOCAL;
0366     strncpy(sa.sun_path,path,sizeof(sa.sun_path)-1);
0367     if (anetListen(err,s,(struct sockaddr*)&sa,sizeof(sa)) == ANET_ERR)
0368         return ANET_ERR;
0369     if (perm)
0370         chmod(sa.sun_path, perm);
0371     return s;
0372 }
0373 
0374 static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {
0375     int fd;
0376     while(1) {
0377         fd = accept(s,sa,len);
0378         if (fd == -1) {
0379             if (errno == EINTR)
0380                 continue;
0381             else {
0382                 anetSetError(err, "accept: %s", strerror(errno));
0383                 return ANET_ERR;
0384             }
0385         }
0386         break;
0387     }
0388     return fd;
0389 }
0390 
0391 int anetTcpAccept(char *err, int s, char *ip, int *port) {
0392     int fd;
0393     struct sockaddr_in sa;
0394     socklen_t salen = sizeof(sa);
0395     if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == ANET_ERR)
0396         return ANET_ERR;
0397 
0398     if (ip) strcpy(ip,inet_ntoa(sa.sin_addr));
0399     if (port) *port = ntohs(sa.sin_port);
0400     return fd;
0401 }
0402 
0403 int anetUnixAccept(char *err, int s) {
0404     int fd;
0405     struct sockaddr_un sa;
0406     socklen_t salen = sizeof(sa);
0407     if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == ANET_ERR)
0408         return ANET_ERR;
0409 
0410     return fd;
0411 }
0412 
0413 int anetPeerToString(int fd, char *ip, int *port) {
0414     struct sockaddr_in sa;
0415     socklen_t salen = sizeof(sa);
0416 
0417     if (getpeername(fd,(struct sockaddr*)&sa,&salen) == -1) {
0418         *port = 0;
0419         ip[0] = '?';
0420         ip[1] = '\0';
0421         return -1;
0422     }
0423     if (ip) strcpy(ip,inet_ntoa(sa.sin_addr));
0424     if (port) *port = ntohs(sa.sin_port);
0425     return 0;
0426 }
0427 
0428 int anetSockName(int fd, char *ip, int *port) {
0429     struct sockaddr_in sa;
0430     socklen_t salen = sizeof(sa);
0431 
0432     if (getsockname(fd,(struct sockaddr*)&sa,&salen) == -1) {
0433         *port = 0;
0434         ip[0] = '?';
0435         ip[1] = '\0';
0436         return -1;
0437     }
0438     if (ip) strcpy(ip,inet_ntoa(sa.sin_addr));
0439     if (port) *port = ntohs(sa.sin_port);
0440     return 0;
0441 }