Skip to content

Commit 49dfe00

Browse files
committed
future: transformation into interface
Transformed `future` into public `Future` interface, also `future` become inner implementation. `setError` and `setResponse` became private methods of inner `future` type. Added Future benchmarks. Updated methods and functions with new interface.
1 parent 226e0e8 commit 49dfe00

File tree

13 files changed

+205
-76
lines changed

13 files changed

+205
-76
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1515
* Added missing IPROTO feature flags to greeting negotiation
1616
(iproto.IPROTO_FEATURE_IS_SYNC, iproto.IPROTO_FEATURE_INSERT_ARROW) (#466).
1717
* Added Future.cond (sync.Cond) and Future.finished bool. Added Future.finish() marks Future as done (#496).
18+
* New `Future` interface (#470).
1819

1920
### Changed
2021

@@ -30,6 +31,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
3031
* Removed deprecated `Connection` methods, related interfaces and tests are updated (#479).
3132
* Replaced the use of optional types in crud with go-option library (#492).
3233
* Future.done replaced with Future.cond (sync.Cond) + Future.finished bool (#496).
34+
* `Future` transform into `future` that implements interface `Future` and become private,
35+
`SetError` and `SetResponse` become private (#470).
3336

3437
### Fixed
3538

MIGRATION.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ TODO
3232
singleTpl := tpl[0]
3333
```
3434
* Future.done replaced with Future.cond (sync.Cond) + Future.finished bool.
35+
* `Future` is an interface now.
3536

3637
## Migration from v1.x.x to v2.x.x
3738

connection.go

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -214,11 +214,11 @@ type Connection struct {
214214
var _ = Connector(&Connection{}) // Check compatibility with connector interface.
215215

216216
type futureList struct {
217-
first *Future
218-
last **Future
217+
first *future
218+
last **future
219219
}
220220

221-
func (list *futureList) findFuture(reqid uint32, fetch bool) *Future {
221+
func (list *futureList) findFuture(reqid uint32, fetch bool) *future {
222222
root := &list.first
223223
for {
224224
fut := *root
@@ -240,7 +240,7 @@ func (list *futureList) findFuture(reqid uint32, fetch bool) *Future {
240240
}
241241
}
242242

243-
func (list *futureList) addFuture(fut *Future) {
243+
func (list *futureList) addFuture(fut *future) {
244244
*list.last = fut
245245
list.last = &fut.next
246246
}
@@ -250,7 +250,7 @@ func (list *futureList) clear(err error, conn *Connection) {
250250
list.first = nil
251251
list.last = &list.first
252252
for fut != nil {
253-
fut.SetError(err)
253+
fut.setError(err)
254254
conn.markDone(fut)
255255
fut, fut.next = fut.next, nil
256256
}
@@ -446,9 +446,9 @@ func (conn *Connection) Handle() interface{} {
446446
return conn.opts.Handle
447447
}
448448

449-
func (conn *Connection) cancelFuture(fut *Future, err error) {
449+
func (conn *Connection) cancelFuture(fut *future, err error) {
450450
if fut = conn.fetchFuture(fut.requestId); fut != nil {
451-
fut.SetError(err)
451+
fut.setError(err)
452452
conn.markDone(fut)
453453
}
454454
}
@@ -871,7 +871,7 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
871871
return
872872
}
873873

874-
var fut *Future = nil
874+
var fut *future = nil
875875
if code == iproto.IPROTO_EVENT {
876876
if event, err := readWatchEvent(&buf); err == nil {
877877
events <- event
@@ -887,8 +887,8 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
887887
conn.opts.Logger.Report(LogBoxSessionPushUnsupported, conn, header)
888888
} else {
889889
if fut = conn.fetchFuture(header.RequestId); fut != nil {
890-
if err := fut.SetResponse(header, &buf); err != nil {
891-
fut.SetError(fmt.Errorf("failed to set response: %w", err))
890+
if err := fut.setResponse(header, &buf); err != nil {
891+
fut.setError(fmt.Errorf("failed to set response: %w", err))
892892
}
893893
conn.markDone(fut)
894894
}
@@ -925,9 +925,9 @@ func (conn *Connection) eventer(events <-chan connWatchEvent) {
925925
}
926926
}
927927

928-
func (conn *Connection) newFuture(req Request) (fut *Future) {
928+
func (conn *Connection) newFuture(req Request) (fut *future) {
929929
ctx := req.Ctx()
930-
fut = NewFuture(req)
930+
fut = newFuture(req)
931931
if conn.rlimit != nil && conn.opts.RLimitAction == RLimitDrop {
932932
select {
933933
case conn.rlimit <- struct{}{}:
@@ -974,7 +974,7 @@ func (conn *Connection) newFuture(req Request) (fut *Future) {
974974
if ctx != nil {
975975
select {
976976
case <-ctx.Done():
977-
fut.SetError(fmt.Errorf("context is done (request ID %d): %w",
977+
fut.setError(fmt.Errorf("context is done (request ID %d): %w",
978978
fut.requestId, context.Cause(ctx)))
979979
shard.rmut.Unlock()
980980
return
@@ -1007,7 +1007,7 @@ func (conn *Connection) newFuture(req Request) (fut *Future) {
10071007

10081008
// This method removes a future from the internal queue if the context
10091009
// is "done" before the response is come.
1010-
func (conn *Connection) contextWatchdog(fut *Future, ctx context.Context) {
1010+
func (conn *Connection) contextWatchdog(fut *future, ctx context.Context) {
10111011
select {
10121012
case <-fut.WaitChan():
10131013
case <-ctx.Done():
@@ -1032,7 +1032,7 @@ func (conn *Connection) decrementRequestCnt() {
10321032
}
10331033
}
10341034

1035-
func (conn *Connection) send(req Request, streamId uint64) *Future {
1035+
func (conn *Connection) send(req Request, streamId uint64) *future {
10361036
conn.incrementRequestCnt()
10371037

10381038
fut := conn.newFuture(req)
@@ -1056,7 +1056,7 @@ func (conn *Connection) send(req Request, streamId uint64) *Future {
10561056
return fut
10571057
}
10581058

1059-
func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {
1059+
func (conn *Connection) putFuture(fut *future, req Request, streamId uint64) {
10601060
shardn := fut.requestId & (conn.opts.Concurrency - 1)
10611061
shard := &conn.shard[shardn]
10621062
shard.bufmut.Lock()
@@ -1077,7 +1077,7 @@ func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {
10771077
shard.buf.Trunc(blen)
10781078
shard.bufmut.Unlock()
10791079
if f := conn.fetchFuture(reqid); f == fut {
1080-
fut.SetError(err)
1080+
fut.setError(err)
10811081
conn.markDone(fut)
10821082
} else if f != nil {
10831083
/* in theory, it is possible. In practice, you have
@@ -1092,7 +1092,7 @@ func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {
10921092
// packing error is more important than connection
10931093
// error, because it is indication of programmer's
10941094
// mistake.
1095-
fut.SetError(err)
1095+
fut.setError(err)
10961096
}
10971097
}
10981098
return
@@ -1109,28 +1109,28 @@ func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {
11091109
RequestId: reqid,
11101110
Error: ErrorNo,
11111111
}
1112-
_ = fut.SetResponse(header, nil)
1112+
_ = fut.setResponse(header, nil)
11131113
conn.markDone(fut)
11141114
}
11151115
}
11161116
}
11171117

1118-
func (conn *Connection) markDone(fut *Future) {
1118+
func (conn *Connection) markDone(fut *future) {
11191119
if conn.rlimit != nil {
11201120
<-conn.rlimit
11211121
}
11221122
conn.decrementRequestCnt()
11231123
}
11241124

1125-
func (conn *Connection) fetchFuture(reqid uint32) (fut *Future) {
1125+
func (conn *Connection) fetchFuture(reqid uint32) (fut *future) {
11261126
shard := &conn.shard[reqid&(conn.opts.Concurrency-1)]
11271127
shard.rmut.Lock()
11281128
fut = conn.getFutureImp(reqid, true)
11291129
shard.rmut.Unlock()
11301130
return fut
11311131
}
11321132

1133-
func (conn *Connection) getFutureImp(reqid uint32, fetch bool) *Future {
1133+
func (conn *Connection) getFutureImp(reqid uint32, fetch bool) *future {
11341134
shard := &conn.shard[reqid&(conn.opts.Concurrency-1)]
11351135
pos := (reqid / conn.opts.Concurrency) & (requestsMap - 1)
11361136
// futures with even requests id belong to requests list with nil context
@@ -1168,7 +1168,7 @@ func (conn *Connection) timeouts() {
11681168
} else {
11691169
fut.next = nil
11701170
}
1171-
fut.SetError(ClientError{
1171+
fut.setError(ClientError{
11721172
Code: ErrTimeouted,
11731173
Msg: fmt.Sprintf("client timeout for request %d", fut.requestId),
11741174
})
@@ -1232,11 +1232,10 @@ func (conn *Connection) nextRequestId(context bool) (requestId uint32) {
12321232
//
12331233
// An error is returned if the request was formed incorrectly, or failed to
12341234
// create the future.
1235-
func (conn *Connection) Do(req Request) *Future {
1235+
func (conn *Connection) Do(req Request) Future {
12361236
if connectedReq, ok := req.(ConnectedRequest); ok {
12371237
if connectedReq.Conn() != conn {
1238-
fut := NewFuture(req)
1239-
fut.SetError(errUnknownRequest)
1238+
fut := NewFutureWithErr(req, errUnknownRequest)
12401239
return fut
12411240
}
12421241
}

connector.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import "time"
55
// Doer is an interface that performs requests asynchronously.
66
type Doer interface {
77
// Do performs a request asynchronously.
8-
Do(req Request) (fut *Future)
8+
Do(req Request) Future
99
}
1010

1111
type Connector interface {

future.go

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,19 @@ import (
66
"time"
77
)
88

9-
// Future is a handle for asynchronous request.
10-
type Future struct {
9+
// Future is an interface that handle asynchronous request.
10+
type Future interface {
11+
Get() ([]interface{}, error)
12+
GetTyped(result interface{}) error
13+
GetResponse() (Response, error)
14+
WaitChan() <-chan struct{}
15+
}
16+
17+
// future is inner implementation of Future interface.
18+
type future struct {
1119
requestId uint32
1220
req Request
13-
next *Future
21+
next *future
1422
timeout time.Duration
1523
mutex sync.Mutex
1624
resp Response
@@ -20,7 +28,9 @@ type Future struct {
2028
done chan struct{}
2129
}
2230

23-
func (fut *Future) wait() {
31+
var _ = Future(&future{})
32+
33+
func (fut *future) wait() {
2434
fut.mutex.Lock()
2535
defer fut.mutex.Unlock()
2636

@@ -29,7 +39,7 @@ func (fut *Future) wait() {
2939
}
3040
}
3141

32-
func (fut *Future) finish() {
42+
func (fut *future) finish() {
3343
fut.mutex.Lock()
3444
defer fut.mutex.Unlock()
3545

@@ -42,18 +52,34 @@ func (fut *Future) finish() {
4252
fut.cond.Broadcast()
4353
}
4454

45-
// NewFuture creates a new empty Future for a given Request.
46-
func NewFuture(req Request) (fut *Future) {
47-
fut = &Future{}
55+
// NewFutureWithErr returns Future with given error.
56+
func NewFutureWithErr(req Request, err error) Future {
57+
fut := newFuture(req)
58+
fut.setError(err)
59+
return fut
60+
}
61+
62+
// NewFutureWithResponse returns Future with given Response.
63+
func NewFutureWithResponse(req Request, header Header, body io.Reader) (Future, error) {
64+
fut := newFuture(req)
65+
if err := fut.setResponse(header, body); err != nil {
66+
return nil, err
67+
}
68+
return fut, nil
69+
}
70+
71+
// newFuture creates a new empty future for a given Request.
72+
func newFuture(req Request) (fut *future) {
73+
fut = &future{}
4874
fut.done = nil
4975
fut.finished = false
5076
fut.cond.L = &fut.mutex
5177
fut.req = req
5278
return fut
5379
}
5480

55-
// SetResponse sets a response for the future and finishes the future.
56-
func (fut *Future) SetResponse(header Header, body io.Reader) error {
81+
// setResponse sets a response for the future and finishes the future.
82+
func (fut *future) setResponse(header Header, body io.Reader) error {
5783
fut.mutex.Lock()
5884
defer fut.mutex.Unlock()
5985

@@ -78,8 +104,8 @@ func (fut *Future) SetResponse(header Header, body io.Reader) error {
78104
return nil
79105
}
80106

81-
// SetError sets an error for the future and finishes the future.
82-
func (fut *Future) SetError(err error) {
107+
// setError sets an error for the future and finishes the future.
108+
func (fut *future) setError(err error) {
83109
fut.mutex.Lock()
84110
defer fut.mutex.Unlock()
85111

@@ -103,7 +129,7 @@ func (fut *Future) SetError(err error) {
103129
//
104130
// "error" could be Error, if it is error returned by Tarantool,
105131
// or ClientError, if something bad happens in a client process.
106-
func (fut *Future) GetResponse() (Response, error) {
132+
func (fut *future) GetResponse() (Response, error) {
107133
fut.wait()
108134
return fut.resp, fut.err
109135
}
@@ -114,7 +140,7 @@ func (fut *Future) GetResponse() (Response, error) {
114140
//
115141
// "error" could be Error, if it is error returned by Tarantool,
116142
// or ClientError, if something bad happens in a client process.
117-
func (fut *Future) Get() ([]interface{}, error) {
143+
func (fut *future) Get() ([]interface{}, error) {
118144
fut.wait()
119145
if fut.err != nil {
120146
return nil, fut.err
@@ -126,7 +152,7 @@ func (fut *Future) Get() ([]interface{}, error) {
126152
// It is could be much faster than Get() function.
127153
//
128154
// Note: Tarantool usually returns array of tuples (except for Eval and Call17 actions).
129-
func (fut *Future) GetTyped(result interface{}) error {
155+
func (fut *future) GetTyped(result interface{}) error {
130156
fut.wait()
131157
if fut.err != nil {
132158
return fut.err
@@ -141,7 +167,7 @@ func init() {
141167
}
142168

143169
// WaitChan returns channel which becomes closed when response arrived or error occurred.
144-
func (fut *Future) WaitChan() <-chan struct{} {
170+
func (fut *future) WaitChan() <-chan struct{} {
145171
fut.mutex.Lock()
146172
defer fut.mutex.Unlock()
147173

0 commit comments

Comments
 (0)