⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 hdfs.c

📁 hadoop:Nutch集群平台
💻 C
📖 第 1 页 / 共 4 页
字号:
/**
 * Copyright 2005 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "hdfs.h"
#include "hdfsJniHelper.h"

/**
 * hdfsJniEnv: A wrapper struct to be used as 'value'
 * while saving thread -> JNIEnv* mappings
 */
typedef struct {
    JNIEnv* env;
} hdfsJniEnv;

/**
 * Helpful macro to convert a pthread_t to a string.
 * Note: the expansion already ends with a ';'.
 */
#define GET_threadID(threadID, key, keySize) \
    snprintf(key, keySize, "__hdfs_threadID__%u", (unsigned)(threadID));
#define threadID_SIZE 32

/**
 * Checks for a pending Java exception inside a varargs method-invocation
 * helper. Expects `env`, `jException` (a jthrowable*) and `args` (a va_list)
 * to be in scope at the point of expansion. If an exception is pending it is
 * described, handed back through *jException, cleared, and the enclosing
 * function returns -1.
 */
#define CHECK_jExceptionEPTION_IN_METH_INVOC {\
    jthrowable _jException_;\
    if ((_jException_ = (*env)->ExceptionOccurred(env))) {\
        (*env)->ExceptionDescribe(env);\
        *jException = _jException_;\
        (*env)->ExceptionClear(env);\
        va_end(args);\
        return -1;\
    }\
}

/**
 * getJNIEnv: A helper function to get the JNIEnv* for the given thread.
 *
 * Looks the calling thread up in the thread -> JNIEnv* table first. On a miss
 * it either creates the JVM (class path taken from $CLASSPATH) or attaches the
 * thread to the already-created JVM, then records the mapping in the table.
 * Any failure along the way is fatal: a message is printed to stderr and the
 * process exits.
 *
 * @param: None.
 * @return The JNIEnv* corresponding to the thread.
 */
static inline JNIEnv* getJNIEnv()
{
    char threadID[threadID_SIZE];

    const jsize vmBufLength = 1;
    JavaVM* vmBuf[vmBufLength];
    JNIEnv *env;
    jint rv = 0;
    jint noVMs = 0;

    //Get the threadID and stringize it
    GET_threadID(pthread_self(), threadID, sizeof(threadID));

    //See if you already have the JNIEnv* cached...
    env = (JNIEnv*)searchEntryFromTable(threadID);
    if (env != NULL) {
        return env;
    }

    //All right... some serious work required here!
    //1. Initialize the HashTable
    //2. LOCK!
    //3. Check if any JVMs have been created here
    //      Yes: Use it (we should only have 1 VM)
    //      No: Create the JVM
    //4. UNLOCK

    hashTableInit();

    LOCK_HASH_TABLE();

    rv = JNI_GetCreatedJavaVMs(&(vmBuf[0]), vmBufLength, &noVMs);
    if (rv != 0) {
        fprintf(stderr,
                "Call to JNI_GetCreatedJavaVMs failed with error: %d\n", rv);
        exit(1);
    }

    if (noVMs == 0) {
        //Get the environment variables for initializing the JVM
        char *hadoopClassPath = getenv("CLASSPATH");
        if (hadoopClassPath == NULL) {
            fprintf(stderr, "Please set the environment variable $CLASSPATH!\n");
            exit(-1);
        }
        char *hadoopClassPathVMArg = "-Djava.class.path=";
        size_t optHadoopClassPathLen = strlen(hadoopClassPath) +
                                       strlen(hadoopClassPathVMArg) + 1;
        char *optHadoopClassPath = malloc(sizeof(char) * optHadoopClassPathLen);
        if (optHadoopClassPath == NULL) {
            fprintf(stderr, "Out of memory while building the JVM class path option\n");
            exit(1);
        }
        snprintf(optHadoopClassPath, optHadoopClassPathLen,
                 "%s%s", hadoopClassPathVMArg, hadoopClassPath);

        //Create the VM
        JavaVMInitArgs vm_args;
        JavaVMOption options[1];
        JavaVM *vm;

        // User classes
        options[0].optionString = optHadoopClassPath;
        // Print JNI-related messages
        //options[2].optionString = "-verbose:jni";

        vm_args.version = JNI_VERSION_1_2;
        vm_args.options = options;
        vm_args.nOptions = 1;
        vm_args.ignoreUnrecognized = 1;

        rv = JNI_CreateJavaVM(&vm, (void**)&env, &vm_args);
        if (rv != 0) {
            //The error code must actually be passed for the %d conversion
            fprintf(stderr,
                    "Call to JNI_CreateJavaVM failed with error: %d\n", rv);
            exit(1);
        }

        free(optHadoopClassPath);
    } else {
        //Attach this thread to the VM
        JavaVM* vm = vmBuf[0];
        rv = (*vm)->AttachCurrentThread(vm, (void**)&env, 0);
        if (rv != 0) {
            //The error code must actually be passed for the %d conversion
            fprintf(stderr,
                    "Call to AttachCurrentThread failed with error: %d\n", rv);
            exit(1);
        }
    }

    //Save the threadID -> env mapping.
    //hsearch(ENTER) keeps the key pointer it is given rather than copying the
    //string, so the key must outlive this stack frame: store a heap copy.
    size_t keyLen = strlen(threadID) + 1;
    char *persistentKey = malloc(keyLen);
    if (persistentKey == NULL) {
        fprintf(stderr, "Out of memory while saving the thread -> JNIEnv mapping\n");
        exit(1);
    }
    snprintf(persistentKey, keyLen, "%s", threadID);

    ENTRY e, *ep;
    e.key = persistentKey;
    e.data = (void*)(env);
    if ((ep = hsearch(e, ENTER)) == NULL) {
        fprintf(stderr, "Call to hsearch(ENTER) failed\n");
        exit(1);
    }

    UNLOCK_HASH_TABLE();

    return env;
}

/**
 * Helper function to create a java.io.File object.
 * The temporary java.lang.String local reference is released on every path.
 * @param env: The JNIEnv pointer.
 * @param path: The file-path for which to construct java.io.File object.
 * @return Returns a jobject on success and NULL on error
 *         (errno is set to EINTERNAL on error).
 */
static inline jobject constructNewObjectOfJavaIOFile(JNIEnv *env, const char *path)
{
    //Construct a java.lang.String object
    jstring jPath = (*env)->NewStringUTF(env, path);
    if (jPath == NULL) {
        fprintf(stderr,
                "Can't construct instance of class java.io.File for %s\n",
                path);
        errno = EINTERNAL;
        return NULL;
    }

    //Construct the java.io.File object
    jthrowable jException;
    jobject jFile = constructNewObjectOfClass(env, &jException,
            "java/io/File", "(Ljava/lang/String;)V", jPath);

    //Destroy the java.lang.String local reference; it is no longer needed
    //whether or not the constructor succeeded.
    (*env)->DeleteLocalRef(env, jPath);

    if (jFile == NULL) {
        fprintf(stderr,
                "Can't construct instance of class java.io.File for %s\n",
                path);
        errno = EINTERNAL;
        return NULL;
    }

    return jFile;
}

/**
 * Helper function to create a org.apache.hadoop.fs.Path object.
 * @param env: The JNIEnv pointer.
 * @param path: The file-path for which to construct org.apache.hadoop.fs.Path object.
 * @return Returns a jobject on success and NULL on error.
*/static inline jobject constructNewObjectOfPath(JNIEnv *env, const char *path){    //Construct a java.lang.String object    jstring jPathString = (*env)->NewStringUTF(env, path);     //Construct the org.apache.hadoop.fs.Path object    jthrowable jException;    jobject jPath = constructNewObjectOfClass(env, &jException,             "org/apache/hadoop/fs/Path", "(Ljava/lang/String;)V", jPathString);    if (jPath == NULL) {        fprintf(stderr,                 "Can't construct instance of class org.apache.hadoop.fs.Path for %s\n",                 path);        errno = EINTERNAL;        return NULL;    }    //Destroy the java.lang.String object    (*env)->ReleaseStringUTFChars(env, jPathString,                (*env)->GetStringUTFChars(env, jPathString, 0));    return jPath;}/** * Helper function to destroy a local reference of java.lang.Object * @param env: The JNIEnv pointer.  * @param jFile: The local reference of java.lang.Object object * @return None. */static inline void destroyLocalReference(JNIEnv *env, jobject jObject){  (*env)->DeleteLocalRef(env, jObject);}hdfsFS hdfsConnect(const char* host, tPort port){    // JAVA EQUIVALENT:    //  FileSystem fs = FileSystem.get(new Configuration());    //  return fs;    JNIEnv *env = 0;    jobject jConfiguration;    jobject jFS;    jthrowable jException;    //Get the JNIEnv* corresponding to current thread    env = getJNIEnv();    //Create the org.apache.hadoop.conf.Configuration object    jConfiguration = constructNewObjectOfClass(env, &jException,             "org/apache/hadoop/conf/Configuration", "()V");    if (jConfiguration == NULL) {        fprintf(stderr,                "Can't construct instance of class org.apache.hadoop.conf.Configuration\n");        errno = EINTERNAL;        return NULL;    }     //Check what type of FileSystem the caller wants...    
if (host == NULL) {        //fs = new LocalFileSystem(conf);        jFS = constructNewObjectOfClass(env, &jException,                "org/apache/hadoop/fs/LocalFileSystem",                "(Lorg/apache/hadoop/conf/Configuration;)V", jConfiguration);        if (jFS == NULL) {            errno = EINTERNAL;            goto done;        }    } else if (!strcmp(host, "default") && port == 0) {        //fs = FileSystem::get(conf);         if (invokeMethod(env, (RetVal*)&jFS, &jException, STATIC, NULL,                    "org/apache/hadoop/fs/FileSystem", "get",                     "(Lorg/apache/hadoop/conf/Configuration;)Lorg/apache/hadoop/fs/FileSystem;",                     jConfiguration) != 0) {            fprintf(stderr,                     "Call to org.apache.hadoop.fs.FileSystem::get failed!\n");            errno = EINTERNAL;            goto done;        }    } else {        //fs = new DistributedFileSystem(new InetSocketAddress(host, port), conf)        jstring jHostName = (*env)->NewStringUTF(env, host);            jobject jNameNode = constructNewObjectOfClass(env, &jException,                "java/net/InetSocketAddress", "(Ljava/lang/String;I)V",                 jHostName, port);        (*env)->ReleaseStringUTFChars(env, jHostName,                            (*env)->GetStringUTFChars(env, jHostName, NULL));        if (jNameNode == NULL) {            errno = EINTERNAL;            goto done;        }            jFS = constructNewObjectOfClass(env, &jException,                "org/apache/hadoop/dfs/DistributedFileSystem",                "(Ljava/net/InetSocketAddress;Lorg/apache/hadoop/conf/Configuration;)V",                 jNameNode, jConfiguration);        destroyLocalReference(env, jNameNode);        if (jFS == NULL) {            errno = EINTERNAL;            goto done;        }    }    done:        //Release unnecessary local references    destroyLocalReference(env, jConfiguration);    return jFS;}int hdfsDisconnect(hdfsFS fs){    // JAVA EQUIVALENT:    //  
fs.close()    //Get the JNIEnv* corresponding to current thread    JNIEnv* env = getJNIEnv();    //Parameters    jobject jFS = (jobject)fs;    //jException reference    jthrowable jException;    //Sanity check    if (fs == NULL) {        errno = EBADF;        return -1;    }    if (invokeMethod(env, NULL, &jException, INSTANCE, jFS,                 "org/apache/hadoop/fs/FileSystem",                "close", "()V") != 0) {        fprintf(stderr, "Call to FileSystem::close failed!\n");         errno = EINTERNAL;        return -1;    }    //Release unnecessary local references    destroyLocalReference(env, jFS);    return 0;}hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,         int bufferSize, short replication, tSize blockSize){    // JAVA EQUIVALENT:    //  File f = new File(path);    //  FSData{Input|Output}Stream f{is|os} = fs.create(f);    //  return f{is|os};    //Get the JNIEnv* corresponding to current thread    JNIEnv* env = getJNIEnv();    jobject jFS = (jobject)fs;    jthrowable jException;    //The hadoop java api/signature    const char* method = (flags == O_RDONLY) ? "open" : "create";    const char* signature = (flags == O_RDONLY) ?         
"(Lorg/apache/hadoop/fs/Path;I)Lorg/apache/hadoop/fs/FSDataInputStream;" :         "(Lorg/apache/hadoop/fs/Path;ZISJ)Lorg/apache/hadoop/fs/FSDataOutputStream;";    //Return value    hdfsFile file = NULL;    //Create an object of org.apache.hadoop.fs.Path     jobject jPath = constructNewObjectOfPath(env, path);    if (jPath == NULL) {        return NULL;     }    //Create the org.apache.hadoop.conf.Configuration object    //and get the configured values if need be    jobject jConfiguration = constructNewObjectOfClass(env, &jException,             "org/apache/hadoop/conf/Configuration", "()V");    if (jConfiguration == NULL) {        fprintf(stderr,                "Can't construct instance of class org.apache.hadoop.conf.Configuration\n");        errno = EINTERNAL;        return NULL;    }    jint jBufferSize = bufferSize;    jshort jReplication = replication;    jlong jBlockSize = blockSize;    jstring jStrBufferSize = (*env)->NewStringUTF(env, "io.file.buffer.size");     jstring jStrReplication = (*env)->NewStringUTF(env, "dfs.replication");    jstring jStrBlockSize = (*env)->NewStringUTF(env, "dfs.block.size");     //bufferSize    if(!bufferSize) {        if (invokeMethod(env, (RetVal*)&jBufferSize, &jException, INSTANCE, jConfiguration,                     "org/apache/hadoop/conf/Configuration", "getInt",                    "(Ljava/lang/String;I)I", jStrBufferSize, 4096)) {            fprintf(stderr,                    "Call to org.apache.hadoop.conf.Configuration::getInt failed!\n");            errno = EINTERNAL;            goto done;        }    }    if(flags == O_WRONLY) {        //replication

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -