#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Minimal SNMP helper for inspecting and editing VLANs on a managed switch.

MIB objects of interest:

received packets:
    pvid: dot1qPvid

tx packets:
    dot1qVlanStaticEgressPorts
    dot1qVlanStaticUntaggedPorts

vlans:
    dot1qVlanCurrentTable  # lists ALL vlans, including baked in ones

Note that even though an snmpwalk of dot1qVlanStaticEgressPorts skips
over other vlans (only shows statics), the other vlans (1, 2, 3) are
still accessible via that oid.

LLDP:
    1.0.8802.1.1.2.1.4.1.1 aka LLDP-MIB, lldpRemTable
"""

from pysnmp.hlapi import *
from pysnmp.smi.builder import MibBuilder
from pysnmp.smi.view import MibViewController

import random
import unittest

_mbuilder = MibBuilder()
_mvc = MibViewController(_mbuilder)

# Integer types that _set() wraps in an SNMP Integer.  ``long`` only
# exists on Python 2; on Python 3 plain ``int`` covers everything.
try:
    _INTEGER_TYPES = (int, long)
except NameError:
    _INTEGER_TYPES = (int,)


def _raise_on_error(errorInd, errorStatus, errorIndex, varBinds):
    """Raise ValueError if an SNMP response reports a failure.

    errorInd is the engine/transport level error indication (e.g. a
    timeout); errorStatus/errorIndex are the PDU level error and the
    1-based index of the offending var-bind.  Returns None on success.
    """
    if errorInd:  # pragma: no cover
        raise ValueError(errorInd)

    if errorStatus:
        # errorIndex is 1-based; zero/empty means no specific var-bind
        where = varBinds[int(errorIndex) - 1][0] if errorIndex else '?'
        raise ValueError('%s at %s' % (errorStatus.prettyPrint(), where))


class SNMPSwitch(object):
    """Talk to a switch at *host* over SNMP using *community*.

    Requests are sent to UDP port 161 with mpModel=0 (SNMPv1 in pysnmp).
    """

    def __init__(self, host, community):
        self._eng = SnmpEngine()
        self._cd = CommunityData(community, mpModel=0)
        self._targ = UdpTransportTarget((host, 161))

    def _get(self, oid):
        """Fetch and return the value of a single object.

        oid is a tuple of ObjectIdentity arguments, e.g.
        ('Q-BRIDGE-MIB', 'dot1qVlanStaticName', 5).

        Raises ValueError if the request fails or the agent reports an
        error (such as the object not existing).
        """
        oid = ObjectIdentity(*oid)
        oid.resolveWithMib(_mvc)

        errorInd, errorStatus, errorIndex, varBinds = next(
            getCmd(self._eng, self._cd, self._targ, ContextData(),
                   ObjectType(oid)))

        _raise_on_error(errorInd, errorStatus, errorIndex, varBinds)

        if len(varBinds) != 1:  # pragma: no cover
            raise ValueError('too many return values')

        return varBinds[0][1]

    def _set(self, oid, value):
        """Set a single object to value.

        oid is a tuple of ObjectIdentity arguments.  Integers are sent
        as Integer and strings as OctetString; any other value is passed
        to pysnmp unchanged.

        Raises ValueError if the request fails, and RuntimeError if the
        agent's reply does not echo back the value that was sent.
        """
        oid = ObjectIdentity(*oid)
        oid.resolveWithMib(_mvc)

        if isinstance(value, _INTEGER_TYPES):
            value = Integer(value)
        elif isinstance(value, (str, bytes)):
            value = OctetString(value)

        errorInd, errorStatus, errorIndex, varBinds = next(
            setCmd(self._eng, self._cd, self._targ, ContextData(),
                   ObjectType(oid, value)))

        _raise_on_error(errorInd, errorStatus, errorIndex, varBinds)

        for varBind in varBinds:
            if varBind[1] != value:  # pragma: no cover
                raise RuntimeError('failed to set: %s' %
                                   ' = '.join([x.prettyPrint() for x in varBind]))

    def _walk(self, *oid):
        """Yield every var-bind found under oid.

        Unlike _get/_set, the ObjectIdentity arguments are passed as
        separate positional arguments.  Raises ValueError on failure.
        """
        oid = ObjectIdentity(*oid)

        # XXX - keep these, this might stop working, no clue what managed to magically make things work
        # ref: http://snmplabs.com/pysnmp/examples/smi/manager/browsing-mib-tree.html#mib-objects-to-pdu-var-binds
        # mibdump.py --mib-source '/Users/jmg/Nextcloud/Documents/user manuals/netgear/gs7xxt-v6.3.1.19-mibs' --mib-source /usr/share/snmp/mibs --rebuild rfc1212 pbridge vlan
        #oid.addAsn1MibSource('/usr/share/snmp/mibs', '/Users/jmg/Nextcloud/Documents/user manuals/netgear/gs7xxt-v6.3.1.19-mibs')

        oid.resolveWithMib(_mvc)

        # lexicographicMode=False keeps the walk inside the requested subtree
        for errorInd, errorStatus, errorIndex, varBinds in nextCmd(
                self._eng, self._cd, self._targ, ContextData(),
                ObjectType(oid), lexicographicMode=False):
            _raise_on_error(errorInd, errorStatus, errorIndex, varBinds)

            for varBind in varBinds:
                yield varBind

    def findport(self, name):
        """Return the index of the first interface whose ifName is name.

        The index is the last sub-identifier of the matching ifName OID.
        Raises IndexError if no interface has that name.
        """
        for oid, value in self._walk('IF-MIB', 'ifName'):
            if str(value) == name:
                return oid[-1]

        raise IndexError('no port named %r' % (name,))

    def getvlanname(self, vlan):
        """Return the name of static vlan number vlan as text.

        Raises ValueError if the agent reports an error for that vlan.
        """
        v = self._get(('Q-BRIDGE-MIB', 'dot1qVlanStaticName', vlan))

        # bytes() is str on Python 2, so this matches the old
        # str(v).decode() there while also working on Python 3.
        return bytes(v).decode('utf-8')

    def createvlan(self, vlan, name):
        """Create static vlan number vlan and give it name."""
        # createAndGo(4)
        self._set(('Q-BRIDGE-MIB', 'dot1qVlanStaticRowStatus', int(vlan)), 4)
        self._set(('Q-BRIDGE-MIB', 'dot1qVlanStaticName', int(vlan)), name)

    def deletevlan(self, vlan):
        """Delete static vlan number vlan."""
        # destroy(6)
        self._set(('Q-BRIDGE-MIB', 'dot1qVlanStaticRowStatus', int(vlan)), 6)

    def getvlans(self):
        """Return an iterator over all vlan numbers listed by dot1qVlanStatus."""
        return (x[0][-1] for x in self._walk('Q-BRIDGE-MIB', 'dot1qVlanStatus'))

    def staticvlans(self):
        """Return an iterator over the vlan numbers that have a static name entry."""
        return (x[0][-1] for x in self._walk('Q-BRIDGE-MIB', 'dot1qVlanStaticName'))


class Test(unittest.TestCase):
    """Live tests; need a reachable switch described by ``test.creds``.

    ``test.creds`` holds whitespace separated SNMPSwitch arguments
    (host, then community).
    """

    def setUp(self):
        # close the credentials file promptly instead of leaking the handle
        with open('test.creds') as fp:
            args = fp.read().split()

        self.switch = SNMPSwitch(*args)

    def test_unpacktable(self):
        pass

    def test_misc(self):
        switch = self.switch

        self.assertEqual(switch.findport('g1'), 1)
        self.assertEqual(switch.findport('l1'), 14)

    def test_vlan(self):
        switch = self.switch

        existingvlans = set(switch.getvlans())

        # Pick an unused vlan id.  4095 is reserved by 802.1Q, so the
        # highest id that can actually be created is 4094.
        while True:
            testvlan = random.randint(1, 4094)
            if testvlan not in existingvlans:
                break

        # Test that getting a non-existent vlan raises an exception
        self.assertRaises(ValueError, switch.getvlanname, testvlan)

        self.assertTrue(set(switch.staticvlans()).issubset(existingvlans))

        testname = 'Sometestname'

        # Create test vlan
        switch.createvlan(testvlan, testname)
        try:
            # make sure the test vlan was created
            self.assertIn(testvlan, set(switch.staticvlans()))

            self.assertEqual(testname, switch.getvlanname(testvlan))
        finally:
            switch.deletevlan(testvlan)