dets_v8.erl

来自「OTP是开放电信平台的简称」· ERL 代码 · 共 1,591 行 · 第 1/4 页

ERL
1,591
字号
	      next = FH#fileheader.next,	      fptr = Fd,	      no_objects= FH#fileheader.no_objects,	      n = FH#fileheader.n,	      type = FH#fileheader.type,	      update_mode = saved,	      auto_save = infinity,             % not saved on file	      fixed = false,			% not saved on file	      freelists_p = FH#fileheader.freelist,	      hash_bif = HashAlg,	      keypos = FH#fileheader.keypos,	      min_no_slots = FH#fileheader.min_no_slots,	      max_no_slots = FH#fileheader.max_no_slots,	      version = ?FILE_FORMAT_VERSION,	      mod = ?MODULE,	      bump = ?BUMP,	      base = ?BASE},	    {ok, H, ExtraInfo};	Error ->	    Error    end.cache_segps(Fd, FileName, M) ->    NSegs = no_segs(M),    {ok, Bin} = dets_utils:pread_close(Fd, FileName, ?HEADSZ, 4 * NSegs),    Fun = fun(S, P) -> segp_cache(P, S), P+4 end,    lists:foldl(Fun, ?HEADSZ, bin2ints(Bin)).no_segs(NoSlots) ->    ?SLOT2SEG(NoSlots - 1) + 1.bin2ints(<<Int:32, B/binary>>) ->    [Int | bin2ints(B)];bin2ints(<<>>) ->    [].%%%%%% Repair, conversion and initialization of a dets file.%%%bulk_input(Head, InitFun, Cntrs) ->    bulk_input(Head, InitFun, Cntrs, make_ref()).bulk_input(Head, InitFun, Cntrs, Ref) ->    fun(close) ->	    ok;       (read) ->	    case catch {Ref, InitFun(read)} of		{Ref, end_of_input} ->		    end_of_input;		{Ref, {L0, NewInitFun}} when is_list(L0),                                              is_function(NewInitFun) ->		    Kp = Head#head.keypos,		    case catch bulk_objects(L0, Head, Cntrs, Kp, []) of			{'EXIT', _Error} ->			    _ = (catch NewInitFun(close)),			    {error, invalid_objects_list};			L ->			    {L, bulk_input(Head, NewInitFun, Cntrs, Ref)}		    end;		{Ref, Value} ->		    {error, {init_fun, Value}};		Error ->		    throw({thrown, Error})	    end    end.bulk_objects([T | Ts], Head, Cntrs, Kp, L) ->    BT = term_to_binary(T),    Sz = size(BT),    LogSz = sz2pos(Sz+?OHDSZ),    count_object(Cntrs, LogSz),    Key = element(Kp, T),    bulk_objects(Ts, Head, Cntrs, Kp, [make_object(Head, Key, LogSz, BT) | L]);bulk_objects([], _Head, _Cntrs, _Kp, L) ->    L.-define(FSCK_SEGMENT, 10000).-define(DCT(D, CT), [D | CT]).-define(VNEW(N, E), erlang:make_tuple(N, E)).-define(VSET(I, V, E), setelement(I, V, E)).-define(VGET(I, V), element(I, V)).%% OldVersion not used, assuming later versions have been converted already.output_objs(OldVersion, Head, SlotNumbers, Cntrs) ->    fun(close) ->	    {ok, 0, Head};       ([]) ->	    output_objs(OldVersion, Head, SlotNumbers, Cntrs);       (L) ->	    %% Descending sizes.	    Count = lists:sort(ets:tab2list(Cntrs)),	    RCount = lists:reverse(Count),	    NoObjects = lists:foldl(fun({_Sz,No}, A) -> A + No end, 0, Count),	    {_, MinSlots, _} = SlotNumbers,	    if		%% Using number of objects for bags and duplicate bags		%% is not ideal; number of (unique) keys should be		%% used instead. The effect is that there will be more		%% segments than "necessary".		MinSlots =/= bulk_init,		abs(?SLOT2SEG(NoObjects) - ?SLOT2SEG(MinSlots)) > 5,		(NoObjects < ?MAXOBJS) ->		    {try_again, NoObjects};		true ->                    Head1 = Head#head{no_objects = NoObjects},		    SegSz = actual_seg_size(),		    {_, End, _} = dets_utils:alloc(Head, SegSz-1),		    %% Now {LogSize,NoObjects} in Cntrs is replaced by		    %% {LogSize,Position,{FileName,FileDescriptor},NoObjects}.		    {Head2, CT} = allocate_all_objects(Head1, RCount, Cntrs),		    [E | Es] = bin2term(L, []),		    {NE, Acc, DCT1} = 			output_slots(E, Es, [E], Head2, ?DCT(0, CT)),		    NDCT = write_all_sizes(DCT1, Cntrs),		    Max = ets:info(Cntrs, size),                    output_objs2(NE, Acc, Head2, Cntrs, NDCT, End, Max,Max)	    end    end.output_objs2(E, Acc, Head, Cntrs, DCT, End, 0, MaxNoChunks) ->    NDCT = write_all_sizes(DCT, Cntrs),    output_objs2(E, Acc, Head, Cntrs, NDCT, End, MaxNoChunks, MaxNoChunks);output_objs2(E, Acc, Head, Cntrs, DCT, End, ChunkI, MaxNoChunks) ->    fun(close) ->	    DCT1 = output_slot(Acc, Head, DCT),	    NDCT = write_all_sizes(DCT1, Cntrs),	    ?DCT(NoDups, CT) = NDCT,	    [SegAddr | []] = ?VGET(size(CT), CT),            FinalZ = End - SegAddr,            [{?FSCK_SEGMENT, _, {FileName, Fd}, _}] = 		ets:lookup(Cntrs, ?FSCK_SEGMENT),	    ok = dets_utils:fwrite(Fd, FileName, 				   dets_utils:make_zeros(FinalZ)),            NewHead = Head#head{no_objects = Head#head.no_objects - NoDups},	    {ok, NoDups, NewHead};       (L) ->	    Es = bin2term(L, []),	    {NE, NAcc, NDCT} = output_slots(E, Es, Acc, Head, DCT),	    output_objs2(NE, NAcc, Head, Cntrs, NDCT, End, 			 ChunkI-1, MaxNoChunks)    end.%% By allocating bigger objects before smaller ones, holes in the%% buddy system memory map are avoided. Unfortunately, the segments%% are always allocated first, so if there are objects bigger than a%% segment, there is a hole to handle. (Haven't considered placing the%% segments among other objects of the same size.)allocate_all_objects(Head, Count, Cntrs) ->    SegSize = actual_seg_size(),    {Head1, HSz, HN, HA} = alloc_hole(Count, Head, SegSize),    {Max, _} = hd(Count),    CT = ?VNEW(Max+1, not_used),    {Head2, NCT} = allocate_all(Head1, Count, Cntrs, CT),    Head3 = free_hole(Head2, HSz, HN, HA),    {Head3, NCT}.alloc_hole([{LSize,_} | _], Head, SegSz) when ?POW(LSize-1) > SegSz ->    {_, SegAddr, _} = dets_utils:alloc(Head, SegSz-1),    Size = ?POW(LSize-1)-1,    {_, Addr, _} = dets_utils:alloc(Head, Size),    N = (Addr - SegAddr) div SegSz,    Head1 = dets_utils:alloc_many(Head, SegSz, N, SegAddr),    {Head1, SegSz-1, N, SegAddr};alloc_hole(_Count, Head, _SegSz) ->    {Head, 0, 0, 0}.free_hole(Head, _Size, 0, _Addr) ->    Head;free_hole(Head, Size, N, Addr) ->    {Head1, _} = dets_utils:free(Head, Addr, Size),    free_hole(Head1, Size, N-1, Addr+Size+1).%% One (temporary) file for each buddy size, write all objects of that%% size to the file.allocate_all(Head, [{LSize,NoObjects} | Count], Cntrs, CT) ->    Size = ?POW(LSize-1)-1,    {_Head, Addr, _} = dets_utils:alloc(Head, Size),    NewHead = dets_utils:alloc_many(Head, Size+1, NoObjects, Addr),    {FileName, Fd} = temp_file(Head, LSize),    true = ets:insert(Cntrs, {LSize, Addr, {FileName, Fd}, NoObjects}),    NCT = ?VSET(LSize, CT, [Addr | []]),    allocate_all(NewHead, Count, Cntrs, NCT);allocate_all(Head, [], Cntrs, CT) ->    %% Note that space for the segments has been allocated already.    %% And one file for the segments...    {FileName, Fd} = temp_file(Head, ?FSCK_SEGMENT),    Addr = ?SEGADDR(?SEGARRSZ),    true = ets:insert(Cntrs, {?FSCK_SEGMENT, Addr, {FileName, Fd}, 0}),    NCT = ?VSET(size(CT), CT, [Addr | []]),    {Head, NCT}.temp_file(Head, N) ->    TmpName = lists:concat([Head#head.filename, '.', N]),    {ok, Fd} = dets_utils:open(TmpName, [raw, binary, write]),    {TmpName, Fd}.bin2term([<<Slot:32, LogSize:8, BinTerm/binary>> | BTs], L) ->    bin2term(BTs, [{Slot, LogSize, BinTerm} | L]);bin2term([], L) ->    lists:reverse(L).write_all_sizes(?DCT(D, CT), Cntrs) ->    ?DCT(D, write_sizes(1, size(CT), CT, Cntrs)).write_sizes(Sz, Sz, CT, Cntrs) ->    write_size(Sz, ?FSCK_SEGMENT, CT, Cntrs);write_sizes(Sz, MaxSz, CT, Cntrs) ->    NCT = write_size(Sz, Sz, CT, Cntrs),    write_sizes(Sz+1, MaxSz, NCT, Cntrs).write_size(Sz, I, CT, Cntrs) ->    case ?VGET(Sz, CT) of	not_used ->	    CT;	[Addr | L] ->	    {FileName, Fd} = ets:lookup_element(Cntrs, I, 3),	    case file:write(Fd, lists:reverse(L)) of		ok ->		    ?VSET(Sz, CT, [Addr | []]);		Error ->		    dets_utils:file_error(FileName, Error)	    end    end.output_slots(E, [E1 | Es], Acc, Head, DCT)                        when element(1, E) =:= element(1, E1) ->    output_slots(E1, Es, [E1 | Acc], Head, DCT);output_slots(_E, [E | L], Acc, Head, DCT) ->    NDCT = output_slot(Acc, Head, DCT),    output_slots(E, L, [E], Head, NDCT);output_slots(E, [], Acc, _Head, DCT) ->    {E, Acc, DCT}.output_slot([E], _Head, ?DCT(D, CT)) ->    ?DCT(D, output_slot([{foo, E}], 0, foo, CT));output_slot(Es0, Head, ?DCT(D, CT)) ->    Kp = Head#head.keypos,    Fun = fun({_Slot, _LSize, BinTerm} = E) -> 		  Key = element(Kp, binary_to_term(BinTerm)),		  {Key, E}	  end,    Es = lists:map(Fun, Es0),    NEs = case Head#head.type of	      set ->		  [{Key0,_} = E | L0] = lists:sort(Es),		  choose_one(lists:sort(L0), Key0, [E]);	      bag -> 		  lists:usort(Es);	      duplicate_bag -> 		  lists:sort(Es)	  end,    Dups = D + length(Es) - length(NEs),    ?DCT(Dups, output_slot(NEs, 0, foo, CT)).choose_one([{Key,_} | Es], Key, L) ->    choose_one(Es, Key, L);choose_one([{Key,_} = E | Es], _Key, L) ->    choose_one(Es, Key, [E | L]);choose_one([], _Key, L) ->    L.output_slot([E | Es], Next, _Slot, CT) ->    {_Key, {Slot, LSize, BinTerm}} = E,    Size = size(BinTerm),    Size2 = ?POW(LSize-1),    Pad = <<0:(Size2-Size-?OHDSZ)/unit:8>>,    BinObject = [<<Next:32, Size:32, ?ACTIVE:32>>, BinTerm | Pad],    [Addr | L] = ?VGET(LSize, CT),    NCT = ?VSET(LSize, CT, [Addr+Size2 | [BinObject | L]]),    output_slot(Es, Addr, Slot, NCT);output_slot([], Next, Slot, CT) ->    I = size(CT),    [Addr | L] = ?VGET(I, CT),    {Pos, _} = slot_position(Slot),    NoZeros = Pos - Addr,    BinObject = if 		    NoZeros > 100 ->			[dets_utils:make_zeros(NoZeros) | <<Next:32>>];		    true ->			<<0:NoZeros/unit:8,Next:32>>   	        end,    Size = NoZeros+4,    ?VSET(I, CT, [Addr+Size | [BinObject | L]]).%% Does not close Fd.fsck_input(Head, Fd, Cntrs, _FileHeader) ->    %% The file is not compressed, so the object size cannot exceed    %% the filesize, for all objects.    MaxSz = case file:position(Fd, eof) of                {ok, Pos} ->                    Pos;                _ ->                    (1 bsl 32) - 1            end,    State0 = fsck_read(?BASE, Fd, []),    fsck_input1(Head, State0, Fd, MaxSz, Cntrs).fsck_input1(Head, State, Fd, MaxSz, Cntrs) ->    fun(close) ->	    ok;       (read) ->	    case State of		done ->		    end_of_input;		{done, L} ->		    R = count_input(Cntrs, L, []),		    {R, fsck_input1(Head, done, Fd, MaxSz, Cntrs)};		{cont, L, Bin, Pos} ->		    R = count_input(Cntrs, L, []),                    FR = fsck_objs(Bin, Head#head.keypos, Head, []),		    NewState = fsck_read(FR, Pos, Fd, MaxSz, Head),		    {R, fsck_input1(Head, NewState, Fd, MaxSz, Cntrs)}	    end    end.%% The ets table Cntrs is used for counting objects per size.count_input(Cntrs, [[LogSz | B] | Ts], L) ->    count_object(Cntrs, LogSz),    count_input(Cntrs, Ts, [B | L]);count_input(_Cntrs, [], L) ->    L.count_object(Cntrs, LogSz) ->    case catch ets:update_counter(Cntrs, LogSz, 1) of	N when is_integer(N) -> ok;	_Badarg -> true = ets:insert(Cntrs, {LogSz, 1})    end.fsck_read(Pos, F, L) ->    case file:position(F, Pos) of	{ok, _} ->	    read_more_bytes(<<>>, 0, Pos, F, L);	_Error ->	    {done, L}    end.fsck_read({more, Bin, Sz, L}, Pos, F, MaxSz, Head) when Sz > MaxSz ->    FR = skip_bytes(Bin, ?BUMP, Head#head.keypos, Head, L),    fsck_read(FR, Pos, F, MaxSz, Head);fsck_read({more, Bin, Sz, L}, Pos, F, _MaxSz, _Head) ->    read_more_bytes(Bin, Sz, Pos, F, L);fsck_read({new, Skip, L}, Pos, F, _MaxSz, _Head) ->    NewPos = Pos + Skip,    fsck_read(NewPos, F, L).read_more_bytes(B, Min, Pos, F, L) ->    Max = if 	      Min < ?CHUNK_SIZE -> ?CHUNK_SIZE; 	      true -> Min 	  end,    case dets_utils:read_n(F, Max) of	eof ->	    {done, L};	Bin ->	    NewPos = Pos + size(Bin),	    {cont, L, list_to_binary([B, Bin]), NewPos}    end.fsck_objs(Bin = <<_N:32, Sz:32, Status:32, Tail/binary>>, Kp, Head, L) ->    if 	Status =:= ?ACTIVE ->	    case Tail of		<<BinTerm:Sz/binary, Tail2/binary>> ->		    case catch element(Kp, binary_to_term(BinTerm)) of			{'EXIT', _} ->			    skip_bytes(Bin, ?BUMP, Kp, Head, L);			Key ->			    LogSz = sz2pos(Sz+?OHDSZ),			    Obj = make_object(Head, Key, LogSz, BinTerm),			    NL = [[LogSz | Obj] | L],			    Skip = ?POW(LogSz-1) - Sz - ?OHDSZ,			    skip_bytes(Tail2, Skip, Kp, Head, NL)		    end;		_ ->                    {more, Bin, Sz, L}	    end;	true -> 	    skip_bytes(Bin, ?BUMP, Kp, Head, L)    end;fsck_objs(Bin, _Kp, _Head, L) ->    {more, Bin, 0, L}.    %% Version 8 has to know about version 9.make_object(Head, Key, _LogSz, BT) when Head#head.version =:= 9 ->    Slot = dets_v9:db_hash(Key, Head),    <<Slot:32, BT/binary>>;make_object(Head, Key, LogSz, BT) ->    Slot = db_hash(Key, Head),    <<Slot:32, LogSz:8, BT/binary>>.%% Inlined.skip_bytes(Bin, Skip, Kp, Head, L) ->    case Bin of	<<_:Skip/binary, Tail/binary>> ->	    fsck_objs(Tail, Kp, Head, L);

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?