📄 rmpfs.c
字号:
// stop prefetching the url because we're performing a 'real' seek pfsHandle.prefetchDone[pfsCookie->slot] = TRUE; perform_real_seek: RMDBGLOG((SEEKDBG, "real seek(%llu, %lu) for slot %lu\n", *position, (RMuint32)whence, pfsCookie->slot)); if (!pfsHandle.forceSyncReads[pfsCookie->slot]) { RMDBGLOG((ENABLE, "force sync reads for slot %lu due to seeking\n", pfsCookie->slot)); pfsHandle.forceSyncReads[pfsCookie->slot] = TRUE; } status = RMSeekFile(pfsHandle.fileHandleTable[pfsCookie->slot], *position, whence); if (status != RM_OK) { RMDBGLOG((ENABLE, "error seeking\n")); error = -1; goto exit; } RMGetCurrentPositionOfFile(pfsHandle.fileHandleTable[pfsCookie->slot], ¤tPosition); exit: pfsCookie->position = currentPosition; *position = currentPosition; RMDBGLOG((SEEKDBG, "current position %llu\n", pfsCookie->position)); LEAVE_CS(pfsHandle.criticalSection[pfsCookie->slot]); return error;}static RMint32 pfs_close(void *cookie){ struct _pfs_cookie *pfsCookie = (struct _pfs_cookie *)cookie; RMDBGLOG((LOCALDBG, "pfs_close(slot:%lu)\n", pfsCookie->slot)); ENTER_CS(pfsHandle.criticalSection[pfsCookie->slot]); pfsHandle.inUse[pfsCookie->slot] = FALSE; pfsHandle.forceSyncReads[pfsCookie->slot] = FALSE; LEAVE_CS(pfsHandle.criticalSection[pfsCookie->slot]); RMFree(cookie); return 0;}static RMbool is_url_in_cache(RMascii *url, RMuint32 *slot){ RMuint32 i; if (!pfsHandle.pfsProfile) { RMDBGLOG((ENABLE, "need to call rmpfs_open first\n")); return FALSE; } for (i = 0; i < pfsHandle.pfsProfile->prefetch_slots; i++) { if (RMCompareAscii(url, pfsHandle.fileNameTable[i])) { RMDBGLOG((LOCALDBG, "found '%s' at slot %lu\n", url, i)); *slot = i; return TRUE; } } return FALSE;}RMFileOps ops = { pfs_read, pfs_write, pfs_seek, pfs_close};static RMfile open_for_pfs(const RMascii *filename, RMfileOpenMode mode, struct stream_options_s *options){ RMuint32 slot; RMfile file = NULL; struct _pfs_cookie *cookie; RMDBGLOG((LOCALDBG, "_open_for_pfs(url:'%s')\n", filename)); if (!is_url_in_cache((RMascii*)filename, &slot)) { RMDBGLOG((ENABLE, "url not prefetched\n")); return NULL; } RMDBGLOG((LOCALDBG, "found url in slot %lu\n", slot));#ifdef WITH_THREADS // wait until we're ready SEMAPHORE_V(pfsHandle.pfsOpenSemaphore[slot]); RMDBGLOG((LOCALDBG, "open_for_pfs[%lu]: ...done\n", slot));#else if (!pfsHandle.prefetchDone[slot]) { RMDBGLOG((ENABLE, "\nopen_for_pfs called before prefetch for the given url was completed\n" "prefetch will be unusable for this url\n")); return NULL; }#endif if (pfsHandle.inUse[slot]) { RMDBGLOG((ENABLE, "\nurl already opened, prefetch disabled for this handle\n")); return NULL; } if (pfsHandle.fileHandleTable[slot]) { RMDBGLOG((LOCALDBG, "internal open\n")); cookie = (struct _pfs_cookie *) RMMalloc(sizeof(struct _pfs_cookie)); cookie->slot = slot; cookie->position = 0; file = RMOpenFileCookie((void*)cookie, mode, &ops); /* used to signal ERROR if rmpfs_close_slot is called (this is just for robustness) */ ENTER_CS(pfsHandle.criticalSection[slot]); pfsHandle.inUse[slot] = TRUE; LEAVE_CS(pfsHandle.criticalSection[slot]); } else RMDBGLOG((LOCALDBG, "external open\n")); // we might want to open several times the same url, so unblock the semaphore SEMAPHORE_P(pfsHandle.pfsOpenSemaphore[slot]); return file;}/* previously it was in rminputstream.c, in order to not break set_rua_test/test compilation without adding the required libraries (CARDEA) to the Makefile, we moved the functions from rminputstream to here.*//* this is the open function used by rmpfs.c */static RMfile open_url(RMascii *url){ RMfile file; RMascii *session_header; struct stream_options_s stream_options; RMDBGLOG((LOCALDBG, "open_url\n")); init_stream_options(&stream_options);/* #### Begin CARDEA code #### */ session_header = get_ms_session_header(url); if (session_header) { RMDBGLOG((ENABLE, "got CARDEA session header '%s'\n", session_header)); if (RMFAILED(set_http_options(&stream_options, RM_HTTP_CUSTOM_HEADER, session_header))) { RMDBGLOG((ENABLE, "Error setting CARDEA session parameter.\n")); return NULL; } }/* #### End CARDEA code #### *//* DTCP initialisation should be here */ RMDBGLOG((LOCALDBG, "about to call open_stream for '%s'\n", url)); file = _open_stream(url, RM_FILE_OPEN_READ, &stream_options); if (!file) RMDBGLOG((ENABLE, "open error!\n")); return file;}/* exported functions */RMstatus rmpfs_open(struct pfs_profile *profile){ RMuint32 i; if (!profile) return RM_ERROR; if (!profile->prefetch_area_size) return RM_ERROR; if (!profile->prefetch_slots) return RM_ERROR; pfsHandle.pfsProfile = profile; pfsHandle.url_prefetch_size = pfsHandle.pfsProfile->prefetch_area_size / pfsHandle.pfsProfile->prefetch_slots; // round down pfsHandle.url_prefetch_size = (pfsHandle.url_prefetch_size / PREFETCH_CHUNK_SIZE) * PREFETCH_CHUNK_SIZE; if (pfsHandle.pfsProfile->prefetch_slots >= MAX_NUMBER_OF_CACHEABLE_URL) { RMDBGLOG((ENABLE, "can't cache more than %lu URLs\n", MAX_NUMBER_OF_CACHEABLE_URL)); return RM_ERROR; } RMDBGLOG((LOCALDBG, "rmpfs_open: buffer @%p, size %lu, urlSize %lu, numURL %lu\n", pfsHandle.pfsProfile->prefetch_area, pfsHandle.pfsProfile->prefetch_area_size, pfsHandle.url_prefetch_size, pfsHandle.pfsProfile->prefetch_slots)); for (i = 0; i < pfsHandle.pfsProfile->prefetch_slots; i++) { pfsHandle.prefetchDone[i] = FALSE; pfsHandle.fileHandleTable[i] = NULL; pfsHandle.fileNameTable[i][0] = '\0'; pfsHandle.prefetchedSize[i] = 0; pfsHandle.urlAllocated[i] = FALSE; pfsHandle.stopPrefetch[i] = FALSE; pfsHandle.inUse[i] = FALSE; pfsHandle.forceSyncReads[i] = FALSE;#ifdef WITH_THREADS // create semaphore to block open_for_pfs calls until we have opened the url pfsHandle.pfsOpenSemaphore[i] = RMCreateSemaphore(0); // create semaphore to block pfs_read calls until requested data has been prefetched pfsHandle.pfsReadSemaphore[i] = RMCreateSemaphore(0); // critical sections pfsHandle.criticalSection[i] = RMCreateCriticalSection();#endif } set_custom_open_for_pfs(open_for_pfs); //saveFile = fopen("read.dump", "wb");#ifdef WITH_THREADS RMDBGLOG((ENABLE, "rmpfs is thread-safe\n"));#else RMDBGLOG((ENABLE, "rmpfs is NOT thread-safe\n"));#endif return RM_OK;}RMstatus rmpfs_close(void){ RMuint32 i; RMDBGLOG((LOCALDBG, "rmpfs_close\n")); for (i = 0; i < pfsHandle.pfsProfile->prefetch_slots; i++) { if (pfsHandle.fileHandleTable[i]) { RMDBGLOG((ENABLE, "slot %lu was not properly closed, closing\n")); RMCloseFile(pfsHandle.fileHandleTable[i]); pfsHandle.fileHandleTable[i] = NULL; } #ifdef WITH_THREADS RMDeleteSemaphore(pfsHandle.pfsOpenSemaphore[i]); RMDeleteSemaphore(pfsHandle.pfsReadSemaphore[i]); RMDeleteCriticalSection(pfsHandle.criticalSection[i]);#endif } pfsHandle.pfsProfile = NULL; set_custom_open_for_pfs(NULL); return RM_OK;}RMstatus rmpfs_allocate_slot_to_url(RMascii *url, RMuint32 slot){ RMDBGLOG((LOCALDBG, "rmpfs_allocate_slot_to_url(slot:%lu, url:'%s')\n", slot, (RMuint8*)url)); if (!pfsHandle.pfsProfile) { RMDBGLOG((ENABLE, "need to call rmpfs_open first\n")); return RM_ERROR; } if ((slot > (MAX_NUMBER_OF_CACHEABLE_URL)) || (slot > (pfsHandle.pfsProfile->prefetch_slots - 1))) { RMDBGLOG((ENABLE, "requested slot is out of range\n")); return RM_ERROR; } if (pfsHandle.fileHandleTable[slot]) { RMDBGLOG((ENABLE, "slot already in use\n")); return RM_ERROR; } RMCopyAscii(pfsHandle.fileNameTable[slot], url); pfsHandle.stopPrefetch[slot] = FALSE; pfsHandle.urlAllocated[slot] = TRUE; return RM_OK;}RMstatus rmpfs_prefetch_slot(RMuint32 slot){ RMfile file; RMstatus status = RM_OK; RMDBGLOG((LOCALDBG, "rmpfs_prefetch_slot(slot:%lu)=%s\n", slot, pfsHandle.fileNameTable[slot])); if (!pfsHandle.pfsProfile) { RMDBGLOG((ENABLE, "need to call rmpfs_open first\n")); return RM_ERROR; } if ((slot > (MAX_NUMBER_OF_CACHEABLE_URL)) || (slot > (pfsHandle.pfsProfile->prefetch_slots - 1))) { RMDBGLOG((ENABLE, "requested slot is out of range\n")); return RM_ERROR; } if (pfsHandle.fileHandleTable[slot]) { RMDBGLOG((ENABLE, "slot already in use\n")); return RM_ERROR; } if (!pfsHandle.urlAllocated[slot]) { RMDBGLOG((ENABLE, "seems the slot has not been allocated\n")); return RM_ERROR; } file = open_url(pfsHandle.fileNameTable[slot]); if (file) { RMuint32 count; RMuint32 bytesLeft = pfsHandle.url_prefetch_size; RMuint32 bytesToRead = RMmin(pfsHandle.url_prefetch_size, PREFETCH_CHUNK_SIZE); RMuint8 *buffer = pfsHandle.pfsProfile->prefetch_area + (pfsHandle.url_prefetch_size * slot); pfsHandle.fileHandleTable[slot] = file; // unblock open_for_pfs SEMAPHORE_P(pfsHandle.pfsOpenSemaphore[slot]); /* using prefetchDone here is correct even if it can be modified by another thread since we have a second check inside a critical section */ while (bytesLeft && !pfsHandle.prefetchDone[slot]) { ENTER_CS(pfsHandle.criticalSection[slot]); if (pfsHandle.stopPrefetch[slot]) { RMDBGLOG((ENABLE, "prefetching for slot %lu interrupted\n", slot)); pfsHandle.prefetchDone[slot] = TRUE; } if (!pfsHandle.prefetchDone[slot]) { status = RMReadFile(file, buffer + pfsHandle.prefetchedSize[slot], bytesToRead, &count); if (status == RM_ERRORENDOFFILE) { RMDBGLOG((LOCALDBG, "[%lu] EOF\n", slot)); pfsHandle.prefetchDone[slot] = TRUE; } else if (status != RM_OK) { // do not do anything (just like EOF), pfs_read will fail by itself when switching to file reading RMDBGLOG((ENABLE, "error prefetch slot %lu, aborting\n", slot)); pfsHandle.prefetchDone[slot] = TRUE; } bytesLeft -= count; pfsHandle.prefetchedSize[slot] += count; bytesToRead = RMmin(bytesLeft, PREFETCH_CHUNK_SIZE); // unblock pfs_read SEMAPHORE_P(pfsHandle.pfsReadSemaphore[slot]); } LEAVE_CS(pfsHandle.criticalSection[slot]); RMDBGPRINT((ENABLE, "%ld ", slot)); } pfsHandle.prefetchDone[slot] = TRUE; RMDBGLOG((LOCALDBG, "prefetch[%lu] end: cached %lu bytes\n", slot, pfsHandle.prefetchedSize[slot])); if ((status == RM_OK) || (status == RM_ERRORENDOFFILE)) return status; } else status = RM_ERROR; RMDBGLOG((ENABLE, "rmpfs_prefetch_slot[%lu] error!\n", slot)); return status;}RMstatus rmpfs_close_slot(RMuint32 slot){ struct ms_url_ctx *url_ctx = NULL; RMDBGLOG((LOCALDBG, "rmpfs_close_slot %lu\n", slot)); if (!pfsHandle.pfsProfile) { RMDBGLOG((ENABLE, "need to call rmpfs_open first\n")); return RM_ERROR; } if (pfsHandle.inUse[slot]) { RMDBGLOG((ENABLE, "slot is in use, can't close\n")); return RM_ERROR; } // stop the prefetching thread ENTER_CS(pfsHandle.criticalSection[slot]); pfsHandle.stopPrefetch[slot] = TRUE; if (pfsHandle.fileHandleTable[slot]) { RMCloseFile(pfsHandle.fileHandleTable[slot]); } else RMDBGLOG((ENABLE, "not open!\n")); if ( (url_ctx = find_cardea_url(pfsHandle.fileNameTable[slot])) != NULL) destroy_cardea_license_data(url_ctx); pfsHandle.fileNameTable[slot][0] = '\0'; pfsHandle.fileHandleTable[slot] = NULL; pfsHandle.prefetchedSize[slot] = 0; pfsHandle.prefetchDone[slot] = FALSE; pfsHandle.urlAllocated[slot] = FALSE; pfsHandle.inUse[slot] = FALSE; pfsHandle.forceSyncReads[slot] = FALSE; LEAVE_CS(pfsHandle.criticalSection[slot]); return RM_OK;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -