1.3. Skynet工作逻辑
当从终端输入命令“./skynet examples/config”命令时,启动程序skynet,首先调用skynet_main.c文件中的main函数,examples/config将作为argv[1]参数传入。在main函数中主要的工作为:初始化全局信息,加载配置文件,调用skynet_start.c中的skynet_start函数。skynet_start函数的主要的工作为:根据加载的配置信息初始化全局信息,创建一个logger服务,调用bootstrap函数创建一个snlua服务,调用start函数创建一个监测线程、一个定时器线程、一个套接字线程以及配置配的相应数量的工作线程。
skynet_main.c文件中的main函数源码注释
int main(int argc, char *argv[]) {
const char * config_file = NULL ;
if (argc > 1) {
config_file = argv[1];
} else {
fprintf(stderr, "Need a config file. Please read skynet wiki : https://github.com/cloudwu/skynet/wiki/Config\n"
"usage: skynet configfilename\n");
return 1;
}
luaS_initshr(); //初始化读写锁
skynet_globalinit(); //初始化主线程
skynet_env_init(); //初始化一个全局的Lua环境
sigign(); //忽略SIGPIPE信号
struct skynet_config config;
struct lua_State *L = luaL_newstate(); //新建一个lua状态机
luaL_openlibs(L); // link lua lib
int err = luaL_loadbufferx(L, load_config, strlen(load_config), "=[skynet config]", "t"); //加载load_config中的代码块
assert(err == LUA_OK);
lua_pushstring(L, config_file); //将配置文件的路径压入栈
err = lua_pcall(L, 1, 1, 0); //运行加载的代码块,并将配置文件路径作为参数传入
if (err) {
fprintf(stderr,"%s\n",lua_tostring(L,-1));
lua_close(L);
return 1;
}
_init_env(L); //将配置文件的键值对添加到lua全局信息内
config.thread = optint("thread",8);
config.module_path = optstring("cpath","./cservice/?.so");
config.harbor = optint("harbor", 1);
config.bootstrap = optstring("bootstrap","snlua bootstrap");
config.daemon = optstring("daemon", NULL);
config.logger = optstring("logger", NULL);
config.logservice = optstring("logservice", "logger");
config.profile = optboolean("profile", 1);
lua_close(L); //消耗上面创建的lua状态机
skynet_start(&config); //skynet开始
skynet_globalexit();
luaS_exitshr();
return 0;
}
skynet_start.c文件中的skynet_start函数源码注释
void skynet_start(struct skynet_config * config) {
// register SIGHUP for log file reopen
//log默认是输出到终端的,如果终端关闭则将log输出到文件
struct sigaction sa;
sa.sa_handler = &handle_hup;
sa.sa_flags = SA_RESTART;
sigfillset(&sa.sa_mask);
sigaction(SIGHUP, &sa, NULL);
//如果配置中有配置,以后台模式启动skynet
if (config->daemon) {
if (daemon_init(config->daemon)) {
exit(1);
}
}
skynet_harbor_init(config->harbor); //初始化节点号
skynet_handle_init(config->harbor); //初始化全局服务信息
skynet_mq_init(); //初始化全局队列
skynet_module_init(config->module_path); //初始化需要加载的动态库的路径
skynet_timer_init(); //初始化计时
skynet_socket_init(); //创建一个epoll
skynet_profile_enable(config->profile); //设置是否开启监测每个服务的CPU耗时标志
struct skynet_context *ctx = skynet_context_new(config->logservice, config->logger); //新建有一个logger服务
if (ctx == NULL) {
fprintf(stderr, "Can't launch %s service\n", config->logservice);
exit(1);
}
bootstrap(ctx, config->bootstrap); //新建一个snlua服务
start(config->thread); //开始工作,创建定时器、监测、套接字和相应数量的工作线程
// harbor_exit may call socket send, so it should exit before socket_free
skynet_harbor_exit();
skynet_socket_free();
if (config->daemon) {
daemon_exit(config->daemon);
}
}
多线程的工作原理:
多线程中涉及到的结构体有:
//用做定时器、监测、套接字和工作线程的运行函数都共享的参数
struct monitor {
int count; //工作线程数量,即配置中配的
struct skynet_monitor ** m; //为每个工作线程存储监测信息的结构体
pthread_cond_t cond; //多线程同步机制中的条件变量
pthread_mutex_t mutex; //多线程同步机制中的互斥锁
int sleep; //记录处于阻塞状态的线程数量
int quit; //标记线程是否退出
};
//为每个工作线程存储监测信息的结构体
struct skynet_monitor {
int version; //没修改一次source和destination自加1
int check_version; //已经检测到的version只,用于和当前的version进行比较,防止线程卡死在某一条消息
uint32_t source; //消息源,定位到发消息的服务
uint32_t destination; //消息发往的目的地
};
//用做工作线程的运行函数的参数
struct worker_parm {
struct monitor *m; //所有线程共享的参数
int id; //每个工作线程的序号
int weight; //标记每个线程每次处理服务队列中的消息数量
};
上述函数skynet_start中调用start函数创建了多个线程,该函数的原理为:首先分配一个所有线程共享的结构体monitor,为每个工作线程创建一个存储监测信息的结构体,初始化所有线程共享的互斥锁和条件变量,然后创建一个监测线程、一个定时器线程,一个套接字线程,最后创建相应数量的工作线程,等待所有线程结束,代码如下:
static void start(int thread) {
pthread_t pid[thread+3];
struct monitor *m = skynet_malloc(sizeof(*m)); //后面创建的线程都共享参数
memset(m, 0, sizeof(*m));
m->count = thread; //工作线程的数量
m->sleep = 0; //记录处于阻塞状态的线程数量
m->m = skynet_malloc(thread * sizeof(struct skynet_monitor *)); //为每个工作线程创建一个存储监测信息的结构体
int i;
for (i=0;i<thread;i++) {
m->m[i] = skynet_monitor_new(); //为每一个工作线程分配一块监测信息的内存
}
if (pthread_mutex_init(&m->mutex, NULL)) { //初始化互斥锁
fprintf(stderr, "Init mutex error");
exit(1);
}
if (pthread_cond_init(&m->cond, NULL)) { //初始化条件变量
fprintf(stderr, "Init cond error");
exit(1);
}
create_thread(&pid[0], thread_monitor, m); //创建监测线程
create_thread(&pid[1], thread_timer, m); //创建定时器线程
create_thread(&pid[2], thread_socket, m); //创建套接字线程
static int weight[] = { //-1表示每个线程每次处理服务队列中的消息数量为1
-1, -1, -1, -1, 0, 0, 0, 0, //0表示每个线程每次处理服务队列中的所有消息
1, 1, 1, 1, 1, 1, 1, 1, //1表示每个线程每次处理服务队列中的所有消息的1/2
2, 2, 2, 2, 2, 2, 2, 2, //2表示每个线程每次处理服务队列中的所有消息的1/4
3, 3, 3, 3, 3, 3, 3, 3, }; //3表示每个线程每次处理服务队列中的所有消息的1/8
struct worker_parm wp[thread];
for (i=0;i<thread;i++) {
wp[i].m = m; //各个线程共享参数部分
wp[i].id = i; //每个工作线程的序号
if (i < sizeof(weight)/sizeof(weight[0])) {
wp[i].weight= weight[i];
} else {
wp[i].weight = 0;
}
create_thread(&pid[i+3], thread_worker, &wp[i]); //创建工作线程
}
for (i=0;i<thread+3;i++) {
pthread_join(pid[i], NULL); //等待各个线程结束
}
free_monitor(m); //释放资源
}
监测线程
监测线程主要工作是监测所有的工作线程是否有卡死在某服务对某条消息的处理,其运行函数thread_monitor的工作原理为:首先初始化该线程的key对应的私有数据块,然后每个5秒对所有的工作线程进行一次检查,调用skynet_monitor.c文件中的skynet_monitor_check函数检测每一个线程是否有卡住在某一条消息的处理。
//检测线程运行函数
static void * thread_monitor(void *p) {
struct monitor * m = p;
int i;
int n = m->count; //工作线程数量
skynet_initthread(THREAD_MONITOR); //初始化该线程对应的私有数据块
for (;;) {
CHECK_ABORT //检测总的服务数量,为0则break
for (i=0;i<n;i++) {
skynet_monitor_check(m->m[i]); //检查工作线程是否陷入死循环
}
for (i=0;i<5;i++) { //睡眠5秒
CHECK_ABORT //检测总的服务数量,为0则break
sleep(1);
}
}
return NULL;
}
//检测某个工作线程是否卡主在某一条消息
void skynet_monitor_check(struct skynet_monitor *sm) {
if (sm->version == sm->check_version) { //当前处理的消息和已经监测到的消息是否相等
if (sm->destination) { //判断消息是否已经被处理
skynet_context_endless(sm->destination); //标记相应的服务陷入死循环
skynet_error(NULL, "A message from [ :%08x ] to [ :%08x ] maybe in an endless loop (version = %d)", sm->source , sm->destination, sm->version);
}
} else {
sm->check_version = sm->version;
}
}
定时器线程
定时器线程主要的工作为每隔2500微秒刷新计时、唤醒等待条件触发的工作线程并检查是否有终端关闭的信号,如果有则打开log文件,将log输出到文件中,在刷新计时中会对每个时刻的链表进行相应的处理(详情请看1.8节)。运行函数thread_timer的代码如下:
//定时器线程运行函数
static void * thread_timer(void *p) {
struct monitor * m = p;
skynet_initthread(THREAD_TIMER); //初始化该线程对应的私有数据块
for (;;) {
skynet_updatetime(); //刷新时间,详情请看1.8节
CHECK_ABORT //检测总的服务数量,为0则break
wakeup(m,m->count-1); //唤醒等待条件触发的工作线程
usleep(2500); //定时器线程挂起2500微秒
if (SIG) { //如果触发终端关闭的信号SIGHUP,则打开log文件
signal_hup(); //发送服务内部消息打开log文件,将log输出到文件
SIG = 0;
}
}
// wakeup socket thread
skynet_socket_exit(); //正常结束套接字服务
// wakeup all worker thread
pthread_mutex_lock(&m->mutex); //获得互斥锁
m->quit = 1; //设置线程退出标志
pthread_cond_broadcast(&m->cond); //激活所有等待条件触发的线程
pthread_mutex_unlock(&m->mutex); //释放互斥锁
return NULL;
}
//唤醒等待条件触发的线程
static void wakeup(struct monitor *m, int busy) {
if (m->sleep >= m->count - busy) {
// signal sleep worker, "spurious wakeup" is harmless
pthread_cond_signal(&m->cond); //激活一个等待该条件的线程,存在多个等待线程时按入队顺序激活其中一个;
}
}
//发送服务内部消息打开log文件,将log输出到文件
static void signal_hup() {
// make log file reopen
struct skynet_message smsg;
smsg.source = 0;
smsg.session = 0;
smsg.data = NULL;
smsg.sz = (size_t)PTYPE_SYSTEM << MESSAGE_TYPE_SHIFT;
uint32_t logger = skynet_handle_findname("logger"); //查找logger服务信息的handle
if (logger) {
skynet_context_push(logger, &smsg); //将消息添加到对应的服务队列
}
}
套接字线程
套接字线程用于处理所有的套接字上的事件,刚初始化用于处理命令,线程的推出也有命令控制,并且该线程确保所有的工作线程中至少有一条工作线程是处于运行状态的,确保可以处理套接字上的事件。(关于套接字线程的工作原理请看1.9)
//套接字线程运行函数
static void * thread_socket(void *p) {
struct monitor * m = p;
skynet_initthread(THREAD_SOCKET); //初始化该线程对应的私有数据块
for (;;) {
int r = skynet_socket_poll(); //处理所有套接字上的事件,返回处理的结果,将处理的结果及结果信息转发给对应的服务
if (r==0) //线程退出
break;
if (r<0) {
CHECK_ABORT //检测总的服务数量,为0则break
continue;
}
wakeup(m,0); //如果所有工作线程都处于等待状态,则唤醒其中一个
}
return NULL;
}
工作线程
工作线程主要的工作为从全局队列中取出服务队列对其消息进行处理(关于消息的处理请看1.6),其运行函数thread_worker的工作原理为,首先初始化该线程的key对应的私有数据块,然后从全局队列中取出服务队列对其消息进行处理,最后当全局队列中没有服务队列信息时进入等待状态,等待定时器线程或套接字线程触发条件。
//工作线程运行函数
static void * thread_worker(void *p) {
struct worker_parm *wp = p; //当前工作线程的参数
int id = wp->id; //当前工作线程的序号
int weight = wp->weight;
struct monitor *m = wp->m;
struct skynet_monitor *sm = m->m[id];
skynet_initthread(THREAD_WORKER); //初始化该线程对应的私有数据块
struct message_queue * q = NULL;
while (!m->quit) {
q = skynet_context_message_dispatch(sm, q, weight); //消息分发
if (q == NULL) { //如果全局队列中没有服务队列信息,尝试获得互斥锁,等待定时器线程或套接字线程触发条件
if (pthread_mutex_lock(&m->mutex) == 0) { //获得互斥锁,如该锁已被其他工作线程锁住或拥有,则该线程阻塞直到可以
++ m->sleep; //线程阻塞计数加1
// "spurious wakeup" is harmless,
// because skynet_context_message_dispatch() can be call at any time.
if (!m->quit)
pthread_cond_wait(&m->cond, &m->mutex); //等待条件触发
-- m->sleep; //线程阻塞计数减1
if (pthread_mutex_unlock(&m->mutex)) { //释放互斥锁
fprintf(stderr, "unlock mutex error");
exit(1);
}
}
}
}
return NULL;
}