-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathServerRouter.go
More file actions
216 lines (169 loc) · 5.19 KB
/
ServerRouter.go
File metadata and controls
216 lines (169 loc) · 5.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
package main
import (
	"bufio"
	"encoding/json"
	"fmt"
	"math/rand"
	"net"
	"os"
	"strconv"
	"sync"
	"time"

	p2p "./p2pmessage"
)
// Default settings used by main when the program is not started with exactly
// three command-line arguments.
const (
// HOST is the default host part of the listen address; empty leaves the host
// part of "host:port" blank.
HOST = ""
// PORT is the listen port used when nothing is entered at the port prompt.
PORT = "5556"
// TYPE is the network name passed to net.Listen.
TYPE = "tcp"
// SERVER_ROUTER is the other server router's address used when nothing is
// entered at the prompt. While the configured address is empty,
// handleConnection skips every incoming connection.
SERVER_ROUTER = ""
)
// checkErr is a fatal-error guard: it does nothing for a nil err; otherwise it
// prints message followed by the error text to stdout and terminates the
// process with exit status 1.
func checkErr(err error, message string) {
	if err == nil {
		return
	}
	fmt.Println(message, err.Error())
	os.Exit(1)
}
// Metric is a single measurement reported to MetricThread.
type Metric struct {
	// Type names the metric; samples sharing a Type are aggregated together.
	Type string
	// Value is the measured quantity.
	Value int64
	// OP selects the aggregation used when reporting: "TOTAL" or "AVG".
	// Types whose first sample has any other OP are not reported.
	OP string
	// Units is the label printed after the aggregated value.
	Units string
}

// MetricThread collects samples from metricChannel and, every time true is
// received on calcMetricsSignal, prints a summary of everything collected so
// far to stdout. Samples are never discarded, so each report covers the whole
// lifetime of the call.
//
// The function blocks while both channels are idle. A closed channel is no
// longer polled, and the function returns once both channels are closed.
func MetricThread(metricChannel <-chan Metric, calcMetricsSignal <-chan bool) {
	metricsByType := map[string][]Metric{}

	for metricChannel != nil || calcMetricsSignal != nil {
		// No default case: the select must block until a channel is ready
		// rather than spin the CPU.
		select {
		case sample, ok := <-metricChannel:
			if !ok {
				// A nil channel is never selected again; without this a closed
				// channel would deliver zero-value samples forever.
				metricChannel = nil
				continue
			}
			// append on the nil slice of a missing key allocates it.
			metricsByType[sample.Type] = append(metricsByType[sample.Type], sample)
		case calc, ok := <-calcMetricsSignal:
			if !ok {
				calcMetricsSignal = nil
				continue
			}
			if calc {
				printMetrics(metricsByType)
			}
		}
	}
}

// printMetrics writes the metrics banner followed by one line per metric type
// whose operation is "TOTAL" or "AVG".
func printMetrics(metricsByType map[string][]Metric) {
	fmt.Println("\n===== METRICS =====")
	for metricType, samples := range metricsByType {
		// Keys are only ever created by an append, so samples is non-empty and
		// the AVG division below cannot divide by zero.
		operation, units := samples[0].OP, samples[0].Units

		var total int64
		for _, s := range samples {
			total += s.Value
		}

		switch operation {
		case "TOTAL":
			// Report the plain sum.
		case "AVG":
			total /= int64(len(samples)) // integer average, truncated
		default:
			continue
		}
		fmt.Println(operation + " - " + metricType + " = " + strconv.FormatInt(total, 10) + " " + units)
	}
}
// metricsChannel and metricsSignalChannel are buffered (capacity 1) channels
// whose element types match MetricThread's two parameters.
// NOTE(review): the only visible code that would use them is commented out in
// main - confirm whether they are still needed.
var metricsChannel = make(chan Metric, 1)
var metricsSignalChannel = make(chan bool, 1)
// nodeRegistry holds the Src_IP of every node that has sent an IDENTIFY
// message; handleConnection appends to it and picks a random entry from it
// when answering PICK_NODE.
var nodeRegistry []string
// host and serverPort form the listen address; main sets them from the
// command-line arguments or from the defaults/prompts.
var host string = ""
var serverPort string = ""
// sRouter_addr is the address of the other server router, to which
// handleConnection sends PICK_NODE requests. Set by main.
var sRouter_addr string
// main configures and runs the server router.
//
// Command line arguments: [HOST, PORT, OTHER_SERVER_ROUTER_ADDRESS]. When the
// program is not started with exactly these three arguments, HOST is used as
// the host and the port and the other router's address are read from stdin,
// falling back to PORT and SERVER_ROUTER for empty answers.
//
// main then listens on host:port and serves every accepted connection on its
// own goroutine. It never returns; any listen or accept error terminates the
// process through checkErr.
func main() {
	if len(os.Args) == 4 {
		//Parse input arguments
		host = os.Args[1]
		serverPort = os.Args[2]
		sRouter_addr = os.Args[3]
	} else {
		fmt.Println("No CMD args detected. Using default settings.")
		host = HOST

		reader := bufio.NewScanner(os.Stdin)
		serverPort = promptOrDefault(reader, "Enter port to use (leave empty for default - WILL NOT WORK WITH MULTIPLE ROUTERS): ", PORT)
		sRouter_addr = promptOrDefault(reader, "Other Server Router Address (leave empty for default): ", SERVER_ROUTER)
	}

	rand.Seed(time.Now().UnixNano() / int64(time.Millisecond))

	//Initialize the client-server routing registry
	nodeRegistry = []string{}

	p2p.ListenerPort = serverPort

	//Start concurrent session monitor
	//go MetricThread(metricsChannel, metricsSignalChannel)

	//Set up central listener
	listener, err := net.Listen(TYPE, host+":"+p2p.ListenerPort)
	checkErr(err, "Failed to create listener.")
	defer listener.Close()

	fmt.Println("[SERVERROUTER] LISTENING ON " + TYPE + "://" + host + ":" + p2p.ListenerPort)

	/*go func() {
		for {
			metricsSignalChannel <- true
			time.Sleep(5 * time.Second)
		}
	}()*/

	// The accept loop has no exit path, so nothing may follow it.
	for {
		//Wait for a connection
		connection, err := listener.Accept()
		checkErr(err, "Error accepting connection.")

		//Handle the connection in a separate goroutine
		go handleConnection(connection)
	}
}

// promptOrDefault prints prompt, blocks until one line is read from in, and
// returns that line. It returns fallback when the line is empty or when no
// line could be read (for example, stdin is closed).
func promptOrDefault(in *bufio.Scanner, prompt, fallback string) string {
	fmt.Print(prompt)
	if !in.Scan() {
		return fallback
	}
	if answer := in.Text(); len(answer) > 0 {
		return answer
	}
	return fallback
}
// nodeRegistryMu guards nodeRegistry, which is shared by every connection
// goroutine started from main.
var nodeRegistryMu sync.Mutex

// handleConnection serves one accepted connection until its message stream
// ends, then closes it.
//
// When no other server router is configured (sRouter_addr is empty) the
// connection is closed immediately without reading from it. Otherwise JSON
// messages are decoded one after another and dispatched on their Type:
//
//   - IDENTIFY: the sender's Src_IP is added to nodeRegistry and an
//     ACKNOWLEDGE is sent back.
//   - FIND_PEER: a PICK_NODE request is sent to the other server router and
//     its answer is relayed to the sender as FIND_PEER_RESPONSE.
//   - PICK_NODE: a random registered node is returned as PICK_NODE_RESPONSE
//     (an empty string when no node has registered yet).
//
// Messages of any other type are ignored.
func handleConnection(connection net.Conn) {
	defer connection.Close()

	if sRouter_addr == "" {
		fmt.Println("No other server routers found. Skipping this connection.")
		return
	}

	var msg p2p.Message
	dec := json.NewDecoder(connection)

receive:
	for {
		if err := dec.Decode(&msg); err != nil {
			// NOTE(review): TrackEOF lives in the p2p package and its effect is
			// not visible here; it is invoked once per failed decode.
			p2p.TrackEOF()

			// A type mismatch affects only the current message, so the stream
			// can continue. EOF, other I/O errors and malformed JSON cannot be
			// recovered from; retrying would spin forever on a dead connection.
			if _, ok := err.(*json.UnmarshalTypeError); ok {
				continue
			}
			break receive
		}

		msg.Conn = connection

		switch msg.Type {
		case p2p.IDENTIFY:
			fmt.Println("[SERVERROUTER] IDENTIFY FROM: " + msg.Src_IP)

			//Add sender to list of registered nodes
			registerNode(msg.Src_IP)

			//Send acknowledgement packet back to node
			msg.Reply(p2p.ACKNOWLEDGE, "", msg.Src_IP)

		case p2p.FIND_PEER:
			fmt.Println("[SERVERROUTER] FIND_PEER FROM: " + msg.Src_IP)

			// Send request to other server router to pick a node for the p2p connection
			findNodeResponse := p2p.Send(p2p.PICK_NODE, "", sRouter_addr)
			if findNodeResponse.Conn != nil {
				findNodeResponse.Conn.Close()
			}

			// Send the IP of the picked node to the peer that originally requested a connection
			msg.Reply(p2p.FIND_PEER_RESPONSE, findNodeResponse.MSG, msg.Src_IP)

		case p2p.PICK_NODE:
			fmt.Println("[SERVERROUTER] PICK_NODE FROM: " + msg.Src_IP)

			//pick random peer
			selectedPeerIP, ok := pickRandomNode()
			if !ok {
				fmt.Println("[SERVERROUTER] PICK_NODE: no registered nodes")
			}
			// Always answer, even with an empty pick, so the requester gets a reply.
			msg.Reply(p2p.PICK_NODE_RESPONSE, selectedPeerIP, msg.Src_IP)
		}
	}

	fmt.Println("[SERVERROUTER] CLOSING CONNECTION THREAD TO TYPE: " + msg.MSG)
}

// registerNode appends ip to nodeRegistry under nodeRegistryMu.
func registerNode(ip string) {
	nodeRegistryMu.Lock()
	defer nodeRegistryMu.Unlock()
	nodeRegistry = append(nodeRegistry, ip)
}

// pickRandomNode returns a uniformly chosen entry of nodeRegistry. It reports
// false, with an empty string, when the registry is empty; rand.Intn panics
// for an argument of 0.
func pickRandomNode() (string, bool) {
	nodeRegistryMu.Lock()
	defer nodeRegistryMu.Unlock()
	if len(nodeRegistry) == 0 {
		return "", false
	}
	return nodeRegistry[rand.Intn(len(nodeRegistry))], true
}