📄 erl_message.c
字号:
/* ``The contents of this file are subject to the Erlang Public License, * Version 1.1, (the "License"); you may not use this file except in * compliance with the License. You should have received a copy of the * Erlang Public License along with this software. If not, it can be * retrieved via the world wide web at http://www.erlang.org/. * * Software distributed under the License is distributed on an "AS IS" * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See * the License for the specific language governing rights and limitations * under the License. * * The Initial Developer of the Original Code is Ericsson Utvecklings AB. * Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings * AB. All Rights Reserved.'' * * $Id$ *//* * Message passing primitives. */#ifdef HAVE_CONFIG_H# include "config.h"#endif#include "sys.h"#include "erl_vm.h"#include "global.h"#include "erl_message.h"#include "erl_process.h"#include "erl_nmgc.h"ERTS_SMP_QUALLOC_IMPL(message, ErlMessage, ERL_MESSAGE_BUF_SZ, ERTS_ALC_T_MSG_REF)#if defined(DEBUG) && 0#define HARD_DEBUG#else#undef HARD_DEBUG#endifvoidinit_message(void){ init_message_alloc();}voidfree_message(ErlMessage* mp){ message_free(mp);}/* Allocate message buffer (size in words) */ErlHeapFragment*new_message_buffer(Uint size){ ErlHeapFragment* bp; bp = (ErlHeapFragment*) ERTS_HEAP_ALLOC(ERTS_ALC_T_HEAP_FRAG, (sizeof(ErlHeapFragment) - sizeof(Eterm) + size*sizeof(Eterm))); bp->next = NULL; bp->size = size; bp->off_heap.mso = NULL;#ifndef HYBRID /* FIND ME! */ bp->off_heap.funs = NULL;#endif bp->off_heap.externals = NULL; bp->off_heap.overhead = 0; return bp;}ErlHeapFragment*erts_resize_message_buffer(ErlHeapFragment *bp, Uint size, Eterm *brefs, Uint brefs_size){#ifdef DEBUG int i;#endif#ifdef HARD_DEBUG ErlHeapFragment *dbg_bp; Eterm *dbg_brefs; Uint dbg_size; Uint dbg_tot_size; Eterm *dbg_hp;#endif ErlHeapFragment* nbp;#ifdef DEBUG { Uint off_sz = size < bp->size ? size : bp->size; for (i = 0; i < brefs_size; i++) { Eterm *ptr; if (is_immed(brefs[i])) continue; ptr = ptr_val(brefs[i]); ASSERT(&bp->mem[0] <= ptr && ptr < &bp->mem[0] + off_sz); } }#endif if (size == bp->size) return bp;#ifdef HARD_DEBUG dbg_brefs = erts_alloc(ERTS_ALC_T_UNDEF, sizeof(Eterm *)*brefs_size); dbg_bp = new_message_buffer(bp->size); dbg_hp = dbg_bp->mem; dbg_tot_size = 0; for (i = 0; i < brefs_size; i++) { dbg_size = size_object(brefs[i]); dbg_tot_size += dbg_size; dbg_brefs[i] = copy_struct(brefs[i], dbg_size, &dbg_hp, &dbg_bp->off_heap); } ASSERT(dbg_tot_size == size < bp->size ? size : bp->size);#endif nbp = (ErlHeapFragment*) ERTS_HEAP_REALLOC(ERTS_ALC_T_HEAP_FRAG, (void *) bp, (sizeof(ErlHeapFragment) - sizeof(Eterm) + bp->size*sizeof(Eterm)), (sizeof(ErlHeapFragment) - sizeof(Eterm) + size*sizeof(Eterm))); if (bp != nbp) { Uint off_sz = size < nbp->size ? size : nbp->size; Eterm *sp = &bp->mem[0]; Eterm *ep = sp + off_sz; Sint offs = &nbp->mem[0] - sp; erts_offset_off_heap(&nbp->off_heap, offs, sp, ep); erts_offset_heap(&nbp->mem[0], off_sz, offs, sp, ep); if (brefs && brefs_size) erts_offset_heap_ptr(brefs, brefs_size, offs, sp, ep);#ifdef DEBUG for (i = 0; i < brefs_size; i++) { Eterm *ptr; if (is_immed(brefs[i])) continue; ptr = ptr_val(brefs[i]); ASSERT(&nbp->mem[0] <= ptr && ptr < &nbp->mem[0] + off_sz); }#endif } nbp->size = size;#ifdef HARD_DEBUG for (i = 0; i < brefs_size; i++) ASSERT(eq(dbg_brefs[i], brefs[i])); free_message_buffer(dbg_bp); erts_free(ERTS_ALC_T_UNDEF, dbg_brefs);#endif return nbp;}voiderts_cleanup_offheap(ErlOffHeap *offheap){ if (offheap->mso) { erts_cleanup_mso(offheap->mso); }#ifndef HYBRID /* FIND ME! */ if (offheap->funs) { erts_cleanup_funs(offheap->funs); }#endif if (offheap->externals) { erts_cleanup_externals(offheap->externals); }}voidfree_message_buffer(ErlHeapFragment* bp){ erts_cleanup_offheap(&bp->off_heap); ERTS_HEAP_FREE(ERTS_ALC_T_HEAP_FRAG, (void *) bp, (sizeof(ErlHeapFragment) - sizeof(Eterm) + bp->size*sizeof(Eterm)));}static ERTS_INLINE voidlink_mbuf_to_proc(Process *proc, ErlHeapFragment *bp){ if (bp) { /* Link the message buffer */ bp->next = MBUF(proc); MBUF(proc) = bp; MBUF_SIZE(proc) += bp->size;#if defined(HEAP_FRAG_ELIM_TEST) MSO(proc).overhead += proc->heap_sz; /* Force GC */#else MSO(proc).overhead += (sizeof(ErlHeapFragment) / sizeof(Eterm) - 1);#endif /* Move any binaries into the process */ if (bp->off_heap.mso != NULL) { ProcBin** next_p = &bp->off_heap.mso; while (*next_p != NULL) { next_p = &((*next_p)->next); } *next_p = MSO(proc).mso; MSO(proc).mso = bp->off_heap.mso; bp->off_heap.mso = NULL; MSO(proc).overhead += bp->off_heap.overhead; } /* Move any funs into the process */#ifndef HYBRID if (bp->off_heap.funs != NULL) { ErlFunThing** next_p = &bp->off_heap.funs; while (*next_p != NULL) { next_p = &((*next_p)->next); } *next_p = MSO(proc).funs; MSO(proc).funs = bp->off_heap.funs; bp->off_heap.funs = NULL; }#endif /* Move any external things into the process */ if (bp->off_heap.externals != NULL) { ExternalThing** next_p = &bp->off_heap.externals; while (*next_p != NULL) { next_p = &((*next_p)->next); } *next_p = MSO(proc).externals; MSO(proc).externals = bp->off_heap.externals; bp->off_heap.externals = NULL; } }}/* Add a message last in message queue */voiderts_queue_message(Process* receiver, Uint32 receiver_locks, ErlHeapFragment* bp, Eterm message, Eterm seq_trace_token){ ErlMessage* mp; ERTS_SMP_LC_ASSERT(receiver_locks == erts_proc_lc_my_proc_locks(receiver)); ERTS_SMP_LC_ASSERT((ERTS_PROC_LOCKS_MSG_SEND & receiver_locks) == ERTS_PROC_LOCKS_MSG_SEND);#ifdef ERTS_SMP if (ERTS_PROC_PENDING_EXIT(receiver)) { /* Drop message if receiver has a pending exit ... */ if (bp) free_message_buffer(bp); return; }#endif mp = message_alloc(); ERL_MESSAGE_TERM(mp) = message; ERL_MESSAGE_TOKEN(mp) = seq_trace_token; mp->next = NULL;#ifdef ERTS_SMP if (receiver_locks & ERTS_PROC_LOCK_MAIN) {#if defined(HEAP_FRAG_ELIM_TEST) mp->bp = bp;#else mp->bp = NULL; link_mbuf_to_proc(receiver, bp);#endif /* * We move 'in queue' to 'private queue' and place * message at the end of 'private queue' in order * to ensure that the 'in queue' doesn't contain * references into the heap. By ensuring this, * we don't need to include the 'in queue' in * the root set when garbage collecting. */ ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver); LINK_MESSAGE_PRIVQ(receiver, mp); } else { mp->bp = bp; LINK_MESSAGE(receiver, mp); }#else#if defined(HEAP_FRAG_ELIM_TEST) mp->bp = bp;#else link_mbuf_to_proc(receiver, bp);#endif LINK_MESSAGE(receiver, mp);#endif ACTIVATE(receiver); if (receiver->status == P_WAITING) { add_to_schedule_q(receiver); } else if (receiver->status == P_SUSPENDED) { receiver->rstatus = P_RUNABLE; } if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) { trace_receive(receiver, message); }#ifndef ERTS_SMP ERTS_HOLE_CHECK(receiver);#endif}voiderts_link_mbuf_to_proc(struct process *proc, ErlHeapFragment *bp){ link_mbuf_to_proc(proc, bp);#ifdef HEAP_FRAG_ELIM_TEST { Eterm* htop = HEAP_TOP(proc); if (htop < HEAP_LIMIT(proc)) { *htop = make_pos_bignum_header(HEAP_LIMIT(proc)-htop-1); HEAP_TOP(proc) = HEAP_LIMIT(proc); } }#endif}#if defined(ERTS_SMP) || defined(HEAP_FRAG_ELIM_TEST)/* * Moves content of message buffer attached to a message into a heap. * The message buffer is deallocated. */voiderts_move_msg_mbuf_to_heap(Eterm** hpp, ErlOffHeap* off_heap, ErlMessage *msg){ Uint **oh_list_pp, **oh_el_next_pp; Uint *oh_el_p; Eterm term, token, *fhp, *hp; Sint offs; Uint sz; ErlHeapFragment *bp;#ifdef HARD_DEBUG ProcBin *dbg_mso_start = off_heap->mso; ErlFunThing *dbg_fun_start = off_heap->funs; ExternalThing *dbg_external_start = off_heap->externals; Eterm dbg_term, dbg_token; ErlHeapFragment *dbg_bp; Uint *dbg_hp, *dbg_thp_start; Uint dbg_term_sz, dbg_token_sz;#endif bp = msg->bp; term = ERL_MESSAGE_TERM(msg); token = ERL_MESSAGE_TOKEN(msg); if (!bp) { ASSERT(is_immed(term) && is_immed(token)); return; }#ifdef HARD_DEBUG dbg_term_sz = size_object(term); dbg_token_sz = size_object(token); ASSERT(bp->size == dbg_term_sz + dbg_token_sz); dbg_bp = new_message_buffer(bp->size); dbg_hp = dbg_bp->mem; dbg_term = copy_struct(term, dbg_term_sz, &dbg_hp, &dbg_bp->off_heap); dbg_token = copy_struct(token, dbg_token_sz, &dbg_hp, &dbg_bp->off_heap); dbg_thp_start = *hpp;#endif ASSERT(bp); msg->bp = NULL; off_heap->overhead += bp->off_heap.overhead; sz = bp->size;#ifdef DEBUG if (is_not_immed(term)) { ASSERT(bp->mem <= ptr_val(term)); ASSERT(bp->mem + bp->size > ptr_val(term)); } if (is_not_immed(token)) { ASSERT(bp->mem <= ptr_val(token)); ASSERT(bp->mem + bp->size > ptr_val(token)); }#endif fhp = bp->mem; hp = *hpp; offs = hp - fhp; oh_list_pp = NULL; oh_el_next_pp = NULL; /* Shut up compiler warning */ oh_el_p = NULL; /* Shut up compiler warning */ while (sz--) { Uint cpy_sz; Eterm val = *fhp++; switch (primary_tag(val)) { case TAG_PRIMARY_IMMED1: *hp++ = val; break; case TAG_PRIMARY_LIST: case TAG_PRIMARY_BOXED: ASSERT(bp->mem <= ptr_val(val)); ASSERT(bp->mem + bp->size > ptr_val(val)); *hp++ = offset_ptr(val, offs); break; case TAG_PRIMARY_HEADER: *hp++ = val; switch (val & _HEADER_SUBTAG_MASK) { case ARITYVAL_SUBTAG: break;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -