⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mtp.c

📁 asterisk1.4.6版本下 7#信令驱动 源码
💻 C
📖 第 1 页 / 共 5 页
字号:
               m->retrans_buf[m->retrans_seq].buf,
               m->retrans_buf[m->retrans_seq].len);
        m->tx_len = m->retrans_buf[m->retrans_seq].len;
        m->tx_buffer[0] = m->send_bsn | (m->send_bib << 7);
        m->tx_buffer[1] = m->retrans_seq | (m->send_fib << 7);

        if(m->retrans_seq == m->retrans_last_sent) {
          /* Retransmission done. */
          m->retrans_seq = -1;
        } else {
          /* Move to the next one. */
          m->retrans_seq = MTP_NEXT_SEQ(m->retrans_seq);
        }

        return;
      }

      /* Send Fill-in signalling unit (FISU) if nothing else is pending. */
      m->tx_len = 3;
      m->tx_buffer[0] = m->send_bsn | (m->send_bib << 7);
      m->tx_buffer[1] = m->retrans_last_sent | (m->send_fib << 7);
      m->tx_buffer[2] = 0;      /* Length 0, meaning FISU. */
      return;

    default:
      fifo_log(m, LOG_ERROR, "Internal: Unknown MTP2 state %d on link '%s'?!?\n", m->state, m->name);
  }
}

/* Fill in a buffer for Zaptel transmission, picking frames as necessary.
   The passed buffer is of size ZAP_BUF_SIZE.

   Works as a pump for the fasthdlc software HDLC encoder state m->h_tx:
   each output byte is produced by fasthdlc_tx_run_nocheck(), and whenever
   the encoder holds fewer than 8 pending bits we feed it the next input -
   either a payload byte from m->tx_buffer, one of the two deferred CRC
   bytes, or the opening flag of a freshly picked frame.

   State used across calls (all in *m):
     tx_buffer/tx_len  - current outgoing frame and its length
     tx_sofar          - number of payload bytes already loaded
     tx_crc            - running FCS (init 0xffff, PPP_FCS update,
                         complemented at frame end - standard HDLC FCS)
     tx_do_crc         - 0: no CRC pending, 1: low CRC byte next,
                         2: high CRC byte next
*/
static void mtp2_fill_zaptel_buf(mtp2_t *m, unsigned char *buf) {
  int i;

  for(i = 0; i < ZAP_BUF_SIZE; i++) {
    if(m->h_tx.bits < 8) {
      /* Need some more bits. */
      if(m->tx_do_crc == 1) {
        /* Put first byte of CRC (low byte; FCS is sent LSB first). */
        fasthdlc_tx_load_nocheck(&(m->h_tx), m->tx_crc & 0xff);
        m->tx_do_crc = 2;
      } else if(m->tx_do_crc == 2) {
        /* Put second byte of CRC. */
        fasthdlc_tx_load_nocheck(&(m->h_tx), (m->tx_crc >> 8) & 0xff);
        m->tx_do_crc = 0;
      } else if(m->tx_sofar >= m->tx_len) {
        /* Fetch a new frame. */
#ifdef DROP_PACKETS_PCT
        /* Test hook: keep re-picking (and thereby discarding) frames to
           simulate outgoing packet loss at roughly DROP_PACKETS_PCT
           percent. */
        do {
#endif
        mtp2_pick_frame(m);
#ifdef DROP_PACKETS_PCT
        } while(rand() <= DROP_PACKETS_PCT/100.0*RAND_MAX);
#endif
	if (m->tx_len > 4)
	  fifo_log(m, LOG_DEBUG, "Sending buffer to zaptel len=%d, on link '%s' bsn=%d, fsn=%d.\n", m->tx_len, m->name, m->tx_buffer[0]&0x7f,  m->tx_buffer[1]&0x7f);
        log_frame(m, 1, m->tx_buffer, m->tx_len);
        m->tx_sofar = 0;
        m->tx_crc = 0xffff;       /* Re-seed FCS for the new frame. */
        fasthdlc_tx_frame_nocheck(&(m->h_tx));  /* Emit opening flag. */
        /* A zero-length frame from mtp2_pick_frame() will cause
           sending of a single flag, without crc check bits.
        */
      } else {
        unsigned char data = m->tx_buffer[m->tx_sofar++];
        fasthdlc_tx_load_nocheck(&(m->h_tx), data);
        m->tx_crc = PPP_FCS(m->tx_crc, data);
        if(m->tx_sofar == m->tx_len) {
          /* At frame end, also push the crc bits into the fasthdlc buffer.
             Because of bit stuffing, we might not have room in the buffer for
             8 bits of data + 16 bits of crc, so set a flag to do it later. */
          m->tx_crc ^= 0xffff;    /* Final one's complement of the FCS. */
          m->tx_do_crc = 1;
        }
      }
    }

    buf[i] = fasthdlc_tx_run_nocheck(&(m->h_tx));
    /* Byte counter; used elsewhere to timestamp raw dumps (offset by one
       Zaptel buffer at startup to approximate on-the-wire time). */
    m->writecount++;
  }
}

void *mtp_thread_main(void *data) {
  struct mtp2_state *m = NULL;
  int i, lsi;
  int res;
  struct pollfd fds[MAX_SCHANNELS];
  unsigned char fifobuf[MTP_REQ_MAX_SIZE];
  struct mtp_req *req;
  int last_send_ix = 0;
#ifdef MTP_OVER_UDP
  int sent_fisu[MAX_SCHANNELS] = {0,};
  int sent_bsn[MAX_SCHANNELS] = {0,};
#endif
  ast_verbose(VERBOSE_PREFIX_3 "Starting MTP thread, pid=%d.\n", getpid());

  /* These counters are used to generate timestamps for raw dumps.
     The write count is offset with one zaptel buffer size to account for
     the buffer-introduced write latency. This way the dump timings should
     approximately reflect the time that the last byte of the frame went
     out on the wire. */
  for (i = 0; i < this_host->n_schannels; i++) {
    m = &mtp2_state[i];
    m->readcount = 0;
    m->writecount = ZAP_BUF_SIZE;

    fds[i].fd = m->fd;
    fds[i].events = POLLIN|POLLPRI|POLLOUT;
  }
  while(!stop_mtp_thread) {
    struct timeval now;
    struct timeval last;
    int tdiff;

    for (i = 0; i < this_host->n_schannels; i++) {
      m = &mtp2_state[i];
#ifdef MTP_OVER_UDP
      if (0)
	if (((m->state == MTP2_READY || m->state == MTP2_INSERVICE)))
	  ast_log(LOG_DEBUG, "Poll2, state=%d, retrans_seq=%d last_sent=%d last_ack=%d, send_bsn=%d sent_bsn=%d sent_fisu=%d\n", m->state, m->retrans_seq, m->retrans_last_sent, m->retrans_last_acked, m->send_bsn, sent_bsn[i], sent_fisu[i]);
      if (((m->state == MTP2_READY || m->state == MTP2_INSERVICE) && ((m->retrans_seq != -1) || (m->retrans_last_acked != m->retrans_last_sent))) ||
	  ((m->state != MTP2_READY && m->state != MTP2_INSERVICE)) ||
	  (m->send_bsn != sent_bsn[i])) {
	/* avoid sending FISU */
	fds[i].events = POLLIN|POLLPRI|POLLOUT;
	sent_fisu[i] = 0;
	sent_bsn[i] = m->send_bsn;
      }
      else {
	if (sent_fisu[i] < 100) {
	  fds[i].events = POLLIN|POLLPRI|POLLOUT;
	  sent_fisu[i]++;
	  sent_bsn[i] = m->send_bsn;
	}
	else
	  fds[i].events = POLLIN|POLLPRI;
      }
#endif
    }
#ifdef TESTINPUT
    {
      int cmdfd = open("/tmp/mtp3d.sock", O_RDONLY | O_NONBLOCK);

      if (cmdfd != -1) {
	struct pollfd cmdfds[1];
	cmdfds[0].fd = cmdfd;
	cmdfds[0].events = POLLIN;
	res = poll(cmdfds, 1, 100);
	if (res > 0) {
	  unsigned char buf[1024];
	  res = read(cmdfd, buf, sizeof(buf));
	  if (res > 0) {
	    m = &mtp2_state[0];
	    log_frame(m, 0, buf, res);
	    process_msu(m, buf, res);
	  }
	}
	close(cmdfd);
      }
    }
#endif

    /* No need to calculate timeout with ast_sched_wait, as we will be
       woken up every 2 msec. anyway to read/write zaptel buffers. */
    gettimeofday(&last, NULL);
    res = poll(fds, this_host->n_schannels, 20);

    gettimeofday(&now, NULL);
    tdiff = timediff_usec(now, last);
#ifndef MTP_OVER_UDP
    if (tdiff > 5000)
      if (this_host->n_schannels)
	fifo_log(m, LOG_NOTICE, "Excessive poll delay %d!\n", tdiff);//xxxx
#endif

    if(res < 0) {
      if(errno == EINTR) {
        /* Just try again. */
      } else {
        fifo_log(m, LOG_NOTICE, "poll() failure, errno=%d: %s\n",
		 errno, strerror(errno));
      }
    } else if(res > 0) {

      for (i = 0; i < this_host->n_schannels; i++) {
	if(fds[i].revents & POLLPRI) {
	  mtp2_fetch_zap_event(&mtp2_state[i]);
	}
      }
      /* Do the read before write, so that we can send any responses
         immediately (since we will usually/always also have a ready
         POLLOUT condition). */
      for (i = 0; i < this_host->n_schannels; i++) {
	m = &mtp2_state[i];
	if(fds[i].revents & POLLIN) {
	  unsigned char buf[1024];
	  int count = 0;

	  for(;;) {
	    res = read(fds[i].fd, buf, sizeof(buf));
	    if(res == 0) {
	      /* EOF. */
	      break;
	    } else if(res < 0) {
	      if(errno == EAGAIN || errno == EWOULDBLOCK) {
		/* Done until next successful poll(). */
		break;
	      } else if(errno == EINTR) {
		/* Try again. */
	      } else if(errno == ELAST) {
		mtp2_fetch_zap_event(m);
	      } else {
		/* Some unexpected error. */
		fifo_log(m, LOG_DEBUG, "Error reading zaptel device '%s', errno=%d: %s.\n", m->name, errno, strerror(errno));
		break;
	      }
	    } else {
	      /* Got some data. */
	      count += res;
#ifdef DO_RAW_DUMPS
	      mtp2_dump_raw(m, buf, res, 0);
#endif
	      mtp2_read_su(m, buf, res);
	    }
	  }
	  //if(count > 2*ZAP_BUF_SIZE) fifo_log(m, LOG_NOTICE, "%d bytes read (%d buffers).\n", count, count/ZAP_BUF_SIZE);
#ifndef MTP_OVER_UDP
	  if(count >= NUM_ZAP_BUF*ZAP_BUF_SIZE) {
	    fifo_log(m, LOG_NOTICE, "Full Zaptel input buffer detected, incoming "
		     "packets may have been lost on link '%s' (count=%d.\n", m->name, count);
	  }
#endif
	}
      }
      for (i = 0; i < this_host->n_schannels; i++) {
	m = &mtp2_state[i];
	if(fds[i].revents & POLLOUT) {
	  int count = 0;

	  for(;;) {
	    /* We buffer an extra ZAP_BUF_SIZE bytes locally. This creates
	       extra latency, but it is necessary to be able to detect write()
	       buffer underrun by doing an extra write() to see the EAGAIN
	       return.
	    */
	    if(!m->zap_buf_full) {
	      mtp2_fill_zaptel_buf(m, m->zap_buf);
	      m->zap_buf_full = 1;
	    }
	    res = write(fds[i].fd, m->zap_buf, ZAP_BUF_SIZE);
	    if(res == 0) {
	      /* EOF. */
	      break;
	    } else if(res < 0) {
	      if(errno == EAGAIN || errno == EWOULDBLOCK) {
		/* Done until next successful poll(). */
		break;
	      } else if(errno == EINTR) {
		/* Try again. */
	      } else if(errno == ELAST) {
		mtp2_fetch_zap_event(m);
	      } else {
		/* Some unexpected error. */
		fifo_log(m, LOG_DEBUG, "Error writing zaptel device '%s', errno=%d: %s.\n", m->name, errno, strerror(errno));
		break;
	      }
	    } else {
	      /* Successful write. */
	      count += res;
#ifdef DO_RAW_DUMPS
	      mtp2_dump_raw(m, m->zap_buf, res, 1);
#endif
	      m->zap_buf_full = 0;
#ifdef MTP_OVER_UDP
	      break;
#endif
	    }
	  }
	  //if(count > 2*ZAP_BUF_SIZE) fifo_log(m, LOG_NOTICE, "%d bytes written (%d buffers).\n", count, count/ZAP_BUF_SIZE);
	  if(count >= NUM_ZAP_BUF*ZAP_BUF_SIZE) {
	    fifo_log(m, LOG_NOTICE, "Empty Zaptel output buffer detected, outgoing "
		     "packets may have been lost on link '%s'.\n", m->name);
	  }
#ifdef MTP_OVER_UDP
	  if ((m->state != MTP2_READY) && (m->state != MTP2_INSERVICE))
	    res = poll(fds, 0, 20); /* sending lssu, small delay */
	  else
	    res = poll(fds, 0, 10); /* sending msu or fisu */
#endif
	}
      }
    }

    for (lsi = 0; lsi < n_linksets; lsi++) {
      m = NULL;
      if (!linksets[lsi].enabled)
	continue;
#ifdef xxxx
      n_inservice = 0;
      for (i = 0; i < this_host->n_schannels; i++) {
	struct mtp2_state* trym = &mtp2_state[last_send_ix];
	last_send_ix = (last_send_ix + 1) % this_host->n_schannels;
	if (trym->link->linkset != &linksets[lsi])
	  continue;
	if (trym->state != MTP2_INSERVICE)
	  continue;
	m = trym;
	n_inservice += 1;

	/* Handle requests from the channel driver threads.
	   We don't pull in requests when the retransmit buffer is full (otherwise
	   we would loose messages as we cannot fetch from the lffifo
	   out-of-order). */
      }
      if (!n_inservice) {
	for (i = 0; i < this_host->n_schannels; i++) {
	  struct mtp2_state* trym = &mtp2_state[last_send_ix];
	  last_send_ix = (last_send_ix + 1) % this_host->n_schannels;
	  if (!is_combined_linkset(trym->link->linkset, &linksets[lsi]))
	    continue;
	  if (trym->state != MTP2_INSERVICE)
	    continue;
	  m = trym;
	  n_inservice += 1;
	}
      }
#endif
      m = get_inservice_schannel(linksets[lsi].links[0]);
      if (m) {
	while(MTP_NEXT_SEQ(m->retrans_last_sent) != m->retrans_last_acked &&
	      (res = lffifo_get(sendbuf[lsi], fifobuf, sizeof(fifobuf))) != 0) {
	  if(res < 0) {
	    fifo_log(m, LOG_ERROR, "Got oversize packet in MTP request buffer -> choking on link '%s'.\n", m->name);
	    break;
	  }
	  req = (struct mtp_req *)fifobuf;
	  switch(req->typ) {
	  case MTP_REQ_ISUP:
	  case MTP_REQ_ISUP_FORWARD: {
	    if (req->isup.slink) {
	      struct mtp2_state* targetm = req->isup.slink->mtp;
	      if (targetm && (targetm->state == MTP2_INSERVICE) &&
		  (MTP_NEXT_SEQ(targetm->retrans_last_sent) != targetm->retrans_last_acked)) /* Not full */
		m = targetm;
	    }
	    int subservice = SS7_PROTO_ISUP | (m->subservice << 4);
	    mtp3_set_sls(variant(m), m->sls, req->buf); // xxx is this necessary?
	    fifo_log(m, LOG_DEBUG, "Queue MSU, lsi=%d, last_send_ix=%d, linkset=%s, m->link=%s\n", lsi, last_send_ix, linksets[lsi].name, m->link->name);
	    mtp2_queue_msu(m, subservice, req->buf, req->len);
#if 1
	    if (testfailover) {
	      int tx_len = m->retrans_buf[m->retrans_last_sent].len;
	      close(m->fd);
	      m->fd = -1;
	      fds[m->slinkno].fd = -1;
	      testfailover = 0;
	      fifo_log(m, LOG_DEBUG, "Closing link with tx_len %d, on link '%s'\n", tx_len, m->name);
	    }
#endif
	  }
	    break;
	  case MTP_REQ_SCCP: {
	    if (req->sccp.slink) {
	      struct mtp2_state* targetm = req->sccp.slink->mtp;
	      if (targetm && (targetm->state == MTP2_INSERVICE) &&
		  (MTP_NEXT_SEQ(targetm->retrans_last_sent) != targetm->retrans_last_acked)) /* Not full */
		m = targetm;
	    }
	    int subservice = SS7_PROTO_SCCP | (m->subservice << 4);
	    fifo_log(m, LOG_DEBUG, "Queue MSU, lsi=%d, last_send_ix=%d, linkset=%s, m->link=%s\n", lsi, last_send_ix, linksets[lsi].name, m->link->name);
	    mtp2_queue_msu(m, subservice, req->buf, req->len);
	  }
	    break;
	  case MTP_REQ_LINK_DOWN:
	    fifo_log(m, LOG_ERROR, "Got MTP_REQ_LINK_DOWN packet in MTP send buffer???.\n");
	    break;
	  case MTP_REQ_LINK_UP:
	    fifo_log(m, LOG_ERROR, "Got MTP_REQ_LINK_UP packet in MTP send buffer???.\n");
	    break;
	  case MTP_REQ_REGISTER_L4:
	    break;

	  }
	}
      }
      else if (cluster_receivers_alive(&linksets[lsi])) {
	while((res = lffifo_get(sendbuf[lsi], fifobuf, sizeof(fifobuf))) != 0) {
	  if(res < 0) {
	    fifo_log(m, LOG_ERROR, "Got oversize packet in MTP request buffer -> choking on link '%s'.\n", m->name);
	    break;
	  }
	 

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -