📄 nth_client.c
字号:
int error_msg = he->he_error_msg; int streaming = he->he_streaming; url_string_t const *route_url = NULL; ta_list ta; ta_start(ta, tag, value); route_url = (url_string_t *) he->he_default_proxy; tl_gets(ta_args(ta), NTHTAG_PROXY_REF(route_url), NTHTAG_ERROR_MSG_REF(error_msg), NTHTAG_STREAMING_REF(streaming), TAG_END()); hc->hc_engine = he; hc->hc_callback = callback; hc->hc_magic = magic; hc->hc_tags = tl_afilter(home, tport_tags, ta_args(ta)); hc->hc_error_msg = error_msg; hc->hc_streaming = streaming; hc->hc_route_url = route_url; ta_end(ta); } hc->hc_request = msg; return hc;}staticint hc_resolve_and_send(nth_client_t * hc){ msg_t *msg = hc->hc_request; http_t *http = http_object(msg); su_home_t *home = msg_home(msg); int resolved = -1; if (hc->hc_route_url) { resolved = tport_name_by_url(home, hc->hc_tpn, hc->hc_route_url); } else { resolved = tpn_by_host(hc->hc_tpn, http->http_host, hc->hc_url); } if (resolved < 0) { SU_DEBUG_3(("nth client resolve: %s\n", "cannot resolve URL")); return -1; } hc->hc_route_url = NULL; hc->hc_tport = tport_by_name(hc->hc_engine->he_tports, hc->hc_tpn); if (!hc->hc_tport) { assert(hc->hc_tport); SU_DEBUG_3(("nth client create: %s\n", !hc->hc_tport ? "no transport" : "invalid message")); return -1; } if (msg_serialize(msg, http) < 0) { assert(hc->hc_tport); SU_DEBUG_3(("nth client create: invalid message")); return -1; } hc_send(hc); hc_insert(hc->hc_engine, hc); return 0;}/**@internal * Insert client request to the hash table */staticvoid hc_insert(nth_engine_t * he, nth_client_t * hc){ if (hc_htable_is_full(he->he_clients)) hc_htable_resize(he->he_home, he->he_clients, 0); hc_htable_insert(he->he_clients, hc); hc->hc_inserted = 1;}/**@internal * Remove client request from the hash table */staticvoid hc_remove(nth_engine_t * he, nth_client_t * hc){ if (hc->hc_inserted) hc_htable_remove(he->he_clients, hc); hc->hc_inserted = 0;}/** Destroy client request. */void nth_client_destroy(nth_client_t * hc){ if (hc == NULL) ; else if (hc->hc_completed) hc_free(hc); else hc->hc_callback = hc_default_cb;}/**@internal Free client request. */void hc_free(nth_client_t * hc){ if (hc) { if (hc->hc_pending) tport_release(hc->hc_tport, hc->hc_pending, hc->hc_request, NULL, hc, 0); tport_decref(&hc->hc_tport); msg_destroy(hc->hc_request); msg_destroy(hc->hc_response); su_free(hc->hc_engine->he_home, hc); }}/** * Gets client status. * * @param hc pointer to a nth client object * * @return * Returns the status code from the response message if it has been * received. A status code below 100 indicates that no response has been * received. If request timeouts, the connection is closed and the status * code is set to 408. If @a hc is NULL, returns 400 (Bad Request). */int nth_client_status(nth_client_t const *hc){ return hc ? hc->hc_status : 400;}/** * Gets client method. * * @param hc pointer to a nth client object * * @return * Returns the HTTP method from the request. * If @a hc is NULL, returns #http_method_invalid. */http_method_t nth_client_method(nth_client_t const *hc){ return hc ? hc->hc_method : http_method_invalid;}/** Get original Request-URI */url_t const *nth_client_url(nth_client_t const *hc){ return hc ? hc->hc_url : NULL;}/** Get request message. */msg_t *nth_client_request(nth_client_t * hc){ msg_t *request = NULL; if (hc) request = hc->hc_request, hc->hc_request = NULL; return request;}/** Get response message. */msg_t *nth_client_response(nth_client_t const *hc){ if (hc) return msg_ref_create(hc->hc_response); else return NULL;}/** Is client streaming response? */int nth_client_is_streaming(nth_client_t const *hc){ return hc && hc->hc_is_streaming;}/** Send request. */static nth_client_t *hc_send(nth_client_t * hc){ nth_engine_t *he = hc->hc_engine; tport_t *tp; he->he_stats->st_requests++; tp = tport_tsend(hc->hc_tport, hc->hc_request, hc->hc_tpn, TAG_NEXT(hc->hc_tags)); if (tp == NULL) { he->he_stats->st_tp_errors++; hc_reply(hc, HTTP_503_NO_SERVICE); return hc; } hc->hc_tport = tport_incref(tp); hc->hc_pending = tport_pend(tp, hc->hc_request, hc_tport_error, hc); if (hc->hc_pending == -1) hc->hc_pending = 0; if (hc->hc_expires) { hc->hc_timeout = he_now(he) + hc->hc_expires; /* XXX */ if (hc->hc_timeout == 0) hc->hc_timeout++; } return hc;}/** @internal Report transport errors. */void hc_tport_error(nth_engine_t * he, nth_client_t * hc, tport_t * tp, msg_t *msg, int error){ su_sockaddr_t const *su = msg_addr(msg); tp_name_t const *tpn = tp ? tport_name(tp) : hc->hc_tpn; char addr[SU_ADDRSIZE]; char const *errmsg; if (error) errmsg = su_strerror(error); else errmsg = "Remote end closed connection"; su_log("nth: %s: %s (%u) with %s@%s:%u\n", hc->hc_method_name, errmsg, error, tpn->tpn_proto, su_inet_ntop(su->su_family, SU_ADDR(su), addr, sizeof(addr)), htons(su->su_port)); he->he_stats->st_tp_errors++; hc_reply(hc, HTTP_503_NO_SERVICE);}staticvoid hc_delayed_recv(su_root_magic_t *rm, su_msg_r msg, union sm_arg_u *u);/** Respond internally to a transaction. */int hc_reply(nth_client_t * hc, int status, char const *phrase){ nth_engine_t *he = hc->hc_engine; msg_t *msg = NULL; http_t *http = NULL; assert(status >= 400); SU_DEBUG_5(("nth: hc_reply(%p, %u, %s)\n", (void *)hc, status, phrase)); if (hc->hc_pending) { tport_release(hc->hc_tport, hc->hc_pending, hc->hc_request, NULL, hc, status < 200); if (status >= 200) hc->hc_pending = 0; } tport_shutdown(hc->hc_tport, 2); hc->hc_completed = 1; hc->hc_timeout = 0; if (hc->hc_callback == hc_default_cb) { hc_free(hc); return 0; } /* Create response message, if needed */ if (hc->hc_error_msg) { msg = he_msg_create(he, NTH_INTERNAL_MSG, NULL, 0, NULL, hc); http = http_object(msg); http_complete_response(msg, status, phrase, http_object(hc->hc_request)); } else hc->hc_status = status; if (hc->hc_inserted) { hc_recv(hc, msg, http); return 0; } else { /* * The thread creating outgoing transaction must return to application * before transaction callback can be invoked. Processing an internally * generated response message must be delayed until transaction creation * is completed. * * The internally generated message is transmitted using su_msg_send() * and it is delivered back to NTA when the application next time * executes the su_root_t event loop. */ su_root_t *root = he->he_root; su_msg_r su_msg = SU_MSG_R_INIT; if (su_msg_create(su_msg, su_root_task(root), su_root_task(root), hc_delayed_recv, sizeof(struct hc_recv_s)) == SU_SUCCESS) { struct hc_recv_s *a = su_msg_data(su_msg)->hc_recv; a->hc = hc; a->msg = msg; a->http = http; if (su_msg_send(su_msg) == SU_SUCCESS) return 0; } } if (msg) msg_destroy(msg); return -1;}staticvoid hc_delayed_recv(su_root_magic_t *rm, su_msg_r msg, union sm_arg_u *u){ struct hc_recv_s *a = u->hc_recv; if (hc_recv(a->hc, a->msg, a->http) < 0 && a->msg) msg_destroy(a->msg);}/** Receive response to transaction. */int hc_recv(nth_client_t * hc, msg_t *msg, http_t * http){ short status; int streaming = msg_is_streaming(msg); int shutdown = 0; if (http && http->http_status) { status = http->http_status->st_status; if (status < 100) status = 100; if (streaming && !hc->hc_streaming) { /* Disable streaming for this msg */ msg_set_streaming(msg, 0); return 0; /* Wait for complete message */ } hc->hc_status = status; } else if (http) status = hc->hc_status = 500, streaming = 0, http = NULL; else status = hc->hc_status, streaming = 0; if (status == 400 || (http && (http->http_flags & MSG_FLG_ERROR))) shutdown = 2; if (!streaming || shutdown) msg_set_streaming(msg, 0); if (hc->hc_pending) { tport_release(hc->hc_tport, hc->hc_pending, hc->hc_request, msg, hc, streaming || status < 200); if (!streaming && status >= 200) hc->hc_pending = 0; } if (!streaming && status >= 200) { /* Completed. */ hc->hc_completed = 1; hc_remove(hc->hc_engine, hc); if (shutdown || !http || (http->http_status->st_version == http_version_1_1 && http->http_connection && msg_params_find(http->http_connection->k_items, "close")) || (http->http_status->st_version == http_version_1_0)) shutdown = 2; } if (shutdown) { if (status < 200) status = 400; tport_shutdown(hc->hc_tport, shutdown); } if (msg_is_complete(msg)) { if (status < 200) hc->hc_engine->he_stats->st_1xxresponses++; else hc->hc_engine->he_stats->st_responses++; } if (hc->hc_response) msg_destroy(hc->hc_response); hc->hc_response = msg; hc->hc_is_streaming = streaming; /* Call callback */ hc->hc_callback(hc->hc_magic, hc, http); return 0;}/** @internal Default callback for request */int hc_default_cb(nth_client_magic_t * magic, nth_client_t * hc, http_t const *http){ if (http == NULL || http->http_status->st_status >= 200) hc_free(hc); return 0;}/** @internal Client transaction timer routine. */staticvoid hc_timer(nth_engine_t * he, nth_client_t * hc, uint32_t now){ if (hc->hc_timeout == 0) return; if ((int)hc->hc_timeout - (int)now > 0) return; hc_reply(hc, HTTP_408_TIMEOUT);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -