📄 jump_messaging.c
字号:
/* * Copyright 1990-2006 Sun Microsystems, Inc. All Rights Reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License version * 2 only, as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License version 2 for more details (a copy is * included at /legal/license.txt). * * You should have received a copy of the GNU General Public License * version 2 along with this work; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA * 02110-1301 USA * * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa * Clara, CA 95054 or visit www.sun.com if you need additional * information or have any questions. */#include <jni.h>#include <jump_messaging.h>#include <stdio.h>#include <stdlib.h>#include <string.h>#include <assert.h>#include "porting/JUMPProcess.h"#include "porting/JUMPMessageQueue.h"#include "porting/JUMPThread.h"#include "porting/JUMPClib.h"typedef struct { JUMPAddress address; /* FIXME: rename type to thread? */ JUMPPlatformCString returnType;} JUMPReturnAddress;struct _JUMPMessageHeader { uint32 messageId; int32 requestId; JUMPReturnAddress sender; JUMPPlatformCString type;};#define jumpMessagingInitialized (1)/* * @brief variable sized encapsulation of a message */struct _JUMPMessage { struct _JUMPMessageHeader header; /* The message's data */ uint8* data; /* The length of the data */ int dataBufferLen; /* The byte after the message's data */ const uint8* dataEnd; /* Current location of the data pointer, for adding to the message */ uint8* dataPtr; /* First error encountered by jumpMessageAdd...(), or JUMP_SUCCESS. */ JUMPMessageStatusCode status;};struct JUMPMessageHandlerRegistration { JUMPPlatformCString messageType;};/* * This is the api and OS-independent implementation of native * messaging in JUMP. */static JUMPMessageStatusCodetranslateJumpMessageQueueStatusCode(const JUMPMessageQueueStatusCode *mqcode){ switch (*mqcode) { case JUMP_MQ_TIMEOUT: return JUMP_TIMEOUT; case JUMP_MQ_BUFFER_SMALL: return JUMP_FAILURE; case JUMP_MQ_SUCCESS: return JUMP_SUCCESS; case JUMP_MQ_FAILURE: return JUMP_FAILURE; case JUMP_MQ_OUT_OF_MEMORY: return JUMP_OUT_OF_MEMORY; case JUMP_MQ_BAD_MESSAGE_SIZE: return JUMP_FAILURE; case JUMP_MQ_WOULD_BLOCK: return JUMP_WOULD_BLOCK; case JUMP_MQ_NO_SUCH_QUEUE: return JUMP_NO_SUCH_QUEUE; case JUMP_MQ_UNBLOCKED: return JUMP_UNBLOCKED; case JUMP_MQ_TARGET_NONEXISTENT: return JUMP_TARGET_NONEXISTENT; default: assert(0); return JUMP_FAILURE; }}static JUMPAddressmkAddr(int32 pid){ JUMPAddress addr; addr.processId = pid; return addr;}/* Type should be newly allocated. */static JUMPReturnAddressmkReturnAddr(int32 pid, JUMPPlatformCString type){ JUMPReturnAddress addr; addr.address = mkAddr(pid); addr.returnType = type; return addr;}JUMPAddress jumpMessageGetMyAddress(void){ assert(jumpMessagingInitialized != 0); return mkAddr(jumpProcessGetId());}/* * This is how message queues are named, based on pid, and a name */#define JUMP_RESPONSE_QUEUE_NAME_PATTERN "<response-thread-%d>"char*jumpMessageGetReturnTypeName(void){ char name[80]; assert(jumpMessagingInitialized != 0); snprintf(name, sizeof(name), JUMP_RESPONSE_QUEUE_NAME_PATTERN, jumpThreadGetId()); return strdup(name);}static JUMPReturnAddressgetMyReturnAddress(void){ assert(jumpMessagingInitialized != 0); return mkReturnAddr(jumpProcessGetId(), jumpMessageGetReturnTypeName());}JUMPAddress jumpMessageGetExecutiveAddress(void){ assert(jumpMessagingInitialized != 0); return mkAddr(jumpProcessGetExecutiveId());}/* * Given a "raw" buffer, allocated or received, make a corresponding * JUMPMessage. */static struct _JUMPMessage* newMessageFromBuffer(uint8* buffer, uint32 len){ struct _JUMPMessage* message = calloc(1, sizeof(struct _JUMPMessage)); if (message == NULL) { return NULL; } message->data = buffer; message->dataBufferLen = len; message->dataEnd = buffer + len; message->dataPtr = buffer + jumpMessageQueueDataOffset(); message->status = JUMP_SUCCESS; return message;}/* * Create a new blank message */static struct _JUMPMessage* newMessage(void){ uint8* buffer; struct _JUMPMessage* message; buffer = calloc(1, JUMP_MESSAGE_BUFFER_SIZE); if (buffer == NULL) { return NULL; } message = newMessageFromBuffer(buffer, JUMP_MESSAGE_BUFFER_SIZE); if (message == NULL) { free(buffer); return NULL; } return message;}/* * Take the 'header' struct in a JUMPMessage, and "serialize" it into * the message data area. On success, returns JUMP_SUCCESS. On failure, * returns JUMP_OVERRUN. */static JUMPMessageStatusCodeputHeaderInMessage(struct _JUMPMessage* m){ struct _JUMPMessageHeader* hdr = &m->header; jumpMessageAddInt((JUMPOutgoingMessage)m, hdr->messageId); jumpMessageAddInt((JUMPOutgoingMessage)m, hdr->requestId); jumpMessageAddInt((JUMPOutgoingMessage)m, hdr->sender.address.processId); jumpMessageAddString((JUMPOutgoingMessage)m, hdr->sender.returnType); jumpMessageAddString((JUMPOutgoingMessage)m, hdr->type); return ((JUMPOutgoingMessage)m)->status;}/* * Deserialize header from message to 'header' structure in JUMPMessage. * Returns one of JUMP_SUCCESS, JUMP_OUT_OF_MEMORY, JUMP_OVERRUN, * or JUMP_NEGATIVE_ARRAY_LENGTH. */static JUMPMessageStatusCodegetHeaderFromMessage(struct _JUMPMessage* m){ JUMPMessageReader reader; struct _JUMPMessageHeader* hdr = &m->header; jumpMessageReaderInit(&reader, (JUMPMessage)m); hdr->messageId = jumpMessageGetInt(&reader); hdr->requestId = jumpMessageGetInt(&reader); hdr->sender.address.processId = jumpMessageGetInt(&reader); hdr->sender.returnType = jumpMessageGetString(&reader); hdr->type = jumpMessageGetString(&reader); /* Make sure the data pointer in the message is now set past the header */ m->dataPtr = reader.ptr; return reader.status;}/* * Given a "raw" buffer, allocated or received, make a corresponding * JUMPMessage. * * On success the struct _JUMPMessage is returned and *code is set to * JUMP_SUCCESS. On failure, NULL is returned and *code is set to one * of JUMP_OUT_OF_MEMORY, JUMP_OVERRUN, or JUMP_NEGATIVE_ARRAY_LENGTH. * */static struct _JUMPMessage* newMessageFromReceivedBuffer(uint8* buffer, uint32 len, JUMPMessageStatusCode *code){ struct _JUMPMessage* message = newMessageFromBuffer(buffer, len); if (message == NULL) { *code = JUMP_OUT_OF_MEMORY; return NULL; } *code = getHeaderFromMessage(message); if (*code != JUMP_SUCCESS) { free(message); return NULL; } return message;}/* * Running counters for message id's and request id's *//* FIXME not thread safe. */static uint32 thisProcessMessageId;static int32 thisProcessRequestId;JUMPOutgoingMessagejumpMessageNewOutgoingFromBuffer(uint8* buffer, int isResponse, JUMPMessageStatusCode *code){ struct _JUMPMessage* message; JUMPMessageMark mmarkBeforeHeader; JUMPMessageMark mmarkAfterHeader; uint32 messageId; message = newMessageFromBuffer(buffer, JUMP_MESSAGE_BUFFER_SIZE); if (message == NULL) { *code = JUMP_OUT_OF_MEMORY; return NULL; } jumpMessageMarkSet(&mmarkBeforeHeader, message); /* If this works, the following adds will work. */ *code = getHeaderFromMessage(message); if (*code != JUMP_SUCCESS) { free(message); return NULL; } jumpMessageMarkSet(&mmarkAfterHeader, message); /* rewind to beginning of header */ jumpMessageMarkResetTo(&mmarkBeforeHeader, message); /* Set message ID in the message payload and in the header. */ messageId = thisProcessMessageId++; jumpMessageAddInt(message, messageId); message->header.messageId = messageId; if (!isResponse) { int32 requestId; requestId = thisProcessRequestId++; jumpMessageAddInt(message, requestId); message->header.requestId = requestId; } /* Remember where we left the header */ jumpMessageMarkResetTo(&mmarkAfterHeader, message); return (JUMPOutgoingMessage)message;}uint8*jumpMessageGetData(JUMPMessage message){ return ((struct _JUMPMessage*)message)->data;}/* * Message header inspection */static JUMPReturnAddressgetReturnAddress(JUMPMessage m){ assert(jumpMessagingInitialized != 0); return m->header.sender;}static JUMPAddresscloneJUMPAddress(JUMPAddress address){ return address;}static JUMPReturnAddresscloneJUMPReturnAddress(JUMPReturnAddress returnAddress){ JUMPReturnAddress newReturnAddress; newReturnAddress.address = cloneJUMPAddress(returnAddress.address); newReturnAddress.returnType = strdup(returnAddress.returnType); return newReturnAddress;}static voidfreeJUMPAddress(JUMPAddress address){ /* Nothing to do. */}static voidfreeJUMPReturnAddress(JUMPReturnAddress returnAddress){ freeJUMPAddress(returnAddress.address); free(returnAddress.returnType);}/* * Free an incoming message. */static voidfreeMessage(struct _JUMPMessage* m){ /* Free all component allocations, and then the message itself */ free(m->data); freeJUMPReturnAddress(m->header.sender); free(m->header.type); /* Make sure the contents are not used accidentally */ memset(m, 0, sizeof(struct _JUMPMessage)); free(m);}/* type and addr are copied, and freed with the JUMPOutgoingMessage. On success, *code is set to JUMP_SUCCESS. Otherwise it is set to one of JUMP_OUT_OF_MEMORY or JUMP_OVERRUN. */static JUMPOutgoingMessagenewOutgoingMessage(JUMPPlatformCString type, uint32 requestId, JUMPReturnAddress addr, JUMPMessageStatusCode *code){ struct _JUMPMessage* message; assert(jumpMessagingInitialized != 0); message = newMessage(); if (message == NULL) { *code = JUMP_OUT_OF_MEMORY; goto fail; } message->header.messageId = thisProcessMessageId++; message->header.requestId = requestId; message->header.sender = cloneJUMPReturnAddress(addr); if (message->header.sender.returnType == NULL) { *code = JUMP_OUT_OF_MEMORY; goto fail; } message->header.type = strdup(type); if (message->header.type == NULL) { *code = JUMP_OUT_OF_MEMORY; goto fail; } *code = putHeaderInMessage(message); if (*code != JUMP_SUCCESS) { goto fail; } return (JUMPOutgoingMessage)message; fail: if (message != NULL) { freeMessage(message); } return NULL;}JUMPOutgoingMessagejumpMessageNewOutgoingByType(JUMPPlatformCString type, JUMPMessageStatusCode *code){ uint32 requestId = thisProcessRequestId++; JUMPReturnAddress myReturnAddress; JUMPOutgoingMessage message; assert(jumpMessagingInitialized != 0); myReturnAddress = getMyReturnAddress(); message = newOutgoingMessage(type, requestId, myReturnAddress, code); freeJUMPReturnAddress(myReturnAddress); return message;}JUMPOutgoingMessagejumpMessageNewOutgoingByRequest(JUMPMessage requestMessage, JUMPMessageStatusCode *code){ uint32 requestId = requestMessage->header.requestId; assert(jumpMessagingInitialized != 0); return newOutgoingMessage(jumpMessageGetType(requestMessage), requestId, getReturnAddress(requestMessage), code);}voidjumpMessageMarkSet(JUMPMessageMark* mmark, struct _JUMPMessage* m){ mmark->ptr = m->dataPtr; mmark->m = m;}voidjumpMessageMarkResetTo(JUMPMessageMark* mmark, struct _JUMPMessage* m){ assert(m == mmark->m); /* sanity */ m->dataPtr = mmark->ptr;}voidjumpMessageAddByte(JUMPOutgoingMessage m, int8 value){ assert(jumpMessagingInitialized != 0); if (m->status != JUMP_SUCCESS) { return; } if (m->dataEnd - m->dataPtr < 1) { m->status = JUMP_OVERRUN; return; } m->dataPtr[0] = value; m->dataPtr += 1;}voidjumpMessageAddBytesFrom(JUMPOutgoingMessage m, const int8* values, int length){ assert(jumpMessagingInitialized != 0); if (m->status != JUMP_SUCCESS) { return; } if ((values == NULL) || (length == 0)) { return; } if (length < 0) { m->status = JUMP_NEGATIVE_ARRAY_LENGTH; return; } if (m->dataEnd - m->dataPtr < length) { m->status = JUMP_OVERRUN; return; } memcpy(m->dataPtr, values, length); m->dataPtr += length;}voidjumpMessageAddByteArray(JUMPOutgoingMessage m, const int8* values, int length){ assert(jumpMessagingInitialized != 0); if (m->status != JUMP_SUCCESS) { return; } if (values == NULL) { jumpMessageAddInt(m, -1); return; } if (length < 0) { m->status = JUMP_NEGATIVE_ARRAY_LENGTH; return; } jumpMessageAddInt(m, length); if (m->status != JUMP_SUCCESS) { return; } if (m->dataEnd - m->dataPtr < length) { m->status = JUMP_OVERRUN; return; } memcpy(m->dataPtr, values, length); m->dataPtr += length;}voidjumpMessageAddInt(JUMPOutgoingMessage m, int32 value){ uint32 v; assert(jumpMessagingInitialized != 0); if (m->status != JUMP_SUCCESS) { return; } if (m->dataEnd - m->dataPtr < 4) { m->status = JUMP_OVERRUN; return; } v = (uint32)value; m->dataPtr[0] = (v >> 24) & 0xff; m->dataPtr[1] = (v >> 16) & 0xff; m->dataPtr[2] = (v >> 8) & 0xff; m->dataPtr[3] = (v >> 0) & 0xff;#if 0 printf("Encoded %d as [%d,%d,%d,%d]\n", value, m->dataPtr[0], m->dataPtr[1], m->dataPtr[2], m->dataPtr[3]);#endif m->dataPtr += 4;}voidjumpMessageAddShort(JUMPOutgoingMessage m, int16 value) { uint16 v; assert(jumpMessagingInitialized != 0); if (m->status != JUMP_SUCCESS) { return; } if (m->dataEnd - m->dataPtr < 2) { m->status = JUMP_OVERRUN; return; } v = (uint16)value; m->dataPtr[0] = (v >> 8) & 0xff; m->dataPtr[1] = (v >> 0) & 0xff;#if 0 printf("Encoded %d as [%d,%d]\n", value, m->dataPtr[0], m->dataPtr[1]);#endif m->dataPtr += 2;}voidjumpMessageAddLong(JUMPOutgoingMessage m, int64 value){ uint64 v; assert(jumpMessagingInitialized != 0); if (m->status != JUMP_SUCCESS) { return; } if (m->dataEnd - m->dataPtr < 8) { m->status = JUMP_OVERRUN; return; } v = (uint64)value; m->dataPtr[0] = (v >> 56) & 0xff; m->dataPtr[1] = (v >> 48) & 0xff; m->dataPtr[2] = (v >> 40) & 0xff; m->dataPtr[3] = (v >> 32) & 0xff; m->dataPtr[4] = (v >> 24) & 0xff; m->dataPtr[5] = (v >> 16) & 0xff; m->dataPtr[6] = (v >> 8) & 0xff; m->dataPtr[7] = (v >> 0) & 0xff;#if 0 printf("Encoded %d%d as [%d,%d,%d,%d,%d,%d,%d,%d]\n", value/(1<<32), value%(1<<32), m->dataPtr[0], m->dataPtr[1], m->dataPtr[2], m->dataPtr[3], m->dataPtr[4], m->dataPtr[5], m->dataPtr[6], m->dataPtr[7]);#endif m->dataPtr += 8;}voidjumpMessageAddString(JUMPOutgoingMessage m, JUMPPlatformCString str){ assert(jumpMessagingInitialized != 0); /* FIXME: ASCII assumption for now */ /* By the ascii assumption, a string is a byte array of length strlen(str) + 1 for the terminating '\0' */ jumpMessageAddByteArray(m, (int8*)str, strlen(str) + 1);}voidjumpMessageAddStringArray(JUMPOutgoingMessage m, JUMPPlatformCString* strs, uint32 length){ uint32 i; assert(jumpMessagingInitialized != 0); if (m->status != JUMP_SUCCESS) { return; } if (strs == NULL) { jumpMessageAddInt(m, -1);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -