📄 hdfs.c
字号:
/**
 * Copyright 2005 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "hdfs.h"
#include "hdfsJniHelper.h"

/**
 * hdfsJniEnv: A wrapper struct to be used as 'value'
 * while saving thread -> JNIEnv* mappings
 */
typedef struct
{
    JNIEnv* env;
} hdfsJniEnv;

/**
 * Helpful macro to convert a pthread_t to a string.
 * Formats the thread id into 'key' (at most 'keySize' bytes, always
 * NUL-terminated by snprintf). The expansion already ends in ';'.
 */
#define GET_threadID(threadID, key, keySize) \
    snprintf(key, keySize, "__hdfs_threadID__%u", (unsigned)(threadID));
#define threadID_SIZE 32

/**
 * Checks for a pending Java exception after a JNI method invocation.
 * Expects 'env' (JNIEnv*), 'jException' (jthrowable*) and 'args' (va_list)
 * to be in scope at the expansion site. If an exception is pending it is
 * described, handed back through *jException, cleared, and the enclosing
 * function returns -1 after va_end(args).
 */
#define CHECK_jExceptionEPTION_IN_METH_INVOC {\
    jthrowable _jException_;\
    if ((_jException_ = (*env)->ExceptionOccurred(env))) {\
        (*env)->ExceptionDescribe(env);\
        *jException = _jException_;\
        (*env)->ExceptionClear(env);\
        va_end(args);\
        return -1;\
    }\
}

/**
 * getJNIEnv: A helper function to get the JNIEnv* for the given thread.
 *
 * The env is looked up in the thread-id keyed table first. On a miss the
 * function, under the hash-table lock, either creates the JVM (class path
 * taken from $CLASSPATH) or attaches the calling thread to the JVM that
 * already exists, and then records the thread -> env mapping.
 *
 * Any failure along the way prints a message to stderr and terminates the
 * process via exit().
 *
 * @param: None.
 * @return The JNIEnv* corresponding to the thread.
 */
static inline JNIEnv* getJNIEnv(void)
{
    char threadID[threadID_SIZE];

    const jsize vmBufLength = 1;
    JavaVM* vmBuf[vmBufLength];
    JNIEnv *env;
    jint rv = 0;
    jint noVMs = 0;

    //Get the threadID and stringize it
    GET_threadID(pthread_self(), threadID, sizeof(threadID));

    //See if you already have the JNIEnv* cached...
    env = (JNIEnv*)searchEntryFromTable(threadID);
    if (env != NULL) {
        return env;
    }

    //All right... some serious work required here!
    //1. Initialize the HashTable
    //2. LOCK!
    //3. Check if any JVMs have been created here
    //      Yes: Use it (we should only have 1 VM)
    //      No: Create the JVM
    //4. UNLOCK

    hashTableInit();

    LOCK_HASH_TABLE();

    rv = JNI_GetCreatedJavaVMs(&(vmBuf[0]), vmBufLength, &noVMs);
    if (rv != 0) {
        fprintf(stderr,
                "Call to JNI_GetCreatedJavaVMs failed with error: %d\n", rv);
        exit(1);
    }

    if (noVMs == 0) {
        //Get the environment variables for initializing the JVM
        char *hadoopClassPath = getenv("CLASSPATH");
        if (hadoopClassPath == NULL) {
            fprintf(stderr, "Please set the environment variable $CLASSPATH!\n");
            exit(-1);
        }
        char *hadoopClassPathVMArg = "-Djava.class.path=";
        size_t optHadoopClassPathLen = strlen(hadoopClassPath) +
                                       strlen(hadoopClassPathVMArg) + 1;
        char *optHadoopClassPath = malloc(sizeof(char) * optHadoopClassPathLen);
        if (optHadoopClassPath == NULL) {
            fprintf(stderr, "Out of memory while building the JVM class path option\n");
            exit(1);
        }
        snprintf(optHadoopClassPath, optHadoopClassPathLen,
                 "%s%s", hadoopClassPathVMArg, hadoopClassPath);

        //Create the VM
        JavaVMInitArgs vm_args;
        JavaVMOption options[1];
        JavaVM *vm;

        // User classes
        options[0].optionString = optHadoopClassPath;
        // To print JNI-related messages, add a "-verbose:jni" option here
        // (and grow options[] / nOptions accordingly).

        vm_args.version = JNI_VERSION_1_2;
        vm_args.options = options;
        vm_args.nOptions = 1;
        vm_args.ignoreUnrecognized = 1;

        rv = JNI_CreateJavaVM(&vm, (void**)&env, &vm_args);
        if (rv != 0) {
            //The error code must actually be passed for the %d conversion
            fprintf(stderr,
                    "Call to JNI_CreateJavaVM failed with error: %d\n", rv);
            exit(1);
        }

        free(optHadoopClassPath);
    } else {
        //Attach this thread to the VM
        JavaVM* vm = vmBuf[0];
        rv = (*vm)->AttachCurrentThread(vm, (void**)&env, 0);
        if (rv != 0) {
            //The error code must actually be passed for the %d conversion
            fprintf(stderr,
                    "Call to AttachCurrentThread failed with error: %d\n", rv);
            exit(1);
        }
    }

    //Save the threadID -> env mapping.
    //hsearch() keeps the key pointer it is given rather than copying the
    //string, so the key must outlive this stack frame: store a heap copy.
    //The copy is intentionally never freed; it lives as long as the table.
    size_t keyLen = strlen(threadID) + 1;
    char *key = malloc(keyLen);
    if (key == NULL) {
        fprintf(stderr, "Out of memory while saving the thread -> JNIEnv mapping\n");
        exit(1);
    }
    snprintf(key, keyLen, "%s", threadID);

    ENTRY e;
    e.key = key;
    e.data = (void*)(env);
    if (hsearch(e, ENTER) == NULL) {
        fprintf(stderr, "Call to hsearch(ENTER) failed\n");
        exit(1);
    }

    UNLOCK_HASH_TABLE();

    return env;
}

/**
 * Helper function to create a java.io.File object.
 * @param env: The JNIEnv pointer.
 * @param path: The file-path for which to construct java.io.File object.
 * @return Returns a jobject on success and NULL on error.
*/static inline jobject constructNewObjectOfJavaIOFile(JNIEnv *env, const char *path){ //Construct a java.lang.String object jstring jPath = (*env)->NewStringUTF(env, path); //Construct the java.io.File object jthrowable jException; jobject jFile = constructNewObjectOfClass(env, &jException, "java/io/File", "(Ljava/lang/String;)V", jPath); if (jFile == NULL) { fprintf(stderr, "Can't construct instance of class java.io.File for %s\n", path); errno = EINTERNAL; return NULL; } //Destroy the java.lang.String object (*env)->ReleaseStringUTFChars(env, jPath, (*env)->GetStringUTFChars(env, jPath, 0)); return jFile;}/** * Helper function to create a org.apache.hadoop.fs.Path object. * @param env: The JNIEnv pointer. * @param path: The file-path for which to construct org.apache.hadoop.fs.Path object. * @return Returns a jobject on success and NULL on error. */static inline jobject constructNewObjectOfPath(JNIEnv *env, const char *path){ //Construct a java.lang.String object jstring jPathString = (*env)->NewStringUTF(env, path); //Construct the org.apache.hadoop.fs.Path object jthrowable jException; jobject jPath = constructNewObjectOfClass(env, &jException, "org/apache/hadoop/fs/Path", "(Ljava/lang/String;)V", jPathString); if (jPath == NULL) { fprintf(stderr, "Can't construct instance of class org.apache.hadoop.fs.Path for %s\n", path); errno = EINTERNAL; return NULL; } //Destroy the java.lang.String object (*env)->ReleaseStringUTFChars(env, jPathString, (*env)->GetStringUTFChars(env, jPathString, 0)); return jPath;}/** * Helper function to destroy a local reference of java.lang.Object * @param env: The JNIEnv pointer. * @param jFile: The local reference of java.lang.Object object * @return None. 
*/static inline void destroyLocalReference(JNIEnv *env, jobject jObject){ (*env)->DeleteLocalRef(env, jObject);}hdfsFS hdfsConnect(const char* host, tPort port){ // JAVA EQUIVALENT: // FileSystem fs = FileSystem.get(new Configuration()); // return fs; JNIEnv *env = 0; jobject jConfiguration; jobject jFS; jthrowable jException; //Get the JNIEnv* corresponding to current thread env = getJNIEnv(); //Create the org.apache.hadoop.conf.Configuration object jConfiguration = constructNewObjectOfClass(env, &jException, "org/apache/hadoop/conf/Configuration", "()V"); if (jConfiguration == NULL) { fprintf(stderr, "Can't construct instance of class org.apache.hadoop.conf.Configuration\n"); errno = EINTERNAL; return NULL; } //Check what type of FileSystem the caller wants... if (host == NULL) { //fs = new LocalFileSystem(conf); jFS = constructNewObjectOfClass(env, &jException, "org/apache/hadoop/fs/LocalFileSystem", "(Lorg/apache/hadoop/conf/Configuration;)V", jConfiguration); if (jFS == NULL) { errno = EINTERNAL; goto done; } } else if (!strcmp(host, "default") && port == 0) { //fs = FileSystem::get(conf); if (invokeMethod(env, (RetVal*)&jFS, &jException, STATIC, NULL, "org/apache/hadoop/fs/FileSystem", "get", "(Lorg/apache/hadoop/conf/Configuration;)Lorg/apache/hadoop/fs/FileSystem;", jConfiguration) != 0) { fprintf(stderr, "Call to org.apache.hadoop.fs.FileSystem::get failed!\n"); errno = EINTERNAL; goto done; } } else { //fs = new DistributedFileSystem(new InetSocketAddress(host, port), conf) jstring jHostName = (*env)->NewStringUTF(env, host); jobject jNameNode = constructNewObjectOfClass(env, &jException, "java/net/InetSocketAddress", "(Ljava/lang/String;I)V", jHostName, port); (*env)->ReleaseStringUTFChars(env, jHostName, (*env)->GetStringUTFChars(env, jHostName, NULL)); if (jNameNode == NULL) { errno = EINTERNAL; goto done; } jFS = constructNewObjectOfClass(env, &jException, "org/apache/hadoop/dfs/DistributedFileSystem", 
"(Ljava/net/InetSocketAddress;Lorg/apache/hadoop/conf/Configuration;)V", jNameNode, jConfiguration); destroyLocalReference(env, jNameNode); if (jFS == NULL) { errno = EINTERNAL; goto done; } } done: //Release unnecessary local references destroyLocalReference(env, jConfiguration); return jFS;}int hdfsDisconnect(hdfsFS fs){ // JAVA EQUIVALENT: // fs.close() //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); //Parameters jobject jFS = (jobject)fs; //jException reference jthrowable jException; //Sanity check if (fs == NULL) { errno = EBADF; return -1; } if (invokeMethod(env, NULL, &jException, INSTANCE, jFS, "org/apache/hadoop/fs/FileSystem", "close", "()V") != 0) { fprintf(stderr, "Call to FileSystem::close failed!\n"); errno = EINTERNAL; return -1; } //Release unnecessary local references destroyLocalReference(env, jFS); return 0;}hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags, int bufferSize, short replication, tSize blockSize){ // JAVA EQUIVALENT: // File f = new File(path); // FSData{Input|Output}Stream f{is|os} = fs.create(f); // return f{is|os}; //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); jobject jFS = (jobject)fs; jthrowable jException; //The hadoop java api/signature const char* method = (flags == O_RDONLY) ? "open" : "create"; const char* signature = (flags == O_RDONLY) ? 
"(Lorg/apache/hadoop/fs/Path;I)Lorg/apache/hadoop/fs/FSDataInputStream;" : "(Lorg/apache/hadoop/fs/Path;ZISJ)Lorg/apache/hadoop/fs/FSDataOutputStream;"; //Return value hdfsFile file = NULL; //Create an object of org.apache.hadoop.fs.Path jobject jPath = constructNewObjectOfPath(env, path); if (jPath == NULL) { return NULL; } //Create the org.apache.hadoop.conf.Configuration object //and get the configured values if need be jobject jConfiguration = constructNewObjectOfClass(env, &jException, "org/apache/hadoop/conf/Configuration", "()V"); if (jConfiguration == NULL) { fprintf(stderr, "Can't construct instance of class org.apache.hadoop.conf.Configuration\n"); errno = EINTERNAL; return NULL; } jint jBufferSize = bufferSize; jshort jReplication = replication; jlong jBlockSize = blockSize; jstring jStrBufferSize = (*env)->NewStringUTF(env, "io.file.buffer.size"); jstring jStrReplication = (*env)->NewStringUTF(env, "dfs.replication"); jstring jStrBlockSize = (*env)->NewStringUTF(env, "dfs.block.size"); //bufferSize if(!bufferSize) { if (invokeMethod(env, (RetVal*)&jBufferSize, &jException, INSTANCE, jConfiguration, "org/apache/hadoop/conf/Configuration", "getInt", "(Ljava/lang/String;I)I", jStrBufferSize, 4096)) { fprintf(stderr, "Call to org.apache.hadoop.conf.Configuration::getInt failed!\n"); errno = EINTERNAL; goto done; } } if(flags == O_WRONLY) { //replication
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -