Browse Source

Removed.

pyserial_fix
Scott Petersen 11 years ago
parent
commit
07976100fd
1 changed files with 0 additions and 400 deletions
  1. +0
    -400
      bin/ad2-test

+ 0
- 400
bin/ad2-test View File

@@ -1,400 +0,0 @@
#!/usr/bin/env python

import pyad2usb.ad2usb
import time
import signal
import traceback
import sys
import logging
from OpenSSL import SSL

running = True

def signal_handler(signal, frame):
global running

running = False

def handle_open(sender, args):
print 'O', args

def handle_close(sender, args):
print 'C', args

def handle_read(sender, args):
print '<', args

def handle_write(sender, args):
print '>', args

def handle_attached(sender, args):
print '+', args

def handle_detached(sender, args):
print '-', args

def handle_power_changed(sender, args):
print 'power changed', args

def handle_alarm_bell(sender, args):
print 'alarm', args

def handle_bypass(sender, args):
print 'bypass', args

def handle_message(sender, args):
print args

def handle_arm(sender, args):
print 'armed', args

def handle_disarm(sender, args):
print 'disarmed', args

def handle_firmware(stage):
if stage == pyad2usb.ad2usb.util.Firmware.STAGE_START:
handle_firmware.wait_tick = 0
handle_firmware.upload_tick = 0
elif stage == pyad2usb.ad2usb.util.Firmware.STAGE_WAITING:
if handle_firmware.wait_tick == 0:
sys.stdout.write('Waiting for device.')
handle_firmware.wait_tick += 1

sys.stdout.write('.')
sys.stdout.flush()
elif stage == pyad2usb.ad2usb.util.Firmware.STAGE_BOOT:
if handle_firmware.wait_tick > 0: print ""
print "Rebooting device.."
elif stage == pyad2usb.ad2usb.util.Firmware.STAGE_LOAD:
print 'Waiting for boot loader..'
elif stage == pyad2usb.ad2usb.util.Firmware.STAGE_UPLOADING:
if handle_firmware.upload_tick == 0:
sys.stdout.write('Uploading firmware.')

handle_firmware.upload_tick += 1

if handle_firmware.upload_tick % 30 == 0:
sys.stdout.write('.')
sys.stdout.flush()
elif stage == pyad2usb.ad2usb.util.Firmware.STAGE_DONE:
print "\r\nDone!"

def handle_boot(sender, args):
print 'boot', args

def handle_config(sender, args):
print 'config', args

def handle_fault(sender, args):
print 'zone fault', args

def handle_restore(sender, args):
print 'zone restored', args

def handle_battery(sender, args):
print 'low battery', args

def handle_fire(sender, args):
print 'FIRE!', args

def handle_lrr(sender, args):
print 'LRR', args

def handle_panic(sender, args):
print 'PANIC!', args

def handle_rfx(sender, args):
print 'RFX', args

def handle_relay(sender, args):
print 'RELAY', args

def upload_usb():
dev = pyad2usb.ad2usb.devices.USBDevice()

dev.open(no_reader_thread=True)
#pyad2usb.ad2usb.util.Firmware.upload(dev, 'tmp/ademcoemu_V2_2a_6.hex', handle_firmware)
dev.close()

def upload_serial():
dev = pyad2usb.ad2usb.devices.SerialDevice(interface='/dev/ttyUSB0')

dev.open(no_reader_thread=True)
pyad2usb.ad2usb.util.Firmware.upload(dev, 'tmp/ademcoemu_V2_2a_6.hex', handle_firmware)
#pyad2usb.ad2usb.util.Firmware.upload(dev, 'tmp/ademcoemu-V2-2a-5-beta20-C4.hex', handle_firmware)
dev.close()

def upload_usb_serial():
dev = pyad2usb.ad2usb.devices.SerialDevice(interface='/dev/ttyUSB5')

dev.open(baudrate=115200)
pyad2usb.ad2usb.util.Firmware.upload(dev, 'tmp/ademcoemu_V2_2a_6.hex', handle_firmware)
dev.close()

def upload_socket():
dev = pyad2usb.ad2usb.devices.SocketDevice(interface=('localhost', 10000))

dev.open()
pyad2usb.ad2usb.util.Firmware.upload(dev, 'tmp/ademcoemu_V2_2a_6.hex', handle_firmware)
dev.close()

def test_usb():
global running

dev = pyad2usb.ad2usb.devices.USBDevice(interface=(0, 0))

a2u = pyad2usb.ad2usb.AD2USB(dev)
a2u.on_open += handle_open
a2u.on_close += handle_close
#a2u.on_read += handle_read
#a2u.on_write += handle_write
a2u.on_message += handle_message
a2u.on_zone_fault += handle_fault
a2u.on_zone_restore += handle_restore

#a2u.on_power_changed += handle_power_changed
#a2u.on_alarm += handle_alarm_bell
#a2u.on_bypass += handle_bypass

a2u.open()

print dev.id

while running:
time.sleep(0.1)

a2u.close()

def test_serial():
dev = pyad2usb.ad2usb.devices.SerialDevice(interface='/dev/ttyUSB2')

a2u = pyad2usb.ad2usb.AD2USB(dev)
a2u.on_open += handle_open
a2u.on_close += handle_close
#a2u.on_read += handle_read
#a2u.on_write += handle_write

a2u.on_message += handle_message
a2u.on_power_changed += handle_power_changed
a2u.on_alarm += handle_alarm_bell
a2u.on_bypass += handle_bypass

a2u.open(no_reader_thread=False)
print a2u._device._device
#print a2u._device.read_line()
#dev.open()

print dev._id

time.sleep(1)

a2u.send('V')

while running:
time.sleep(0.1)

a2u.close()
#dev.close()

def test_usb_serial():
dev = pyad2usb.ad2usb.devices.SerialDevice(interface='/dev/ttyUSB1')

#a2u = pyad2usb.ad2usb.AD2USB(dev)
dev.on_open += handle_open
dev.on_close += handle_close
dev.on_read += handle_read
dev.on_write += handle_write

dev.open(baudrate=115200)
print dev._id

while running:
time.sleep(0.1)

dev.close()

def test_factory():
a2u = pyad2usb.ad2usb.Overseer.create()

a2u.on_open += handle_open
a2u.on_close += handle_close
a2u.on_read += handle_read
a2u.on_write += handle_write

a2u.open()

while running:
time.sleep(0.1)

a2u.close()

def test_factory_watcher():
overseer = pyad2usb.ad2usb.Overseer(attached_event=handle_attached, detached_event=handle_detached)

a2u = overseer.get_device()

a2u.on_open += handle_open
a2u.on_close += handle_close
a2u.on_read += handle_read
a2u.on_write += handle_write

a2u.open()

while running:
time.sleep(0.1)

a2u.close()
overseer.close()

def test_socket():
dev = pyad2usb.ad2usb.devices.SocketDevice(interface=("10.10.0.1", 10000))
dev.ssl = True
dev.ssl_certificate = '../certs-temp/client.pem'
dev.ssl_key = '../certs-temp/client.key'
dev.ssl_ca = '../certs-temp/ca.pem'

a2u = pyad2usb.ad2usb.AD2USB(dev)
a2u.on_open += handle_open
a2u.on_close += handle_close
a2u.on_read += handle_read
#a2u.on_write += handle_write
#
a2u.on_message += handle_message
#a2u.on_power_changed += handle_power_changed
#a2u.on_alarm += handle_alarm_bell
#a2u.on_bypass += handle_bypass
#a2u.on_boot += handle_boot
#a2u.on_config_received += handle_config
#a2u.on_arm += handle_arm
#a2u.on_disarm += handle_disarm
a2u.on_zone_fault += handle_fault
a2u.on_zone_restore += handle_restore
a2u.on_rfx_message += handle_rfx
a2u.on_relay_changed += handle_relay
#
#a2u.on_fire += handle_fire
#a2u.on_low_battery += handle_battery
#a2u.on_lrr_message += handle_lrr
#a2u.on_panic += handle_panic


a2u.open()
#a2u.save_config()
#a2u.reboot()
#a2u.get_config()

#a2u.address = 18
#a2u.configbits = 0xff00
#a2u.address_mask = 0xFFFFFFFF
#a2u.emulate_zone[0] = False
#a2u.emulate_relay[0] = False
#a2u.emulate_lrr = False
#a2u.deduplicate = False

time.sleep(3)

a2u.get_config()
#a2u.emulate_zone[1] = False
#a2u.save_config()

#time.sleep(1)
#a2u.fault_zone(17, True)

#time.sleep(15)
#a2u.clear_zone(17)

#time.sleep(1)
#a2u.fault_zone((2, 2), True)


while running:
time.sleep(0.1)

a2u.close()

def test_no_read_thread():
#a2u = pyad2usb.ad2usb.Overseer.create()


#a2u.on_open += handle_open
#a2u.on_close += handle_close
#a2u.on_read += handle_read
#a2u.on_write += handle_write

#a2u.open(no_reader_thread=True)

dev = pyad2usb.ad2usb.devices.SerialDevice(interface='/dev/ttyUSB0')
dev.open(no_reader_thread=True)

#print 'alive?', a2u._device._read_thread.is_alive()
while 1:
line = dev.read_line(timeout=5)
print line
line = dev.read_line(timeout=5)
print line
#time.sleep(0.1)

dev.close()

def test_serial_grep():
re = pyad2usb.devices.SerialDevice.find_all(pattern='VID:PID=0403:6001')
print 'serial'
for x in re:
print x

print 'usb'
re = pyad2usb.devices.USBDevice.find_all()
for x in re:
print x

def test_double_panel_write():
dev = pyad2usb.ad2usb.devices.SerialDevice(interface='/dev/ttyUSB4')
dev2 = pyad2usb.ad2usb.devices.SerialDevice(interface='/dev/ttyUSB5')

dev.open(no_reader_thread=True)
print dev._device

dev2.open(no_reader_thread=True)
print dev2._device
#print a2u._device.read_line()
#dev.open()

print 'Writing characters..'
dev.write('*****')
dev2.write('*****')

print 'Reading..'

dev_res = dev.read_line()
print dev.id, dev_res

dev2_res = dev2.read_line()
print dev2.id, dev2_res

dev.close()
dev2.close()

try:
logging.basicConfig(level=logging.DEBUG)
signal.signal(signal.SIGINT, signal_handler)

test_serial()
#upload_serial()

#test_usb()
#test_usb_serial()
#test_factory()
#test_factory_watcher()
#upload_usb()
#upload_usb_serial()

#test_socket()
#upload_socket()

#test_no_read_thread()
#test_serial_grep()

#test_double_panel_write()

except Exception, err:
traceback.print_exc(err)

Loading…
Cancel
Save