⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ioq_udp.c

📁 基于sip协议的网络电话源码
💻 C
📖 第 1 页 / 共 2 页
字号:
	return -180;    }    /* Just to make sure things are settled.. */    pj_thread_sleep(100);    /* Start reading again. */    bytes = sizeof(recvbuf);    status = pj_ioqueue_recv( key, &opkey, recvbuf, &bytes, 0);    if (status != PJ_EPENDING) {	app_perror("Expecting PJ_EPENDING, but got this", status);	return -190;    }    /* Reset packet counter */    packet_cnt = 0;    /* Send one packet. */    bytes = sizeof(sendbuf);    status = pj_sock_sendto(ssock, sendbuf, &bytes, 0,			    &addr, sizeof(addr));    if (status != PJ_SUCCESS) {	app_perror("sendto error", status);	return -200;    }    /* Now unregister and close socket. */    pj_ioqueue_unregister(key);    /* Poll ioqueue. */    timeout.sec = 1; timeout.msec = 0;    pj_ioqueue_poll(ioqueue, &timeout);    /* Must NOT receive any packets after socket is closed! */    if (packet_cnt > 0) {	PJ_LOG(3,(THIS_FILE, "....errror: not expecting to receive packet "			     "after socket has been closed"));	return -210;    }    /* Success */    pj_sock_close(ssock);    pj_ioqueue_destroy(ioqueue);    pj_pool_release(pool);    return 0;}/* * Testing with many handles. * This will just test registering PJ_IOQUEUE_MAX_HANDLES count * of sockets to the ioqueue. */static int many_handles_test(void){    enum { MAX = PJ_IOQUEUE_MAX_HANDLES };    pj_pool_t *pool;    pj_ioqueue_t *ioqueue;    pj_sock_t *sock;    pj_ioqueue_key_t **key;    pj_status_t rc;    int count, i; /* must be signed */    PJ_LOG(3,(THIS_FILE,"...testing with so many handles"));    pool = pj_pool_create(mem, NULL, 4000, 4000, NULL);    if (!pool)	return PJ_ENOMEM;    key = pj_pool_alloc(pool, MAX*sizeof(pj_ioqueue_key_t*));    sock = pj_pool_alloc(pool, MAX*sizeof(pj_sock_t));        /* Create IOQueue */    rc = pj_ioqueue_create(pool, MAX, &ioqueue);    if (rc != PJ_SUCCESS || ioqueue == NULL) {	app_perror("...error in pj_ioqueue_create", rc);	return -10;    }    /* Register as many sockets. */    for (count=0; count<MAX; ++count) {	sock[count] = PJ_INVALID_SOCKET;	rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &sock[count]);	if (rc != PJ_SUCCESS || sock[count] == PJ_INVALID_SOCKET) {	    PJ_LOG(3,(THIS_FILE, "....unable to create %d-th socket, rc=%d", 				 count, rc));	    break;	}	key[count] = NULL;	rc = pj_ioqueue_register_sock(pool, ioqueue, sock[count],				      NULL, &test_cb, &key[count]);	if (rc != PJ_SUCCESS || key[count] == NULL) {	    PJ_LOG(3,(THIS_FILE, "....unable to register %d-th socket, rc=%d", 				 count, rc));	    return -30;	}    }    /* Test complete. */    /* Now deregister and close all handles. */     /* NOTE for RTEMS:     *  It seems that the order of close(sock) is pretty important here.     *  If we close the sockets with the same order as when they were created,     *  RTEMS doesn't seem to reuse the sockets, thus next socket created     *  will have descriptor higher than the last socket created.     *  If we close the sockets in the reverse order, then the descriptor will     *  get reused.     *  This used to cause problem with select ioqueue, since the ioqueue     *  always gives FD_SETSIZE for the first select() argument. This ioqueue     *  behavior can be changed with setting PJ_SELECT_NEEDS_NFDS macro.     */    for (i=count-1; i>=0; --i) {    ///for (i=0; i<count; ++i) {	rc = pj_ioqueue_unregister(key[i]);	if (rc != PJ_SUCCESS) {	    app_perror("...error in pj_ioqueue_unregister", rc);	}    }    rc = pj_ioqueue_destroy(ioqueue);    if (rc != PJ_SUCCESS) {	app_perror("...error in pj_ioqueue_destroy", rc);    }        pj_pool_release(pool);    PJ_LOG(3,(THIS_FILE,"....many_handles_test() ok"));    return 0;}/* * Multi-operation test. *//* * Benchmarking IOQueue */static int bench_test(int bufsize, int inactive_sock_count){    pj_sock_t ssock=-1, csock=-1;    pj_sockaddr_in addr;    pj_pool_t *pool = NULL;    pj_sock_t *inactive_sock=NULL;    pj_ioqueue_op_key_t *inactive_read_op;    char *send_buf, *recv_buf;    pj_ioqueue_t *ioque = NULL;    pj_ioqueue_key_t *skey, *ckey, *key;    pj_timestamp t1, t2, t_elapsed;    int rc=0, i;    /* i must be signed */    pj_str_t temp;    char errbuf[PJ_ERR_MSG_SIZE];    TRACE__((THIS_FILE, "   bench test %d", inactive_sock_count));    // Create pool.    pool = pj_pool_create(mem, NULL, POOL_SIZE, 4000, NULL);    // Allocate buffers for send and receive.    send_buf = (char*)pj_pool_alloc(pool, bufsize);    recv_buf = (char*)pj_pool_alloc(pool, bufsize);    // Allocate sockets for sending and receiving.    rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &ssock);    if (rc == PJ_SUCCESS) {        rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &csock);    } else        csock = PJ_INVALID_SOCKET;    if (rc != PJ_SUCCESS) {	app_perror("...error: pj_sock_socket()", rc);	goto on_error;    }    // Bind server socket.    pj_bzero(&addr, sizeof(addr));    addr.sin_family = PJ_AF_INET;    addr.sin_port = pj_htons(PORT);    if (pj_sock_bind(ssock, &addr, sizeof(addr)))	goto on_error;    pj_assert(inactive_sock_count+2 <= PJ_IOQUEUE_MAX_HANDLES);    // Create I/O Queue.    rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque);    if (rc != PJ_SUCCESS) {	app_perror("...error: pj_ioqueue_create()", rc);	goto on_error;    }    // Allocate inactive sockets, and bind them to some arbitrary address.    // Then register them to the I/O queue, and start a read operation.    inactive_sock = (pj_sock_t*)pj_pool_alloc(pool, 				    inactive_sock_count*sizeof(pj_sock_t));    inactive_read_op = (pj_ioqueue_op_key_t*)pj_pool_alloc(pool,                              inactive_sock_count*sizeof(pj_ioqueue_op_key_t));    pj_bzero(&addr, sizeof(addr));    addr.sin_family = PJ_AF_INET;    for (i=0; i<inactive_sock_count; ++i) {        pj_ssize_t bytes;	rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &inactive_sock[i]);	if (rc != PJ_SUCCESS || inactive_sock[i] < 0) {	    app_perror("...error: pj_sock_socket()", rc);	    goto on_error;	}	if ((rc=pj_sock_bind(inactive_sock[i], &addr, sizeof(addr))) != 0) {	    pj_sock_close(inactive_sock[i]);	    inactive_sock[i] = PJ_INVALID_SOCKET;	    app_perror("...error: pj_sock_bind()", rc);	    goto on_error;	}	rc = pj_ioqueue_register_sock(pool, ioque, inactive_sock[i], 			              NULL, &test_cb, &key);	if (rc != PJ_SUCCESS) {	    pj_sock_close(inactive_sock[i]);	    inactive_sock[i] = PJ_INVALID_SOCKET;	    app_perror("...error(1): pj_ioqueue_register_sock()", rc);	    PJ_LOG(3,(THIS_FILE, "....i=%d", i));	    goto on_error;	}        bytes = bufsize;	rc = pj_ioqueue_recv(key, &inactive_read_op[i], recv_buf, &bytes, 0);	if (rc != PJ_EPENDING) {	    pj_sock_close(inactive_sock[i]);	    inactive_sock[i] = PJ_INVALID_SOCKET;	    app_perror("...error: pj_ioqueue_read()", rc);	    goto on_error;	}    }    // Register server and client socket.    // We put this after inactivity socket, hopefully this can represent the    // worst waiting time.    rc = pj_ioqueue_register_sock(pool, ioque, ssock, NULL, 			          &test_cb, &skey);    if (rc != PJ_SUCCESS) {	app_perror("...error(2): pj_ioqueue_register_sock()", rc);	goto on_error;    }    rc = pj_ioqueue_register_sock(pool, ioque, csock, NULL, 			          &test_cb, &ckey);    if (rc != PJ_SUCCESS) {	app_perror("...error(3): pj_ioqueue_register_sock()", rc);	goto on_error;    }    // Set destination address to send the packet.    pj_sockaddr_in_init(&addr, pj_cstr(&temp, "127.0.0.1"), PORT);    // Test loop.    t_elapsed.u64 = 0;    for (i=0; i<LOOP; ++i) {	pj_ssize_t bytes;        pj_ioqueue_op_key_t read_op, write_op;	// Randomize send buffer.	pj_create_random_string(send_buf, bufsize);	// Start reading on the server side.        bytes = bufsize;	rc = pj_ioqueue_recv(skey, &read_op, recv_buf, &bytes, 0);	if (rc != PJ_EPENDING) {	    app_perror("...error: pj_ioqueue_read()", rc);	    break;	}	// Starts send on the client side.        bytes = bufsize;	rc = pj_ioqueue_sendto(ckey, &write_op, send_buf, &bytes, 0,			       &addr, sizeof(addr));	if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {	    app_perror("...error: pj_ioqueue_write()", rc);	    break;	}	if (rc == PJ_SUCCESS) {	    if (bytes < 0) {		app_perror("...error: pj_ioqueue_sendto()", -bytes);		break;	    }	}	// Begin time.	pj_get_timestamp(&t1);	// Poll the queue until we've got completion event in the server side.        callback_read_key = NULL;        callback_read_size = 0;	TRACE__((THIS_FILE, "     waiting for key = %p", skey));	do {	    pj_time_val timeout = { 1, 0 };	    rc = pj_ioqueue_poll(ioque, &timeout);	    TRACE__((THIS_FILE, "     poll rc=%d", rc));	} while (rc >= 0 && callback_read_key != skey);	// End time.	pj_get_timestamp(&t2);	t_elapsed.u64 += (t2.u64 - t1.u64);	if (rc < 0) {	    app_perror("   error: pj_ioqueue_poll", -rc);	    break;	}	// Compare recv buffer with send buffer.	if (callback_read_size != bufsize || 	    pj_memcmp(send_buf, recv_buf, bufsize)) 	{	    rc = -10;	    PJ_LOG(3,(THIS_FILE, "   error: size/buffer mismatch"));	    break;	}	// Poll until all events are exhausted, before we start the next loop.	do {	    pj_time_val timeout = { 0, 10 };	    rc = pj_ioqueue_poll(ioque, &timeout);	} while (rc>0);	rc = 0;    }    // Print results    if (rc == 0) {	pj_timestamp tzero;	pj_uint32_t usec_delay;	tzero.u32.hi = tzero.u32.lo = 0;	usec_delay = pj_elapsed_usec( &tzero, &t_elapsed);	PJ_LOG(3, (THIS_FILE, "...%10d %15d  % 9d", 	           bufsize, inactive_sock_count, usec_delay));    } else {	PJ_LOG(2, (THIS_FILE, "...ERROR rc=%d (buf:%d, fds:%d)", 			      rc, bufsize, inactive_sock_count+2));    }    // Cleaning up.    for (i=inactive_sock_count-1; i>=0; --i) {	pj_sock_close(inactive_sock[i]);    }    pj_sock_close(ssock);    pj_sock_close(csock);    pj_ioqueue_destroy(ioque);    pj_pool_release( pool);    return rc;on_error:    PJ_LOG(1,(THIS_FILE, "...ERROR: %s", 	      pj_strerror(pj_get_netos_error(), errbuf, sizeof(errbuf))));    if (ssock)	pj_sock_close(ssock);    if (csock)	pj_sock_close(csock);    for (i=0; i<inactive_sock_count && inactive_sock && 	      inactive_sock[i]!=PJ_INVALID_SOCKET; ++i)     {	pj_sock_close(inactive_sock[i]);    }    if (ioque != NULL)	pj_ioqueue_destroy(ioque);    pj_pool_release( pool);    return -1;}int udp_ioqueue_test(){    int status;    int bufsize, sock_count;    //goto pass1;    PJ_LOG(3, (THIS_FILE, "...compliance test (%s)", pj_ioqueue_name()));    if ((status=compliance_test()) != 0) {	return status;    }    PJ_LOG(3, (THIS_FILE, "....compliance test ok"));    PJ_LOG(3, (THIS_FILE, "...unregister test (%s)", pj_ioqueue_name()));    if ((status=unregister_test()) != 0) {	return status;    }    PJ_LOG(3, (THIS_FILE, "....unregister test ok"));    if ((status=many_handles_test()) != 0) {	return status;    }        //return 0;    PJ_LOG(4, (THIS_FILE, "...benchmarking different buffer size:"));    PJ_LOG(4, (THIS_FILE, "... note: buf=bytes sent, fds=# of fds, "			  "elapsed=in timer ticks"));//pass1:    PJ_LOG(3, (THIS_FILE, "...Benchmarking poll times for %s:", pj_ioqueue_name()));    PJ_LOG(3, (THIS_FILE, "...====================================="));    PJ_LOG(3, (THIS_FILE, "...Buf.size   #inactive-socks  Time/poll"));    PJ_LOG(3, (THIS_FILE, "... (bytes)                    (nanosec)"));    PJ_LOG(3, (THIS_FILE, "...====================================="));    //goto pass2;    for (bufsize=BUF_MIN_SIZE; bufsize <= BUF_MAX_SIZE; bufsize *= 2) {	if ((status=bench_test(bufsize, SOCK_INACTIVE_MIN)) != 0)	    return status;    }//pass2:    bufsize = 512;    for (sock_count=SOCK_INACTIVE_MIN+2; 	 sock_count<=SOCK_INACTIVE_MAX+2; 	 sock_count *= 2)     {	//PJ_LOG(3,(THIS_FILE, "...testing with %d fds", sock_count));	if ((status=bench_test(bufsize, sock_count-2)) != 0)	    return status;    }    return 0;}#else/* To prevent warning about "translation unit is empty" * when this test is disabled.  */int dummy_uiq_udp;#endif	/* INCLUDE_UDP_IOQUEUE_TEST */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -