dets_v8.erl
来自「OTP是开放电信平台的简称」· ERL 代码 · 共 1,591 行 · 第 1/4 页
ERL
1,591 行
%% NOTE(review): the lines down to the first "end." are the tail of a
%% definition that starts before this excerpt. They are reproduced
%% unchanged, only split over several lines so that the "%" comments no
%% longer swallow the code that follows them.
next = FH#fileheader.next,
              fptr = Fd,
              no_objects= FH#fileheader.no_objects,
              n = FH#fileheader.n,
              type = FH#fileheader.type,
              update_mode = saved,
              auto_save = infinity,             % not saved on file
              fixed = false,                    % not saved on file
              freelists_p = FH#fileheader.freelist,
              hash_bif = HashAlg,
              keypos = FH#fileheader.keypos,
              min_no_slots = FH#fileheader.min_no_slots,
              max_no_slots = FH#fileheader.max_no_slots,
              version = ?FILE_FORMAT_VERSION,
              mod = ?MODULE,
              bump = ?BUMP,
              base = ?BASE},
            {ok, H, ExtraInfo};
        Error ->
            Error
    end.

%% Reads 4 * no_segs(M) bytes starting at file offset ?HEADSZ, decodes
%% them as 32-bit integers and hands each integer S, together with the
%% file position P it was read from, to segp_cache(P, S).
%% Returns the position following the last integer read.
cache_segps(Fd, FileName, M) ->
    NSegs = no_segs(M),
    {ok, Bin} = dets_utils:pread_close(Fd, FileName, ?HEADSZ, 4 * NSegs),
    Fun = fun(S, P) -> segp_cache(P, S), P+4 end,
    lists:foldl(Fun, ?HEADSZ, bin2ints(Bin)).

%% Number of segments needed for NoSlots slots.
no_segs(NoSlots) ->
    ?SLOT2SEG(NoSlots - 1) + 1.

%% Converts a binary whose size is a multiple of four bytes into the
%% list of 32-bit integers it contains. Any other size fails with
%% function_clause.
bin2ints(<<Int:32, B/binary>>) ->
    [Int | bin2ints(B)];
bin2ints(<<>>) ->
    [].

%%%
%%% Repair, conversion and initialization of a dets file.
%%%

%% Wraps the user supplied InitFun in an input fun. A fresh reference
%% is used to tell values returned by InitFun apart from values it
%% throws.
bulk_input(Head, InitFun, Cntrs) ->
    bulk_input(Head, InitFun, Cntrs, make_ref()).

%% The returned fun accepts:
%%   close - returns ok;
%%   read  - calls InitFun(read) and returns
%%           end_of_input,
%%           {Objects, NextInputFun} where Objects have been converted
%%               by bulk_objects/5,
%%           {error, invalid_objects_list} if the conversion fails
%%               (NewInitFun is then asked to close),
%%           {error, {init_fun, Value}} for any other return value.
%%           Anything caught that is not tagged with Ref is rethrown as
%%           {thrown, Error}.
bulk_input(Head, InitFun, Cntrs, Ref) ->
    fun(close) ->
            ok;
       (read) ->
            case catch {Ref, InitFun(read)} of
                {Ref, end_of_input} ->
                    end_of_input;
                {Ref, {L0, NewInitFun}} when is_list(L0),
                                             is_function(NewInitFun) ->
                    Kp = Head#head.keypos,
                    case catch bulk_objects(L0, Head, Cntrs, Kp, []) of
                        {'EXIT', _Error} ->
                            _ = (catch NewInitFun(close)),
                            {error, invalid_objects_list};
                        L ->
                            {L, bulk_input(Head, NewInitFun, Cntrs, Ref)}
                    end;
                {Ref, Value} ->
                    {error, {init_fun, Value}};
                Error ->
                    throw({thrown, Error})
            end
    end.

%% Converts each term to the binary form produced by make_object/4 and
%% counts it per size class in Cntrs. The result is in reverse order
%% compared to the input list.
bulk_objects([T | Ts], Head, Cntrs, Kp, L) ->
    BT = term_to_binary(T),
    %% BT is always a binary, so byte_size/1 is used rather than size/1.
    Sz = byte_size(BT),
    LogSz = sz2pos(Sz+?OHDSZ),
    count_object(Cntrs, LogSz),
    Key = element(Kp, T),
    bulk_objects(Ts, Head, Cntrs, Kp,
                 [make_object(Head, Key, LogSz, BT) | L]);
bulk_objects([], _Head, _Cntrs, _Kp, L) ->
    L.

-define(FSCK_SEGMENT, 10000).

-define(DCT(D, CT), [D | CT]).

-define(VNEW(N, E), erlang:make_tuple(N, E)).
-define(VSET(I, V, E), setelement(I, V, E)).
-define(VGET(I, V), element(I, V)).

%% OldVersion not used, assuming later
%% versions have been converted already.
%%
%% Returns the output fun used when objects are written back to file.
%% The fun accepts:
%%   close - nothing has been written; returns {ok, 0, Head};
%%   []    - no objects yet; returns a fresh output fun;
%%   L     - a non-empty list of binaries as accepted by bin2term/2.
%%           Returns {try_again, NoObjects} if the number of objects
%%           counted in Cntrs differs too much from the minimum number
%%           of slots, otherwise allocates space for all objects and
%%           continues with output_objs2/8.
output_objs(OldVersion, Head, SlotNumbers, Cntrs) ->
    fun(close) ->
            {ok, 0, Head};
       ([]) ->
            output_objs(OldVersion, Head, SlotNumbers, Cntrs);
       (L) ->
            %% Descending sizes.
            Count = lists:sort(ets:tab2list(Cntrs)),
            RCount = lists:reverse(Count),
            NoObjects = lists:foldl(fun({_Sz,No}, A) -> A + No end, 0, Count),
            {_, MinSlots, _} = SlotNumbers,
            if
                %% Using number of objects for bags and duplicate bags
                %% is not ideal; number of (unique) keys should be
                %% used instead. The effect is that there will be more
                %% segments than "necessary".
                MinSlots =/= bulk_init,
                abs(?SLOT2SEG(NoObjects) - ?SLOT2SEG(MinSlots)) > 5,
                (NoObjects < ?MAXOBJS) ->
                    {try_again, NoObjects};
                true ->
                    Head1 = Head#head{no_objects = NoObjects},
                    SegSz = actual_seg_size(),
                    {_, End, _} = dets_utils:alloc(Head, SegSz-1),
                    %% Now {LogSize,NoObjects} in Cntrs is replaced by
                    %% {LogSize,Position,{FileName,FileDescriptor},NoObjects}.
                    {Head2, CT} = allocate_all_objects(Head1, RCount, Cntrs),
                    [E | Es] = bin2term(L, []),
                    {NE, Acc, DCT1} =
                        output_slots(E, Es, [E], Head2, ?DCT(0, CT)),
                    NDCT = write_all_sizes(DCT1, Cntrs),
                    Max = ets:info(Cntrs, size),
                    output_objs2(NE, Acc, Head2, Cntrs, NDCT, End, Max,Max)
            end
    end.

%% Continuation of output_objs/4. ChunkI counts down for each chunk
%% received; when it reaches zero the buffered data is written to the
%% temporary files and the counter restarts from MaxNoChunks.
%% On close the last slot is output, everything buffered is written,
%% the segment file is padded with zeros up to End, and
%% {ok, NoDups, NewHead} is returned.
output_objs2(E, Acc, Head, Cntrs, DCT, End, 0, MaxNoChunks) ->
    NDCT = write_all_sizes(DCT, Cntrs),
    output_objs2(E, Acc, Head, Cntrs, NDCT, End, MaxNoChunks, MaxNoChunks);
output_objs2(E, Acc, Head, Cntrs, DCT, End, ChunkI, MaxNoChunks) ->
    fun(close) ->
            DCT1 = output_slot(Acc, Head, DCT),
            NDCT = write_all_sizes(DCT1, Cntrs),
            ?DCT(NoDups, CT) = NDCT,
            %% The last element of CT belongs to the segment file.
            [SegAddr | []] = ?VGET(tuple_size(CT), CT),
            FinalZ = End - SegAddr,
            [{?FSCK_SEGMENT, _, {FileName, Fd}, _}] =
                ets:lookup(Cntrs, ?FSCK_SEGMENT),
            ok = dets_utils:fwrite(Fd, FileName,
                                   dets_utils:make_zeros(FinalZ)),
            NewHead = Head#head{no_objects = Head#head.no_objects - NoDups},
            {ok, NoDups, NewHead};
       (L) ->
            Es = bin2term(L, []),
            {NE, NAcc, NDCT} = output_slots(E, Es, Acc, Head, DCT),
            output_objs2(NE, NAcc, Head, Cntrs, NDCT, End,
                         ChunkI-1, MaxNoChunks)
    end.

%% By allocating bigger objects before smaller ones, holes in the
%% buddy system memory map are avoided. Unfortunately, the segments
%% are always allocated first, so if there are objects bigger than a
%% segment, there is a hole to handle. (Haven't considered placing the
%% segments among other objects of the same size.)
%%
%% Count is a non-empty list of {LogSize, NoObjects} in descending
%% LogSize order. Returns {NewHead, CT} where CT is a tuple with one
%% element per size class plus a last element for the segments.
allocate_all_objects(Head, Count, Cntrs) ->
    SegSize = actual_seg_size(),
    {Head1, HSz, HN, HA} = alloc_hole(Count, Head, SegSize),
    {Max, _} = hd(Count),
    CT = ?VNEW(Max+1, not_used),
    {Head2, NCT} = allocate_all(Head1, Count, Cntrs, CT),
    Head3 = free_hole(Head2, HSz, HN, HA),
    {Head3, NCT}.

%% If the biggest object is bigger than a segment, the space between
%% the address where a segment would be allocated and the address where
%% the biggest object would be allocated is temporarily allocated as N
%% segment-sized chunks. Returns {Head, ChunkSize, N, FirstAddr};
%% N is 0 if there is no hole.
alloc_hole([{LSize,_} | _], Head, SegSz) when ?POW(LSize-1) > SegSz ->
    {_, SegAddr, _} = dets_utils:alloc(Head, SegSz-1),
    Size = ?POW(LSize-1)-1,
    {_, Addr, _} = dets_utils:alloc(Head, Size),
    N = (Addr - SegAddr) div SegSz,
    Head1 = dets_utils:alloc_many(Head, SegSz, N, SegAddr),
    {Head1, SegSz-1, N, SegAddr};
alloc_hole(_Count, Head, _SegSz) ->
    {Head, 0, 0, 0}.

%% Frees the N chunks allocated by alloc_hole/3, one at a time.
free_hole(Head, _Size, 0, _Addr) ->
    Head;
free_hole(Head, Size, N, Addr) ->
    {Head1, _} = dets_utils:free(Head, Addr, Size),
    free_hole(Head1, Size, N-1, Addr+Size+1).

%% One (temporary) file for each buddy size, write all objects of that
%% size to the file.
%%
%% For every size class the {LSize, NoObjects} entry in Cntrs is
%% replaced by {LSize, Addr, {FileName, Fd}, NoObjects}, and element
%% LSize of CT is initialized to [Addr].
allocate_all(Head, [{LSize,NoObjects} | Count], Cntrs, CT) ->
    Size = ?POW(LSize-1)-1,
    {_Head, Addr, _} = dets_utils:alloc(Head, Size),
    NewHead = dets_utils:alloc_many(Head, Size+1, NoObjects, Addr),
    {FileName, Fd} = temp_file(Head, LSize),
    true = ets:insert(Cntrs, {LSize, Addr, {FileName, Fd}, NoObjects}),
    NCT = ?VSET(LSize, CT, [Addr | []]),
    allocate_all(NewHead, Count, Cntrs, NCT);
allocate_all(Head, [], Cntrs, CT) ->
    %% Note that space for the segments has been allocated already.
    %% And one file for the segments...
    {FileName, Fd} = temp_file(Head, ?FSCK_SEGMENT),
    Addr = ?SEGADDR(?SEGARRSZ),
    true = ets:insert(Cntrs, {?FSCK_SEGMENT, Addr, {FileName, Fd}, 0}),
    NCT = ?VSET(tuple_size(CT), CT, [Addr | []]),
    {Head, NCT}.

%% Opens the file "<filename of Head>.<N>" for raw binary writing.
%% Returns {TmpName, Fd}; fails with badmatch if the file cannot be
%% opened.
temp_file(Head, N) ->
    TmpName = lists:concat([Head#head.filename, '.', N]),
    {ok, Fd} = dets_utils:open(TmpName, [raw, binary, write]),
    {TmpName, Fd}.

%% Splits binaries built by the non-version-9 clause of make_object/4
%% into {Slot, LogSize, BinTerm} tuples, keeping the order of the input.
bin2term([<<Slot:32, LogSize:8, BinTerm/binary>> | BTs], L) ->
    bin2term(BTs, [{Slot, LogSize, BinTerm} | L]);
bin2term([], L) ->
    lists:reverse(L).

%% Writes everything buffered in CT to the temporary files.
write_all_sizes(?DCT(D, CT), Cntrs) ->
    ?DCT(D, write_sizes(1, tuple_size(CT), CT, Cntrs)).

%% Walks the elements of CT. The last element holds the segment data
%% and is written to the file registered under ?FSCK_SEGMENT.
write_sizes(Sz, Sz, CT, Cntrs) ->
    write_size(Sz, ?FSCK_SEGMENT, CT, Cntrs);
write_sizes(Sz, MaxSz, CT, Cntrs) ->
    NCT = write_size(Sz, Sz, CT, Cntrs),
    write_sizes(Sz+1, MaxSz, NCT, Cntrs).

%% Element Sz of CT is either not_used or [NextAddr | ReversedData].
%% The data is written, in original order, to the file found under key
%% I in Cntrs, and the element is reset to [NextAddr].
write_size(Sz, I, CT, Cntrs) ->
    case ?VGET(Sz, CT) of
        not_used ->
            CT;
        [Addr | L] ->
            {FileName, Fd} = ets:lookup_element(Cntrs, I, 3),
            case file:write(Fd, lists:reverse(L)) of
                ok ->
                    ?VSET(Sz, CT, [Addr | []]);
                Error ->
                    dets_utils:file_error(FileName, Error)
            end
    end.

%% Collects consecutive objects with the same slot number (the first
%% element of each tuple) in Acc. When the slot number changes the
%% collected objects are output with output_slot/3. Returns
%% {LastObject, ObjectsOfLastSlot, DCT}; the last slot is not output.
output_slots(E, [E1 | Es], Acc, Head, DCT)
                       when element(1, E) =:= element(1, E1) ->
    output_slots(E1, Es, [E1 | Acc], Head, DCT);
output_slots(_E, [E | L], Acc, Head, DCT) ->
    NDCT = output_slot(Acc, Head, DCT),
    output_slots(E, L, [E], Head, NDCT);
output_slots(E, [], Acc, _Head, DCT) ->
    {E, Acc, DCT}.

%% Outputs all objects of one slot. A single object needs no key; for
%% several objects the keys are extracted and objects are dropped
%% according to the table type (one object per key for set, identical
%% objects removed for bag). The number of dropped objects is added to
%% the duplicate counter D.
output_slot([E], _Head, ?DCT(D, CT)) ->
    ?DCT(D, output_slot([{foo, E}], 0, foo, CT));
output_slot(Es0, Head, ?DCT(D, CT)) ->
    Kp = Head#head.keypos,
    Fun = fun({_Slot, _LSize, BinTerm} = E) ->
                  Key = element(Kp, binary_to_term(BinTerm)),
                  {Key, E}
          end,
    Es = lists:map(Fun, Es0),
    NEs = case Head#head.type of
              set ->
                  %% L0 is the tail of a sorted list and hence sorted
                  %% already; it need not be sorted a second time.
                  [{Key0,_} = E | L0] = lists:sort(Es),
                  choose_one(L0, Key0, [E]);
              bag ->
                  lists:usort(Es);
              duplicate_bag ->
                  lists:sort(Es)
          end,
    Dups = D + length(Es) - length(NEs),
    ?DCT(Dups, output_slot(NEs, 0, foo, CT)).

%% Keeps the first object of every run of objects with the same key.
%% The result is in reverse order compared to the input.
choose_one([{Key,_} | Es], Key, L) ->
    choose_one(Es, Key, L);
choose_one([{Key,_} = E | Es], _Key, L) ->
    choose_one(Es, Key, [E | L]);
choose_one([], _Key, L) ->
    L.

%% Buffers the objects of one slot in CT. Each object is laid out as
%% <<Next:32, Size:32, ?ACTIVE:32>>, the term binary, and zero padding
%% up to ?POW(LSize-1) bytes, where Next is the address given to the
%% previously buffered object of the slot (0 for the first one). When
%% all objects are done, the address of the last one is buffered for
%% the segment file, preceded by zeros up to the position of the slot.
output_slot([E | Es], Next, _Slot, CT) ->
    {_Key, {Slot, LSize, BinTerm}} = E,
    Size = byte_size(BinTerm),
    Size2 = ?POW(LSize-1),
    Pad = <<0:(Size2-Size-?OHDSZ)/unit:8>>,
    BinObject = [<<Next:32, Size:32, ?ACTIVE:32>>, BinTerm | Pad],
    [Addr | L] = ?VGET(LSize, CT),
    NCT = ?VSET(LSize, CT, [Addr+Size2 | [BinObject | L]]),
    output_slot(Es, Addr, Slot, NCT);
output_slot([], Next, Slot, CT) ->
    I = tuple_size(CT),
    [Addr | L] = ?VGET(I, CT),
    {Pos, _} = slot_position(Slot),
    NoZeros = Pos - Addr,
    BinObject = if
                    NoZeros > 100 ->
                        [dets_utils:make_zeros(NoZeros) | <<Next:32>>];
                    true ->
                        <<0:NoZeros/unit:8,Next:32>>
                end,
    Size = NoZeros+4,
    ?VSET(I, CT, [Addr+Size | [BinObject | L]]).

%% Does not close Fd.
%%
%% Returns an input fun that reads objects from Fd, starting at ?BASE.
fsck_input(Head, Fd, Cntrs, _FileHeader) ->
    %% The file is not compressed, so the object size cannot exceed
    %% the filesize, for all objects.
    MaxSz = case file:position(Fd, eof) of
                {ok, Pos} ->
                    Pos;
                _ ->
                    (1 bsl 32) - 1
            end,
    State0 = fsck_read(?BASE, Fd, []),
    fsck_input1(Head, State0, Fd, MaxSz, Cntrs).

%% The input fun. State is one of
%%   done                - all input has been delivered;
%%   {done, L}           - L holds the last objects;
%%   {cont, L, Bin, Pos} - L holds objects to deliver, Bin is data yet
%%                         to be scanned, Pos is the file position
%%                         following Bin.
fsck_input1(Head, State, Fd, MaxSz, Cntrs) ->
    fun(close) ->
            ok;
       (read) ->
            case State of
                done ->
                    end_of_input;
                {done, L} ->
                    R = count_input(Cntrs, L, []),
                    {R, fsck_input1(Head, done, Fd, MaxSz, Cntrs)};
                {cont, L, Bin, Pos} ->
                    R = count_input(Cntrs, L, []),
                    FR = fsck_objs(Bin, Head#head.keypos, Head, []),
                    NewState = fsck_read(FR, Pos, Fd, MaxSz, Head),
                    {R, fsck_input1(Head, NewState, Fd, MaxSz, Cntrs)}
            end
    end.

%% The ets table Cntrs is used for counting objects per size.
%% Strips the size class off every [LogSz | Object] element; the result
%% is in reverse order compared to the input.
count_input(Cntrs, [[LogSz | B] | Ts], L) ->
    count_object(Cntrs, LogSz),
    count_input(Cntrs, Ts, [B | L]);
count_input(_Cntrs, [], L) ->
    L.

%% Increments the counter of the size class LogSz. If the counter
%% cannot be updated, a new counter with the value 1 is inserted.
count_object(Cntrs, LogSz) ->
    case catch ets:update_counter(Cntrs, LogSz, 1) of
        N when is_integer(N) ->
            ok;
        _Badarg ->
            true = ets:insert(Cntrs, {LogSz, 1})
    end.

%% Positions the file at Pos and reads a chunk. Returns {done, L} if
%% the file cannot be positioned.
fsck_read(Pos, F, L) ->
    case file:position(F, Pos) of
        {ok, _} ->
            read_more_bytes(<<>>, 0, Pos, F, L);
        _Error ->
            {done, L}
    end.

%% Acts upon the result of scanning a chunk:
%%   {more, Bin, Sz, L} - at least Sz more bytes are needed. If Sz
%%                        exceeds MaxSz the size is not trusted;
%%                        ?BUMP bytes are skipped and Bin is scanned
%%                        again.
%%   {new, Skip, L}     - reading is resumed Skip bytes past Pos.
%%                        NOTE(review): presumably returned by the part
%%                        of skip_bytes/5 that is outside this excerpt.
fsck_read({more, Bin, Sz, L}, Pos, F, MaxSz, Head) when Sz > MaxSz ->
    FR = skip_bytes(Bin, ?BUMP, Head#head.keypos, Head, L),
    fsck_read(FR, Pos, F, MaxSz, Head);
fsck_read({more, Bin, Sz, L}, Pos, F, _MaxSz, _Head) ->
    read_more_bytes(Bin, Sz, Pos, F, L);
fsck_read({new, Skip, L}, Pos, F, _MaxSz, _Head) ->
    NewPos = Pos + Skip,
    fsck_read(NewPos, F, L).

%% Reads at least ?CHUNK_SIZE bytes (Min bytes if that is more) and
%% appends them to B. Returns {done, L} at end of file, otherwise
%% {cont, L, NewBin, NewPos}.
read_more_bytes(B, Min, Pos, F, L) ->
    Max = if
              Min < ?CHUNK_SIZE -> ?CHUNK_SIZE;
              true -> Min
          end,
    case dets_utils:read_n(F, Max) of
        eof ->
            {done, L};
        Bin ->
            NewPos = Pos + byte_size(Bin),
            {cont, L, list_to_binary([B, Bin]), NewPos}
    end.

%% Scans Bin for objects. An object starts with a 12 byte header
%% <<Next:32, Size:32, Status:32>>. Active objects whose term can be
%% decoded and has a key are added to L as [LogSz | Object]; for
%% anything else ?BUMP bytes are skipped. Returns {more, Bin, Sz, L}
%% when Bin does not hold a complete object.
%% NOTE(review): binary_to_term/1 is applied to data read from the
%% file being repaired; confirm that such files are trusted.
fsck_objs(Bin = <<_N:32, Sz:32, Status:32, Tail/binary>>, Kp, Head, L) ->
    if
        Status =:= ?ACTIVE ->
            case Tail of
                <<BinTerm:Sz/binary, Tail2/binary>> ->
                    case catch element(Kp, binary_to_term(BinTerm)) of
                        {'EXIT', _} ->
                            skip_bytes(Bin, ?BUMP, Kp, Head, L);
                        Key ->
                            LogSz = sz2pos(Sz+?OHDSZ),
                            Obj = make_object(Head, Key, LogSz, BinTerm),
                            NL = [[LogSz | Obj] | L],
                            %% Skip the padding that follows the term.
                            Skip = ?POW(LogSz-1) - Sz - ?OHDSZ,
                            skip_bytes(Tail2, Skip, Kp, Head, NL)
                    end;
                _ ->
                    {more, Bin, Sz, L}
            end;
        true ->
            skip_bytes(Bin, ?BUMP, Kp, Head, L)
    end;
fsck_objs(Bin, _Kp, _Head, L) ->
    {more, Bin, 0, L}.

%% Version 8 has to know about version 9.
%% Prefixes the term binary with the slot number of the key and, unless
%% the head is of version 9, the size class.
make_object(Head, Key, _LogSz, BT) when Head#head.version =:= 9 ->
    Slot = dets_v9:db_hash(Key, Head),
    <<Slot:32, BT/binary>>;
make_object(Head, Key, LogSz, BT) ->
    Slot = db_hash(Key, Head),
    <<Slot:32, LogSz:8, BT/binary>>.

%% Inlined.
%% NOTE(review): the rest of this definition lies outside the visible
%% excerpt; the visible part is reproduced unchanged.
skip_bytes(Bin, Skip, Kp, Head, L) ->
    case Bin of
        <<_:Skip/binary, Tail/binary>> ->
            fsck_objs(Tail, Kp, Head, L);
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?