-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathevent_log_reader.py
More file actions
247 lines (203 loc) · 8.52 KB
/
event_log_reader.py
File metadata and controls
247 lines (203 loc) · 8.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
import subprocess
import re
from datetime import datetime
def get_usb_events():
    """
    Collect USB insertion/removal events from every available source.

    Gathers events from the Kernel-PnP configuration log, the System log and
    the SetupAPI text logs, orders them newest first, and drops repeats.

    Returns:
        A list of event dicts (keys 'time', 'action', 'device_id',
        'event_id', 'source'), most recent first, with at most one entry per
        (timestamp-to-the-second, device_id, action) combination.
    """
    collected = []
    # Source 1: Kernel-PnP Configuration log
    collected += _read_kernel_pnp_events()
    # Source 2: System log storage providers
    collected += _read_system_storage_events()
    # Source 3: SetupAPI device installation logs
    collected += _read_setupapi_events()

    # ISO-style timestamps sort chronologically as plain strings.
    newest_first = sorted(collected, key=lambda entry: entry['time'], reverse=True)

    # Keep only the first occurrence of each (time, device, action) triple.
    fingerprints = set()
    deduplicated = []
    for entry in newest_first:
        fingerprint = (entry['time'][:19], entry['device_id'], entry['action'])
        if fingerprint in fingerprints:
            continue
        fingerprints.add(fingerprint)
        deduplicated.append(entry)
    return deduplicated
def _read_kernel_pnp_events():
    """
    Read USB device events from Microsoft-Windows-Kernel-PnP/Configuration.

    Runs PowerShell ``Get-WinEvent`` (up to 500 events), keeps records whose
    message mentions USB/USBSTOR, and parses the pipe-delimited lines the
    script prints (``time|event id|action|message``).

    Event IDs named by the script: 400 (configured), 410 (started),
    420 (removed), 430 (pending); any other ID is reported as "unknown".
    "configured"/"started" become 'inserted', "removed" stays 'removed',
    everything else becomes 'activity'.

    Returns:
        A list of dicts with keys 'time', 'action', 'device_id', 'event_id'
        and 'source' ('Kernel-PnP'). Only events whose extracted device ID
        contains "USBSTOR" or "USB\\VID" are included. The list is empty when
        PowerShell fails, times out, or prints nothing; failures are reported
        with ``print`` and never raised.
    """
    events = []
    ps_cmd = '''
try {
Get-WinEvent -LogName "Microsoft-Windows-Kernel-PnP/Configuration" -MaxEvents 500 -ErrorAction Stop |
Where-Object { $_.Message -match "USB|USBSTOR" } |
ForEach-Object {
$action = switch ($_.Id) {
400 { "configured" }
410 { "started" }
420 { "removed" }
430 { "pending" }
default { "unknown" }
}
"$($_.TimeCreated.ToString('yyyy-MM-ddTHH:mm:ss'))|$($_.Id)|$action|$($_.Message -replace '\\r?\\n', ' ')"
}
} catch {
# Log not available or access denied
}
'''
    # Raw script action -> action name reported to callers; anything not
    # listed here (pending, unknown, ...) is reported as generic "activity".
    action_names = {
        "configured": "inserted",
        "started": "inserted",
        "removed": "removed",
    }
    try:
        result = subprocess.run(
            ["powershell", "-NoProfile", "-Command", ps_cmd],
            # errors="replace": event messages may contain bytes that are not
            # valid in the encoding Python decodes with; a strict decode would
            # lose the whole read, while the ASCII device IDs we need survive
            # replacement untouched.
            capture_output=True, text=True, errors="replace", timeout=60
        )
        output = result.stdout.strip()
        if result.returncode == 0 and output:
            for line in output.split('\n'):
                # maxsplit=3 keeps any '|' inside the message in the last field.
                parts = line.split('|', 3)
                if len(parts) < 4:
                    continue
                time_str, event_id, raw_action, message = parts
                device_id = _extract_device_id(message)
                if not device_id:
                    continue
                if "USBSTOR" not in device_id and "USB\\VID" not in device_id:
                    continue
                events.append({
                    'time': time_str,
                    'action': action_names.get(raw_action, "activity"),
                    'device_id': device_id,
                    'event_id': event_id,
                    'source': 'Kernel-PnP'
                })
    except subprocess.TimeoutExpired:
        print("Timeout reading Kernel-PnP events")
    except Exception as e:
        # Best-effort reader: report and return whatever was parsed so far.
        print(f"Error reading Kernel-PnP events: {e}")
    return events
def _read_system_storage_events():
    """
    Read storage/disk related USB events from the Windows System log.

    Runs PowerShell ``Get-WinEvent`` (up to 1000 events), keeps records whose
    provider name matches disk/partition/storage/volmgr and whose message
    mentions USB, removable or USBSTOR, and parses the pipe-delimited lines
    the script prints (``time|event id|provider|message``).

    Returns:
        A list of dicts with keys 'time', 'action' (always 'activity'),
        'device_id' (extracted from the message, or 'USB Storage' when none
        is found), 'event_id' and 'source' (the provider name). The list is
        empty when PowerShell fails, times out, or prints nothing; failures
        are reported with ``print`` and never raised.
    """
    events = []
    ps_cmd = '''
try {
Get-WinEvent -LogName "System" -MaxEvents 1000 -ErrorAction Stop |
Where-Object {
($_.ProviderName -match "disk|partition|storage|volmgr") -and
($_.Message -match "USB|removable|USBSTOR")
} |
ForEach-Object {
"$($_.TimeCreated.ToString('yyyy-MM-ddTHH:mm:ss'))|$($_.Id)|$($_.ProviderName)|$($_.Message -replace '\\r?\\n', ' ')"
}
} catch {
# Log not available
}
'''
    try:
        result = subprocess.run(
            ["powershell", "-NoProfile", "-Command", ps_cmd],
            # errors="replace": event messages may contain bytes that are not
            # valid in the encoding Python decodes with; a strict decode would
            # lose the whole read instead of just the odd character.
            capture_output=True, text=True, errors="replace", timeout=60
        )
        output = result.stdout.strip()
        if result.returncode == 0 and output:
            for line in output.split('\n'):
                # maxsplit=3 keeps any '|' inside the message in the last field.
                parts = line.split('|', 3)
                if len(parts) < 4:
                    continue
                time_str, event_id, provider, message = parts
                events.append({
                    'time': time_str,
                    'action': 'activity',
                    # Fall back to a generic label when no ID is in the message.
                    'device_id': _extract_device_id(message) or 'USB Storage',
                    'event_id': event_id,
                    'source': provider
                })
    except subprocess.TimeoutExpired:
        print("Timeout reading System events")
    except Exception as e:
        # Best-effort reader: report and return whatever was parsed so far.
        print(f"Error reading System events: {e}")
    return events
def _read_setupapi_events(log_paths=None):
    """
    Read USB device installation timestamps from SetupAPI text logs.

    Scans each log for a ``>>> [Device Install ... USB ...]`` section header
    followed by a ``>>> Section start YYYY/MM/DD HH:MM:SS`` line and emits one
    'first_install' event per header that names a ``USB\\VID_x&PID_y\\serial``
    device.

    Args:
        log_paths: Optional iterable of log file paths to scan. Defaults to
            ``C:\\Windows\\INF\\setupapi.dev.log`` and
            ``C:\\Windows\\setupapi.log``. Pass explicit paths to read logs
            from another Windows directory or copies taken from another
            machine.

    Returns:
        A list of dicts with keys 'time' (``YYYY-MM-DDTHH:MM:SS``), 'action'
        ('first_install'), 'device_id', 'event_id' ('SetupAPI') and 'source'
        ('SetupAPI'). Missing files are skipped silently; other errors are
        reported with ``print`` and never raised.
    """
    if log_paths is None:
        log_paths = (
            r"C:\Windows\INF\setupapi.dev.log",
            r"C:\Windows\setupapi.log",
        )

    # Compiled once: these run against every line of potentially large logs.
    # Header example: >>>  [Device Install (Hardware initiated) - USB\VID_...]
    section_re = re.compile(r'>>>.*\[(Device Install.*USB.*)\]', re.IGNORECASE)
    start_re = re.compile(r'>>>.*Section start (\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2})')
    device_re = re.compile(r'USB\\VID_([0-9A-Fa-f]+)&PID_([0-9A-Fa-f]+)\\([^\]\s]+)')

    events = []
    for log_path in log_paths:
        try:
            with open(log_path, 'r', encoding='utf-8', errors='ignore') as f:
                current_section = None
                # Stream line by line rather than loading the whole log.
                for line in f:
                    header = section_re.search(line)
                    if header:
                        current_section = header.group(1)

                    started = start_re.search(line)
                    if not (started and current_section and 'USB' in current_section):
                        continue

                    device = device_re.search(current_section)
                    if device:
                        vid, pid, serial = device.groups()
                        events.append({
                            'time': started.group(1).replace('/', '-').replace(' ', 'T'),
                            'action': 'first_install',
                            'device_id': f"USB\\VID_{vid}&PID_{pid}\\{serial}",
                            'event_id': 'SetupAPI',
                            'source': 'SetupAPI'
                        })
                    # This header's timestamp has been consumed; later
                    # "Section start" lines belong to other sections.
                    current_section = None
        except FileNotFoundError:
            continue
        except Exception as e:
            print(f"Error reading {log_path}: {e}")
    return events
def _extract_device_id(message):
    """
    Extract a device ID (USBSTOR or USB VID/PID path) from an event message.

    Patterns are tried in order of specificity and the first hit wins:
      1. ``USBSTOR\\<device>\\<instance>``
      2. ``USB\\VID_xxxx&PID_xxxx\\<instance>``
      3. the token after "Device " if it contains "USBSTOR"
      4. the token after "Device " if it contains "USB"

    Args:
        message: Event message text. ``None`` or an empty string is accepted.

    Returns:
        The matched device ID with trailing sentence punctuation removed, or
        an empty string when the message is empty or holds no device ID.
    """
    if not message:
        # Nothing to search; also avoids a TypeError from re.search(None).
        return ""

    # (pattern, group to return), most specific first.
    candidates = (
        (r'USBSTOR\\[^\s\\]+\\[^\s\\]+', 0),
        (r'USB\\VID_[0-9A-Fa-f]+&PID_[0-9A-Fa-f]+\\[^\s\\]+', 0),
        (r'Device ([^\s]+USBSTOR[^\s]+)', 1),
        (r'Device ([^\s]+USB[^\s]+)', 1),
    )
    for pattern, group in candidates:
        found = re.search(pattern, message)
        if found:
            # The patterns run up to the next whitespace, so punctuation glued
            # to the ID (e.g. the full stop ending a sentence) gets captured;
            # strip it so the same device always yields the same ID.
            return found.group(group).rstrip('.,;:)\'"')
    return ""
def get_device_timestamps(serial):
    """
    Get the earliest and latest event timestamps for a specific device serial.

    Every event whose 'device_id' contains ``serial`` is considered,
    whatever its action (insertion, removal, activity, first install).

    Args:
        serial: Serial number (or any substring of the device ID) to look for.

    Returns:
        A ``(first, last)`` tuple of timestamp strings, or ``(None, None)``
        when ``serial`` is empty/None or no event matches it.
    """
    if not serial:
        # An empty string is a substring of every device ID and would match
        # all events; None would raise in the containment test below.
        return None, None

    times = [
        event['time']
        for event in get_usb_events()
        if serial in event.get('device_id', '')
    ]
    if not times:
        return None, None
    # Timestamps are ISO-style strings, so string order is chronological.
    return min(times), max(times)