📄 ioq_tcp.c
字号:
pj_pool_t *pool = NULL;
pj_ioqueue_t *ioque = NULL;
pj_ioqueue_key_t *ckey1;
pj_ssize_t status = -1;
int pending_op = 0;
pj_str_t s;
pj_status_t rc;
// Create pool.
pool = pj_pool_create(mem, NULL, POOL_SIZE, 4000, NULL);
// Create I/O Queue.
rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque);
if (!ioque) {
status=-20; goto on_error;
}
// Create client socket
rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0, &csock1);
if (rc != PJ_SUCCESS) {
app_perror("...ERROR in pj_sock_socket()", rc);
status=-1; goto on_error;
}
// Register client socket.
rc = pj_ioqueue_register_sock(pool, ioque, csock1, NULL,
&test_cb, &ckey1);
if (rc != PJ_SUCCESS) {
app_perror("...ERROR in pj_ioqueue_register_sock()", rc);
status=-23; goto on_error;
}
// Initialize remote address.
pj_sockaddr_in_init(&addr, pj_cstr(&s, "127.0.0.1"), NON_EXISTANT_PORT);
// Client socket connect()
status = pj_ioqueue_connect(ckey1, &addr, sizeof(addr));
if (status==PJ_SUCCESS) {
// unexpectedly success!
status = -30;
goto on_error;
}
if (status != PJ_EPENDING) {
// success
} else {
++pending_op;
}
callback_connect_status = -2;
callback_connect_key = NULL;
// Poll until we've got result
while (pending_op) {
pj_time_val timeout = {1, 0};
status=pj_ioqueue_poll(ioque, &timeout);
if (status > 0) {
if (callback_connect_key==ckey1) {
if (callback_connect_status == 0) {
// unexpectedly connected!
status = -50;
goto on_error;
}
}
if (status > pending_op) {
PJ_LOG(3,(THIS_FILE,
"...error: pj_ioqueue_poll() returned %d "
"(only expecting %d)",
status, pending_op));
return -552;
}
pending_op -= status;
if (pending_op == 0) {
status = 0;
}
}
}
// There's no pending operation.
// When we poll the ioqueue, there must not be events.
if (pending_op == 0) {
pj_time_val timeout = {1, 0};
status = pj_ioqueue_poll(ioque, &timeout);
if (status != 0) {
status=-60; goto on_error;
}
}
// Success
status = 0;
on_error:
if (csock1 != PJ_INVALID_SOCKET)
pj_sock_close(csock1);
if (ioque != NULL)
pj_ioqueue_destroy(ioque);
pj_pool_release(pool);
return status;
}
/*
* Repeated connect/accept on the same listener socket.
*/
static int compliance_test_2(void)
{
#if defined(PJ_SYMBIAN) && PJ_SYMBIAN!=0
enum { MAX_PAIR = 1, TEST_LOOP = 2 };
#else
enum { MAX_PAIR = 4, TEST_LOOP = 2 };
#endif
struct listener
{
pj_sock_t sock;
pj_ioqueue_key_t *key;
pj_sockaddr_in addr;
int addr_len;
} listener;
struct server
{
pj_sock_t sock;
pj_ioqueue_key_t *key;
pj_sockaddr_in local_addr;
pj_sockaddr_in rem_addr;
int rem_addr_len;
pj_ioqueue_op_key_t accept_op;
} server[MAX_PAIR];
struct client
{
pj_sock_t sock;
pj_ioqueue_key_t *key;
} client[MAX_PAIR];
pj_pool_t *pool = NULL;
char *send_buf, *recv_buf;
pj_ioqueue_t *ioque = NULL;
int i, bufsize = BUF_MIN_SIZE;
pj_ssize_t status;
int test_loop, pending_op = 0;
pj_timestamp t_elapsed;
pj_str_t s;
pj_status_t rc;
// Create pool.
pool = pj_pool_create(mem, NULL, POOL_SIZE, 4000, NULL);
// Create I/O Queue.
rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque);
if (rc != PJ_SUCCESS) {
app_perror("...ERROR in pj_ioqueue_create()", rc);
return -10;
}
// Allocate buffers for send and receive.
send_buf = (char*)pj_pool_alloc(pool, bufsize);
recv_buf = (char*)pj_pool_alloc(pool, bufsize);
// Create listener socket
rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0, &listener.sock);
if (rc != PJ_SUCCESS) {
app_perror("...error creating socket", rc);
status=-20; goto on_error;
}
// Bind listener socket.
pj_sockaddr_in_init(&listener.addr, 0, 0);
if ((rc=pj_sock_bind(listener.sock, &listener.addr, sizeof(listener.addr))) != 0 ) {
app_perror("...bind error", rc);
status=-30; goto on_error;
}
// Get listener address.
listener.addr_len = sizeof(listener.addr);
rc = pj_sock_getsockname(listener.sock, &listener.addr, &listener.addr_len);
if (rc != PJ_SUCCESS) {
app_perror("...ERROR in pj_sock_getsockname()", rc);
status=-40; goto on_error;
}
listener.addr.sin_addr = pj_inet_addr(pj_cstr(&s, "127.0.0.1"));
// Register listener socket.
rc = pj_ioqueue_register_sock(pool, ioque, listener.sock, NULL, &test_cb,
&listener.key);
if (rc != PJ_SUCCESS) {
app_perror("...ERROR", rc);
status=-50; goto on_error;
}
// Listener socket listen().
if (pj_sock_listen(listener.sock, 5)) {
app_perror("...ERROR in pj_sock_listen()", rc);
status=-60; goto on_error;
}
for (test_loop=0; test_loop < TEST_LOOP; ++test_loop) {
// Client connect and server accept.
for (i=0; i<MAX_PAIR; ++i) {
rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0, &client[i].sock);
if (rc != PJ_SUCCESS) {
app_perror("...error creating socket", rc);
status=-70; goto on_error;
}
rc = pj_ioqueue_register_sock(pool, ioque, client[i].sock, NULL,
&test_cb, &client[i].key);
if (rc != PJ_SUCCESS) {
app_perror("...error ", rc);
status=-80; goto on_error;
}
// Server socket accept()
pj_ioqueue_op_key_init(&server[i].accept_op,
sizeof(server[i].accept_op));
server[i].rem_addr_len = sizeof(pj_sockaddr_in);
status = pj_ioqueue_accept(listener.key, &server[i].accept_op,
&server[i].sock, &server[i].local_addr,
&server[i].rem_addr,
&server[i].rem_addr_len);
if (status!=PJ_SUCCESS && status != PJ_EPENDING) {
app_perror("...ERROR in pj_ioqueue_accept()", rc);
status=-90; goto on_error;
}
if (status==PJ_EPENDING) {
++pending_op;
}
// Client socket connect()
status = pj_ioqueue_connect(client[i].key, &listener.addr,
sizeof(listener.addr));
if (status!=PJ_SUCCESS && status != PJ_EPENDING) {
app_perror("...ERROR in pj_ioqueue_connect()", rc);
status=-100; goto on_error;
}
if (status==PJ_EPENDING) {
++pending_op;
}
}
// Poll until all connected
while (pending_op) {
pj_time_val timeout = {1, 0};
status=pj_ioqueue_poll(ioque, &timeout);
if (status > 0) {
if (status > pending_op) {
PJ_LOG(3,(THIS_FILE,
"...error: pj_ioqueue_poll() returned %d "
"(only expecting %d)",
status, pending_op));
return -110;
}
pending_op -= status;
if (pending_op == 0) {
status = 0;
}
}
}
// There's no pending operation.
// When we poll the ioqueue, there must not be events.
if (pending_op == 0) {
pj_time_val timeout = {1, 0};
status = pj_ioqueue_poll(ioque, &timeout);
if (status != 0) {
status=-120; goto on_error;
}
}
for (i=0; i<MAX_PAIR; ++i) {
// Check server socket.
if (server[i].sock == PJ_INVALID_SOCKET) {
status = -130;
app_perror("...accept() error", pj_get_os_error());
goto on_error;
}
// Check addresses
if (server[i].local_addr.sin_family != PJ_AF_INET ||
server[i].local_addr.sin_addr.s_addr == 0 ||
server[i].local_addr.sin_port == 0)
{
app_perror("...ERROR address not set", rc);
status = -140;
goto on_error;
}
if (server[i].rem_addr.sin_family != PJ_AF_INET ||
server[i].rem_addr.sin_addr.s_addr == 0 ||
server[i].rem_addr.sin_port == 0)
{
app_perror("...ERROR address not set", rc);
status = -150;
goto on_error;
}
// Register newly accepted socket.
rc = pj_ioqueue_register_sock(pool, ioque, server[i].sock, NULL,
&test_cb, &server[i].key);
if (rc != PJ_SUCCESS) {
app_perror("...ERROR in pj_ioqueue_register_sock", rc);
status = -160;
goto on_error;
}
// Test send and receive.
t_elapsed.u32.lo = 0;
status = send_recv_test(ioque, server[i].key, client[i].key,
send_buf, recv_buf, bufsize, &t_elapsed);
if (status != 0) {
goto on_error;
}
}
// Success
status = 0;
for (i=0; i<MAX_PAIR; ++i) {
if (server[i].key != NULL) {
pj_ioqueue_unregister(server[i].key);
server[i].key = NULL;
server[i].sock = PJ_INVALID_SOCKET;
} else if (server[i].sock != PJ_INVALID_SOCKET) {
pj_sock_close(server[i].sock);
server[i].sock = PJ_INVALID_SOCKET;
}
if (client[i].key != NULL) {
pj_ioqueue_unregister(client[i].key);
client[i].key = NULL;
client[i].sock = PJ_INVALID_SOCKET;
} else if (client[i].sock != PJ_INVALID_SOCKET) {
pj_sock_close(client[i].sock);
client[i].sock = PJ_INVALID_SOCKET;
}
}
}
status = 0;
on_error:
for (i=0; i<MAX_PAIR; ++i) {
if (server[i].key != NULL) {
pj_ioqueue_unregister(server[i].key);
server[i].key = NULL;
server[i].sock = PJ_INVALID_SOCKET;
} else if (server[i].sock != PJ_INVALID_SOCKET) {
pj_sock_close(server[i].sock);
server[i].sock = PJ_INVALID_SOCKET;
}
if (client[i].key != NULL) {
pj_ioqueue_unregister(client[i].key);
client[i].key = NULL;
server[i].sock = PJ_INVALID_SOCKET;
} else if (client[i].sock != PJ_INVALID_SOCKET) {
pj_sock_close(client[i].sock);
client[i].sock = PJ_INVALID_SOCKET;
}
}
if (listener.key) {
pj_ioqueue_unregister(listener.key);
listener.key = NULL;
} else if (listener.sock != PJ_INVALID_SOCKET) {
pj_sock_close(listener.sock);
listener.sock = PJ_INVALID_SOCKET;
}
if (ioque != NULL)
pj_ioqueue_destroy(ioque);
pj_pool_release(pool);
return status;
}
/*
 * Entry point of the TCP ioqueue tests.
 *
 * Runs compliance tests 0, 1 and 2 in that order, announcing each one in
 * the log before it starts. The sequence stops at the first test that
 * returns non-zero; that value is logged as a failure and returned.
 * Returns 0 when all three tests pass.
 */
int tcp_ioqueue_test()
{
    int result;

    PJ_LOG(3, (THIS_FILE, "..%s compliance test 0 (success scenario)",
               pj_ioqueue_name()));
    result = compliance_test_0();

    if (result == 0) {
        PJ_LOG(3, (THIS_FILE, "..%s compliance test 1 (failed scenario)",
                   pj_ioqueue_name()));
        result = compliance_test_1();
    }

    if (result == 0) {
        PJ_LOG(3, (THIS_FILE, "..%s compliance test 2 (repeated accept)",
                   pj_ioqueue_name()));
        result = compliance_test_2();
    }

    // Single reporting point for whichever test failed first.
    if (result != 0) {
        PJ_LOG(1, (THIS_FILE, "....FAILED (status=%d)\n", result));
    }

    return result;
}
#endif /* PJ_HAS_TCP */
#else
/* Placeholder definition: prevents the "translation unit is empty"
 * compiler warning when this test is disabled and the rest of the
 * file is compiled out.
 */
int dummy_uiq_tcp;
#endif /* INCLUDE_TCP_IOQUEUE_TEST */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -