dets_v8.erl

来自「OTP是开放电信平台的简称」· ERL 代码 · 共 1,591 行 · 第 1/4 页

ERL
1,591
字号
	_ ->            {new, Skip - size(Bin), L}    end.%% -> {NewHead, ok} | throw({Head, Error})do_perform_save(H) ->    FL = dets_utils:get_freelists(H),    B = term_to_binary(FL),    Size = size(B),    ?DEBUGF("size of freelist = ~p~n", [Size]),    ?DEBUGF("head.m = ~p~n", [H#head.m]),    ?DEBUGF("head.no_objects = ~p~n", [H#head.no_objects]),    {ok, Pos} = dets_utils:position(H, eof),    H1 = H#head{freelists_p = Pos},    W1 = {?FREELIST_POS, <<Pos:32>>},    W2 = {Pos, [<<0:32, Size:32, ?FREE:32>>, B]},        W3 = {?D_POS, <<(H1#head.m):32, 	            (H1#head.next):32, 	            (H1#head.keypos):32,	            (H1#head.no_objects):32,		    (H1#head.n):32>>},    {ClosedProperly, ClosedProperlyNeedCompacitng} = 	case H1#head.hash_bif of	    hash ->		{?CLOSED_PROPERLY2, ?CLOSED_PROPERLY2_NEED_COMPACTING};	    phash ->		{?CLOSED_PROPERLY_NEW_HASH, 		 ?CLOSED_PROPERLY_NEW_HASH_NEED_COMPACTING}	end,    W4 = 	if 	    Size > 1000, Size > H1#head.no_objects ->		{?CLOSED_PROPERLY_POS, 		 <<ClosedProperlyNeedCompacitng:32>>};	    true ->		{?CLOSED_PROPERLY_POS, <<ClosedProperly:32>>}	end,    W5 = {?FILE_FORMAT_VERSION_POS, <<?FILE_FORMAT_VERSION:32>>},    {H2, ok} = dets_utils:pwrite(H1, [W1,W2,W3,W4,W5]),    {ok, Pos2} = dets_utils:position(H2, eof),    ?DEBUGF("Writing file size ~p, eof at ~p~n", [Pos2+4, Pos2]),    dets_utils:pwrite(H2, [{Pos2, <<(Pos2 + 4):32>>}]).%% -> [term()] | throw({Head, Error})slot_objs(H, Slot) when Slot >= H#head.next ->    '$end_of_table';slot_objs(H, Slot) ->    {_Pos, Chain} = chain(H, Slot),    collect_chain(H, Chain).collect_chain(_H, 0) -> [];collect_chain(H, Pos) ->    {Next, _Sz, Term} = prterm(H, Pos, ?ReadAhead),    [Term | collect_chain(H, Next)].db_hash(Key, Head) ->    H = h(Key, Head#head.hash_bif),    Hash = H rem Head#head.m,    if	Hash < Head#head.n ->	    H rem (Head#head.m2); % H rem (2 * m)	true ->	    Hash    end.h(I, phash) -> erlang:phash(I, ?BIG) - 1;h(I, HF) -> erlang:HF(I, ?BIG) - 1. %% stupid BIF has 1 counts.no_slots(_Head) ->    undefined.table_parameters(_Head) ->    undefined.%% Re-hashing a segment, starting with SlotStart.%%%% On the average, half of the objects of the chain are put into a new%% chain. If the slot of the old chain is i, then the slot of the new%% chain is i+m.%% Note that the insertion of objects into the new chain is simplified%% by the fact that the chains are not sorted on key, which means that%% each moved object can be inserted first in the new chain.%% (It is also a fact that the objects with the same key are not sorted.)%%%% -> {ok, Writes} | throw({Head, Error})re_hash(Head, SlotStart) ->    {SlotPos, _4} = slot_position(SlotStart),    {ok, Bin} = dets_utils:pread(Head, SlotPos, 4*?SEGSZ, 0),    {Read, Cs} = split_bin(SlotPos, Bin, [], []),    re_hash_read(Head, [], Read, Cs).split_bin(Pos, <<P:32, B/binary>>, R, Cs) ->    if	P =:= 0 ->	    split_bin(Pos+4, B, R, Cs);	true ->	    split_bin(Pos+4, B, [{P,?ReadAhead} | R], [[Pos] | Cs])    end;split_bin(_Pos, <<>>, R, Cs) ->    {R, Cs}.re_hash_read(Head, Cs, R, RCs) ->    {ok, Bins} = dets_utils:pread(R, Head),    re_hash_read(Head, R, RCs, Bins, Cs, [], []).re_hash_read(Head, [{Pos, Size} | Ps], [C | Cs], 	     [<<Next:32, Sz:32, _Status:32, Bin0/binary>> | Bins], 	     DoneCs, R, RCs) ->    case size(Bin0) of	BinSz when BinSz >= Sz ->	    case catch binary_to_term(Bin0) of		{'EXIT', _Error} ->		    throw(dets_utils:corrupt_reason(Head, bad_object));		Term ->		    Key = element(Head#head.keypos, Term),		    New = h(Key, Head#head.hash_bif) rem Head#head.m2,		    NC = case New >= Head#head.m of			     true -> [{Pos,New} | C];			     false -> [Pos | C]			 end,		    if			Next =:= 0 ->			    NDoneCs = [NC | DoneCs], 			    re_hash_read(Head, Ps, Cs, Bins, NDoneCs, R, RCs);			true ->			    NR = [{Next,?ReadAhead} | R],			    NRCs = [NC | RCs],			    re_hash_read(Head, Ps, Cs, Bins, DoneCs, NR, NRCs)		    end	    end;	BinSz when Size =:= BinSz+?OHDSZ ->	    NR = [{Pos, Sz+?OHDSZ} | R],	    re_hash_read(Head, Ps, Cs, Bins, DoneCs, NR, [C | RCs]);	_BinSz ->	    throw({Head, {error, {premature_eof, Head#head.filename}}})    end;re_hash_read(Head, [], [], [], Cs, [], []) ->    re_hash_traverse_chains(Cs, Head, [], [], []);re_hash_read(Head, [], [], [], Cs, R, RCs) ->    re_hash_read(Head, Cs, R, RCs).re_hash_traverse_chains([C | Cs], Head, Rs, Ns, Ws) ->    case re_hash_find_new(C, Rs, start, start) of	false ->	    re_hash_traverse_chains(Cs, Head, Rs, Ns, Ws);	{NRs, FirstNew, LastNew} -> 	    LastInNew = case C of			    [{_,_} | _] -> true;			    _ -> false			end,	    N = {FirstNew, LastNew, LastInNew},	    NWs = re_hash_link(C, start, start, start, Ws),	    re_hash_traverse_chains(Cs, Head, NRs, [N | Ns], NWs)    end;re_hash_traverse_chains([], Head, Rs, Ns, Ws) ->    {ok, Bins} = dets_utils:pread(Rs, Head),    {ok, insert_new(Rs, Bins, Ns, Ws)}.re_hash_find_new([{Pos,NewSlot} | C], R, start, start) ->    {SPos, _4} = slot_position(NewSlot),    re_hash_find_new(C, [{SPos,4} | R], Pos, Pos);re_hash_find_new([{Pos,_SPos} | C], R, _FirstNew, LastNew) ->    re_hash_find_new(C, R, Pos, LastNew);re_hash_find_new([_Pos | C], R, FirstNew, LastNew) ->    re_hash_find_new(C, R, FirstNew, LastNew);re_hash_find_new([], _R, start, start) ->    false;re_hash_find_new([], R, FirstNew, LastNew) ->    {R, FirstNew, LastNew}.re_hash_link([{Pos,_SPos} | C], LastOld, start, _LastInNew, Ws) ->    re_hash_link(C, LastOld, Pos, true, Ws);re_hash_link([{Pos,_SPos} | C], LastOld, LastNew, false, Ws) ->    re_hash_link(C, LastOld, Pos, true, [{Pos,<<LastNew:32>>} | Ws]);re_hash_link([{Pos,_SPos} | C], LastOld, _LastNew, LastInNew, Ws) ->    re_hash_link(C, LastOld, Pos, LastInNew, Ws);re_hash_link([Pos | C], start, LastNew, true, Ws) ->    re_hash_link(C, Pos, LastNew, false, [{Pos,<<0:32>>} | Ws]);re_hash_link([Pos | C], LastOld, LastNew, true, Ws) ->    re_hash_link(C, Pos, LastNew, false, [{Pos,<<LastOld:32>>} | Ws]);re_hash_link([Pos | C], _LastOld, LastNew, LastInNew, Ws) ->    re_hash_link(C, Pos, LastNew, LastInNew, Ws);re_hash_link([], _LastOld, _LastNew, _LastInNew, Ws) ->    Ws.insert_new([{NewSlotPos,_4} | Rs], [<<P:32>> = PB | Bins], [N | Ns], Ws) ->    {FirstNew, LastNew, LastInNew} = N,    Ws1 = case P of	      0 when LastInNew ->		  Ws;	      0 ->		  [{LastNew, <<0:32>>} | Ws];	      _ ->		  [{LastNew, PB} | Ws]	  end,    NWs = [{NewSlotPos, <<FirstNew:32>>} | Ws1],    insert_new(Rs, Bins, Ns, NWs);insert_new([], [], [], Ws) ->    Ws.%% When writing the cache, a 'work list' is first created:%%   WorkList = [{Key, {Delete,Lookup,[Inserted]}}]%%   Delete = keep | delete%%   Lookup = skip | lookup%%   Inserted = {object(), No}%%   No = integer()%% If No =< 0 then there will be -No instances of object() on the file%% when the cache has been written. If No > 0 then No instances of%% object() will be added to the file.%% If Delete has the value 'delete', then all objects with the key Key%% have been deleted. (This could be viewed as a shorthand for {Object,0}%% for each object Object on the file not mentioned in some Inserted.)%% If Lookup has the value 'lookup', all objects with the key Key will%% be returned.%%%% -> {NewHead, [LookedUpObject], pwrite_list()} | throw({NewHead, Error})write_cache(Head) ->    #head{cache = C, type = Type} = Head,    case dets_utils:is_empty_cache(C) of	true -> {Head, [], []};	false ->	    {NewC, _MaxInserts, PerKey} = dets_utils:reset_cache(C),	    %% NoInsertedKeys is an upper limit on the number of new keys.	    {WL, NoInsertedKeys} = make_wl(PerKey, Type),	    Head1 = Head#head{cache = NewC},	    case may_grow(Head1, NoInsertedKeys, once) of		{Head2, ok} ->		    eval_work_list(Head2, WL);		HeadError ->		    throw(HeadError)	    end    end.make_wl(PerKey, Type) ->    make_wl(PerKey, Type, [], 0).make_wl([{Key,L} | PerKey], Type, WL, Ins) ->    [Cs | I] = wl(L, Type),    make_wl(PerKey, Type, [{Key,Cs} | WL], Ins+I);make_wl([], _Type, WL, Ins) ->    {WL, Ins}.wl(L, Type) ->    wl(L, Type, keep, skip, 0, []).wl([{_Seq, delete_key} | Cs], Type, _Del, Lookup, _I, _Objs) ->    wl(Cs, Type, delete, Lookup, 0, []);wl([{_Seq, {delete_object, Object}} | Cs], Type, Del, Lookup, I, Objs) ->    NObjs = lists:keydelete(Object, 1, Objs),    wl(Cs, Type, Del, Lookup, I, [{Object,0} | NObjs]);wl([{_Seq, {insert, Object}} | Cs], Type, _Del, Lookup, _I, _Objs)                     when Type =:= set ->    wl(Cs, Type, delete, Lookup, 1, [{Object,-1}]);wl([{_Seq, {insert, Object}} | Cs], Type, Del, Lookup, _I, Objs) ->    NObjs = 	case lists:keysearch(Object, 1, Objs) of	    {value, {_, 0}} ->		lists:keyreplace(Object, 1, Objs, {Object,-1});	    {value, {_, _C}} when Type =:= bag -> % C =:= 1; C =:= -1		Objs;	    {value, {_, C}} when C < 0 -> % when Type =:= duplicate_bag		lists:keyreplace(Object, 1, Objs, {Object,C-1});	    {value, {_, C}} -> % when C > 0, Type =:= duplicate_bag		lists:keyreplace(Object, 1, Objs, {Object,C+1});	    false when Del =:= delete ->		[{Object, -1} | Objs];	    false ->		[{Object, 1} | Objs]	end,    wl(Cs, Type, Del, Lookup, 1, NObjs);wl([{_Seq, {lookup,_Pid}=Lookup} | Cs], Type, Del, _Lookup, I, Objs) ->    wl(Cs, Type, Del, Lookup, I, Objs);wl([], _Type, Del, Lookup, I, Objs) ->    [{Del, Lookup, Objs} | I].%% -> {NewHead, ok} | {NewHead, Error}may_grow(Head, _N, _How) when Head#head.fixed =/= false ->    {Head, ok};may_grow(#head{access = read}=Head, _N, _How) ->    {Head, ok};may_grow(Head, _N, _How) when Head#head.next >= ?MAXOBJS ->    {Head, ok};may_grow(Head, N, How) ->    Extra = lists:min([2*?SEGSZ, Head#head.no_objects + N - Head#head.next]),    case catch may_grow1(Head, Extra, How) of	{error, Reason} -> % alloc may throw error	    {Head, {error, Reason}};	Reply ->	    Reply    end.may_grow1(Head, Extra, many_times) when Extra > ?SEGSZ ->    Reply = grow(Head, 1, undefined),    self() ! ?DETS_CALL(self(), may_grow),    Reply;may_grow1(Head, Extra, _How) ->        grow(Head, Extra, undefined).%% -> {Head, ok} | throw({Head, Error})grow(Head, Extra, _SegZero) when Extra =< 0 ->    {Head, ok};grow(Head, Extra, undefined) ->    grow(Head, Extra, seg_zero());grow(Head, Extra, SegZero) ->    #head{n = N, next = Next, m = M} = Head,    SegNum = ?SLOT2SEG(Next),    {Head0, Ws1} = allocate_segment(Head, SegZero, SegNum),    {Head1, ok} = dets_utils:pwrite(Head0, Ws1),    %% If re_hash fails, segp_cache has been called, but it does not matter.    {ok, Ws2} = re_hash(Head1, N),    {Head2, ok} = dets_utils:pwrite(Head1, Ws2),    NewHead =	if 	    N + ?SEGSZ =:= M ->		Head2#head{n = 0, next = Next + ?SEGSZ, m = 2 * M, m2 = 4 * M};	    true ->		Head2#head{n = N + ?SEGSZ, next = Next + ?SEGSZ}	end,    grow(NewHead, Extra - ?SEGSZ, SegZero).seg_zero() ->    <<0:(4*?SEGSZ)/unit:8>>.find_object(Head, Object) ->    Key = element(Head#head.keypos, Object),    Slot = db_hash(Key, Head),    find_object(Head, Object, Slot).    find_object(H, _Obj, Slot) when Slot >= H#head.next ->    false;find_object(H, Obj, Slot) ->    {_Pos, Chain} = chain(H, Slot),    case catch find_obj(H, Obj, Chain) of	{ok, Pos} ->	    {ok, Pos};	_Else ->	    false    end.find_obj(H, Obj, Pos) when Pos > 0 ->    {Next, _Sz, Term} = prterm(H, Pos, ?ReadAhead),    if 	Term == Obj ->	    {ok, Pos};	true ->	    find_obj(H, Obj, Next)    end.%% Given, a slot, return the {Pos, Chain} in the file where the%% objects hashed to this slot reside. Pos is the position in the%% file where the chain pointer is written and Chain is the position%% in the file where the first object resides.chain(Head, Slot) ->    Pos = ?SEGADDR(?SLOT2SEG(Slot)),    Segment = get_segp(Pos),    FinalPos = Segment + (4 * ?REM2(Slot, ?SEGSZ)),    {ok, <<Chain:32>>} = dets_utils:pread(Head, FinalPos, 4, 0),    {FinalPos, Chain}.%%%%%% Cache routines depending on the dets file format.%%%%% -> {Head, [LookedUpObject], pwrite_list()} | throw({Head, Error})eval_work_list(Head, WorkLists) ->    SWLs = tag_with_slot(WorkLists, Head, []),    P1 = dets_utils:family(SWLs),     {PerSlot, SlotPositions} = remove_slot_tag(P1, [], []),    {ok, Bins} = dets_utils:pread(SlotPositions, Head),    first_object(PerSlot, SlotPositions, Bins, Head, [], [], [], []).tag_with_slot([{K,_} = WL | WLs], Head, L) ->    tag_with_slot(WLs, Head, [{db_hash(K, Head), WL} | L]);tag_with_slot([], _Head, L) ->    L.remove_slot_tag([{S,SWLs} | SSWLs], Ls, SPs) ->    remove_slot_tag(SSWLs, [SWLs | Ls], [slot_position(S) | SPs]);remove_slot_tag([], Ls, SPs) ->    {Ls, SPs}.%% The initial chain pointers and the first object in each chain are%% read "in parallel", that is, with one call to file:pread/2 (two%% calls altogether). The following chain objects are read one by%% one. This is a compromise: if the chains are long and threads are%% active, it would be faster to keep a state for each chain and read%% the objects of the chains in parallel, but the overhead would be%% quite substantial.first_object([WorkLists | SPs], [{P1,_4} | Ss], [<<P2:32>> | Bs], Head,	      ObjsToRead, ToRead, Ls, LU) when P2 =:= 0 ->    L0 = [{old,P1}],    {L, NLU} = eval_slot(Head, ?ReadAhead, P2, WorkLists, L0, LU),    first_object(SPs, Ss, Bs, Head, ObjsToRead, ToRead, [L | Ls], NLU);first_object([WorkLists | SPs], [{P1,_4} | Ss], [<<P2:32>> | Bs], Head, 

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?