📄 xpc_channel.c
字号:
"channel=%d\n", put, ch->partid, ch->number); send_IPI = 1; /* * We need to ensure that the message referenced by * local_GP->put is not XPC_M_READY or that local_GP->put * equals w_local_GP.put, so we'll go have a look. */ initial_put = put; } if (send_IPI) { xpc_IPI_send_msgrequest(ch); }}/* * Common code that does the actual sending of the message by advancing the * local message queue's Put value and sends an IPI to the partition the * message is being sent to. */static enum xpc_retvalxpc_send_msg(struct xpc_channel *ch, struct xpc_msg *msg, u8 notify_type, xpc_notify_func func, void *key){ enum xpc_retval ret = xpcSuccess; struct xpc_notify *notify = notify; s64 put, msg_number = msg->number; DBUG_ON(notify_type == XPC_N_CALL && func == NULL); DBUG_ON((((u64) msg - (u64) ch->local_msgqueue) / ch->msg_size) != msg_number % ch->local_nentries); DBUG_ON(msg->flags & XPC_M_READY); if (ch->flags & XPC_C_DISCONNECTING) { /* drop the reference grabbed in xpc_allocate_msg() */ xpc_msgqueue_deref(ch); return ch->reason; } if (notify_type != 0) { /* * Tell the remote side to send an ACK interrupt when the * message has been delivered. */ msg->flags |= XPC_M_INTERRUPT; atomic_inc(&ch->n_to_notify); notify = &ch->notify_queue[msg_number % ch->local_nentries]; notify->func = func; notify->key = key; notify->type = notify_type; // >>> is a mb() needed here? if (ch->flags & XPC_C_DISCONNECTING) { /* * An error occurred between our last error check and * this one. We will try to clear the type field from * the notify entry. If we succeed then * xpc_disconnect_channel() didn't already process * the notify entry. */ if (cmpxchg(¬ify->type, notify_type, 0) == notify_type) { atomic_dec(&ch->n_to_notify); ret = ch->reason; } /* drop the reference grabbed in xpc_allocate_msg() */ xpc_msgqueue_deref(ch); return ret; } } msg->flags |= XPC_M_READY; /* * The preceding store of msg->flags must occur before the following * load of ch->local_GP->put. */ mb(); /* see if the message is next in line to be sent, if so send it */ put = ch->local_GP->put; if (put == msg_number) { xpc_send_msgs(ch, put); } /* drop the reference grabbed in xpc_allocate_msg() */ xpc_msgqueue_deref(ch); return ret;}/* * Send a message previously allocated using xpc_initiate_allocate() on the * specified channel connected to the specified partition. * * This routine will not wait for the message to be received, nor will * notification be given when it does happen. Once this routine has returned * the message entry allocated via xpc_initiate_allocate() is no longer * accessable to the caller. * * This routine, although called by users, does not call xpc_part_ref() to * ensure that the partition infrastructure is in place. It relies on the * fact that we called xpc_msgqueue_ref() in xpc_allocate_msg(). * * Arguments: * * partid - ID of partition to which the channel is connected. * ch_number - channel # to send message on. * payload - pointer to the payload area allocated via * xpc_initiate_allocate(). */enum xpc_retvalxpc_initiate_send(partid_t partid, int ch_number, void *payload){ struct xpc_partition *part = &xpc_partitions[partid]; struct xpc_msg *msg = XPC_MSG_ADDRESS(payload); enum xpc_retval ret; dev_dbg(xpc_chan, "msg=0x%p, partid=%d, channel=%d\n", (void *) msg, partid, ch_number); DBUG_ON(partid <= 0 || partid >= XP_MAX_PARTITIONS); DBUG_ON(ch_number < 0 || ch_number >= part->nchannels); DBUG_ON(msg == NULL); ret = xpc_send_msg(&part->channels[ch_number], msg, 0, NULL, NULL); return ret;}/* * Send a message previously allocated using xpc_initiate_allocate on the * specified channel connected to the specified partition. * * This routine will not wait for the message to be sent. Once this routine * has returned the message entry allocated via xpc_initiate_allocate() is no * longer accessable to the caller. * * Once the remote end of the channel has received the message, the function * passed as an argument to xpc_initiate_send_notify() will be called. This * allows the sender to free up or re-use any buffers referenced by the * message, but does NOT mean the message has been processed at the remote * end by a receiver. * * If this routine returns an error, the caller's function will NOT be called. * * This routine, although called by users, does not call xpc_part_ref() to * ensure that the partition infrastructure is in place. It relies on the * fact that we called xpc_msgqueue_ref() in xpc_allocate_msg(). * * Arguments: * * partid - ID of partition to which the channel is connected. * ch_number - channel # to send message on. * payload - pointer to the payload area allocated via * xpc_initiate_allocate(). * func - function to call with asynchronous notification of message * receipt. THIS FUNCTION MUST BE NON-BLOCKING. * key - user-defined key to be passed to the function when it's called. */enum xpc_retvalxpc_initiate_send_notify(partid_t partid, int ch_number, void *payload, xpc_notify_func func, void *key){ struct xpc_partition *part = &xpc_partitions[partid]; struct xpc_msg *msg = XPC_MSG_ADDRESS(payload); enum xpc_retval ret; dev_dbg(xpc_chan, "msg=0x%p, partid=%d, channel=%d\n", (void *) msg, partid, ch_number); DBUG_ON(partid <= 0 || partid >= XP_MAX_PARTITIONS); DBUG_ON(ch_number < 0 || ch_number >= part->nchannels); DBUG_ON(msg == NULL); DBUG_ON(func == NULL); ret = xpc_send_msg(&part->channels[ch_number], msg, XPC_N_CALL, func, key); return ret;}static struct xpc_msg *xpc_pull_remote_msg(struct xpc_channel *ch, s64 get){ struct xpc_partition *part = &xpc_partitions[ch->partid]; struct xpc_msg *remote_msg, *msg; u32 msg_index, nmsgs; u64 msg_offset; enum xpc_retval ret; if (mutex_lock_interruptible(&ch->msg_to_pull_mutex) != 0) { /* we were interrupted by a signal */ return NULL; } while (get >= ch->next_msg_to_pull) { /* pull as many messages as are ready and able to be pulled */ msg_index = ch->next_msg_to_pull % ch->remote_nentries; DBUG_ON(ch->next_msg_to_pull >= (volatile s64) ch->w_remote_GP.put); nmsgs = (volatile s64) ch->w_remote_GP.put - ch->next_msg_to_pull; if (msg_index + nmsgs > ch->remote_nentries) { /* ignore the ones that wrap the msg queue for now */ nmsgs = ch->remote_nentries - msg_index; } msg_offset = msg_index * ch->msg_size; msg = (struct xpc_msg *) ((u64) ch->remote_msgqueue + msg_offset); remote_msg = (struct xpc_msg *) (ch->remote_msgqueue_pa + msg_offset); if ((ret = xpc_pull_remote_cachelines(part, msg, remote_msg, nmsgs * ch->msg_size)) != xpcSuccess) { dev_dbg(xpc_chan, "failed to pull %d msgs starting with" " msg %ld from partition %d, channel=%d, " "ret=%d\n", nmsgs, ch->next_msg_to_pull, ch->partid, ch->number, ret); XPC_DEACTIVATE_PARTITION(part, ret); mutex_unlock(&ch->msg_to_pull_mutex); return NULL; } mb(); /* >>> this may not be needed, we're not sure */ ch->next_msg_to_pull += nmsgs; } mutex_unlock(&ch->msg_to_pull_mutex); /* return the message we were looking for */ msg_offset = (get % ch->remote_nentries) * ch->msg_size; msg = (struct xpc_msg *) ((u64) ch->remote_msgqueue + msg_offset); return msg;}/* * Get a message to be delivered. */static struct xpc_msg *xpc_get_deliverable_msg(struct xpc_channel *ch){ struct xpc_msg *msg = NULL; s64 get; do { if ((volatile u32) ch->flags & XPC_C_DISCONNECTING) { break; } get = (volatile s64) ch->w_local_GP.get; if (get == (volatile s64) ch->w_remote_GP.put) { break; } /* There are messages waiting to be pulled and delivered. * We need to try to secure one for ourselves. We'll do this * by trying to increment w_local_GP.get and hope that no one * else beats us to it. If they do, we'll we'll simply have * to try again for the next one. */ if (cmpxchg(&ch->w_local_GP.get, get, get + 1) == get) { /* we got the entry referenced by get */ dev_dbg(xpc_chan, "w_local_GP.get changed to %ld, " "partid=%d, channel=%d\n", get + 1, ch->partid, ch->number); /* pull the message from the remote partition */ msg = xpc_pull_remote_msg(ch, get); DBUG_ON(msg != NULL && msg->number != get); DBUG_ON(msg != NULL && (msg->flags & XPC_M_DONE)); DBUG_ON(msg != NULL && !(msg->flags & XPC_M_READY)); break; } } while (1); return msg;}/* * Deliver a message to its intended recipient. */voidxpc_deliver_msg(struct xpc_channel *ch){ struct xpc_msg *msg; if ((msg = xpc_get_deliverable_msg(ch)) != NULL) { /* * This ref is taken to protect the payload itself from being * freed before the user is finished with it, which the user * indicates by calling xpc_initiate_received(). */ xpc_msgqueue_ref(ch); atomic_inc(&ch->kthreads_active); if (ch->func != NULL) { dev_dbg(xpc_chan, "ch->func() called, msg=0x%p, " "msg_number=%ld, partid=%d, channel=%d\n", (void *) msg, msg->number, ch->partid, ch->number); /* deliver the message to its intended recipient */ ch->func(xpcMsgReceived, ch->partid, ch->number, &msg->payload, ch->key); dev_dbg(xpc_chan, "ch->func() returned, msg=0x%p, " "msg_number=%ld, partid=%d, channel=%d\n", (void *) msg, msg->number, ch->partid, ch->number); } atomic_dec(&ch->kthreads_active); }}/* * Now we actually acknowledge the messages that have been delivered and ack'd * by advancing the cached remote message queue's Get value and if requested * send an IPI to the message sender's partition. */static voidxpc_acknowledge_msgs(struct xpc_channel *ch, s64 initial_get, u8 msg_flags){ struct xpc_msg *msg; s64 get = initial_get + 1; int send_IPI = 0; while (1) { while (1) { if (get == (volatile s64) ch->w_local_GP.get) { break; } msg = (struct xpc_msg *) ((u64) ch->remote_msgqueue + (get % ch->remote_nentries) * ch->msg_size); if (!(msg->flags & XPC_M_DONE)) { break; } msg_flags |= msg->flags; get++; } if (get == initial_get) { /* nothing's changed */ break; } if (cmpxchg_rel(&ch->local_GP->get, initial_get, get) != initial_get) { /* someone else beat us to it */ DBUG_ON((volatile s64) ch->local_GP->get <= initial_get); break; } /* we just set the new value of local_GP->get */ dev_dbg(xpc_chan, "local_GP->get changed to %ld, partid=%d, " "channel=%d\n", get, ch->partid, ch->number); send_IPI = (msg_flags & XPC_M_INTERRUPT); /* * We need to ensure that the message referenced by * local_GP->get is not XPC_M_DONE or that local_GP->get * equals w_local_GP.get, so we'll go have a look. */ initial_get = get; } if (send_IPI) { xpc_IPI_send_msgrequest(ch); }}/* * Acknowledge receipt of a delivered message. * * If a message has XPC_M_INTERRUPT set, send an interrupt to the partition * that sent the message. * * This function, although called by users, does not call xpc_part_ref() to * ensure that the partition infrastructure is in place. It relies on the * fact that we called xpc_msgqueue_ref() in xpc_deliver_msg(). * * Arguments: * * partid - ID of partition to which the channel is connected. * ch_number - channel # message received on. * payload - pointer to the payload area allocated via * xpc_initiate_allocate(). */voidxpc_initiate_received(partid_t partid, int ch_number, void *payload){ struct xpc_partition *part = &xpc_partitions[partid]; struct xpc_channel *ch; struct xpc_msg *msg = XPC_MSG_ADDRESS(payload); s64 get, msg_number = msg->number; DBUG_ON(partid <= 0 || partid >= XP_MAX_PARTITIONS); DBUG_ON(ch_number < 0 || ch_number >= part->nchannels); ch = &part->channels[ch_number]; dev_dbg(xpc_chan, "msg=0x%p, msg_number=%ld, partid=%d, channel=%d\n", (void *) msg, msg_number, ch->partid, ch->number); DBUG_ON((((u64) msg - (u64) ch->remote_msgqueue) / ch->msg_size) != msg_number % ch->remote_nentries); DBUG_ON(msg->flags & XPC_M_DONE); msg->flags |= XPC_M_DONE; /* * The preceding store of msg->flags must occur before the following * load of ch->local_GP->get. */ mb(); /* * See if this message is next in line to be acknowledged as having * been delivered. */ get = ch->local_GP->get; if (get == msg_number) { xpc_acknowledge_msgs(ch, get, msg->flags); } /* the call to xpc_msgqueue_ref() was done by xpc_deliver_msg() */ xpc_msgqueue_deref(ch);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -