dets_v8.erl
来自「OTP是开放电信平台的简称」· ERL 代码 · 共 1,591 行 · 第 1/4 页
ERL
1,591 行
_ -> {new, Skip - size(Bin), L} end.

%% Saves the free lists and the table header to the file.
%%
%% A single pwrite stores:
%%   - the free lists, serialized with term_to_binary/1, at the current
%%     end of file behind a 12-byte header <<0:32, Size:32, ?FREE:32>>;
%%   - the free list pointer (at ?FREELIST_POS);
%%   - m, next, keypos, no_objects and n (at ?D_POS);
%%   - the "closed properly" marker (at ?CLOSED_PROPERLY_POS);
%%   - the file format version (at ?FILE_FORMAT_VERSION_POS).
%% The marker requests compaction when the serialized free lists are
%% both larger than 1000 bytes and larger than the number of objects.
%% Its exact value also depends on the hash BIF (hash or phash).
%% Afterwards the file size (eof + 4) is written at the end of file.
%% -> {NewHead, ok} | throw({Head, Error})
do_perform_save(H) ->
    FreeLists = dets_utils:get_freelists(H),
    FreeBin = term_to_binary(FreeLists),
    FreeSz = size(FreeBin),
    ?DEBUGF("size of freelist = ~p~n", [FreeSz]),
    ?DEBUGF("head.m = ~p~n", [H#head.m]),
    ?DEBUGF("head.no_objects = ~p~n", [H#head.no_objects]),
    {ok, FreePos} = dets_utils:position(H, eof),
    H1 = H#head{freelists_p = FreePos},
    FreeAndParams =
        [{?FREELIST_POS, <<FreePos:32>>},
         {FreePos, [<<0:32, FreeSz:32, ?FREE:32>>, FreeBin]},
         {?D_POS, <<(H1#head.m):32, (H1#head.next):32, (H1#head.keypos):32,
                    (H1#head.no_objects):32, (H1#head.n):32>>}],
    {Clean, CleanNeedCompacting} =
        case H1#head.hash_bif of
            hash ->
                {?CLOSED_PROPERLY2, ?CLOSED_PROPERLY2_NEED_COMPACTING};
            phash ->
                {?CLOSED_PROPERLY_NEW_HASH,
                 ?CLOSED_PROPERLY_NEW_HASH_NEED_COMPACTING}
        end,
    ClosedMark =
        case FreeSz > 1000 andalso FreeSz > H1#head.no_objects of
            true -> CleanNeedCompacting;
            false -> Clean
        end,
    Markers =
        [{?CLOSED_PROPERLY_POS, <<ClosedMark:32>>},
         {?FILE_FORMAT_VERSION_POS, <<?FILE_FORMAT_VERSION:32>>}],
    {H2, ok} = dets_utils:pwrite(H1, FreeAndParams ++ Markers),
    {ok, Eof} = dets_utils:position(H2, eof),
    ?DEBUGF("Writing file size ~p, eof at ~p~n", [Eof+4, Eof]),
    dets_utils:pwrite(H2, [{Eof, <<(Eof + 4):32>>}]).

%% Returns every object stored in the chain of Slot. Returns
%% '$end_of_table' when Slot is not below Head#head.next, the number of
%% slots in use.
%% -> [term()] | '$end_of_table' | throw({Head, Error})
slot_objs(H, Slot) when Slot >= H#head.next ->
    '$end_of_table';
slot_objs(H, Slot) ->
    {_PtrPos, FirstPos} = chain(H, Slot),
    collect_chain(H, FirstPos).

%% Follows the next-pointers from Pos, reading each object with prterm/3,
%% until the 0 pointer that ends the chain. Objects are returned in
%% chain order.
collect_chain(_H, 0) ->
    [];
collect_chain(H, Pos) ->
    {NextPos, _ObjSz, Obj} = prterm(H, Pos, ?ReadAhead),
    [Obj | collect_chain(H, NextPos)].

%% Maps Key to its slot under linear hashing. The hash value is first
%% reduced modulo m. Slots below n have already been split, so for
%% those the value is reduced modulo m2 (= 2 * m) instead.
db_hash(Key, Head) ->
    HashVal = h(Key, Head#head.hash_bif),
    case HashVal rem Head#head.m of
        Slot when Slot < Head#head.n -> HashVal rem Head#head.m2;
        Slot -> Slot
    end.

%% Hashes I into 0..?BIG-1 with the table's hash BIF. The BIFs return
%% values starting at 1, hence the "- 1".
h(I, phash) -> erlang:phash(I, ?BIG) - 1;
h(I, HashBif) -> erlang:HashBif(I, ?BIG) - 1.
%% Trailing remark on h/2: the hash BIFs count from 1, not 0, which is
%% why h/2 subtracts 1. (Original note: "stupid BIF has 1 counts.")

%% Always returns undefined for this file format.
no_slots(_Head) -> undefined.

%% Always returns undefined for this file format.
table_parameters(_Head) -> undefined.

%% Re-hashing a segment, starting with SlotStart.
%%
%% On the average, half of the objects of the chain are put into a new
%% chain. If the slot of the old chain is i, then the slot of the new
%% chain is i+m.
%% Note that the insertion of objects into the new chain is simplified
%% by the fact that the chains are not sorted on key, which means that
%% each moved object can be inserted first in the new chain.
%% (It is also a fact that the objects with the same key are not sorted.)
%%
%% Outline:
%%   1. The ?SEGSZ chain pointers of the segment are read in one pread.
%%      split_bin/4 keeps the non-empty slots.
%%   2. All chains are walked one object per round, with one batched
%%      pread per round (re_hash_read/4,7).
%%   3. The next-pointer updates are computed (re_hash_traverse_chains/5).
%% Nothing is written here; the caller pwrites the returned list.
%%
%% -> {ok, Writes} | throw({Head, Error})
re_hash(Head, SlotStart) ->
    {SlotPos, _4} = slot_position(SlotStart),
    {ok, Bin} = dets_utils:pread(Head, SlotPos, 4*?SEGSZ, 0),
    {Read, Cs} = split_bin(SlotPos, Bin, [], []),
    re_hash_read(Head, [], Read, Cs).

%% Splits the block of 32-bit chain pointers that starts at file
%% position Pos. It produces:
%%   R  - one read request {ChainHead, ?ReadAhead} per non-empty slot;
%%   Cs - for each such slot, the initial chain list [SlotPointerPos].
%% The slot pointer position is seeded as a bare ("old") element of the
%% chain list. re_hash_link/5 can then update the slot pointer exactly
%% like an object's next-pointer. R and Cs are built in reverse but
%% stay aligned.
split_bin(Pos, <<P:32, B/binary>>, R, Cs) ->
    if
        P =:= 0 ->
            % empty slot: nothing to re-hash
            split_bin(Pos+4, B, R, Cs);
        true ->
            split_bin(Pos+4, B, [{P,?ReadAhead} | R], [[Pos] | Cs])
    end;
split_bin(_Pos, <<>>, R, Cs) ->
    {R, Cs}.

%% Starts one round of the chain walk by reading every pending object in
%% R with a single pread.
%%   Cs  - chains that have been read completely;
%%   RCs - chain lists, aligned with R, whose next object is being read.
re_hash_read(Head, Cs, R, RCs) ->
    {ok, Bins} = dets_utils:pread(R, Head),
    re_hash_read(Head, R, RCs, Bins, Cs, [], []).

%% Handles the objects read in one round. For each object:
%%   - If it was read completely, it is decoded and its slot under the
%%     doubled modulus m2 is computed.
%%     - An object whose new slot is >= m moves to the new chain and is
%%       recorded as {Pos, NewSlot}.
%%     - Any other object stays and is recorded as a bare Pos.
%%     - The chain is finished when Next =:= 0. Otherwise Next is
%%       scheduled for the next round.
%%   - If Size =:= BinSz + ?OHDSZ, the object is re-read next round with
%%     size Sz + ?OHDSZ. This presumably means the read was limited by
%%     ?ReadAhead rather than by end of file; ?OHDSZ is presumably the
%%     object header size (TODO confirm).
%%   - Otherwise the file ended prematurely.
%% Chain lists are built by prepending, so each one ends up in reverse
%% chain order: last object first, slot pointer position last. When
%% nothing is left to read, the finished chains are relinked.
re_hash_read(Head, [{Pos, Size} | Ps], [C | Cs],
             [<<Next:32, Sz:32, _Status:32, Bin0/binary>> | Bins],
             DoneCs, R, RCs) ->
    case size(Bin0) of
        BinSz when BinSz >= Sz ->
            case catch binary_to_term(Bin0) of
                {'EXIT', _Error} ->
                    throw(dets_utils:corrupt_reason(Head, bad_object));
                Term ->
                    Key = element(Head#head.keypos, Term),
                    New = h(Key, Head#head.hash_bif) rem Head#head.m2,
                    NC = case New >= Head#head.m of
                             true -> [{Pos,New} | C];
                             false -> [Pos | C]
                         end,
                    if
                        Next =:= 0 ->
                            % last object of this chain: chain done
                            NDoneCs = [NC | DoneCs],
                            re_hash_read(Head, Ps, Cs, Bins, NDoneCs, R, RCs);
                        true ->
                            NR = [{Next,?ReadAhead} | R],
                            NRCs = [NC | RCs],
                            re_hash_read(Head, Ps, Cs, Bins, DoneCs, NR, NRCs)
                    end
            end;
        BinSz when Size =:= BinSz+?OHDSZ ->
            % object larger than the read-ahead: re-read it in full
            NR = [{Pos, Sz+?OHDSZ} | R],
            re_hash_read(Head, Ps, Cs, Bins, DoneCs, NR, [C | RCs]);
        _BinSz ->
            throw({Head, {error, {premature_eof, Head#head.filename}}})
    end;
re_hash_read(Head, [], [], [], Cs, [], []) ->
    % every chain has been read completely
    re_hash_traverse_chains(Cs, Head, [], [], []);
re_hash_read(Head, [], [], [], Cs, R, RCs) ->
    % round finished, some chains still have objects to read
    re_hash_read(Head, Cs, R, RCs).

%% Handles each finished chain C that has objects moving to a new slot.
%% For such a chain it:
%%   - schedules a 4-byte read of the new slot's chain pointer
%%     (re_hash_find_new/4);
%%   - records {FirstNew, LastNew, LastInNew}; LastInNew is true when
%%     the chain's last object (the head of C) moves;
%%   - collects the next-pointer writes that split C into its staying
%%     and moving parts (re_hash_link/5).
%% Finally the new slots' chain pointers are read, and insert_new/4 links
%% the moved objects in front of the chains already stored there.
re_hash_traverse_chains([C | Cs], Head, Rs, Ns, Ws) ->
    case re_hash_find_new(C, Rs, start, start) of
        false ->
            % no object of this chain moves
            re_hash_traverse_chains(Cs, Head, Rs, Ns, Ws);
        {NRs, FirstNew, LastNew} ->
            LastInNew = case C of
                            [{_,_} | _] -> true;
                            _ -> false
                        end,
            N = {FirstNew, LastNew, LastInNew},
            NWs = re_hash_link(C, start, start, start, Ws),
            re_hash_traverse_chains(Cs, Head, NRs, [N | Ns], NWs)
    end;
re_hash_traverse_chains([], Head, Rs, Ns, Ws) ->
    {ok, Bins} = dets_utils:pread(Rs, Head),
    {ok, insert_new(Rs, Bins, Ns, Ws)}.

%% Scans a chain list C, which is in reverse chain order, for moved
%% objects.
%%   - Returns false if there are none.
%%   - Otherwise returns {R', FirstNew, LastNew}. FirstNew and LastNew
%%     are the positions of the first and last moved object in chain
%%     order.
%%   - R' is R plus a 4-byte read of the new slot's chain pointer. The
%%     slot is taken from the first moved element met. All moved
%%     objects of one chain share that slot (i+m, see re_hash/2).
re_hash_find_new([{Pos,NewSlot} | C], R, start, start) ->
    {SPos, _4} = slot_position(NewSlot),
    re_hash_find_new(C, [{SPos,4} | R], Pos, Pos);
re_hash_find_new([{Pos,_SPos} | C], R, _FirstNew, LastNew) ->
    % (_SPos holds the new slot number here)
    re_hash_find_new(C, R, Pos, LastNew);
re_hash_find_new([_Pos | C], R, FirstNew, LastNew) ->
    re_hash_find_new(C, R, FirstNew, LastNew);
re_hash_find_new([], _R, start, start) ->
    false;
re_hash_find_new([], R, FirstNew, LastNew) ->
    {R, FirstNew, LastNew}.

%% Walks a chain list C (reverse chain order) and emits the next-pointer
%% writes {Pos, <<Ptr:32>>} that split the chain in two:
%%   - Each staying element points at the next staying element in chain
%%     order, or at 0 if there is none. This includes the slot pointer
%%     position, which terminates C.
%%   - Each moved object points at the next moved object.
%%   - The pointer of the last moved object is left to insert_new/4.
%% State:
%%   - LastOld / LastNew: the staying / moved position seen most
%%     recently (the next one in chain order), or start.
%%   - LastInNew: whether the previously seen element moved, or start.
%% A write is emitted only when the kind changes between neighbours.
%% Within a run of same-kind elements the existing pointers are already
%% correct.
re_hash_link([{Pos,_SPos} | C], LastOld, start, _LastInNew, Ws) ->
    re_hash_link(C, LastOld, Pos, true, Ws);
re_hash_link([{Pos,_SPos} | C], LastOld, LastNew, false, Ws) ->
    re_hash_link(C, LastOld, Pos, true, [{Pos,<<LastNew:32>>} | Ws]);
re_hash_link([{Pos,_SPos} | C], LastOld, _LastNew, LastInNew, Ws) ->
    re_hash_link(C, LastOld, Pos, LastInNew, Ws);
re_hash_link([Pos | C], start, LastNew, true, Ws) ->
    re_hash_link(C, Pos, LastNew, false, [{Pos,<<0:32>>} | Ws]);
re_hash_link([Pos | C], LastOld, LastNew, true, Ws) ->
    re_hash_link(C, Pos, LastNew, false, [{Pos,<<LastOld:32>>} | Ws]);
re_hash_link([Pos | C], _LastOld, LastNew, LastInNew, Ws) ->
    re_hash_link(C, Pos, LastNew, LastInNew, Ws);
re_hash_link([], _LastOld, _LastNew, _LastInNew, Ws) ->
    Ws.

%% For each chain with moved objects, links the moved run in front of the
%% new slot's current chain. Rs, Bins and Ns are aligned lists.
%% P is the chain pointer read from the new slot, and PB its raw 4 bytes.
%%   - If P is 0, the last moved object must end the chain. No write is
%%     needed when it already was the old chain's last object.
%%   - Otherwise the last moved object is pointed at P.
%% The new slot's pointer is then set to FirstNew.
insert_new([{NewSlotPos,_4} | Rs], [<<P:32>> = PB | Bins], [N | Ns], Ws) ->
    {FirstNew, LastNew, LastInNew} = N,
    Ws1 = case P of
              0 when LastInNew -> Ws;
              0 -> [{LastNew, <<0:32>>} | Ws];
              _ -> [{LastNew, PB} | Ws]
          end,
    NWs = [{NewSlotPos, <<FirstNew:32>>} | Ws1],
    insert_new(Rs, Bins, Ns, NWs);
insert_new([], [], [], Ws) ->
    Ws.

%% When writing the cache, a 'work list' is first created:
%% WorkList = [{Key, {Delete,Lookup,[Inserted]}}]
%% Delete = keep | delete
%% Lookup = skip | {lookup, Pid}
%% Inserted = {object(), No}
%% No = integer()
%% If No =< 0 then there will be -No instances of object() on the file
%% when the cache has been written. If No > 0 then No instances of
%% object() will be added to the file.
%% If Delete has the value 'delete', then all objects with the key Key
%% have been deleted. (This could be viewed as a shorthand for {Object,0}
%% for each object Object on the file not mentioned in some Inserted.)
%% If Lookup is {lookup, Pid}, all objects with the key Key will
%% be returned.
%%
%% Before the work list is evaluated, the table may grow (may_grow/3).
%% The number of keys that may be new is used as the expected growth.
%%
%% -> {NewHead, [LookedUpObject], pwrite_list()} | throw({NewHead, Error})
write_cache(Head) ->
    #head{cache = C, type = Type} = Head,
    case dets_utils:is_empty_cache(C) of
        true -> {Head, [], []};
        false ->
            {NewC, _MaxInserts, PerKey} = dets_utils:reset_cache(C),
            %% NoInsertedKeys is an upper limit on the number of new keys.
            {WL, NoInsertedKeys} = make_wl(PerKey, Type),
            Head1 = Head#head{cache = NewC},
            case may_grow(Head1, NoInsertedKeys, once) of
                {Head2, ok} ->
                    eval_work_list(Head2, WL);
                HeadError ->
                    throw(HeadError)
            end
    end.

%% Builds the work list from the per-key operation lists of the cache.
%% It also sums the per-key insert flags (0 or 1 each) returned by wl/2.
%% The work list comes out in reverse order of PerKey.
make_wl(PerKey, Type) ->
    make_wl(PerKey, Type, [], 0).

make_wl([{Key,L} | PerKey], Type, WL, Ins) ->
    % wl/2 returns the improper list [{Del, Lookup, Objs} | I]
    [Cs | I] = wl(L, Type),
    make_wl(PerKey, Type, [{Key,Cs} | WL], Ins+I);
make_wl([], _Type, WL, Ins) ->
    {WL, Ins}.

%% Folds the cached operations for one key into [{Delete, Lookup, Objs} | I].
%% Operations are processed in the order given (presumably sequence
%% order; TODO confirm).
%%   - I is 1 if an insert happened after the last delete_key, else 0.
%%   - Objs holds {Object, No} pairs as described in the work-list
%%     comment of write_cache/1.
wl(L, Type) ->
    wl(L, Type, keep, skip, 0, []).

wl([{_Seq, delete_key} | Cs], Type, _Del, Lookup, _I, _Objs) ->
    % supersedes every earlier insert/delete_object of this key
    wl(Cs, Type, delete, Lookup, 0, []);
wl([{_Seq, {delete_object, Object}} | Cs], Type, Del, Lookup, I, Objs) ->
    NObjs = lists:keydelete(Object, 1, Objs),
    wl(Cs, Type, Del, Lookup, I, [{Object,0} | NObjs]);
wl([{_Seq, {insert, Object}} | Cs], Type, _Del, Lookup, _I, _Objs)
                                                    when Type =:= set ->
    % for a set, an insert replaces whatever is stored under the key
    wl(Cs, Type, delete, Lookup, 1, [{Object,-1}]);
wl([{_Seq, {insert, Object}} | Cs], Type, Del, Lookup, _I, Objs) ->
    NObjs =
        case lists:keysearch(Object, 1, Objs) of
            {value, {_, 0}} ->
                lists:keyreplace(Object, 1, Objs, {Object,-1});
            {value, {_, _C}} when Type =:= bag -> % C =:= 1; C =:= -1
                Objs;
            {value, {_, C}} when C < 0 -> % when Type =:= duplicate_bag
                lists:keyreplace(Object, 1, Objs, {Object,C-1});
            {value, {_, C}} -> % when C > 0, Type =:= duplicate_bag
                lists:keyreplace(Object, 1, Objs, {Object,C+1});
            false when Del =:= delete ->
                [{Object, -1} | Objs];
            false ->
                [{Object, 1} | Objs]
        end,
    wl(Cs, Type, Del, Lookup, 1, NObjs);
wl([{_Seq, {lookup,_Pid}=Lookup} | Cs], Type, Del, _Lookup, I, Objs) ->
    wl(Cs, Type, Del, Lookup, I, Objs);
wl([], _Type, Del, Lookup, I, Objs) ->
    [{Del, Lookup, Objs} | I].

%% Grows the table by up to 2*?SEGSZ slots when the number of objects
%% plus N expected new keys exceeds the number of slots in use
%% (Head#head.next). The table does not grow when:
%%   - Head#head.fixed is not false;
%%   - access is read;
%%   - next has already reached ?MAXOBJS.
%% How is passed on to may_grow1/3.
%% NOTE(review): this uses old-style catch, so a runtime error inside
%% may_grow1/3 is returned as {'EXIT', Reason}. That has the same shape
%% as {NewHead, Error}, so callers cannot tell the two apart.
%% -> {NewHead, ok} | {NewHead, Error}
may_grow(Head, _N, _How) when Head#head.fixed =/= false ->
    {Head, ok};
may_grow(#head{access = read}=Head, _N, _How) ->
    {Head, ok};
may_grow(Head, _N, _How) when Head#head.next >= ?MAXOBJS ->
    {Head, ok};
may_grow(Head, N, How) ->
    Extra = lists:min([2*?SEGSZ, Head#head.no_objects + N - Head#head.next]),
    case catch may_grow1(Head, Extra, How) of
        {error, Reason} -> % alloc may throw error
            {Head, {error, Reason}};
        Reply ->
            Reply
    end.

%% With How =:= many_times and more than ?SEGSZ slots to add, only one
%% segment is grown now. A may_grow request is then sent to the calling
%% process itself, so the rest of the growth happens in later steps.
%% Otherwise all of Extra is grown at once.
may_grow1(Head, Extra, many_times) when Extra > ?SEGSZ ->
    Reply = grow(Head, 1, undefined),
    self() ! ?DETS_CALL(self(), may_grow),
    Reply;
may_grow1(Head, Extra, _How) ->
    grow(Head, Extra, undefined).

%% Adds ?SEGSZ slots per step until Extra =< 0. Each step:
%%   1. allocates the segment presumably holding slot Next (?SLOT2SEG);
%%   2. re-hashes the ?SEGSZ chains starting at slot N (re_hash/2);
%%   3. writes the resulting pointer updates.
%% When N + ?SEGSZ reaches M, every slot below m has been split. N then
%% restarts at 0, and m and m2 double.
%% SegZero is created once by seg_zero/0 and passed to
%% allocate_segment/3 on every step.
%% -> {Head, ok} | throw({Head, Error})
grow(Head, Extra, _SegZero) when Extra =< 0 ->
    {Head, ok};
grow(Head, Extra, undefined) ->
    grow(Head, Extra, seg_zero());
grow(Head, Extra, SegZero) ->
    #head{n = N, next = Next, m = M} = Head,
    SegNum = ?SLOT2SEG(Next),
    {Head0, Ws1} = allocate_segment(Head, SegZero, SegNum),
    {Head1, ok} = dets_utils:pwrite(Head0, Ws1),
    %% If re_hash fails, segp_cache has been called, but it does not matter.
    {ok, Ws2} = re_hash(Head1, N),
    {Head2, ok} = dets_utils:pwrite(Head1, Ws2),
    NewHead =
        if
            N + ?SEGSZ =:= M ->
                Head2#head{n = 0, next = Next + ?SEGSZ, m = 2 * M, m2 = 4 * M};
            true ->
                Head2#head{n = N + ?SEGSZ, next = Next + ?SEGSZ}
        end,
    grow(NewHead, Extra - ?SEGSZ, SegZero).

%% An all-zero binary of 4*?SEGSZ bytes, i.e. one zero 32-bit chain
%% pointer per slot of a segment.
seg_zero() ->
    <<0:(4*?SEGSZ)/unit:8>>.

%% Looks for an object comparing equal (==) to Object. The search covers
%% the chain of the slot that the object's key hashes to.
%% Returns {ok, Pos} with the object's file position, or false.
find_object(Head, Object) ->
    Key = element(Head#head.keypos, Object),
    Slot = db_hash(Key, Head),
    find_object(Head, Object, Slot).

find_object(H, _Obj, Slot) when Slot >= H#head.next ->
    false;
find_object(H, Obj, Slot) ->
    {_Pos, Chain} = chain(H, Slot),
    %% find_obj/3 has no clause for the end of a chain (Pos =:= 0). The
    %% resulting function_clause error is caught here and mapped to false.
    %% NOTE(review): the catch also maps any error raised while reading
    %% (prterm/3) to false -- confirm that is intended.
    case catch find_obj(H, Obj, Chain) of
        {ok, Pos} -> {ok, Pos};
        _Else -> false
    end.

%% Walks a chain from Pos and returns {ok, Pos} for the first object
%% that compares equal to Obj. Note that == is used, not =:=, so for
%% example 1 and 1.0 match.
find_obj(H, Obj, Pos) when Pos > 0 ->
    {Next, _Sz, Term} = prterm(H, Pos, ?ReadAhead),
    if
        Term == Obj ->
            {ok, Pos};
        true ->
            find_obj(H, Obj, Next)
    end.

%% Given a slot, return the {Pos, Chain} in the file where the
%% objects hashed to this slot reside.
%% (chain/2) Pos is the position in the file where the chain pointer is
%% written. Chain is the position in the file where the first object
%% resides, or 0 when the slot's chain is empty.
chain(Head, Slot) ->
    % ?SEGADDR presumably gives the file address of the pointer to the
    % segment holding Slot; get_segp/1 resolves it to the segment's
    % position (TODO confirm).
    Pos = ?SEGADDR(?SLOT2SEG(Slot)),
    Segment = get_segp(Pos),
    % 4 bytes per slot within the segment; ?REM2 presumably computes
    % Slot rem ?SEGSZ.
    FinalPos = Segment + (4 * ?REM2(Slot, ?SEGSZ)),
    {ok, <<Chain:32>>} = dets_utils:pread(Head, FinalPos, 4, 0),
    {FinalPos, Chain}.

%%%
%%% Cache routines depending on the dets file format.
%%%

%% Evaluates the cache work lists built by write_cache/1.
%%   1. Each work list is tagged with its key's slot.
%%   2. The tagged lists are presumably grouped per slot
%%      (dets_utils:family/1).
%%   3. The chain pointers of all those slots are read with one pread.
%%   4. The result is handed to first_object/8.
%% -> {Head, [LookedUpObject], pwrite_list()} | throw({Head, Error})
eval_work_list(Head, WorkLists) ->
    SWLs = tag_with_slot(WorkLists, Head, []),
    P1 = dets_utils:family(SWLs),
    {PerSlot, SlotPositions} = remove_slot_tag(P1, [], []),
    {ok, Bins} = dets_utils:pread(SlotPositions, Head),
    first_object(PerSlot, SlotPositions, Bins, Head, [], [], [], []).

%% Pairs each work list {Key, _} with the slot its key hashes to.
%% The result is in reverse order of the input.
tag_with_slot([{K,_} = WL | WLs], Head, L) ->
    tag_with_slot(WLs, Head, [{db_hash(K, Head), WL} | L]);
tag_with_slot([], _Head, L) ->
    L.

%% Splits [{Slot, WorkLists}] into the per-slot work lists and the
%% matching slot_position/1 results. Both lists are reversed relative
%% to the input but stay aligned with each other.
remove_slot_tag([{S,SWLs} | SSWLs], Ls, SPs) ->
    remove_slot_tag(SSWLs, [SWLs | Ls], [slot_position(S) | SPs]);
remove_slot_tag([], Ls, SPs) ->
    {Ls, SPs}.

%% The initial chain pointers and the first object in each chain are
%% read "in parallel", that is, with one call to file:pread/2 (two
%% calls altogether). The following chain objects are read one by
%% one. This is a compromise: if the chains are long and threads are
%% active, it would be faster to keep a state for each chain and read
%% the objects of the chains in parallel, but the overhead would be
%% quite substantial.
%%
%% First clause: the slot's chain is empty (chain pointer P2 =:= 0).
%% The slot's work lists go straight to eval_slot/6 with chain position
%% 0, and the slot pointer position is recorded as {old, P1}.
first_object([WorkLists | SPs], [{P1,_4} | Ss], [<<P2:32>> | Bs], Head,
             ObjsToRead, ToRead, Ls, LU) when P2 =:= 0 ->
    L0 = [{old,P1}],
    {L, NLU} = eval_slot(Head, ?ReadAhead, P2, WorkLists, L0, LU),
    first_object(SPs, Ss, Bs, Head, ObjsToRead, ToRead, [L | Ls], NLU);
first_object([WorkLists | SPs], [{P1,_4} | Ss], [<<P2:32>> | Bs], Head,
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?