Back to home page

Redis cross reference

 
 

    


0001 /* ae.c module for illumos event ports.
0002  *
0003  * Copyright (c) 2012, Joyent, Inc. All rights reserved.
0004  *
0005  * Redistribution and use in source and binary forms, with or without
0006  * modification, are permitted provided that the following conditions are met:
0007  *
0008  *   * Redistributions of source code must retain the above copyright notice,
0009  *     this list of conditions and the following disclaimer.
0010  *   * Redistributions in binary form must reproduce the above copyright
0011  *     notice, this list of conditions and the following disclaimer in the
0012  *     documentation and/or other materials provided with the distribution.
0013  *   * Neither the name of Redis nor the names of its contributors may be used
0014  *     to endorse or promote products derived from this software without
0015  *     specific prior written permission.
0016  *
0017  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
0018  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
0019  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
0020  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
0021  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
0022  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
0023  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
0024  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
0025  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
0026  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
0027  * POSSIBILITY OF SUCH DAMAGE.
0028  */
0029 
0030 
0031 #include <assert.h>
0032 #include <errno.h>
0033 #include <port.h>
0034 #include <poll.h>
0035 
0036 #include <sys/types.h>
0037 #include <sys/time.h>
0038 
0039 #include <stdio.h>
0040 
0041 static int evport_debug = 0;   /* when nonzero, the functions in this file print trace messages to stderr */
0042 
0043 /*
0044  * This file implements the ae API using event ports, present on Solaris-based
0045  * systems since Solaris 10.  Using the event port interface, we associate file
0046  * descriptors with the port.  Each association also includes the set of poll(2)
0047  * events that the consumer is interested in (e.g., POLLIN and POLLOUT).
0048  *
0049  * There's one tricky piece to this implementation: when we return events via
0050  * aeApiPoll, the corresponding file descriptors become dissociated from the
0051  * port.  This is necessary because poll events are level-triggered, so if the
0052  * fd didn't become dissociated, it would immediately fire another event since
0053  * the underlying state hasn't changed yet.  We must re-associate the file
0054  * descriptor, but only after we know that our caller has actually read from it.
0055  * The ae API does not tell us exactly when that happens, but we do know that
0056  * it must happen by the time aeApiPoll is called again.  Our solution is to
0057  * keep track of the last fds returned by aeApiPoll and re-associate them next
0058  * time aeApiPoll is invoked.
0059  *
0060  * To summarize, in this module, each fd association is EITHER (a) represented
0061  * only via the in-kernel association OR (b) represented by pending_fds and
0062  * pending_masks.  (b) is only true for the last fds we returned from aeApiPoll,
0063  * and only until we enter aeApiPoll again (at which point we restore the
0064  * in-kernel association).
0065  */
/*
 * Capacity of the pending arrays below; aeApiPoll also passes it to
 * port_getn() as the maximum number of events to fetch per call.
 */
#define MAX_EVENT_BATCHSZ 512

/*
 * Per-event-loop state for the event-port backend.
 *
 * The first `npending` slots of pending_fds / pending_masks describe the fds
 * handed out by the most recent aeApiPoll call.  A slot whose fd is -1 is
 * empty (either never used or deleted since it was returned).
 */
typedef struct aeApiState {
    /* Descriptor of the event port itself. */
    int portfd;
    /* Number of slots in use in the two arrays below. */
    int npending;
    /* File descriptors returned by the last poll, or -1 for an empty slot. */
    int pending_fds[MAX_EVENT_BATCHSZ];
    /* AE_* mask to restore for the fd stored at the same index. */
    int pending_masks[MAX_EVENT_BATCHSZ];
} aeApiState;
0074 
0075 static int aeApiCreate(aeEventLoop *eventLoop) {
0076     int i;
0077     aeApiState *state = zmalloc(sizeof(aeApiState));
0078     if (!state) return -1;
0079 
0080     state->portfd = port_create();
0081     if (state->portfd == -1) {
0082         zfree(state);
0083         return -1;
0084     }
0085 
0086     state->npending = 0;
0087 
0088     for (i = 0; i < MAX_EVENT_BATCHSZ; i++) {
0089         state->pending_fds[i] = -1;
0090         state->pending_masks[i] = AE_NONE;
0091     }
0092 
0093     eventLoop->apidata = state;
0094     return 0;
0095 }
0096 
0097 static void aeApiFree(aeEventLoop *eventLoop) {
0098     aeApiState *state = eventLoop->apidata;
0099 
0100     close(state->portfd);
0101     zfree(state);
0102 }
0103 
0104 static int aeApiLookupPending(aeApiState *state, int fd) {
0105     int i;
0106 
0107     for (i = 0; i < state->npending; i++) {
0108         if (state->pending_fds[i] == fd)
0109             return (i);
0110     }
0111 
0112     return (-1);
0113 }
0114 
0115 /*
0116  * Helper function to invoke port_associate for the given fd and mask.
0117  */
0118 static int aeApiAssociate(const char *where, int portfd, int fd, int mask) {
0119     int events = 0;
0120     int rv, err;
0121 
0122     if (mask & AE_READABLE)
0123         events |= POLLIN;
0124     if (mask & AE_WRITABLE)
0125         events |= POLLOUT;
0126 
0127     if (evport_debug)
0128         fprintf(stderr, "%s: port_associate(%d, 0x%x) = ", where, fd, events);
0129 
0130     rv = port_associate(portfd, PORT_SOURCE_FD, fd, events,
0131         (void *)(uintptr_t)mask);
0132     err = errno;
0133 
0134     if (evport_debug)
0135         fprintf(stderr, "%d (%s)\n", rv, rv == 0 ? "no error" : strerror(err));
0136 
0137     if (rv == -1) {
0138         fprintf(stderr, "%s: port_associate: %s\n", where, strerror(err));
0139 
0140         if (err == EAGAIN)
0141             fprintf(stderr, "aeApiAssociate: event port limit exceeded.");
0142     }
0143 
0144     return rv;
0145 }
0146 
0147 static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
0148     aeApiState *state = eventLoop->apidata;
0149     int fullmask, pfd;
0150 
0151     if (evport_debug)
0152         fprintf(stderr, "aeApiAddEvent: fd %d mask 0x%x\n", fd, mask);
0153 
0154     /*
0155      * Since port_associate's "events" argument replaces any existing events, we
0156      * must be sure to include whatever events are already associated when
0157      * we call port_associate() again.
0158      */
0159     fullmask = mask | eventLoop->events[fd].mask;
0160     pfd = aeApiLookupPending(state, fd);
0161 
0162     if (pfd != -1) {
0163         /*
0164          * This fd was recently returned from aeApiPoll.  It should be safe to
0165          * assume that the consumer has processed that poll event, but we play
0166          * it safer by simply updating pending_mask.  The fd will be
0167          * re-associated as usual when aeApiPoll is called again.
0168          */
0169         if (evport_debug)
0170             fprintf(stderr, "aeApiAddEvent: adding to pending fd %d\n", fd);
0171         state->pending_masks[pfd] |= fullmask;
0172         return 0;
0173     }
0174 
0175     return (aeApiAssociate("aeApiAddEvent", state->portfd, fd, fullmask));
0176 }
0177 
0178 static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) {
0179     aeApiState *state = eventLoop->apidata;
0180     int fullmask, pfd;
0181 
0182     if (evport_debug)
0183         fprintf(stderr, "del fd %d mask 0x%x\n", fd, mask);
0184 
0185     pfd = aeApiLookupPending(state, fd);
0186 
0187     if (pfd != -1) {
0188         if (evport_debug)
0189             fprintf(stderr, "deleting event from pending fd %d\n", fd);
0190 
0191         /*
0192          * This fd was just returned from aeApiPoll, so it's not currently
0193          * associated with the port.  All we need to do is update
0194          * pending_mask appropriately.
0195          */
0196         state->pending_masks[pfd] &= ~mask;
0197 
0198         if (state->pending_masks[pfd] == AE_NONE)
0199             state->pending_fds[pfd] = -1;
0200 
0201         return;
0202     }
0203 
0204     /*
0205      * The fd is currently associated with the port.  Like with the add case
0206      * above, we must look at the full mask for the file descriptor before
0207      * updating that association.  We don't have a good way of knowing what the
0208      * events are without looking into the eventLoop state directly.  We rely on
0209      * the fact that our caller has already updated the mask in the eventLoop.
0210      */
0211 
0212     fullmask = eventLoop->events[fd].mask;
0213     if (fullmask == AE_NONE) {
0214         /*
0215          * We're removing *all* events, so use port_dissociate to remove the
0216          * association completely.  Failure here indicates a bug.
0217          */
0218         if (evport_debug)
0219             fprintf(stderr, "aeApiDelEvent: port_dissociate(%d)\n", fd);
0220 
0221         if (port_dissociate(state->portfd, PORT_SOURCE_FD, fd) != 0) {
0222             perror("aeApiDelEvent: port_dissociate");
0223             abort(); /* will not return */
0224         }
0225     } else if (aeApiAssociate("aeApiDelEvent", state->portfd, fd,
0226         fullmask) != 0) {
0227         /*
0228          * ENOMEM is a potentially transient condition, but the kernel won't
0229          * generally return it unless things are really bad.  EAGAIN indicates
0230          * we've reached an resource limit, for which it doesn't make sense to
0231          * retry (counter-intuitively).  All other errors indicate a bug.  In any
0232          * of these cases, the best we can do is to abort.
0233          */
0234         abort(); /* will not return */
0235     }
0236 }
0237 
0238 static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
0239     aeApiState *state = eventLoop->apidata;
0240     struct timespec timeout, *tsp;
0241     int mask, i;
0242     uint_t nevents;
0243     port_event_t event[MAX_EVENT_BATCHSZ];
0244 
0245     /*
0246      * If we've returned fd events before, we must re-associate them with the
0247      * port now, before calling port_get().  See the block comment at the top of
0248      * this file for an explanation of why.
0249      */
0250     for (i = 0; i < state->npending; i++) {
0251         if (state->pending_fds[i] == -1)
0252             /* This fd has since been deleted. */
0253             continue;
0254 
0255         if (aeApiAssociate("aeApiPoll", state->portfd,
0256             state->pending_fds[i], state->pending_masks[i]) != 0) {
0257             /* See aeApiDelEvent for why this case is fatal. */
0258             abort();
0259         }
0260 
0261         state->pending_masks[i] = AE_NONE;
0262         state->pending_fds[i] = -1;
0263     }
0264 
0265     state->npending = 0;
0266 
0267     if (tvp != NULL) {
0268         timeout.tv_sec = tvp->tv_sec;
0269         timeout.tv_nsec = tvp->tv_usec * 1000;
0270         tsp = &timeout;
0271     } else {
0272         tsp = NULL;
0273     }
0274 
0275     /*
0276      * port_getn can return with errno == ETIME having returned some events (!).
0277      * So if we get ETIME, we check nevents, too.
0278      */
0279     nevents = 1;
0280     if (port_getn(state->portfd, event, MAX_EVENT_BATCHSZ, &nevents,
0281         tsp) == -1 && (errno != ETIME || nevents == 0)) {
0282         if (errno == ETIME || errno == EINTR)
0283             return 0;
0284 
0285         /* Any other error indicates a bug. */
0286         perror("aeApiPoll: port_get");
0287         abort();
0288     }
0289 
0290     state->npending = nevents;
0291 
0292     for (i = 0; i < nevents; i++) {
0293             mask = 0;
0294             if (event[i].portev_events & POLLIN)
0295                 mask |= AE_READABLE;
0296             if (event[i].portev_events & POLLOUT)
0297                 mask |= AE_WRITABLE;
0298 
0299             eventLoop->fired[i].fd = event[i].portev_object;
0300             eventLoop->fired[i].mask = mask;
0301 
0302             if (evport_debug)
0303                 fprintf(stderr, "aeApiPoll: fd %d mask 0x%x\n",
0304                     (int)event[i].portev_object, mask);
0305 
0306             state->pending_fds[i] = event[i].portev_object;
0307             state->pending_masks[i] = (uintptr_t)event[i].portev_user;
0308     }
0309 
0310     return nevents;
0311 }
0312 
/*
 * Name of this multiplexing backend.  The returned pointer refers to a
 * string literal and must not be modified or freed.
 */
static char *aeApiName(void) {
    char *backend = "evport";

    return backend;
}