Skip to content

Commit ff627a5

Browse files
author
wulei
committed
refactor code & fix bug
1 parent 0af81a8 commit ff627a5

File tree

3 files changed

+79
-43
lines changed

3 files changed

+79
-43
lines changed

c_src/elmdb_nif.c

Lines changed: 46 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,11 @@ static void lmdb_close(lmdb_env_t* lmdb) {
4444
if (lmdb->layers) {
4545
DBG("\tfree dbnames & destroy kh");
4646
const char* dbname = NULL;
47-
MDB_dbi dbi;
4847
enif_rwlock_rwlock(lmdb->layers_rwlock);
48+
MDB_dbi dbi;
4949
kh_foreach(lmdb->layers, dbname, dbi, {
5050
DBG("\tfree dbi: %s", dbname);
51-
//mdb_dbi_close(lmdb->env, dbi);
51+
mdb_dbi_close(lmdb->env, dbi);
5252
free((void*)dbname);
5353
});
5454
kh_destroy(layer, lmdb->layers);
@@ -57,6 +57,8 @@ static void lmdb_close(lmdb_env_t* lmdb) {
5757
}
5858
if (lmdb->env) {
5959
DBG("\tclose env!");
60+
// All transactions, databases, and cursors
61+
// must already be closed before calling this function.
6062
mdb_env_close(lmdb->env);
6163
lmdb->env = NULL;
6264
}
@@ -70,33 +72,34 @@ static void lmdb_close(lmdb_env_t* lmdb) {
7072

7173
static void lmdb_dtor(ErlNifEnv* env, void* obj) {
7274
__UNUSED(env);
73-
INFO_LOG("destroy...... lmdb.env -> %p", obj);
75+
INFO_LOG("env-dtor is destroying lmdb.env -> %p", obj);
7476
lmdb_env_t *lmdb = (lmdb_env_t*)obj;
7577
lmdb_close(lmdb);
7678
}
7779

7880
typedef struct lmdb_cursor_s {
7981
MDB_cursor *cur;
8082
MDB_cursor_op op;
81-
MDB_txn *txn;
82-
MDB_dbi dbi;
83+
unsigned int dbflag;
8384
lmdb_env_t* lmdb;
8485
} lmdb_cursor_t;
8586

8687
static void lmdb_cursor_close(lmdb_cursor_t* cursor) {
8788
if (cursor && cursor->cur) {
8889
DBG("close cursor...");
90+
MDB_txn *txn = mdb_cursor_txn(cursor->cur);
91+
DBG("closed db's txn: %p", txn);
8992
mdb_cursor_close(cursor->cur);
90-
mdb_txn_commit(cursor->txn);
91-
mdb_dbi_close(cursor->lmdb->env, cursor->dbi);
93+
mdb_txn_abort(txn);
94+
// If the transaction is aborted the dbi will be closed automatically
9295

9396
enif_release_resource(cursor->lmdb);
9497
cursor->cur = NULL;
9598
}
9699
}
97100
static void lmdb_cursor_dtor(ErlNifEnv* env, void* obj) {
98101
__UNUSED(env);
99-
INFO_LOG("destroy...... lmdb.env -> %p", obj);
102+
INFO_LOG("cursor-dtor is destroying, with lmdb.env: %p", obj);
100103
lmdb_cursor_t *cursor = (lmdb_cursor_t*)obj;
101104
lmdb_cursor_close(cursor);
102105
}
@@ -236,6 +239,7 @@ static ERL_NIF_TERM elmdb_drop(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv
236239
CHECK(mdb_txn_begin(handle->env, NULL, 0, &txn), err2);
237240
MDB_dbi dbi;
238241
CHECK(mdb_dbi_open(txn, dbname, 0, &dbi), err1);
242+
// 1 to delete DB from the environment and close the DB handle
239243
CHECK(mdb_drop(txn, dbi, 1), err1);
240244
CHECK(mdb_txn_commit(txn), err1);
241245

@@ -356,7 +360,6 @@ static ERL_NIF_TERM elmdb_put(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[
356360
MDB_txn *txn = NULL;
357361
CHECK(mdb_txn_begin(handle->env, NULL, 0, &txn), err2);
358362

359-
MDB_dbi dbi;
360363
char dbname[SUBDB_NAME_SZ] = {0};
361364
memcpy(dbname, layBin.data, layBin.size);
362365

@@ -366,10 +369,6 @@ static ERL_NIF_TERM elmdb_put(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[
366369
if ((it=kh_get(layer,handle->layers, dbname)) == kh_end(handle->layers)) {
367370
DBG("the layer(%s) not found, create it.", dbname);
368371
dbiFlags |= mykey.type;
369-
int absent = 0;
370-
khiter_t k = kh_put(layer, handle->layers, dbname, &absent);
371-
if (absent) kh_key(handle->layers, k) = strndup(dbname, sizeof(dbname));
372-
kh_value(handle->layers, k) = dbiFlags;
373372
dbiFlags |= MDB_CREATE;
374373
}
375374
else {
@@ -382,8 +381,16 @@ static ERL_NIF_TERM elmdb_put(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[
382381
goto err1;
383382
}
384383
}
384+
385+
MDB_dbi dbi;
385386
// must not be called from multiple concurrent transactions in the same process
386387
CHECK(mdb_dbi_open(txn, dbname, dbiFlags, &dbi), err1);
388+
if (dbiFlags & MDB_CREATE) {
389+
int absent = 0;
390+
it = kh_put(layer, handle->layers, dbname, &absent);
391+
if (absent) kh_key(handle->layers, it) = strndup(dbname, sizeof(dbname));
392+
kh_value(handle->layers, it) = mykey.type;
393+
}
387394
enif_rwlock_rwunlock(handle->layers_rwlock);
388395

389396
//mdb_dbi_flags(txn, dbi, &dbiFlags);
@@ -393,13 +400,18 @@ static ERL_NIF_TERM elmdb_put(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[
393400
val.mv_data = valTerm.data;
394401

395402
CHECK(mdb_put(txn, dbi, &mykey.key, &val, 0), err2);
396-
CHECK(mdb_txn_commit(txn), err2);
403+
CHECK(mdb_txn_commit(txn), err3);
404+
// After a successful commit dbi will reside in the shared environment,
405+
// and may be used by other transactions.
397406
return argv[0];
398407

399408
err1:
400409
enif_rwlock_rwunlock(handle->layers_rwlock);
401410
err2:
402411
mdb_txn_abort(txn);
412+
// If the transaction is aborted, dbi will be closed automatically.
413+
414+
err3:
403415
return err;
404416
}
405417

@@ -755,6 +767,19 @@ static ERL_NIF_TERM elmdb_to_map(ErlNifEnv* env, int argc, const ERL_NIF_TERM ar
755767
return enif_raise_exception(env, err);
756768
}
757769

770+
static ERL_NIF_TERM elmdb_close_iter(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) {
771+
__UNUSED(argc);
772+
lmdb_cursor_t *cursor = NULL;
773+
if (!enif_get_resource(env, argv[0], lmdbCursorResType, (void**)&cursor)) {
774+
return enif_make_badarg(env);
775+
}
776+
if (cursor->cur == NULL) return enif_raise_exception(env, enif_make_string(env, "closed cursor", ERL_NIF_LATIN1));
777+
778+
lmdb_cursor_close(cursor);
779+
780+
return ATOM_OK;
781+
}
782+
758783
static ERL_NIF_TERM elmdb_next(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) {
759784
__UNUSED(argc);
760785
lmdb_cursor_t *cursor = NULL;
@@ -763,14 +788,11 @@ static ERL_NIF_TERM elmdb_next(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv
763788
}
764789
if (cursor->cur == NULL) return enif_raise_exception(env, enif_make_string(env, "closed cursor", ERL_NIF_LATIN1));
765790

766-
unsigned int dbflag = 0;
767791
int ret = 0;
768-
ERL_NIF_TERM err;
769-
CHECK(mdb_dbi_flags(cursor->txn, cursor->dbi, &dbflag), err1);
770792
MDB_val key, val;
771793
if ((ret = mdb_cursor_get(cursor->cur, &key, &val, cursor->op)) != MDB_NOTFOUND) {
772794
ERL_NIF_TERM keyTerm;
773-
if (dbflag & MDB_INTEGERKEY) {
795+
if (cursor->dbflag & MDB_INTEGERKEY) {
774796
keyTerm = enif_make_int64(env, *((ErlNifSInt64*)key.mv_data));
775797
}
776798
else {
@@ -786,8 +808,6 @@ static ERL_NIF_TERM elmdb_next(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv
786808
}
787809

788810
return enif_make_atom(env, "end_of_table");
789-
err1:
790-
return enif_raise_exception(env, err);
791811
}
792812

793813
static ERL_NIF_TERM elmdb_iter(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) {
@@ -820,7 +840,7 @@ static ERL_NIF_TERM elmdb_iter(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv
820840
MDB_txn *txn = NULL;
821841
CHECK(mdb_txn_begin(handle->env, NULL, MDB_RDONLY, &txn), err2);
822842
MDB_dbi dbi;
823-
CHECK(mdb_dbi_open(txn, dbname, 0, &dbi), err2);
843+
CHECK(mdb_dbi_open(txn, dbname, 0, &dbi), err1);
824844
DBG("open sub-db: %d for %s", dbi, dbname);
825845
unsigned int dbflag = 0;
826846
CHECK(mdb_dbi_flags(txn, dbi, &dbflag), err1);
@@ -831,8 +851,7 @@ static ERL_NIF_TERM elmdb_iter(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv
831851
*cursor = (lmdb_cursor_t) {
832852
.cur = cur,
833853
.op = MDB_FIRST,
834-
.txn = txn,
835-
.dbi = dbi,
854+
.dbflag = dbflag,
836855
.lmdb = handle,
837856
};
838857
enif_keep_resource(handle);
@@ -842,9 +861,8 @@ static ERL_NIF_TERM elmdb_iter(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv
842861
return term;
843862

844863
err1:
845-
mdb_dbi_close(handle->env, dbi);
846-
err2:
847864
mdb_txn_abort(txn);
865+
err2:
848866
return enif_raise_exception(env, err);
849867
}
850868

@@ -934,8 +952,9 @@ static ErlNifFunc nif_funcs[] = {
934952
{"range", 3, elmdb_range, ERL_NIF_DIRTY_JOB_IO_BOUND},
935953
{"range", 4, elmdb_range, ERL_NIF_DIRTY_JOB_IO_BOUND},
936954
{"to_map", 2, elmdb_to_map, ERL_NIF_DIRTY_JOB_IO_BOUND},
937-
{"iter", 2, elmdb_iter, ERL_NIF_DIRTY_JOB_IO_BOUND},
938-
{"next", 1, elmdb_next, ERL_NIF_DIRTY_JOB_IO_BOUND},
955+
{"iter", 2, elmdb_iter, ERL_NIF_DIRTY_JOB_IO_BOUND},
956+
{"close_iter", 1, elmdb_close_iter,ERL_NIF_DIRTY_JOB_IO_BOUND},
957+
{"next", 1, elmdb_next, ERL_NIF_DIRTY_JOB_IO_BOUND},
939958
{"hello", 1, hello, 0}
940959
};
941960

src/elmdb.erl

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,9 @@
1717
-export([range/4]).
1818
-export([ls/1]).
1919
-export([foldl/4]).
20-
-export([iter/2]).
21-
-export([next/1]).
2220

2321
-on_load(on_load/0).
22+
2423
on_load() ->
2524
PrivDir = case code:priv_dir(?MODULE) of
2625
{error, _} ->
@@ -99,13 +98,16 @@ ls(_LmdbRes) ->
9998
erlang:nif_error({not_loaded, ?MODULE}).
10099

101100
foldl(LmdbRes, Layer, Acc0, Fun) ->
101+
io:format(user, "LAYER=~ts, by~p~n", [Layer, self()]),
102102
LFun = fun({Key, Value}, Acc) ->
103103
Fun({{Layer, Key}, Value}, Acc)
104104
end,
105105
Cursor = iter(LmdbRes, Layer),
106106
travel(next(Cursor), Cursor, Acc0, LFun).
107107

108-
travel(end_of_table, _Cursor, Acc, _Fun) ->
108+
travel(end_of_table, Cursor, Acc, _Fun) ->
109+
io:format(user, "MUST close cursor here, but NOT, TODO ~w~n", [end_of_table]),
110+
close_iter(Cursor),
109111
Acc;
110112
travel({Key, Value}, Cursor, Acc, Fun) ->
111113
NewAcc = Fun({Key, Value}, Acc),
@@ -115,6 +117,10 @@ travel({Key, Value}, Cursor, Acc, Fun) ->
115117
iter(_LmdbRes, _Layer) ->
116118
erlang:nif_error({not_loaded, ?MODULE}).
117119

120+
-spec close_iter(reference()) -> ok.
121+
close_iter(_LmdbRes) ->
122+
erlang:nif_error({not_loaded, ?MODULE}).
123+
118124
-spec next(reference()) -> {binary(), binary()}.
119125
next(_Cursor) ->
120126
erlang:nif_error({not_loaded, ?MODULE}).

test/elmdb_foldl_test.erl

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,23 @@ startup() ->
1010
D.
1111

1212
teardown(D) ->
13-
[elmdb:drop(D, L) || L <- elmdb:ls(D)],
13+
?debugFmt("drop all tables of ~p",[D]),
14+
% [elmdb:drop(D, L) || L <- elmdb:ls(D)],
1415
%%ok = elmdb:close(D),
15-
16-
elmdb:dispose(D).
16+
?debugFmt("dispose db: ~p", [D]),
17+
elmdb:dispose(D),
18+
ok.
1719

1820
fold_db(D) ->
19-
[fold_normal_db(D), fold_empty_db(D)].
21+
{inorder,
22+
[?_test(fold_normal_db(D))
23+
, ?_test(fold_empty_db(D))
24+
]
25+
}
26+
.
2027

2128
fold_normal_db(D) ->
22-
?debugMsg("fold......"),
29+
?debugFmt("fold_normal_db........~w", [D]),
2330
Layer = "accum",
2431
I1 = <<1:32/integer>>,
2532
I2 = <<2:32/integer>>,
@@ -31,15 +38,19 @@ fold_normal_db(D) ->
3138
?debugFmt("K = ~p",[K]),
3239
<<I:32/integer>> = V,
3340
I + Acc end,
34-
[?_assertEqual(6, elmdb:foldl(D, Layer, 0, Fn))].
41+
?debugFmt("Fn ~w~n", [Fn]),
42+
%Res = elmdb:foldl(D, Layer, 0, Fn),
43+
?debugMsg("after fold!"),
44+
%?_assertEqual(6, Res).
45+
?assertEqual(6, elmdb:foldl(D, Layer, 0, Fn)).
46+
3547
fold_empty_db(D) ->
36-
Layer = "empty",
37-
elmdb:put(D, {Layer, <<"i1">>}, <<1:32/integer>>),
38-
elmdb:del(D, {Layer, <<"i1">>}),
39-
Fn = fun({_K,V}, Acc) ->
40-
<<I:32/integer>> = V,
41-
I + Acc end,
42-
[?_assertEqual(0, elmdb:foldl(D, Layer, 0, Fn))].
48+
?debugFmt("fold_empty_db........~w, by ~p~n", [D, self()]),
49+
[elmdb:drop(D, L) || L <- elmdb:ls(D)],
50+
?debugFmt("drop all... by ~p~n", [self()]),
51+
%Cnt = lists:sum(elmdb:ls(D)),
52+
%?_assertEqual(0, Cnt).
53+
?assertEqual(0, lists:sum(elmdb:ls(D))).
4354

4455
travel_test_() ->
4556
[{"Try to foldl all elements",

0 commit comments

Comments
 (0)