📄 net_udp.cpp
字号:
// control programs.
double Time0,Time,TimeNext;
double DeltaTime = 0.5; // Seconds
struct timeb tt;
unsigned char *pbuf;
int AllConnected,iNext;
ftime(&tt); // Establish the base time
Time0 = tt.time + (double)tt.millitm / 1000.0;
TimeNext = DeltaTime;
// This is the master for the initialization procedure
// In the first phase, send out an "are-you-there" packet
// to all processes that have not yet responded.
// When all processes have responded, send out a "start"
// packet to all processes
for(;;)
{
// Stay in this loop until all processes have been connected
// Send out "are-you-there" packets
for(int i = 0; i < NumProc; i++)
{
if(i == ThisProcess)continue; // Don't send to self
if(ProcessConnected[i] == 1)continue;
pbuf = pPacketBuffer[i]; // Pointer to buffer
iNext = PacketNext[i]; // Next free slot
// 1st byte for Intel data format (already there)
pbuf[iNext++] = 100; // 2nd byte indicates are-you-there
pbuf[iNext++] = (unsigned char)i; // 3rd byte has number of target process
PacketNext[i] = iNext;
}
if(SendData() != 0) // Send out Packets
{
cout << "<ContactProcesses> Error sending ARE-YOU-THERE data\n";
return(1);
}
// Check to see if all processes are connected
// After DeltaTime seconds, go back and send out ARE-YOU-THEREs again
do
{
if(CheckNet() != 0)
{
cout << "<ContactProcesses> Error checking data\n";
return(1);
}
AllConnected = 1;
for(int i = 0; i < NumProc; i++)
{
if(ProcessConnected[i] != 1)AllConnected = 0;
}
if(AllConnected)break;
ftime(&tt); // Establish the base time
Time = tt.time + (double)tt.millitm / 1000.0 - Time0;
}while(Time < TimeNext);
if(AllConnected)break; // Done with connect loop
TimeNext += DeltaTime;
}
// Send out START signals
for(int i = 0; i < NumProc; i++)
{
if(i == ThisProcess)continue; // Don't send to self
pbuf = pPacketBuffer[i]; // Pointer to buffer
iNext = PacketNext[i]; // Next free slot
// 1st byte for Intel data format (already there)
pbuf[iNext++] = 102; // 2nd byte indicates START
pbuf[iNext++] = (unsigned char)i; // 3rd byte has number of target process
PacketNext[i] = iNext;
}
if(SendData() != 0) // Send out Packets
{
cout << "<ContactProcesses> Error sending START data\n";
return(1);
}
NetConnected = 1; // All processes are now on line
return(0); // Normal return
}
void ErrorExit(int n)
{
cout << "Error Exit #" << n << "\n";
cout << "Hit any key to exit.\n";
while(!kbhit()) ;
exit(1);
}
void InitUDP(int NumberProc,int ThisProc,char *Addr[])
// Initialize a socket
{
WSADATA wsaData; // Winsock implementation details
SOCKADDR_IN sockAddr; // Socket address structure
// union AddressConvert AddrCon;
int nBind; // Result value from bind()
ThisProcess = ThisProc; // Record process # for this process
NumProc = NumberProc;
if(NumProc > MAX_PROC)
{
cout << "<InitUDP> Too many processes.\n";
ErrorExit(2);
}
if (WSAStartup(WINSOCK_VERSION, &wsaData))
{
cout << "<InitUDP> Could not load WinSock DLL.\n";
ErrorExit(1);
}
nSocket = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (nSocket == INVALID_SOCKET)
{
cout << "<InitUDP> Invalid socket!!\n";
NetCleanUp();
ErrorExit(2);
}
// Define the socket address
sockAddr.sin_family = AF_INET; // Internet address family
sockAddr.sin_port = u_short(BASE_PORT + ThisProc);
// Define the port for this process
sockAddr.sin_addr.s_addr = INADDR_ANY;
// Bind the socket
nBind = bind(nSocket,(LPSOCKADDR) &sockAddr,sizeof(sockAddr));
if(nBind == SOCKET_ERROR)
{
cout << "Can't Bind, nBind= " << nBind << "\n";
NetCleanUp();
ErrorExit(3);
}
// Set socket to non-blocking
unsigned long NoBlock = TRUE;
int RetVal = ioctlsocket(nSocket, FIONBIO, &NoBlock);
if(RetVal == SOCKET_ERROR)
{
cout << "<InitUDP> Error in setting socket to non-blocking\n";
NetCleanUp();
ErrorExit(4);
}
// Allocate memory for packet buffers and convert addresses
for(int i = 0; i < NumProc; i++)
{
if(ResetPacket(i) != 0)
{
NetCleanUp();
ErrorExit(4);
}
ProcessAddress[i] = inet_addr(Addr[i]); // Convert address
ProcessConnected[i] = 0; // No processes connected yet
}
ProcessConnected[ThisProc] = 1; // Connection flag for this process
NetConnected = 0; // Flag is TRUE when all processes are on line
if(ThisProcess == 0)
{
// Process #0 masters the connect procedure
if(ContactProcesses() != 0)
{
cout << "<InitUDP> Couldn't contact other processes\n";
NetCleanUp();
ErrorExit(4);
}
}
else
{
// Wait for START signal (also responds to ARE-YOU-THERE signals)
do
{
if(CheckNet() != 0)
{
cout << "<InitUDP> Error in waiting for START\n";
ErrorExit(8);
}
}while(NetConnected != 1);
}
cout << "All processes are on line.\n";
}
int CheckNet(void)
// Process any data that has come in from the network
// Returns error: 0 for OK, 1 for error
{
SOCKADDR_IN RecvSockAddr; // Socket address structure
unsigned char MessageBuffer[PACKET_BUFFER_SIZE]; // Buffer to hold
// incoming information
int nCharRecv; // Number of characters received
int RetVal;
unsigned long BytesToRead;
while(1) // Infinite loop -- return when out of network data
{
RetVal = ioctlsocket(nSocket, FIONREAD, &BytesToRead);
if(RetVal == SOCKET_ERROR)
{
cout << "<CheckNet>Error waiting for message\n";
return(1);
}
if(BytesToRead <= 0)return(0); // No more data available
// Get a packet
int lenRecvSockAddr = sizeof(RecvSockAddr);
nCharRecv = recvfrom(nSocket,(char *)MessageBuffer,PACKET_BUFFER_SIZE,
RECV_FLAGS,(LPSOCKADDR)&RecvSockAddr,&lenRecvSockAddr);
//cout << "nCharRecv " << nCharRecv << "\n";
//for(int j = 0; j < 10; j++)cout << (int)MessageBuffer[j] << " ";
//cout << "\n";
// Parse the message into component values
if(ParseMessage(MessageBuffer,nCharRecv) < 0)break;
}
cout << "<CheckNet>Unexpected break from process loop\n";
return(1);
}
int SendData(void)
{
// Send out any data currently buffered in packets
// Returns error: 0 for OK, 1 for error
SOCKADDR_IN SendSockAddr; // Socket address structure
unsigned char MessageBuffer[PACKET_BUFFER_SIZE]; // Buffer to hold
// outgoing information
int nbuf; // Number of characters in buffer
int nCharSent; // Number of characters transmitted
for(int i = 0; i < NumProc; i++)
{
// Cycle through all processes
if((i == ThisProcess) || (PacketNext[i] <= 0))continue;
// Don't send data to "self"; skip empty packets
SendSockAddr.sin_family = AF_INET; // Internet address family
SendSockAddr.sin_port = u_short(BASE_PORT + i);
SendSockAddr.sin_addr.s_addr = ProcessAddress[i];
// Copy packet data to local buffer --
// This should be done as critical region
nbuf = PacketNext[i]; // size of this packet
unsigned char *pc = pPacketBuffer[i]; // Pointer to packet
for(int j = 0; j < nbuf; j++)MessageBuffer[j] = pc[j];
ResetPacket(i); // This packet is now available for refilling
// Critical region can end here
nCharSent = sendto(nSocket,(char *)MessageBuffer,nbuf,
SEND_FLAGS,(LPSOCKADDR)&SendSockAddr,sizeof(SendSockAddr));
//cout << "nCharSent " << nCharSent << "\n";
//for(int j = 0; j < 10; j++)cout << (int)MessageBuffer[j] << " ";
//cout << "\n";
if(nCharSent != nbuf)
{
cout << "<SendData> Error sending packet\n";
return(1);
}
}
return(0); // Normal return
}
int InsertMessage(int ToProc,int ToTask,int BoxNo,int FromProc,
int FromTask,int MsgFlag,double MsgValue,
int RetRecpt,int DelMode)
{
// Returns error: 1 for error, 0 for OK
// Add a message to the appropriate process packet
if(ToProc == ThisProcess)
{
cout << "<InsertMessage> Messages to this process\n"
<< "Should have been handled locally not in network\n";
return(1);
}
int next = PacketNext[ToProc];
if((next + 25) > PACKET_BUFFER_SIZE)
{
cout << "<InsertMessage> Packet overflow, process # "
<< ToProc << "\n";
return(1);
}
unsigned char *pc = pPacketBuffer[ToProc]; // Pointer to buffer
pc[next++] = 1; // Code for message block
next = InsertShortValue(pc,next,ToProc);
next = InsertShortValue(pc,next,ToTask);
next = InsertShortValue(pc,next,BoxNo);
next = InsertShortValue(pc,next,FromProc);
next = InsertShortValue(pc,next,FromTask);
next = InsertShortValue(pc,next,MsgFlag);
next = InsertDoubleValue(pc,next,MsgValue);
next = InsertShortValue(pc,next,RetRecpt);
next = InsertShortValue(pc,next,DelMode);
PacketNext[ToProc] = next; // Update packet index
return(0);
}
int InsertSharedArray(int ToProc,int ToTask,int nValues,double *Val)
{
// Returns error: 1 for error, 0 for OK
if(ToProc == ThisProcess)
{
cout << "<InsertSharedArray> Data for this process\n"
<< "Should have been handled locally not in network\n";
return(1);
}
int next = PacketNext[ToProc];
if((next + 7 + nValues * sizeof(double)) > PACKET_BUFFER_SIZE)
{
cout << "<InsertSharedArray> Packet overflow, process # "
<< ToProc << "\n";
return(1);
}
unsigned char *pc = pPacketBuffer[ToProc]; // Pointer to buffer
pc[next++] = 2; // Code for shared data array
next = InsertShortValue(pc,next,ToProc);
next = InsertShortValue(pc,next,ToTask);
next = InsertDoubleArray(pc,next,Val,nValues);
PacketNext[ToProc] = next; // Update packet index
return(0);
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -