Browse Source

Cleanup.

pyserial_fix
Scott Petersen 11 years ago
parent
commit
ccc3635d01
4 changed files with 51 additions and 42 deletions
  1. +31
    -29
      pyad2usb/devices.py
  2. +2
    -0
      pyad2usb/panels.py
  3. +2
    -0
      pyad2usb/util.py
  4. +16
    -13
      test.py

+ 31
- 29
pyad2usb/devices.py View File

@@ -4,12 +4,10 @@ Contains different types of devices belonging to the AD2USB family.
.. moduleauthor:: Scott Petersen <scott@nutech.com> .. moduleauthor:: Scott Petersen <scott@nutech.com>
""" """


import usb.core
import usb.util
import usb.core, usb.util
import time import time
import threading import threading
import serial
import serial.tools.list_ports
import serial, serial.tools.list_ports
import socket import socket
from OpenSSL import SSL, crypto from OpenSSL import SSL, crypto
from pyftdi.pyftdi.ftdi import * from pyftdi.pyftdi.ftdi import *
@@ -301,6 +299,7 @@ class USBDevice(Device):
self._device.write_data(data) self._device.write_data(data)


self.on_write(data) self.on_write(data)

except FtdiError, err: except FtdiError, err:
raise util.CommError('Error writing to device: {0}'.format(str(err)), err) raise util.CommError('Error writing to device: {0}'.format(str(err)), err)


@@ -740,31 +739,7 @@ class SocketDevice(Device):
self._device = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._device = socket.socket(socket.AF_INET, socket.SOCK_STREAM)


if self._use_ssl: if self._use_ssl:
try:
ctx = SSL.Context(SSL.TLSv1_METHOD)

if isinstance(self.ssl_key, crypto.PKey):
ctx.use_privatekey(self.ssl_key)
else:
ctx.use_privatekey_file(self.ssl_key)

if isinstance(self.ssl_certificate, crypto.X509):
ctx.use_certificate(self.ssl_certificate)
else:
ctx.use_certificate_file(self.ssl_certificate)

if isinstance(self.ssl_ca, crypto.X509):
store = ctx.get_cert_store()
store.add_cert(self.ssl_ca)
else:
ctx.load_verify_locations(self.ssl_ca, None)

ctx.set_verify(SSL.VERIFY_PEER, self._verify_ssl_callback)

self._device = SSL.Connection(ctx, self._device)

except SSL.Error, err:
raise util.CommError('Error setting up SSL connection.', err)
self._init_ssl()


self._device.connect((self._host, self._port)) self._device.connect((self._host, self._port))


@@ -906,6 +881,33 @@ class SocketDevice(Device):


return ret return ret


def _init_ssl(self):
try:
ctx = SSL.Context(SSL.TLSv1_METHOD)

if isinstance(self.ssl_key, crypto.PKey):
ctx.use_privatekey(self.ssl_key)
else:
ctx.use_privatekey_file(self.ssl_key)

if isinstance(self.ssl_certificate, crypto.X509):
ctx.use_certificate(self.ssl_certificate)
else:
ctx.use_certificate_file(self.ssl_certificate)

if isinstance(self.ssl_ca, crypto.X509):
store = ctx.get_cert_store()
store.add_cert(self.ssl_ca)
else:
ctx.load_verify_locations(self.ssl_ca, None)

ctx.set_verify(SSL.VERIFY_PEER, self._verify_ssl_callback)

self._device = SSL.Connection(ctx, self._device)

except SSL.Error, err:
raise util.CommError('Error setting up SSL connection.', err)

def _verify_ssl_callback(self, connection, x509, errnum, errdepth, ok): def _verify_ssl_callback(self, connection, x509, errnum, errdepth, ok):
#print ok #print ok
return ok return ok

+ 2
- 0
pyad2usb/panels.py View File

@@ -1,5 +1,7 @@
""" """
Representations of Panels and their templates. Representations of Panels and their templates.

.. moduleauthor:: Scott Petersen <scott@nutech.com>
""" """


VISTA20 = 0 VISTA20 = 0


+ 2
- 0
pyad2usb/util.py View File

@@ -1,5 +1,7 @@
""" """
Provides utility classes for the AD2USB devices. Provides utility classes for the AD2USB devices.

.. moduleauthor:: Scott Petersen <scott@nutech.com>
""" """


import ad2usb import ad2usb


+ 16
- 13
test.py View File

@@ -143,24 +143,27 @@ def test_usb():


dev = pyad2usb.ad2usb.devices.USBDevice(interface=(0, 0)) dev = pyad2usb.ad2usb.devices.USBDevice(interface=(0, 0))


#a2u = pyad2usb.ad2usb.AD2USB(dev)
dev.on_open += handle_open
dev.on_close += handle_close
dev.on_read += handle_read
dev.on_write += handle_write
a2u = pyad2usb.ad2usb.AD2USB(dev)
a2u.on_open += handle_open
a2u.on_close += handle_close
#a2u.on_read += handle_read
#a2u.on_write += handle_write
a2u.on_message += handle_message
a2u.on_zone_fault += handle_fault
a2u.on_zone_restore += handle_restore


#a2u.on_power_changed += handle_power_changed #a2u.on_power_changed += handle_power_changed
#a2u.on_alarm += handle_alarm_bell #a2u.on_alarm += handle_alarm_bell
#a2u.on_bypass += handle_bypass #a2u.on_bypass += handle_bypass


dev.open()
a2u.open()


print dev.id print dev.id


while running: while running:
time.sleep(0.1) time.sleep(0.1)


dev.close()
a2u.close()


def test_serial(): def test_serial():
dev = pyad2usb.ad2usb.devices.SerialDevice(interface='/dev/ttyUSB1') dev = pyad2usb.ad2usb.devices.SerialDevice(interface='/dev/ttyUSB1')
@@ -242,9 +245,9 @@ def test_factory_watcher():
def test_socket(): def test_socket():
dev = pyad2usb.ad2usb.devices.SocketDevice(interface=("10.10.0.1", 10000)) dev = pyad2usb.ad2usb.devices.SocketDevice(interface=("10.10.0.1", 10000))
dev.ssl = True dev.ssl = True
dev.ssl_certificate = 'tmp/certs/client1.pem'
dev.ssl_key = 'tmp/certs/client1.key'
dev.ssl_ca = 'tmp/certs/ca.pem'
dev.ssl_certificate = '../certs-temp/client.pem'
dev.ssl_key = '../certs-temp/client.key'
dev.ssl_ca = '../certs-temp/ca.pem'


a2u = pyad2usb.ad2usb.AD2USB(dev) a2u = pyad2usb.ad2usb.AD2USB(dev)
a2u.on_open += handle_open a2u.on_open += handle_open
@@ -252,7 +255,7 @@ def test_socket():
a2u.on_read += handle_read a2u.on_read += handle_read
#a2u.on_write += handle_write #a2u.on_write += handle_write
# #
#a2u.on_message += handle_message
a2u.on_message += handle_message
#a2u.on_power_changed += handle_power_changed #a2u.on_power_changed += handle_power_changed
#a2u.on_alarm += handle_alarm_bell #a2u.on_alarm += handle_alarm_bell
#a2u.on_bypass += handle_bypass #a2u.on_bypass += handle_bypass
@@ -374,14 +377,14 @@ try:
#test_serial() #test_serial()
#upload_serial() #upload_serial()


test_usb()
#test_usb()
#test_usb_serial() #test_usb_serial()
#test_factory() #test_factory()
#test_factory_watcher() #test_factory_watcher()
#upload_usb() #upload_usb()
#upload_usb_serial() #upload_usb_serial()


#test_socket()
test_socket()
#upload_socket() #upload_socket()


#test_no_read_thread() #test_no_read_thread()


Loading…
Cancel
Save