Browse Source

Merge branch 'dev'

Conflicts:
	setup.py
pyserial_fix
Scott Petersen 7 years ago
parent
commit
3f369baa07
6 changed files with 257 additions and 204 deletions
  1. +3
    -1
      alarmdecoder/decoder.py
  2. +53
    -32
      alarmdecoder/devices.py
  3. +131
    -113
      alarmdecoder/util.py
  4. +12
    -12
      bin/ad2-firmwareupload
  5. +1
    -1
      setup.py
  6. +57
    -45
      test/test_devices.py

+ 3
- 1
alarmdecoder/decoder.py View File

@@ -116,6 +116,7 @@ class AlarmDecoder(object):
self._alarm_status = None self._alarm_status = None
self._bypass_status = None self._bypass_status = None
self._armed_status = None self._armed_status = None
self._armed_stay = False
self._fire_status = (False, 0) self._fire_status = (False, 0)
self._battery_status = (False, 0) self._battery_status = (False, 0)
self._panic_status = False self._panic_status = False
@@ -615,10 +616,11 @@ class AlarmDecoder(object):
message_status = message.armed_away | message.armed_home message_status = message.armed_away | message.armed_home
if message_status != self._armed_status: if message_status != self._armed_status:
self._armed_status, old_status = message_status, self._armed_status self._armed_status, old_status = message_status, self._armed_status
self._armed_stay = message.armed_home


if old_status is not None: if old_status is not None:
if self._armed_status: if self._armed_status:
self.on_arm()
self.on_arm(stay=message.armed_home)
else: else:
self.on_disarm() self.on_disarm()




+ 53
- 32
alarmdecoder/devices.py View File

@@ -800,6 +800,10 @@ class SerialDevice(Device):
:raises: py:class:`~alarmdecoder.util.CommError` :raises: py:class:`~alarmdecoder.util.CommError`
""" """
try: try:
# Hack to support unicode under Python 2.x
if isinstance(data, str) or (sys.version_info < (3,) and isinstance(data, unicode)):
data = data.encode('utf-8')

self._device.write(data) self._device.write(data)


except serial.SerialTimeoutException: except serial.SerialTimeoutException:
@@ -818,15 +822,18 @@ class SerialDevice(Device):
:returns: character read from the device :returns: character read from the device
:raises: :py:class:`~alarmdecoder.util.CommError` :raises: :py:class:`~alarmdecoder.util.CommError`
""" """
ret = None
data = ''


try: try:
ret = self._device.read(1)
read_ready, _, _ = select.select([self._device.fileno()], [], [], 0.5)

if len(read_ready) != 0:
data = self._device.read(1)


except serial.SerialException as err: except serial.SerialException as err:
raise CommError('Error reading from device: {0}'.format(str(err)), err) raise CommError('Error reading from device: {0}'.format(str(err)), err)


return ret
return data.decode('utf-8')


def read_line(self, timeout=0.0, purge_buffer=False): def read_line(self, timeout=0.0, purge_buffer=False):
""" """
@@ -850,39 +857,54 @@ class SerialDevice(Device):
if purge_buffer: if purge_buffer:
self._buffer = b'' self._buffer = b''


got_line, ret = False, None
got_line, data = False, ''


timer = threading.Timer(timeout, timeout_event) timer = threading.Timer(timeout, timeout_event)
if timeout > 0: if timeout > 0:
timer.start() timer.start()


leftovers = b''
try: try:
while timeout_event.reading:
buf = self._device.read(1)
while timeout_event.reading and not got_line:
read_ready, _, _ = select.select([self._device.fileno()], [], [], 0.5)
if len(read_ready) == 0:
continue


# NOTE: AD2SERIAL apparently sends down \xFF on boot.
if buf != b'' and buf != b"\xff":
ub = bytes_hack(buf)
bytes_avail = 0
if hasattr(self._device, "in_waiting"):
bytes_avail = self._device.in_waiting
else:
bytes_avail = self._device.inWaiting()


self._buffer += ub
buf = self._device.read(bytes_avail)


if ub == b"\n":
self._buffer = self._buffer.rstrip(b"\r\n")
for idx in range(len(buf)):
c = buf[idx]


if len(self._buffer) > 0:
got_line = True
break
else:
time.sleep(0.01)
ub = bytes_hack(c)
if sys.version_info > (3,):
ub = bytes([ub])

# NOTE: AD2SERIAL and AD2PI apparently sends down \xFF on boot.
if ub != b'' and ub != b"\xff":
self._buffer += ub

if ub == b"\n":
self._buffer = self._buffer.strip(b"\r\n")

if len(self._buffer) > 0:
got_line = True
leftovers = buf[idx:]
break


except (OSError, serial.SerialException) as err: except (OSError, serial.SerialException) as err:
raise CommError('Error reading from device: {0}'.format(str(err)), err) raise CommError('Error reading from device: {0}'.format(str(err)), err)


else: else:
if got_line: if got_line:
ret, self._buffer = self._buffer, b''
data, self._buffer = self._buffer, leftovers


self.on_read(data=ret)
self.on_read(data=data)


else: else:
raise TimeoutError('Timeout while waiting for line terminator.') raise TimeoutError('Timeout while waiting for line terminator.')
@@ -890,7 +912,7 @@ class SerialDevice(Device):
finally: finally:
timer.cancel() timer.cancel()


return ret
return data.decode('utf-8')


def purge(self): def purge(self):
""" """
@@ -1096,6 +1118,9 @@ class SocketDevice(Device):
data_sent = None data_sent = None


try: try:
if isinstance(data, str):
data = data.encode('utf-8')

data_sent = self._device.send(data) data_sent = self._device.send(data)


if data_sent == 0: if data_sent == 0:
@@ -1115,18 +1140,18 @@ class SocketDevice(Device):
:returns: character read from the device :returns: character read from the device
:raises: :py:class:`~alarmdecoder.util.CommError` :raises: :py:class:`~alarmdecoder.util.CommError`
""" """
data = None
data = ''


try: try:
read_ready, _, _ = select.select([self._device], [], [], 0)
read_ready, _, _ = select.select([self._device], [], [], 0.5)


if (len(read_ready) != 0):
if len(read_ready) != 0:
data = self._device.recv(1) data = self._device.recv(1)


except socket.error as err: except socket.error as err:
raise CommError('Error while reading from device: {0}'.format(str(err)), err) raise CommError('Error while reading from device: {0}'.format(str(err)), err)


return data
return data.decode('utf-8')


def read_line(self, timeout=0.0, purge_buffer=False): def read_line(self, timeout=0.0, purge_buffer=False):
""" """
@@ -1158,15 +1183,14 @@ class SocketDevice(Device):


try: try:
while timeout_event.reading: while timeout_event.reading:
read_ready, _, _ = select.select([self._device], [], [], 0)
read_ready, _, _ = select.select([self._device], [], [], 0.5)


if (len(read_ready) == 0):
time.sleep(0.01)
if len(read_ready) == 0:
continue continue


buf = self._device.recv(1) buf = self._device.recv(1)


if buf != b'':
if buf != b'' and buf != b"\xff":
ub = bytes_hack(buf) ub = bytes_hack(buf)


self._buffer += ub self._buffer += ub
@@ -1178,9 +1202,6 @@ class SocketDevice(Device):
got_line = True got_line = True
break break


else:
time.sleep(0.01)

except socket.error as err: except socket.error as err:
raise CommError('Error reading from device: {0}'.format(str(err)), err) raise CommError('Error reading from device: {0}'.format(str(err)), err)


@@ -1203,7 +1224,7 @@ class SocketDevice(Device):
finally: finally:
timer.cancel() timer.cancel()


return ret
return ret.decode('utf-8')


def purge(self): def purge(self):
""" """


+ 131
- 113
alarmdecoder/util.py View File

@@ -8,7 +8,11 @@ Provides utility classes for the `AlarmDecoder`_ (AD2) devices.


import time import time
import threading import threading
import select
import alarmdecoder

from io import open from io import open
from collections import deque




class NoDeviceError(Exception): class NoDeviceError(Exception):
@@ -53,6 +57,30 @@ class UploadChecksumError(UploadError):
pass pass




def bytes_available(device):
bytes_avail = 0

if isinstance(device, alarmdecoder.devices.SerialDevice):
if hasattr(device._device, "in_waiting"):
bytes_avail = device._device.in_waiting
else:
bytes_avail = device._device.inWaiting()
elif isinstance(device, alarmdecoder.devices.SocketDevice):
bytes_avail = 4096

return bytes_avail

def read_firmware_file(file_path):
data_queue = deque()

with open(file_path) as firmware_handle:
for line in firmware_handle:
line = line.rstrip()
if line != '' and line[0] == ':':
data_queue.append(line + "\r")

return data_queue

class Firmware(object): class Firmware(object):
""" """
Represents firmware for the `AlarmDecoder`_ devices. Represents firmware for the `AlarmDecoder`_ devices.
@@ -62,144 +90,134 @@ class Firmware(object):
STAGE_START = 0 STAGE_START = 0
STAGE_WAITING = 1 STAGE_WAITING = 1
STAGE_BOOT = 2 STAGE_BOOT = 2
STAGE_WAITING_ON_LOADER = 2.5
STAGE_LOAD = 3 STAGE_LOAD = 3
STAGE_UPLOADING = 4 STAGE_UPLOADING = 4
STAGE_DONE = 5 STAGE_DONE = 5
STAGE_ERROR = 98 STAGE_ERROR = 98
STAGE_DEBUG = 99 STAGE_DEBUG = 99


# FIXME: Rewrite this monstrosity.
@staticmethod @staticmethod
def upload(dev, filename, progress_callback=None, debug=False):
def read(device):
response = None
bytes_avail = bytes_available(device)

if isinstance(device, alarmdecoder.devices.SerialDevice):
response = device._device.read(bytes_avail)
elif isinstance(device, alarmdecoder.devices.SocketDevice):
response = device._device.recv(bytes_avail)

return response

@staticmethod
def upload(device, file_path, progress_callback=None, debug=False):
""" """
Uploads firmware to an `AlarmDecoder`_ device. Uploads firmware to an `AlarmDecoder`_ device.


:param filename: firmware filename
:type filename: string
:param file_path: firmware file path
:type file_path: string
:param progress_callback: callback function used to report progress :param progress_callback: callback function used to report progress
:type progress_callback: function :type progress_callback: function


:raises: :py:class:`~alarmdecoder.util.NoDeviceError`, :py:class:`~alarmdecoder.util.TimeoutError` :raises: :py:class:`~alarmdecoder.util.NoDeviceError`, :py:class:`~alarmdecoder.util.TimeoutError`
""" """


def do_upload():
"""
Perform the actual firmware upload to the device.
"""
with open(filename) as upload_file:
line_cnt = 0
for line in upload_file:
line_cnt += 1
line = line.rstrip()

if line[0] == ':':
dev.write(line + "\r")
response = dev.read_line(timeout=5.0, purge_buffer=True)
if debug:
stage_callback(Firmware.STAGE_DEBUG, data="line={0} - line={1} response={2}".format(line_cnt, line, response));

if '!ce' in response:
raise UploadChecksumError("Checksum error on line " + str(line_cnt) + " of " + filename);

elif '!no' in response:
raise UploadError("Incorrect data sent to bootloader.")

elif '!ok' in response:
break

else:
if progress_callback is not None:
progress_callback(Firmware.STAGE_UPLOADING)

time.sleep(0.0)

def read_until(pattern, timeout=0.0):
"""
Read characters until a specific pattern is found or the timeout is
hit.
"""
def timeout_event():
"""Handles the read timeout event."""
timeout_event.reading = False

timeout_event.reading = True

timer = None
if timeout > 0:
timer = threading.Timer(timeout, timeout_event)
timer.start()

position = 0

dev.purge()

while timeout_event.reading:
try:
char = dev.read()

if char is not None and char != '':
if char == pattern[position]:
position = position + 1
if position == len(pattern):
break
else:
position = 0

except Exception as err:
pass

if timer:
if timer.is_alive():
timer.cancel()
else:
raise TimeoutError('Timeout while waiting for line terminator.')

def stage_callback(stage, **kwargs):
def progress_stage(stage, **kwargs):
"""Callback to update progress for the specified stage.""" """Callback to update progress for the specified stage."""
if progress_callback is not None: if progress_callback is not None:
progress_callback(stage, **kwargs) progress_callback(stage, **kwargs)


if dev is None:
return stage

if device is None:
raise NoDeviceError('No device specified for firmware upload.') raise NoDeviceError('No device specified for firmware upload.')


stage_callback(Firmware.STAGE_START)
fds = [device._device.fileno()]

# Read firmware file into memory
try:
write_queue = read_firmware_file(file_path)
except IOError as err:
stage = progress_stage(Firmware.STAGE_ERROR, error=str(err))
return


if dev.is_reader_alive():
data_read = ''
got_response = False
running = True
stage = progress_stage(Firmware.STAGE_START)

if device.is_reader_alive():
# Close the reader thread and wait for it to die, otherwise # Close the reader thread and wait for it to die, otherwise
# it interferes with our reading. # it interferes with our reading.
dev.stop_reader()
while dev._read_thread.is_alive():
stage_callback(Firmware.STAGE_WAITING)
device.stop_reader()
while device._read_thread.is_alive():
stage = progress_stage(Firmware.STAGE_WAITING)
time.sleep(0.5) time.sleep(0.5)


# Reboot the device and wait for the boot loader.
retry = 3
found_loader = False
while retry > 0:
try:
stage_callback(Firmware.STAGE_BOOT)
dev.write("=")
read_until('!boot', timeout=15.0)

# Get ourselves into the boot loader and wait for indication
# that it's ready for the firmware upload.
stage_callback(Firmware.STAGE_LOAD)
dev.write("=")
read_until('!load', timeout=15.0)

except TimeoutError as err:
retry -= 1
else:
retry = 0
found_loader = True

# And finally do the upload.
if found_loader:
try:
do_upload()
except UploadError as err:
stage_callback(Firmware.STAGE_ERROR, error=str(err))
else:
stage_callback(Firmware.STAGE_DONE)
time.sleep(3)

try:
while running:
rr, wr, _ = select.select(fds, fds, [], 0.5)

if len(rr) != 0:
response = Firmware.read(device)

for c in response:
# HACK: Python 3 / PySerial hack.
if isinstance(c, int):
c = chr(c)

if c == '\xff' or c == '\r': # HACK: odd case for our mystery \xff byte.
# Boot started, start looking for the !boot message
if data_read.startswith("!sn"):
stage = progress_stage(Firmware.STAGE_BOOT)
# Entered bootloader upload mode, start uploading
elif data_read.startswith("!load"):
got_response = True
stage = progress_stage(Firmware.STAGE_UPLOADING)
# Checksum error
elif data_read == '!ce':
running = False
raise UploadChecksumError("Checksum error in {0}".format(file_path))
# Bad data
elif data_read == '!no':
running = False
raise UploadError("Incorrect data sent to bootloader.")
# Firmware upload complete
elif data_read == '!ok':
running = False
stage = progress_stage(Firmware.STAGE_DONE)
# All other responses are valid during upload.
else:
got_response = True
if stage == Firmware.STAGE_UPLOADING:
progress_stage(stage)

data_read = ''
elif c == '\n':
pass
else:
data_read += c

if len(wr) != 0:
# Reboot device
if stage in [Firmware.STAGE_START, Firmware.STAGE_WAITING]:
device.write('=')
stage = progress_stage(Firmware.STAGE_WAITING_ON_LOADER)

# Enter bootloader
elif stage == Firmware.STAGE_BOOT:
device.write('=')
stage = progress_stage(Firmware.STAGE_LOAD)

# Upload firmware
elif stage == Firmware.STAGE_UPLOADING:
if len(write_queue) > 0 and got_response == True:
got_response = False
device.write(write_queue.popleft())

except UploadError as err:
stage = progress_stage(Firmware.STAGE_ERROR, error=str(err))
else: else:
stage_callback(Firmware.STAGE_ERROR, error="Error entering bootloader.")
stage = progress_stage(Firmware.STAGE_DONE)

+ 12
- 12
bin/ad2-firmwareupload View File

@@ -2,6 +2,7 @@


import os import os
import sys, time import sys, time
import traceback
import alarmdecoder import alarmdecoder


def handle_firmware(stage, **kwargs): def handle_firmware(stage, **kwargs):
@@ -41,13 +42,12 @@ def main():
firmware = None firmware = None
baudrate = 115200 baudrate = 115200


if len(sys.argv) < 2:
print("Syntax: {0} <firmware> [device path or hostname:port] [baudrate]".format(sys.argv[0]))
if len(sys.argv) < 3:
print("Syntax: {0} <firmware> [device path or hostname:port] [baudrate=115200]".format(sys.argv[0]))
sys.exit(1) sys.exit(1)


firmware = sys.argv[1] firmware = sys.argv[1]
if len(sys.argv) > 2:
device = sys.argv[2]
device = sys.argv[2]


if len(sys.argv) > 3: if len(sys.argv) > 3:
baudrate = sys.argv[3] baudrate = sys.argv[3]
@@ -60,20 +60,20 @@ def main():
if ':' in device: if ':' in device:
hostname, port = device.split(':') hostname, port = device.split(':')
dev = alarmdecoder.devices.SocketDevice(interface=(hostname, int(port))) dev = alarmdecoder.devices.SocketDevice(interface=(hostname, int(port)))
dev.open(no_reader_thread=True)
else: else:
dev = alarmdecoder.devices.SerialDevice(interface=device) dev = alarmdecoder.devices.SerialDevice(interface=device)
dev.open(baudrate=baudrate, no_reader_thread=True)

dev.open(baudrate=baudrate, no_reader_thread=True)


time.sleep(3) time.sleep(3)
alarmdecoder.util.Firmware.upload(dev, firmware, handle_firmware, debug=debug) alarmdecoder.util.Firmware.upload(dev, firmware, handle_firmware, debug=debug)


except alarmdecoder.util.NoDeviceError, ex:
print "Error: Could not find device: {0}".format(ex)
except alarmdecoder.util.UploadError, ex:
print "Error: Error uploading firmware: {0}".format(ex)
except Exception, ex:
print "Error: {0}".format(ex)
except alarmdecoder.util.NoDeviceError as ex:
print("Error: Could not find device: {0}".format(ex))
except alarmdecoder.util.UploadError as ex:
print("Error: Error uploading firmware: {0}".format(ex))
except Exception as ex:
print("Error: {0}: {1}".format(ex, traceback.format_exc()))
finally: finally:
if dev is not None: if dev is not None:
dev.close() dev.close()


+ 1
- 1
setup.py View File

@@ -14,7 +14,7 @@ if sys.version_info < (3,):
extra_requirements.append('future==0.14.3') extra_requirements.append('future==0.14.3')


setup(name='alarmdecoder', setup(name='alarmdecoder',
version='0.12.1',
version='0.12.2',
description='Python interface for the AlarmDecoder (AD2) family ' description='Python interface for the AlarmDecoder (AD2) family '
'of alarm devices which includes the AD2USB, AD2SERIAL and AD2PI.', 'of alarm devices which includes the AD2USB, AD2SERIAL and AD2PI.',
long_description=readme(), long_description=readme(),


+ 57
- 45
test/test_devices.py View File

@@ -6,6 +6,7 @@ try:
except: except:
from pyftdi.ftdi import Ftdi, FtdiError from pyftdi.ftdi import Ftdi, FtdiError
from usb.core import USBError, Device as USBCoreDevice from usb.core import USBError, Device as USBCoreDevice
import sys
import socket import socket
import time import time
import tempfile import tempfile
@@ -14,6 +15,24 @@ import select
from alarmdecoder.devices import USBDevice, SerialDevice, SocketDevice from alarmdecoder.devices import USBDevice, SerialDevice, SocketDevice
from alarmdecoder.util import NoDeviceError, CommError, TimeoutError from alarmdecoder.util import NoDeviceError, CommError, TimeoutError


# Optional FTDI tests
try:
from pyftdi.pyftdi.ftdi import Ftdi, FtdiError
from usb.core import USBError, Device as USBCoreDevice

have_pyftdi = True

except ImportError:
have_pyftdi = False

# Optional SSL tests
try:
from OpenSSL import SSL, crypto

have_openssl = True
except ImportError:
have_openssl = False



class TestUSBDevice(TestCase): class TestUSBDevice(TestCase):
def setUp(self): def setUp(self):
@@ -120,31 +139,6 @@ class TestUSBDevice(TestCase):


mock.assert_called_with('test') mock.assert_called_with('test')


from unittest import TestCase
from mock import Mock, MagicMock, patch
from serial import Serial, SerialException

from alarmdecoder.devices import USBDevice, SerialDevice, SocketDevice
from alarmdecoder.util import NoDeviceError, CommError, TimeoutError

# Optional FTDI tests
try:
from pyftdi.pyftdi.ftdi import Ftdi, FtdiError
from usb.core import USBError, Device as USBCoreDevice

have_pyftdi = True

except ImportError:
have_pyftdi = False

# Optional SSL tests
try:
from OpenSSL import SSL, crypto

have_openssl = True
except ImportError:
have_openssl = False



class TestSerialDevice(TestCase): class TestSerialDevice(TestCase):
def setUp(self): def setUp(self):
@@ -198,39 +192,53 @@ class TestSerialDevice(TestCase):
self._device.open(no_reader_thread=True) self._device.open(no_reader_thread=True)


with patch.object(self._device._device, 'read') as mock: with patch.object(self._device._device, 'read') as mock:
self._device.read()
with patch('serial.Serial.fileno', return_value=1):
with patch.object(select, 'select', return_value=[[1], [], []]):
ret = self._device.read()


mock.assert_called_with(1) mock.assert_called_with(1)


def test_read_exception(self): def test_read_exception(self):
with patch.object(self._device._device, 'read', side_effect=SerialException): with patch.object(self._device._device, 'read', side_effect=SerialException):
with self.assertRaises(CommError):
self._device.read()
with patch('serial.Serial.fileno', return_value=1):
with patch.object(select, 'select', return_value=[[1], [], []]):
with self.assertRaises(CommError):
self._device.read()


def test_read_line(self): def test_read_line(self):
with patch.object(self._device._device, 'read', side_effect=list("testing\r\n")):
ret = None
try:
ret = self._device.read_line()
except StopIteration:
pass
side_effect = list("testing\r\n")
if sys.version_info > (3,):
side_effect = [chr(x).encode('utf-8') for x in b"testing\r\n"]


self.assertEquals(ret, b"testing")
with patch.object(self._device._device, 'read', side_effect=side_effect):
with patch('serial.Serial.fileno', return_value=1):
with patch.object(select, 'select', return_value=[[1], [], []]):
ret = None
try:
ret = self._device.read_line()
except StopIteration:
pass

self.assertEquals(ret, "testing")


def test_read_line_timeout(self): def test_read_line_timeout(self):
with patch.object(self._device._device, 'read', return_value='a') as mock:
with self.assertRaises(TimeoutError):
self._device.read_line(timeout=0.1)
with patch.object(self._device._device, 'read', return_value=b'a') as mock:
with patch('serial.Serial.fileno', return_value=1):
with patch.object(select, 'select', return_value=[[1], [], []]):
with self.assertRaises(TimeoutError):
self._device.read_line(timeout=0.1)


self.assertIn('a', self._device._buffer.decode('utf-8')) self.assertIn('a', self._device._buffer.decode('utf-8'))


def test_read_line_exception(self): def test_read_line_exception(self):
with patch.object(self._device._device, 'read', side_effect=[OSError, SerialException]): with patch.object(self._device._device, 'read', side_effect=[OSError, SerialException]):
with self.assertRaises(CommError):
self._device.read_line()
with patch('serial.Serial.fileno', return_value=1):
with patch.object(select, 'select', return_value=[[1], [], []]):
with self.assertRaises(CommError):
self._device.read_line()


with self.assertRaises(CommError):
self._device.read_line()
with self.assertRaises(CommError):
self._device.read_line()




class TestSocketDevice(TestCase): class TestSocketDevice(TestCase):
@@ -292,21 +300,25 @@ class TestSocketDevice(TestCase):
self._device.read() self._device.read()


def test_read_line(self): def test_read_line(self):
side_effect = list("testing\r\n")
if sys.version_info > (3,):
side_effect = [chr(x).encode('utf-8') for x in b"testing\r\n"]

with patch('socket.socket.fileno', return_value=1): with patch('socket.socket.fileno', return_value=1):
with patch.object(select, 'select', return_value=[[1], [], []]): with patch.object(select, 'select', return_value=[[1], [], []]):
with patch.object(self._device._device, 'recv', side_effect=list("testing\r\n")):
with patch.object(self._device._device, 'recv', side_effect=side_effect):
ret = None ret = None
try: try:
ret = self._device.read_line() ret = self._device.read_line()
except StopIteration: except StopIteration:
pass pass


self.assertEquals(ret, b"testing")
self.assertEquals(ret, "testing")


def test_read_line_timeout(self): def test_read_line_timeout(self):
with patch('socket.socket.fileno', return_value=1): with patch('socket.socket.fileno', return_value=1):
with patch.object(select, 'select', return_value=[[1], [], []]): with patch.object(select, 'select', return_value=[[1], [], []]):
with patch.object(self._device._device, 'recv', return_value='a') as mock:
with patch.object(self._device._device, 'recv', return_value=b'a') as mock:
with self.assertRaises(TimeoutError): with self.assertRaises(TimeoutError):
self._device.read_line(timeout=0.1) self._device.read_line(timeout=0.1)




Loading…
Cancel
Save