📄 udpbroadcast.cc
字号:
/* * Player - One Hell of a Robot Server * Copyright (C) 2000 Brian Gerkey & Kasper Stoy * gerkey@usc.edu kaspers@robotics.usc.edu * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * */ /////////////////////////////////////////////////////////////////////////////// Desc: Device for inter-player communication using broadcast sockets.// Author: Andrew Howard// Date: 5 Feb 2000// CVS: $Id: udpbroadcast.cc,v 1.1.2.1 2002/12/04 03:57:34 gerkey Exp $//// Theory of operation:// This device uses IPv4 broadcasting (not multicasting). Be careful// not to run this on the USC university nets: you will get disconnected// and spanked!/////////////////////////////////////////////////////////////////////////////#include <stdio.h>#include <stdlib.h>#include <string.h>#include <unistd.h>#include <sys/socket.h>#include <netinet/in.h>#include <arpa/inet.h>#include <netdb.h>#include <fcntl.h>#include <errno.h>#include <stddef.h>#ifdef HAVE_CONFIG_H#include <config.h>#endif#define PLAYER_ENABLE_TRACE 0#include <playertime.h>extern PlayerTime* GlobalTime;#include "device.h"#include "drivertable.h"#include "player.h"#define DEFAULT_BROADCAST_IP "10.255.255.255"#define DEFAULT_BROADCAST_PORT 6013class UDPBroadcast : public CDevice{ // Constructor public: UDPBroadcast(char* interface, ConfigFile* cf, int section); // Override the subscribe/unsubscribe calls, since we need to // maintain our own subscription list. public: virtual int Subscribe(void *client); public: virtual int Unsubscribe(void *client); // Setup/shutdown routines public: virtual int Setup(); public: virtual int Shutdown(); // Process commands from a particular client (called in server thread). public: virtual void PutCommand(void* client, unsigned char* src, size_t len); // Get the number of data packets to be returned to a particular client. public: virtual size_t GetNumData(void* client); // Get data for a particular client (called in server thread). public: virtual size_t GetData(void* client, unsigned char* dest, size_t len, uint32_t* timestamp_sec, uint32_t* timestamp_usec); // Main function for device thread private: virtual void Main(); // Setup the message queues private: int SetupQueues(); // Shutdown the message queues private: int ShutdownQueues(); // Create a new queue private: int AddQueue(void *client); // Delete queue for a client private: int DelQueue(void *client); // Find the queue for a particular client. private: int FindQueue(void *client); // Determine the length of the queue for a particular client. private: int LenQueue(void *client); // Push a message onto all of the queues private: int PushQueue(void *msg, int len); // Pop a message from a particular client's queue private: int PopQueue(void *client, void *msg, int len, struct timeval* timestamp); // Initialise the broadcast sockets private: int SetupSockets(); // Shutdown the broadcast sockets private: int ShutdownSockets(); // Send a packet to the broadcast socket. private: void SendPacket(void *packet, size_t size); // Receive a packet from the broadcast socket. This will block. private: int RecvPacket(void *packet, size_t size); // Queue used for each client. private: struct queue_t { void *client; int size, count, start, end; int *msglen; void **msg; struct timeval* timestamp; }; // Max messages to have in any given queue private: int max_queue_size; // Message queue list (one for each client). private: int qlist_size, qlist_count; private: queue_t **qlist; // Address and port to broadcast on private: char addr[MAX_FILENAME_SIZE]; private: int port; // Write socket info private: int write_socket; private: sockaddr_in write_addr; // Read socket info private: int read_socket; private: sockaddr_in read_addr;};///////////////////////////////////////////////////////////////////////////// Instantiate an instance of this driverCDevice* UDPBroadcast_Init(char* interface, ConfigFile* cf, int section){ if(strcmp(interface, PLAYER_COMMS_STRING)) { PLAYER_ERROR1("driver \"udpbroadcast\" does not support interface \"%s\"\n", interface); return(NULL); } else return((CDevice*)(new UDPBroadcast(interface, cf, section)));}///////////////////////////////////////////////////////////////////////////// Register drivervoid UDPBroadcast_Register(DriverTable* table){ table->AddDriver("udpbroadcast", PLAYER_ALL_MODE, UDPBroadcast_Init);}///////////////////////////////////////////////////////////////////////////// ConstructorUDPBroadcast::UDPBroadcast(char* interface, ConfigFile* cf, int section) : CDevice(0,0,0,0){ this->max_queue_size = 160; this->read_socket = 0; this->write_socket = 0; strncpy(this->addr, cf->ReadString(section, "addr", DEFAULT_BROADCAST_IP), sizeof(this->addr)); this->port = cf->ReadInt(section, "port", DEFAULT_BROADCAST_PORT); PLAYER_TRACE2("broadcasting on %s:%d", this->addr, this->port); return;}///////////////////////////////////////////////////////////////////////////// Subscribe new clients to this device. This will create a new message// queue for each client.int UDPBroadcast::Subscribe(void *client){ int result; // Do default subscription. result = CDevice::Subscribe(client); if (result != 0) return result; // Create a new queue Lock(); AddQueue(client); Unlock(); return 0;}///////////////////////////////////////////////////////////////////////////// Unsubscribe clients from this device. This will destroy the corresponding// message queue.int UDPBroadcast::Unsubscribe(void *client){ // Delete queue for this client Lock(); DelQueue(client); Unlock(); // Do default unsubscribe return CDevice::Unsubscribe(client);}///////////////////////////////////////////////////////////////////////////// Start deviceint UDPBroadcast::Setup(){ PLAYER_TRACE0("initializing"); // Setup the sockets if (SetupSockets() != 0) return 1; // Setup the message queues if (SetupQueues() != 0) return 1; // Start device thread StartThread(); PLAYER_TRACE0("initializing ... done"); return 0;}///////////////////////////////////////////////////////////////////////////// Shutdown deviceint UDPBroadcast::Shutdown(){ PLAYER_TRACE0("shuting down"); // Shutdown device thread StopThread(); // Shutdown the message queues ShutdownQueues(); // Shutdown the sockets ShutdownSockets(); PLAYER_TRACE0("shutting down ... done"); return 0;}///////////////////////////////////////////////////////////////////////////// Process commands from a particular client (called in server thread).void UDPBroadcast::PutCommand(void* client, unsigned char* src, size_t len){ PLAYER_TRACE3("client %p sent %d bytes [%s]", client, len, src); SendPacket(src, len); return;}///////////////////////////////////////////////////////////////////////////// Get the number of data packets to be returned to a particular client.size_t UDPBroadcast::GetNumData(void* client){ PLAYER_TRACE2("client %p has %d pending messages", client, LenQueue(client)); return LenQueue(client);}///////////////////////////////////////////////////////////////////////////// Get data for a particular client (called in server thread).size_t UDPBroadcast::GetData(void* client, unsigned char* dest, size_t len, uint32_t* timestamp_sec, uint32_t* timestamp_usec){ size_t dlen; struct timeval ts; // Pop the next waiting packet from the queue and send it back // to the client. Lock(); dlen = PopQueue(client, dest, len, &ts); if(dlen) { *timestamp_sec = ts.tv_sec; *timestamp_usec = ts.tv_usec; } Unlock(); PLAYER_TRACE3("client %p got %d bytes [%s]", client, dlen, dest); return dlen;}///////////////////////////////////////////////////////////////////////////// Main function for device threadvoid UDPBroadcast::Main() { int len; uint8_t msg[PLAYER_MAX_MESSAGE_SIZE]; PLAYER_TRACE0("thread running"); while (true) { // Get incoming messages; this is a blocking call. len = RecvPacket(msg, sizeof(msg)); // Test for thread termination; this will make the function exit // immediately. pthread_testcancel(); // Push incoming messages on the queue. Lock(); PushQueue(&msg, len); Unlock(); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -