xpc_channel.c
if (!(ch->flags & XPC_C_RCLOSEREQUEST)) { if ((XPC_GET_IPI_FLAGS(part->local_IPI_amo, ch_number) & XPC_IPI_CLOSEREQUEST)) { DBUG_ON(ch->delayed_IPI_flags != 0); spin_lock(&part->IPI_lock); XPC_SET_IPI_FLAGS(part->local_IPI_amo, ch_number, XPC_IPI_CLOSEREPLY); spin_unlock(&part->IPI_lock); } spin_unlock_irqrestore(&ch->lock, irq_flags); return; } ch->flags |= XPC_C_RCLOSEREPLY; if (ch->flags & XPC_C_CLOSEREPLY) { /* both sides have finished disconnecting */ xpc_process_disconnect(ch, &irq_flags); } } if (IPI_flags & XPC_IPI_OPENREQUEST) { dev_dbg(xpc_chan, "XPC_IPI_OPENREQUEST (msg_size=%d, " "local_nentries=%d) received from partid=%d, " "channel=%d\n", args->msg_size, args->local_nentries, ch->partid, ch->number); if (part->act_state == XPC_P_DEACTIVATING || (ch->flags & XPC_C_ROPENREQUEST)) { spin_unlock_irqrestore(&ch->lock, irq_flags); return; } if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_WDISCONNECT)) { ch->delayed_IPI_flags |= XPC_IPI_OPENREQUEST; spin_unlock_irqrestore(&ch->lock, irq_flags); return; } DBUG_ON(!(ch->flags & (XPC_C_DISCONNECTED | XPC_C_OPENREQUEST))); DBUG_ON(ch->flags & (XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY | XPC_C_OPENREPLY | XPC_C_CONNECTED)); /* * The meaningful OPENREQUEST connection state fields are: * msg_size = size of channel's messages in bytes * local_nentries = remote partition's local_nentries */ if (args->msg_size == 0 || args->local_nentries == 0) { /* assume OPENREQUEST was delayed by mistake */ spin_unlock_irqrestore(&ch->lock, irq_flags); return; } ch->flags |= (XPC_C_ROPENREQUEST | XPC_C_CONNECTING); ch->remote_nentries = args->local_nentries; if (ch->flags & XPC_C_OPENREQUEST) { if (args->msg_size != ch->msg_size) { XPC_DISCONNECT_CHANNEL(ch, xpcUnequalMsgSizes, &irq_flags); spin_unlock_irqrestore(&ch->lock, irq_flags); return; } } else { ch->msg_size = args->msg_size; XPC_SET_REASON(ch, 0, 0); ch->flags &= ~XPC_C_DISCONNECTED; atomic_inc(&part->nchannels_active); } xpc_process_connect(ch, &irq_flags); } if (IPI_flags & 
XPC_IPI_OPENREPLY) { dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY (local_msgqueue_pa=0x%lx, " "local_nentries=%d, remote_nentries=%d) received from " "partid=%d, channel=%d\n", args->local_msgqueue_pa, args->local_nentries, args->remote_nentries, ch->partid, ch->number); if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED)) { spin_unlock_irqrestore(&ch->lock, irq_flags); return; } if (!(ch->flags & XPC_C_OPENREQUEST)) { XPC_DISCONNECT_CHANNEL(ch, xpcOpenCloseError, &irq_flags); spin_unlock_irqrestore(&ch->lock, irq_flags); return; } DBUG_ON(!(ch->flags & XPC_C_ROPENREQUEST)); DBUG_ON(ch->flags & XPC_C_CONNECTED); /* * The meaningful OPENREPLY connection state fields are: * local_msgqueue_pa = physical address of remote * partition's local_msgqueue * local_nentries = remote partition's local_nentries * remote_nentries = remote partition's remote_nentries */ DBUG_ON(args->local_msgqueue_pa == 0); DBUG_ON(args->local_nentries == 0); DBUG_ON(args->remote_nentries == 0); ch->flags |= XPC_C_ROPENREPLY; ch->remote_msgqueue_pa = args->local_msgqueue_pa; if (args->local_nentries < ch->remote_nentries) { dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY: new " "remote_nentries=%d, old remote_nentries=%d, " "partid=%d, channel=%d\n", args->local_nentries, ch->remote_nentries, ch->partid, ch->number); ch->remote_nentries = args->local_nentries; } if (args->remote_nentries < ch->local_nentries) { dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY: new " "local_nentries=%d, old local_nentries=%d, " "partid=%d, channel=%d\n", args->remote_nentries, ch->local_nentries, ch->partid, ch->number); ch->local_nentries = args->remote_nentries; } xpc_process_connect(ch, &irq_flags); } spin_unlock_irqrestore(&ch->lock, irq_flags);}/* * Attempt to establish a channel connection to a remote partition. 
*/static enum xpc_retvalxpc_connect_channel(struct xpc_channel *ch){ unsigned long irq_flags; struct xpc_registration *registration = &xpc_registrations[ch->number]; if (mutex_trylock(®istration->mutex) == 0) { return xpcRetry; } if (!XPC_CHANNEL_REGISTERED(ch->number)) { mutex_unlock(®istration->mutex); return xpcUnregistered; } spin_lock_irqsave(&ch->lock, irq_flags); DBUG_ON(ch->flags & XPC_C_CONNECTED); DBUG_ON(ch->flags & XPC_C_OPENREQUEST); if (ch->flags & XPC_C_DISCONNECTING) { spin_unlock_irqrestore(&ch->lock, irq_flags); mutex_unlock(®istration->mutex); return ch->reason; } /* add info from the channel connect registration to the channel */ ch->kthreads_assigned_limit = registration->assigned_limit; ch->kthreads_idle_limit = registration->idle_limit; DBUG_ON(atomic_read(&ch->kthreads_assigned) != 0); DBUG_ON(atomic_read(&ch->kthreads_idle) != 0); DBUG_ON(atomic_read(&ch->kthreads_active) != 0); ch->func = registration->func; DBUG_ON(registration->func == NULL); ch->key = registration->key; ch->local_nentries = registration->nentries; if (ch->flags & XPC_C_ROPENREQUEST) { if (registration->msg_size != ch->msg_size) { /* the local and remote sides aren't the same */ /* * Because XPC_DISCONNECT_CHANNEL() can block we're * forced to up the registration sema before we unlock * the channel lock. But that's okay here because we're * done with the part that required the registration * sema. XPC_DISCONNECT_CHANNEL() requires that the * channel lock be locked and will unlock and relock * the channel lock as needed. 
*/ mutex_unlock(®istration->mutex); XPC_DISCONNECT_CHANNEL(ch, xpcUnequalMsgSizes, &irq_flags); spin_unlock_irqrestore(&ch->lock, irq_flags); return xpcUnequalMsgSizes; } } else { ch->msg_size = registration->msg_size; XPC_SET_REASON(ch, 0, 0); ch->flags &= ~XPC_C_DISCONNECTED; atomic_inc(&xpc_partitions[ch->partid].nchannels_active); } mutex_unlock(®istration->mutex); /* initiate the connection */ ch->flags |= (XPC_C_OPENREQUEST | XPC_C_CONNECTING); xpc_IPI_send_openrequest(ch, &irq_flags); xpc_process_connect(ch, &irq_flags); spin_unlock_irqrestore(&ch->lock, irq_flags); return xpcSuccess;}/* * Clear some of the msg flags in the local message queue. */static inline voidxpc_clear_local_msgqueue_flags(struct xpc_channel *ch){ struct xpc_msg *msg; s64 get; get = ch->w_remote_GP.get; do { msg = (struct xpc_msg *) ((u64) ch->local_msgqueue + (get % ch->local_nentries) * ch->msg_size); msg->flags = 0; } while (++get < (volatile s64) ch->remote_GP.get);}/* * Clear some of the msg flags in the remote message queue. */static inline voidxpc_clear_remote_msgqueue_flags(struct xpc_channel *ch){ struct xpc_msg *msg; s64 put; put = ch->w_remote_GP.put; do { msg = (struct xpc_msg *) ((u64) ch->remote_msgqueue + (put % ch->remote_nentries) * ch->msg_size); msg->flags = 0; } while (++put < (volatile s64) ch->remote_GP.put);}static voidxpc_process_msg_IPI(struct xpc_partition *part, int ch_number){ struct xpc_channel *ch = &part->channels[ch_number]; int nmsgs_sent; ch->remote_GP = part->remote_GPs[ch_number]; /* See what, if anything, has changed for each connected channel */ xpc_msgqueue_ref(ch); if (ch->w_remote_GP.get == ch->remote_GP.get && ch->w_remote_GP.put == ch->remote_GP.put) { /* nothing changed since GPs were last pulled */ xpc_msgqueue_deref(ch); return; } if (!(ch->flags & XPC_C_CONNECTED)){ xpc_msgqueue_deref(ch); return; } /* * First check to see if messages recently sent by us have been * received by the other side. 
* (The remote GET value will have
	 * changed since we last looked at it.)
	 */

	if (ch->w_remote_GP.get != ch->remote_GP.get) {

		/*
		 * We need to notify any senders that want to be notified
		 * that their sent messages have been received by their
		 * intended recipients. We need to do this before updating
		 * w_remote_GP.get so that we don't allocate the same message
		 * queue entries prematurely (see xpc_allocate_msg()).
		 */
		if (atomic_read(&ch->n_to_notify) > 0) {
			/*
			 * Notify senders that messages sent have been
			 * received and delivered by the other side.
			 */
			xpc_notify_senders(ch, xpcMsgDelivered,
							ch->remote_GP.get);
		}

		/*
		 * Clear msg->flags in previously sent messages, so that
		 * they're ready for xpc_allocate_msg().
		 */
		xpc_clear_local_msgqueue_flags(ch);

		ch->w_remote_GP.get = ch->remote_GP.get;

		dev_dbg(xpc_chan, "w_remote_GP.get changed to %ld, partid=%d, "
			"channel=%d\n", ch->w_remote_GP.get, ch->partid,
			ch->number);

		/*
		 * If anyone was waiting for message queue entries to become
		 * available, wake them up.
		 */
		if (atomic_read(&ch->n_on_msg_allocate_wq) > 0) {
			wake_up(&ch->msg_allocate_wq);
		}
	}

	/*
	 * Now check for newly sent messages by the other side. (The remote
	 * PUT value will have changed since we last looked at it.)
	 */

	if (ch->w_remote_GP.put != ch->remote_GP.put) {
		/*
		 * Clear msg->flags in previously received messages, so that
		 * they're ready for xpc_get_deliverable_msg().
		 */
		xpc_clear_remote_msgqueue_flags(ch);

		ch->w_remote_GP.put = ch->remote_GP.put;

		dev_dbg(xpc_chan, "w_remote_GP.put changed to %ld, partid=%d, "
			"channel=%d\n", ch->w_remote_GP.put, ch->partid,
			ch->number);

		/* number of messages waiting to be copied and delivered */
		nmsgs_sent = ch->w_remote_GP.put - ch->w_local_GP.get;
		if (nmsgs_sent > 0) {
			dev_dbg(xpc_chan, "msgs waiting to be copied and "
				"delivered=%d, partid=%d, channel=%d\n",
				nmsgs_sent, ch->partid, ch->number);

			/* kthreads are only activated after the connected
			 * callout has been made for this channel */
			if (ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) {
				xpc_activate_kthreads(ch, nmsgs_sent);
			}
		}
	}

	xpc_msgqueue_deref(ch);
}


/*
 * Drive all channel state machines for one partition: service open/close
 * IPI flags, connect or disconnect channels as required, and process any
 * message-related IPI flags.
 */
void
xpc_process_channel_activity(struct xpc_partition *part)
{
	unsigned long irq_flags;
	u64 IPI_amo, IPI_flags;
	struct xpc_channel *ch;
	int ch_number;
	u32 ch_flags;

	IPI_amo = xpc_get_IPI_flags(part);

	/*
	 * Initiate channel connections for registered channels.
	 *
	 * For each connected channel that has pending messages activate idle
	 * kthreads and/or create new kthreads as needed.
	 */

	for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
		ch = &part->channels[ch_number];

		/*
		 * Process any open or close related IPI flags, and then deal
		 * with connecting or disconnecting the channel as required.
		 */

		IPI_flags = XPC_GET_IPI_FLAGS(IPI_amo, ch_number);

		if (XPC_ANY_OPENCLOSE_IPI_FLAGS_SET(IPI_flags)) {
			xpc_process_openclose_IPI(part, ch_number, IPI_flags);
		}

		ch_flags = ch->flags;	/* need an atomic snapshot of flags */

		if (ch_flags & XPC_C_DISCONNECTING) {
			spin_lock_irqsave(&ch->lock, irq_flags);
			xpc_process_disconnect(ch, &irq_flags);
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			continue;
		}

		/* partition is going down; don't initiate new connections */
		if (part->act_state == XPC_P_DEACTIVATING) {
			continue;
		}

		if (!(ch_flags & XPC_C_CONNECTED)) {
			if (!(ch_flags & XPC_C_OPENREQUEST)) {
				DBUG_ON(ch_flags & XPC_C_SETUP);
				(void) xpc_connect_channel(ch);
			} else {
				spin_lock_irqsave(&ch->lock, irq_flags);
				xpc_process_connect(ch, &irq_flags);
				spin_unlock_irqrestore(&ch->lock, irq_flags);
			}
			continue;
		}

		/*
		 * Process any message related IPI flags, this may involve the
		 * activation of kthreads to deliver any pending messages sent
		 * from the other partition.
		 */

		if (XPC_ANY_MSG_IPI_FLAGS_SET(IPI_flags)) {
			xpc_process_msg_IPI(part, ch_number);
		}
	}
}


/*
 * XPC's heartbeat code calls this function to inform XPC that a partition is
 * going down. XPC responds by tearing down the XPartition Communication
 * infrastructure used for the just downed partition.
 *
 * XPC's heartbeat code will never call this function and xpc_partition_up()
 * at the same time. Nor will it ever make multiple calls to either function
 * at the same time.
 */
void
xpc_partition_going_down(struct xpc_partition *part, enum xpc_retval reason)
{
	unsigned long irq_flags;
	int ch_number;
	struct xpc_channel *ch;

	dev_dbg(xpc_chan, "deactivating partition %d, reason=%d\n",
		XPC_PARTID(part), reason);

	if (!xpc_part_ref(part)) {
		/* infrastructure for this partition isn't currently set up */
		return;
	}
⌨️ Keyboard shortcuts
Copy code
Ctrl + C
Search code
Ctrl + F
Full-screen mode
F11
Toggle theme
Ctrl + Shift + D
Show shortcuts
?
Increase font size
Ctrl + =
Decrease font size
Ctrl + -