-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathmp_io.py
More file actions
76 lines (61 loc) · 2.42 KB
/
mp_io.py
File metadata and controls
76 lines (61 loc) · 2.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import io
import threading
from typing import Union
import time
class MpradioIO(io.BytesIO):
def __init__(self):
super().__init__()
self.__lock = threading.Lock()
self.__last_r = 0
self.__size = 0
self.__write_completed = False
self.__terminating = False
self.__silent = False
self.__first_chunk = True
self.chunk_sleep_time = 0
def read(self, size=None):
# generate silence (zeroes) if silent is set
if self.__silent:
if size is None:
size = 1024 * 4
return bytearray(size)
# read the first chunk with no delay, but read the subsequent chunks after a short delay
# (to be set from the player and < player's buffer time) to give it the time to write before the next read
if not self.__first_chunk:
time.sleep(self.chunk_sleep_time)
else:
self.__first_chunk = False
while True:
self.__lock.acquire() # thread-safe lock
self.seek(self.__last_r) # Seek to last read position
result = super().read(size) # read the specified amount of bytes
self.__last_r += len(result) # update the last read position
self.__size = self.seek(0, 2) # seek to the end (prepare for next write)
self.__lock.release()
if len(result) < 1:
# print("MpradioIO error: read 0 bytes.")
time.sleep(0.008) # TODO: maybe align it to bluetooth player buffer time?
if self.__terminating:
break
else:
break
# print("read", len(result), "bytes")
return result
def stop(self):
self.__terminating = True
def silence(self, silent=True):
self.__silent = silent
def write(self, b: Union[bytes, bytearray]):
self.__lock.acquire() # thread-safe lock
super().write(b) # write
self.__lock.release() # release lock
# print("wrote", len(b), "bytes")
def set_write_completed(self):
self.__write_completed = True
def is_read_completed(self):
if self.__size > 0 and self.__size == self.__last_r and self.__write_completed:
return True
else:
return False
def seek_to_start(self):
self.__last_r = 0