1.9. Skynet套接字线程工作原理
涉及的到的相应结构体以及预定义注释为:
//用于标记socket结构体的状态
#define SOCKET_TYPE_INVALID 0 //socket结构体未被使用
#define SOCKET_TYPE_RESERVE 1 //socket结构体已被分配,但是还没有实际进行网络连接
#define SOCKET_TYPE_PLISTEN 2 //已经绑定套接字监听端口号,但是没有添加到epoll监听事件,调用start_socket函数才会,变为SOCKET_TYPE_LISTEN状态
#define SOCKET_TYPE_LISTEN 3 //已经绑定套接字监听端口号, 并且已经添加到epoll监听事件
#define SOCKET_TYPE_CONNECTING 4 //套接字正在连接中, 但是还没有连接上, 此时还不能传送信息
#define SOCKET_TYPE_CONNECTED 5 //套接字连接成功, 可以发送信息
#define SOCKET_TYPE_HALFCLOSE 6 //半关闭状态, 虽然套接字本身没有关闭, 但是已经不能往里边添加信息了, 最终会在清空写缓冲的情况下关闭
#define SOCKET_TYPE_PACCEPT 7 //已经接受了客户端的连接, 但是没有添加到epoll监听事件, 当调用 start_socket 才变成 CONNECTED
#define SOCKET_TYPE_BIND 8 //绑定外部创建的套接字,监听可读事件
//处理管道信息和epoll监听事件的返回结果,即socket_server_poll返回值
#define SOCKET_DATA 0 //套接字已接收TCP数据
#define SOCKET_CLOSE 1 //套接字已被关闭
#define SOCKET_OPEN 2 //说明套接字已经可以进行正常的通信,例如绑定套接字成功,请求连接成功
#define SOCKET_ACCEPT 3 //客户端请求连接事件处理,表明连接成功
#define SOCKET_ERR 4 //出错返回
#define SOCKET_EXIT 5 //整个套接字服务退出
#define SOCKET_UDP 6 //套接字已接收UDP数据
#define SOCKET_WARNING 7 //写缓存超出阈值
//skynet_socket_message中type套接字消息的类型
#define SKYNET_SOCKET_TYPE_DATA 1 //接收到的TCP数据
#define SKYNET_SOCKET_TYPE_CONNECT 2 //套接字已经可以进行正常的通信,例如绑定套接字成功,请求连接成功
#define SKYNET_SOCKET_TYPE_CLOSE 3 //套接字已被关闭
#define SKYNET_SOCKET_TYPE_ACCEPT 4 //客户端请求连接事件处理,表明连接成功
#define SKYNET_SOCKET_TYPE_ERROR 5 //出错返回
#define SKYNET_SOCKET_TYPE_UDP 6 //接收到UDP数据
#define SKYNET_SOCKET_TYPE_WARNING 7 //写缓存超出阈值
struct skynet_socket_message { //发送到 skynet 各个服务去的套接字消息
int type; //套接字消息的类型,取上面的预定义值
int id; //定位存储套接字信息的id
int ud; //套接字消息的数据的大小
char * buffer; //套接字消息的数据
};
//全局的信息
struct socket_server {
int recvctrl_fd; //读管道fd
int sendctrl_fd; //写管道fd
int checkctrl; //默认值为1,是否需要检查管道中的命令的标记
poll_fd event_fd; //epoll句柄
int alloc_id; //当前分配到的socket ID
int event_n; //epoll触发的事件数量
int event_index; //当前已经处理的epoll事件的数量
struct socket_object_interface soi; //初始化发送对象时用
struct event ev[MAX_EVENT]; //事件的相关数据
struct socket slot[MAX_SOCKET]; //所有套接字相关的信息
char buffer[MAX_INFO]; //open_socket发起TCP连接时,用于保存套接字的对端IP地址,如果是客户端套接字保存客户端的ip地址和端口号
uint8_t udpbuffer[MAX_UDP_PACKAGE]; //接收UDP数据
fd_set rfds; //select的读描述符集合
};
//每个套接字相关的信息
struct socket { //套接字相关信息结构体
uintptr_t opaque; //一般用于存储定位服务的handle
struct wb_list high; //高优先级的写缓存
struct wb_list low; //低优先级的写缓存
int64_t wb_size; //套接字写入缓存的大小,会随着缓存的大小变化,为高优先级和低优先级的和
int fd; //套接字描述符
int id; //用于在socket_server结构体中定位存储套接字信息
uint8_t protocol; //协议类型
uint8_t type; //socket结构体所处的状态,绑定套接字时,即套接字的状态
uint16_t udpconnecting; //大于0标记该套接字正在进行关联ip地址操作,用于UDP协议
int64_t warn_size; //阈值,写缓存超过的阈值,每超过一次阈值就会翻倍
union {
int size; //在 TCP 协议下使用, 表示一次性最多读取的字节数
uint8_t udp_address[UDP_ADDRESS_SIZE]; //udp_address[0]存协议类型,udp_address[1],udp_address[]存端口号,剩余部分存ip地址
} p;
struct spinlock dw_lock; //写缓存锁
int dw_offset; //已经发送的数据偏移位置
const void * dw_buffer; //已发送一部分的全部数据缓存
size_t dw_size; //已发送一部分的全部数据的大小
};
struct wb_list { //写缓存队列
struct write_buffer * head; //队列头
struct write_buffer * tail; //尾队列
};
//socket 写入的缓存数据, 如果是 TCP 协议将不包含 udp_address 字段, 而仅仅是前面部分
struct write_buffer {
struct write_buffer * next; //处于 wb_list 中的下一个写缓存
void *buffer; //调用者传递过来的缓存, 从中可以提取出发送数据, 最后需要回收内存
char *ptr; //发送数据的起始指针, 会随着不断写入 socket 而向后移动
int sz; //发送数据的大小, 会随着不断写入 socket 而减小
bool userobject; //标记初始化发送对象时有没有用socket_server中的soi接口
uint8_t udp_address[UDP_ADDRESS_SIZE]; //udp_address[0]存协议类型,udp_address[1],udp_address[]存端口号,剩余部分存ip地址
};
在skynet_start函数中会调用skynet_socket.c文件中的skynet_socket_init函数对套接字线程中的全局信息进行初始化,skynet_socket_init函数调用的是socket_server.c文件中的socket_server_create函数,该函数返回初始化的全局套接字信息,该函数的主要工作为:创建一个epoll用于监听所有描述符上的事件,一个读管道和一个写管道,将读管道添加到epoll中进行可读事件的监听,分配一块保存全局信息的内存,将epoll句柄和读写管道都记录到里面,并对其他信息进行初始化,代码注释如下:
//初始化全局的套接字服务信息
struct socket_server * socket_server_create() {
int i;
int fd[2];
poll_fd efd = sp_create(); //创建一个epoll
if (sp_invalid(efd)) {
fprintf(stderr, "socket-server: create event pool failed.\n");
return NULL;
}
if (pipe(fd)) { //产生一个读管道和一个写管道
sp_release(efd);
fprintf(stderr, "socket-server: create socket pair failed.\n");
return NULL;
}
if (sp_add(efd, fd[0], NULL)) { //将读管道添加到epoll中进行可读事件监听
// add recvctrl_fd to event poll
fprintf(stderr, "socket-server: can't add server fd to event pool.\n");
close(fd[0]);
close(fd[1]);
sp_release(efd);
return NULL;
}
struct socket_server *ss = MALLOC(sizeof(*ss));
ss->event_fd = efd; //epoll句柄
ss->recvctrl_fd = fd[0]; //读管道fd
ss->sendctrl_fd = fd[1]; //写管道fd
ss->checkctrl = 1;
for (i=0;i<MAX_SOCKET;i++) { //MAX_SOCKET=2^16 对存储套接字的相关信息结构进行初始化
struct socket *s = &ss->slot[i];
s->type = SOCKET_TYPE_INVALID; //状态初始化为无效状态
clear_wb_list(&s->high); //清空高优先级写缓存队列
clear_wb_list(&s->low); //清空低优先级写缓存队列
}
ss->alloc_id = 0; //记录当前分配可以分配的套接字信息的位置
ss->event_n = 0; //epoll中监听到的事件数量
ss->event_index = 0; //当前处理到第几个事件
memset(&ss->soi, 0, sizeof(ss->soi));
FD_ZERO(&ss->rfds); //清空描述符集合
assert(ss->recvctrl_fd < FD_SETSIZE); //读管道是否有效
return ss;
}
套接字线程的主要处理过程为skynet_socket.c文件中的函数skynet_socket_poll,该函数调用socket_server.c文件中的socket_server_poll函数(该函数的主要处理流程如上图所示),skynet_socket_poll函数根据socket_server_poll函数处理的返回值将套接字的信息调用函数forward_message发送给对应的服务,代码注释为:
//处理所有套接字上的事件,返回处理的结果,将处理的结果及结果信息转发给对应的服务
int skynet_socket_poll() {
struct socket_server *ss = SOCKET_SERVER;
assert(ss);
struct socket_message result;
int more = 1;
int type = socket_server_poll(ss, &result, &more); //处理所有套接字上的事件,返回处理结果及信息
switch (type) {
case SOCKET_EXIT:
return 0; //整个套接字服务退出
case SOCKET_DATA: //接收到TCP数据 result中存有定位服务的handle,定位套接字信息的id,接收到的数据大小ud以及数据data
forward_message(SKYNET_SOCKET_TYPE_DATA, false, &result);
break;
case SOCKET_CLOSE: //套接字已被关闭
forward_message(SKYNET_SOCKET_TYPE_CLOSE, false, &result);
break;
case SOCKET_OPEN: //说明套接字已经可以进行正常的通信,例如绑定套接字成功,请求连接成功
forward_message(SKYNET_SOCKET_TYPE_CONNECT, true, &result);
break;
case SOCKET_ERR: //出错返回
forward_message(SKYNET_SOCKET_TYPE_ERROR, true, &result);
break;
case SOCKET_ACCEPT: //客户端请求连接事件处理,表明连接成功
forward_message(SKYNET_SOCKET_TYPE_ACCEPT, true, &result); //result->data保存客户端的ip地址和端口号
break;
case SOCKET_UDP: //接收到UDP数据 result中存有定位服务的handle,定位套接字信息的id,接收到的数据大小ud以及数据+IP地址data
forward_message(SKYNET_SOCKET_TYPE_UDP, false, &result);
break;
case SOCKET_WARNING: //写缓存超出阈值
forward_message(SKYNET_SOCKET_TYPE_WARNING, false, &result);
break;
default:
skynet_error(NULL, "Unknown socket message type %d.",type);
return -1;
}
if (more) {
return -1;
}
return 1;
}
//将套接字上的消息发送给对应的服务,type套接字消息的类型,padding是否需要填充内容
static void forward_message(int type, bool padding, struct socket_message * result) {
struct skynet_socket_message *sm;
size_t sz = sizeof(*sm);
if (padding) { //是否需要填充,填充的内容大小不能超过128
if (result->data) {
size_t msg_sz = strlen(result->data);
if (msg_sz > 128) {
msg_sz = 128;
}
sz += msg_sz;
} else {
result->data = "";
}
}
sm = (struct skynet_socket_message *)skynet_malloc(sz);
sm->type = type; //套接字消息的类型
sm->id = result->id; //定位存储套接字信息的id
sm->ud = result->ud; //一般为套接字消息的大小
if (padding) {
sm->buffer = NULL;
memcpy(sm+1, result->data, sz - sizeof(*sm)); //填充的内容放在skynet_socket_message消息的下一个地址上
} else {
sm->buffer = result->data; //不需要填充的内容
}
struct skynet_message message; //skynet消息
message.source = 0;
message.session = 0;
message.data = sm; //skynet消息内容包含skynet_socket_message已经如果需要填充的内容
message.sz = sz | ((size_t)PTYPE_SOCKET << MESSAGE_TYPE_SHIFT); //消息的类型PTYPE_SOCKET及大小
if (skynet_context_push((uint32_t)result->opaque, &message)) { //添加消息到对应的服务队列
// todo: report somewhere to close socket
// don't call skynet_socket_close here (It will block mainloop)
skynet_free(sm->buffer);
skynet_free(sm);
}
}
下面将详细介绍socket_server_poll函数的工作流程,该函数首先判断是否需要检查读管道上可读事件,默认为是,通过调用函数has_cmd将读管道描述符添加到select进行可读事件监听,select为不等待立即返回模式,如果检查到有可读事件则调用ctrl_cmd函数读取读管道中的命令及数据,通过命令调用的相应的处理函数进行处理,回到socket_server_poll函数接下来是等待epoll中所有监听的事件触发,如果事件触发,则从事件列表中一个一个取出事件进行处理,对事件的处理主要根据套接字的状态:正在建立连接的套接字(SOCKET_TYPE_CONNECTING)、处于监听状态的套接字(SOCKET_TYPE_LISTEN)、无效套接字(SOCKET_TYPE_INVALID)、其他状态的套接字。对于SOCKET_TYPE_CONNECTING当收到触发事件说明连接已经建立,则调用report_connect函数改变套接字的状态为SOCKET_TYPE_CONNECTED ,监听事件改为可读事件。对于SOCKET_TYPE_LISTEN与客户端建立连接,保存客户端套接字信息,并将其设置为非阻塞模式,但是不添加到epoll进行事件监听,还需要等待'S'命令。对于SOCKET_TYPE_INVALID不做处理,其他状态的套接字根据事件是可读事件还是可写事件,根据套接字的协议类型对套接字进行读写操作,或者是错误事件进行错误处理。
//检查读管道中的命令,有命令则读取命令及携带的数据进行处理,如果各个套接字上没有事件需要处理则
//监听所有套接字注册的事件,等待事件触发,如果有事件需要处理,则一个一个事件进行处理,
//more为1表示上次的事件还没处理完,0表示上次的事件都处理完了
int socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {
for (;;) {
if (ss->checkctrl) { //判断是否需要检查读管道中的命令,默认需要
if (has_cmd(ss)) { //检查读管道上是否有命令可读取,有则返回1,否则返回0
int type = ctrl_cmd(ss, result); //从读管道上读取相应的命令,并对其数据进行处理,返回相应的处理结果
if (type != -1) {
clear_closed_event(ss, result, type); //清除掉该套接字相关的监听事件
return type;
} else //type=-1说明是一个过渡状态
continue;
} else {
ss->checkctrl = 0;
}
}
if (ss->event_index == ss->event_n) {
ss->event_n = sp_wait(ss->event_fd, ss->ev, MAX_EVENT); //等待epoll上监听的事件触发,阻塞,返回触发事件的数量
ss->checkctrl = 1;
if (more) {
*more = 0; //标记上一次的事件都处理完了
}
ss->event_index = 0;
if (ss->event_n <= 0) {
ss->event_n = 0;
if (errno == EINTR) { //判断是否是中断
continue;
}
return -1;
}
}
struct event *e = &ss->ev[ss->event_index++]; //从监听到的事件中取出一个事件
struct socket *s = e->s; //取出事件附带的socket信息
if (s == NULL) { //开始时发送的是管道消息
// dispatch pipe message at beginning
continue;
}
struct socket_lock l;
socket_lock_init(s, &l); //锁l引用套接字数据中的锁
switch (s->type) {
case SOCKET_TYPE_CONNECTING: //套接字正在连接中, 但是还没有连接上, 此时还不能传送信息
return report_connect(ss, s, &l, result); //改变套接字的状态为SOCKET_TYPE_CONNECTED ,监听事件改为可读事件,保存对端ip地址
case SOCKET_TYPE_LISTEN: { //套接字处于监听状态,说明有客户端发送连接请求
int ok = report_accept(ss, s, result); //等待客户端的连接请求
if (ok > 0) {
return SOCKET_ACCEPT; //正常连接
} if (ok < 0 ) {
return SOCKET_ERR; //描述符超出限制
}
// when ok == 0, retry
break;
}
case SOCKET_TYPE_INVALID: //套接字信息不存在
fprintf(stderr, "socket-server: invalid socket\n");
break;
default: //如果套接字信息存储着,已经正常连接好,并且不是客户端请求连接服务端,则进行下列处理
if (e->read) { //可读事件
int type;
if (s->protocol == PROTOCOL_TCP) { //如果是TCP通信
type = forward_message_tcp(ss, s, &l, result); //读取数据
} else { //如果是UDP通信
type = forward_message_udp(ss, s, &l, result); //接收UDP数据
if (type == SOCKET_UDP) { //如果接收到UDP数据
// try read again
--ss->event_index; //下次还会尝试去读取一次数据
return SOCKET_UDP;
}
}
if (e->write && type != SOCKET_CLOSE && type != SOCKET_ERR) {
// Try to dispatch write message next step if write flag set.
e->read = false;
--ss->event_index; //下次会再尝试去写数据
}
if (type == -1)
break;
return type;
}
if (e->write) { //可写事件
int type = send_buffer(ss, s, &l, result); //发送套接字中写缓存中的数据
if (type == -1)
break;
return type;
}
if (e->error) { //错误事件
// close when error
int error;
socklen_t len = sizeof(error);
int code = getsockopt(s->fd, SOL_SOCKET, SO_ERROR, &error, &len); //获得套接字s->fd上的获取错误状态并清除,无错误发生返回0
const char * err = NULL;
if (code < 0) {
err = strerror(errno);
} else if (error != 0) {
err = strerror(error);
} else {
err = "Unknown error";
}
force_close(ss, s, &l, result); //强制关闭套接字
result->data = (char *)err; //记录错误原因
return SOCKET_ERR; //返回错误
}
break;
}
}
}
//用于检查是否有命令,通过一个select监听读管道fd是否有可读事件,有则返回1,否则返回0,
static int has_cmd(struct socket_server *ss) {
struct timeval tv = {0,0};
int retval;
FD_SET(ss->recvctrl_fd, &ss->rfds); //将管道读fd加入读描述符集合
retval = select(ss->recvctrl_fd+1, &ss->rfds, NULL, NULL, &tv); //创建一个select,不阻塞
if (retval == 1) {
return 1;
}
return 0;
}
//从读管道中取出相应的命令及附带的数据进行处理,result保存各个命令处理的结果信息,
static int ctrl_cmd(struct socket_server *ss, struct socket_message *result) {
int fd = ss->recvctrl_fd;
// the length of message is one byte, so 256+8 buffer size is enough.
uint8_t buffer[256]; //数据内容缓存
uint8_t header[2]; //命令缓存
block_readpipe(fd, header, sizeof(header)); //读取读管道中指定长度的命令内容
int type = header[0]; //命令的类型
int len = header[1]; //命令附带的数据长度
block_readpipe(fd, buffer, len); //读取读管道中指定长度的数据内容
// ctrl command only exist in local fd, so don't worry about endian.
switch (type) {
case 'S': //开始添加套接字到epoll进行可读事件监听,改变套接字的状态为 SOCKET_TYPE_CONNECTED 或 SOCKET_TYPE_LISTEN
return start_socket(ss,(struct request_start *)buffer, result); //成功返回SOCKET_OPEN,否则返回-1或者SOCKET_ERR
case 'B': //绑定一个由外部生成的套接字,并添加到epoll监听读事件,设置套接字为非阻塞模式,改变状态为SOCKET_TYPE_BIND
return bind_socket(ss,(struct request_bind *)buffer, result); //成功返回SOCKET_OPEN,否则为SOCKET_ERR
case 'L': //将已经监听的套接字添加到套接字信息结构中,但不添加到epoll中监听事件,套接字的状态由SOCKET_TYPE_RESERVE变为SOCKET_TYPE_PLISTEN
return listen_socket(ss,(struct request_listen *)buffer, result); //成功返回-1,否则返回SOCKET_ERR
case 'K': //关闭套接字,如果是强制关闭或没有数据则直接关闭,否则如果有数据则先发送完数据,再将套接字状态设置为SOCKET_TYPE_HALFCLOSE,
return close_socket(ss,(struct request_close *)buffer, result);
case 'O': //发起TCP连接服务端请求
return open_socket(ss, (struct request_open *)buffer, result); //返回-1表示正在连接中,返回SOCKET_OPEN表示已连接成功,返回SOCKET_ERR失败
case 'X': //整个套接字服务退出
result->opaque = 0;
result->id = 0;
result->ud = 0;
result->data = NULL;
return SOCKET_EXIT;
case 'D': //向套接字发送数据,将数据添加到高优先级写缓存中
return send_socket(ss, (struct request_send *)buffer, result, PRIORITY_HIGH, NULL);
case 'P': //向套接字发送数据,将数据添加到低优先级写缓存中
return send_socket(ss, (struct request_send *)buffer, result, PRIORITY_LOW, NULL);
case 'A': { //UDP协议,向套接字发送数据,未发送完添加到高优先级写缓存队列中
struct request_send_udp * rsu = (struct request_send_udp *)buffer;
return send_socket(ss, &rsu->send, result, PRIORITY_HIGH, rsu->address);
}
case 'C': //设置指定套接字信息中的ip地址,前提是套接字信息有效及协议类型匹配,此过程中s->udpconnecting大于0
return set_udp_address(ss, (struct request_setudp *)buffer, result); //协议不匹配返回SOCKET_ERR,否则返回-1
case 'T': //设置套接字的选项,选项的层次在 IPPROTO_TCP 上 , 设置的键和值都是 int 类型的,
setopt_socket(ss, (struct request_setopt *)buffer); //目前仅用于设置套接字的 TCP_NODELAY 选项,request->what为1禁止发送合并的Nagle算法
return -1;
case 'U': //添加产生的套接字到分配的套接字信息结构中,并添加可读事件的监听,修改套接字的状态为 SOCKET_TYPE_CONNECTED
add_udp_socket(ss, (struct request_udp *)buffer); //添加成功后不关联对端ip地址信息
return -1;
default:
fprintf(stderr, "socket-server: Unknown ctrl %c.\n",type);
return -1;
};
return -1;
}