📄 sortexternal.pm
字号:
package KinoSearch::Util::SortExternal;use strict;use warnings;use KinoSearch::Util::ToolSet;use base qw( KinoSearch::Util::CClass );BEGIN { __PACKAGE__->init_instance_vars( # constructor args invindex => undef, seg_name => undef, mem_threshold => 2**24, );}our %instance_vars;sub new { my $class = shift; verify_args( \%instance_vars, @_ ); my %args = ( %instance_vars, @_ ); $class = ref($class) || $class; my $outstream = $args{invindex}->open_outstream("$args{seg_name}.srt"); return _new( $class, $outstream, @args{qw( invindex seg_name mem_threshold )} );}# Prepare to start fetching sorted results.sub sort_all { my $self = shift; # deal with any items in the cache right now if ( $self->_get_num_runs == 0 ) { # if we've never exceeded mem_threshold, sort in-memory $self->_sort_cache; } else { # create a run from whatever's in the cache right now $self->_sort_run; } # done adding elements, so close file and reopen as an instream $self->_get_outstream->close; my $filename = $self->_get_seg_name . ".srt"; my $instream = $self->_get_invindex()->open_instream($filename); $self->_set_instream($instream); # allow fetching now that we're set up $self->_enable_fetch;}sub close { shift->_get_instream()->close }1;__END____XS__MODULE = KinoSearch PACKAGE = KinoSearch::Util::SortExternalvoid_new(class, outstream_sv, invindex_sv, seg_name_sv, mem_threshold) char *class; SV *outstream_sv; SV *invindex_sv; SV *seg_name_sv; I32 mem_threshold;PREINIT: SortExternal *sortex;PPCODE: sortex = Kino_SortEx_new(outstream_sv, invindex_sv, seg_name_sv, mem_threshold); ST(0) = sv_newmortal(); sv_setref_pv( ST(0), class, (void*)sortex ); XSRETURN(1);=for commentAdd one or more items to the sort pool.=cutvoidfeed(sortex, ...) SortExternal *sortex;PREINIT: I32 i;PPCODE: for (i = 1; i < items; i++) { SV const * item_sv = ST(i); if (!SvPOK(item_sv)) continue; sortex->feed(sortex, SvPVX(item_sv), SvCUR(item_sv)); }=for commentFetch the next sorted item from the sort pool. sort_all must be called first.=cutSV*fetch(sortex) SortExternal *sortex;PREINIT: ByteBuf *bb;CODE: bb = sortex->fetch(sortex); if (bb == NULL) { RETVAL = newSV(0); } else { RETVAL = newSVpvn(bb->ptr, bb->size); Kino_BB_destroy(bb); }OUTPUT: RETVAL=for commentSort all items currently in memory.=cutvoid_sort_cache(sortex) SortExternal *sortex;PPCODE: Kino_SortEx_sort_cache(sortex);=for commentSort everything in memory and write the sorted elements to disk, creating aSortExRun C object.=cutvoid_sort_run(sortex); SortExternal *sortex;PPCODE: Kino_SortEx_sort_run(sortex);=for commentTurn on fetching.=cutvoid_enable_fetch(sortex) SortExternal *sortex;PPCODE: Kino_SortEx_enable_fetch(sortex); SV*_set_or_get(sortex, ...) SortExternal *sortex;ALIAS: _set_outstream = 1 _get_outstream = 2 _set_instream = 3 _get_instream = 4 _set_num_runs = 5 _get_num_runs = 6 _set_invindex = 7 _get_invindex = 8 _set_seg_name = 9 _get_seg_name = 10CODE:{ KINO_START_SET_OR_GET_SWITCH case 1: SvREFCNT_dec(sortex->outstream_sv); sortex->outstream_sv = newSVsv( ST(1) ); Kino_extract_struct(sortex->outstream_sv, sortex->outstream, OutStream*, "KinoSearch::Store::OutStream"); /* fall through */ case 2: RETVAL = newSVsv(sortex->outstream_sv); break; case 3: SvREFCNT_dec(sortex->instream_sv); sortex->instream_sv = newSVsv( ST(1) ); Kino_extract_struct(sortex->instream_sv, sortex->instream, InStream*, "KinoSearch::Store::InStream"); /* fall through */ case 4: RETVAL = newSVsv(sortex->instream_sv); break; case 5: Kino_confess("can't set num_runs"); /* fall through */ case 6: RETVAL = newSViv(sortex->num_runs); break; case 7: Kino_confess("can't set_invindex"); /* fall through */ case 8: RETVAL = newSVsv(sortex->invindex_sv); break; case 9: Kino_confess("can't set_seg_name"); /* fall through */ case 10: RETVAL = newSVsv(sortex->seg_name_sv); break; KINO_END_SET_OR_GET_SWITCH}OUTPUT: RETVALvoidDESTROY(sortex) SortExternal *sortex;PPCODE: Kino_SortEx_destroy(sortex);__H__#ifndef H_KINOSEARCH_UTIL_SORT_EXTERNAL#define H_KINOSEARCH_UTIL_SORT_EXTERNAL 1#include "EXTERN.h"#include "perl.h"#include "XSUB.h"#include "KinoSearchStoreInStream.h"#include "KinoSearchStoreOutStream.h"#include "KinoSearchUtilByteBuf.h"#include "KinoSearchUtilCClass.h"#include "KinoSearchUtilMemManager.h"typedef struct sortexrun { double start; double file_pos; double end; ByteBuf **cache; I32 cache_cap; I32 cache_elems; I32 cache_pos; I32 slice_size;} SortExRun;typedef struct sortexternal { ByteBuf **cache; /* item cache, both incoming and outgoing */ I32 cache_cap; /* allocated limit for cache */ I32 cache_elems; /* number of elems in cache */ I32 cache_pos; /* index of current element in cache */ ByteBuf **scratch; /* memory for use by mergesort */ I32 scratch_cap; /* allocated limit for scratch */ I32 mem_threshold; /* bytes of mem allowed for cache */ I32 cache_bytes; /* bytes of mem occupied by cache */ I32 run_cache_limit; /* bytes of mem allowed each run cache */ SortExRun **runs; I32 num_runs; SV *outstream_sv; OutStream *outstream; SV *instream_sv; InStream *instream; SV *invindex_sv; SV *seg_name_sv; void (*feed) (struct sortexternal*, char*, I32); ByteBuf* (*fetch)(struct sortexternal*);} SortExternal;SortExternal* Kino_SortEx_new(SV*, SV*, SV*, I32);void Kino_SortEx_feed(SortExternal*, char*, I32);ByteBuf* Kino_SortEx_fetch(SortExternal*);ByteBuf* Kino_SortEx_fetch_death(SortExternal*);void Kino_SortEx_enable_fetch(SortExternal*);void Kino_SortEx_sort_cache(SortExternal*);void Kino_SortEx_sort_run(SortExternal*);void Kino_SortEx_destroy(SortExternal*);#endif /* include guard */__C__#include "KinoSearchUtilSortExternal.h"static SortExRun* Kino_SortEx_new_run(double, double);static void Kino_SortEx_grow_bufbuf(ByteBuf***, I32, I32);static I32 Kino_SortEx_refill_run(SortExternal*, SortExRun*);static void Kino_SortEx_refill_cache(SortExternal*);static void Kino_SortEx_merge_runs(SortExternal*);static ByteBuf* Kino_SortEx_find_endpost(SortExternal*);static I32 Kino_SortEx_define_slice(SortExRun*, ByteBuf*);static void Kino_SortEx_mergesort(ByteBuf**, ByteBuf**, I32);static void Kino_SortEx_msort(ByteBuf**, ByteBuf**, U32, U32);static void Kino_SortEx_merge(ByteBuf**, U32, ByteBuf**, U32, ByteBuf**);static void Kino_SortEx_clear_cache(SortExternal*);static void Kino_SortEx_clear_run_cache(SortExRun*);static void Kino_SortEx_destroy_run(SortExRun*);#define KINO_PER_ITEM_OVERHEAD (sizeof(ByteBuf) + sizeof(ByteBuf*))SortExternal*Kino_SortEx_new(SV *outstream_sv, SV *invindex_sv, SV *seg_name_sv, I32 mem_threshold) { SortExternal *sortex; /* allocate */ Kino_New(0, sortex, 1, SortExternal); Kino_New(0, sortex->cache, 100, ByteBuf*); Kino_New(0, sortex->runs, 1, SortExRun*); /* init */ sortex->scratch = NULL; sortex->scratch_cap = 0; sortex->cache_cap = 100; sortex->cache_elems = 0; sortex->cache_pos = 0; sortex->cache_bytes = 0; sortex->num_runs = 0; sortex->instream_sv = &PL_sv_undef; sortex->feed = Kino_SortEx_feed; sortex->fetch = Kino_SortEx_fetch_death; /* assign */ sortex->outstream_sv = newSVsv(outstream_sv); Kino_extract_struct(outstream_sv, sortex->outstream, OutStream*, "KinoSearch::Store::OutStream"); sortex->invindex_sv = newSVsv(invindex_sv); sortex->seg_name_sv = newSVsv(seg_name_sv); sortex->mem_threshold = mem_threshold; /* derive */ sortex->run_cache_limit = mem_threshold / 2; return sortex;}/* Create a new SortExRun object */static SortExRun*Kino_SortEx_new_run(double start, double end) { SortExRun *run; /* allocate */ Kino_New(0, run, 1, SortExRun); Kino_New(0, run->cache, 100, ByteBuf*); /* init */ run->cache_cap = 100; run->cache_elems = 0; run->cache_pos = 0; /* assign */ run->start = start; run->file_pos = start; run->end = end; return run;}voidKino_SortEx_feed(SortExternal* sortex, char* ptr, I32 len) { /* add room for more cache elements if needed */ if (sortex->cache_elems == sortex->cache_cap) { /* add 100, plus 10% of the current capacity */ sortex->cache_cap = sortex->cache_cap + 100 + (sortex->cache_cap / 8); Kino_Renew(sortex->cache, sortex->cache_cap, ByteBuf*); } sortex->cache[ sortex->cache_elems ] = Kino_BB_new_string(ptr, len); sortex->cache_elems++; /* track memory consumed */ sortex->cache_bytes += KINO_PER_ITEM_OVERHEAD; sortex->cache_bytes += len + 1; /* check if it's time to flush the cache */ if (sortex->cache_bytes >= sortex->mem_threshold) Kino_SortEx_sort_run(sortex);}ByteBuf*Kino_SortEx_fetch(SortExternal *sortex) { if (sortex->cache_pos >= sortex->cache_elems) Kino_SortEx_refill_cache(sortex); if (sortex->cache_elems > 0) { return sortex->cache[ sortex->cache_pos++ ]; } else { return NULL; }}ByteBuf*Kino_SortEx_fetch_death(SortExternal *sortex) { ByteBuf *bb = NULL; Kino_confess("can't call fetch before sort_all"); return bb;}voidKino_SortEx_enable_fetch(SortExternal *sortex) { sortex->fetch = Kino_SortEx_fetch;}/* Allocate more memory to an array of pointers to pointers to ByteBufs, if * the current allocation isn't sufficient. */static voidKino_SortEx_grow_bufbuf(ByteBuf ***bb_buf, I32 current, I32 desired) { if (current < desired) Kino_Renew(*bb_buf, desired, ByteBuf*);}/* Sort the main cache. */voidKino_SortEx_sort_cache(SortExternal *sortex) { Kino_SortEx_grow_bufbuf(&sortex->scratch, sortex->scratch_cap, sortex->cache_elems); Kino_SortEx_mergesort(sortex->cache, sortex->scratch, sortex->cache_elems);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -