Back to home page

Redis cross reference

 
 

    


0001 /* Background I/O service for Redis.
0002  *
0003  * This file implements operations that we need to perform in the background.
0004  * Currently there are two operations: a background close(2) system call and
0005  * a background fsync of the AOF file. The former is needed as when the
0006  * process is the last owner of a reference to a file closing it means
0007  * unlinking it, and the deletion of the file is slow, blocking the server.
0008  *
0009  * In the future we'll either continue implementing new things we need or
0010  * we'll switch to libeio. However there are probably long term uses for this
0011  * file as we may want to put here Redis specific background tasks (for instance
0012  * it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL
0013  * implementation).
0014  *
0015  * DESIGN
0016  * ------
0017  *
0018  * The design is trivial, we have a structure representing a job to perform
0019  * and a different thread and job queue for every job type.
0020  * Every thread waits for new jobs in its queue, and processes every job
0021  * sequentially.
0022  *
0023  * Jobs of the same type are guaranteed to be processed from the least
0024  * recently inserted to the most recently inserted (older jobs processed
0025  * first).
0026  *
0027  * Currently there is no way for the creator of the job to be notified about
0028  * the completion of the operation, this will only be added when/if needed.
0029  *
0030  * ----------------------------------------------------------------------------
0031  *
0032  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
0033  * All rights reserved.
0034  *
0035  * Redistribution and use in source and binary forms, with or without
0036  * modification, are permitted provided that the following conditions are met:
0037  *
0038  *   * Redistributions of source code must retain the above copyright notice,
0039  *     this list of conditions and the following disclaimer.
0040  *   * Redistributions in binary form must reproduce the above copyright
0041  *     notice, this list of conditions and the following disclaimer in the
0042  *     documentation and/or other materials provided with the distribution.
0043  *   * Neither the name of Redis nor the names of its contributors may be used
0044  *     to endorse or promote products derived from this software without
0045  *     specific prior written permission.
0046  *
0047  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
0048  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
0049  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
0050  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
0051  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
0052  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
0053  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
0054  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
0055  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
0056  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
0057  * POSSIBILITY OF SUCH DAMAGE.
0058  */
0059 
0060 
0061 #include "redis.h"
0062 #include "bio.h"
0063 
0064 static pthread_mutex_t bio_mutex[REDIS_BIO_NUM_OPS];
0065 static pthread_cond_t bio_condvar[REDIS_BIO_NUM_OPS];
0066 static list *bio_jobs[REDIS_BIO_NUM_OPS];
0067 /* The following array is used to hold the number of pending jobs for every
0068  * OP type. This allows us to export the bioPendingJobsOfType() API that is
0069  * useful when the main thread wants to perform some operation that may involve
0070  * objects shared with the background thread. The main thread will just wait
0071  * that there are no longer jobs of this type to be executed before performing
0072  * the sensible operation. This data is also useful for reporting. */
0073 static unsigned long long bio_pending[REDIS_BIO_NUM_OPS];
0074 
/* Descriptor of a single background job. The structure is private to this
 * file: the public API never exposes it to the callers. */
struct bio_job {
    /* Time at which the job was created. */
    time_t time;
    /* Job specific arguments, passed as opaque pointers. A job needing more
     * than three arguments can pass a pointer to a structure or alike. */
    void *arg1;
    void *arg2;
    void *arg3;
};
0083 
/* Entry point of the background threads, defined later in this file. */
void *bioProcessBackgroundJobs(void *arg);

/* Minimum stack size, in bytes, requested for the background threads (4 MB):
 * it must be enough to perform all the things we do in the main thread. */
#define REDIS_THREAD_STACK_SIZE (4*1024*1024)
0089 
0090 /* Initialize the background system, spawning the thread. */
0091 void bioInit(void) {
0092     pthread_attr_t attr;
0093     pthread_t thread;
0094     size_t stacksize;
0095     int j;
0096 
0097     /* Initialization of state vars and objects */
0098     for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
0099         pthread_mutex_init(&bio_mutex[j],NULL);
0100         pthread_cond_init(&bio_condvar[j],NULL);
0101         bio_jobs[j] = listCreate();
0102         bio_pending[j] = 0;
0103     }
0104 
0105     /* Set the stack size as by default it may be small in some system */
0106     pthread_attr_init(&attr);
0107     pthread_attr_getstacksize(&attr,&stacksize);
0108     if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
0109     while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
0110     pthread_attr_setstacksize(&attr, stacksize);
0111 
0112     /* Ready to spawn our threads. We use the single argument the thread
0113      * function accepts in order to pass the job ID the thread is
0114      * responsible of. */
0115     for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
0116         void *arg = (void*)(unsigned long) j;
0117         if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
0118             redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs.");
0119             exit(1);
0120         }
0121     }
0122 }
0123 
0124 void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
0125     struct bio_job *job = zmalloc(sizeof(*job));
0126 
0127     job->time = time(NULL);
0128     job->arg1 = arg1;
0129     job->arg2 = arg2;
0130     job->arg3 = arg3;
0131     pthread_mutex_lock(&bio_mutex[type]);
0132     listAddNodeTail(bio_jobs[type],job);
0133     bio_pending[type]++;
0134     pthread_cond_signal(&bio_condvar[type]);
0135     pthread_mutex_unlock(&bio_mutex[type]);
0136 }
0137 
0138 void *bioProcessBackgroundJobs(void *arg) {
0139     struct bio_job *job;
0140     unsigned long type = (unsigned long) arg;
0141     sigset_t sigset;
0142 
0143     pthread_detach(pthread_self());
0144     pthread_mutex_lock(&bio_mutex[type]);
0145     /* Block SIGALRM so we are sure that only the main thread will
0146      * receive the watchdog signal. */
0147     sigemptyset(&sigset);
0148     sigaddset(&sigset, SIGALRM);
0149     if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
0150         redisLog(REDIS_WARNING,
0151             "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));
0152 
0153     while(1) {
0154         listNode *ln;
0155 
0156         /* The loop always starts with the lock hold. */
0157         if (listLength(bio_jobs[type]) == 0) {
0158             pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]);
0159             continue;
0160         }
0161         /* Pop the job from the queue. */
0162         ln = listFirst(bio_jobs[type]);
0163         job = ln->value;
0164         /* It is now possible to unlock the background system as we know have
0165          * a stand alone job structure to process.*/
0166         pthread_mutex_unlock(&bio_mutex[type]);
0167 
0168         /* Process the job accordingly to its type. */
0169         if (type == REDIS_BIO_CLOSE_FILE) {
0170             close((long)job->arg1);
0171         } else if (type == REDIS_BIO_AOF_FSYNC) {
0172             aof_fsync((long)job->arg1);
0173         } else {
0174             redisPanic("Wrong job type in bioProcessBackgroundJobs().");
0175         }
0176         zfree(job);
0177 
0178         /* Lock again before reiterating the loop, if there are no longer
0179          * jobs to process we'll block again in pthread_cond_wait(). */
0180         pthread_mutex_lock(&bio_mutex[type]);
0181         listDelNode(bio_jobs[type],ln);
0182         bio_pending[type]--;
0183     }
0184 }
0185 
0186 /* Return the number of pending jobs of the specified type. */
0187 unsigned long long bioPendingJobsOfType(int type) {
0188     unsigned long long val;
0189     pthread_mutex_lock(&bio_mutex[type]);
0190     val = bio_pending[type];
0191     pthread_mutex_unlock(&bio_mutex[type]);
0192     return val;
0193 }
0194 
0195 #if 0 /* We don't use the following code for now, and bioWaitPendingJobsLE
0196          probably needs a rewrite using condition variables instead of the
0197          current implementation. */
0198          
0199 
/* Block the caller until the number of pending jobs of the specified type
 * is less than or equal to 'num'.
 *
 * This function may block for a long time, so it should only be used for
 * the following tasks:
 *
 * 1) To prevent the main thread from pushing jobs of a given type so fast
 *    that the background thread can't process them at the same speed.
 *    So before creating a new job of a given type the main thread should
 *    call something like: bioWaitPendingJobsLE(job_type,10000);
 * 2) In order to perform special operations that make it necessary to be
 *    sure no one is touching shared resources in the background.
 */
void bioWaitPendingJobsLE(int type, unsigned long long num) {
    unsigned long long polls = 0;

    /* The counter is polled: the first 1000 checks are performed without
     * sleeping, the following ones up to the 10000th are preceded by a
     * 100 microseconds sleep, and all the later ones by a 1 millisecond
     * sleep. */
    do {
        polls++;
        if (polls > 10000) {
            usleep(1000);
        } else if (polls > 1000) {
            usleep(100);
        }
    } while (bioPendingJobsOfType(type) > num);
}
0228 
0229 /* Return the older job of the specified type. */
0230 time_t bioOlderJobOfType(int type) {
0231     time_t time;
0232     listNode *ln;
0233     struct bio_job *job;
0234 
0235     pthread_mutex_lock(&bio_mutex[type]);
0236     ln = listFirst(bio_jobs[type]);
0237     if (ln == NULL) {
0238         pthread_mutex_unlock(&bio_mutex[type]);
0239         return 0;
0240     }
0241     job = ln->value;
0242     time = job->time;
0243     pthread_mutex_unlock(&bio_mutex[type]);
0244     return time;
0245 }
0246 
0247 #endif