hdfs.c
    jint jTmpReplication;
    if (!replication) {
        if (invokeMethod(env, (RetVal*)&jTmpReplication, &jException, INSTANCE,
                    jConfiguration, "org/apache/hadoop/conf/Configuration",
                    "getInt", "(Ljava/lang/String;I)I", jStrReplication, 1)) {
            fprintf(stderr,
                    "Call to org.apache.hadoop.conf.Configuration::getInt failed!\n");
            errno = EINTERNAL;
            goto done;
        }
        jReplication = jTmpReplication;
    }

    //blockSize
    if (!blockSize) {
        if (invokeMethod(env, (RetVal*)&jBlockSize, &jException, INSTANCE,
                    jConfiguration, "org/apache/hadoop/conf/Configuration",
                    "getLong", "(Ljava/lang/String;J)J", jStrBlockSize, 67108864)) {
            fprintf(stderr,
                    "Call to org.apache.hadoop.conf.Configuration::getLong failed!\n");
            errno = EINTERNAL;
            goto done;
        }
    }
    } //end of the configuration-lookup block opened earlier in the function

    //Create and return either the FSDataInputStream or FSDataOutputStream references
    jobject jStream;
    if (flags == O_RDONLY) {
        if (invokeMethod(env, (RetVal*)&jStream, &jException, INSTANCE, jFS,
                    "org/apache/hadoop/fs/FileSystem", method, signature,
                    jPath, jBufferSize)) {
            fprintf(stderr, "Call to org.apache.hadoop.fs.FileSystem::%s(%s) failed!\n",
                    method, signature);
            errno = EINTERNAL;
            goto done;
        }
    }
    else {
        jboolean jOverWrite = 1;
        if (invokeMethod(env, (RetVal*)&jStream, &jException, INSTANCE, jFS,
                    "org/apache/hadoop/fs/FileSystem", method, signature,
                    jPath, jOverWrite, jBufferSize, jReplication, jBlockSize)) {
            fprintf(stderr, "Call to org.apache.hadoop.fs.FileSystem::%s(%s) failed!\n",
                    method, signature);
            errno = EINTERNAL;
            goto done;
        }
    }

    file = malloc(sizeof(struct hdfsFile_internal));
    if (!file) {
        errno = ENOMEM;
        return NULL;
    }
    file->file = (void*)jStream;
    //O_RDONLY is 0 on POSIX, so compare with '==' rather than masking with '&'
    file->type = ((flags == O_RDONLY) ? INPUT : OUTPUT);

done:
    //Delete unnecessary local references
    destroyLocalReference(env, jStrBufferSize);
    destroyLocalReference(env, jStrReplication);
    destroyLocalReference(env, jStrBlockSize);
    destroyLocalReference(env, jConfiguration);
    destroyLocalReference(env, jPath);

    return file;
}

int hdfsCloseFile(hdfsFS fs, hdfsFile file)
{
    // JAVA EQUIVALENT:
    //  file.close

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();

    //Parameters
    jobject jFS = (jobject)fs;
    jobject jStream = (jobject)(file ? file->file : NULL);

    //jException reference
    jthrowable jException;

    //Sanity check
    if (!file || file->type == UNINITIALIZED) {
        errno = EBADF;
        return -1;
    }

    //The interface whose 'close' method is to be called
    const char* interface = (file->type == INPUT) ?
        "org/apache/hadoop/fs/FSDataInputStream" :
        "org/apache/hadoop/fs/FSDataOutputStream";

    if (invokeMethod(env, NULL, &jException, INSTANCE, jStream,
                interface, "close", "()V") != 0) {
        fprintf(stderr, "Call to %s::close failed!\n", interface);
        errno = EINTERNAL;
        return -1;
    }

    //De-allocate memory
    free(file);

    return 0;
}
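For orientation, here is a minimal caller-side sketch of the open/close pair above. It assumes the public libhdfs entry points hdfsConnect() and hdfsDisconnect() declared in hdfs.h; the path is illustrative, and passing zeros for bufferSize, replication and blockSize is what triggers the Configuration fallbacks (the replication and 64 MB block-size defaults) resolved at the top of this section:

#include "hdfs.h"
#include <fcntl.h>
#include <stdio.h>

int main(void)
{
    //Connect to the default filesystem named in the client configuration
    hdfsFS fs = hdfsConnect("default", 0);
    if (!fs) {
        fprintf(stderr, "hdfsConnect failed!\n");
        return 1;
    }

    //Zeros for bufferSize/replication/blockSize defer to the
    //Configuration lookups performed inside hdfsOpenFile
    hdfsFile f = hdfsOpenFile(fs, "/tmp/example.txt", O_WRONLY, 0, 0, 0);
    if (!f) {
        fprintf(stderr, "hdfsOpenFile failed!\n");
        hdfsDisconnect(fs);
        return 1;
    }

    hdfsCloseFile(fs, f);
    hdfsDisconnect(fs);
    return 0;
}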
tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
{
    // JAVA EQUIVALENT:
    //  byte [] bR = new byte[length];
    //  fis.read(bR);

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();

    //Parameters
    jobject jFS = (jobject)fs;
    jobject jInputStream = (jobject)(f ? f->file : NULL);

    jthrowable jException;
    jbyteArray jbRarray;
    jint noReadBytes = 0;

    //Sanity check
    if (!f || f->type == UNINITIALIZED) {
        errno = EBADF;
        return -1;
    }

    //Error checking... make sure that this file is 'readable'
    if (f->type != INPUT) {
        fprintf(stderr, "Cannot read from a non-InputStream object!\n");
        errno = EINVAL;
        return -1;
    }

    //Read the requisite bytes
    jbRarray = (*env)->NewByteArray(env, length);
    if (invokeMethod(env, (RetVal*)&noReadBytes, &jException, INSTANCE,
                jInputStream, "org/apache/hadoop/fs/FSDataInputStream",
                "read", "([B)I", jbRarray) != 0) {
        fprintf(stderr, "Call to org.apache.hadoop.fs.FSDataInputStream::read failed!\n");
        errno = EINTERNAL;
        noReadBytes = -1;
    }
    else {
        if (noReadBytes > 0) {
            //Copy the bytes that were actually read into the caller's buffer
            (*env)->GetByteArrayRegion(env, jbRarray, 0, noReadBytes, buffer);
        }
        //This is a valid case: there aren't any bytes left to read!
        errno = 0;
    }

    //Delete the local reference to the Java byte array
    destroyLocalReference(env, jbRarray);

    return noReadBytes;
}

tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position, void* buffer, tSize length)
{
    // JAVA EQUIVALENT:
    //  byte [] bR = new byte[length];
    //  fis.read(pos, bR, 0, length);

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();

    //Parameters
    jobject jFS = (jobject)fs;
    jobject jInputStream = (jobject)(f ? f->file : NULL);

    jthrowable jException;
    jbyteArray jbRarray;
    jint noReadBytes = 0;

    //Sanity check
    if (!f || f->type == UNINITIALIZED) {
        errno = EBADF;
        return -1;
    }

    //Error checking... make sure that this file is 'readable'
    if (f->type != INPUT) {
        fprintf(stderr, "Cannot read from a non-InputStream object!\n");
        errno = EINVAL;
        return -1;
    }

    //Read the requisite bytes at the given absolute position
    jbRarray = (*env)->NewByteArray(env, length);
    if (invokeMethod(env, (RetVal*)&noReadBytes, &jException, INSTANCE,
                jInputStream, "org/apache/hadoop/fs/FSDataInputStream",
                "read", "(J[BII)I", position, jbRarray, 0, length) != 0) {
        fprintf(stderr, "Call to org.apache.hadoop.fs.FSDataInputStream::read failed!\n");
        errno = EINTERNAL;
        noReadBytes = -1;
    }
    else {
        if (noReadBytes > 0) {
            //Copy the bytes that were actually read into the caller's buffer
            (*env)->GetByteArrayRegion(env, jbRarray, 0, noReadBytes, buffer);
        }
        //This is a valid case: there aren't any bytes left to read!
        errno = 0;
    }

    //Delete the local reference to the Java byte array
    destroyLocalReference(env, jbRarray);

    return noReadBytes;
}

tSize hdfsWrite(hdfsFS fs, hdfsFile f, const void* buffer, tSize length)
{
    // JAVA EQUIVALENT
    //  byte b[] = str.getBytes();
    //  fso.write(b);

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();

    //Parameters
    jobject jFS = (jobject)fs;
    jobject jOutputStream = (jobject)(f ? f->file : 0);
    jthrowable jException;
    jbyteArray jbWarray;

    //Sanity check
    if (!f || f->type == UNINITIALIZED) {
        errno = EBADF;
        return -1;
    }

    if (length < 0) {
        errno = EINVAL;
        return -1;
    }

    //Error checking... make sure that this file is 'writable'
    if (f->type != OUTPUT) {
        fprintf(stderr, "Cannot write into a non-OutputStream object!\n");
        errno = EINVAL;
        return -1;
    }

    // 'length' equals 'zero' is a valid use-case according to POSIX!
    if (length != 0) {
        //Write the requisite bytes into the file
        jbWarray = (*env)->NewByteArray(env, length);
        (*env)->SetByteArrayRegion(env, jbWarray, 0, length, buffer);
        if (invokeMethod(env, NULL, &jException, INSTANCE,
                    jOutputStream, "org/apache/hadoop/fs/FSDataOutputStream",
                    "write", "([B)V", jbWarray)) {
            fprintf(stderr, "Call to org.apache.hadoop.fs.FSDataOutputStream::write failed!\n");
            errno = EINTERNAL;
            length = -1;
        }
        //Delete the local reference to the Java byte array
        destroyLocalReference(env, jbWarray);
    }

    //Return no. of bytes successfully written (libc way)
    //i.e. 'length' itself! ;-)
    return length;
}
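The read and write paths above mirror read(2)/write(2): hdfsWrite reports the full length on success, and hdfsRead may return fewer bytes than requested (0 at end of file). A hypothetical round-trip helper built on these calls; roundtrip() and the buffer size are illustrative:

#include "hdfs.h"
#include <fcntl.h>
#include <string.h>

//Hypothetical helper: write a message, flush it, then read it back
int roundtrip(hdfsFS fs, const char* path)
{
    const char* msg = "hello, hdfs\n";

    hdfsFile out = hdfsOpenFile(fs, path, O_WRONLY, 0, 0, 0);
    if (!out) return -1;
    //On success hdfsWrite reports 'length' itself, libc-style
    if (hdfsWrite(fs, out, msg, (tSize)strlen(msg)) < 0) {
        hdfsCloseFile(fs, out);
        return -1;
    }
    hdfsFlush(fs, out);
    hdfsCloseFile(fs, out);

    char buf[64];
    hdfsFile in = hdfsOpenFile(fs, path, O_RDONLY, 0, 0, 0);
    if (!in) return -1;
    //A short read is normal; real code would loop until 0 bytes remain
    tSize n = hdfsRead(fs, in, buf, sizeof(buf));
    hdfsCloseFile(fs, in);
    return (n < 0) ? -1 : 0;
}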
int hdfsSeek(hdfsFS fs, hdfsFile f, tOffset desiredPos)
{
    // JAVA EQUIVALENT
    //  fis.seek(pos);

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();

    //Parameters
    jobject jFS = (jobject)fs;
    jobject jInputStream = (jobject)(f ? f->file : 0);
    jthrowable jException;

    //Sanity check
    if (!f || f->type != INPUT) {
        errno = EBADF;
        return -1;
    }

    if (invokeMethod(env, NULL, &jException, INSTANCE,
                jInputStream, "org/apache/hadoop/fs/FSDataInputStream",
                "seek", "(J)V", desiredPos) != 0) {
        fprintf(stderr, "Call to org.apache.hadoop.fs.FSDataInputStream::seek failed!\n");
        errno = EINTERNAL;
        return -1;
    }
    return 0;
}

tOffset hdfsTell(hdfsFS fs, hdfsFile f)
{
    // JAVA EQUIVALENT
    //  pos = f.getPos();

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();

    //Parameters
    jobject jFS = (jobject)fs;
    jobject jStream = (jobject)(f ? f->file : 0);
    jthrowable jException;

    //Sanity check
    if (!f || f->type == UNINITIALIZED) {
        errno = EBADF;
        return -1;
    }

    //Works for both input and output streams
    const char* interface = (f->type == INPUT) ?
        "org/apache/hadoop/fs/FSDataInputStream" :
        "org/apache/hadoop/fs/FSDataOutputStream";

    jlong currentPos = -1;
    if (invokeMethod(env, (RetVal*)&currentPos, &jException, INSTANCE,
                jStream, interface, "getPos", "()J") != 0) {
        fprintf(stderr, "Call to %s::getPos failed!\n", interface);
        errno = EINTERNAL;
        return -1;
    }
    return (tOffset)currentPos;
}

int hdfsFlush(hdfsFS fs, hdfsFile f)
{
    // JAVA EQUIVALENT
    //  fos.flush();

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();

    //Parameters
    jobject jFS = (jobject)fs;
    jobject jOutputStream = (jobject)(f ? f->file : 0);
    jthrowable jException;

    //Sanity check
    if (!f || f->type != OUTPUT) {
        errno = EBADF;
        return -1;
    }

    if (invokeMethod(env, NULL, &jException, INSTANCE,
                jOutputStream, "org/apache/hadoop/fs/FSDataOutputStream",
                "flush", "()V") != 0) {
        fprintf(stderr, "Call to org.apache.hadoop.fs.FSDataOutputStream::flush failed!\n");
        errno = EINTERNAL;
        return -1;
    }
    return 0;
}

int hdfsAvailable(hdfsFS fs, hdfsFile f)
{
    // JAVA EQUIVALENT
    //  fis.available();

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();

    //Parameters
    jobject jFS = (jobject)fs;
    jobject jInputStream = (jobject)(f ? f->file : 0);
    jthrowable jException;

    //Sanity check
    if (!f || f->type != INPUT) {
        errno = EBADF;
        return -1;
    }

    //InputStream.available(), following the same pattern as the calls above
    jint available = -1;
    if (invokeMethod(env, (RetVal*)&available, &jException, INSTANCE,
                jInputStream, "org/apache/hadoop/fs/FSDataInputStream",
                "available", "()I") != 0) {
        fprintf(stderr, "Call to org.apache.hadoop.fs.FSDataInputStream::available failed!\n");
        errno = EINTERNAL;
        return -1;
    }
    return available;
}
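hdfsSeek and hdfsTell move and report the stream position, while hdfsPread reads at an absolute offset without disturbing it; that is the FSDataInputStream positioned-read contract the "(J[BII)I" signature binds to. A hypothetical helper combining the three, with readAt() as an illustrative name:

#include "hdfs.h"

//Hypothetical helper: read 'len' bytes at absolute offset 'off'
tSize readAt(hdfsFS fs, hdfsFile in, tOffset off, void* buf, tSize len)
{
    if (hdfsSeek(fs, in, off) != 0)
        return -1;
    if (hdfsTell(fs, in) != off)   //sanity check: seek landed where asked
        return -1;
    //Positioned read: does not move the seek position set above
    return hdfsPread(fs, in, off, buf, len);
}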