关于Redis网络模型的源码详析_Redis

来源:脚本之家  责任编辑:小易  

redis.c 太短了?还得长一点www.zgxue.com防采集请勿采集本网。

前言

Redis的网络模型是基于I/O多路复用程序来实现的。源码中包含四种多路复用函数库epoll、select、evport、kqueue。在程序编译时会根据系统自动选择这四种库其中之一。下面以epoll为例,来分析Redis的I/O模块的源码。

Redis对于Linux是官方支持的,安装和使用没有什么好说的,普通使用按照官方指导,5分钟以内就能搞定。详情请参考: http://redis.io/download 但有时候又想在windows下折腾下Redis,可以从redis下载页面看到

epoll系统调用方法

如何高效深入的阅读Redis的源码 先看dict.c dict.h,这两个是redis最基本的存储结构。Redis中的增删改查最后都会调用这里的函数。然后再看看string类型的增删改查和各种操作的源码。这样基本就能

Redis网络事件处理模块的代码都是围绕epoll那三个系统方法来写的。先把这三个方法弄清楚,后面就不难了。

Spark源码是有Scala语言写成的,目前,IDEA对Scala的支持要比eclipse要好,大多数人会选在在IDEA上完成Spark平台应用的开发。因此,Spark源码阅读的IDE理所当然的选择了IDEA。

epfd = epoll_create(1024);

1.先进入redis目录,将redis安装到指定目录 使用知make PREFIX=usr/local/redis install 命令 2.进入redis目录复制redis.conf文件到道usr/local/redis文件下 cp redis.conf/usr/local/redis 3.

创建epoll实例

进入先前解压后得到的文件夹(我的在/usr/redis),复制配置文件redis.conf到/etc/redis/下,并用vi命令编辑该文件,将“daemonize no”修改为“daemonize yes”,即设置成作为后台进程运行,

参数:表示该 epoll 实例最多可监听的 socket fd(文件描述符)数量。

返回: epoll 专用的文件描述符。

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

管理epoll中的事件,对事件进行注册、修改和删除。

参数:

epfd:epoll实例的文件描述符;

op:取值三种:EPOLL_CTL_ADD 注册、EPOLL_CTL_MOD 修 改、EPOLL_CTL_DEL 删除;

fd:socket的文件描述符;

epoll_event *event:事件

event代表一个事件,类似于Java NIO中的channel“通道”。epoll_event 的结构如下:

typedef union epoll_data {void *ptr;int fd; /* socket文件描述符 */__uint32_t u32;__uint64_t u64;} epoll_data_t;struct epoll_event {__uint32_t events; /* Epoll events 就是各种待监听操作的操作码求与的结果,例如EPOLLIN(fd可读)、EPOLLOUT(fd可写) */epoll_data_t data; /* User data variable */};

int epoll_wait(int epfd, struct epoll_event * events, intmaxevents, int timeout);

等待事件是否就绪,类似于Java NIO中 select 方法。如果事件就绪,将就绪的event存入events数组中。

参数

epfd:epoll实例的文件描述符;

events:已就绪的事件数组;

intmaxevents:每次能处理的事件数;

timeout:阻塞时间,等待产生就绪事件的超时值。

源码分析

事件

Redis事件系统中将事件分为两种类型: 文件事件;网络套接字对应的事件; 时间事件:Redis中一些定时操作事件,例如 serverCron 函数。

下面从事件的注册、触发两个流程对源码进行分析

绑定事件

建立 eventLoop

在 initServer方法(由 redis.c 的 main 函数调用) 中,在建立 RedisDb 对象的同时,会初始化一个“eventLoop”对象,我称之为事件处理器对象。结构体的关键成员变量如下所示:

struct aeEventLoop{aeFileEvent *events;//已注册的文件事件数组aeFiredEvent *fired;//已就绪的文件事件数组aeTimeEvent *timeEventHead;//时间事件数组...}

初始化 eventLoop 在 ae.c 的“aeCreateEventLoop”方法中执行。该方法中除了初始化 eventLoop 还调用如下方法初始化了一个 epoll 实例。

/* * ae_epoll.c * 创建一个新的 epoll 实例,并将它赋值给 eventLoop */static int aeApiCreate(aeEventLoop *eventLoop) { aeApiState *state = zmalloc(sizeof(aeApiState)); if (!state) return -1; // 初始化事件槽空间 state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize); if (!state->events) { zfree(state); return -1; } // 创建 epoll 实例 state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */ if (state->epfd == -1) { zfree(state->events); zfree(state); return -1; } // 赋值给 eventLoop eventLoop->apidata = state; return 0;}

也正是在此处调用了系统方法“epoll_create”。这里的state是一个aeApiState结构,如下所示:

/* * 事件状态 */typedef struct aeApiState { // epoll 实例描述符 int epfd; // 事件槽 struct epoll_event *events;} aeApiState;

这个 state 由 eventLoop->apidata 来记录。

绑定ip端口与句柄

通过 listenToPort 方法开启TCP端口,每个IP端口会对应一个文件描述符 ipfd(因为服务器可能会有多个ip地址)

// 打开 TCP 监听端口,用于等待客户端的命令请求if (server.port != 0 && listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR) exit(1);

注意:*eventLoop 和 ipfd 分别被 server.el 和 server.ipfd[] 引用。server 是结构体 RedisServer 的实例,是Redis的全局变量。

注册事件

如下所示代码,为每一个文件描述符绑定一个事件函数

// initServer方法:for (j = 0; j < server.ipfd_count; j++) { if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) { redisPanic( "Unrecoverable error creating server.ipfd file event."); }}// ae.c 中的 aeCreateFileEvent 方法/* * 根据 mask 参数的值,监听 fd 文件的状态, * 当 fd 可用时,执行 proc 函数 */int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData){ if (fd >= eventLoop->setsize) { errno = ERANGE; return AE_ERR; } if (fd >= eventLoop->setsize) return AE_ERR; // 取出文件事件结构 aeFileEvent *fe = &eventLoop->events[fd]; // 监听指定 fd 的指定事件 if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR; // 设置文件事件类型,以及事件的处理器 fe->mask |= mask; if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; // 私有数据 fe->clientData = clientData; // 如果有需要,更新事件处理器的最大 fd if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; return AE_OK;}

aeCreateFileEvent 函数中有一个方法调用:aeApiAddEvent,代码如下

/* * ae_epoll.c * 关联给定事件到 fd */static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { aeApiState *state = eventLoop->apidata; struct epoll_event ee; /* If the fd was already monitored for some event, we need a MOD * operation. Otherwise we need an ADD operation. * * 如果 fd 没有关联任何事件,那么这是一个 ADD 操作。 * * 如果已经关联了某个/某些事件,那么这是一个 MOD 操作。 */ int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; // 注册事件到 epoll ee.events = 0; mask |= eventLoop->events[fd].mask; /* Merge old events */ if (mask & AE_READABLE) ee.events |= EPOLLIN; if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; ee.data.u64 = 0; /* avoid valgrind warning */ ee.data.fd = fd; if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1; return 0;}

这里实际上就是调用系统方法“epoll_ctl”,将事件(文件描述符)注册进 epoll 中。首先要封装一个 epoll_event 结构,即 ee ,通过“epoll_ctl”将其注册进 epoll 中。

除此之外,aeCreateFileEvent 还完成了下面两个重要操作: 将事件函数“acceptTcpHandler”存入了eventLoop中,即由eventLoop->events[fd]->rfileProc 来引用(也可能是wfileProc,分别代表读事件和写事件); 将当操作码添加进 eventLoop->events[fd]->mask 中(mask 类似于JavaNIO中的ops操作码,代表事件类型)。

事件监听与执行

redis.c 的main函数会调用 ae.c 中的 main 方法,如下所示:

/* * 事件处理器的主循环 */void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { // 如果有需要在事件处理前执行的函数,那么运行它 if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); // 开始处理事件 aeProcessEvents(eventLoop, AE_ALL_EVENTS); }}

上述代码会调用 aeProcessEvents 方法用于处理事件,方法如下所示

/* Process every pending time event, then every pending file event * (that may be registered by time event callbacks just processed). * * 处理所有已到达的时间事件,以及所有已就绪的文件事件。 * 函数的返回值为已处理事件的数量 */ int aeProcessEvents(aeEventLoop *eventLoop, int flags){ int processed = 0, numevents; /* Nothing to do? return ASAP */ if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { int j; aeTimeEvent *shortest = NULL; struct timeval tv, *tvp; // 获取最近的时间事件 if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) shortest = aeSearchNearestTimer(eventLoop); if (shortest) { // 如果时间事件存在的话 // 那么根据最近可执行时间事件和现在时间的时间差来决定文件事件的阻塞时间 long now_sec, now_ms; /* Calculate the time missing for the nearest * timer to fire. */ // 计算距今最近的时间事件还要多久才能达到 // 并将该时间距保存在 tv 结构中 aeGetTime(&now_sec, &now_ms); tvp = &tv; tvp->tv_sec = shortest->when_sec - now_sec; if (shortest->when_ms < now_ms) { tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000; tvp->tv_sec --; } else { tvp->tv_usec = (shortest->when_ms - now_ms)*1000; } // 时间差小于 0 ,说明事件已经可以执行了,将秒和毫秒设为 0 (不阻塞) if (tvp->tv_sec < 0) tvp->tv_sec = 0; if (tvp->tv_usec < 0) tvp->tv_usec = 0; } else { // 执行到这一步,说明没有时间事件 // 那么根据 AE_DONT_WAIT 是否设置来决定是否阻塞,以及阻塞的时间长度 /* If we have to check for events but need to return * ASAP because of AE_DONT_WAIT we need to set the timeout * to zero */ if (flags & AE_DONT_WAIT) { // 设置文件事件不阻塞 tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } else { /* Otherwise we can block */ // 文件事件可以阻塞直到有事件到达为止 tvp = NULL; /* wait forever */ } } // 处理文件事件,阻塞时间由 tvp 决定 numevents = aeApiPoll(eventLoop, tvp); for (j = 0; j < numevents; j++) { // 从已就绪数组中获取事件 aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int rfired = 0; /* note the fe->mask & mask & ... code: maybe an already processed * event removed an element that fired and we still didn't * processed, so we check if the event is still valid. */ // 读事件 if (fe->mask & mask & AE_READABLE) { // rfired 确保读/写事件只能执行其中一个 rfired = 1; fe->rfileProc(eventLoop,fd,fe->clientData,mask); } // 写事件 if (fe->mask & mask & AE_WRITABLE) { if (!rfired || fe->wfileProc != fe->rfileProc) fe->wfileProc(eventLoop,fd,fe->clientData,mask); } processed++; } } /* Check time events */ // 执行时间事件 if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); return processed; }

该函数中代码大致分为三个主要步骤 根据时间事件与当前时间的关系,决定阻塞时间 tvp; 调用aeApiPoll方法,将就绪事件都写入eventLoop->fired[]中,返回就绪事件数目; 遍历eventLoop->fired[],遍历每一个就绪事件,执行之前绑定好的方法rfileProc 或者wfileProc。

ae_epoll.c 中的 aeApiPoll 方法如下所示:

/* * 获取可执行事件 */static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; int retval, numevents = 0; // 等待时间 retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); // 有至少一个事件就绪? if (retval > 0) { int j; // 为已就绪事件设置相应的模式 // 并加入到 eventLoop 的 fired 数组中 numevents = retval; for (j = 0; j < numevents; j++) { int mask = 0; struct epoll_event *e = state->events+j; if (e->events & EPOLLIN) mask |= AE_READABLE; if (e->events & EPOLLOUT) mask |= AE_WRITABLE; if (e->events & EPOLLERR) mask |= AE_WRITABLE; if (e->events & EPOLLHUP) mask |= AE_WRITABLE; eventLoop->fired[j].fd = e->data.fd; eventLoop->fired[j].mask = mask; } } // 返回已就绪事件个数 return numevents;}

执行epoll_wait后,就绪的事件会被写入 eventLoop->apidata->events 事件槽。后面的循环就是将事件槽中的事件写入到 eventLoop->fired[] 中。具体描述:每一个事件都是一个 epoll_event 结构,用e来指代,则e.data.fd代表文件描述符,e->events表示其操作码,将操作码转化为mask,最后将fd 和 mask 都写入eventLoop->fired[j]中。

之后,在外层的 aeProcessEvents 方法中会执行函数指针 rfileProc 或者 wfileProc 指向的方法,例如前文提到已注册的“acceptTcpHandler”。

总结

Redis的网络模块其实是一个简易的Reactor模式。本文顺着“服务端注册事件——>接受客户端连接——>监听事件是否就绪——>执行事件”这样的路线,来分析Redis源码,描述了Redis接受客户端connect的过程。实际上NIO的思想都基本类似。

到此这篇关于Redis网络模型的源码详析的文章就介绍到这了,更多相关Redis网络模型源码内容请搜索真格学网以前的文章或继续浏览下面的相关文章希望大家以后多多支持真格学网!

第 1 步:阅读数据结构实现第 2 步:阅读内存编码数据结构实现上一步要读的数据结构,比如双端链表、字典、HyperLogLog,在算法书上或者相关的论文上都可以找到资料介绍。而内存编码数据结构却不容易找到相关的资料,因为这些数据结构都是 Redis 为了节约内存而专门开发出来的,换句话说,这些数据结构都是特制(adhoc)的,除了 Redis 源码中的文档之外,基本上找不到其他资料来了解这些特制的数据结构内容来自www.zgxue.com请勿采集。


  • 本文相关:
  • redis源码解析:集群手动故障转移、从节点迁移详解
  • spring aop实现redis缓存数据库查询源码
  • redis源码分析教程之压缩链表ziplist详解
  • redisson分布式锁源码解析
  • 从源码解读redis持久化
  • scrapy-redis源码分析之发送post请求详解
  • 在redis数据库中实现分布式速率限制的方法
  • redis教程(六):sorted-sets数据类型
  • centos 6.5 64位下安装redis3.0.2的具体步骤
  • redis stat的安装指南
  • redis配置认证密码的方法
  • 安装redis(windows和ubuntu)详解
  • redis如何存储对象与集合示例详解
  • redis服务器的启动过程分析
  • redis主从实现读写分离
  • 详解redis用链表实现消息队列
  • 如何阅读redis源码
  • redis源码的入口函数存在于哪个文件中
  • redis源码好在哪里之命令处理
  • 谁荐个基于redis秒杀系统的源码,推荐的都有分
  • windows怎么调试redis源码
  • 如何高效深入的阅读Redis的源码
  • spark源码和redis源码哪个提升大
  • linux下怎么进入redis操作
  • ubuntu下怎样源码安装redis
  • 如何高效深入的阅读Redis的源码
  • 网站首页网页制作脚本下载服务器操作系统网站运营平面设计媒体动画电脑基础硬件教程网络安全mssqlmysqlmariadboracledb2mssql2008mssql2005sqlitepostgresqlmongodbredisaccess数据库文摘数据库其它首页redis源码解析:集群手动故障转移、从节点迁移详解spring aop实现redis缓存数据库查询源码redis源码分析教程之压缩链表ziplist详解redisson分布式锁源码解析从源码解读redis持久化scrapy-redis源码分析之发送post请求详解在redis数据库中实现分布式速率限制的方法redis教程(六):sorted-sets数据类型centos 6.5 64位下安装redis3.0.2的具体步骤redis stat的安装指南redis配置认证密码的方法安装redis(windows和ubuntu)详解redis如何存储对象与集合示例详解redis服务器的启动过程分析redis主从实现读写分离详解redis用链表实现消息队列超强、超详细redis数据库入门教程redis常用命令、常见错误、配置技redis操作命令总结redis中5种数据结构的使用场景介64位windows下安装redis教程redis中使用redis-dump导出、导入redis中统计各种数据大小的方法redis常用命令小结让redis在你的系统中发挥更大作用centos 6.6下redis安装配置记录详解centos7下配置redis并开机自启动redis+mysql+quartz 一种红包发送功能的实redis全量复制与部分复制示例详解centos linux系统下安装redis过程和配置参redis通过pipeline提升吞吐量的方法通过 redis 实现 rpc 远程方法调用(支持多redis两种持久化方案rdb和aof详解如何操作redis和zookeeper实现分布式锁redis操作命令总结win10配置redis服务实现过程详解
    免责声明 - 关于我们 - 联系我们 - 广告联系 - 友情链接 - 帮助中心 - 频道导航
    Copyright © 2017 www.zgxue.com All Rights Reserved