|
- #!/usr/bin/env python
-
- import pyad2usb.ad2usb
- import time
- import signal
- import traceback
- import sys
- import logging
- from OpenSSL import SSL
-
- running = True
-
- def signal_handler(signal, frame):
- global running
-
- running = False
-
- def handle_open(sender, args):
- print 'O', args
-
- def handle_close(sender, args):
- print 'C', args
-
- def handle_read(sender, args):
- print '<', args
-
- def handle_write(sender, args):
- print '>', args
-
- def handle_attached(sender, args):
- print '+', args
-
- def handle_detached(sender, args):
- print '-', args
-
- def handle_power_changed(sender, args):
- print 'power changed', args
-
- def handle_alarm_bell(sender, args):
- print 'alarm', args
-
- def handle_bypass(sender, args):
- print 'bypass', args
-
- def handle_message(sender, args):
- print args
-
- def handle_arm(sender, args):
- print 'armed', args
-
- def handle_disarm(sender, args):
- print 'disarmed', args
-
- def handle_firmware(stage):
- if stage == pyad2usb.ad2usb.util.Firmware.STAGE_START:
- handle_firmware.wait_tick = 0
- handle_firmware.upload_tick = 0
- elif stage == pyad2usb.ad2usb.util.Firmware.STAGE_WAITING:
- if handle_firmware.wait_tick == 0:
- sys.stdout.write('Waiting for device.')
- handle_firmware.wait_tick += 1
-
- sys.stdout.write('.')
- sys.stdout.flush()
- elif stage == pyad2usb.ad2usb.util.Firmware.STAGE_BOOT:
- if handle_firmware.wait_tick > 0: print ""
- print "Rebooting device.."
- elif stage == pyad2usb.ad2usb.util.Firmware.STAGE_LOAD:
- print 'Waiting for boot loader..'
- elif stage == pyad2usb.ad2usb.util.Firmware.STAGE_UPLOADING:
- if handle_firmware.upload_tick == 0:
- sys.stdout.write('Uploading firmware.')
-
- handle_firmware.upload_tick += 1
-
- if handle_firmware.upload_tick % 30 == 0:
- sys.stdout.write('.')
- sys.stdout.flush()
- elif stage == pyad2usb.ad2usb.util.Firmware.STAGE_DONE:
- print "\r\nDone!"
-
- def handle_boot(sender, args):
- print 'boot', args
-
- def handle_config(sender, args):
- print 'config', args
-
- def handle_fault(sender, args):
- print 'zone fault', args
-
- def handle_restore(sender, args):
- print 'zone restored', args
-
- def handle_battery(sender, args):
- print 'low battery', args
-
- def handle_fire(sender, args):
- print 'FIRE!', args
-
- def handle_lrr(sender, args):
- print 'LRR', args
-
- def handle_panic(sender, args):
- print 'PANIC!', args
-
- def handle_rfx(sender, args):
- print 'RFX', args
-
- def handle_relay(sender, args):
- print 'RELAY', args
-
- def upload_usb():
- dev = pyad2usb.ad2usb.devices.USBDevice()
-
- dev.open(no_reader_thread=True)
- #pyad2usb.ad2usb.util.Firmware.upload(dev, 'tmp/ademcoemu_V2_2a_6.hex', handle_firmware)
- dev.close()
-
- def upload_serial():
- dev = pyad2usb.ad2usb.devices.SerialDevice(interface='/dev/ttyUSB0')
-
- dev.open(no_reader_thread=True)
- pyad2usb.ad2usb.util.Firmware.upload(dev, 'tmp/ademcoemu_V2_2a_6.hex', handle_firmware)
- #pyad2usb.ad2usb.util.Firmware.upload(dev, 'tmp/ademcoemu-V2-2a-5-beta20-C4.hex', handle_firmware)
- dev.close()
-
- def upload_usb_serial():
- dev = pyad2usb.ad2usb.devices.SerialDevice(interface='/dev/ttyUSB5')
-
- dev.open(baudrate=115200)
- pyad2usb.ad2usb.util.Firmware.upload(dev, 'tmp/ademcoemu_V2_2a_6.hex', handle_firmware)
- dev.close()
-
- def upload_socket():
- dev = pyad2usb.ad2usb.devices.SocketDevice(interface=('localhost', 10000))
-
- dev.open()
- pyad2usb.ad2usb.util.Firmware.upload(dev, 'tmp/ademcoemu_V2_2a_6.hex', handle_firmware)
- dev.close()
-
- def test_usb():
- dev = pyad2usb.ad2usb.devices.USBDevice()
-
- a2u = pyad2usb.ad2usb.AD2USB(dev)
- a2u.on_open += handle_open
- a2u.on_close += handle_close
- a2u.on_read += handle_read
- a2u.on_write += handle_write
-
- a2u.on_power_changed += handle_power_changed
- a2u.on_alarm += handle_alarm_bell
- a2u.on_bypass += handle_bypass
-
- a2u.open()
-
- print dev._id
-
- while running:
- time.sleep(0.1)
-
- a2u.close()
-
- def test_serial():
- dev = pyad2usb.ad2usb.devices.SerialDevice(interface='/dev/ttyUSB0')
-
- a2u = pyad2usb.ad2usb.AD2USB(dev)
- a2u.on_open += handle_open
- a2u.on_close += handle_close
- #a2u.on_read += handle_read
- #a2u.on_write += handle_write
-
- a2u.on_message += handle_message
- a2u.on_power_changed += handle_power_changed
- a2u.on_alarm += handle_alarm_bell
- a2u.on_bypass += handle_bypass
-
- a2u.open(no_reader_thread=False)
- print a2u._device._device
- #print a2u._device.read_line()
- #dev.open()
-
- print dev._id
-
- while running:
- time.sleep(0.1)
-
- a2u.close()
- #dev.close()
-
- def test_usb_serial():
- dev = pyad2usb.ad2usb.devices.SerialDevice(interface='/dev/ttyUSB5')
-
- a2u = pyad2usb.ad2usb.AD2USB(dev)
- a2u.on_open += handle_open
- a2u.on_close += handle_close
- a2u.on_read += handle_read
- a2u.on_write += handle_write
-
- a2u.open(baudrate=115200)
- print dev._id
-
- while running:
- time.sleep(0.1)
-
- a2u.close()
-
- def test_factory():
- a2u = pyad2usb.ad2usb.Overseer.create()
-
- a2u.on_open += handle_open
- a2u.on_close += handle_close
- a2u.on_read += handle_read
- a2u.on_write += handle_write
-
- a2u.open()
-
- while running:
- time.sleep(0.1)
-
- a2u.close()
-
- def test_factory_watcher():
- overseer = pyad2usb.ad2usb.Overseer(attached_event=handle_attached, detached_event=handle_detached)
-
- a2u = overseer.get_device()
-
- a2u.on_open += handle_open
- a2u.on_close += handle_close
- a2u.on_read += handle_read
- a2u.on_write += handle_write
-
- a2u.open()
-
- while running:
- time.sleep(0.1)
-
- a2u.close()
- overseer.close()
-
- def test_socket():
- dev = pyad2usb.ad2usb.devices.SocketDevice(interface=("10.10.0.1", 10000), use_ssl=True, ssl_certificate='tmp/certs/client1.pem', ssl_key='tmp/certs/client1.key', ssl_ca='tmp/certs/ca.pem')
-
- a2u = pyad2usb.ad2usb.AD2USB(dev)
- a2u.on_open += handle_open
- a2u.on_close += handle_close
- a2u.on_read += handle_read
- #a2u.on_write += handle_write
- #
- #a2u.on_message += handle_message
- #a2u.on_power_changed += handle_power_changed
- #a2u.on_alarm += handle_alarm_bell
- #a2u.on_bypass += handle_bypass
- #a2u.on_boot += handle_boot
- #a2u.on_config_received += handle_config
- #a2u.on_arm += handle_arm
- #a2u.on_disarm += handle_disarm
- a2u.on_zone_fault += handle_fault
- a2u.on_zone_restore += handle_restore
- a2u.on_rfx_message += handle_rfx
- a2u.on_relay_changed += handle_relay
- #
- #a2u.on_fire += handle_fire
- #a2u.on_low_battery += handle_battery
- #a2u.on_lrr_message += handle_lrr
- #a2u.on_panic += handle_panic
-
-
- a2u.open()
- #a2u.save_config()
- #a2u.reboot()
- #a2u.get_config()
-
- #a2u.address = 18
- #a2u.configbits = 0xff00
- #a2u.address_mask = 0xFFFFFFFF
- #a2u.emulate_zone[0] = False
- #a2u.emulate_relay[0] = False
- #a2u.emulate_lrr = False
- #a2u.deduplicate = False
-
- time.sleep(3)
-
- a2u.get_config()
- #a2u.emulate_zone[1] = False
- #a2u.save_config()
-
- #time.sleep(1)
- #a2u.fault_zone(17, True)
-
- #time.sleep(15)
- #a2u.clear_zone(17)
-
- #time.sleep(1)
- #a2u.fault_zone((2, 2), True)
-
-
- while running:
- time.sleep(0.1)
-
- a2u.close()
-
- def test_no_read_thread():
- #a2u = pyad2usb.ad2usb.Overseer.create()
-
-
- #a2u.on_open += handle_open
- #a2u.on_close += handle_close
- #a2u.on_read += handle_read
- #a2u.on_write += handle_write
-
- #a2u.open(no_reader_thread=True)
-
- dev = pyad2usb.ad2usb.devices.SerialDevice(interface='/dev/ttyUSB0')
- dev.open(no_reader_thread=True)
-
- #print 'alive?', a2u._device._read_thread.is_alive()
- while 1:
- line = dev.read_line(timeout=5)
- print line
- line = dev.read_line(timeout=5)
- print line
- #time.sleep(0.1)
-
- dev.close()
-
- def test_serial_grep():
- re = pyad2usb.devices.SerialDevice.find_all(pattern='VID:PID=9710:7840')
- for x in re:
- print x
-
- def test_double_panel_write():
- dev = pyad2usb.ad2usb.devices.SerialDevice(interface='/dev/ttyUSB4')
- dev2 = pyad2usb.ad2usb.devices.SerialDevice(interface='/dev/ttyUSB5')
-
- dev.open(no_reader_thread=True)
- print dev._device
-
- dev2.open(no_reader_thread=True)
- print dev2._device
- #print a2u._device.read_line()
- #dev.open()
-
- print 'Writing characters..'
- dev.write('*****')
- dev2.write('*****')
-
- print 'Reading..'
-
- dev_res = dev.read_line()
- print dev.id, dev_res
-
- dev2_res = dev2.read_line()
- print dev2.id, dev2_res
-
- dev.close()
- dev2.close()
-
- try:
- logging.basicConfig(level=logging.DEBUG)
- signal.signal(signal.SIGINT, signal_handler)
-
- #test_serial()
- #upload_serial()
-
- #test_usb()
- #test_usb_serial()
- #test_factory()
- #test_factory_watcher()
- #upload_usb()
- #upload_usb_serial()
-
- test_socket()
- #upload_socket()
-
- #test_no_read_thread()
- #test_serial_grep()
-
- #test_double_panel_write()
-
- except Exception, err:
- traceback.print_exc(err)
|