1.9. Skynet套接字线程工作原理

涉及的到的相应结构体以及预定义注释为:

//用于标记socket结构体的状态
#define SOCKET_TYPE_INVALID 0                //socket结构体未被使用
#define SOCKET_TYPE_RESERVE 1                //socket结构体已被分配,但是还没有实际进行网络连接
#define SOCKET_TYPE_PLISTEN 2                //已经绑定套接字监听端口号,但是没有添加到epoll监听事件,调用start_socket函数才会,变为SOCKET_TYPE_LISTEN状态
#define SOCKET_TYPE_LISTEN 3                //已经绑定套接字监听端口号, 并且已经添加到epoll监听事件
#define SOCKET_TYPE_CONNECTING 4            //套接字正在连接中, 但是还没有连接上, 此时还不能传送信息
#define SOCKET_TYPE_CONNECTED 5                //套接字连接成功, 可以发送信息
#define SOCKET_TYPE_HALFCLOSE 6                //半关闭状态, 虽然套接字本身没有关闭, 但是已经不能往里边添加信息了, 最终会在清空写缓冲的情况下关闭
#define SOCKET_TYPE_PACCEPT 7                //已经接受了客户端的连接, 但是没有添加到epoll监听事件, 当调用 start_socket 才变成 CONNECTED
#define SOCKET_TYPE_BIND 8                    //绑定外部创建的套接字,监听可读事件

//处理管道信息和epoll监听事件的返回结果,即socket_server_poll返回值
#define SOCKET_DATA 0            //套接字已接收TCP数据
#define SOCKET_CLOSE 1            //套接字已被关闭
#define SOCKET_OPEN 2            //说明套接字已经可以进行正常的通信,例如绑定套接字成功,请求连接成功
#define SOCKET_ACCEPT 3            //客户端请求连接事件处理,表明连接成功
#define SOCKET_ERR 4            //出错返回
#define SOCKET_EXIT 5            //整个套接字服务退出
#define SOCKET_UDP 6            //套接字已接收UDP数据
#define SOCKET_WARNING 7        //写缓存超出阈值

//skynet_socket_message中type套接字消息的类型
#define SKYNET_SOCKET_TYPE_DATA 1        //接收到的TCP数据
#define SKYNET_SOCKET_TYPE_CONNECT 2    //套接字已经可以进行正常的通信,例如绑定套接字成功,请求连接成功
#define SKYNET_SOCKET_TYPE_CLOSE 3        //套接字已被关闭
#define SKYNET_SOCKET_TYPE_ACCEPT 4        //客户端请求连接事件处理,表明连接成功
#define SKYNET_SOCKET_TYPE_ERROR 5        //出错返回
#define SKYNET_SOCKET_TYPE_UDP 6        //接收到UDP数据
#define SKYNET_SOCKET_TYPE_WARNING 7    //写缓存超出阈值

struct skynet_socket_message {    //发送到 skynet 各个服务去的套接字消息
    int type;        //套接字消息的类型,取上面的预定义值
    int id;            //定位存储套接字信息的id
    int ud;            //套接字消息的数据的大小
    char * buffer;    //套接字消息的数据
};

//全局的信息
struct socket_server {
    int recvctrl_fd;                    //读管道fd
    int sendctrl_fd;                    //写管道fd
    int checkctrl;                        //默认值为1,是否需要检查管道中的命令的标记
    poll_fd event_fd;                    //epoll句柄
    int alloc_id;                        //当前分配到的socket ID
    int event_n;                        //epoll触发的事件数量
    int event_index;                    //当前已经处理的epoll事件的数量
    struct socket_object_interface soi;    //初始化发送对象时用
    struct event ev[MAX_EVENT];            //事件的相关数据
    struct socket slot[MAX_SOCKET];        //所有套接字相关的信息
    char buffer[MAX_INFO];                //open_socket发起TCP连接时,用于保存套接字的对端IP地址,如果是客户端套接字保存客户端的ip地址和端口号
    uint8_t udpbuffer[MAX_UDP_PACKAGE];    //接收UDP数据
    fd_set rfds;                        //select的读描述符集合
};

//每个套接字相关的信息
struct socket {                            //套接字相关信息结构体
    uintptr_t opaque;                    //一般用于存储定位服务的handle
    struct wb_list high;                //高优先级的写缓存
    struct wb_list low;                    //低优先级的写缓存
    int64_t wb_size;                    //套接字写入缓存的大小,会随着缓存的大小变化,为高优先级和低优先级的和
    int fd;                                //套接字描述符
    int id;                                //用于在socket_server结构体中定位存储套接字信息
    uint8_t protocol;                    //协议类型
    uint8_t type;                        //socket结构体所处的状态,绑定套接字时,即套接字的状态
    uint16_t udpconnecting;                //大于0标记该套接字正在进行关联ip地址操作,用于UDP协议
    int64_t warn_size;                    //阈值,写缓存超过的阈值,每超过一次阈值就会翻倍
    union {
        int size;                        //在 TCP 协议下使用, 表示一次性最多读取的字节数
        uint8_t udp_address[UDP_ADDRESS_SIZE];    //udp_address[0]存协议类型,udp_address[1],udp_address[]存端口号,剩余部分存ip地址
    } p;
    struct spinlock dw_lock;            //写缓存锁
    int dw_offset;                        //已经发送的数据偏移位置
    const void * dw_buffer;                //已发送一部分的全部数据缓存
    size_t dw_size;                        //已发送一部分的全部数据的大小
};

struct wb_list {                //写缓存队列
    struct write_buffer * head;    //队列头
    struct write_buffer * tail;    //尾队列
};

//socket 写入的缓存数据, 如果是 TCP 协议将不包含 udp_address 字段, 而仅仅是前面部分
struct write_buffer {
    struct write_buffer * next;                //处于 wb_list 中的下一个写缓存
    void *buffer;                            //调用者传递过来的缓存, 从中可以提取出发送数据, 最后需要回收内存
    char *ptr;                                //发送数据的起始指针, 会随着不断写入 socket 而向后移动
    int sz;                                    //发送数据的大小, 会随着不断写入 socket 而减小
    bool userobject;                        //标记初始化发送对象时有没有用socket_server中的soi接口
    uint8_t udp_address[UDP_ADDRESS_SIZE];    //udp_address[0]存协议类型,udp_address[1],udp_address[]存端口号,剩余部分存ip地址
};

在skynet_start函数中会调用skynet_socket.c文件中的skynet_socket_init函数对套接字线程中的全局信息进行初始化,skynet_socket_init函数调用的是socket_server.c文件中的socket_server_create函数,该函数返回初始化的全局套接字信息,该函数的主要工作为:创建一个epoll用于监听所有描述符上的事件,一个读管道和一个写管道,将读管道添加到epoll中进行可读事件的监听,分配一块保存全局信息的内存,将epoll句柄和读写管道都记录到里面,并对其他信息进行初始化,代码注释如下:

//初始化全局的套接字服务信息
struct socket_server * socket_server_create() {
    int i;
    int fd[2];
    poll_fd efd = sp_create();        //创建一个epoll
    if (sp_invalid(efd)) {
        fprintf(stderr, "socket-server: create event pool failed.\n");
        return NULL;
    }
    if (pipe(fd)) {            //产生一个读管道和一个写管道
        sp_release(efd);
        fprintf(stderr, "socket-server: create socket pair failed.\n");
        return NULL;
    }
    if (sp_add(efd, fd[0], NULL)) {        //将读管道添加到epoll中进行可读事件监听
        // add recvctrl_fd to event poll
        fprintf(stderr, "socket-server: can't add server fd to event pool.\n");
        close(fd[0]);
        close(fd[1]);
        sp_release(efd);
        return NULL;
    }

    struct socket_server *ss = MALLOC(sizeof(*ss));
    ss->event_fd = efd;                //epoll句柄
    ss->recvctrl_fd = fd[0];        //读管道fd
    ss->sendctrl_fd = fd[1];        //写管道fd
    ss->checkctrl = 1;

    for (i=0;i<MAX_SOCKET;i++) {    //MAX_SOCKET=2^16    对存储套接字的相关信息结构进行初始化
        struct socket *s = &ss->slot[i];
        s->type = SOCKET_TYPE_INVALID;        //状态初始化为无效状态
        clear_wb_list(&s->high);            //清空高优先级写缓存队列
        clear_wb_list(&s->low);                //清空低优先级写缓存队列
    }
    ss->alloc_id = 0;                        //记录当前分配可以分配的套接字信息的位置
    ss->event_n = 0;                        //epoll中监听到的事件数量
    ss->event_index = 0;                    //当前处理到第几个事件
    memset(&ss->soi, 0, sizeof(ss->soi));
    FD_ZERO(&ss->rfds);            //清空描述符集合
    assert(ss->recvctrl_fd < FD_SETSIZE);    //读管道是否有效

    return ss;
}

套接字线程的主要处理过程为skynet_socket.c文件中的函数skynet_socket_poll,该函数调用socket_server.c文件中的socket_server_poll函数(该函数的主要处理流程如上图所示),skynet_socket_poll函数根据socket_server_poll函数处理的返回值将套接字的信息调用函数forward_message发送给对应的服务,代码注释为:

//处理所有套接字上的事件,返回处理的结果,将处理的结果及结果信息转发给对应的服务
int skynet_socket_poll() {
    struct socket_server *ss = SOCKET_SERVER;
    assert(ss);
    struct socket_message result;
    int more = 1;
    int type = socket_server_poll(ss, &result, &more);    //处理所有套接字上的事件,返回处理结果及信息
    switch (type) {
    case SOCKET_EXIT:
        return 0;            //整个套接字服务退出
    case SOCKET_DATA:        //接收到TCP数据 result中存有定位服务的handle,定位套接字信息的id,接收到的数据大小ud以及数据data
        forward_message(SKYNET_SOCKET_TYPE_DATA, false, &result); 
        break;
    case SOCKET_CLOSE:        //套接字已被关闭
        forward_message(SKYNET_SOCKET_TYPE_CLOSE, false, &result);
        break;
    case SOCKET_OPEN:        //说明套接字已经可以进行正常的通信,例如绑定套接字成功,请求连接成功
        forward_message(SKYNET_SOCKET_TYPE_CONNECT, true, &result);
        break;
    case SOCKET_ERR:        //出错返回
        forward_message(SKYNET_SOCKET_TYPE_ERROR, true, &result);
        break;
    case SOCKET_ACCEPT:        //客户端请求连接事件处理,表明连接成功
        forward_message(SKYNET_SOCKET_TYPE_ACCEPT, true, &result);    //result->data保存客户端的ip地址和端口号
        break;
    case SOCKET_UDP:        //接收到UDP数据 result中存有定位服务的handle,定位套接字信息的id,接收到的数据大小ud以及数据+IP地址data
        forward_message(SKYNET_SOCKET_TYPE_UDP, false, &result);
        break;
    case SOCKET_WARNING:    //写缓存超出阈值
        forward_message(SKYNET_SOCKET_TYPE_WARNING, false, &result);
        break;
    default:
        skynet_error(NULL, "Unknown socket message type %d.",type);
        return -1;
    }
    if (more) {
        return -1;
    }
    return 1;
}

//将套接字上的消息发送给对应的服务,type套接字消息的类型,padding是否需要填充内容
static void forward_message(int type, bool padding, struct socket_message * result) {
    struct skynet_socket_message *sm;
    size_t sz = sizeof(*sm);
    if (padding) {    //是否需要填充,填充的内容大小不能超过128
        if (result->data) {
            size_t msg_sz = strlen(result->data);
            if (msg_sz > 128) {
                msg_sz = 128;
            }
            sz += msg_sz;
        } else {
            result->data = "";
        }
    }
    sm = (struct skynet_socket_message *)skynet_malloc(sz);
    sm->type = type;        //套接字消息的类型
    sm->id = result->id;    //定位存储套接字信息的id
    sm->ud = result->ud;    //一般为套接字消息的大小
    if (padding) {
        sm->buffer = NULL;
        memcpy(sm+1, result->data, sz - sizeof(*sm));    //填充的内容放在skynet_socket_message消息的下一个地址上
    } else {
        sm->buffer = result->data;        //不需要填充的内容
    }

    struct skynet_message message;    //skynet消息
    message.source = 0;
    message.session = 0;
    message.data = sm;        //skynet消息内容包含skynet_socket_message已经如果需要填充的内容
    message.sz = sz | ((size_t)PTYPE_SOCKET << MESSAGE_TYPE_SHIFT);    //消息的类型PTYPE_SOCKET及大小

    if (skynet_context_push((uint32_t)result->opaque, &message)) {    //添加消息到对应的服务队列
        // todo: report somewhere to close socket
        // don't call skynet_socket_close here (It will block mainloop)
        skynet_free(sm->buffer);
        skynet_free(sm);
    }
}

下面将详细介绍socket_server_poll函数的工作流程,该函数首先判断是否需要检查读管道上可读事件,默认为是,通过调用函数has_cmd将读管道描述符添加到select进行可读事件监听,select为不等待立即返回模式,如果检查到有可读事件则调用ctrl_cmd函数读取读管道中的命令及数据,通过命令调用的相应的处理函数进行处理,回到socket_server_poll函数接下来是等待epoll中所有监听的事件触发,如果事件触发,则从事件列表中一个一个取出事件进行处理,对事件的处理主要根据套接字的状态:正在建立连接的套接字(SOCKET_TYPE_CONNECTING)、处于监听状态的套接字(SOCKET_TYPE_LISTEN)、无效套接字(SOCKET_TYPE_INVALID)、其他状态的套接字。对于SOCKET_TYPE_CONNECTING当收到触发事件说明连接已经建立,则调用report_connect函数改变套接字的状态为SOCKET_TYPE_CONNECTED ,监听事件改为可读事件。对于SOCKET_TYPE_LISTEN与客户端建立连接,保存客户端套接字信息,并将其设置为非阻塞模式,但是不添加到epoll进行事件监听,还需要等待'S'命令。对于SOCKET_TYPE_INVALID不做处理,其他状态的套接字根据事件是可读事件还是可写事件,根据套接字的协议类型对套接字进行读写操作,或者是错误事件进行错误处理。

//检查读管道中的命令,有命令则读取命令及携带的数据进行处理,如果各个套接字上没有事件需要处理则
//监听所有套接字注册的事件,等待事件触发,如果有事件需要处理,则一个一个事件进行处理,
//more为1表示上次的事件还没处理完,0表示上次的事件都处理完了
int socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {
    for (;;) {
        if (ss->checkctrl) {    //判断是否需要检查读管道中的命令,默认需要
            if (has_cmd(ss)) {    //检查读管道上是否有命令可读取,有则返回1,否则返回0
                int type = ctrl_cmd(ss, result);    //从读管道上读取相应的命令,并对其数据进行处理,返回相应的处理结果
                if (type != -1) {
                    clear_closed_event(ss, result, type);    //清除掉该套接字相关的监听事件
                    return type;
                } else         //type=-1说明是一个过渡状态
                    continue;
            } else {
                ss->checkctrl = 0;
            }
        }
        if (ss->event_index == ss->event_n) {
            ss->event_n = sp_wait(ss->event_fd, ss->ev, MAX_EVENT);    //等待epoll上监听的事件触发,阻塞,返回触发事件的数量
            ss->checkctrl = 1;
            if (more) {
                *more = 0;            //标记上一次的事件都处理完了
            }
            ss->event_index = 0;
            if (ss->event_n <= 0) {
                ss->event_n = 0;
                if (errno == EINTR) {    //判断是否是中断
                    continue;
                }
                return -1;
            }
        }
        struct event *e = &ss->ev[ss->event_index++];    //从监听到的事件中取出一个事件
        struct socket *s = e->s;    //取出事件附带的socket信息
        if (s == NULL) {            //开始时发送的是管道消息
            // dispatch pipe message at beginning
            continue;
        }
        struct socket_lock l;
        socket_lock_init(s, &l);    //锁l引用套接字数据中的锁
        switch (s->type) {
        case SOCKET_TYPE_CONNECTING:    //套接字正在连接中, 但是还没有连接上, 此时还不能传送信息
            return report_connect(ss, s, &l, result);    //改变套接字的状态为SOCKET_TYPE_CONNECTED ,监听事件改为可读事件,保存对端ip地址
        case SOCKET_TYPE_LISTEN: {        //套接字处于监听状态,说明有客户端发送连接请求
            int ok = report_accept(ss, s, result);        //等待客户端的连接请求
            if (ok > 0) {
                return SOCKET_ACCEPT;    //正常连接
            } if (ok < 0 ) {
                return SOCKET_ERR;        //描述符超出限制
            }
            // when ok == 0, retry
            break;
        }
        case SOCKET_TYPE_INVALID:        //套接字信息不存在
            fprintf(stderr, "socket-server: invalid socket\n");
            break;
        default:        //如果套接字信息存储着,已经正常连接好,并且不是客户端请求连接服务端,则进行下列处理
            if (e->read) {        //可读事件
                int type;
                if (s->protocol == PROTOCOL_TCP) {    //如果是TCP通信
                    type = forward_message_tcp(ss, s, &l, result);    //读取数据
                } else {                            //如果是UDP通信
                    type = forward_message_udp(ss, s, &l, result);    //接收UDP数据
                    if (type == SOCKET_UDP) {        //如果接收到UDP数据
                        // try read again
                        --ss->event_index;            //下次还会尝试去读取一次数据
                        return SOCKET_UDP;
                    }
                }
                if (e->write && type != SOCKET_CLOSE && type != SOCKET_ERR) {
                    // Try to dispatch write message next step if write flag set.
                    e->read = false;
                    --ss->event_index;                //下次会再尝试去写数据
                }
                if (type == -1)
                    break;                
                return type;
            }
            if (e->write) {        //可写事件
                int type = send_buffer(ss, s, &l, result);    //发送套接字中写缓存中的数据
                if (type == -1)
                    break;
                return type;
            }
            if (e->error) {        //错误事件
                // close when error
                int error;
                socklen_t len = sizeof(error);  
                int code = getsockopt(s->fd, SOL_SOCKET, SO_ERROR, &error, &len);  //获得套接字s->fd上的获取错误状态并清除,无错误发生返回0
                const char * err = NULL;
                if (code < 0) {
                    err = strerror(errno);
                } else if (error != 0) {
                    err = strerror(error);
                } else {
                    err = "Unknown error";
                }
                force_close(ss, s, &l, result);        //强制关闭套接字
                result->data = (char *)err;            //记录错误原因
                return SOCKET_ERR;                    //返回错误
            }
            break;
        }
    }
}

//用于检查是否有命令,通过一个select监听读管道fd是否有可读事件,有则返回1,否则返回0,
static int has_cmd(struct socket_server *ss) {
    struct timeval tv = {0,0};
    int retval;

    FD_SET(ss->recvctrl_fd, &ss->rfds);        //将管道读fd加入读描述符集合

    retval = select(ss->recvctrl_fd+1, &ss->rfds, NULL, NULL, &tv);    //创建一个select,不阻塞
    if (retval == 1) {
        return 1;
    }
    return 0;
}

//从读管道中取出相应的命令及附带的数据进行处理,result保存各个命令处理的结果信息,
static int ctrl_cmd(struct socket_server *ss, struct socket_message *result) {
    int fd = ss->recvctrl_fd;
    // the length of message is one byte, so 256+8 buffer size is enough.
    uint8_t buffer[256];    //数据内容缓存
    uint8_t header[2];        //命令缓存
    block_readpipe(fd, header, sizeof(header));    //读取读管道中指定长度的命令内容
    int type = header[0];    //命令的类型
    int len = header[1];    //命令附带的数据长度
    block_readpipe(fd, buffer, len);    //读取读管道中指定长度的数据内容
    // ctrl command only exist in local fd, so don't worry about endian.
    switch (type) {
    case 'S':    //开始添加套接字到epoll进行可读事件监听,改变套接字的状态为 SOCKET_TYPE_CONNECTED 或 SOCKET_TYPE_LISTEN
        return start_socket(ss,(struct request_start *)buffer, result);    //成功返回SOCKET_OPEN,否则返回-1或者SOCKET_ERR
    case 'B':    //绑定一个由外部生成的套接字,并添加到epoll监听读事件,设置套接字为非阻塞模式,改变状态为SOCKET_TYPE_BIND
        return bind_socket(ss,(struct request_bind *)buffer, result);    //成功返回SOCKET_OPEN,否则为SOCKET_ERR
    case 'L':    //将已经监听的套接字添加到套接字信息结构中,但不添加到epoll中监听事件,套接字的状态由SOCKET_TYPE_RESERVE变为SOCKET_TYPE_PLISTEN
        return listen_socket(ss,(struct request_listen *)buffer, result);    //成功返回-1,否则返回SOCKET_ERR
    case 'K':    //关闭套接字,如果是强制关闭或没有数据则直接关闭,否则如果有数据则先发送完数据,再将套接字状态设置为SOCKET_TYPE_HALFCLOSE,
        return close_socket(ss,(struct request_close *)buffer, result);
    case 'O':    //发起TCP连接服务端请求
        return open_socket(ss, (struct request_open *)buffer, result);    //返回-1表示正在连接中,返回SOCKET_OPEN表示已连接成功,返回SOCKET_ERR失败
    case 'X':    //整个套接字服务退出
        result->opaque = 0;
        result->id = 0;
        result->ud = 0;
        result->data = NULL;
        return SOCKET_EXIT;
    case 'D':    //向套接字发送数据,将数据添加到高优先级写缓存中
        return send_socket(ss, (struct request_send *)buffer, result, PRIORITY_HIGH, NULL);
    case 'P':    //向套接字发送数据,将数据添加到低优先级写缓存中
        return send_socket(ss, (struct request_send *)buffer, result, PRIORITY_LOW, NULL);
    case 'A': {    //UDP协议,向套接字发送数据,未发送完添加到高优先级写缓存队列中
        struct request_send_udp * rsu = (struct request_send_udp *)buffer;
        return send_socket(ss, &rsu->send, result, PRIORITY_HIGH, rsu->address);
    }
    case 'C':    //设置指定套接字信息中的ip地址,前提是套接字信息有效及协议类型匹配,此过程中s->udpconnecting大于0
        return set_udp_address(ss, (struct request_setudp *)buffer, result);    //协议不匹配返回SOCKET_ERR,否则返回-1
    case 'T':    //设置套接字的选项,选项的层次在 IPPROTO_TCP 上 , 设置的键和值都是 int 类型的, 
        setopt_socket(ss, (struct request_setopt *)buffer);    //目前仅用于设置套接字的 TCP_NODELAY 选项,request->what为1禁止发送合并的Nagle算法
        return -1;
    case 'U':    //添加产生的套接字到分配的套接字信息结构中,并添加可读事件的监听,修改套接字的状态为 SOCKET_TYPE_CONNECTED
        add_udp_socket(ss, (struct request_udp *)buffer);    //添加成功后不关联对端ip地址信息
        return -1;
    default:
        fprintf(stderr, "socket-server: Unknown ctrl %c.\n",type);
        return -1;
    };

    return -1;
}

results matching ""

    No results matching ""