📄 jump_messaging.c
字号:
return; } if (length < 0) { m->status = JUMP_NEGATIVE_ARRAY_LENGTH; return; } jumpMessageAddInt(m, length); if (m->status != JUMP_SUCCESS) { return; } for (i = 0; i < length; i++) { jumpMessageAddString(m, strs[i]); if (m->status != JUMP_SUCCESS) { return; } }}JUMPMessageStatusCodejumpMessageGetStatus(JUMPOutgoingMessage m){ return m->status;}/* * An iterator to read off of a message */voidjumpMessageReaderInit(JUMPMessageReader* r, JUMPMessage m){ assert(jumpMessagingInitialized != 0); r->ptr = m->dataPtr; r->ptrEnd = m->dataEnd; r->status = JUMP_SUCCESS;}int8jumpMessageGetByte(JUMPMessageReader* r){ int8 ret; assert(jumpMessagingInitialized != 0); if (r->status != JUMP_SUCCESS) { return 0; } if (r->ptrEnd - r->ptr < 1) { r->status = JUMP_OVERRUN; return 0; } ret = r->ptr[0]; r->ptr += 1; return ret;}int8*jumpMessageGetBytesInto(JUMPMessageReader* r, int8* buffer, uint32 length) { if (r->status != JUMP_SUCCESS) { return NULL; } if (r->ptrEnd - r->ptr < length) { r->status = JUMP_OVERRUN; return NULL; } memcpy(buffer, r->ptr, length); r->ptr += length; return buffer;}int8*jumpMessageGetByteArray(JUMPMessageReader* r, uint32* lengthPtr){ int8* bytearray; uint32 length; assert(jumpMessagingInitialized != 0); if (r->status != JUMP_SUCCESS) { return NULL; } length = jumpMessageGetInt(r); if (r->status != JUMP_SUCCESS) { return NULL; } *lengthPtr = length; if (length == -1) { /* NULL array was written, this is ok. */ return NULL; } if (length < 0) { r->status = JUMP_NEGATIVE_ARRAY_LENGTH; return NULL; } if (r->ptrEnd - r->ptr < length) { r->status = JUMP_OVERRUN; return NULL; } bytearray = calloc(1, length); if (bytearray == NULL) { /* Caller discards message? Or do we "rewind" to the start of the length field again? */ r->status = JUMP_OUT_OF_MEMORY; return NULL; } memcpy(bytearray, r->ptr, length); r->ptr += length; return bytearray;}int32jumpMessageGetInt(JUMPMessageReader* r){ int32 i; assert(jumpMessagingInitialized != 0); if (r->status != JUMP_SUCCESS) { return 0; } if (r->ptrEnd - r->ptr < 4) { r->status = JUMP_OVERRUN; return 0; } i = (int32) (((uint8)r->ptr[0] << 24) | ((uint8)r->ptr[1] << 16) | ((uint8)r->ptr[2] << 8) | (uint8)r->ptr[3]); r->ptr += 4; return i;}int16jumpMessageGetShort(JUMPMessageReader* r){ int16 i; assert(jumpMessagingInitialized != 0); if (r->status != JUMP_SUCCESS) { return 0; } if (r->ptrEnd - r->ptr < 2) { r->status = JUMP_OVERRUN; return 0; } i = (int16) (((uint8)r->ptr[0] << 8) | (uint8)r->ptr[1]); r->ptr += 2; return i;}int64jumpMessageGetLong(JUMPMessageReader* r){ int64 i; assert(jumpMessagingInitialized != 0); if (r->status != JUMP_SUCCESS) { return 0; } if (r->ptrEnd - r->ptr < 8) { r->status = JUMP_OVERRUN; return 0; } i = (int64) (((uint64)r->ptr[0] << 56) | ((uint64)r->ptr[1] << 48) | ((uint64)r->ptr[2] << 40) | ((uint64)r->ptr[3] << 32) | ((uint64)r->ptr[4] << 24) | ((uint64)r->ptr[5] << 16) | ((uint64)r->ptr[6] << 8) | (uint64)r->ptr[7]); r->ptr += 8; return i;}JUMPPlatformCStringjumpMessageGetString(JUMPMessageReader* r){ int len; assert(jumpMessagingInitialized != 0); /* FIXME: ASCII assumption for now */ return (JUMPPlatformCString)jumpMessageGetByteArray(r, &len);}JUMPPlatformCString*jumpMessageGetStringArray(JUMPMessageReader* r, uint32* lengthPtr){ uint32 length; uint32 i; JUMPPlatformCString* strs; assert(jumpMessagingInitialized != 0); if (r->status != JUMP_SUCCESS) { return NULL; } length = jumpMessageGetInt(r); if (r->status != JUMP_SUCCESS) { return NULL; } *lengthPtr = length; if (length == -1) { /* NULL array was written, this is ok. */ return NULL; } if (length < 0) { r->status = JUMP_NEGATIVE_ARRAY_LENGTH; return NULL; } strs = calloc(length, sizeof(JUMPPlatformCString)); if (strs == NULL) { r->status = JUMP_OUT_OF_MEMORY; return NULL; } for (i = 0; i < length; i++) { strs[i] = jumpMessageGetString(r); if (r->status != JUMP_SUCCESS) { jumpMessageFreeStringArray(strs, i); return NULL; } } return strs;}voidjumpMessageFreeStringArray(JUMPPlatformCString* p, uint32 length){ uint32 i; /* jumpMessageGetStringArray() may validly return NULL, so accept it here. */ if (p == NULL) { return; } for (i = 0; i < length; i++) { free(p[i]); } free(p);}JUMPPlatformCStringjumpMessageGetType(JUMPMessage m){ assert(jumpMessagingInitialized != 0); return m->header.type;}JUMPAddress*jumpMessageGetSender(JUMPMessage m) { assert(jumpMessagingInitialized != 0); return &m->header.sender.address;}static voidsendAsyncOfType(JUMPAddress target, JUMPOutgoingMessage m, JUMPPlatformCString type, JUMPMessageStatusCode* code){ int targetpid = target.processId; JUMPMessageQueueHandle targetMq; JUMPMessageQueueStatusCode mqcode; assert(jumpMessagingInitialized != 0); targetMq = jumpMessageQueueOpen(targetpid, type, &mqcode); if (targetMq == NULL) { goto out; } jumpMessageQueueSend(targetMq, m->data, m->dataBufferLen, &mqcode); jumpMessageQueueClose(targetMq); out: *code = translateJumpMessageQueueStatusCode(&mqcode);}voidjumpMessageSendAsync(JUMPAddress target, JUMPOutgoingMessage m, JUMPMessageStatusCode* code){ sendAsyncOfType(target, m, jumpMessageGetType(m), code);}voidjumpMessageSendAsyncResponse(JUMPOutgoingMessage m, JUMPMessageStatusCode* code){ JUMPReturnAddress target; assert(jumpMessagingInitialized != 0); target = getReturnAddress(m); /* For now, just revert to async send. ReturnAddress can contain more information to indicate if a special response should be sent out */ sendAsyncOfType(target.address, m, target.returnType, code);}/* * On return, sets *code to one of JUMP_SUCCESS, JUMP_OUT_OF_MEMORY, * JUMP_TIMEOUT, JUMP_OVERRUN, JUMP_NEGATIVE_ARRAY_LENGTH, * JUMP_NO_SUCH_QUEUE, JUMP_UNBLOCKED, or JUMP_FAILURE. */static JUMPMessagedoWaitFor(JUMPPlatformCString type, int32 timeout, JUMPMessageStatusCode *code){ int status; JUMPMessageQueueStatusCode mqcode; uint8* buffer; struct _JUMPMessage* incoming; status = jumpMessageQueueWaitForMessage(type, timeout, &mqcode); if (status != 0) { /* Timed out, or in error. Must indicate to caller so it can decide which exception to throw (in case of Java), or what error code to handle (in case of native). */ *code = translateJumpMessageQueueStatusCode(&mqcode); return NULL; } buffer = calloc(1, JUMP_MESSAGE_BUFFER_SIZE); if (buffer == NULL) { *code = JUMP_OUT_OF_MEMORY; return NULL; } status = jumpMessageQueueReceive( type, buffer, JUMP_MESSAGE_BUFFER_SIZE, &mqcode); if (status == -1) { *code = translateJumpMessageQueueStatusCode(&mqcode); free(buffer); return NULL; } incoming = newMessageFromReceivedBuffer(buffer, JUMP_MESSAGE_BUFFER_SIZE, code); if (incoming == NULL) { free(buffer); return NULL; } return (JUMPMessage)incoming;}JUMPMessagejumpMessageSendSync(JUMPAddress target, JUMPOutgoingMessage m, int32 timeout, JUMPMessageStatusCode* code){ JUMPMessageHandlerRegistration registration = NULL; JUMPMessage r = NULL; assert(jumpMessagingInitialized != 0); /* Register the message type before sending the message to ensure the queue exists before the recipient sends a message to it. */ registration = jumpMessageRegisterDirect(m->header.sender.returnType, code); if (registration == NULL) { goto out; } jumpMessageSendAsync(target, m, code); if (*code != JUMP_SUCCESS) { goto out; } /* Get a response. Discard any that don't match outgoing request id. */ /* FIXME This is no good, each call to doWaitFor() gets a new timeout. doWaitFor() should use a deadline, not a timeout. */ do { r = doWaitFor(m->header.sender.returnType, timeout, code); } while (r != NULL && r->header.requestId != m->header.requestId); /* sanity? */ if (r != NULL) { assert(!strcmp(r->header.type, m->header.type)); } out: if (registration != NULL) { jumpMessageCancelRegistration(registration); } return r;}JUMPMessageHandlerRegistrationjumpMessageRegisterDirect(JUMPPlatformCString type, JUMPMessageStatusCode *code){ JUMPMessageHandlerRegistration registration; JUMPMessageQueueStatusCode mqcode; registration = malloc(sizeof(*registration)); if (registration == NULL) { *code = JUMP_OUT_OF_MEMORY; goto fail; } registration->messageType = strdup(type); if (registration->messageType == NULL) { *code = JUMP_OUT_OF_MEMORY; goto fail; } jumpMessageQueueCreate(registration->messageType, &mqcode); if (mqcode != JUMP_MQ_SUCCESS) { *code = translateJumpMessageQueueStatusCode(&mqcode); goto fail; } *code = JUMP_SUCCESS; return registration; fail: if (registration != NULL) { free(registration->messageType); free(registration); } return NULL;}/* * Block and wait for incoming message of a given type */JUMPMessagejumpMessageWaitFor(JUMPPlatformCString type, int32 timeout, JUMPMessageStatusCode *code){ assert(jumpMessagingInitialized != 0); return doWaitFor(type, timeout, code);}voidjumpMessageUnblock(JUMPPlatformCString messageType, JUMPMessageStatusCode* code){ JUMPMessageQueueStatusCode mqcode; assert(jumpMessagingInitialized != 0); jumpMessageQueueUnblock(messageType, &mqcode); *code = translateJumpMessageQueueStatusCode(&mqcode);}intjumpMessageGetFd(JUMPPlatformCString type){ return jumpMessageQueueGetFd(type);}JUMPMessageHandlerRegistrationjumpMessageAddHandlerByType(JUMPPlatformCString type, JUMPMessageHandler handler, void* data){ /* FIXME: Should we even have this? I want to avoid having to create new threads in native code, because it seriously complicates the porting layer. Caller code can still do this. */ return NULL;}JUMPMessageHandlerRegistrationjumpMessageAddHandlerByOutgoingMessage(JUMPOutgoingMessage m, JUMPMessageHandler handler, void* data){ return NULL;}voidjumpMessageCancelRegistration(JUMPMessageHandlerRegistration r){ jumpMessageQueueDestroy(r->messageType); free(r->messageType); free(r);}JUMPMessageStatusCodejumpMessageShutdown(void){ /* * Destroy all my message queues */ jumpMessageQueueInterfaceDestroy(); return JUMP_SUCCESS;}JUMPMessageStatusCodejumpMessageStart(void){ /* Ensure the porting layer can handle messages of the size we need. */ assert(JUMP_MESSAGE_BUFFER_SIZE <= JUMP_MESSAGE_QUEUE_MAX_MESSAGE_SIZE); return JUMP_SUCCESS;}JUMPMessageStatusCodejumpMessageRestart(void){ return JUMP_SUCCESS;}/* * Free an outgoing message. * It is OK to free a message after a send call. * Any entity wanting to queue a message for later send must clone it first. */voidjumpMessageFreeOutgoing(JUMPOutgoingMessage m){ freeMessage((struct _JUMPMessage*)m);}/* * Clone outgoing message. Must be freed via jumpMessageFreeOutgoing() */JUMPOutgoingMessagejumpMessageCloneOutgoing(JUMPOutgoingMessage m){ return NULL;}/* * Free an incoming message. */voidjumpMessageFree(JUMPMessage m){ freeMessage((struct _JUMPMessage*)m);}/* * Clone incoming message. Must be freed via jumpMessageFree() */JUMPMessagejumpMessageClone(JUMPMessage m){ return NULL;}/* * Example code * FIXME: Move to unit testing. */int doit(void){ JUMPMessageStatusCode status; JUMPAddress executive = jumpMessageGetExecutiveAddress(); JUMPOutgoingMessage m = jumpMessageNewOutgoingByType("message/test", &status); if (m == NULL) { return JUMP_FAILURE; } jumpMessageAddInt(m, 5); jumpMessageAddByte(m, 3); jumpMessageAddString(m, "test"); if (m->status != JUMP_SUCCESS) { return JUMP_FAILURE; } jumpMessageSendAsync(executive, m, &status); return (status == JUMP_SUCCESS);}JUMPMessageHandlerRegistration myTypeRegistration;voidmyMessageListener(JUMPMessage m, void* data) { JUMPPlatformCString type = jumpMessageGetType(m); printf("Message 0x%x of type %s received\n", (uint32)m, type); jumpMessageCancelRegistration(myTypeRegistration);}void registerMyListener(void){ myTypeRegistration = jumpMessageAddHandlerByType("mytype", myMessageListener, NULL);}void registerResponseListener(JUMPOutgoingMessage m) { myTypeRegistration = jumpMessageAddHandlerByOutgoingMessage(m, myMessageListener, NULL);}/* Emulating a message processor, sending a response */int processRequest(JUMPMessage m) { JUMPMessageStatusCode code; JUMPOutgoingMessage responseMessage; JUMPMessageReader reader; int param1, param2; jumpMessageReaderInit(&reader, m); param1 = jumpMessageGetInt(&reader); param2 = jumpMessageGetInt(&reader); /* .. get other data fields .. */ responseMessage = jumpMessageNewOutgoingByRequest(m, &code); /* * Fill in response */ jumpMessageAddInt(responseMessage, 5); /* ..... etc ..... */ jumpMessageSendAsyncResponse(responseMessage, &code); return (code == JUMP_SUCCESS);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -