1.4. Skynet服务管理
每个服务通过一个32位的无符号整数进行标识,整数的高8位用于定位属于哪个节点,低24位用于定位属于哪个服务。本地节点号通过HARBOR进行存储位于文件skynet_harbor.c中,全局的所有服务信息通过H进行存储位于文件skynet_handle.c中。其中涉及到的结构有:
struct handle_name {
char * name; //服务名字
uint32_t handle; //用于定位服务,高8位为节点编号,低24位为服务号
};
//全局服务信息结构体
struct handle_storage {
struct rwlock lock; //读写锁
uint32_t harbor; //定位节点
uint32_t handle_index; //服务的信息下一个存储到slot的第几个
int slot_size; //存储服务信息数组的大小
struct skynet_context ** slot; //存储服务信息数组
int name_cap; //服务名数组的容量
int name_count; //服务名数组当前存的服务名的数量
struct handle_name *name; //服务名及定位服务的handle
};
每个服务的信息存储结构体定义于文件skynet_server.c中,该文件实现了服务的基本逻辑功能,并且在该文件中定义了一个结构体用于记录该节点上全局服务的状态信息,相应的结构体为:
//每个服务的信息存储结构体
struct skynet_context {
void * instance; //动态连接库的实例指针 即服务的实例指针
struct skynet_module * mod; //指定已加载的动态库信息
void * cb_ud; //服务回调的对象
skynet_cb cb; //服务的回调函数
struct message_queue *queue; //服务队列
FILE * logfile; //日志输出的文件
uint64_t cpu_cost; // in microsec //消耗CUP时间 精确到微秒
uint64_t cpu_start; // in microsec //本线程到当前代码系统CPU花费的时间
char result[32]; //存储相应指令名执行的函数返回结果
uint32_t handle; //存储带有节点号的服务号
int session_id; //用于为消息分配一个session
int ref; //服务信息的引用计数
int message_count; //记录处理消息的数量
bool init; //服务是否初始化
bool endless; //标记服务是否陷入死循环
bool profile; //是否开启CPU耗时监测
CHECKCALLING_DECL //锁
};
//全局服务相关的信息
struct skynet_node {
int total; //记录总的服务的数量
int init; //标记全局服务信息是否初始化
uint32_t monitor_exit; //检测将要退出的服务
pthread_key_t handle_key; //与线程相关联的handle
bool profile; //是否开启CPU耗时监测 默认开启
};
首先,初始化全局服务信息,最先初始化的为全局服务相关的信息,在main函数中调用skynet_globalinit函数,在skynet_start.c文件中的skynet_start函数调用skynet_harbor.c文件中的skynet_harbor_init函数对节点信息进行初始化,skynet_start函数调用skynet_handle.c文件中的skynet_handle_init函数对全局服务信息进行初始化。
//初始化与服务相关的全局信息
void skynet_globalinit(void) {
G_NODE.total = 0;
G_NODE.monitor_exit = 0;
G_NODE.init = 1;
if (pthread_key_create(&G_NODE.handle_key, NULL)) { //创建线程私有数据的key
fprintf(stderr, "pthread_key_create failed");
exit(1);
}
// set mainthread's key
skynet_initthread(THREAD_MAIN); //初始化主线程私有数据的key
}
void skynet_harbor_init(int harbor) {
HARBOR = (unsigned int)harbor << HANDLE_REMOTE_SHIFT; //将节点信息存于高8位
}
void skynet_handle_init(int harbor) {
assert(H==NULL);
struct handle_storage * s = skynet_malloc(sizeof(*H));
s->slot_size = DEFAULT_SLOT_SIZE; //存储服务信息数组的大小,随着服务数量的增加会翻倍增加
s->slot = skynet_malloc(s->slot_size * sizeof(struct skynet_context *)); //分配内存
memset(s->slot, 0, s->slot_size * sizeof(struct skynet_context *));
rwlock_init(&s->lock);
// reserve 0 for system
s->harbor = (uint32_t) (harbor & 0xff) << HANDLE_REMOTE_SHIFT; //本节点的节点号
s->handle_index = 1; //初始化从slot数组第一个开始存服务信息
s->name_cap = 2; //初始化容量为2,随着服务数量的增加,这个容量值翻倍增加
s->name_count = 0;
s->name = skynet_malloc(s->name_cap * sizeof(struct handle_name));
H = s;
// Don't need to free H
}
然后,新建服务,通过调用skynet_server.c中的skynet_context_new函数新建服务,该函数需要传入两个参数,第一个参数为服务的名称,第二个为服务初始化时传入的参数。这个函数的大体逻辑为:先调用skynet_module_query函数获得相应服务名称的动态链接库信息(详情请看1.7节),其次调用skynet_module_instance_create创建服务实例,而后对服务的信息进行初始化,并调用skynet_module_instance_init函数初始化相应的服务实例,最后调用skynet_globalmq_push函数将该服务添加到全局队列中。
//新建一个服务信息
struct skynet_context * skynet_context_new(const char * name, const char *param) {
struct skynet_module * mod = skynet_module_query(name); //获得指定文件名的动态连接库信息
if (mod == NULL)
return NULL;
void *inst = skynet_module_instance_create(mod); //调用相应动态库的库文件名_create的API函数
if (inst == NULL)
return NULL;
struct skynet_context * ctx = skynet_malloc(sizeof(*ctx));
CHECKCALLING_INIT(ctx)
ctx->mod = mod; //服务对应的动态库信息
ctx->instance = inst; //对应的动态库_create的函数返回的指针
ctx->ref = 2; //服务信息的引用计数,初始化完后会减1
ctx->cb = NULL; //服务的回调函数
ctx->cb_ud = NULL; //服务回调的对象
ctx->session_id = 0; //用于为消息分配一个session
ctx->logfile = NULL; //日志输出的文件
ctx->init = false; //服务是否初始化
ctx->endless = false; //服务是否陷入死循环
ctx->cpu_cost = 0; //消耗CUP时间 精确到微秒
ctx->cpu_start = 0; //本线程到当前代码系统CPU花费的时间
ctx->message_count = 0; //记录处理消息的数量
ctx->profile = G_NODE.profile; //是否开启CPU耗时监测
// Should set to 0 first to avoid skynet_handle_retireall get an uninitialized handle
ctx->handle = 0; //存储带有节点号的服务号
ctx->handle = skynet_handle_register(ctx); //将服务信息存储到全局服务信息中,并产生一个定位服务的编号
struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle); //创建服务队列
// init function maybe use ctx->handle, so it must init at last
context_inc(); //服务总数加1
CHECKCALLING_BEGIN(ctx) //尝试获得锁
int r = skynet_module_instance_init(mod, inst, ctx, param); //调用相应动态库的库文件名_init的API函数
CHECKCALLING_END(ctx) //释放锁
if (r == 0) {
struct skynet_context * ret = skynet_context_release(ctx); //服务信息的引用计数减1
if (ret) {
ctx->init = true; //标记服务初始化成功
}
skynet_globalmq_push(queue); //将服务队列添加到全局队列
if (ret) {
skynet_error(ret, "LAUNCH %s %s", name, param ? param : "");
}
return ret;
} else {
skynet_error(ctx, "FAILED launch %s", name);
uint32_t handle = ctx->handle;
skynet_context_release(ctx); //递减服务信息的引用计数,如果计数为0则释放
skynet_handle_retire(handle); //将指定的服务信息从全局的服务信息数字中剔除掉
struct drop_t d = { handle };
skynet_mq_release(queue, drop_message, &d); //将服务队列释放
return NULL;
}
}