Browse Source

Added examples.

pyserial_fix
Scott Petersen 11 years ago
parent
commit
ab62967151
8 changed files with 436 additions and 0 deletions
  1. +68
    -0
      examples/alarm_email.py
  2. +37
    -0
      examples/basics.py
  3. +83
    -0
      examples/detection.py
  4. +49
    -0
      examples/rf_device.py
  5. +37
    -0
      examples/serialport.py
  6. +38
    -0
      examples/socket_example.py
  7. +51
    -0
      examples/ssl_socket.py
  8. +73
    -0
      examples/virtual_zone_expander.py

+ 68
- 0
examples/alarm_email.py View File

@@ -0,0 +1,68 @@
import time
import smtplib
from email.mime.text import MIMEText
from pyad2 import AD2
from pyad2.devices import USBDevice

# Configuration values
SUBJECT = "Alarm Decoder - ALARM"
FROM_ADDRESS = "root@localhost"
TO_ADDRESS = "root@localhost" # NOTE: Sending an SMS is as easy as looking
# up the email address format for your provider.
SMTP_SERVER = "localhost"
SMTP_USERNAME = None
SMTP_PASSWORD = None

def main():
"""
Example application that sends an email when an alarm event is
detected.
"""
try:
# Retrieve the first USB device
device = AD2(USBDevice.find())

# Set up an event handler and open the device
device.on_alarm += handle_alarm
device.open()

# Retrieve the device configuration
device.get_config()

# Wait for events
while True:
time.sleep(1)

except Exception, ex:
print 'Exception:', ex

finally:
device.close()

def handle_alarm(sender, *args, **kwargs):
"""
Handles alarm events from the AD2.
"""
status = kwargs['status']
text = "Alarm status: {0}".format(status)

# Build the email message
msg = MIMEText(text)
msg['Subject'] = SUBJECT
msg['From'] = FROM_ADDRESS
msg['To'] = TO_ADDRESS

s = smtplib.SMTP(SMTP_SERVER)

# Authenticate if needed
if SMTP_USERNAME is not None:
s.login(SMTP_USERNAME, SMTP_PASSWORD)

# Send the email
s.sendmail(FROM_ADDRESS, TO_ADDRESS, msg.as_string())
s.quit()

print 'sent alarm email:', text

if __name__ == '__main__':
main()

+ 37
- 0
examples/basics.py View File

@@ -0,0 +1,37 @@
import time
from pyad2 import AD2
from pyad2.devices import USBDevice

def main():
"""
Example application that prints messages from the panel to the terminal.
"""
try:
# Retrieve the first USB device
device = AD2(USBDevice.find())

# Set up an event handler and open the device
device.on_message += handle_message
device.open()
device.get_config()

# Wait for events.
while True:
time.sleep(1)

except Exception, ex:
print 'Exception:', ex

finally:
device.close()

def handle_message(sender, *args, **kwargs):
"""
Handles message events from the AD2.
"""
msg = kwargs['message']

print sender, msg.raw

if __name__ == '__main__':
main()

+ 83
- 0
examples/detection.py View File

@@ -0,0 +1,83 @@
import time
from pyad2 import AD2
from pyad2.devices import USBDevice

__devices = {}

def main():
"""
Example application that shows how to handle attach/detach events generated
by the USB devices.

In this case we open the device and listen for messages when it is attached.
And when it is detached we remove it from our list of monitored devices.
"""
try:
# Start up the detection thread such that handle_attached and handle_detached will
# be called when devices are attached and detached, respectively.
USBDevice.start_detection(on_attached=handle_attached, on_detached=handle_detached)

# Wait for events.
while True:
time.sleep(1)

except Exception, ex:
print 'Exception:', ex

finally:
# Close all devices and stop detection.
for sn, device in __devices.iteritems():
device.close()

USBDevice.stop_detection()

def create_device(device_args):
"""
Creates an AD2 from the specified USB device arguments.

:param device_args: Tuple containing information on the USB device to open.
:type device_args: Tuple (vid, pid, serialnumber, interface_count, description)
"""
device = AD2(USBDevice.find(device_args))
device.on_message += handle_message
device.open()

return device

def handle_message(sender, *args, **kwargs):
"""
Handles message events from the AD2.
"""
msg = kwargs['message']

print sender, msg.raw

def handle_attached(sender, *args, **kwargs):
"""
Handles attached events from USBDevice.start_detection().
"""
device_args = kwargs['device']

# Create the device from the specified device arguments.
device = create_device(device_args)
__devices[device.id] = device

print 'attached', device.id

def handle_detached(sender, *args, **kwargs):
"""
Handles detached events from USBDevice.start_detection().
"""
device = kwargs['device']
vendor, product, sernum, ifcount, description = device

# Close and remove the device from our list.
if sernum in __devices.keys():
__devices[sernum].close()

del __devices[sernum]

print 'detached', sernum

if __name__ == '__main__':
main()

+ 49
- 0
examples/rf_device.py View File

@@ -0,0 +1,49 @@
import time
from pyad2 import AD2
from pyad2.devices import USBDevice

RF_DEVICE_SERIAL_NUMBER = '0252254'

def main():
"""
Example application that watches for an event from a specific RF device.

This feature that allows you to watch for events from RF devices if you
have an RF receiver. This is useful in the case of internal sensors,
which don't emit a FAULT if the sensor is tripped and the panel is armed
STAY. It also will monitor sensors that aren't configured.

NOTE: You must have an RF receiver installed and enabled in your panel
for RFX messages to be seen.
"""
try:
# Retrieve the first USB device
device = AD2(USBDevice.find())

# Set up an event handler and open the device
device.on_rfx_message += handle_rfx
device.open()
device.get_config()

# Wait for events.
while True:
time.sleep(1)

except Exception, ex:
print 'Exception:', ex

finally:
device.close()

def handle_rfx(sender, *args, **kwargs):
"""
Handles RF message events from the AD2.
"""
msg = kwargs['message']

# Check for our target serial number and loop
if msg.serial_number == RF_DEVICE_SERIAL_NUMBER and msg.loop[0] == True:
print msg.serial_number, 'triggered loop #1'

if __name__ == '__main__':
main()

+ 37
- 0
examples/serialport.py View File

@@ -0,0 +1,37 @@
import time
from pyad2 import AD2
from pyad2.devices import SerialDevice

def main():
"""
Example application that opens a serial device and prints messages to the terminal.
"""
try:
# Retrieve the specified serial device.
device = AD2(SerialDevice(interface='/dev/ttyUSB0'))

# Set up an event handler and open the device
device.on_message += handle_message
device.open(baudrate=115200) # Override the default SerialDevice baudrate.
device.get_config()

# Wait for events.
while True:
time.sleep(1)

except Exception, ex:
print 'Exception:', ex

finally:
device.close()

def handle_message(sender, *args, **kwargs):
"""
Handles message events from the AD2.
"""
msg = kwargs['message']

print sender, msg.raw

if __name__ == '__main__':
main()

+ 38
- 0
examples/socket_example.py View File

@@ -0,0 +1,38 @@
import time
from pyad2 import AD2
from pyad2.devices import SocketDevice

def main():
"""
Example application that opens a device that has been exposed to the network
with ser2sock or similar serial->ip software.
"""
try:
# Retrieve an AD2 device that has been exposed with ser2sock on localhost:10000.
device = AD2(SocketDevice(interface=('localhost', 10000)))

# Set up an event handler and open the device
device.on_message += handle_message
device.open()
device.get_config()

# Wait for events.
while True:
time.sleep(1)

except Exception, ex:
print 'Exception:', ex

finally:
device.close()

def handle_message(sender, *args, **kwargs):
"""
Handles message events from the AD2.
"""
msg = kwargs['message']

print sender, msg.raw

if __name__ == '__main__':
main()

+ 51
- 0
examples/ssl_socket.py View File

@@ -0,0 +1,51 @@
import time
from pyad2 import AD2
from pyad2.devices import SocketDevice

def main():
"""
Example application that opens a device that has been exposed to the network
with ser2sock and SSL encryption and authentication.
"""
try:
# Retrieve an AD2 device that has been exposed with ser2sock on localhost:10000.
ssl_device = SocketDevice(interface=('localhost', 10000))

# Enable SSL and set the certificates to be used.
#
# The key/cert attributes can either be a filesystem path or an X509/PKey
# object from pyopenssl.
ssl_device.ssl = True
ssl_device.ssl_key = 'cert.key' # Client private key
ssl_device.ssl_certificate = 'cert.pem' # Client certificate
ssl_device.ssl_ca = 'ca.pem' # CA certificate

device = AD2(ssl_device)

# Set up an event handler and open the device
device.on_message += handle_message
device.open()

time.sleep(1) # Allow time for SSL handshake to complete.
device.get_config()

# Wait for events.
while True:
time.sleep(1)

except Exception, ex:
print 'Exception:', ex

finally:
device.close()

def handle_message(sender, *args, **kwargs):
"""
Handles message events from the AD2.
"""
msg = kwargs['message']

print sender, msg.raw

if __name__ == '__main__':
main()

+ 73
- 0
examples/virtual_zone_expander.py View File

@@ -0,0 +1,73 @@
import time
from pyad2 import AD2
from pyad2.devices import USBDevice

def main():
"""
Example application that periodically faults a virtual zone and then
restores it.

This is an advanced feature that allows you to emulate a virtual zone. When
the AD2 is configured to emulate a relay expander we can fault and restore
those zones programmatically at will. These events can also be seen by others,
such as home automation platforms which allows you to connect other devices or
services and monitor them as you would any pyhysical zone.

For example, you could connect a ZigBee device and receiver and fault or
restore it's zone(s) based on the data received.

In order for this to happen you need to perform a couple configuration steps:

1. Enable zone expander emulation on your AD2 device by hitting '!' in a
terminal and going through the prompts.
2. Enable the zone expander in your panel programming.
"""
try:
# Retrieve the first USB device
device = AD2(USBDevice.find())

# Set up an event handlers and open the device
device.on_zone_fault += handle_zone_fault
device.on_zone_restore += handle_zone_restore

device.open()
device.get_config()

# Wait for events.
last_update = time.time()
while True:
if time.time() - last_update > 10:
# Fault zone 41 every 10 seconds.
device.fault_zone(41)

last_update = time.time()

time.sleep(1)

except Exception, ex:
print 'Exception:', ex

finally:
device.close()

def handle_zone_fault(sender, *args, **kwargs):
"""
Handles zone fault messages.
"""
zone = kwargs['zone']

print 'zone faulted', zone

# Restore the zone
sender.clear_zone(zone)

def handle_zone_restore(sender, *args, **kwargs):
"""
Handles zone restore messages.
"""
zone = kwargs['zone']

print 'zone cleared', zone

if __name__ == '__main__':
main()

Loading…
Cancel
Save