⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 erl_message.c

📁 OTP是开放电信平台的简称
💻 C
📖 第 1 页 / 共 2 页
字号:
	    case REFC_BINARY_SUBTAG:		oh_list_pp = (Uint **) &off_heap->mso;		oh_el_p = (Uint *) (hp-1);		oh_el_next_pp = (Uint **) &((ProcBin *) oh_el_p)->next;		cpy_sz = thing_arityval(val);		goto cpy_words;	    case FUN_SUBTAG:#ifndef HYBRID		oh_list_pp = (Uint **) &off_heap->funs;		oh_el_p = (Uint *) (hp-1);		oh_el_next_pp = (Uint **) &((ErlFunThing *) oh_el_p)->next;#endif		cpy_sz = thing_arityval(val);		goto cpy_words;	    case EXTERNAL_PID_SUBTAG:	    case EXTERNAL_PORT_SUBTAG:	    case EXTERNAL_REF_SUBTAG:		oh_list_pp = (Uint **) &off_heap->externals;		oh_el_p = (Uint *) (hp-1);		oh_el_next_pp = (Uint **) &((ExternalThing *) oh_el_p)->next;		cpy_sz = thing_arityval(val);		goto cpy_words;	    default:		cpy_sz = header_arity(val);	    cpy_words:		sz -= cpy_sz;		while (cpy_sz >= 8) {		    cpy_sz -= 8;		    *hp++ = *fhp++;		    *hp++ = *fhp++;		    *hp++ = *fhp++;		    *hp++ = *fhp++;		    *hp++ = *fhp++;		    *hp++ = *fhp++;		    *hp++ = *fhp++;		    *hp++ = *fhp++;		}		switch (cpy_sz) {		case 7: *hp++ = *fhp++;		case 6: *hp++ = *fhp++;		case 5: *hp++ = *fhp++;		case 4: *hp++ = *fhp++;		case 3: *hp++ = *fhp++;		case 2: *hp++ = *fhp++;		case 1: *hp++ = *fhp++;		default: break;		}		if (oh_list_pp) {#ifdef HARD_DEBUG		    Uint *dbg_old_oh_list_p = *oh_list_pp;#endif		    /* Add to offheap list */		    *oh_el_next_pp = *oh_list_pp;		    *oh_list_pp = oh_el_p;		    ASSERT(*hpp <= oh_el_p);		    ASSERT(hp > oh_el_p);#ifdef HARD_DEBUG		    switch (val & _HEADER_SUBTAG_MASK) {		    case REFC_BINARY_SUBTAG:			ASSERT(off_heap->mso == (ProcBin *) *oh_list_pp);			ASSERT(off_heap->mso->next			       == (ProcBin *) dbg_old_oh_list_p);			break;#ifndef HYBRID		    case FUN_SUBTAG:			ASSERT(off_heap->funs == (ErlFunThing *) *oh_list_pp);			ASSERT(off_heap->funs->next			       == (ErlFunThing *) dbg_old_oh_list_p);			break;#endif		    case EXTERNAL_PID_SUBTAG:		    case EXTERNAL_PORT_SUBTAG:		    case EXTERNAL_REF_SUBTAG:			ASSERT(off_heap->externals			       == (ExternalThing *) 
*oh_list_pp);			ASSERT(off_heap->externals->next			       == (ExternalThing *) dbg_old_oh_list_p);			break;		    default:			ASSERT(0);		    }#endif		    oh_list_pp = NULL;		}		break;	    }	    break;	}    }    ASSERT(bp->size == hp - *hpp);    *hpp = hp;    if (is_not_immed(token)) {	ASSERT(bp->mem <= ptr_val(token));	ASSERT(bp->mem + bp->size > ptr_val(token));	ERL_MESSAGE_TOKEN(msg) = offset_ptr(token, offs);#ifdef HARD_DEBUG	ASSERT(dbg_thp_start <= ptr_val(ERL_MESSAGE_TOKEN(msg)));	ASSERT(hp > ptr_val(ERL_MESSAGE_TOKEN(msg)));#endif    }    if (is_not_immed(term)) {	ASSERT(bp->mem <= ptr_val(term));	ASSERT(bp->mem + bp->size > ptr_val(term));	ERL_MESSAGE_TERM(msg) = offset_ptr(term, offs);#ifdef HARD_DEBUG	ASSERT(dbg_thp_start <= ptr_val(ERL_MESSAGE_TERM(msg)));	ASSERT(hp > ptr_val(ERL_MESSAGE_TERM(msg)));#endif    }#ifdef HARD_DEBUG    {	int i, j;	{	    ProcBin *mso = off_heap->mso;	    i = j = 0;	    while (mso != dbg_mso_start) {		mso = mso->next;		i++;	    }	    mso = bp->off_heap.mso;	    while (mso) {		mso = mso->next;		j++;	    }	    ASSERT(i == j);	}	{	    ErlFunThing *fun = off_heap->funs;	    i = j = 0;	    while (fun != dbg_fun_start) {		fun = fun->next;		i++;	    }	    fun = bp->off_heap.funs;	    while (fun) {		fun = fun->next;		j++;	    }	    ASSERT(i == j);	}	{	    ExternalThing *external = off_heap->externals;	    i = j = 0;	    while (external != dbg_external_start) {		external = external->next;		i++;	    }	    external = bp->off_heap.externals;	    while (external) {		external = external->next;		j++;	    }	    ASSERT(i == j);	}    }#endif	        bp->off_heap.mso = NULL;#ifndef HYBRID    bp->off_heap.funs = NULL;#endif    bp->off_heap.externals = NULL;    free_message_buffer(bp);#ifdef HARD_DEBUG    ASSERT(eq(ERL_MESSAGE_TERM(msg), dbg_term));    ASSERT(eq(ERL_MESSAGE_TOKEN(msg), dbg_token));    free_message_buffer(dbg_bp);#endif}voiderts_move_msg_mbuf_to_proc_mbufs(Process *p, ErlMessage *msg){    link_mbuf_to_proc(p, msg->bp);    msg->bp = 
NULL;}#endif/* * Send a local message when sender & receiver processes are known. */voiderts_send_message(Process* sender,		  Process* receiver,		  Uint32 *receiver_locks,		  Eterm message,		  unsigned flags){    ErlHeapFragment* bp = NULL;    Eterm token = NIL;    BM_STOP_TIMER(system);    BM_MESSAGE(message,sender,receiver);    BM_START_TIMER(send);    if (SEQ_TRACE_TOKEN(sender) != NIL && !(flags & ERTS_SND_FLG_NO_SEQ_TRACE)) {        Uint msize;        Eterm* hp;        BM_SWAP_TIMER(send,size);        msize = size_object(message);        BM_SWAP_TIMER(size,send);	seq_trace_update_send(sender);	seq_trace_output(SEQ_TRACE_TOKEN(sender), message, SEQ_TRACE_SEND, 			 receiver->id, sender);	bp = new_message_buffer(msize + 6 /* TUPLE5 */);	hp = bp->mem;        BM_SWAP_TIMER(send,copy);	token = copy_struct(SEQ_TRACE_TOKEN(sender),			    6 /* TUPLE5 */,			    &hp,			    &bp->off_heap);	message = copy_struct(message, msize, &hp, &bp->off_heap);        BM_MESSAGE_COPIED(msize);        BM_SWAP_TIMER(copy,send);        erts_queue_message(receiver,			   *receiver_locks,			   bp,			   message,			   token);        BM_SWAP_TIMER(send,system);#ifdef HYBRID    } else {        ErlMessage* mp = message_alloc();        BM_SWAP_TIMER(send,copy);#ifdef INCREMENTAL        /* TODO: During GC activate processes if the message relies in         * the fromspace and the sender is active. During major         * collections add the message to the gray stack if it relies         * in the old generation and the sender is active and the         * receiver is inactive.        
if (!IS_CONST(message) && (ma_gc_flags & GC_CYCLE) &&            (ptr_val(message) >= inc_fromspc &&            ptr_val(message) < inc_fromend) && INC_IS_ACTIVE(sender))            INC_ACTIVATE(receiver);        else if (!IS_CONST(message) && (ma_gc_flags & GC_CYCLE) &&            (ptr_val(message) >= global_old_heap &&            ptr_val(message) < global_old_hend) &&            INC_IS_ACTIVE(sender) && !INC_IS_ACTIVE(receiver))            Mark message in blackmap and add it to the gray stack        */         if (!IS_CONST(message))            INC_ACTIVATE(receiver);#endif        LAZY_COPY(sender,message);        BM_SWAP_TIMER(copy,send);        ERL_MESSAGE_TERM(mp) = message;        ERL_MESSAGE_TOKEN(mp) = NIL;        mp->next = NULL;        LINK_MESSAGE(receiver, mp);        ACTIVATE(receiver);        if (receiver->status == P_WAITING) {            add_to_schedule_q(receiver);        } else if (receiver->status == P_SUSPENDED) {            receiver->rstatus = P_RUNABLE;        }        if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) {            trace_receive(receiver, message);        }        BM_SWAP_TIMER(send,system);        return;#else    } else if (sender == receiver) {	/* Drop message if receiver has a pending exit ... */	if (!ERTS_PROC_PENDING_EXIT(receiver)) {	    ErlMessage* mp = message_alloc();#if defined(ERTS_SMP) || defined(HEAP_FRAG_ELIM_TEST)	    mp->bp = NULL;#endif	    ERL_MESSAGE_TERM(mp) = message;	    ERL_MESSAGE_TOKEN(mp) = NIL;	    mp->next = NULL;	    /*	     * We move 'in queue' to 'private queue' and place	     * message at the end of 'private queue' in order	     * to ensure that the 'in queue' doesn't contain	     * references into the heap. By ensuring this,	     * we don't need to include the 'in queue' in	     * the root set when garbage collecting.	     
*/	    ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver);	    LINK_MESSAGE_PRIVQ(receiver, mp);	    if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) {		trace_receive(receiver, message);	    }	}        BM_SWAP_TIMER(send,system);	return;    } else {#ifdef ERTS_SMP        Uint msz;	ErlOffHeap *ohp;        Eterm *hp;	/* Drop message if receiver has a pending exit ... */	if (!ERTS_PROC_PENDING_EXIT(receiver)) {	    BM_SWAP_TIMER(send,size);	    msz = size_object(message);	    BM_SWAP_TIMER(size,send);	    hp = erts_alloc_message_heap(msz,&bp,&ohp,receiver,receiver_locks);	    BM_SWAP_TIMER(send,copy);	    message = copy_struct(message, msz, &hp, ohp);	    BM_MESSAGE_COPIED(msz);	    BM_SWAP_TIMER(copy,send);	    erts_queue_message(receiver, 			       *receiver_locks,			       bp, message, token);	}        BM_SWAP_TIMER(send,system);#else	ErlMessage* mp = message_alloc();        Uint msize;        Eterm *hp;        BM_SWAP_TIMER(send,size);        msize = size_object(message);        BM_SWAP_TIMER(size,send);	if (receiver->stop - receiver->htop <= msize) {            BM_SWAP_TIMER(send,system);	    erts_garbage_collect(receiver, msize, receiver->arg_reg, receiver->arity);            BM_SWAP_TIMER(system,send);	}	hp = receiver->htop;	receiver->htop = hp + msize;        BM_SWAP_TIMER(send,copy);	message = copy_struct(message, msize, &hp, &receiver->off_heap);        BM_MESSAGE_COPIED(msize);        BM_SWAP_TIMER(copy,send);	ERL_MESSAGE_TERM(mp) = message;	ERL_MESSAGE_TOKEN(mp) = NIL;	mp->next = NULL;#if defined(HEAP_FRAG_ELIM_TEST)	mp->bp = NULL;#endif	LINK_MESSAGE(receiver, mp);	if (receiver->status == P_WAITING) {	    add_to_schedule_q(receiver);	} else if (receiver->status == P_SUSPENDED) {	    receiver->rstatus = P_RUNABLE;	}	if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) {	    trace_receive(receiver, message);	}        BM_SWAP_TIMER(send,system);#endif /* #ifndef ERTS_SMP */	return;#endif /* HYBRID */    }}/* * This function delivers an EXIT message to a process * which is trapping 
EXITs. */
/*
 * The delivered message is the 3-tuple {'EXIT', From, Reason}, built
 * from copies of 'from' and 'reason'.
 *
 *   from       term identifying the origin of the exit
 *   to         process that receives the message
 *   to_locksp  pointer to the lock mask for 'to'; forwarded to
 *              erts_alloc_message_heap() and erts_queue_message()
 *   reason     exit reason term
 *   token      sequential trace token, or NIL when there is none
 *
 * With a trace token, the tuple and a copy of the token are placed in
 * one new message buffer and the send is reported via
 * seq_trace_output(). Without one, the memory comes from
 * erts_alloc_message_heap() and the message is queued with a NIL token.
 */
void
erts_deliver_exit_message(Eterm from, Process *to, Uint32 *to_locksp,
                          Eterm reason, Eterm token)
{
    ErlHeapFragment *frag = NULL;
    Eterm *top;
    Eterm reason_copy;
    Eterm from_copy;
    Eterm token_copy;
    Eterm exit_tuple;
    Uint reason_words;
    Uint from_words;
    Uint token_words;

    if (token == NIL) {
        ErlOffHeap *off_heap;

        reason_words = size_object(reason);
        /* An immediate 'from' needs no heap words and is used as is. */
        if (IS_CONST(from)) {
            from_words = 0;
        } else {
            from_words = size_object(from);
        }

        /* 4 extra words hold the 3-tuple itself. */
        top = erts_alloc_message_heap(reason_words+from_words+4,
                                      &frag,
                                      &off_heap,
                                      to,
                                      to_locksp);

        reason_copy = copy_struct(reason, reason_words, &top, off_heap);
        if (IS_CONST(from)) {
            from_copy = from;
        } else {
            from_copy = copy_struct(from, from_words, &top, off_heap);
        }

        exit_tuple = TUPLE3(top, am_EXIT, from_copy, reason_copy);
        erts_queue_message(to, *to_locksp, frag, exit_tuple, NIL);
        return;
    }

    /* Sequentially traced delivery. */
    ASSERT(is_tuple(token));

    reason_words = size_object(reason);
    token_words = size_object(token);
    from_words = size_object(from);

    /* Room for both copies, the token copy, and the 4-word tuple. */
    frag = new_message_buffer(reason_words + from_words + token_words + 4);
    top = frag->mem;

    reason_copy = copy_struct(reason, reason_words, &top, &frag->off_heap);
    from_copy = copy_struct(from, from_words, &top, &frag->off_heap);

    exit_tuple = TUPLE3(top, am_EXIT, from_copy, reason_copy);
    top += 4;   /* step past the tuple before copying the token */

    /* the trace token must in this case be updated by the caller */
    seq_trace_output(token, exit_tuple, SEQ_TRACE_SEND, to->id, NULL);

    token_copy = copy_struct(token, token_words, &top, &frag->off_heap);
    erts_queue_message(to, *to_locksp, frag, exit_tuple, token_copy);
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -