📄 async.cpp
字号:
/*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Library General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
// async.cpp - Copyright (C) 2002, Simon Brenner
#include <sys/poll.h>
#include <sys/types.h>
#include <asm/types.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/ioctl.h>
#include "async.h"
#include "FdConnection.hpp"
#include <linux/errqueue.h>
#define F_SETSIG 10 /* from asm/fcntl.h */
using namespace std;
locker < vector <FdConnection *> > sockList;
locker < map <int, FdConnection *> > fdSockMap;
#ifdef USE_SIGNALS
void AsyncSignalHandler(int signal, siginfo_t *si, void *)
{
pollfd pfd;
int res;
printf("In AsyncSignalHandler()\n");
if (si->si_code == SI_SIGIO)
{
FdConnection *sc;
fdSockMap.Lock();
sc=fdSockMap[si->si_fd];
fdSockMap.Unlock();
pfd.fd=si->si_fd;
pfd.events=POLLIN|POLLOUT;
pfd.revents=0;
res=poll(&pfd, 1, 500);
if (res < 0)
perror("Poll in AsyncSignalHandler()");
if (res == 0)
return;
if (res == 1)
{
if (pfd.revents & POLLIN)
sc->OnRead();
if (pfd.revents & POLLOUT)
sc->OnWrite();
if (pfd.revents & POLLHUP)
sc->OnClose();
}
}
}
#endif // USE_SIGNALS
// Initialisation hook for the async notification layer.
// Intentionally empty: it performs no work.
void InitAsync()
{
}
void AsyncNotifications(const vector <int> &fds, timeval *timeout, bool noWrites)
{
int numFds;
int max_fd=0;
fd_set read_fds, write_fds;
int res;
numFds=fds.size();
if (numFds>0)
{
FD_ZERO(&read_fds);
FD_ZERO(&write_fds);
for (int i=0;i<numFds;i++)
{
int fd=fds[i];
if (fd>max_fd) max_fd=fd;
FD_SET(fd, &read_fds);
FD_SET(fd, &write_fds);
}
// We rather fill in write_fds for every socket and then clear it,
// than to test for noWrites for every socket in the above loop
if (noWrites)
FD_ZERO(&write_fds);
//printf("max_fd: %d\tnumFds: %d\n", max_fd, numFds);
res=select(max_fd+1, &read_fds, &write_fds, NULL, timeout);
if (res < 0)
{
if (errno == EINTR)
return;
perror("Select in AsyncNotifications()");
abort();
}
if (res == 0)
return;
if (res > 0)
{
for (int i=0;i<numFds;i++)
{
// We obviously save some work by not doing anything for sockets
// that don't have events pending
if (!FD_ISSET(fds[i], &read_fds) && !FD_ISSET(fds[i], &write_fds))
continue;
fdSockMap.Lock();
FdConnection *sc=fdSockMap[fds[i]];
fdSockMap.Unlock();
bool invalidated=false;
if (FD_ISSET(fds[i], &read_fds))
{
if (GetAvailBytes(fds[i]))
sc->OnRead();
else
{
invalidated=true;
sc->OnClose();
}
}
// Errors can surface in OnRead() that terminate the socket,
// so check if the socket is still valid before calling callback
if (!invalidated && FD_ISSET(fds[i], &write_fds))
sc->OnWrite();
}
}
}
else
{
printf("ERROR: No sockets left in AsyncNotificationLoop\n");
return;
}
}
void AsyncNotificationLoop(bool noWrites)
{
printf("AsyncNotificationLoop started\n");
vector <int> fds;
while (1)
{
sockList.Lock();
fds.resize(sockList.size());
for (int i=0;i<sockList.size();i++)
fds[i]=sockList[i]->getFd();
sockList.Unlock();
AsyncNotifications(fds, NULL, noWrites);
}
}
// Diagnostic helper: peeks one byte from the connection's descriptor and
// prints the outcome together with the errno value left by recv().
//
// sc - connection whose descriptor (sc->getFd()) is probed. MSG_PEEK is
//      used, so no data is consumed from the socket.
//
// When recv() returns fewer than one byte the message goes through perror();
// otherwise it is printed to stdout.
void CheckSocketError(FdConnection *sc)
{
    // Large enough for the fixed text plus two full-width ints; snprintf
    // below guarantees the buffer cannot be overrun regardless
    char msgBuf[64];
    char buf[1];
    const int fd = sc->getFd();

    // Clear errno first so a stale value from an earlier call is not
    // reported when recv() returns 0 (which does not set errno)
    errno = 0;
    const int ret = recv(fd, buf, 1, MSG_PEEK);
    const int en = errno;

    snprintf(msgBuf, sizeof(msgBuf), "In CheckSocketError(%d) errno(%d)", fd, en);
    if (ret < 1)
    {
        // Formatting may have touched errno; restore the value recv() left
        errno = en;
        perror(msgBuf);
    }
    else
        printf("%s\n", msgBuf);
}
// Unregisters a connection from the notification lists.
//
// fd    - descriptor whose fdSockMap entry is erased.
// pSock - connection object; every occurrence of it is removed from sockList.
//
// NOTE(review): the map entry for fd is erased even if it refers to a
// different object than pSock — confirm this is intended when several
// objects share one descriptor.
void RemoveSocket(int fd, FdConnection *pSock)
{
    //printf("fd %d removed from socket notification lists\n", fd);
    fdSockMap.Lock();
    fdSockMap.erase(fd);
    fdSockMap.Unlock();

    sockList.Lock();
    vector <FdConnection *>::iterator p = sockList.begin();
    while (p != sockList.end())
    {
        // erase() invalidates p; continue from the iterator it returns
        // instead of incrementing the stale one (which would also skip
        // the element that slid into the erased slot)
        if (*p == pSock)
            p = sockList.erase(p);
        else
            ++p;
    }
    sockList.Unlock();
}
// Registers a connection with the notification lists and switches its
// descriptor to non-blocking mode.
//
// fd    - descriptor to register; used as the key in fdSockMap.
// pSock - connection object appended to sockList and mapped to fd.
//
// If fd already has an entry in fdSockMap, map::insert leaves that entry
// untouched. fcntl() failures are reported with perror(); the socket is
// still registered in that case.
void AddSocket(int fd, FdConnection *pSock)
{
    //printf("fd %d added to socket notification lists\n", fd);

    // Make the descriptor non-blocking before publishing it, so nothing can
    // pick it up from the lists while it is still in blocking mode
    const int flags = fcntl(fd, F_GETFL);
    if (flags == -1)
        perror("fcntl(F_GETFL) in AddSocket()");
    else if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
        perror("fcntl(F_SETFL) in AddSocket()");

    sockList.Lock();
    sockList.push_back(pSock);
    sockList.Unlock();

    fdSockMap.Lock();
    fdSockMap.insert(pair <int, FdConnection *>(fd, pSock));
    fdSockMap.Unlock();
}
int SocketCount(int fd)
{
int c=0;
sockList.Lock();
for (unsigned int i=0;i<sockList.size() && c<=1;i++)
if (sockList[i]->getFd() == fd)
c++;
sockList.Unlock();
return c;
}
// Returns true when at most one connection in sockList uses fd,
// false when two or more do.
bool LastSocketObject(int fd)
{
    const int users = SocketCount(fd);
    return users <= 1;
}
// Asks the kernel (FIONREAD) how many bytes are waiting to be read on fd.
//
// Returns the byte count on success. When the ioctl fails with EINVAL the
// descriptor is treated as a server socket and (unsigned int)-1 is
// returned; any other failure yields 0.
unsigned int GetAvailBytes(int fd)
{
    unsigned int pending;
    if (ioctl(fd, FIONREAD, &pending) == -1)
    {
        // EINVAL: server socket
        return (errno == EINVAL) ? static_cast<unsigned int>(-1) : 0u;
    }
    return pending;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -