📄 olfile.cpp
字号:
if (-1 == ns)
{
fprintf(stderr, "Listening error.\n");
ret = -1;
}
else
{
ret = srv_op(s_con_arg, ns);
}
close_conn(ns);
}
else if ( CLIENT == s_con_arg->peer_mode )
{
// entered client mode
ret = clt_op(s_con_arg);
}
t_end = time(&t_end);
t_interval = t_end - t_start;
t_hou = t_interval/3600;
t_min = (t_interval%3600)/60;
t_sec = (t_interval%3600)%60;
fprintf(stderr, "\nOperation consumes %02dh:%02dm:%02ds\n", t_hou, t_min, t_sec);
DEL_CHAR_BUF(lanin_buf);
DEL_CHAR_BUF(lanout_buf);
DEL_CHAR_BUF(gfin_buf);
DEL_CHAR_BUF(gfout_buf);
if ( s_con_arg->sfd > 0 ) {
close_conn(s_con_arg->sfd);
}
delete s_con_arg;
return ret;
}
/** send hello message.
out_buf format:
[operation head + file path]
*/
int
send_hello(struct st_con_arg *con, // connection context
struct st_op_head *s_op_head, // operation head
char *out_buf, // send buffer pool
int buflen) // 0, this value is calculated in the function.
{
char *pbuf = out_buf;
s_op_head->msg_type = MSG_HELLO;
s_op_head->cfm_mode = con->cfm_mode;
if ( DATA_SNK == con->con_mode)
s_op_head->len = OP_HEAD_SIZE + con->src_addr.pathlen;
else{
s_op_head->len = OP_HEAD_SIZE + con->sink_addr.pathlen;
s_op_head->filelen = con->fstat.st_size;
}
memcpy(pbuf, s_op_head, OP_HEAD_SIZE);
pbuf = out_buf + OP_HEAD_SIZE;
if ( DATA_SNK == con->con_mode ) {
// local peer is data source, so need the destination file path in remote peer
memcpy(pbuf, con->src_addr.ppath, con->src_addr.pathlen);
}
else
{ // local peer is data source, so need the destination file path in remote peer
memcpy(pbuf, con->sink_addr.ppath, con->sink_addr.pathlen);
}
fprintf(stderr, "Hello message sent.\n");
// tell the remote a read operation is attempted.
return send(con->sfd, out_buf, s_op_head->len, 0);
}
/** process hello message.
hello message format:
[operation head + file path]
*/
int
recv_hello(struct st_con_arg *s_con_arg, struct st_op_head *s_op_head, char *in_buf, int buflen)
{
char *phead, *ppayload;
int in_len;
int ret = 0;
fprintf(stderr, "Receiveing Hello message...");
in_len = recv(s_con_arg->sfd, in_buf, MAX_MSG_LEN, MSG_PARTIAL);
fprintf(stderr, "done.\n");
s_con_arg->tot_recv += in_len;
phead = in_buf;
ppayload = in_buf + OP_HEAD_SIZE;
memcpy(s_op_head, phead, OP_HEAD_SIZE);
if ( MSG_HELLO == s_op_head->msg_type ) {
fprintf(stderr, "Hello message received.\n");
// hello message received
switch(s_op_head->op_type) {
case OP_READ:
fprintf(stderr, "A read operation is scheduled.\n");
s_con_arg->con_mode = DATA_SRC;
s_con_arg->src_addr.pathlen = s_op_head->len - OP_HEAD_SIZE;
s_con_arg->src_addr.ppath = new char [s_con_arg->src_addr.pathlen];
memcpy(s_con_arg->src_addr.ppath, ppayload, s_con_arg->src_addr.pathlen);
s_con_arg->src_addr.ppath[s_con_arg->src_addr.pathlen] = 0;
//s_con_arg->fd = open(s_con_arg->src_addr.ppath, O_RDONLY|O_BINARY);
s_con_arg->fp = fopen(s_con_arg->src_addr.ppath, "r+b");
if (NULL == s_con_arg->fp){//(0 > s_con_arg->fd) {
fprintf(stderr, "File %s can't be opened for read.\n", s_con_arg->src_addr.ppath);
s_op_head->ret_val = OP_SRCFILEOPEN;
ret = -1;
}
else{
fprintf(stderr, "File %s opened for read.\n", s_con_arg->src_addr.ppath);
s_con_arg->fd = open(s_con_arg->src_addr.ppath, O_RDONLY);
if (s_con_arg->fd > 0) {
fstat(s_con_arg->fd, &(s_con_arg->fstat));
s_op_head->filelen = s_con_arg->fstat.st_size;
}
}
break;
case OP_WRITE:
fprintf(stderr, "A write operation is scheduled.\n");
s_con_arg->con_mode = DATA_SNK;
s_con_arg->sink_addr.pathlen = s_op_head->len - OP_HEAD_SIZE;
s_con_arg->sink_addr.ppath = new char [s_con_arg->sink_addr.pathlen];
memcpy(s_con_arg->sink_addr.ppath, ppayload, s_con_arg->sink_addr.pathlen);
s_con_arg->sink_addr.ppath[s_con_arg->sink_addr.pathlen] = 0;
//s_con_arg->fd = open(s_con_arg->sink_addr.ppath, O_WRONLY|O_CREAT|O_TRUNC|O_BINARY);
s_con_arg->fp = fopen(s_con_arg->sink_addr.ppath, "w+b");
if(NULL == s_con_arg->fp ){ //( 0 > s_con_arg->fd) {
fprintf(stderr, "File %s can't be opened for write.\n", s_con_arg->sink_addr.ppath);
s_op_head->ret_val = OP_SNKFILEOPEN;
ret = -1;
}
else{
fprintf(stderr, "File %s opened for write.\n", s_con_arg->sink_addr.ppath);
s_con_arg->fstat.st_size = s_op_head->filelen;
}
break;
default:
fprintf(stderr, "Unspecified operation.\n");
}
}
else
{
fprintf(stderr, "A hello message is needed.\n");
ret = -1;
}
return ret;
}
/** send helloack message.
*/
int
send_helloack(struct st_con_arg *con, struct st_op_head *s_op_head, char *out_buf, int buflen)
{
int ret = 0;
s_op_head->msg_type = MSG_HELLOACK;
s_op_head->len = OP_HEAD_SIZE;
memcpy(out_buf, s_op_head, OP_HEAD_SIZE);
fprintf(stderr, "HelloAck message sent.\n");
return send(con->sfd, out_buf, s_op_head->len, 0);
}
/** process helloack message.
*/
int
recv_helloack(struct st_con_arg *con, struct st_op_head *s_op_head, char *in_buf, int buflen)
{
char *pbuf;
int ret = 0;
ret = recv(con->sfd, in_buf, buflen, MSG_PARTIAL);
if (-1 != ret) {
pbuf = in_buf;
memcpy(s_op_head, in_buf, OP_HEAD_SIZE);
if (s_op_head->msg_type == MSG_HELLOACK) {
fprintf(stderr, "HelloAck message received.\n");
con->fstat.st_size = s_op_head->filelen;
if ( OP_SUCCESS != s_op_head->ret_val) {
ret = -1;
if ( OP_SNKFILEOPEN == s_op_head->ret_val ) {
fprintf(stderr, "Data sink can't open file %s for write.\n", con->sink_addr.ppath);
}else if ( OP_SRCFILEOPEN == s_op_head->ret_val) {
fprintf(stderr, "Data source can't open file %s for read.\n", con->src_addr.ppath);
}
}
}
else
{
fprintf(stderr, "HelloAck message is needed.\n");
ret = -1;
}
}
return ret;
}
/** send data to LLP.
out_buf format:
operation head + payload]
*/
int
send_data(struct st_con_arg *con, // connection context
struct st_op_head *s_op_head, // operation head
char *out_buf, // send buffer pool
int buflen) // desired send buffer length
{
s_op_head->len = buflen;
s_op_head->msg_type = MSG_DATA;
memcpy(out_buf, s_op_head, OP_HEAD_SIZE);
return send(con->sfd, out_buf, buflen, 0);
}
/** receive data from LLP.
in_buf format:
[operation head + payload]
*/
int
recv_data(struct st_con_arg *con, // connection context
struct st_op_head *s_op_head, // operation head
char *in_buf, // receive buffer pool
int buflen) // received buffer length.
{
int recv_len = recv(con->sfd, in_buf, buflen, MSG_PARTIAL);
if (recv_len >= 0) {
memcpy(s_op_head, in_buf, OP_HEAD_SIZE);
if (MSG_DATA != s_op_head->msg_type) {
recv_len = -1;
}
}
return recv_len;
}
/** when one peer received a data message, it should send out a data ack message to
inform the remote peer that the data is received.
*/
int
send_dataack(struct st_con_arg *con,
struct st_op_head *s_op_head,
char *out_buf,
int buflen)
{
// the return value of the received data is setted out side.
s_op_head->len = OP_HEAD_SIZE; // the dataack message only has a op head.
s_op_head->msg_type = MSG_DATAACK;
memcpy(out_buf, s_op_head, OP_HEAD_SIZE);
return send(con->sfd, out_buf, s_op_head->len, 0);
}
/** when one peer received a dataack message, it should continue to send out a data message
to remote peer.
*/
int
recv_dataack(struct st_con_arg *con,
struct st_op_head *s_op_head,
char *in_buf,
int buflen)
{
int recv_len = recv(con->sfd, in_buf, OP_HEAD_SIZE, MSG_PARTIAL);
if (recv_len >= 0) {
memcpy(s_op_head, in_buf, OP_HEAD_SIZE);
if (MSG_DATAACK != s_op_head->msg_type) {
recv_len = -1;
}
}
return recv_len;
}
/** server side operations, send hello message,
and receive the helloack message to handing,
or receive hello messages, and send helloack message to handing,
then send/receive data to/from the remote peer.
*/
int srv_op(struct st_con_arg* s_con_arg, unsigned int s)
{
s_con_arg->sfd = s;
return proc_op(s_con_arg);
}
/** client side operations, send hello messages,
and receive the helloack message to handing,
or receive hello messages, and send helloack message to handling,
then send/receive data to/from the remote peer.
*/
int clt_op(struct st_con_arg* s_con_arg)
{
unsigned int s; // socket fd
int ret = 0;
char *ftopen; // file to open
//int openflg;
char *openflag;
// Open the file for write.
if ( DATA_SNK == s_con_arg->con_mode) {
ftopen = s_con_arg->sink_addr.ppath;
if (OVERIDE == s_con_arg->cfm_mode) {
//openflg = O_WRONLY|O_CREAT|O_TRUNC|O_BINARY;
openflag = "w+b";
}
}else{
ftopen = s_con_arg->src_addr.ppath;
//openflg = O_RDONLY|O_BINARY;
openflag = "r+b";
s_con_arg->fd = open(s_con_arg->src_addr.ppath, O_RDONLY);
if (s_con_arg->fd > 0) {
fstat(s_con_arg->fd, &(s_con_arg->fstat));
}
}
//s_con_arg->fd = open(ftopen, openflg);
s_con_arg->fp = fopen(ftopen, openflag);
if (NULL == s_con_arg->fp){//(s_con_arg->fd < 0) {
fprintf(stderr, "File %s can't be opened.\n", ftopen);
return -1;
}
// attempt to connect to remote service
if (-1 == (s = open_conn(s_con_arg->str_ip)))
{
return -1;
}
s_con_arg->sfd = s;
ret = proc_op(s_con_arg);
return ret;
}
/** process operations either server side or client side.
*/
int
proc_op(struct st_con_arg *s_con_arg)
{
char *phead, *ppayload;
int in_len; // bytes read from the nic.
int payload_len; // bytes followed the st_op_head.
int out_len; // bytes send to nic.
struct st_op_head s_op_head;
int ret =0;
/** treat below 4 buffer pointer as buffer pool.*/
char *in_buf = lanin_buf;
char *out_buf = lanout_buf;
char *fin_buf = gfin_buf;
char *fout_buf = gfout_buf;
s_con_arg->tot_file_recv = 0;
s_con_arg->tot_file_send = 0;
s_con_arg->tot_recv = 0;
s_con_arg->tot_send = 0;
if (s_con_arg->peer_mode == SERVER) {
// wait for hello message.
ret = recv_hello(s_con_arg, &s_op_head, in_buf, MAX_MSG_LEN);
}
if (s_con_arg->con_mode == DATA_SNK) {
// the local peer is data sink, perform a reading operation from remote peer.
if (CLIENT == s_con_arg->peer_mode) {
s_reg_sem.send = send_hello;
s_reg_sem.con = s_con_arg;
s_op_head.op_type = OP_READ;
s_op_head.ret_val = OP_SUCCESS;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -