forked from myugan/firecracker-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
139 lines (108 loc) · 3.87 KB
/
utils.py
File metadata and controls
139 lines (108 loc) · 3.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import os
import random
import string
import signal
import requests
import subprocess
import socket
def run(cmd, **kwargs):
"""Execute a shell command with configurable options.
Provides a convenient wrapper around subprocess.run with sensible defaults
for shell command execution.
Args:
cmd (str): Shell command to execute
**kwargs: Additional arguments to pass to subprocess.run
Defaults if not specified:
- shell=True: Enable shell command interpretation
- capture_output=True: Capture stdout/stderr
- text=True: Return string output instead of bytes
Returns:
subprocess.CompletedProcess: Process completion information including:
- returncode: Exit code of the command
- stdout: Standard output (if captured)
- stderr: Standard error (if captured)
Note:
Default behavior can be overridden by passing explicit kwargs
"""
default_kwargs = {
'shell': True,
'capture_output': True,
'text': True
}
default_kwargs.update(kwargs)
return subprocess.run(cmd, **default_kwargs)
def safe_kill(pid: int, sig: signal.Signals = signal.SIGTERM) -> bool:
"""Safely kill a process."""
try:
os.kill(pid, sig)
return True
except ProcessLookupError:
return True
except PermissionError:
return False
def generate_id() -> str:
"""Generate a random ID for the MicroVM instance.
Returns:
str: A random identifier (exactly 8 lowercase alphanumeric characters)
"""
chars = string.ascii_lowercase + string.digits
generated_id = ''.join(random.choice(chars) for _ in range(8))
return generated_id
def requires_id(func):
"""Decorator to check if VMM ID is provided."""
def wrapper(*args, **kwargs):
id = kwargs.get('id') or (len(args) > 1 and args[1])
if not id:
raise RuntimeError("VMM ID required")
return func(*args, **kwargs)
return wrapper
def validate_hostname(hostname):
"""Validate hostname according to RFC 1123."""
import re
if not re.match(r'^[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?$', hostname):
raise ValueError(f"Invalid hostname: {hostname}")
def validate_ip_address(ip_addr: str) -> bool:
"""Validate an IP address according to standard format rules.
Args:
ip_addr (str): The IP address to validate
Returns:
bool: True if the IP address is valid
Raises:
Exception: If the IP address is invalid, with a descriptive message
"""
if not ip_addr:
raise Exception("IP address cannot be empty")
try:
# Test IP address format
socket.inet_aton(ip_addr)
# Check if the IP has exactly 4 parts
ip_parts = ip_addr.split('.')
if len(ip_parts) != 4:
raise Exception(f"Invalid IP address format: {ip_addr}")
# Check if any part is outside the valid range
for part in ip_parts:
if not (0 <= int(part) <= 255):
raise Exception(f"IP address contains invalid octet: {part}")
# Check if it's a reserved address (like .0 ending)
if ip_parts[-1] == '0':
raise Exception(
f"IP address with .0 suffix is reserved: {ip_addr}"
)
return True
except (socket.error, ValueError):
raise Exception(f"Invalid IP address: {ip_addr}")
def get_public_ip(timeout: int = 5):
"""Get the public IP address."""
URLS = [
"https://ifconfig.me",
"https://ipinfo.io/ip",
"https://api.ipify.org"
]
for url in URLS:
try:
response = requests.get(url, timeout=timeout)
response.raise_for_status()
return response.text.strip()
except requests.RequestException:
continue
raise RuntimeError("Failed to get public IP")