📄 libasync.c
字号:
#ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ if(first_ce->oldbuf != first_ce->myaiocb.aio_buf || first_ce->oldfd != first_ce->myaiocb.aio_fildes || first_ce->oldsize != first_ce->myaiocb.aio_nbytes) printf("It changed in flight2\n"); retval=aio_return(&first_ce->myaiocb);#else if(first_ce->oldbuf != first_ce->myaiocb64.aio_buf || first_ce->oldfd != first_ce->myaiocb64.aio_fildes || first_ce->oldsize != first_ce->myaiocb64.aio_nbytes) printf("It changed in flight2\n"); retval=aio_return64(&first_ce->myaiocb64);#endif#else if(first_ce->oldbuf != first_ce->myaiocb.aio_buf || first_ce->oldfd != first_ce->myaiocb.aio_fildes || first_ce->oldsize != first_ce->myaiocb.aio_nbytes) printf("It changed in flight2\n"); retval=aio_return(&first_ce->myaiocb);#endif#ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ if(retval < first_ce->myaiocb.aio_nbytes)#else if(retval < first_ce->myaiocb64.aio_nbytes)#endif#else if(retval < first_ce->myaiocb.aio_nbytes)#endif { printf("aio_return error5: ret %d %d\n",retval,errno);#ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ printf("aio_return error5: fd %d offset %lld buffer %lx size %d Opcode %d\n", first_ce->myaiocb.aio_fildes, first_ce->myaiocb.aio_offset, (long)(first_ce->myaiocb.aio_buf), first_ce->myaiocb.aio_nbytes, first_ce->myaiocb.aio_lio_opcode#else printf("aio_return error5: fd %d offset %lld buffer %lx size %d Opcode %d\n", first_ce->myaiocb64.aio_fildes, first_ce->myaiocb64.aio_offset, (long)(first_ce->myaiocb64.aio_buf), first_ce->myaiocb64.aio_nbytes, first_ce->myaiocb64.aio_lio_opcode#endif#else printf("aio_return error5: fd %d offset %ld buffer %lx size %d Opcode %d\n", first_ce->myaiocb.aio_fildes, first_ce->myaiocb.aio_offset, (long)(first_ce->myaiocb.aio_buf), first_ce->myaiocb.aio_nbytes, first_ce->myaiocb.aio_lio_opcode#endif ); } if(retval > 0) {#ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ *ubuffer=(char *)first_ce->myaiocb.aio_buf;#else *ubuffer=(char *)first_ce->myaiocb64.aio_buf;#endif#else *ubuffer=(char *)first_ce->myaiocb.aio_buf;#endif }else *ubuffer=(char *)0; first_ce->direct=1; /* do not delete the buffer */ takeoff_cache(gc,first_ce); putoninuse(gc,first_ce); } return((int)retval); }/************************************************************************ * The caller is now finished with the data that was provided so * the library is now free to return the memory to the pool for later * reuse. ************************************************************************/voidasync_release(gc)struct cache *gc;{ takeoffinuse(gc);}/************************************************************************ * Put the buffer on the inuse list. When the user is finished with * the buffer it will call back into async_release and the items on the * inuse list will be deallocated. ************************************************************************/voidputoninuse(gc,entry)struct cache *gc;struct cache_ent *entry;{ if(gc->inuse_head) entry->forward=gc->inuse_head; else entry->forward=0; gc->inuse_head=entry;}/************************************************************************ * This is called when the application is finished with the data that * was provided. The memory may now be returned to the pool. ************************************************************************/voidtakeoffinuse(gc)struct cache *gc;{ struct cache_ent *ce; if(gc->inuse_head==0) printf("Takeoffinuse error\n"); ce=gc->inuse_head; gc->inuse_head=gc->inuse_head->forward; if(gc->inuse_head !=0) printf("Error in take off inuse\n"); free((void*)(ce->real_address)); free(ce);}/************************************************************************* * This routine is a generic async writer assist funtion. It takes * the same calling parameters as write() but also extends the * interface to include: * * offset ..... offset in the file. * depth ..... How much read-ahead do you want. * *************************************************************************/size_tasync_write(gc,fd,buffer,size,offset,depth)struct cache *gc;long long fd,size;char *buffer;off64_t offset;long long depth;{ struct cache_ent *ce; size_t ret; ce=allocate_write_buffer(gc,fd,offset,size,(long long)LIO_WRITE,depth,0LL,(char *)0,(char *)0); ce->direct=0; /* not direct. Lib supplies buffer and must free it */#ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ mbcopy(buffer,(char *)(ce->myaiocb.aio_buf),(size_t)size);#else mbcopy(buffer,(char *)(ce->myaiocb64.aio_buf),(size_t)size);#endif#else mbcopy(buffer,(char *)(ce->myaiocb.aio_buf),(size_t)size);#endif async_put_on_write_queue(gc,ce); /* printf("asw: fd %d offset %lld, size %d\n",ce->myaiocb64.aio_fildes, ce->myaiocb64.aio_offset, ce->myaiocb64.aio_nbytes); */ again:#ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ ret=aio_write(&ce->myaiocb);#else ret=aio_write64(&ce->myaiocb64);#endif#else ret=aio_write(&ce->myaiocb);#endif if(ret==-1) { if(errno==EAGAIN) { async_wait_for_write(gc); goto again; } if(errno==0) { /* Compensate for bug in async library */ async_wait_for_write(gc); goto again; } else { printf("Error in aio_write: ret %d errno %d count %lld\n",ret,errno,gc->w_count); /* printf("aio_write_no_copy: fd %d buffer %x offset %lld size %d\n", ce->myaiocb64.aio_fildes, ce->myaiocb64.aio_buf, ce->myaiocb64.aio_offset, ce->myaiocb64.aio_nbytes); */ exit(177); } } return((ssize_t)size);}/************************************************************************* * Allocate a write aiocb and write buffer of the size specified. Also * put some extra buffer padding so that VX_DIRECT can do its job when * needed. *************************************************************************/struct cache_ent *allocate_write_buffer(gc,fd,offset,size,op,w_depth,direct,buffer,free_addr)struct cache *gc;long long fd,size,op;off64_t offset;long long w_depth;long long direct;char *buffer,*free_addr;{ struct cache_ent *ce; long temp; if(fd==0LL) { printf("Setting up write buffer insane\n"); exit(178); } if(gc->w_count > w_depth) async_wait_for_write(gc); ce=(struct cache_ent *)malloc((size_t)sizeof(struct cache_ent)); if(ce == (struct cache_ent *)0) { printf("Malloc failed 1\n"); exit(179); } bzero(ce,sizeof(struct cache_ent));#ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ ce->myaiocb.aio_fildes=(int)fd; ce->myaiocb.aio_offset=(off64_t)offset; if(!direct) { ce->real_address = (char *)malloc((size_t)(size+page_size)); temp=(long)ce->real_address; temp = (temp+page_size) & ~(page_size-1); ce->myaiocb.aio_buf=(volatile void *)temp; }else { ce->myaiocb.aio_buf=(volatile void *)buffer; ce->real_address=(char *)free_addr; } if(ce->myaiocb.aio_buf == 0)#else ce->myaiocb64.aio_fildes=(int)fd; ce->myaiocb64.aio_offset=(off64_t)offset; if(!direct) { ce->real_address = (char *)malloc((size_t)(size+page_size)); temp=(long)ce->real_address; temp = (temp+page_size) & ~(page_size-1); ce->myaiocb64.aio_buf=(volatile void *)temp; } else { ce->myaiocb64.aio_buf=(volatile void *)buffer; ce->real_address=(char *)free_addr; } if(ce->myaiocb64.aio_buf == 0)#endif#else ce->myaiocb.aio_fildes=(int)fd; ce->myaiocb.aio_offset=(off_t)offset; if(!direct) { ce->real_address = (char *)malloc((size_t)(size+page_size)); temp=(long)ce->real_address; temp = (temp+page_size) & ~(page_size-1); ce->myaiocb.aio_buf=(volatile void *)temp; } else { ce->myaiocb.aio_buf=(volatile void *)buffer; ce->real_address=(char *)free_addr; } if(ce->myaiocb.aio_buf == 0)#endif { printf("Malloc failed 2\n"); exit(180); }#ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ ce->myaiocb.aio_reqprio=0; ce->myaiocb.aio_nbytes=(size_t)size; ce->myaiocb.aio_sigevent.sigev_notify=SIGEV_NONE; ce->myaiocb.aio_lio_opcode=(int)op;#else ce->myaiocb64.aio_reqprio=0; ce->myaiocb64.aio_nbytes=(size_t)size; ce->myaiocb64.aio_sigevent.sigev_notify=SIGEV_NONE; ce->myaiocb64.aio_lio_opcode=(int)op;#endif#else ce->myaiocb.aio_reqprio=0; ce->myaiocb.aio_nbytes=(size_t)size; ce->myaiocb.aio_sigevent.sigev_notify=SIGEV_NONE; ce->myaiocb.aio_lio_opcode=(int)op;#endif ce->fd=(int)fd; return(ce);}/************************************************************************* * Put it on the outbound queue. *************************************************************************/voidasync_put_on_write_queue(gc,ce)struct cache *gc;struct cache_ent *ce;{ ce->forward=0; ce->back=gc->w_tail; if(gc->w_tail) gc->w_tail->forward = ce; gc->w_tail= ce; if(!gc->w_head) gc->w_head=ce; gc->w_count++; return;}/************************************************************************* * Cleanup all outstanding writes *************************************************************************/voidasync_write_finish(gc)struct cache *gc;{ while(gc->w_head) { /*printf("async_write_finish: Waiting for buffer %x to finish\n",gc->w_head->myaiocb64.aio_buf);*/ async_wait_for_write(gc); }}/************************************************************************* * Wait for an I/O to finish *************************************************************************/voidasync_wait_for_write(gc)struct cache *gc;{ struct cache_ent *ce; size_t ret,retval; if(gc->w_head==0) return; ce=gc->w_head; gc->w_head=ce->forward; gc->w_count--; ce->forward=0; if(ce==gc->w_tail) gc->w_tail=0; /*printf("Wait for buffer %x offset %lld size %d to finish\n", ce->myaiocb64.aio_buf, ce->myaiocb64.aio_offset, ce->myaiocb64.aio_nbytes); printf("write count %lld \n",gc->w_count); */#ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ while((ret=aio_error(&ce->myaiocb))== EINPROGRESS) { async_suspend(ce); }#else while((ret=aio_error64(&ce->myaiocb64))== EINPROGRESS) { async_suspend(ce); }#endif#else while((ret=aio_error(&ce->myaiocb))== EINPROGRESS) { async_suspend(ce); }#endif if(ret) { printf("aio_error 5: ret %d %d\n",ret,errno);#ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ printf("fd %d offset %lld size %d\n", ce->myaiocb.aio_fildes, ce->myaiocb.aio_offset, ce->myaiocb.aio_nbytes);#else printf("fd %d offset %lld size %d\n", ce->myaiocb64.aio_fildes, ce->myaiocb64.aio_offset, ce->myaiocb64.aio_nbytes);#endif#else printf("fd %d offset %lld size %d\n", ce->myaiocb.aio_fildes, ce->myaiocb.aio_offset, ce->myaiocb.aio_nbytes);#endif exit(181); }#ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ retval=aio_return(&ce->myaiocb);#else#if defined(__CrayX1__) retval=aio_return64((aiocb64_t *)&ce->myaiocb64);#else retval=aio_return64((struct aiocb64 *)&ce->myaiocb64);#endif#endif#else retval=aio_return(&ce->myaiocb);#endif if((int)retval < 0) { printf("aio_return error: %d\n",errno); } if(!ce->direct) { /* printf("Freeing buffer %x\n",ce->real_address);*/ free((void *)(ce->real_address)); free((void *)ce); }}/************************************************************************* * This routine is a generic async writer assist funtion. It takes * the same calling parameters as write() but also extends the * interface to include: * * offset ..... offset in the file. * depth ..... How much read-ahead do you want. * free_addr .. address of memory to free after write is completed. * *************************************************************************/size_tasync_write_no_copy(gc,fd,buffer,size,offset,depth,free_addr)struct cache *gc;long long fd,size;char *buffer;off64_t offset;long long depth;char *free_addr;{ struct cache_ent *ce; size_t ret; long long direct = 1; ce=allocate_write_buffer(gc,fd,offset,size,(long long)LIO_WRITE,depth,direct,buffer,free_addr); ce->direct=0; /* have library de-allocate the buffer */ async_put_on_write_queue(gc,ce); /* printf("awnc: fd %d offset %lld, size %d\n",ce->myaiocb64.aio_fildes, ce->myaiocb64.aio_offset, ce->myaiocb64.aio_nbytes); */again:#ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ ret=aio_write(&ce->myaiocb);#else ret=aio_write64(&ce->myaiocb64);#endif#else ret=aio_write(&ce->myaiocb);#endif if(ret==-1) { if(errno==EAGAIN) { async_wait_for_write(gc); goto again; } if(errno==0) { /* Compensate for bug in async library */ async_wait_for_write(gc); goto again; } else { printf("Error in aio_write: ret %d errno %d\n",ret,errno);#ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ printf("aio_write_no_copy: fd %d buffer %lx offset %lld size %d\n", ce->myaiocb.aio_fildes, (long)(ce->myaiocb.aio_buf), ce->myaiocb.aio_offset, ce->myaiocb.aio_nbytes);#else printf("aio_write_no_copy: fd %d buffer %lx offset %lld size %d\n", ce->myaiocb64.aio_fildes, (long)(ce->myaiocb64.aio_buf), ce->myaiocb64.aio_offset, ce->myaiocb64.aio_nbytes);#endif#else printf("aio_write_no_copy: fd %d buffer %lx offset %ld size %d\n", ce->myaiocb.aio_fildes, (long)(ce->myaiocb.aio_buf), ce->myaiocb.aio_offset, ce->myaiocb.aio_nbytes);#endif exit(182); } } else { return((ssize_t)size); }}void mbcopy(source, dest, len)char *source,*dest;size_t len;{ int i; for(i=0;i<len;i++) *dest++=*source++;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -