📄 nth_client.c
字号:
ta_start(ta, tag, value); tl_gets(ta_args(ta), NTHTAG_TEMPLATE_REF(template), NTHTAG_AUTHENTICATION_REF(auc), NTHTAG_MESSAGE_REF(msg), NTHTAG_EXPIRES_REF(expires), HTTPTAG_VERSION_REF(version), TAG_END()); if (msg == none) { if (template && template->hc_request) msg = msg_copy(template->hc_request); else msg = msg_create(engine->he_mclass, engine->he_mflags); } http = http_object(msg); if (template) { if (callback == NULL) callback = template->hc_callback; if (magic == NULL) magic = template->hc_magic; if (name == NULL) method = template->hc_method, name = template->hc_method_name; if (uri == NULL) uri = (url_string_t *) template->hc_url; if (auc == none) auc = template->hc_auc; } else if (auc == none) { auc = NULL; } hc = hc_create(engine, callback, magic, msg, ta_tags(ta)); if (hc) hc->hc_expires = expires; if (hc == NULL) ; else if (http_add_tl(msg, http, ta_tags(ta)) < 0) ; else if (!(uri = hc_request_complete(hc, msg, http, method, name, uri, version, nth_client_url(template)))) ; else if (auc && hc_request_authenticate(hc, msg, http, uri, auc) <= 0) ; else if (hc_resolve_and_send(hc) < 0) ; else ok = 1; if (!ok) { if (hc) hc_free(hc); else msg_destroy(msg); hc = NULL; } ta_end(ta); } return hc;}staticurl_string_t const *hc_request_complete(nth_client_t * hc, msg_t *msg, http_t * http, http_method_t method, char const *name, url_string_t const *uri, char const *version, url_t const *parent){ su_home_t *home = msg_home(msg); http_host_t *host = http->http_host; void *tbf = NULL; url_t const *url; url_t u[1]; if (uri == NULL && http->http_request) uri = (url_string_t *) http->http_request->rq_url; if (uri == NULL) uri = (url_string_t *) parent; url = url_string_p(uri) ? (tbf = url_hdup(NULL, uri->us_url)) : uri->us_url; if (!url) return NULL; *u = *url; if (u->url_type == url_unknown && u->url_path && !u->url_host) { if (parent) { *u = *parent; u->url_path = url->url_path; /* XXX - relative URLs! */ u->url_params = url->url_params; u->url_headers = url->url_headers; /* Query */ } } if (!hc->hc_route_url && u->url_type != url_http && u->url_type != url_https) hc->hc_route_url = (url_string_t *) u; if (host && (str0casecmp(host->h_host, u->url_host) || str0cmp(host->h_port, u->url_port))) host = NULL; if (host == NULL && u->url_host) { host = http_host_create(home, u->url_host, u->url_port); msg_header_insert(msg, http, (http_header_t *) host); } if (u->url_host || hc->hc_route_url || host) hc->hc_url = url_hdup(home, u); if (hc->hc_route_url == (url_string_t *) u) hc->hc_route_url = (url_string_t *) hc->hc_url; if (hc->hc_url) { http_request_t *rq = http->http_request; if (rq && !method && !name) method = rq->rq_method, name = rq->rq_method_name; else if (rq && method && method != rq->rq_method) rq = NULL; else if (rq && name && strcmp(name, rq->rq_method_name)) rq = NULL; if (rq && version && strcasecmp(version, rq->rq_version)) rq = NULL; if (!hc->hc_route_url) { u->url_type = url_unknown, u->url_scheme = NULL; u->url_user = NULL, u->url_password = NULL; u->url_host = NULL, u->url_port = NULL; u->url_root = '/'; if (!u->url_path) u->url_path = ""; u->url_fragment = NULL; } if (rq && http_url_cmp(u, rq->rq_url)) rq = NULL; if (!rq) { if (http->http_request) msg_header_remove(msg, http, (msg_header_t *) http->http_request); http->http_request = http_request_create(home, method, name, (url_string_t *) u, version); if (!http->http_request) uri = NULL; } } else { uri = NULL; } if (http_message_complete(msg, http) < 0) uri = NULL; if (tbf) su_free(NULL, tbf); if (uri) { hc->hc_method = http->http_request->rq_method; hc->hc_method_name = http->http_request->rq_method_name; } return uri;}staticint hc_request_authenticate(nth_client_t * hc, msg_t *msg, http_t * http, url_string_t const *uri, auth_client_t **auc){ return auc_authorization(auc, msg, http, http->http_request->rq_method_name, uri->us_url, http->http_payload);}staticnth_client_t *hc_create(nth_engine_t * he, nth_response_f * callback, nth_client_magic_t * magic, msg_t *msg, tag_type_t tag, tag_value_t value, ...){ nth_client_t *hc; su_home_t *home = msg_home(msg); if (!(hc = su_salloc(he->he_home, sizeof(*hc)))) return NULL; if (!callback) callback = hc_default_cb; { int error_msg = he->he_error_msg; int streaming = he->he_streaming; url_string_t const *route_url = NULL; ta_list ta; ta_start(ta, tag, value); route_url = (url_string_t *) he->he_default_proxy; tl_gets(ta_args(ta), NTHTAG_PROXY_REF(route_url), NTHTAG_ERROR_MSG_REF(error_msg), NTHTAG_STREAMING_REF(streaming), TAG_END()); hc->hc_engine = he; hc->hc_callback = callback; hc->hc_magic = magic; hc->hc_tags = tl_afilter(home, tport_tags, ta_args(ta)); hc->hc_error_msg = error_msg; hc->hc_streaming = streaming; hc->hc_route_url = route_url; ta_end(ta); } hc->hc_request = msg; return hc;}staticint hc_resolve_and_send(nth_client_t * hc){ msg_t *msg = hc->hc_request; http_t *http = http_object(msg); su_home_t *home = msg_home(msg); int resolved = -1; if (hc->hc_route_url) { resolved = tport_name_by_url(home, hc->hc_tpn, hc->hc_route_url); } else { resolved = tpn_by_host(hc->hc_tpn, http->http_host, hc->hc_url); } if (resolved < 0) { SU_DEBUG_3(("nth client resolve: %s\n", "cannot resolve URL")); return -1; } hc->hc_route_url = NULL; hc->hc_tport = tport_by_name(hc->hc_engine->he_tports, hc->hc_tpn); if (!hc->hc_tport) { assert(hc->hc_tport); SU_DEBUG_3(("nth client create: %s\n", !hc->hc_tport ? "no transport" : "invalid message")); return -1; } if (msg_serialize(msg, http) < 0) { assert(hc->hc_tport); SU_DEBUG_3(("nth client create: invalid message")); return -1; } hc_send(hc); hc_insert(hc->hc_engine, hc); return 0;}/**@internal * Insert client request to the hash table */staticvoid hc_insert(nth_engine_t * he, nth_client_t * hc){ if (hc_htable_is_full(he->he_clients)) hc_htable_resize(he->he_home, he->he_clients, 0); hc_htable_insert(he->he_clients, hc); hc->hc_inserted = 1;}/**@internal * Remove client request from the hash table */staticvoid hc_remove(nth_engine_t * he, nth_client_t * hc){ if (hc->hc_inserted) hc_htable_remove(he->he_clients, hc); hc->hc_inserted = 0;}/** Destroy client request. */void nth_client_destroy(nth_client_t * hc){ if (hc == NULL) ; else if (hc->hc_completed) hc_free(hc); else hc->hc_callback = hc_default_cb;}/**@internal Free client request. */void hc_free(nth_client_t * hc){ if (hc) { if (hc->hc_pending) tport_release(hc->hc_tport, hc->hc_pending, hc->hc_request, NULL, hc, 0); tport_decref(&hc->hc_tport); msg_destroy(hc->hc_request); msg_destroy(hc->hc_response); su_free(hc->hc_engine->he_home, hc); }}/** * Gets client status. * * @param hc pointer to a nth client object * * @return * Returns the status code from the response message if it has been * received. A status code below 100 indicates that no response has been * received. If request timeouts, the connection is closed and the status * code is set to 408. If @a hc is NULL, returns 400 (Bad Request). */int nth_client_status(nth_client_t const *hc){ return hc ? hc->hc_status : 400;}/** * Gets client method. * * @param hc pointer to a nth client object * * @return * Returns the HTTP method from the request. * If @a hc is NULL, returns #http_method_invalid. */http_method_t nth_client_method(nth_client_t const *hc){ return hc ? hc->hc_method : http_method_invalid;}/** Get original Request-URI */url_t const *nth_client_url(nth_client_t const *hc){ return hc ? hc->hc_url : NULL;}/** Get request message. */msg_t *nth_client_request(nth_client_t * hc){ msg_t *request = NULL; if (hc) request = hc->hc_request, hc->hc_request = NULL; return request;}/** Get response message. */msg_t *nth_client_response(nth_client_t const *hc){ if (hc) return msg_ref_create(hc->hc_response); else return NULL;}/** Is client streaming response? */int nth_client_is_streaming(nth_client_t const *hc){ return hc && hc->hc_is_streaming;}/** Send request. */static nth_client_t *hc_send(nth_client_t * hc){ nth_engine_t *he = hc->hc_engine; tport_t *tp; he->he_stats->st_requests++; tp = tport_tsend(hc->hc_tport, hc->hc_request, hc->hc_tpn, TAG_NEXT(hc->hc_tags)); if (tp == NULL) { he->he_stats->st_tp_errors++; hc_reply(hc, HTTP_503_NO_SERVICE); return hc; } hc->hc_tport = tport_incref(tp); hc->hc_pending = tport_pend(tp, hc->hc_request, hc_tport_error, hc); if (hc->hc_pending < 0) hc->hc_pending = 0; if (hc->hc_expires) { hc->hc_timeout = he_now(he) + hc->hc_expires; /* XXX */ if (hc->hc_timeout == 0) hc->hc_timeout++; } return hc;}/** @internal Report transport errors. */void hc_tport_error(nth_engine_t * he, nth_client_t * hc, tport_t * tp, msg_t *msg, int error){ su_sockaddr_t *su = msg_addr(msg); tp_name_t const *tpn = tp ? tport_name(tp) : hc->hc_tpn; char addr[SU_ADDRSIZE]; char const *errmsg; if (error) errmsg = su_strerror(error); else errmsg = "Remote end closed connection"; su_log("nth: %s: %s (%u) with %s@%s:%u\n", hc->hc_method_name, errmsg, error, tpn->tpn_proto, inet_ntop(su->su_family, SU_ADDR(su), addr, sizeof(addr)), htons(su->su_port)); he->he_stats->st_tp_errors++; hc_reply(hc, HTTP_503_NO_SERVICE);}staticvoid hc_delayed_recv(su_root_magic_t *rm, su_msg_r msg, union sm_arg_u *u);/** Respond internally to a transaction. */int hc_reply(nth_client_t * hc, int status, char const *phrase){ nth_engine_t *he = hc->hc_engine; msg_t *msg = NULL; http_t *http = NULL; assert(status >= 400); SU_DEBUG_5(("nth: hc_reply(%p, %u, %s)\n", hc, status, phrase)); if (hc->hc_pending) { tport_release(hc->hc_tport, hc->hc_pending, hc->hc_request, NULL, hc, status < 200); if (status >= 200) hc->hc_pending = 0; } tport_shutdown(hc->hc_tport, 2); hc->hc_completed = 1; hc->hc_timeout = 0; if (hc->hc_callback == hc_default_cb) { hc_free(hc); return 0; } /* Create response message, if needed */ if (hc->hc_error_msg) { msg = he_msg_create(he, NTH_INTERNAL_MSG, NULL, 0, NULL, hc); http = http_object(msg); http_complete_response(msg, status, phrase, http_object(hc->hc_request)); } else hc->hc_status = status; if (hc->hc_inserted) { hc_recv(hc, msg, http); return 0; } else { /* * The thread creating outgoing transaction must return to application * before transaction callback can be invoked. Processing an internally * generated response message must be delayed until transaction creation * is completed. * * The internally generated message is transmitted using su_msg_send() * and it is delivered back to NTA when the application next time * executes the su_root_t event loop. */ su_root_t *root = he->he_root; su_msg_r su_msg = SU_MSG_R_INIT; if (su_msg_create(su_msg, su_root_task(root), su_root_task(root), hc_delayed_recv, sizeof(struct hc_recv_s)) == SU_SUCCESS) { struct hc_recv_s *a = su_msg_data(su_msg)->hc_recv; a->hc = hc; a->msg = msg; a->http = http; if (su_msg_send(su_msg) == SU_SUCCESS) return 0; } } if (msg) msg_destroy(msg); return -1;}staticvoid hc_delayed_recv(su_root_magic_t *rm, su_msg_r msg, union sm_arg_u *u){ struct hc_recv_s *a = u->hc_recv; if (hc_recv(a->hc, a->msg, a->http) < 0 && a->msg) msg_destroy(a->msg);}/** Receive response to transaction. */int hc_recv(nth_client_t * hc, msg_t *msg, http_t * http){ short status; int streaming = msg_is_streaming(msg); int shutdown = 0; if (http && http->http_status) { status = http->http_status->st_status; if (status < 100) status = 100; if (streaming && !hc->hc_streaming) { /* Disable streaming for this msg */ msg_set_streaming(msg, 0); return 0; /* Wait for complete message */ } hc->hc_status = status; } else if (http) status = hc->hc_status = 500, streaming = 0, http = NULL; else status = hc->hc_status, streaming = 0; if (status == 400 || (http && (http->http_flags & MSG_FLG_ERROR))) shutdown = 2; if (!streaming || shutdown) msg_set_streaming(msg, 0); if (hc->hc_pending) { tport_release(hc->hc_tport, hc->hc_pending, hc->hc_request, msg, hc, streaming || status < 200); if (!streaming && status >= 200) hc->hc_pending = 0; } if (!streaming && status >= 200) { /* Completed. */ hc->hc_completed = 1; hc_remove(hc->hc_engine, hc); if (shutdown || !http || (http->http_status->st_version == http_version_1_1 && http->http_connection && msg_params_find(http->http_connection->k_items, "close")) || (http->http_status->st_version == http_version_1_0)) shutdown = 2; } if (shutdown) { if (status < 200) status = 400; tport_shutdown(hc->hc_tport, shutdown); } if (msg_is_complete(msg)) { if (status < 200) hc->hc_engine->he_stats->st_1xxresponses++; else hc->hc_engine->he_stats->st_responses++; } if (hc->hc_response) msg_destroy(hc->hc_response); hc->hc_response = msg; hc->hc_is_streaming = streaming; /* Call callback */ hc->hc_callback(hc->hc_magic, hc, http); return 0;}/** @internal Default callback for request */int hc_default_cb(nth_client_magic_t * magic, nth_client_t * hc, http_t const *http){ if (http == NULL || http->http_status->st_status >= 200) hc_free(hc); return 0;}/** @internal Client transaction timer routine. */staticvoid hc_timer(nth_engine_t * he, nth_client_t * hc, uint32_t now){ if (hc->hc_timeout == 0) return; if ((int)hc->hc_timeout - (int)now > 0) return; hc_reply(hc, HTTP_408_TIMEOUT);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -