-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathremote_import_core.py
More file actions
38 lines (28 loc) · 1.22 KB
/
remote_import_core.py
File metadata and controls
38 lines (28 loc) · 1.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import importlib.util
import sys
from dataclasses import dataclass
from importlib.abc import MetaPathFinder
from transport_catalog import TransportCatalog
@dataclass
class TransportChoice(TransportCatalog):
    """Pick a transport by attribute name and instantiate it.

    The attribute called ``transport`` is looked up on this instance and
    called with the remaining arguments; the result is stored on
    ``transport_obj``.

    Args:
        transport: Name of the attribute to look up on this instance.
        *args: Positional arguments forwarded to the looked-up callable.
        **kwargs: Keyword arguments forwarded to the looked-up callable.

    Raises:
        AttributeError: If no attribute named ``transport`` exists.
    """

    # NOTE: ``@dataclass`` does not replace an ``__init__`` written in the
    # class body, so this hand-written constructor is the one that runs.
    def __init__(self, transport, *args, **kwargs):
        # Resolve the factory first, then build the transport object from it.
        factory = self.__getattribute__(transport)
        self.transport_obj = factory(*args, **kwargs)
class RemoteImport(MetaPathFinder):
    """Meta-path finder and loader for modules whose source comes from a transport.

    The transport is stored once on the class as ``session``: the first
    instance constructed sets it and later instances leave it unchanged.
    The transport must provide ``get(name)``, returning the module source or
    ``None`` when the name is unknown, and a ``source`` attribute that is
    used as the origin of every module spec.

    Two import protocols are implemented:

    * ``find_spec`` / ``create_module`` / ``exec_module`` -- the protocol the
      import system consults. Python 3.12 stopped calling ``find_module`` on
      meta-path finders, so a finder without ``find_spec`` is silently
      skipped there.
    * ``find_module`` / ``load_module`` -- the legacy protocol, kept so that
      code calling these methods directly keeps working.

    Every module is created as a package (``is_package=True``) so that dotted
    names below it are routed back through the meta path.
    """

    # Transport shared by all instances; assigned by the first constructor call.
    session = None

    def __init__(self, transport):
        """Remember ``transport`` as the shared session if none is set yet."""
        if RemoteImport.session is None:
            RemoteImport.session = transport
        # Source found by the most recent ``find_module`` call (legacy path only).
        self.current_module_code: str = ""

    def _make_spec(self, name, code):
        """Build a package spec for ``name`` that carries ``code`` to the loader."""
        spec = importlib.util.spec_from_loader(
            name,
            loader=self,
            origin=RemoteImport.session.source,
            is_package=True,
        )
        # Carry the source on the spec itself so that loading never depends on
        # mutable finder state left behind by some other lookup.
        spec.loader_state = code
        return spec

    def find_spec(self, fullname, path=None, target=None):
        """Return a spec for ``fullname`` if the transport has it, else ``None``.

        ``path`` and ``target`` are accepted for protocol compatibility and
        are not used.
        """
        remote_code = RemoteImport.session.get(fullname)
        if remote_code is None:
            return None
        return self._make_spec(fullname, remote_code)

    def create_module(self, spec):
        """Return ``None`` so the import system creates a default module object."""
        return None

    def exec_module(self, module):
        """Execute the source carried by the module's spec in its namespace."""
        # SECURITY: this runs code received from the transport without any
        # verification. Only use transports whose content is trusted.
        exec(module.__spec__.loader_state, module.__dict__)

    def find_module(self, name, path=None):
        """Legacy finder hook: return ``self`` if the transport has ``name``.

        The source is cached on ``current_module_code`` for the
        ``load_module`` call that follows. ``path`` is not used.
        """
        remote_code = RemoteImport.session.get(name)
        if remote_code is None:
            return None
        self.current_module_code = remote_code
        return self

    def load_module(self, name):
        """Legacy loader hook: create, register and execute module ``name``.

        Uses the source cached by the preceding ``find_module`` call. If the
        module's code raises, the entry is removed from ``sys.modules`` again
        and the exception propagates.
        """
        spec = self._make_spec(name, self.current_module_code)
        new_module = importlib.util.module_from_spec(spec)
        # Register before executing so imports of this name from inside the
        # module's own code resolve to the module being initialised.
        sys.modules[spec.name] = new_module
        try:
            self.exec_module(new_module)
        except BaseException:
            # Do not leave a half-initialised module importable.
            sys.modules.pop(spec.name, None)
            raise
        return new_module
def init(transport, *args, **kwargs):
    """Append a ``RemoteImport`` finder to ``sys.meta_path``.

    Args:
        transport: Passed to ``TransportChoice`` as the transport name.
        *args: Positional arguments forwarded to ``TransportChoice``.
        **kwargs: Keyword arguments forwarded to ``TransportChoice``.
    """
    choice = TransportChoice(transport, *args, **kwargs)
    finder = RemoteImport(choice.transport_obj)
    # Appended at the end, so finders already on the meta path are asked first.
    sys.meta_path.append(finder)