Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 80 additions & 34 deletions src/leveled_inker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@
-define(WASTE_FP, "waste").
-define(JOURNAL_FILEX, "cdb").
-define(PENDING_FILEX, "pnd").
-define(ARCHIVE_FILEX, "bak").
% Note that archive means "no longer active", it is an indication of
% removable waste not of backup.
-define(TEST_KC, {[], infinity}).
-define(SHUTDOWN_LOOPS, 10).
-define(SHUTDOWN_PAUSE, 10000).
Expand Down Expand Up @@ -682,7 +685,7 @@ handle_call(roll, _From, State = #state{is_snapshot = Snap}) when
}}
end;
handle_call(
{backup, BackupPath}, _from, State
{backup, BackupPath}, _From, State
) when
State#state.is_snapshot == true
->
Expand All @@ -699,20 +702,21 @@ handle_call(
ExtendedBaseFN = BaseFN ++ "." ++ ?JOURNAL_FILEX,
BackupName = filename:join(BackupJFP, BaseFN),
true = leveled_cdb:finished_rolling(PidR),
case
Link =
file:make_link(
FN ++ "." ++ ?JOURNAL_FILEX,
BackupName ++ "." ++ ?JOURNAL_FILEX
)
of
),
case Link of
ok ->
ok;
{error, eexist} ->
ok
end,
{[{SQN, BackupName, PidR, LastKey} | ManAcc], [
ExtendedBaseFN | FTRAcc
]};
{
[{SQN, BackupName, PidR, LastKey} | ManAcc],
[ExtendedBaseFN | FTRAcc]
};
false ->
?STD_LOG(i0021, [FN, SQN, State#state.journal_sqn]),
{ManAcc, FTRAcc}
Expand Down Expand Up @@ -1263,7 +1267,7 @@ close_allmanifest([H | ManifestT]) ->
) ->
leveled_imanifest:manifest().
%% @doc
%% Open all the files in the manifets, and updating the manifest with the PIDs
%% Open all the files in the manifest, and updating the manifest with the PIDs
%% of the opened files
open_all_manifest([], RootPath, CDBOpts) ->
?STD_LOG(i0011, []),
Expand All @@ -1273,51 +1277,93 @@ open_all_manifest([], RootPath, CDBOpts) ->
true
);
open_all_manifest(Man0, RootPath, CDBOpts) ->
OnDiskJournalSet =
sets:from_list(get_all_completejournals(RootPath), [{version, 2}]),
Man1 = leveled_imanifest:to_list(Man0),
[{HeadSQN, HeadFN, _IgnorePid, HeadLK} | ManifestTail] = Man1,
OpenJournalFun =
fun(ManEntry) ->
fun(ManEntry, Acc) ->
{LowSQN, FN, _, LK_RO} = ManEntry,
CFN = FN ++ "." ++ ?JOURNAL_FILEX,
PFN = FN ++ "." ++ ?PENDING_FILEX,
case filelib:is_file(CFN) of
true ->
{ok, Pid} =
leveled_cdb:cdb_reopen_reader(CFN, LK_RO, CDBOpts),
{LowSQN, FN, Pid, LK_RO};
{
{LowSQN, FN, Pid, LK_RO},
sets:del_element(CFN, Acc)
};
false ->
W = leveled_cdb:cdb_open_writer(PFN, CDBOpts),
{ok, Pid} = W,
{ok, Pid} = leveled_cdb:cdb_open_writer(PFN, CDBOpts),
ok = leveled_cdb:cdb_roll(Pid),
LK_WR = leveled_cdb:cdb_lastkey(Pid),
{LowSQN, FN, Pid, LK_WR}
{
{LowSQN, FN, Pid, LK_WR},
Acc
}
end
end,
OpenedTailAsList = lists:map(OpenJournalFun, ManifestTail),
{
OpenedTailAsList,
FilteredOnDiskJournalSet
} =
lists:mapfoldl(OpenJournalFun, OnDiskJournalSet, ManifestTail),
OpenedTail = leveled_imanifest:from_list(OpenedTailAsList),
CompleteHeadFN = HeadFN ++ "." ++ ?JOURNAL_FILEX,
PendingHeadFN = HeadFN ++ "." ++ ?PENDING_FILEX,
case filelib:is_file(CompleteHeadFN) of
true ->
?STD_LOG(i0012, [HeadFN]),
{ok, HeadR} = leveled_cdb:cdb_open_reader(CompleteHeadFN),
LastKey = {LastSQN, _, _} = leveled_cdb:cdb_lastkey(HeadR),
ManToHead =
StartedManifest =
case filelib:is_file(CompleteHeadFN) of
true ->
?STD_LOG(i0012, [HeadFN]),
{ok, HeadR} = leveled_cdb:cdb_open_reader(CompleteHeadFN),
LastKey = {LastSQN, _, _} = leveled_cdb:cdb_lastkey(HeadR),
ManToHead =
leveled_imanifest:add_entry(
OpenedTail,
{HeadSQN, HeadFN, HeadR, LastKey},
true
),
NewManEntry =
start_new_activejournal(LastSQN + 1, RootPath, CDBOpts),
leveled_imanifest:add_entry(ManToHead, NewManEntry, true);
false ->
{ok, HeadW} =
leveled_cdb:cdb_open_writer(PendingHeadFN, CDBOpts),
leveled_imanifest:add_entry(
OpenedTail,
{HeadSQN, HeadFN, HeadR, LastKey},
true
),
NewManEntry =
start_new_activejournal(LastSQN + 1, RootPath, CDBOpts),
leveled_imanifest:add_entry(ManToHead, NewManEntry, true);
false ->
{ok, HeadW} =
leveled_cdb:cdb_open_writer(PendingHeadFN, CDBOpts),
leveled_imanifest:add_entry(
OpenedTail, {HeadSQN, HeadFN, HeadW, HeadLK}, true
)
end.
OpenedTail, {HeadSQN, HeadFN, HeadW, HeadLK}, true
)
end,
lists:foreach(
fun(FN) ->
NewName =
filename:flatten([filename:rootname(FN), "." ++ ?ARCHIVE_FILEX]),
?STD_LOG(i0029, [FN]),
file:rename(FN, NewName)
end,
sets:to_list(
sets:del_element(CompleteHeadFN, FilteredOnDiskJournalSet)
)
),
StartedManifest.

-spec get_all_completejournals(string()) -> list(file:filename()).
get_all_completejournals(RootPath) ->
JFiles = list_dir(filepath(RootPath, journal_dir)),
CFiles = list_dir(filepath(RootPath, journal_compact_dir)),
lists:filter(
fun(FN) ->
filename:extension(FN) == ("." ++ ?JOURNAL_FILEX)
end,
JFiles ++ CFiles
).

-spec list_dir(string()) -> list(file:filename()).
list_dir(Path) ->
{ok, Files} = file:list_dir(Path),
lists:map(
fun(FN) -> filename:join(Path, FN) end, Files
).

start_new_activejournal(SQN, RootPath, CDBOpts) ->
Filename = filepath(RootPath, SQN, new_journal),
Expand Down
3 changes: 3 additions & 0 deletions src/leveled_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,9 @@
{debug, <<"Shutdown complete for cloned Inker for reason ~w">>},
i0028 =>
{debug, <<"Shutdown complete for Inker for reason ~w">>},
i0029 =>
{warning,
<<"Journal with FN=~s renamed to backup as not active in manifest">>},
ic001 =>
{info, <<"Closed for reason ~w so maybe leaving garbage">>},
ic002 =>
Expand Down
45 changes: 43 additions & 2 deletions test/end_to_end/recovery_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -859,15 +859,56 @@ aae_missingjournal(_Config) ->
{async, AllHeadF2} =
leveled_bookie:book_returnfolder(
Bookie2,
{foldheads_allkeys, ?RIAK_TAG, FoldHeadsFun, true, true, false,
false, false}
{
foldheads_allkeys,
?RIAK_TAG,
FoldHeadsFun,
true,
true,
false,
false,
false
}
),
HeadL2 = length(AllHeadF2()),
io:format("Fold head returned ~w objects~n", [HeadL2]),
true = HeadL2 < HeadL1,
true = HeadL2 > 0,

ok = leveled_bookie:book_close(Bookie2),

% Add extra journal file - check it gets switched to .bak
FNXJ = RootPath ++ "/journal/journal_files/extra_file",
FNXC = RootPath ++ "/journal/journal_files/post_compact/extra_file",
ok = file:write_file(FNXJ ++ ".cdb", <<"NotaCDB">>),
ok = file:write_file(FNXC ++ ".cdb", <<"NotaCDB">>),
{ok, _} = file:read_file_info(FNXJ ++ ".cdb"),
{ok, _} = file:read_file_info(FNXC ++ ".cdb"),

{ok, Bookie3} = leveled_bookie:book_start(StartOpts),
{async, AllHeadF3} =
leveled_bookie:book_returnfolder(
Bookie3,
{
foldheads_allkeys,
?RIAK_TAG,
FoldHeadsFun,
true,
true,
false,
false,
false
}
),
HeadL3 = length(AllHeadF3()),
true = HeadL3 == HeadL2,

ok = leveled_bookie:book_close(Bookie3),

{error, enoent} = file:read_file_info(FNXJ ++ ".cdb"),
{error, enoent} = file:read_file_info(FNXJ ++ ".cdb"),
{ok, <<"NotaCDB">>} = file:read_file(FNXJ ++ ".bak"),
{ok, <<"NotaCDB">>} = file:read_file(FNXC ++ ".bak"),
testutil:reset_filestructure().

simple_cachescoring(_Config) ->
Expand Down