Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions nuclearesrpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,25 @@
import requests
import pypresence
import subprocess
from typing import Dict


VARIABLE_TYPES = {
"CORE_TEMP": float,
"GENERATOR_0_KW": float,
"GENERATOR_1_KW": float,
"GENERATOR_2_KW": float,
"CORE_IMMINENT_FUSION": str
"CORE_IMMINENT_FUSION": str,
"RODS_POS_ORDERED": float
}


def get_all_vars(srv_url: str):
def get_all_vars(srv_url: str) -> Dict[str, float | str]:
"""
Request a list of dvars from the webserver
:param srv_url: URL to the webserver, typically localhost:8785
:return: A dictionary of cast variables and their names
:rtype: Dict[str, float | str]
"""
results = {}
for key, typeof in VARIABLE_TYPES.items():
Expand All @@ -33,9 +36,10 @@ def get_all_vars(srv_url: str):
return results


def find_nucleares():
def find_nucleares() -> psutil.Process | None:
"""
Find the running Nucleares.exe process, if it exists
:rtype: psutil.Process
:return: A psutil Process representing Nucleares
"""
for process in psutil.process_iter():
Expand Down Expand Up @@ -74,6 +78,7 @@ def find_nucleares():
print("Webserver is live, firing up RPC...")


mission = False
presence.connect()
print("Connected. Press Ctrl+C to Exit")
while 1:
Expand All @@ -90,6 +95,11 @@ def find_nucleares():
status = "Generator Offline"
if dvars["CORE_IMMINENT_FUSION"] == "TRUE":
details = "Imminent Meltdown"
if (dvars["CORE_TEMP"] == 20 and dvars["RODS_POS_ORDERED"] == 87.5) or mission:
# I can give you my complete assurance that my work will be back to normal~
mission = True
details = "This mission is too important"
status = "The intruder must be dealt with"
presence.update(
pid=proc.pid,
start=round(starttime),
Expand All @@ -98,13 +108,15 @@ def find_nucleares():
large_image="nucleares"
)
#print(
# f"Sent Update: Core = {dvars['CORE_TEMP']} - Total Pwr = {pwr} - Panic = {dvars['CORE_IMMINENT_FUSION']}"
# f"Sent Update: Core = {dvars['CORE_TEMP']} - Total Pwr = {pwr} - Panic = {dvars['CORE_IMMINENT_FUSION']}",
# f"- Rods: {dvars['RODS_POS_ORDERED']}"
#)
time.sleep(15)
except requests.ConnectionError:
print("Webserver connection lost, trying to re-establish...")
if find_nucleares() is None:
print("Nucleares is closed, RPC will close...")
presence.close()
sys.exit(0)
while 1:
try:
Expand Down