📄 jk_channel_socket.c
字号:
"channelSocket.open() connect failed %s:%d %d %s \n", socketInfo->host, socketInfo->port, errno, strerror( errno ) ); return JK_ERR; } /* Enable the use of keep-alive packets on TCP connection */ if(keepalive) { int set = 1; setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,(char *)&set,sizeof(set)); } /* Disable the Nagle algorithm if ndelay is set */ if(ndelay) { int set = 1; setsockopt(sock, IPPROTO_TCP, TCP_NODELAY,(char *)&set,sizeof(set)); } #if defined(F_SETFD) && defined(FD_CLOEXEC) /* Protect the socket so that it will not be inherited by child processes */ fcntl(sock, F_SETFD, FD_CLOEXEC);#endif if( ch->mbean->debug > 0 ) env->l->jkLog(env, env->l, JK_LOG_DEBUG, "channelSocket.connect(), sock = %d\n", sock); endpoint->sd=sock; return JK_OK;}/** close the socket ( was: jk2_close_socket )*/static int JK_METHOD jk2_channel_socket_close(jk_env_t *env,jk_channel_t *ch, jk_endpoint_t *endpoint){ int sd; sd=endpoint->sd; endpoint->sd=-1; /* nothing else to clean, the socket_data was allocated ouf of * endpoint's pool */ return jk2_close_socket(env, sd);}/** send a long message * @param sd opened socket. * @param b buffer containing the data. * @param len length to send. * @return -2: send returned 0 ? what this that ? * -3: send failed. * JK_OK: success * @bug this fails on Unixes if len is too big for the underlying * protocol. * @was: jk_tcp_socket_sendfull */static int JK_METHOD jk2_channel_socket_send(jk_env_t *env, jk_channel_t *ch, jk_endpoint_t *endpoint, jk_msg_t *msg) { unsigned char *b; int len; int sd; int sent=0; sd=endpoint->sd; if( sd<0 ) return JK_ERR; msg->end( env, msg ); len=msg->len; b=msg->buf; while(sent < len) {#ifdef WIN32 int this_time = send(sd, (char *)b + sent , len - sent, 0);#else int this_time = write(sd, (char *)b + sent , len - sent);#endif if(0 == this_time) { return -2; } if(this_time < 0) { return this_time; } sent += this_time; } /* return sent; */ return JK_OK; /* 0 */}/** receive len bytes. * @param sd opened socket. * @param b buffer to store the data. * @param len length to receive. * @return -1: receive failed or connection closed. * >0: length of the received data. * Was: tcp_socket_recvfull */static int JK_METHOD jk2_channel_socket_readN( jk_env_t *env, jk_channel_t *ch, jk_endpoint_t *endpoint, unsigned char *b, int len ){ int sd; int rdlen; sd=endpoint->sd; rdlen = 0; if( sd<0 ) return JK_ERR; while(rdlen < len) {#ifdef WIN32 /* WIN32 read cannot operate on sockets */ int this_time = recv(sd, (char *)b + rdlen, len - rdlen, 0); #else int this_time = read(sd, (char *)b + rdlen, len - rdlen); #endif if(-1 == this_time) {#ifdef WIN32 if(SOCKET_ERROR == this_time) { errno = WSAGetLastError() - WSABASEERR; }#endif /* WIN32 */ if(EAGAIN == errno) { continue; } return -1; } if(0 == this_time) { return -1; } rdlen += this_time; } return rdlen; }static int JK_METHOD jk2_channel_socket_readN2( jk_env_t *env, jk_channel_t *ch, jk_endpoint_t *endpoint, unsigned char *b, int minLen, int maxLen ){ int sd; int rdlen; sd=endpoint->sd; rdlen = 0; if( sd<0 ) return JK_ERR; while(rdlen < minLen ) {#ifdef WIN32 /* WIN32 read cannot operate on sockets */ int this_time = recv(sd, (char *)b + rdlen, maxLen - rdlen, 0); #else int this_time = read(sd, (char *)b + rdlen, maxLen - rdlen); #endif/* fprintf(stderr, "XXX received %d\n", this_time ); */ if(-1 == this_time) {#ifdef WIN32 if(SOCKET_ERROR == this_time) { errno = WSAGetLastError() - WSABASEERR; }#endif /* WIN32 */ if(EAGAIN == errno) { continue; } return -1; } if(0 == this_time) { return -1; } rdlen += this_time; } return rdlen; }/** receive len bytes. * @param sd opened socket. * @param b buffer to store the data. * @param len length to receive. * @return -1: receive failed or connection closed. * >0: length of the received data. * Was: tcp_socket_recvfull */static int JK_METHOD jk2_channel_socket_recv( jk_env_t *env, jk_channel_t *ch, jk_endpoint_t *endpoint, jk_msg_t *msg ){ int hlen=msg->headerLength; int blen; int rc; jk2_channel_socket_readN( env, ch, endpoint, msg->buf, hlen ); blen=msg->checkHeader( env, msg, endpoint ); if( blen < 0 ) { env->l->jkLog(env, env->l, JK_LOG_ERROR, "channelSocket.receive(): Bad header\n" ); return JK_ERR; } rc= jk2_channel_socket_readN( env, ch, endpoint, msg->buf + hlen, blen); if(rc < 0) { env->l->jkLog(env, env->l, JK_LOG_ERROR, "channelSocket.receive(): Error receiving message body %d %d\n", rc, errno); return JK_ERR; } if( ch->mbean->debug > 0 ) env->l->jkLog(env, env->l, JK_LOG_DEBUG, "channelSocket.receive(): Received len=%d type=%d\n", blen, (int)msg->buf[hlen]); return JK_OK;}static int JK_METHOD jk2_channel_socket_recvNew( jk_env_t *env, jk_channel_t *ch, jk_endpoint_t *endpoint, jk_msg_t *msg ){ int hlen=msg->headerLength; int blen; int inBuf=0; if( endpoint->bufPos > 0 ) { memcpy( msg->buf, endpoint->readBuf, endpoint->bufPos ); inBuf=endpoint->bufPos; endpoint->bufPos=0; } /* Read at least hlen, at most maxlen ( we try to minimize the number of read() operations ) */ if( inBuf < hlen ) { /* Need more data to get the header */ int newData=jk2_channel_socket_readN2( env, ch, endpoint, msg->buf + inBuf, hlen - inBuf, msg->maxlen - inBuf ); if(newData < 0) { env->l->jkLog(env, env->l, JK_LOG_ERROR, "channelSocket.receive(): Error receiving message head %d %d\n", inBuf, errno); return JK_ERR; } inBuf+=newData; } blen=msg->checkHeader( env, msg, endpoint ); if( blen < 0 ) { env->l->jkLog(env, env->l, JK_LOG_ERROR, "channelSocket.receive(): Bad header\n" ); return JK_ERR; } if( inBuf < hlen + blen ) { /* We need more data */ int newData=jk2_channel_socket_readN2( env, ch, endpoint, msg->buf + inBuf, blen + hlen - inBuf, msg->maxlen - inBuf ); inBuf+=newData; if(newData < 0) { env->l->jkLog(env, env->l, JK_LOG_ERROR, "channelSocket.receive(): Error receiving message body %d %d\n", newData, errno); return JK_ERR; } } /* Now we have enough data - possibly more */ endpoint->bufPos = inBuf - hlen - blen; if( endpoint->bufPos > 0 ) { memcpy( endpoint->readBuf, msg->buf + hlen + blen, endpoint->bufPos ); } if( ch->mbean->debug > 0 ) env->l->jkLog(env, env->l, JK_LOG_DEBUG, "channelSocket.receive(): Received len=%d type=%d total=%d\n", blen, (int)msg->buf[hlen], inBuf ); return JK_OK;}int JK_METHOD jk2_channel_socket_factory(jk_env_t *env, jk_pool_t *pool, jk_bean_t *result, const char *type, const char *name){ jk_channel_t *ch; ch=(jk_channel_t *)pool->calloc(env, pool, sizeof( jk_channel_t)); ch->_privatePtr= (jk_channel_socket_private_t *) pool->calloc( env, pool, sizeof( jk_channel_socket_private_t)); ch->recv= jk2_channel_socket_recv; ch->send= jk2_channel_socket_send; ch->open= jk2_channel_socket_open; ch->close= jk2_channel_socket_close; ch->is_stream=JK_TRUE; result->setAttribute= jk2_channel_socket_setAttribute; result->init= jk2_channel_socket_init; /*result->getAttributeInfo=jk2_channel_socket_getAttributeInfo;*/ result->multiValueInfo=jk2_channel_socket_multiValueInfo; result->setAttributeInfo=jk2_channel_socket_setAttributeInfo; result->object= ch; ch->mbean=result; ch->workerEnv=env->getByName( env, "workerEnv" ); ch->workerEnv->addChannel( env, ch->workerEnv, ch ); return JK_OK;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -