diff --git a/runner/precise_runner/runner.py b/runner/precise_runner/runner.py index 74066b7..986f331 100644 --- a/runner/precise_runner/runner.py +++ b/runner/precise_runner/runner.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. import atexit + +import time from subprocess import PIPE, Popen from threading import Thread, Event @@ -72,17 +74,25 @@ class ListenerEngine(Engine): class ReadWriteStream(object): - """Class used to support writing binary audio data at any pace""" - def __init__(self, s=b''): + """ + Class used to support writing binary audio data at any pace, + optionally chopping when the buffer gets too large + """ + def __init__(self, s=b'', chop_samples=-1): self.buffer = s self.write_event = Event() + self.chop_samples = chop_samples def read(self, n=-1, timeout=None): if n == -1: n = len(self.buffer) + if 0 < self.chop_samples < len(self.buffer): + self.buffer = self.buffer[self.chop_samples:] + return_time = time.time() + (float('inf') if timeout is None else timeout) while len(self.buffer) < n: self.write_event.clear() - self.write_event.wait(timeout) + if not self.write_event.wait(return_time - time.time()): + return b'' chunk = self.buffer[:n] self.buffer = self.buffer[n:] return chunk diff --git a/runner/test/test_runner.py b/runner/test/test_runner.py new file mode 100644 index 0000000..a8e6d6e --- /dev/null +++ b/runner/test/test_runner.py @@ -0,0 +1,18 @@ +from precise_runner import ReadWriteStream + + +class TestReadWriteStream: + def test_read_write(self): + s = ReadWriteStream(b'1234567890') + assert s.read(2) == b'12' + assert s.read(2) == b'34' + s.write(b'hi') + assert s.read() == b'567890hi' + s.write(b'hello') + assert s.read() == b'hello' + assert s.read(1, timeout=0.1) == b'' + + def test_chop(self): + s = ReadWriteStream(chop_samples=10) + s.write(b'1234567890hello') + assert s.read(5) == b'hello'