📄 mover.c
字号:
{ Error(LOG_ERR, "ndmpdLocalRead: error sending ndmp_notify_mover_paused_request.\n"); ndmpdMoverError(session, NDMP_MOVER_HALT_INTERNAL_ERROR); return(-1); } /* * Process messages until the state is changed by * an abort, continue, or close request . */ for (;;) { if (ndmpdSelect(session, TRUE, HC_CLIENT) < 0) return(-1); if (session->eof == TRUE) return(-1); switch (session->mover.state) { case NDMP_MOVER_STATE_ACTIVE: break; case NDMP_MOVER_STATE_PAUSED: continue; default: return(-1); } } continue; } len = length - count; /* * Prevent reading past the end of the window. */ if (len > session->mover.windowOffset + session->mover.windowLength - session->mover.position) len = session->mover.windowOffset + session->mover.windowLength - session->mover.position; /* * Copy from the data buffer first. */ if (session->mover.wIndex - session->mover.rIndex != 0) { /* Limit the copy to the amount of data in the buffer. */ if (len > session->mover.wIndex - session->mover.rIndex) len = session->mover.wIndex - session->mover.rIndex; memcpy((void*)&data[count], (void*)&session->mover.buf[session->mover.rIndex], len); count += len; session->mover.rIndex += len; session->mover.bytesLeftToRead -= len; session->mover.position += len; continue; } /* * Determine if data needs to be buffered or * can be read directly to user supplied location. * We can fast path the read if at least a full record * needs to be read and there is no seek pending. * This is done to eliminate a buffer copy. */ if (len >= session->mover.recordSize && session->mover.position >= session->mover.seekPosition) { n = moverTapeRead(session, &data[count]); if (n <= 0) { ndmpdMoverError(session, (n == 0 ? NDMP_MOVER_HALT_ABORTED : NDMP_MOVER_HALT_INTERNAL_ERROR)); return(-1); } count += n; session->mover.bytesLeftToRead -= n; session->mover.position += n; continue; } /* Read the next record into the buffer. */ n = moverTapeRead(session, session->mover.buf); if (n <= 0) { ndmpdMoverError(session, (n == 0 ? NDMP_MOVER_HALT_ABORTED : NDMP_MOVER_HALT_INTERNAL_ERROR)); return(-1); } session->mover.wIndex = n; session->mover.rIndex = 0; /* * Discard data if the current data stream position is * prior to the seek position. This is necessary if a seek * request set the seek pointer to a position that is not a * record boundary. The seek request handler can only position * to the start of a record. */ if (session->mover.position < session->mover.seekPosition) { session->mover.rIndex = session->mover.seekPosition - session->mover.position; session->mover.position = session->mover.seekPosition; } } return(0);}/**** ndmpd internal functions ******************************************//* * ndmpdMoverInit * Initialize mover specific session variables. * DONT initialize variables such as recordSize that need to * persist across data operations. A client may open a connection and * do multiple backups after setting the recordSize. * * Parameters: * session (input) - session pointer. * * Returns: * void */voidndmpdMoverInit(NdmpdSession* session){ session->mover.state = NDMP_MOVER_STATE_IDLE; session->mover.pauseReason = NDMP_MOVER_PAUSE_NA; session->mover.haltReason = NDMP_MOVER_HALT_NA; session->mover.dataWritten = 0LL; session->mover.seekPosition = 0LL; session->mover.bytesLeftToRead = 0LL; session->mover.windowOffset = 0LL; session->mover.windowLength = 0xffffffffffffffffLL; session->mover.position = 0LL; session->mover.recordNum = 0; session->mover.listenSock = -1; session->mover.sock = -1; session->mover.rIndex = 0; session->mover.wIndex = 0;}/* * ndmpdMoverCleanup * Just a place holder for now. * * Parameters: * session (input) - session pointer. * * Returns: * void */voidndmpdMoverCleanup(NdmpdSession* session __attribute__ ((unused))){}/* * ndmpdMoverError * This function is called when an unrecoverable mover error * has been detected. A notify message is sent to the client and the * mover is placed into the halted state. * * Parameters: * session (input) - session pointer. * reason (input) - halt reason. * * Returns: * void. */voidndmpdMoverError(NdmpdSession* session, ndmp_mover_halt_reason reason){ ndmp_notify_mover_halted_request request; if (session->mover.state == NDMP_MOVER_STATE_HALTED || session->mover.state == NDMP_MOVER_STATE_IDLE) return; request.reason = reason; request.text_reason = ""; if (ndmpSendRequest(session->connection, NDMP_NOTIFY_MOVER_HALTED, NDMP_NO_ERR, (void *)&request, 0) < 0) { Error(LOG_ERR, "ndmpdMoverError: error sending notify_mover_halted request.\n"); } if (session->mover.listenSock != -1) { (void)ndmpdRemoveFileHandler(session, session->mover.listenSock); (void)close(session->mover.listenSock); session->mover.listenSock = -1; } if (session->mover.sock != -1) { (void)ndmpdRemoveFileHandler(session, session->mover.sock); (void)close(session->mover.sock); session->mover.sock = -1; } session->mover.state = NDMP_MOVER_STATE_HALTED; session->mover.haltReason = reason;}/* * ndmpdMoverSeek * Seek to the requested data stream position. * If the requested offset is outside of the current window, * the mover is paused and a notify_mover_paused request is sent * notifying the client that a seek is required. * If the requested offest is within the window but not within the * current record, then the tape is positioned to the record containing * the requested offest. * The requested amount of data is then read from the tape device and * written to the data connection. * * Parameters: * session (input) - session pointer. * offset (input) - data stream position to seek to. * length (input) - amount of data that will be read. * * Returns: * 1 - seek pending completion by the NDMP client. * 0 - seek successfully completed. * -1 - error. */intndmpdMoverSeek(NdmpdSession* session, u_longlong_t offset, u_longlong_t length){ u_longlong_t tapePosition; u_longlong_t bufPosition; struct mtop tapeop; session->mover.seekPosition = offset; session->mover.bytesLeftToRead = length; /* * If the requested position is outside of the window, * notify the client that a seek is required. */ if (session->mover.seekPosition < session->mover.windowOffset || session->mover.seekPosition >= session->mover.windowOffset + session->mover.windowLength) { ndmp_notify_mover_paused_request pauseRequest; session->mover.state = NDMP_MOVER_STATE_PAUSED; session->mover.pauseReason = NDMP_MOVER_PAUSE_SEEK; pauseRequest.reason = NDMP_MOVER_PAUSE_SEEK; pauseRequest.seek_position = longLongToQuad(offset); if (ndmpSendRequest(session->connection, NDMP_NOTIFY_MOVER_PAUSED, NDMP_NO_ERR, (void *)&pauseRequest, 0) < 0) { Error(LOG_ERR, "ndmpdMoverSeek: error sending ndmp_notify_mover_paused_request.\n"); return(-1); } return(1); } /* * Determine the data stream position of the first byte in the * data buffer. */ bufPosition = session->mover.position - (session->mover.position % session->mover.recordSize); /* * Determine the data stream position of the next byte that * will be read from tape. */ tapePosition = bufPosition; if (session->mover.wIndex != 0) tapePosition += session->mover.recordSize; /* * Check if requested position is for data that has been read and is * in the buffer. */ if (offset >= bufPosition && offset < tapePosition) { session->mover.position = offset; session->mover.rIndex = session->mover.position - bufPosition; return(0); } tapeop.mt_count = 0; if (tapePosition > session->mover.seekPosition) { /* Need to seek backward. */ tapeop.mt_op = MTBSR; tapeop.mt_count = ((tapePosition - offset - 1) / session->mover.recordSize) + 1; tapePosition -= ((u_longlong_t)tapeop.mt_count * (u_longlong_t)session->mover.recordSize); } else if (offset >= tapePosition + session->mover.recordSize) { /* Need to seek forward. */ tapeop.mt_op = MTFSR; tapeop.mt_count = ((offset - tapePosition) / session->mover.recordSize); tapePosition += ((u_longlong_t)tapeop.mt_count * (u_longlong_t)session->mover.recordSize); } /* Reposition the tape if necessary. */ if (tapeop.mt_count != 0) { if (ioctl(session->tape.fd, MTIOCTOP, &tapeop) < 0) { Error(LOG_ERR, "ndmpdMoverSeek: error positioning tape: %s.\n", strerror(errno)); return(-1); } } session->mover.position = tapePosition; session->mover.rIndex = 0; session->mover.wIndex = 0; return(0);}/*** static functions ***************************************************//* * moverListen * Creates a socket for listening for TCP/IP data connections * initiated by a remote data server. Configures the mover to * listen and accept a connection. * * Parameters: * session (input) - session pointer. * addr (output) - location to store address of socket. * port (output) - location to store port of socket. * * Returns: * 0 - success. * -1 - error. */static intmoverListen(NdmpdSession* session, u_long *addr, u_short *port){ struct hostent* hp; struct sockaddr_in sin; struct utsname uts; int length; if (uname(&uts) < 0) { Error(LOG_ERR, "moverListen: uname error: %s\n", strerror(errno)); return(-1); } if ((hp = gethostbyname(uts.nodename)) == 0) { Error(LOG_ERR, "moverListen: unknown host: %s\n", uts.nodename); return(-1); } *addr = ((struct in_addr*)(hp->h_addr))->s_addr; session->mover.listenSock = socket(AF_INET, SOCK_STREAM, 0); if (session->mover.listenSock < 0) { Error(LOG_ERR, "moverListen: socket error: %s.\n", strerror(errno)); return(-1); } sin.sin_family = AF_INET; sin.sin_addr.s_addr = INADDR_ANY; sin.sin_port = 0; if (bind(session->mover.listenSock, (struct sockaddr *) &sin, sizeof(sin)) < 0) { Error(LOG_ERR, "moverListen: bind error: %s.\n", strerror(errno)); (void)close(session->mover.listenSock); session->mover.listenSock = -1; return(-1); } length = sizeof(sin); if (getsockname(session->mover.listenSock, (struct sockaddr *) &sin, &length) < 0) { Error(LOG_ERR, "moverListen: getsockname error: %s.\n", strerror(errno)); (void)close(session->mover.listenSock); session->mover.listenSock = -1; return(-1); } if (listen(session->mover.listenSock, 5) < 0) { Error(LOG_ERR, "moverListen: listen error: %s.\n", strerror(errno)); (void)close(session->mover.listenSock); session->mover.listenSock = -1; return(-1); } /* * Add a file handler for the listen socket. * ndmpdSelect will call moverAcceptConnection when a * connection is ready to be accepted. */ if (ndmpdAddFileHandler(session, (void*)session, session->mover.listenSock, NDMPD_SELECT_MODE_READ, HC_MOVER, moverAcceptConnection) < 0) { (void)close(session->mover.listenSock); session->mover.listenSock = -1; return(-1); } *port = sin.sin_port; return(0);}/* * moverAcceptConnection * Accept a data connection from a data server. * Called by ndmpdSelect when a connection is pending on * the mover listen socket. * * Parameters: * cookie (input) - session pointer. * fd (input) - file descriptor. * mode (input) - select mode. * * Returns: * void. */static voidmoverAcceptConnection(void* cookie, int fd, u_long mode __attribute__ ((unused))){ NdmpdSession* session = (NdmpdSession*)cookie; struct sockaddr_in from; int from_len = sizeof(from); int flags; Debug(DBG_CAT_MOVER|DBG_FOC_FLOW, "moverAcceptConnection\n"); session->mover.sock = accept(fd, (struct sockaddr *)&from, &from_len); (void)ndmpdRemoveFileHandler(session, fd); (void)close(session->mover.listenSock); session->mover.listenSock = -1; if (session->mover.sock < 0) { Error(LOG_ERR, "moverAcceptConnection: accept error: %s.\n", strerror(errno)); ndmpdMoverError(session, NDMP_MOVER_HALT_CONNECT_ERROR); return; } /* * Set the O_NDELAY flag on the socket to prevent * reads and writes from blocking. */ if ((flags = fcntl(session->mover.sock, F_GETFL)) < 0) { Error(LOG_ERR, "moverAcceptConnection: fcntl(F_GETFL) error: %s.\n", strerror(errno)); ndmpdMoverError(session, NDMP_MOVER_HALT_INTERNAL_ERROR); return;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -