Skip to content

Commit bde272f

Browse files
code cleanup
1 parent e853e7b commit bde272f

File tree

1 file changed

+187
-102
lines changed

1 file changed

+187
-102
lines changed

src/barnacle.gleam

Lines changed: 187 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -11,24 +11,18 @@ import gleam/result
1111
import gleam/set
1212
import gleam/string
1313

14+
// ----- Setup functions ----- //
15+
1416
pub opaque type Barnacle(error) {
1517
Barnacle(
16-
discover_nodes: fn() -> Result(List(atom.Atom), error),
18+
strategy: Strategy(error),
1719
poll_interval: Int,
1820
listener: Option(Subject(BarnacleResponse(error))),
1921
)
2022
}
2123

2224
fn default_barnacle() -> Barnacle(a) {
23-
Barnacle(discover_nodes: fn() { Ok([]) }, poll_interval: 5000, listener: None)
24-
}
25-
26-
pub fn local_epmd() -> Barnacle(Nil) {
27-
Barnacle(..default_barnacle(), discover_nodes: local_epmd.discover_nodes)
28-
}
29-
30-
pub fn epmd(nodes: List(atom.Atom)) -> Barnacle(Nil) {
31-
Barnacle(..default_barnacle(), discover_nodes: fn() { Ok(nodes) })
25+
Barnacle(strategy: default_strategy(), poll_interval: 5000, listener: None)
3226
}
3327

3428
pub fn with_poll_interval(
@@ -45,55 +39,120 @@ pub fn with_listener(
4539
Barnacle(..barnacle, listener: Some(listener))
4640
}
4741

48-
fn spec(
49-
barnacle: Barnacle(error),
50-
parent: Option(Subject(Subject(Message(error)))),
51-
) {
52-
actor.Spec(init_timeout: 10_000, loop: handle_message, init: fn() {
53-
case refresh_nodes(barnacle) {
54-
Ok(_) -> {
55-
let self = process.new_subject()
56-
let selector =
57-
process.new_selector()
58-
|> process.selecting(self, function.identity)
42+
// ----- Strategies ----- //
5943

60-
option.map(parent, process.send(_, self))
44+
pub opaque type Strategy(error) {
45+
Strategy(
46+
discover_nodes: fn() -> Result(List(atom.Atom), error),
47+
connect_nodes: fn(List(atom.Atom)) ->
48+
Result(List(atom.Atom), List(#(atom.Atom, node.ConnectError))),
49+
disconnect_nodes: fn(List(atom.Atom)) ->
50+
Result(List(atom.Atom), List(#(atom.Atom, NodeDisconnectError))),
51+
list_nodes: fn() -> Result(List(atom.Atom), error),
52+
)
53+
}
6154

62-
let timer =
63-
process.send_after(self, barnacle.poll_interval, Refresh(None))
55+
pub type NodeDisconnectError {
56+
FailedToDisconnect
57+
LocalNodeIsNotAlive
58+
}
6459

65-
actor.Ready(
66-
selector: selector,
67-
state: State(self:, barnacle:, timer: Some(timer)),
68-
)
69-
}
70-
Error(err) ->
71-
actor.Failed(case err {
72-
ConnectError(_) -> "Failed to connect to nodes"
73-
DisconnectError(_) -> "Failed to disconnect from nodes"
74-
StrategyError(err) ->
75-
"Failed to discover nodes: " <> string.inspect(err)
76-
})
60+
fn default_strategy() -> Strategy(error) {
61+
Strategy(
62+
discover_nodes: fn() { Ok([]) },
63+
connect_nodes:,
64+
disconnect_nodes:,
65+
list_nodes:,
66+
)
67+
}
68+
69+
pub fn new_strategy(
70+
discover_nodes: fn() -> Result(List(atom.Atom), error),
71+
) -> Strategy(error) {
72+
Strategy(..default_strategy(), discover_nodes:)
73+
}
74+
75+
pub fn with_connect_nodes_function(
76+
strategy: Strategy(error),
77+
connect_nodes: fn(List(atom.Atom)) ->
78+
Result(List(atom.Atom), List(#(atom.Atom, node.ConnectError))),
79+
) -> Strategy(error) {
80+
Strategy(..strategy, connect_nodes:)
81+
}
82+
83+
pub fn with_disconnect_nodes_function(
84+
strategy: Strategy(error),
85+
disconnect_nodes: fn(List(atom.Atom)) ->
86+
Result(List(atom.Atom), List(#(atom.Atom, NodeDisconnectError))),
87+
) -> Strategy(error) {
88+
Strategy(..strategy, disconnect_nodes:)
89+
}
90+
91+
pub fn with_list_nodes_function(
92+
strategy: Strategy(error),
93+
list_nodes: fn() -> Result(List(atom.Atom), error),
94+
) -> Strategy(error) {
95+
Strategy(..strategy, list_nodes:)
96+
}
97+
98+
// ----- Default Strategy ----- //
99+
100+
@external(erlang, "barnacle_ffi", "disconnect_from_node")
101+
fn disconnect_node(node: atom.Atom) -> Result(node.Node, NodeDisconnectError)
102+
103+
fn list_nodes() -> Result(List(atom.Atom), error) {
104+
[node.self(), ..node.visible()]
105+
|> list.map(node.to_atom)
106+
|> Ok
107+
}
108+
109+
fn connect_nodes(nodes: List(atom.Atom)) {
110+
nodes
111+
|> list.map(fn(node) {
112+
case node.connect(node) {
113+
Ok(_) -> Ok(node)
114+
Error(err) -> Error(#(node, err))
77115
}
78116
})
117+
|> result_apply
79118
}
80119

81-
pub fn start(
82-
barnacle: Barnacle(error),
83-
parent: Option(Subject(Subject(Message(error)))),
84-
) {
85-
barnacle
86-
|> spec(parent)
87-
|> actor.start_spec
120+
fn disconnect_nodes(nodes: List(atom.Atom)) {
121+
nodes
122+
|> list.map(fn(node) {
123+
case disconnect_node(node) {
124+
Ok(_) -> Ok(node)
125+
Error(err) -> Error(#(node, err))
126+
}
127+
})
128+
|> result_apply
88129
}
89130

90-
pub fn child_spec(
91-
barnacle: Barnacle(error),
92-
parent: Option(Subject(Subject(Message(error)))),
93-
) {
94-
supervisor.worker(fn(_) { start(barnacle, parent) })
131+
// ----- Built-in Strategies ----- //
132+
133+
pub fn custom(strategy: Strategy(error)) -> Barnacle(error) {
134+
Barnacle(..default_barnacle(), strategy:)
135+
}
136+
137+
pub fn local_epmd() -> Barnacle(Nil) {
138+
Barnacle(
139+
..default_barnacle(),
140+
strategy: Strategy(
141+
..default_strategy(),
142+
discover_nodes: local_epmd.discover_nodes,
143+
),
144+
)
145+
}
146+
147+
pub fn epmd(nodes: List(atom.Atom)) -> Barnacle(Nil) {
148+
Barnacle(
149+
..default_barnacle(),
150+
strategy: Strategy(..default_strategy(), discover_nodes: fn() { Ok(nodes) }),
151+
)
95152
}
96153

154+
// ----- Actor ----- //
155+
97156
pub type RefreshResult(error) =
98157
Result(List(atom.Atom), RefreshError(error))
99158

@@ -117,13 +176,36 @@ type State(error) {
117176
)
118177
}
119178

120-
pub type NodeDisconnectError {
121-
FailedToDisconnect
122-
LocalNodeIsNotAlive
179+
pub fn start(
180+
barnacle: Barnacle(error),
181+
parent: Option(Subject(Subject(Message(error)))),
182+
) {
183+
barnacle
184+
|> spec(parent)
185+
|> actor.start_spec
123186
}
124187

125-
@external(erlang, "barnacle_ffi", "disconnect_from_node")
126-
fn disconnect_node(node: atom.Atom) -> Result(node.Node, NodeDisconnectError)
188+
pub fn child_spec(
189+
barnacle: Barnacle(error),
190+
parent: Option(Subject(Subject(Message(error)))),
191+
) {
192+
supervisor.worker(fn(_) { start(barnacle, parent) })
193+
}
194+
195+
pub fn refresh(
196+
subject: Subject(Message(error)),
197+
return: Option(Subject(RefreshResult(error))),
198+
) {
199+
process.send(subject, Refresh(return))
200+
}
201+
202+
pub fn stop(subject: Subject(Message(error)), return: Option(Subject(Nil))) {
203+
process.send(subject, Stop(return))
204+
}
205+
206+
pub fn shutdown(subject: Subject(Message(error)), return: Option(Subject(Nil))) {
207+
process.send(subject, Shutdown(return))
208+
}
127209

128210
pub type RefreshError(error) {
129211
StrategyError(error)
@@ -160,11 +242,42 @@ fn handle_message(message: Message(error), state: State(error)) {
160242
}
161243
}
162244

245+
fn spec(
246+
barnacle: Barnacle(error),
247+
parent: Option(Subject(Subject(Message(error)))),
248+
) {
249+
actor.Spec(init_timeout: 10_000, loop: handle_message, init: fn() {
250+
case refresh_nodes(barnacle) {
251+
Ok(_) -> {
252+
let self = process.new_subject()
253+
let selector =
254+
process.new_selector()
255+
|> process.selecting(self, function.identity)
256+
257+
option.map(parent, process.send(_, self))
258+
259+
let timer =
260+
process.send_after(self, barnacle.poll_interval, Refresh(None))
261+
262+
actor.Ready(
263+
selector: selector,
264+
state: State(self:, barnacle:, timer: Some(timer)),
265+
)
266+
}
267+
Error(err) ->
268+
actor.Failed(case err {
269+
ConnectError(_) -> "Failed to connect to nodes"
270+
DisconnectError(_) -> "Failed to disconnect from nodes"
271+
StrategyError(err) ->
272+
"Failed to discover nodes: " <> string.inspect(err)
273+
})
274+
}
275+
})
276+
}
277+
163278
fn send_response(maybe_client: Option(Subject(a)), response: a) -> Nil {
164-
case maybe_client {
165-
Some(client) -> process.send(client, response)
166-
None -> Nil
167-
}
279+
option.map(maybe_client, process.send(_, response))
280+
Nil
168281
}
169282

170283
fn cancel_timer(timer: Option(process.Timer)) -> Nil {
@@ -174,69 +287,41 @@ fn cancel_timer(timer: Option(process.Timer)) -> Nil {
174287

175288
fn refresh_nodes(barnacle: Barnacle(error)) -> RefreshResult(error) {
176289
use available_nodes <- result.try(
177-
barnacle.discover_nodes()
290+
barnacle.strategy.discover_nodes()
178291
|> result.map(fn(nodes) {
179292
set.from_list(nodes)
180293
|> set.delete(node.self() |> node.to_atom)
181294
})
182295
|> result.map_error(StrategyError),
183296
)
184297

298+
use current_nodes_list <- result.try(
299+
barnacle.strategy.list_nodes()
300+
|> result.map_error(StrategyError),
301+
)
302+
185303
let current_nodes =
186-
node.visible()
187-
|> list.map(node.to_atom)
304+
current_nodes_list
188305
|> set.from_list
189306

190307
let nodes_to_add = set.difference(available_nodes, current_nodes)
191308
let nodes_to_remove = set.difference(current_nodes, available_nodes)
192309

193-
let connect_results = connect_nodes(nodes_to_add |> set.to_list)
310+
let connect_results =
311+
barnacle.strategy.connect_nodes(nodes_to_add |> set.to_list)
194312
use _ <- result.try(connect_results |> result.map_error(ConnectError))
195313

196-
let disconnect_results = disconnect_nodes(nodes_to_remove |> set.to_list)
314+
let disconnect_results =
315+
barnacle.strategy.disconnect_nodes(nodes_to_remove |> set.to_list)
197316
use _ <- result.try(disconnect_results |> result.map_error(DisconnectError))
198317

199-
Ok(node.visible() |> list.map(node.to_atom))
200-
}
201-
202-
fn connect_nodes(nodes: List(atom.Atom)) {
203-
nodes
204-
|> list.map(fn(node) {
205-
case node.connect(node) {
206-
Ok(_) -> Ok(Nil)
207-
Error(err) -> Error(#(node, err))
208-
}
209-
})
210-
|> result_apply
211-
}
212-
213-
fn disconnect_nodes(nodes: List(atom.Atom)) {
214-
nodes
215-
|> list.map(fn(node) {
216-
case disconnect_node(node) {
217-
Ok(_) -> Ok(Nil)
218-
Error(err) -> Error(#(node, err))
219-
}
220-
})
221-
|> result_apply
318+
barnacle.strategy.list_nodes()
319+
|> result.map_error(StrategyError)
222320
}
223321

224322
fn result_apply(results: List(Result(a, b))) -> Result(List(a), List(b)) {
225-
results
226-
|> list.fold(Ok([]), fn(acc, result) {
227-
case result {
228-
Ok(val) -> {
229-
case acc {
230-
Ok(vals) -> Ok([val, ..vals])
231-
Error(_) -> acc
232-
}
233-
}
234-
Error(err) -> {
235-
case acc {
236-
Ok(_) -> Error([err])
237-
Error(errs) -> Error([err, ..errs])
238-
}
239-
}
240-
}
241-
})
323+
case result.partition(results) {
324+
#(vals, []) -> Ok(vals)
325+
#(_, errs) -> Error(errs)
326+
}
242327
}

0 commit comments

Comments
 (0)