-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathencoder.py
More file actions
61 lines (50 loc) · 1.96 KB
/
encoder.py
File metadata and controls
61 lines (50 loc) · 1.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import subprocess
from fcntl import fcntl, F_GETFL, F_SETFL
from os import O_NONBLOCK
from configuration import config
import threading
import time
class Encoder:
    """Run a ``sox`` child process that turns raw PCM into a WAV stream.

    The sox command reads raw signed 16-bit, 2-channel, 44100 Hz audio from
    its stdin and writes WAV data to its stdout. After ``run()`` the pipes
    are exposed as ``input_stream`` (sox stdin) and ``output_stream`` (sox
    stdout, switched to non-blocking mode), and ``ready`` is set.
    """

    # Popen handle of the sox process; stays None until run() is called.
    stream = None
    # sox "compand" effect arguments appended on supported boards.
    __sox_compression = ["compand", "0.3,1", "6:-70,-60,-20", "-5", "-90", "0.2"]
    # Substrings looked for in the device-tree model string.
    __compression_supported_models = ("Pi 3", "Pi 2")
    # Pipe objects of the running sox process (set by run()).
    input_stream = None
    output_stream = None
    # NOTE: defined on the class, so this Event is shared by all instances.
    ready = threading.Event()

    def __init__(self) -> None:
        """Build the sox command line, including optional effects."""
        self.__sox_cmd = ["sox", "-t", "raw", "-G", "-b", "16", "-e", "signed",
                          "-c", "2", "-r", "44100", "-", "-t", "wav", "-"]
        self.__enable_compression_if_supported()
        self.__set_treble()

    def run(self) -> None:
        """Spawn sox, publish its pipes and signal ``ready``.

        Raises:
            OSError: propagated from ``subprocess.Popen`` if sox cannot be
                started (for example when the executable is missing).
        """
        # NOTE(review): stderr is piped but nothing in this class reads it;
        # confirm a consumer drains it, otherwise a chatty sox could block.
        self.stream = subprocess.Popen(self.__sox_cmd, stdout=subprocess.PIPE,
                                       stdin=subprocess.PIPE,
                                       stderr=subprocess.PIPE)
        # Popen only returns once the child exists, so the pid is already
        # available here and no polling for it is required.

        # set the encoder to non-blocking output:
        flags = fcntl(self.stream.stdout, F_GETFL)  # get current stdout flags
        fcntl(self.stream.stdout, F_SETFL, flags | O_NONBLOCK)

        self.input_stream = self.stream.stdin
        self.output_stream = self.stream.stdout
        self.ready.set()

    def reload(self) -> None:
        """Restart sox with a freshly built command line.

        ``ready`` is cleared for the duration of the restart and set again
        by ``run()``. ``__init__`` is re-invoked so the command line is
        rebuilt from the current settings.
        """
        self.ready.clear()
        self.stop()
        self.__init__()
        self.run()

    def stop(self) -> None:
        """Kill the sox process, if one was started, and reap it.

        Safe to call before ``run()`` and safe to call repeatedly.
        """
        if self.stream is None:
            return
        self.stream.kill()
        # Collect the exit status so the killed child does not linger as a
        # zombie; the process has just been killed, so this returns promptly.
        self.stream.wait()

    def __enable_compression_if_supported(self) -> None:
        """Append the compand effect when the board model is supported.

        The model string is read from the device tree; when that file does
        not exist the command line is left unchanged.
        """
        try:
            with open("/sys/firmware/devicetree/base/model") as f:
                model = f.read()
        except FileNotFoundError:
            return
        # Apply the effect at most once, however many entries match.
        if any(supp in model for supp in self.__compression_supported_models):
            self.__sox_cmd.extend(self.__sox_compression)

    def __set_treble(self) -> None:
        """Append a sox ``treble`` effect unless the setting is ``"0"``."""
        treble = config.get_settings()["PIRATERADIO"]["treble"]
        if treble != "0":
            self.__sox_cmd.extend(["treble", treble])