⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tcp-client.c

📁 分布式文件系统
💻 C
字号:
/*   Copyright (c) 2006, 2007, 2008 Z RESEARCH, Inc. <http://www.zresearch.com>   This file is part of GlusterFS.   GlusterFS is free software; you can redistribute it and/or modify   it under the terms of the GNU General Public License as published   by the Free Software Foundation; either version 3 of the License,   or (at your option) any later version.   GlusterFS is distributed in the hope that it will be useful, but   WITHOUT ANY WARRANTY; without even the implied warranty of   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU   General Public License for more details.   You should have received a copy of the GNU General Public License   along with this program.  If not, see   <http://www.gnu.org/licenses/>.*/#ifndef _CONFIG_H#define _CONFIG_H#include "config.h"#endif#include "dict.h"#include "glusterfs.h"#include "transport.h"#include "protocol.h"#include "logging.h"#include "xlator.h"#include "tcp.h"static int32_ttcp_connect (struct transport *this){  tcp_private_t *priv = this->private;  dict_t *options = priv->options;  char non_blocking = 1;  if (!priv->options) {    priv->options = dict_copy (this->xl->options, NULL);    options = priv->options;  }  if (dict_get (options, "non-blocking-connect")) {    char *nb_connect =data_to_str (dict_get (options,					     "non-blocking-connect"));    if ((!strcasecmp (nb_connect, "off")) ||	(!strcasecmp (nb_connect, "no")))      non_blocking = 0;  }  struct sockaddr_in sin;  struct sockaddr_in sin_src;  int32_t ret = 0;  uint16_t try_port = CLIENT_PORT_CIELING;  struct pollfd poll_s;  int    nfds;  int    timeout = 0;  int optval_s;  unsigned int optvall_s = sizeof(int);  int window_size = 640 * 1024;  // Create the socket if no connection ?  if (priv->connected)    return 0;  if (!priv->connection_in_progress)  {    timeout = 100; /* If first attempt of reconnection, wait sometime */    priv->sock = socket (AF_INET, SOCK_STREAM, 0);        gf_log (this->xl->name, GF_LOG_DEBUG,	    "socket fd = %d", priv->sock);      if (priv->sock == -1) {      gf_log (this->xl->name, GF_LOG_ERROR,	      "socket () - error: %s", strerror (errno));      return -errno;    }    if (dict_get (this->xl->options, "window-size")) {      window_size = data_to_uint32 (dict_get (this->xl->options,					      "window-size"));      gf_log (this->xl->name, GF_LOG_DEBUG,	      "setting tcp window size %d", window_size);      setsockopt (priv->sock, SOL_SOCKET, SO_SNDBUF, (char *)&window_size,		  sizeof (window_size));      setsockopt (priv->sock, SOL_SOCKET, SO_RCVBUF, (char *)&window_size,		  sizeof (window_size));    }    // Find a local port avaiable for use    while (try_port) {       sin_src.sin_family = PF_INET;      sin_src.sin_port = htons (try_port); //FIXME: have it a #define or configurable      sin_src.sin_addr.s_addr = INADDR_ANY;            if ((ret = bind (priv->sock,		       (struct sockaddr *)&sin_src,		       sizeof (sin_src))) == 0) {	gf_log (this->xl->name, GF_LOG_DEBUG,		"finalized on port `%d'", try_port);	break;      }	      try_port--;    }	    if (ret != 0) {      gf_log (this->xl->name, GF_LOG_ERROR,	      "bind loop failed - error: %s", strerror (errno));      close (priv->sock);      return -errno;    }	    sin.sin_family = AF_INET;	    if (dict_get (options, "remote-port")) {      sin.sin_port = htons (data_to_uint64 (dict_get (options,						   "remote-port")));    } else {      gf_log (this->xl->name, GF_LOG_DEBUG,	      "defaulting remote-port to %d", GF_DEFAULT_LISTEN_PORT);      sin.sin_port = htons (GF_DEFAULT_LISTEN_PORT);    }	    if (dict_get (options, "remote-host")) {      sin.sin_addr.s_addr = gf_resolve_ip (data_to_str (dict_get (options,							       "remote-host")),					   &this->dnscache);    } else {      gf_log (this->xl->name, GF_LOG_DEBUG,	      "error: missing 'option remote-host <hostname>'");      close (priv->sock);      return -errno;    }    if (non_blocking) {    // TODO, others ioctl, ioctlsocket, IoctlSocket, or if dont support      fcntl (priv->sock, F_SETFL, O_NONBLOCK);    }    // Try to connect    ret = connect (priv->sock, (struct sockaddr *)&sin, sizeof (sin));        if (ret == -1) {      if (errno != EINPROGRESS)	{	gf_log (this->xl->name, GF_LOG_ERROR,		"error: not in progress - trace: %s",		strerror (errno));	close (priv->sock);	return -errno;      }    }    gf_log (this->xl->name, GF_LOG_DEBUG,	    "connect on %d in progress (non-blocking)", priv->sock);    priv->connection_in_progress = 1;    priv->connected = 0;  }  if (non_blocking) {    nfds = 1;    memset (&poll_s, 0, sizeof(poll_s));    poll_s.fd = priv->sock;    poll_s.events = POLLOUT;    ret = poll (&poll_s, nfds, timeout);     if (ret) {      /* success or not, connection is no more in progress */      priv->connection_in_progress = 0;      ret = getsockopt (priv->sock,			SOL_SOCKET,			SO_ERROR,			(void *)&optval_s,			&optvall_s);      if (ret) {	gf_log (this->xl->name, GF_LOG_ERROR, "%s: SOCKET ERROR");	close (priv->sock);	return -1;      }      if (optval_s) {	gf_log (this->xl->name, GF_LOG_ERROR,		"non-blocking connect() returned: %d (%s)",		optval_s, strerror (optval_s));	close (priv->sock);	return -1;      }    } else {      /* connection is still in progress */      gf_log (this->xl->name, GF_LOG_DEBUG,	      "connection on %d still in progress - try later",	       priv->sock);      return -1;    }  }  /* connection was successful */  gf_log (this->xl->name, GF_LOG_DEBUG,	  "connection on %d success", priv->sock);  if (non_blocking) {    int flags = fcntl (priv->sock, F_GETFL, 0);    fcntl (priv->sock, F_SETFL, flags & (~O_NONBLOCK));  }  socklen_t sock_len = sizeof (struct sockaddr_in);  getpeername (priv->sock, &this->peerinfo.sockaddr, &sock_len);  priv->connected = 1;  priv->connection_in_progress = 0;  poll_register (this->xl->ctx, priv->sock, transport_ref (this));  return 0;}static int32_ttcp_client_submit (transport_t *this, char *buf, int32_t len){  tcp_private_t *priv = this->private;  int32_t ret = 0;  pthread_mutex_lock (&priv->write_mutex);  if (!priv->connected) {    /*    ret = tcp_connect (this, priv->options);    if (ret == 0) {      poll_register (this->xl->ctx, priv->sock, transport_ref (this));      ret = gf_full_write (priv->sock, buf, len);    } else {      ret = -1;    }    */    ret = -1;  } else {    ret = gf_full_write (priv->sock, buf, len);  }  pthread_mutex_unlock (&priv->write_mutex);  return ret;}static int32_ttcp_client_writev (transport_t *this,		   const struct iovec *vector,		   int32_t count){  tcp_private_t *priv = this->private;  int32_t ret = 0;    pthread_mutex_lock (&priv->write_mutex);  if (!priv->connected) {    /*    ret = tcp_connect (this, priv->options);    if (ret == 0) {      poll_register (this->xl->ctx, priv->sock, transport_ref (this));      ret = gf_full_writev (priv->sock, vector, count);    } else {      ret = -1;    }    */    ret = -1;  } else {    ret = gf_full_writev (priv->sock, vector, count);  }  pthread_mutex_unlock (&priv->write_mutex);  return ret;}struct transport_ops transport_ops = {  //  .flush = tcp_flush,  .recieve = tcp_recieve,  .submit = tcp_client_submit,  .connect = tcp_connect,  .disconnect = tcp_disconnect,  .except = tcp_except,  .readv = tcp_readv,  .writev = tcp_client_writev,  .bail = tcp_bail,};int gf_transport_init (struct transport *this,		   dict_t *options,		   event_notify_fn_t notify){  tcp_private_t *priv;  priv = calloc (1, sizeof (tcp_private_t));  this->private = priv;  this->notify = notify;  pthread_mutex_init (&priv->read_mutex, NULL);  pthread_mutex_init (&priv->write_mutex, NULL);    priv->connection_in_progress = 0;  /*  ret = tcp_connect (this, options);  if (!ret) {    poll_register (this->xl->ctx, priv->sock, transport_ref (this));  }  if (ret) {    retry_data = dict_get (options, "background-retry");    if (retry_data) {      if (strcasecmp (data_to_str (retry_data), "off") == 0)	return -1;    }  }  */  return 0;}int gf_transport_fini (struct transport *this){  tcp_private_t *priv = this->private;  //  this->ops->flush (this);  dict_destroy (priv->options);  close (priv->sock);  free (priv);  return 0;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -