-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathchart_session.py
More file actions
210 lines (179 loc) · 8.27 KB
/
Copy pathchart_session.py
File metadata and controls
210 lines (179 loc) · 8.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
import socketio
import aiohttp.web
import webbrowser
import asyncio
import threading
import contextlib
from urllib.request import urlopen
import binascii
import random
import string
import os
from .plot import _Figure
from .utils import exception_as_string, wait_with_timeout
from .browser_controller import PuppeteerBrowser, SimpleBrowser
# --- Where the local chart server listens -----------------------------------
DEFAULT_HOST = "localhost"
DEFAULT_PORT = 15555

# Directory served as static files by the server: the "build" folder that sits
# next to this module.
STATIC_FILES = os.path.join(os.path.dirname(__file__), "build")

# --- Timeouts ----------------------------------------------------------------
# DISCONNECT_TIMEOUT is handed to asyncio.wait_for, so it is in seconds. The
# other three are handed to the project's wait_with_timeout helper --
# presumably seconds as well; confirm against utils.wait_with_timeout.
BROWSER_LOAD_TIMEOUT = 10  # waiting for the browser page to connect
RENDER_TIMEOUT = 5         # waiting for the client to report a finished render
SAVE_TIMEOUT = 30          # waiting for the client to send image data
DISCONNECT_TIMEOUT = 4     # per-step limit while shutting the server down
class ChartSession(object):
    """Serve one figure to a browser page over socket.io and drive its rendering.

    Meant to be used as a context manager: ``__enter__`` starts an aiohttp +
    socket.io server on a background event-loop thread and opens the page in a
    ``PuppeteerBrowser``; ``__exit__`` tears both down again.

    Two event loops are involved:

    * ``event_loop`` runs forever on a worker thread and owns the server.
      Coroutines are submitted to it with ``asyncio.run_coroutine_threadsafe``.
    * ``main_thread_event_loop`` is run on the calling thread for all browser
      calls (the original author notes that puppeteer can only be run from the
      main thread).
    """

    def __init__(self, port=DEFAULT_PORT, retina_display=True):
        """Create a session; nothing is started until the ``with`` block is entered.

        Args:
            port: TCP port the local server binds to.
            retina_display: forwarded to ``PuppeteerBrowser`` as
                ``use_scale_factor``.
        """
        self.port = port
        self.host = DEFAULT_HOST
        # TODO: the token is generated but not used anywhere in this class yet.
        # SystemRandom draws from the OS entropy source, which is the right
        # choice should the token ever be used to authenticate clients.
        rng = random.SystemRandom()
        alphabet = string.ascii_letters + string.digits
        self.token = ''.join(rng.choice(alphabet) for _ in range(25))
        self._figure = _Figure()
        self._connections = set()
        self._save_file = None
        self._retina = retina_display
        # Last payload sent to the client; filled lazily by show()/_update()/_emit().
        self.figure_data = None

    async def _start_server(self):
        """Create the aiohttp/socket.io server and start listening.

        Must run on ``event_loop`` (the events below are created here so they
        belong to that loop's thread).

        Returns:
            True if the site is listening, False if anything failed (the
            exception is printed and a best-effort teardown is attempted).
        """
        try:
            # Basically a condition variable without a lock, lets us lock out from shutting down event loop
            # and server if we have a pending request for image data or something
            self._is_saving = asyncio.Event()
            self._is_rendering = asyncio.Event()
            self._sio_connecting = asyncio.Event()

            self.app = aiohttp.web.Application()
            self.sio = socketio.AsyncServer(async_mode='aiohttp', cors_allowed_origins="*")  # TODO cors
            self.sio.attach(self.app)

            # request from the client for updated graph data, for interactive visualization
            self.sio.on("get_graph_update", self._update)
            # request from the client for updated function data, contains Python to execute on server and
            # give back info about exceptions and parameters for sliders
            self.sio.on("get_function_update", self._function_update)
            # signal from the client that a render triggered by _emit() is finished
            self.sio.on("graph_updated", self._graph_updated)
            # image data from the client from a save request
            self.sio.on("send_image_data", self._receive_image_data)
            self.sio.on("connect", self._record_connection)
            self.sio.on("disconnect", self._remove_connection)

            self.app.router.add_static("/", STATIC_FILES)
            self.runner = aiohttp.web.AppRunner(self.app)
            await self.runner.setup()
            self.site = aiohttp.web.TCPSite(self.runner, self.host, self.port)
            await self.site.start()
            return True
        except Exception as e:
            # Best-effort teardown of whatever was created; a failure here must
            # not hide the original startup error.
            with contextlib.suppress(Exception):
                await self._stop_server()
            print(''.join(exception_as_string(e)))
            return False

    async def _await_shutdown_step(self, make_awaitable):
        """Run one teardown step under DISCONNECT_TIMEOUT; report success as a bool.

        ``make_awaitable`` is called inside the ``try`` so that a missing
        attribute (e.g. the server never got as far as creating ``self.runner``)
        is reported like any other failure instead of propagating.
        """
        try:
            await asyncio.wait_for(make_awaitable(), DISCONNECT_TIMEOUT)
            return True
        except asyncio.TimeoutError:
            print("Server shutdown timed out. Close extraneous Chromium instances. Forcing shutdown...")
        except Exception as e:  # won't catch KeyboardInterrupt
            print(''.join(exception_as_string(e)))
        return False

    async def _stop_server(self):
        """Disconnect the recorded clients and clean up the aiohttp runner.

        Every step is attempted even if an earlier one fails or times out, so a
        stuck client cannot prevent the listening socket from being released.

        Returns:
            True only if every step completed without error or timeout.
        """
        clean = True
        # Iterate over a snapshot: the "disconnect" handler mutates the set.
        # (The previous code passed True as the sid, which matches no client.)
        for sid in list(self._connections):
            if not await self._await_shutdown_step(lambda sid=sid: self.sio.disconnect(sid)):
                clean = False
        if not await self._await_shutdown_step(lambda: self.runner.cleanup()):
            clean = False
        return clean

    def _shutdown_event_loops(self):
        """Stop the worker loop, join its thread, and close both event loops."""
        loop = self.event_loop
        # asyncio.Task.all_tasks() was removed in Python 3.9; asyncio.all_tasks()
        # exists since 3.7. Fall back only on interpreters that lack the latter.
        all_tasks = getattr(asyncio, "all_tasks", None) or asyncio.Task.all_tasks

        def cancel_pending_then_stop():
            # Runs on the worker thread: tasks may only be touched from the
            # thread that runs their loop.
            for task in all_tasks(loop):
                task.cancel()
            # Queue the stop behind the wake-ups scheduled by cancel() so the
            # tasks get one iteration to process their cancellation.
            loop.call_soon(loop.stop)

        loop.call_soon_threadsafe(cancel_pending_then_stop)
        # join the event loop thread
        self._el_thread.join()
        loop.close()
        self.main_thread_event_loop.close()

    # context manager syntax handles server spinup and teardown
    def __enter__(self):
        """Start the server thread and the browser, then return the session.

        Raises:
            RuntimeError: if the server could not start or the browser page did
                not connect within BROWSER_LOAD_TIMEOUT. Everything started so
                far is torn down before the exception propagates.
        """
        self.event_loop = asyncio.new_event_loop()
        # puppeteer can only run from main thread due to multiprocessing things.
        self.main_thread_event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.main_thread_event_loop)

        # Start event loop on separate thread. We can throw coroutines at this event loop
        # from anywhere (any thread) we want, and wait on them to finish (if necessary) by
        # accessing the returned future.
        self._el_thread = threading.Thread(target=self.event_loop.run_forever)
        self._el_thread.start()

        # run start server coroutine on the event loop we just started and
        # wait for server to start, events to register, etc
        future = asyncio.run_coroutine_threadsafe(self._start_server(), self.event_loop)
        if not future.result():
            # Do not leave the (non-daemon) loop thread running behind the error.
            self._shutdown_event_loops()
            raise RuntimeError("Could not start server")

        # would like to use contextlib.ExitStack(), but I don't think we could wait on
        # the browser on the EL thread then
        # this ensures that we properly exit the server if we get an exception while starting the browser.
        try:
            self.main_thread_event_loop.run_until_complete(self._open_page())
            # wait for browser to start and go to page
            wait_for_browser = wait_with_timeout(self._sio_connecting, BROWSER_LOAD_TIMEOUT)
            future = asyncio.run_coroutine_threadsafe(wait_for_browser, self.event_loop)
            if not future.result():
                raise RuntimeError("Could not start browser")
        except Exception as e:
            print("".join(exception_as_string(e)))
            self.__exit__(type(e), e, e.__traceback__)
            raise
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the browser, stop the server, and shut down both event loops.

        Each stage runs even if the previous one raised, so the worker thread is
        always stopped and joined. Returns None, so exceptions from the ``with``
        body are not suppressed.
        """
        try:
            try:
                # close browser window and process if possible; the browser may
                # not exist if startup failed before it was created
                browser = getattr(self, "_browser", None)
                if browser is not None:
                    self.main_thread_event_loop.run_until_complete(browser.try_close())
            finally:
                # send server shutdown coroutine to the event loop and wait for
                # it to finish (bounded by the timeouts inside _stop_server)
                future = asyncio.run_coroutine_threadsafe(self._stop_server(), self.event_loop)
                if not future.result():
                    print("Could not stop server. Going to force close async tasks anyway...")
        finally:
            # shut down the event loop on the other thread
            self._shutdown_event_loops()

    async def _open_page(self):
        """Start the browser and navigate it to the served index page."""
        self._browser = PuppeteerBrowser(use_scale_factor=self._retina)
        await self._browser.start_browser()
        await self._browser.open_page(f"http://{self.host}:{self.port}/index.html")
        return True

    async def _update(self, sid, params):
        """Handle "get_graph_update": apply params to the interactive plot and re-emit.

        If there are no params or no interactive plot, the current figure data
        is emitted unchanged.
        """
        ipl = self._figure._get_interactive_plot()
        if params and ipl:
            plot_result = ipl._update_plot(params)
            self.figure_data = self._figure._get_data()
            if plot_result["error"]:
                self.figure_data["error"] = plot_result["error"]
        return await self._emit()

    async def _emit(self):
        """Send the figure data to the client and wait for its render signal.

        Returns:
            Whatever ``wait_with_timeout`` returns for the render event
            (callers treat a falsy value as failure).
        """
        if self.figure_data is None:
            # A client asked for an update before show() populated the payload.
            self.figure_data = self._figure._get_data()
        # Drop any signal left over from a previous render so the wait below
        # reflects this emit. NOTE(review): harmless if wait_with_timeout
        # already clears the event -- that helper is not visible here.
        self._is_rendering.clear()
        await self.sio.emit("update_graph", self.figure_data)
        return await wait_with_timeout(self._is_rendering, RENDER_TIMEOUT)

    async def _function_update(self, sid, params):
        """Handle "get_function_update": reply with the plot's function info, if any."""
        ipl = self._figure._get_interactive_plot()
        if ipl:
            res = ipl._get_function_info(params)
            await self.sio.emit("update_function", res)

    async def _graph_updated(self, sid, params=None):
        """Handle "graph_updated": the client finished a render."""
        self._is_rendering.set()

    async def _record_connection(self, sid, params):
        """Handle "connect": remember the client and signal that a page connected."""
        self._connections.add(sid)
        self._sio_connecting.set()

    async def _remove_connection(self, sid):
        """Handle "disconnect": forget the client (unknown sids are ignored)."""
        self._connections.discard(sid)

    async def _request_image_data(self):
        """Ask the client for image data and wait until it has been written."""
        # Same reasoning as in _emit: do not let an old save satisfy this wait.
        self._is_saving.clear()
        await self.sio.emit("request_image_data")
        return await wait_with_timeout(self._is_saving, SAVE_TIMEOUT)

    async def _receive_image_data(self, sid, params):
        """Handle "send_image_data": decode the client's image and write it to disk.

        Returns:
            True if the data was written, False if the request was rejected
            (no save target set, or the payload is not a ``data:`` URL).
        """
        if self._save_file is None:
            # Nothing asked for a save; open(None) would raise TypeError.
            return False
        # The payload comes from the client and is handed to urlopen, which
        # would also fetch file:/http: URLs. Only inline data URLs are accepted.
        # NOTE(review): assumes the client sends a data: URL -- confirm against
        # the front-end.
        if not (isinstance(params, str) and params[:5].lower() == "data:"):
            return False
        with urlopen(params) as image:
            data = image.read()
        with open(self._save_file, "wb") as f:
            f.write(data)
        self._is_saving.set()
        return True

    async def _set_figure_size(self, width, height):
        """Forward the requested figure size to the browser."""
        await self._browser.set_figure_size(width, height)

    # right now, there can only be one figure per session, but this could easily be changed.
    def get_figure(self):
        """Return the session's single figure object."""
        return self._figure

    # set height/width of figure viewport in px to be shown in Chromium.
    # the control panel for interactive figures takes up space, so this won't work correctly
    # in interactive mode (for now).
    def set_figure_size(self, width, height):
        """Resize the figure viewport; blocks until the browser call completes."""
        self.main_thread_event_loop.run_until_complete(self._set_figure_size(width, height))

    def show(self, blocking: bool = True):
        """Send the current figure to the browser and optionally wait for Enter.

        Raises:
            RuntimeError: if the client did not confirm the render.
        """
        self.figure_data = self._figure._get_data()
        future = asyncio.run_coroutine_threadsafe(self._emit(), self.event_loop)
        if not future.result():  # waits until coroutine is executed or raises
            raise RuntimeError("Could not update graph data")
        if blocking:
            input("Press enter to unblock.")

    # save the figure as a png
    def save(self, filename):
        """Save a screenshot of the page to ``filename`` via the browser."""
        params = {"path": filename}
        self.main_thread_event_loop.run_until_complete(self._browser.take_screenshot(params))