|
|
@@ -6,6 +6,7 @@ try: |
|
|
|
except: |
|
|
|
from pyftdi.ftdi import Ftdi, FtdiError |
|
|
|
from usb.core import USBError, Device as USBCoreDevice |
|
|
|
import sys |
|
|
|
import socket |
|
|
|
import time |
|
|
|
import tempfile |
|
|
@@ -14,6 +15,24 @@ import select |
|
|
|
from alarmdecoder.devices import USBDevice, SerialDevice, SocketDevice |
|
|
|
from alarmdecoder.util import NoDeviceError, CommError, TimeoutError |
|
|
|
|
|
|
|
# Optional FTDI tests |
|
|
|
try: |
|
|
|
from pyftdi.pyftdi.ftdi import Ftdi, FtdiError |
|
|
|
from usb.core import USBError, Device as USBCoreDevice |
|
|
|
|
|
|
|
have_pyftdi = True |
|
|
|
|
|
|
|
except ImportError: |
|
|
|
have_pyftdi = False |
|
|
|
|
|
|
|
# Optional SSL tests |
|
|
|
try: |
|
|
|
from OpenSSL import SSL, crypto |
|
|
|
|
|
|
|
have_openssl = True |
|
|
|
except ImportError: |
|
|
|
have_openssl = False |
|
|
|
|
|
|
|
|
|
|
|
class TestUSBDevice(TestCase): |
|
|
|
def setUp(self): |
|
|
@@ -120,31 +139,6 @@ class TestUSBDevice(TestCase): |
|
|
|
|
|
|
|
mock.assert_called_with('test') |
|
|
|
|
|
|
|
from unittest import TestCase |
|
|
|
from mock import Mock, MagicMock, patch |
|
|
|
from serial import Serial, SerialException |
|
|
|
|
|
|
|
from alarmdecoder.devices import USBDevice, SerialDevice, SocketDevice |
|
|
|
from alarmdecoder.util import NoDeviceError, CommError, TimeoutError |
|
|
|
|
|
|
|
# Optional FTDI tests |
|
|
|
try: |
|
|
|
from pyftdi.pyftdi.ftdi import Ftdi, FtdiError |
|
|
|
from usb.core import USBError, Device as USBCoreDevice |
|
|
|
|
|
|
|
have_pyftdi = True |
|
|
|
|
|
|
|
except ImportError: |
|
|
|
have_pyftdi = False |
|
|
|
|
|
|
|
# Optional SSL tests |
|
|
|
try: |
|
|
|
from OpenSSL import SSL, crypto |
|
|
|
|
|
|
|
have_openssl = True |
|
|
|
except ImportError: |
|
|
|
have_openssl = False |
|
|
|
|
|
|
|
|
|
|
|
class TestSerialDevice(TestCase): |
|
|
|
def setUp(self): |
|
|
@@ -198,39 +192,53 @@ class TestSerialDevice(TestCase): |
|
|
|
self._device.open(no_reader_thread=True) |
|
|
|
|
|
|
|
with patch.object(self._device._device, 'read') as mock: |
|
|
|
self._device.read() |
|
|
|
with patch('serial.Serial.fileno', return_value=1): |
|
|
|
with patch.object(select, 'select', return_value=[[1], [], []]): |
|
|
|
ret = self._device.read() |
|
|
|
|
|
|
|
mock.assert_called_with(1) |
|
|
|
|
|
|
|
def test_read_exception(self): |
|
|
|
with patch.object(self._device._device, 'read', side_effect=SerialException): |
|
|
|
with self.assertRaises(CommError): |
|
|
|
self._device.read() |
|
|
|
with patch('serial.Serial.fileno', return_value=1): |
|
|
|
with patch.object(select, 'select', return_value=[[1], [], []]): |
|
|
|
with self.assertRaises(CommError): |
|
|
|
self._device.read() |
|
|
|
|
|
|
|
def test_read_line(self): |
|
|
|
with patch.object(self._device._device, 'read', side_effect=list("testing\r\n")): |
|
|
|
ret = None |
|
|
|
try: |
|
|
|
ret = self._device.read_line() |
|
|
|
except StopIteration: |
|
|
|
pass |
|
|
|
side_effect = list("testing\r\n") |
|
|
|
if sys.version_info > (3,): |
|
|
|
side_effect = [chr(x).encode('utf-8') for x in b"testing\r\n"] |
|
|
|
|
|
|
|
self.assertEquals(ret, b"testing") |
|
|
|
with patch.object(self._device._device, 'read', side_effect=side_effect): |
|
|
|
with patch('serial.Serial.fileno', return_value=1): |
|
|
|
with patch.object(select, 'select', return_value=[[1], [], []]): |
|
|
|
ret = None |
|
|
|
try: |
|
|
|
ret = self._device.read_line() |
|
|
|
except StopIteration: |
|
|
|
pass |
|
|
|
|
|
|
|
self.assertEquals(ret, "testing") |
|
|
|
|
|
|
|
def test_read_line_timeout(self): |
|
|
|
with patch.object(self._device._device, 'read', return_value='a') as mock: |
|
|
|
with self.assertRaises(TimeoutError): |
|
|
|
self._device.read_line(timeout=0.1) |
|
|
|
with patch.object(self._device._device, 'read', return_value=b'a') as mock: |
|
|
|
with patch('serial.Serial.fileno', return_value=1): |
|
|
|
with patch.object(select, 'select', return_value=[[1], [], []]): |
|
|
|
with self.assertRaises(TimeoutError): |
|
|
|
self._device.read_line(timeout=0.1) |
|
|
|
|
|
|
|
self.assertIn('a', self._device._buffer.decode('utf-8')) |
|
|
|
|
|
|
|
def test_read_line_exception(self): |
|
|
|
with patch.object(self._device._device, 'read', side_effect=[OSError, SerialException]): |
|
|
|
with self.assertRaises(CommError): |
|
|
|
self._device.read_line() |
|
|
|
with patch('serial.Serial.fileno', return_value=1): |
|
|
|
with patch.object(select, 'select', return_value=[[1], [], []]): |
|
|
|
with self.assertRaises(CommError): |
|
|
|
self._device.read_line() |
|
|
|
|
|
|
|
with self.assertRaises(CommError): |
|
|
|
self._device.read_line() |
|
|
|
with self.assertRaises(CommError): |
|
|
|
self._device.read_line() |
|
|
|
|
|
|
|
|
|
|
|
class TestSocketDevice(TestCase): |
|
|
@@ -292,21 +300,25 @@ class TestSocketDevice(TestCase): |
|
|
|
self._device.read() |
|
|
|
|
|
|
|
def test_read_line(self): |
|
|
|
side_effect = list("testing\r\n") |
|
|
|
if sys.version_info > (3,): |
|
|
|
side_effect = [chr(x).encode('utf-8') for x in b"testing\r\n"] |
|
|
|
|
|
|
|
with patch('socket.socket.fileno', return_value=1): |
|
|
|
with patch.object(select, 'select', return_value=[[1], [], []]): |
|
|
|
with patch.object(self._device._device, 'recv', side_effect=list("testing\r\n")): |
|
|
|
with patch.object(self._device._device, 'recv', side_effect=side_effect): |
|
|
|
ret = None |
|
|
|
try: |
|
|
|
ret = self._device.read_line() |
|
|
|
except StopIteration: |
|
|
|
pass |
|
|
|
|
|
|
|
self.assertEquals(ret, b"testing") |
|
|
|
self.assertEquals(ret, "testing") |
|
|
|
|
|
|
|
def test_read_line_timeout(self): |
|
|
|
with patch('socket.socket.fileno', return_value=1): |
|
|
|
with patch.object(select, 'select', return_value=[[1], [], []]): |
|
|
|
with patch.object(self._device._device, 'recv', return_value='a') as mock: |
|
|
|
with patch.object(self._device._device, 'recv', return_value=b'a') as mock: |
|
|
|
with self.assertRaises(TimeoutError): |
|
|
|
self._device.read_line(timeout=0.1) |
|
|
|
|
|
|
|