1.6. Skynet消息处理
在工作线程中调用skynet_server.c文件中的skynet_context_message_dispatch函数进行消息分发,该函数的原理为:如果传入的第二个参数服务队列为NULL则从全局队列中取服务队列信息,通过服务队列信息获得定位服务的编号和服务信息,默认线程只处理服务队列中的一条消息,通过每个工作线程的weight可以改变每次处理的消息的数量,从服务队列中取出消息如果该服务有回调函数则调用回调函数进行处理(在dispatch_message函数中),如果服务队列中的消息都处理完了,则该服务队列直到有消息时才添加到全局队列中,此时返回全局队列中的下一个服务队列,当服务队列中还有消息,但本次处理的消息数量已完成,则将该服务队列压入全局队列,返回全局队列中的下一个服务队列。
//消息分发
struct message_queue * skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
//如果服务队列q为空,则会从全局队列中去取
if (q == NULL) {
q = skynet_globalmq_pop(); //从全局队列中取出服务队列,并删除全局队列中的
if (q==NULL)
return NULL;
}
uint32_t handle = skynet_mq_handle(q); //获取服务的handle
struct skynet_context * ctx = skynet_handle_grab(handle); //根据服务编号(包含节点号和服务号),获得服务信息,会增加服务信息的引用计数
if (ctx == NULL) { //如果服务信息为NULL,则释放服务队列信息
struct drop_t d = { handle };
skynet_mq_release(q, drop_message, &d);
return skynet_globalmq_pop(); //返回全局队列中的下一个服务队列
}
int i,n=1;
struct skynet_message msg;
for (i=0;i<n;i++) {
if (skynet_mq_pop(q,&msg)) { //从服务队列中取出消息
skynet_context_release(ctx); //递减服务信息的引用计数,如果计数为0则释放
return skynet_globalmq_pop(); //本服务队列暂无消息,不会返回全局队列,返回下一个服务队列
} else if (i==0 && weight >= 0) {
n = skynet_mq_length(q); //获得消息的数量
n >>= weight; //根据weight来决定线程本次处理服务队列中消息的数量
}
int overload = skynet_mq_overload(q); //获得消息数量超出阈值时的消息数量,并清零记录值
if (overload) {
skynet_error(ctx, "May overload, message queue length = %d", overload); //将错误信息输出到logger服务
}
skynet_monitor_trigger(sm, msg.source , handle); //记录消息源、目的地、version增1,用于监测线程监测该线程是否卡死与某条消息的处理
if (ctx->cb == NULL) { //如果服务没有注册回调函数则释放掉消息内容
skynet_free(msg.data);
} else {
dispatch_message(ctx, &msg); //有回调函数调用相应的回调函数进行处理
}
skynet_monitor_trigger(sm, 0,0); //清除记录的消息源、目的地
}
assert(q == ctx->queue);
struct message_queue *nq = skynet_globalmq_pop(); //取出下一个服务队列
if (nq) {
// If global mq is not empty , push q back, and return next queue (nq)
// Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)
skynet_globalmq_push(q); //将当前处理的服务队列压入全局队列
q = nq;
}
skynet_context_release(ctx); //递减服务信息的引用计数,如果计数为0则释放
return q;
}
//对分发的消息进行处理
static void dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {
assert(ctx->init);
CHECKCALLING_BEGIN(ctx)
pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle)); //将handle与线程关联
int type = msg->sz >> MESSAGE_TYPE_SHIFT; //获得消息类型
size_t sz = msg->sz & MESSAGE_TYPE_MASK; //获得消息的大小
if (ctx->logfile) { //如果打开了日志文件,则将消息输出到日志文件
skynet_log_output(ctx->logfile, msg->source, type, msg->session, msg->data, sz);
}
++ctx->message_count; //记录处理消息的数量
int reserve_msg;
if (ctx->profile) { //记录消耗CPU时间
ctx->cpu_start = skynet_thread_time();
reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz); //调用服务回调进行消息处理
uint64_t cost_time = skynet_thread_time() - ctx->cpu_start;
ctx->cpu_cost += cost_time;
} else {
reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz); //调用服务回调进行消息处理
}
if (!reserve_msg) {
skynet_free(msg->data);
}
CHECKCALLING_END(ctx)
}
//根据服务编号(包含节点号和服务号),获得服务信息
struct skynet_context * skynet_handle_grab(uint32_t handle) {
struct handle_storage *s = H; //全局所有的服务信息
struct skynet_context * result = NULL;
rwlock_rlock(&s->lock);
uint32_t hash = handle & (s->slot_size-1); //获得服务编号对应的服务信息的存储位置
struct skynet_context * ctx = s->slot[hash]; //获得服务信息
if (ctx && skynet_context_handle(ctx) == handle) {
result = ctx;
skynet_context_grab(result); //增加服务信息的引用计数
}
rwlock_runlock(&s->lock);
return result;
}