📄 tport_sigcomp.c
字号:
msg_set_address(msg, self->tp_addr, self->tp_addrlen); else msg_set_address(msg, su, su_size); SU_DEBUG_5(("%s(%p): sigcomp recv = %u => %u %s\n", __func__, self, N, dlen, eos ? " (complete)" : "")); msg_mark_as_compressed(msg); /* Write the received data to the message dump file */ if (self->tp_master->mr_dump_file && !self->tp_pri->pri_threadpool) tport_dump_iovec(self, msg, n, iovec, veclen, "recv", "from"); msg_recv_commit(msg, dlen, eos); /* Mark buffer as used */ } else { SU_DEBUG_5(("%s(%p): sigcomp recv = %u => %u %s\n", __func__, self, N, dlen, eos ? " (complete)" : "")); if (complete || !tport_is_stream(self)) { sigcomp_udvm_reject(udvm); /* Reject empty message */ } } if (self->tp_addrinfo->ai_socktype == SOCK_STREAM) { if (eos) return 0; if (output->b_complete) return n < N || sigcomp_udvm_has_pending_data(udvm) ? 2 : 1; if (!sigcomp_udvm_has_input(udvm)) return 1; } } return eos ? 0 : 2;}staticint vsc_send_sigcomp(tport_t const *self, msg_t *msg, msg_iovec_t iov[], int iovused, struct sigcomp_compartment *cc, tport_sigcomp_t *sc){ struct sigcomp_compressor *c = sc->sc_compressor; struct sigcomp_buffer *input = sc->sc_input; struct sigcomp_buffer *output = sc->sc_output; msg_iovec_t ciov[1]; int i, n, m, k, stream = tport_is_stream(self); char const *ccname; int ccnamelen; su_addrinfo_t *ai = msg_addrinfo(msg); int compress = (cc || (cc = sc->sc_cc)) && ai->ai_flags & TP_AI_COMPRESSED; if (!compress) { if (stream) sc->sc_outfmt = format_is_noncomp; ai->ai_flags &= ~TP_AI_COMPRESSED; return self->tp_pri->pri_vtable->vtp_send(self, msg, iov, iovused, NULL); } if (stream) sc->sc_outfmt = format_is_sigcomp; assert(cc); if (c == NULL) { assert(input == NULL); if (stream) c = sigcomp_compressor_create_for_stream(cc); else c = sigcomp_compressor_create(cc); sc->sc_compressor = c; } ccname = sigcomp_compartment_name(cc, &ccnamelen); if (sc->sc_compressed != 0) { input = NONE; } else if (input == NULL) { int input_size = -1; if (tport_is_udp(self)) { input_size = 0; for (i = 0; i < iovused; i++) input_size += iov[i].siv_len; } sc->sc_input = input = sigcomp_compressor_input_buffer(c, input_size); assert(input->b_avail == 0 && input->b_used == 0); } else if (!input->b_complete) { int input_size = 0; for (i = 0; i < iovused; i++) input_size += iov[i].siv_len; if (input_size > input->b_size - input->b_avail) sigcomp_buffer_align_available(input, 0); } if (output == NULL) sc->sc_output = output = sigcomp_compressor_output_buffer(c, NULL); if (!c || !input || !output) { SU_DEBUG_3(("%s(%p): %s (%u)%s%s%s\n", __func__, self, strerror(errno), errno, c ? "" : " (comp)", input ? "" : " (input)", output ? "" : " (output)")); sigcomp_compressor_free(c); sc->sc_compressor = NULL; sc->sc_output = NULL; sc->sc_input = NULL; sc->sc_compressed = 0; sc->sc_copied = 0; return -1; } if (sc->sc_compressed == 0) { k = sc->sc_copied; if (!input->b_complete) { int m = sc->sc_copied; for (i = 0, n = 0; i < iovused; i++) { char *b = iov[i].siv_base; int l = iov[i].siv_len; if (m >= l) { m -= l; continue; } b += m; l -= m; if (input->b_size == input->b_avail) break; if (l > input->b_size - input->b_avail) l = input->b_size - input->b_avail; memcpy(input->b_data + input->b_avail, b, l); input->b_avail += l; n += l; sc->sc_copied += l; if (l != iov[i].siv_len) break; } input->b_complete = i == iovused; assert(stream || input->b_complete); (void)stream; } m = output->b_avail - output->b_used; n = sigcomp_compressor_compress(c, output, input); if (n < 0) { SU_DEBUG_3(("%s(%p): %s (%u)\n", __func__, self, sigcomp_compressor_strerror(c), sigcomp_compressor_errno(c))); sigcomp_compressor_free(c); sc->sc_compressor = NULL; sc->sc_output = NULL; sc->sc_input = NULL; sc->sc_compressed = 0; return -1; } assert(input->b_complete || sc->sc_copied - k > 0); SU_DEBUG_5(("%s: input %u (%u new) compressed %u to %u with '%.*s'\n", __func__, sc->sc_copied, k, n, (output->b_avail - output->b_used) - m, ccnamelen, ccname)); sc->sc_compressed = n; assert(stream || output->b_complete); } else { assert(tport_is_connection_oriented(self)); n = sc->sc_compressed; } assert(input && cc && c && output); ciov->siv_base = output->b_data + output->b_used; ciov->siv_len = output->b_avail - output->b_used; m = self->tp_pri->pri_vtable->vtp_send(self, msg, ciov, 1); if (m == -1) { int error = su_errno(); if (su_is_blocking(error)) { sigcomp_compressor_free(c); sc->sc_compressor = NULL; sc->sc_output = NULL; sc->sc_input = NULL; sc->sc_compressed = 0; sc->sc_copied = 0; su_seterrno(error); } return -1; } output->b_used += m; if (output->b_used < output->b_avail) return 0; if (output->b_complete) { sigcomp_compressor_accept(c, cc), sc->sc_output = output = NULL; } if (input != NONE && input->b_avail == input->b_used && input->b_complete) sigcomp_buffer_reset(input, -1), sc->sc_input = input = NULL; if (!input && !output) { sigcomp_compressor_free(c); sc->sc_compressor = NULL; } assert(sc->sc_compressed >= n); assert(sc->sc_copied >= n); sc->sc_compressed -= n; sc->sc_copied -= n; return n;}/** Initialize UDVM */static struct sigcomp_udvm *tport_init_udvm(tport_t *self){ struct sigcomp_compartment *cc; struct sigcomp_udvm *udvm; if (self->tp_sigcomp->sc_udvm) return self->tp_sigcomp->sc_udvm; cc = tport_primary_compartment(self->tp_master); if (!cc) return NULL; if (self->tp_addrinfo->ai_socktype == SOCK_STREAM) udvm = sigcomp_udvm_create_for_stream(cc); else udvm = sigcomp_udvm_create_for_compartment(cc); return udvm;}/** Get primary compartment */static struct sigcomp_compartment *tport_primary_compartment(tport_master_t *mr){ return mr->mr_compartment;}/** Test if tport has a SigComp compartment is assigned to it. */int vsc_has_sigcomp_assigned(tport_sigcomp_t const *sc){ return sc && sc->sc_udvm != NULL;}staticvoid vsc_try_accept_sigcomp(tport_t *self, msg_t *msg){ struct sigcomp_udvm *udvm; udvm = self->tp_sigcomp->sc_udvm; if (udvm && sigcomp_udvm_is_complete(udvm)) { if (self->tp_master->mr_tpac->tpac_sigcomp_accept && self->tp_sigcomp->sc_cc == NULL) { tport_t *ref; struct tport_delivery *d; d = self->tp_master->mr_delivery; d->d_tport = self; d->d_msg = msg; d->d_udvm = &self->tp_sigcomp->sc_udvm; *d->d_from = *self->tp_name; ref = tport_incref(self); STACK_SIGCOMP_ACCEPT(self, msg); /* Reject by default */ if (self->tp_sigcomp->sc_udvm) sigcomp_udvm_accept(self->tp_sigcomp->sc_udvm, NULL); tport_decref(&ref); d->d_msg = NULL; } else { if (tport_log->log_level >= 5) { char const *name; int namelen; name = sigcomp_compartment_name(self->tp_sigcomp->sc_cc, &namelen); SU_DEBUG_5(("tport(%p): msg %p SigComp implicit accept '%.*s'\n", self, msg, namelen, name)); } sigcomp_udvm_accept(udvm, self->tp_sigcomp->sc_cc); } }}/** Accept a SigComp message */int tport_sigcomp_accept(tport_t *self, struct sigcomp_compartment *cc, msg_t *msg){ struct sigcomp_udvm *udvm; if (self == NULL || msg != self->tp_master->mr_delivery->d_msg) return su_seterrno(EINVAL); if (!self->tp_master->mr_delivery->d_udvm || cc == NONE) return 0; udvm = *self->tp_master->mr_delivery->d_udvm; if (udvm) { if (tport_log->log_level >= 5) { char const *name; int namelen; if (cc) { name = sigcomp_compartment_name(cc, &namelen); SU_DEBUG_5(("tport(%p): msg %p SigComp accept '%.*s'\n", self, msg, namelen, name)); } else { SU_DEBUG_5(("tport(%p): msg %p SigComp reject\n", self, msg)); } } sigcomp_udvm_accept(udvm, cc); } self->tp_master->mr_delivery->d_udvm = NULL; return 0;}/** Pass message to the protocol stack */voidtport_sigcomp_deliver(tport_t *self, msg_t *msg, su_time_t now){ /* XXX - no d */ STACK_RECV(self, msg, now); if (d->d_udvm && *d->d_udvm) sigcomp_udvm_accept(*d->d_udvm, NULL); /* reject */}#if HAVE_SIGCOMP && 0su_inlineint msg_is_compressed(msg_t *msg){ return msg && (msg_addrinfo(msg)->ai_flags & TP_AI_COMPRESSED) == TP_AI_COMPRESSED;}su_inlinevoid msg_mark_as_compressed(msg_t *msg){ if (msg) msg_addrinfo(msg)->ai_flags |= TP_AI_COMPRESSED;}struct sigcomp_udvm **tport_get_udvm_slot(tport_t *self){ tport_sigcomp_vtable_t const *vsc = tport_sigcomp_vtable; if (vsc)#if HAVE_SIGCOMP return &self->tp_sigcomp->sc_udvm;#else return NULL;#endif}struct sigcomp_compartment *tport_sigcomp_assign_if_needed(tport_t *self, struct sigcomp_compartment *cc){ if (self->tp_name->tpn_comp) { if (cc) tport_sigcomp_assign(self, cc); else if (self->tp_sigcomp->sc_cc) cc = self->tp_sigcomp->sc_cc; else /* Use default compartment */ cc = self->tp_master->mr_compartment; } else cc = NULL; return cc;} /** Receive data from datagram using SigComp. */int tport_recv_sigcomp_dgram(tport_t *self, int N){ char dummy[1]; int error = EBADMSG;#if HAVE_SIGCOMP struct sigcomp_udvm *udvm; if (self->tp_sigcomp->sc_udvm == 0) self->tp_sigcomp->sc_udvm = tport_init_udvm(self); udvm = self->tp_sigcomp->sc_udvm; if (udvm) { retval = tport_recv_sigcomp_r(self, &self->tp_msg, udvm, N); if (retval < 0) sigcomp_udvm_reject(udvm); return retval; } error = su_errno();#endif recv(self->tp_socket, dummy, 1, 0); /* remove msg from socket */ /* XXX - send NACK ? */ return su_seterrno(error); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -