📄 udpbroadcast.cc
字号:
///////////////////////////////////////////////////////////////////////////// Setup the message queues.// This is a list of queues (one queue for each client) implemented as a// dynamic array of pointers to queues.int UDPBroadcast::SetupQueues(){ this->qlist_count = 0; this->qlist_size = 10; this->qlist = (queue_t**) malloc(this->qlist_size * sizeof(queue_t*)); return 0;}///////////////////////////////////////////////////////////////////////////// Shutdown the message queues.// First delete any queues we still have, then delete the array of// pointers to queues.int UDPBroadcast::ShutdownQueues(){ int i; for (i = 0; i < this->qlist_count; i++) free(this->qlist[i]); this->qlist_count = 0; free(this->qlist); this->qlist_size = 0; return 0;}///////////////////////////////////////////////////////////////////////////// Create a new queue.// We may have to increase the size of the list.int UDPBroadcast::AddQueue(void *client){ queue_t *queue; // first, check if we've already got a queue for this client. this can // happen when a client subscribes read/write, because that causes our // Subscribe() to be called twice. for(int i=0;i<this->qlist_count;i++) { if(this->qlist[i]->client == client) return(0); } PLAYER_TRACE1("adding queue for client %p", client); if (this->qlist_count == this->qlist_size) { this->qlist_size *= 2; this->qlist = (queue_t**) realloc(this->qlist, this->qlist_size * sizeof(queue_t*)); } assert(this->qlist_count < this->qlist_size); queue = (queue_t*) malloc(sizeof(queue_t)); queue->client = client; queue->size = 10; queue->count = 0; queue->start = queue->end = 0; queue->msg = (void**) calloc(queue->size, sizeof(void*)); queue->msglen = (int*) calloc(queue->size, sizeof(int)); queue->timestamp = (struct timeval*) calloc(queue->size, sizeof(struct timeval)); this->qlist[this->qlist_count++] = queue; return 0;}///////////////////////////////////////////////////////////////////////////// Delete queue for a client.// We have to find the queue, then shift all the other entries in the list.int UDPBroadcast::DelQueue(void *client){ int index; PLAYER_TRACE1("deleting queue for client %p", client); index = FindQueue(client); if (index < 0) { // this is likely OK; it can happen when a client was subscribed // read/write and then unsubscribed, which causes our Unsubscribe() to be // called twice. //PLAYER_ERROR1("queue for client %p not found", client); return 0; } for(int i=0;i<this->qlist[index]->count;i++) free(this->qlist[index]->msg[i]); free(this->qlist[index]->msg); free(this->qlist[index]->msglen); free(this->qlist[index]->timestamp); free(this->qlist[index]); memmove(this->qlist + index, this->qlist + index + 1, (this->qlist_count - (index + 1)) * sizeof(this->qlist[0])); this->qlist_count--; return 0;}///////////////////////////////////////////////////////////////////////////// Find the queue for a particular client.int UDPBroadcast::FindQueue(void *client){ int i; for (i = 0; i < this->qlist_count; i++) { if (this->qlist[i]->client == client) return i; } return -1;}///////////////////////////////////////////////////////////////////////////// Determine the length of the queue for a particular client.int UDPBroadcast::LenQueue(void *client){ int index; queue_t *queue; index = FindQueue(client); if (index < 0) { PLAYER_ERROR1("queue for client %p not found", client); return -1; } queue = this->qlist[index]; return queue->count;}///////////////////////////////////////////////////////////////////////////// Push a message onto all of the queuesint UDPBroadcast::PushQueue(void *msg, int len){ int i; queue_t *queue; struct timeval curr; // first, get the current time in order to stamp the message GlobalTime->GetTime(&curr); for (i = 0; i < this->qlist_count; i++) { queue = this->qlist[i]; // Resize queue if we've run out of space. if(queue->count == queue->size) { if(queue->size >= this->max_queue_size) continue; queue->size *= 2; queue->msg = (void**)realloc(queue->msg, queue->size * sizeof(void*)); queue->msglen = (int*)realloc(queue->msglen, queue->size * sizeof(int)); queue->timestamp = (struct timeval*)realloc(queue->timestamp, queue->size * sizeof(struct timeval)); // now we need to move everything between the start pointers and the end // of the old buffers to the end of the new buffers memmove(queue->msg + (queue->size/2) + queue->start, queue->msg + queue->start, ((queue->size/2) - queue->start) * sizeof(void*)); memmove(queue->msglen + (queue->size/2) + queue->start, queue->msglen + queue->start, ((queue->size/2) - queue->start) * sizeof(int)); memmove(queue->timestamp + (queue->size/2) + queue->start, queue->timestamp + queue->start, ((queue->size/2) - queue->start) * sizeof(struct timeval)); // now move the start pointer queue->start += queue->size/2; } assert(queue->count < queue->size); queue->msg[queue->end] = malloc(len); memcpy(queue->msg[queue->end], msg, len); queue->msglen[queue->end] = len; queue->timestamp[queue->end] = curr; queue->end = (queue->end + 1) % queue->size; queue->count += 1; } return 0;}///////////////////////////////////////////////////////////////////////////// Pop a message from a particular client's queueint UDPBroadcast::PopQueue(void *client, void *msg, int len, struct timeval* timestamp){ int index, msglen; queue_t *queue; index = FindQueue(client); if (index < 0) { PLAYER_ERROR1("queue for client %p not found", client); return -1; } queue = this->qlist[index]; if (queue->count == 0) return 0; msglen = queue->msglen[queue->start]; assert(msglen <= len); memcpy(msg, queue->msg[queue->start], msglen); free(queue->msg[queue->start]); *timestamp = queue->timestamp[queue->start]; queue->start = (queue->start + 1) % queue->size; queue->count -= 1; return msglen;}///////////////////////////////////////////////////////////////////////////// Initialise the broadcast socketsint UDPBroadcast::SetupSockets(){ // Set up the write socket this->write_socket = socket(AF_INET, SOCK_DGRAM, 0); if (this->write_socket == -1) { PLAYER_ERROR1("error initializing socket : %s", strerror(errno)); return 1; } memset(&this->write_addr, 0, sizeof(this->write_addr)); this->write_addr.sin_family = AF_INET; this->write_addr.sin_addr.s_addr = inet_addr(this->addr); this->write_addr.sin_port = htons(this->port); // Set write socket options to allow broadcasting u_int broadcast = 1; if (setsockopt(this->write_socket, SOL_SOCKET, SO_BROADCAST, (const char*)&broadcast, sizeof(broadcast)) < 0) { PLAYER_ERROR1("error initializing socket : %s", strerror(errno)); return 1; } // Set up the read socket this->read_socket = socket(PF_INET, SOCK_DGRAM, 0); if (this->read_socket == -1) { PLAYER_ERROR1("error initializing socket : %s", strerror(errno)); return 1; } // Set socket options to allow sharing of port u_int share = 1; if (setsockopt(this->read_socket, SOL_SOCKET, SO_REUSEADDR, (const char*)&share, sizeof(share)) < 0) { PLAYER_ERROR1("error initializing socket : %s", strerror(errno)); return 1; } // Bind socket to port memset(&this->read_addr, 0, sizeof(this->read_addr)); this->read_addr.sin_family = AF_INET; this->read_addr.sin_addr.s_addr = htonl(INADDR_ANY); this->read_addr.sin_port = htons(this->port); if (bind(this->read_socket, (sockaddr*) &this->read_addr, sizeof(this->read_addr)) < 0) { PLAYER_ERROR1("error initializing socket : %s", strerror(errno)); return 1; } // Set socket to non-blocking //if (fcntl(this->read_socket, F_SETFL, O_NONBLOCK) < 0) //{ // PLAYER_ERROR1("error initializing socket : %s", strerror(errno)); // return 1; //} return 0;}///////////////////////////////////////////////////////////////////////////// Shutdown the broadcast socketsint UDPBroadcast::ShutdownSockets(){ // Close sockets close(this->write_socket); close(this->read_socket); return 0;}///////////////////////////////////////////////////////////////////////////// Send a packetvoid UDPBroadcast::SendPacket(void *packet, size_t size){ if (sendto(this->write_socket, (const char*)packet, size, 0, (sockaddr*) &this->write_addr, sizeof(this->write_addr)) < 0) { PLAYER_ERROR1("error writing to broadcast socket: %s", strerror(errno)); return; } //PLAYER_TRACE1("sent msg len = %d", (int) size);}///////////////////////////////////////////////////////////////////////////// Receive a packetint UDPBroadcast::RecvPacket(void *packet, size_t size){ socklen_t addr_len = sizeof(this->read_addr); int packet_len = recvfrom(this->read_socket, (char*)packet, size, 0, (sockaddr*) &this->read_addr, &addr_len); if (packet_len < 0) { PLAYER_ERROR1("error reading from broadcast socket: %s", strerror(errno)); return 0; } //PLAYER_TRACE1("read packet len = %d", packet_len); return packet_len;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -