📄 orbfunctional.java
字号:
msh_reply.version = msh_request.version; msh_reply.message_type = MessageHeader.REPLY; msh_reply.message_size = out.buffer.size(); // Write the reply. msh_reply.write(net_out); out.buffer.writeTo(net_out); net_out.flush(); } /** * Forward request to another target, as indicated by the passed exception. */ private void forward_request(OutputStream net_out, MessageHeader msh_request, RequestHeader rh_request, gnuForwardRequest info ) throws IOException { MessageHeader msh_forward = new MessageHeader(); msh_forward.version = msh_request.version; ReplyHeader rh_forward = msh_forward.create_reply_header(); msh_forward.message_type = MessageHeader.REPLY; rh_forward.reply_status = info.forwarding_code; rh_forward.request_id = rh_request.request_id; // The forwarding code is either LOCATION_FORWARD or LOCATION_FORWARD_PERM. BufferedCdrOutput out = new BufferedCdrOutput(); out.setOrb(this); out.setOffset(msh_forward.getHeaderSize()); rh_forward.write(out); if (msh_forward.version.since_inclusive(1, 2)) out.align(8); out.write_Object(info.forward_reference); msh_forward.message_size = out.buffer.size(); // Write the forwarding instruction. msh_forward.write(net_out); out.buffer.writeTo(net_out); net_out.flush(); } /** * Contains a single servicing task. * * Normally, each task matches a single remote invocation. However under * frequent tandem submissions the same task may span over several * invocations. * * @param serverSocket the ORB server socket. * * @throws MARSHAL * @throws IOException */ void serve(final portServer p, ServerSocket serverSocket) throws MARSHAL, IOException { final Socket service; service = serverSocket.accept(); // Tell the server there are no more resources. if (p.running_threads >= MAX_RUNNING_THREADS) { serveStep(service, true); return; } new Thread() { public void run() { try { synchronized (p) { p.running_threads++; } serveStep(service, false); } finally { synchronized (p) { p.running_threads--; } } } }.start(); } /** * A single servicing step, when the client socket is alrady open. * * Normally, each task matches a single remote invocation. However under * frequent tandem submissions the same task may span over several * invocations. * * @param service the opened client socket. * @param no_resources if true, the "NO RESOURCES" exception is thrown to the * client. */ void serveStep(Socket service, boolean no_resources) { try { Serving: while (true) { InputStream in = service.getInputStream(); service.setSoTimeout(TOUT_START_READING_MESSAGE); MessageHeader msh_request = new MessageHeader(); try { msh_request.read(in); } catch (MARSHAL ex) { // This exception may be thrown due closing the connection. return; } if (max_version != null) { if (!msh_request.version.until_inclusive(max_version.major, max_version.minor)) { OutputStream out = service.getOutputStream(); new ErrorMessage(max_version).write(out); return; } } byte[] r = msh_request.readMessage(in, service, TOUT_WHILE_READING, TOUT_AFTER_RECEIVING); if (msh_request.message_type == MessageHeader.REQUEST) { RequestHeader rh_request; BufferredCdrInput cin = new BufferredCdrInput(r); cin.setOrb(this); cin.setVersion(msh_request.version); cin.setOffset(msh_request.getHeaderSize()); cin.setBigEndian(msh_request.isBigEndian()); rh_request = msh_request.create_request_header(); // Read header and auto set the charset. rh_request.read(cin); // in 1.2 and higher, align the current position at // 8 octet boundary. if (msh_request.version.since_inclusive(1, 2)) { cin.align(8); // find the target object. } InvokeHandler target = (InvokeHandler) find_connected_object( rh_request.object_key, -1); // Prepare the reply header. This must be done in advance, // as the size must be known for handler to set alignments // correctly. ReplyHeader rh_reply = msh_request.create_reply_header(); // TODO log errors about not existing objects and methods. ResponseHandlerImpl handler = new ResponseHandlerImpl( this, msh_request, rh_reply, rh_request); SystemException sysEx = null; try { if (no_resources) { NO_RESOURCES no = new NO_RESOURCES("Too many parallel calls"); no.minor = Minor.Threads; throw no; } if (target == null) throw new OBJECT_NOT_EXIST(); target._invoke(rh_request.operation, cin, handler); } catch (gnuForwardRequest forwarded) { OutputStream sou = service.getOutputStream(); forward_request(sou, msh_request, rh_request, forwarded); if (service != null && !service.isClosed()) { // Wait for the subsequent invocations on the // same socket for the TANDEM_REQUEST duration. service.setSoTimeout(TANDEM_REQUESTS); continue Serving; } } catch (UnknownException uex) { sysEx = new UNKNOWN("Unknown", 2, CompletionStatus.COMPLETED_MAYBE); sysEx.initCause(uex.originalEx); org.omg.CORBA.portable.OutputStream ech = handler.createExceptionReply(); rh_reply.service_context = UnknownExceptionCtxHandler.addExceptionContext( rh_reply.service_context, uex.originalEx, ech); ObjectCreator.writeSystemException(ech, sysEx); } catch (SystemException ex) { sysEx = ex; org.omg.CORBA.portable.OutputStream ech = handler.createExceptionReply(); rh_reply.service_context = UnknownExceptionCtxHandler.addExceptionContext( rh_reply.service_context, ex, ech); ObjectCreator.writeSystemException(ech, ex); } catch (Exception except) { // This should never happen under normal operation and // can only indicate errors in user object implementation. // We inform the user. except.printStackTrace(); sysEx = new UNKNOWN("Unknown", 2, CompletionStatus.COMPLETED_MAYBE); sysEx.initCause(except); org.omg.CORBA.portable.OutputStream ech = handler.createExceptionReply(); rh_reply.service_context = UnknownExceptionCtxHandler.addExceptionContext( rh_reply.service_context, except, ech); ObjectCreator.writeSystemException(ech, sysEx); } // Write the response. if (rh_request.isResponseExpected()) { OutputStream sou = service.getOutputStream(); respond_to_client(sou, msh_request, rh_request, handler, sysEx); } } else if (msh_request.message_type == MessageHeader.CLOSE_CONNECTION || msh_request.message_type == MessageHeader.MESSAGE_ERROR) { CloseMessage.close(service.getOutputStream()); service.close(); return; } if (service != null && !service.isClosed()) // Wait for the subsequent invocations on the // same socket for the TANDEM_REQUEST duration. service.setSoTimeout(TANDEM_REQUESTS); else return; } } catch (SocketException ex) { // OK. return; } catch (IOException ioex) { // Network error, probably transient. // TODO log it. return; } finally { try { if (service!=null && !service.isClosed()) service.close(); } catch (IOException ioex) { // OK. } } } /** * Set the ORB parameters from the properties that were accumulated * from several locations. */ protected void useProperties(Properties props) { if (props != null) { if (props.containsKey(LISTEN_ON)) Port = Integer.parseInt(props.getProperty(LISTEN_ON)); if (props.containsKey(NS_HOST)) ns_host = props.getProperty(NS_HOST); try { if (props.containsKey(NS_PORT)) ns_port = Integer.parseInt(props.getProperty(NS_PORT)); if (props.containsKey(START_READING_MESSAGE)) TOUT_START_READING_MESSAGE = Integer.parseInt(props.getProperty(START_READING_MESSAGE)); if (props.containsKey(WHILE_READING)) TOUT_WHILE_READING = Integer.parseInt(props.getProperty(WHILE_READING)); if (props.containsKey(AFTER_RECEIVING)) TOUT_AFTER_RECEIVING = Integer.parseInt(props.getProperty(AFTER_RECEIVING)); if (props.containsKey(SERVER_ERROR_PAUSE)) TWAIT_SERVER_ERROR_PAUSE = Integer.parseInt(props.getProperty(SERVER_ERROR_PAUSE)); } catch (NumberFormatException ex) { throw new BAD_PARAM("Invalid " + NS_PORT + "property, unable to parse '" + props.getProperty(NS_PORT) + "'" ); } if (props.containsKey(SocketFactory.PROPERTY)) { String factory = null; try { factory = props.getProperty(SocketFactory.PROPERTY); if (factory!=null) socketFactory = (SocketFactory) ObjectCreator.forName(factory).newInstance(); } catch (Exception ex) { BAD_PARAM p = new BAD_PARAM("Bad socket factory "+factory); p.initCause(ex); throw p; } } if (props.containsKey(ORB_ID)) orb_id = props.getProperty(ORB_ID); if (props.containsKey(SERVER_ID)) server_id = props.getProperty(SERVER_ID); Enumeration en = props.elements(); while (en.hasMoreElements()) { String item = (String) en.nextElement(); if (item.equals(REFERENCE)) initial_references.put(item, string_to_object(props.getProperty(item)) ); } } } /** * Get the next instance with a response being received. If all currently sent * responses not yet processed, this method pauses till at least one of them * is complete. If there are no requests currently sent, the method pauses * till some request is submitted and the response is received. This strategy * is identical to the one accepted by Suns 1.4 ORB implementation. * * The returned response is removed from the list of the currently submitted * responses and is never returned again. * * @return the previously sent request that now contains the received * response. * * @throws WrongTransaction If the method was called from the transaction * scope different than the one, used to send the request. The exception can * be raised only if the request is implicitly associated with some particular * transaction. */ public Request get_next_response() throws org.omg.CORBA.WrongTransaction { return asynchron.get_next_response(); } /** * Find if any of the requests that have been previously sent with * {@link #send_multiple_requests_deferred}, have a response yet. * * @return true if there is at least one response to the previously sent * request, false otherwise. */ public boolean poll_next_response() { return asynchron.poll_next_response(); } /** * Send multiple prepared requests expecting to get a reply. All requests are * send in parallel, each in its own separate thread. When the reply arrives, * it is stored in the agreed fields of the corresponing request data * structure. If this method is called repeatedly, the new requests are added * to the set of the currently sent requests, but the old set is not * discarded. * * @param requests the prepared array of requests. * * @see #poll_next_response() * @see #get_next_response() * @see Request#send_deferred() */ public void send_multiple_requests_deferred(Request[] requests) { asynchron.send_multiple_requests_deferred(requests); } /** * Send multiple prepared requests one way, do not caring about the answer. * The messages, containing requests, will be marked, indicating that the * sender is not expecting to get a reply. * * @param requests the prepared array of requests. * * @see Request#send_oneway() */ public void send_multiple_requests_oneway(Request[] requests) { asynchron.send_multiple_requests_oneway(requests); } /** * Set the flag, forcing all server threads to terminate. */ protected void finalize() throws java.lang.Throwable { running = false; super.finalize(); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -