From 358a950aac8d732473afe66c6d9064ef352084a3 Mon Sep 17 00:00:00 2001 From: Scott Petersen Date: Mon, 20 Mar 2017 16:16:14 -0700 Subject: [PATCH] Reworked SerialDevice to use select as well as associated test patches and 2/3 support. --- alarmdecoder/devices.py | 72 ++++++++++++++++------------ test/test_devices.py | 102 ++++++++++++++++++++++------------------ 2 files changed, 100 insertions(+), 74 deletions(-) diff --git a/alarmdecoder/devices.py b/alarmdecoder/devices.py index 4345a60..586ece2 100644 --- a/alarmdecoder/devices.py +++ b/alarmdecoder/devices.py @@ -822,15 +822,18 @@ class SerialDevice(Device): :returns: character read from the device :raises: :py:class:`~alarmdecoder.util.CommError` """ - ret = None + data = '' try: - ret = self._device.read(1).decode('utf-8') + read_ready, _, _ = select.select([self._device.fileno()], [], [], 0.5) + + if len(read_ready) != 0: + data = self._device.read(1) except serial.SerialException as err: raise CommError('Error reading from device: {0}'.format(str(err)), err) - return ret + return data.decode('utf-8') def read_line(self, timeout=0.0, purge_buffer=False): """ @@ -854,39 +857,54 @@ class SerialDevice(Device): if purge_buffer: self._buffer = b'' - got_line, ret = False, None + got_line, data = False, '' timer = threading.Timer(timeout, timeout_event) if timeout > 0: timer.start() + leftovers = b'' try: - while timeout_event.reading: - buf = self._device.read(1) + while timeout_event.reading and not got_line: + read_ready, _, _ = select.select([self._device.fileno()], [], [], 0.5) + if len(read_ready) == 0: + continue - # NOTE: AD2SERIAL apparently sends down \xFF on boot. - if buf != b'' and buf != b"\xff": - ub = bytes_hack(buf) + bytes_avail = 0 + if hasattr(self._device, "in_waiting"): + bytes_avail = self._device.in_waiting + else: + bytes_avail = self._device.inWaiting() - self._buffer += ub + buf = self._device.read(bytes_avail) - if ub == b"\n": - self._buffer = self._buffer.rstrip(b"\r\n") + for idx in range(len(buf)): + c = buf[idx] - if len(self._buffer) > 0: - got_line = True - break - else: - time.sleep(0.01) + ub = bytes_hack(c) + if sys.version_info > (3,): + ub = bytes([ub]) + + # NOTE: AD2SERIAL and AD2PI apparently sends down \xFF on boot. + if ub != b'' and ub != b"\xff": + self._buffer += ub + + if ub == b"\n": + self._buffer = self._buffer.strip(b"\r\n") + + if len(self._buffer) > 0: + got_line = True + leftovers = buf[idx:] + break except (OSError, serial.SerialException) as err: raise CommError('Error reading from device: {0}'.format(str(err)), err) else: if got_line: - ret, self._buffer = self._buffer, b'' + data, self._buffer = self._buffer, leftovers - self.on_read(data=ret) + self.on_read(data=data) else: raise TimeoutError('Timeout while waiting for line terminator.') @@ -894,7 +912,7 @@ class SerialDevice(Device): finally: timer.cancel() - return ret.decode('utf-8') + return data.decode('utf-8') def purge(self): """ @@ -1122,12 +1140,12 @@ class SocketDevice(Device): :returns: character read from the device :raises: :py:class:`~alarmdecoder.util.CommError` """ - data = None + data = '' try: - read_ready, _, _ = select.select([self._device], [], []) + read_ready, _, _ = select.select([self._device], [], [], 0.5) - if (len(read_ready) != 0): + if len(read_ready) != 0: data = self._device.recv(1) except socket.error as err: @@ -1165,10 +1183,9 @@ class SocketDevice(Device): try: while timeout_event.reading: - read_ready, _, _ = select.select([self._device], [], []) + read_ready, _, _ = select.select([self._device], [], [], 0.5) - if (len(read_ready) == 0): - time.sleep(0.01) + if len(read_ready) == 0: continue buf = self._device.recv(1) @@ -1185,9 +1202,6 @@ class SocketDevice(Device): got_line = True break - else: - time.sleep(0.01) - except socket.error as err: raise CommError('Error reading from device: {0}'.format(str(err)), err) diff --git a/test/test_devices.py b/test/test_devices.py index 554c336..8c47719 100644 --- a/test/test_devices.py +++ b/test/test_devices.py @@ -6,6 +6,7 @@ try: except: from pyftdi.ftdi import Ftdi, FtdiError from usb.core import USBError, Device as USBCoreDevice +import sys import socket import time import tempfile @@ -14,6 +15,24 @@ import select from alarmdecoder.devices import USBDevice, SerialDevice, SocketDevice from alarmdecoder.util import NoDeviceError, CommError, TimeoutError +# Optional FTDI tests +try: + from pyftdi.pyftdi.ftdi import Ftdi, FtdiError + from usb.core import USBError, Device as USBCoreDevice + + have_pyftdi = True + +except ImportError: + have_pyftdi = False + +# Optional SSL tests +try: + from OpenSSL import SSL, crypto + + have_openssl = True +except ImportError: + have_openssl = False + class TestUSBDevice(TestCase): def setUp(self): @@ -120,31 +139,6 @@ class TestUSBDevice(TestCase): mock.assert_called_with('test') -from unittest import TestCase -from mock import Mock, MagicMock, patch -from serial import Serial, SerialException - -from alarmdecoder.devices import USBDevice, SerialDevice, SocketDevice -from alarmdecoder.util import NoDeviceError, CommError, TimeoutError - -# Optional FTDI tests -try: - from pyftdi.pyftdi.ftdi import Ftdi, FtdiError - from usb.core import USBError, Device as USBCoreDevice - - have_pyftdi = True - -except ImportError: - have_pyftdi = False - -# Optional SSL tests -try: - from OpenSSL import SSL, crypto - - have_openssl = True -except ImportError: - have_openssl = False - class TestSerialDevice(TestCase): def setUp(self): @@ -198,39 +192,53 @@ class TestSerialDevice(TestCase): self._device.open(no_reader_thread=True) with patch.object(self._device._device, 'read') as mock: - self._device.read() + with patch('serial.Serial.fileno', return_value=1): + with patch.object(select, 'select', return_value=[[1], [], []]): + ret = self._device.read() mock.assert_called_with(1) def test_read_exception(self): with patch.object(self._device._device, 'read', side_effect=SerialException): - with self.assertRaises(CommError): - self._device.read() + with patch('serial.Serial.fileno', return_value=1): + with patch.object(select, 'select', return_value=[[1], [], []]): + with self.assertRaises(CommError): + self._device.read() def test_read_line(self): - with patch.object(self._device._device, 'read', side_effect=list("testing\r\n")): - ret = None - try: - ret = self._device.read_line() - except StopIteration: - pass + side_effect = list("testing\r\n") + if sys.version_info > (3,): + side_effect = [chr(x).encode('utf-8') for x in b"testing\r\n"] - self.assertEquals(ret, b"testing") + with patch.object(self._device._device, 'read', side_effect=side_effect): + with patch('serial.Serial.fileno', return_value=1): + with patch.object(select, 'select', return_value=[[1], [], []]): + ret = None + try: + ret = self._device.read_line() + except StopIteration: + pass + + self.assertEquals(ret, "testing") def test_read_line_timeout(self): - with patch.object(self._device._device, 'read', return_value='a') as mock: - with self.assertRaises(TimeoutError): - self._device.read_line(timeout=0.1) + with patch.object(self._device._device, 'read', return_value=b'a') as mock: + with patch('serial.Serial.fileno', return_value=1): + with patch.object(select, 'select', return_value=[[1], [], []]): + with self.assertRaises(TimeoutError): + self._device.read_line(timeout=0.1) self.assertIn('a', self._device._buffer.decode('utf-8')) def test_read_line_exception(self): with patch.object(self._device._device, 'read', side_effect=[OSError, SerialException]): - with self.assertRaises(CommError): - self._device.read_line() + with patch('serial.Serial.fileno', return_value=1): + with patch.object(select, 'select', return_value=[[1], [], []]): + with self.assertRaises(CommError): + self._device.read_line() - with self.assertRaises(CommError): - self._device.read_line() + with self.assertRaises(CommError): + self._device.read_line() class TestSocketDevice(TestCase): @@ -292,21 +300,25 @@ class TestSocketDevice(TestCase): self._device.read() def test_read_line(self): + side_effect = list("testing\r\n") + if sys.version_info > (3,): + side_effect = [chr(x).encode('utf-8') for x in b"testing\r\n"] + with patch('socket.socket.fileno', return_value=1): with patch.object(select, 'select', return_value=[[1], [], []]): - with patch.object(self._device._device, 'recv', side_effect=list("testing\r\n")): + with patch.object(self._device._device, 'recv', side_effect=side_effect): ret = None try: ret = self._device.read_line() except StopIteration: pass - self.assertEquals(ret, b"testing") + self.assertEquals(ret, "testing") def test_read_line_timeout(self): with patch('socket.socket.fileno', return_value=1): with patch.object(select, 'select', return_value=[[1], [], []]): - with patch.object(self._device._device, 'recv', return_value='a') as mock: + with patch.object(self._device._device, 'recv', return_value=b'a') as mock: with self.assertRaises(TimeoutError): self._device.read_line(timeout=0.1)