Skip to content

Commit f051870

Browse files
committed
refactored
1 parent 69999e2 commit f051870

File tree

7 files changed

+186
-110
lines changed

7 files changed

+186
-110
lines changed

.vscode/launch.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
"type": "go",
1010
"request": "launch",
1111
"mode": "auto",
12-
"program": "${workspaceFolder}/cmd/yahoo-live/main.go"
12+
"program": "${workspaceFolder}/examples/simple_message_handler.go"
1313
}
1414
]
1515
}

README.md

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# Yahoo Live Go Websocket
2+
3+
This package allow you to connect and retrieve websocket updates from Yahoo Finance tickers.
4+
5+
NOTE: Delay may wary and but can be a few seconds after real execution time. Should therefor not be used in applications that require fast and latest messages from the exchange.
6+
7+
## Installation
8+
9+
```bash
10+
go get github.com/open-wallstreet/yahoo-live
11+
```
12+
13+
### Example usage
14+
15+
See `examples` folder for more info
16+
17+
```go
18+
import (
19+
"fmt"
20+
"time"
21+
22+
"github.com/open-wallstreet/yahoo-live/pkg/yahoo"
23+
"github.com/open-wallstreet/yahoo-live/proto"
24+
"go.uber.org/zap"
25+
)
26+
27+
func main() {
28+
logger, _ = zap.NewDevelopment()
29+
con, err := yahoo.NewWebsocket(logger.Sugar(), []string{"KIND-SDB.ST"})
30+
if err != nil {
31+
panic(err)
32+
}
33+
con.AddMessageHandler(on_msg)
34+
con.Wait()
35+
}
36+
func on_msg(message *proto.Yaticker) {
37+
println(fmt.Sprintf("%s: %s", time.Unix(message.Time/1000, 0).String(), message.String()))
38+
}
39+
```
Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package main
22

33
import (
4+
"fmt"
45
"os"
6+
"time"
57

6-
"github.com/open-wallstreet/yahoo-live/internal/yahoo"
7-
"github.com/open-wallstreet/yahoo-live/proto"
8+
"github.com/open-wallstreet/yahoo-live/pkg/yahoo"
9+
"github.com/open-wallstreet/yahoo-live/pkg/yahoo/proto"
810
"go.uber.org/zap"
911
)
1012

@@ -21,10 +23,16 @@ func createLogger() *zap.SugaredLogger {
2123
}
2224

2325
func on_msg(message *proto.Yaticker) {
24-
println(message.String())
26+
println(fmt.Sprintf("%s: %s", time.Unix(message.Time/1000, 0).String(), message.String()))
2527
}
2628

2729
func main() {
2830
logger := createLogger()
29-
yahoo.NewYahooWebsocket(logger, []string{"AMZN", "AAPL", "TSLA", "A", "AA"}, on_msg)
31+
con, err := yahoo.NewWebsocket(logger, []string{"KIND-SDB.ST"})
32+
if err != nil {
33+
panic(err)
34+
}
35+
con.AddMessageHandler(on_msg)
36+
37+
con.Wait()
3038
}

internal/yahoo/websocket.go

Lines changed: 0 additions & 102 deletions
This file was deleted.
Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/yahoo/websocket.go

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
package yahoo
2+
3+
import (
4+
"encoding/base64"
5+
"encoding/json"
6+
"log"
7+
"os"
8+
"os/signal"
9+
10+
"github.com/gorilla/websocket"
11+
yaproto "github.com/open-wallstreet/yahoo-live/pkg/yahoo/proto"
12+
"go.uber.org/zap"
13+
"google.golang.org/protobuf/proto"
14+
)
15+
16+
const url = "wss://streamer.finance.yahoo.com/"
17+
18+
type Subscription struct {
19+
Subscribe []string `json:"subscribe"`
20+
}
21+
22+
type SocketConnection struct {
23+
logger *zap.SugaredLogger
24+
interruptSignal chan os.Signal
25+
done chan struct{}
26+
on_message_handlers []func(message *yaproto.Yaticker)
27+
connection *websocket.Conn
28+
}
29+
30+
// Add new message handler function that will be called for all new messages websocket receives
31+
func (s *SocketConnection) AddMessageHandler(f func(message *yaproto.Yaticker)) {
32+
s.on_message_handlers = append(s.on_message_handlers, f)
33+
}
34+
35+
// Wait runs infinite for loop until a interuptSignal or close singal
36+
// has been received by the message handlers. Will excited after running Close()
37+
func (s *SocketConnection) Wait() {
38+
for {
39+
select {
40+
case <-s.done:
41+
return
42+
case <-s.interruptSignal:
43+
log.Println("interrupt")
44+
// Cleanly close the connection by sending a close message and then
45+
// waiting (with timeout) for the server to close the connection.
46+
err := s.connection.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
47+
if err != nil {
48+
s.logger.Errorf("write close:", err)
49+
return
50+
}
51+
<-s.done
52+
return
53+
}
54+
55+
}
56+
}
57+
58+
// Closes websocket connection and send done signal for gracefull shutdown
59+
func (s *SocketConnection) Close() {
60+
s.connection.Close()
61+
close(s.done)
62+
}
63+
64+
func (s *SocketConnection) handleMessages() {
65+
defer s.Close()
66+
for {
67+
_, message, err := s.connection.ReadMessage()
68+
if err != nil {
69+
switch err.(type) {
70+
case *websocket.CloseError:
71+
s.logger.Info("received close message")
72+
return
73+
default:
74+
s.logger.Errorf("read: %v", err)
75+
return
76+
}
77+
78+
}
79+
msg, err := Base64Decode(message)
80+
if err != nil {
81+
s.logger.Errorf("failed to decode: %v", err)
82+
return
83+
}
84+
ticker := &yaproto.Yaticker{}
85+
if err := proto.Unmarshal(msg, ticker); err != nil {
86+
s.logger.Errorf("failed to parse %v", msg)
87+
}
88+
for _, handler := range s.on_message_handlers {
89+
handler(ticker)
90+
}
91+
}
92+
}
93+
94+
// Creates a new yahoo.SocketConnection and subscribe to all tickers listed in array
95+
func NewWebsocket(logger *zap.SugaredLogger, tickers []string) (*SocketConnection, error) {
96+
interrupt := make(chan os.Signal, 1)
97+
signal.Notify(interrupt, os.Interrupt)
98+
connection, _, err := websocket.DefaultDialer.Dial(url, nil)
99+
if err != nil {
100+
logger.Panicf("failed to connect to websocket %v", err)
101+
}
102+
socketConnection := &SocketConnection{
103+
logger: logger,
104+
interruptSignal: interrupt,
105+
on_message_handlers: []func(message *yaproto.Yaticker){},
106+
connection: connection,
107+
done: make(chan struct{}),
108+
}
109+
110+
go socketConnection.handleMessages()
111+
112+
msg := Subscription{
113+
Subscribe: tickers,
114+
}
115+
116+
b, err := json.Marshal(msg)
117+
if err != nil {
118+
logger.Errorf("failed to marshal subscription message:", err)
119+
return nil, err
120+
}
121+
err = connection.WriteMessage(websocket.TextMessage, b)
122+
if err != nil {
123+
logger.Infof("failed to send subscription message:", err)
124+
return nil, err
125+
}
126+
return socketConnection, nil
127+
}
128+
129+
func Base64Decode(str []byte) ([]byte, error) {
130+
return base64.StdEncoding.DecodeString(string(str))
131+
}

proto/yaticker.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
syntax = "proto3";
22

3-
option go_package = "proto/";
3+
option go_package = "pkg/yahoo/proto/";
44

55
message yaticker {
66

0 commit comments

Comments
 (0)