forked from aarond10/https_dns_proxy
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathDnsTcpClient.py
More file actions
65 lines (56 loc) · 2.31 KB
/
DnsTcpClient.py
File metadata and controls
65 lines (56 loc) · 2.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import socket
class DnsTcpClient:
    """Minimal DNS-over-TCP test client.

    The client sends a canned DNS query for ``google.com`` in caller-chosen
    pieces (so that fragmented requests can be exercised) and reads back one
    length-prefixed DNS response.

    ``ROBOT_LIBRARY_SCOPE`` is set so that Robot Framework shares a single
    instance, and therefore a single connection, across the whole test run.
    All failures are reported as plain ``Exception`` with a descriptive
    message, chained to the underlying error.
    """

    ROBOT_LIBRARY_SCOPE = 'GLOBAL'

    # The canned request, split into the pieces that can be sent separately.
    # Parts are addressed 1-based by send_tcp_request_parts().
    google_dns_request_byte_parts = [
        b'\x00\x1C',  # 2 byte: DNS request length: 28 byte
        # 12 byte header: ID=1, flags=0x0100, QDCOUNT=1, other counts 0
        b'\x00\x01\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00',
        b'\x06google\x03com\x00',  # 12 byte: QNAME "google.com"
        b'\x00\x01\x00\x01',  # 4 byte: QTYPE=1, QCLASS=1
    ]

    def __init__(self):
        """Create a client with no connection open."""
        # Connected socket, or None while no connection is open.
        self.client_socket = None

    def open_tcp_client_connection(self, host, port):
        """Open a TCP connection to ``host``:``port`` with a 10 s timeout.

        Args:
            host: Host name or IPv4 address to connect to.
            port: Port number; anything accepted by ``int()``.

        Raises:
            Exception: If the connection cannot be established. In that case
                no socket is left behind and ``client_socket`` is ``None``.
        """
        new_socket = None
        try:
            # Release a connection left over from an earlier call instead of
            # leaking it when the attribute is overwritten below.
            self.close_tcp_client_connection()
            new_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            new_socket.settimeout(10)
            new_socket.connect((host, int(port)))
        except Exception as e:
            # Do not keep (or leak) a socket that never got connected;
            # otherwise later keywords would believe a connection is open.
            if new_socket is not None:
                new_socket.close()
            raise Exception(f"Failed to open TCP connection: {e}") from e
        self.client_socket = new_socket
        print(f"Successfully connected to {host}:{port}")

    def send_tcp_request_parts(self, *parts):
        """Send the selected parts of the canned request as one write.

        Args:
            *parts: 1-based part numbers (ints or numeric strings) indexing
                ``google_dns_request_byte_parts``; sent in the given order.

        Raises:
            Exception: If no connection is open, a part number is not a
                valid 1-based index, or the send itself fails. Nothing is
                sent when a part number is invalid.
        """
        if not self.client_socket:
            raise Exception("No TCP connection open. Call 'Open Tcp Client Connection' first.")
        try:
            available = DnsTcpClient.google_dns_request_byte_parts
            selected = []
            for part in parts:
                number = int(part)
                # Reject 0 and negatives explicitly: Python would otherwise
                # silently wrap them around to the end of the list.
                if not 1 <= number <= len(available):
                    raise ValueError(
                        f"part number {part} is out of range 1..{len(available)}"
                    )
                selected.append(available[number - 1])
            msg = b''.join(selected)
            self.client_socket.sendall(msg)
        except Exception as e:
            raise Exception(f"Failed to send message: {e}") from e
        # Logged outside the try so that a logging problem can never be
        # reported as a failed send; str() allows int part numbers.
        print(f"Sent parts {' '.join(str(part) for part in parts)}, bytes: {len(msg)}")

    def receive_tcp_response(self):
        """Receive one length-prefixed DNS response.

        Reads the 2-byte big-endian length prefix and then exactly that many
        payload bytes, so bytes belonging to a following message are left in
        the socket for the next call.

        Returns:
            The DNS response payload without the 2-byte length prefix.

        Raises:
            Exception: If no connection is open, the peer closes the
                connection early, or the socket reports an error/timeout.
        """
        if not self.client_socket:
            raise Exception("No TCP connection open. Call 'Open Tcp Client Connection' first.")
        try:
            length_prefix = self._receive_exactly(2)
            dnslen = int.from_bytes(length_prefix, 'big')
            print(f"DNS response length: {dnslen} bytes")
            return self._receive_exactly(dnslen)
        except Exception as e:
            raise Exception(f"Failed to receive message: {e}") from e

    def _receive_exactly(self, count):
        """Read exactly ``count`` bytes from the socket, however fragmented."""
        chunks = []
        remaining = count
        while remaining > 0:
            # Never ask for more than is still missing, so nothing beyond
            # the current message is consumed.
            data = self.client_socket.recv(min(remaining, 1024))
            if not data:
                raise Exception("Connection closed!")
            print(f"Received {len(data)} bytes")
            chunks.append(data)
            remaining -= len(data)
        return b''.join(chunks)

    def close_tcp_client_connection(self):
        """Close the connection if one is open; otherwise do nothing."""
        if self.client_socket:
            try:
                self.client_socket.close()
            finally:
                # Forget the socket even if close() itself fails.
                self.client_socket = None
            print("TCP connection closed.")