From 7479d41931d41bc8b785ee8ea699bd6040fc5962 Mon Sep 17 00:00:00 2001 From: John-Mark Gurney Date: Wed, 18 Sep 2019 15:43:25 -0700 Subject: [PATCH] first cut at a vlan manager utility. --- .gitignore | 3 + Makefile | 7 ++ requirements.txt | 2 + vlanmang.py | 166 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 178 insertions(+) create mode 100644 .gitignore create mode 100644 Makefile create mode 100644 requirements.txt create mode 100644 vlanmang.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..851a20d --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +*.pyc +p +.coverage diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..8aa4d8c --- /dev/null +++ b/Makefile @@ -0,0 +1,7 @@ +MODULES=vlanmang.py + +test: + (echo $(MODULES) | entr sh -c 'python -m coverage run -m unittest $(basename $(MODULES)) && coverage report --omit=p/\* -m -i') + +env: + (virtualenv p && . ./p/bin/activate && pip install -r requirements.txt) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..e8535b0 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +pysnmp +coverage diff --git a/vlanmang.py b/vlanmang.py new file mode 100644 index 0000000..2ecefbc --- /dev/null +++ b/vlanmang.py @@ -0,0 +1,166 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from pysnmp.hlapi import * +from pysnmp.smi.builder import MibBuilder +from pysnmp.smi.view import MibViewController + +import random +import unittest + +_mbuilder = MibBuilder() +_mvc = MibViewController(_mbuilder) + +# received packages +# pvid: dot1qPvid +# +# tx packets: +# dot1qVlanStaticEgressPorts +# dot1qVlanStaticUntaggedPorts +# +# vlans: +# dot1qVlanCurrentTable +# lists ALL vlans, including baked in ones +# +# note that even though an snmpwalk of dot1qVlanStaticEgressPorts +# skips over other vlans (only shows statics), the other vlans (1,2,3) +# are still accessible via that oid +# +# LLDP: +# 1.0.8802.1.1.2.1.4.1.1 aka LLDP-MIB, lldpRemTable + +class SNMPSwitch(object): + def __init__(self, host, community): + self._eng = SnmpEngine() + self._cd = CommunityData(community, mpModel=0) + self._targ = UdpTransportTarget((host, 161)) + + def _get(self, oid): + oid = ObjectIdentity(*oid) + oid.resolveWithMib(_mvc) + + errorInd, errorStatus, errorIndex, varBinds = \ + next(getCmd(self._eng, self._cd, self._targ, ContextData(), ObjectType(oid))) + + if errorInd: # pragma: no cover + raise ValueError(errorIndication) + elif errorStatus: + raise ValueError('%s at %s' % + (errorStatus.prettyPrint(), errorIndex and + varBinds[int(errorIndex)-1][0] or '?')) + else: + if len(varBinds) != 1: # pragma: no cover + raise ValueError('too many return values') + + varBind = varBinds[0] + return varBind[1] + + def _set(self, oid, value): + oid = ObjectIdentity(*oid) + oid.resolveWithMib(_mvc) + + if isinstance(value, (int, long)): + value = Integer(value) + elif isinstance(value, str): + value = OctetString(value) + + errorInd, errorStatus, errorIndex, varBinds = \ + next(setCmd(self._eng, self._cd, self._targ, ContextData(), ObjectType(oid, value))) + + if errorInd: # pragma: no cover + raise ValueError(errorIndication) + elif errorStatus: # pragma: no cover + raise ValueError('%s at %s' % + (errorStatus.prettyPrint(), errorIndex and + varBinds[int(errorIndex)-1][0] or '?')) + else: + for varBind in varBinds: + if varBind[1] != value: # pragma: no cover + raise RuntimeError('failed to set: %s' % ' = '.join([x.prettyPrint() for x in varBind])) + + def _walk(self, *oid): + oid = ObjectIdentity(*oid) + # XXX - keep these, this might stop working, no clue what managed to magically make things work + # ref: http://snmplabs.com/pysnmp/examples/smi/manager/browsing-mib-tree.html#mib-objects-to-pdu-var-binds + # mibdump.py --mib-source '/Users/jmg/Nextcloud/Documents/user manuals/netgear/gs7xxt-v6.3.1.19-mibs' --mib-source /usr/share/snmp/mibs --rebuild rfc1212 pbridge vlan + #oid.addAsn1MibSource('/usr/share/snmp/mibs', '/Users/jmg/Nextcloud/Documents/user manuals/netgear/gs7xxt-v6.3.1.19-mibs') + + oid.resolveWithMib(_mvc) + + for (errorInd, errorStatus, errorIndex, varBinds) in nextCmd( + self._eng, self._cd, self._targ, ContextData(), + ObjectType(oid), + lexicographicMode=False): + if errorInd: # pragma: no cover + raise ValueError(errorIndication) + elif errorStatus: # pragma: no cover + raise ValueError('%s at %s' % (errorStatus.prettyPrint(), errorIndex and varBinds[int(errorIndex)-1][0] or '?')) + else: + for varBind in varBinds: + yield varBind + + def findport(self, name): + return [ x[0][-1] for x in self._walk('IF-MIB', 'ifName') if str(x[1]) == name ][0] + + def getvlanname(self, vlan): + v = self._get(('Q-BRIDGE-MIB', 'dot1qVlanStaticName', vlan)) + + return str(v).decode('utf-8') + + def createvlan(self, vlan, name): + # createAndGo(4) + self._set(('Q-BRIDGE-MIB', 'dot1qVlanStaticRowStatus', + int(vlan)), 4) + self._set(('Q-BRIDGE-MIB', 'dot1qVlanStaticName', int(vlan)), + name) + + def deletevlan(self, vlan): + self._set(('Q-BRIDGE-MIB', 'dot1qVlanStaticRowStatus', + int(vlan)), 6) # destroy(6) + + def getvlans(self): + return (x[0][-1] for x in self._walk('Q-BRIDGE-MIB', 'dot1qVlanStatus')) + + def staticvlans(self): + return (x[0][-1] for x in self._walk('Q-BRIDGE-MIB', 'dot1qVlanStaticName')) + +class Test(unittest.TestCase): + def setUp(self): + args = open('test.creds').read().split() + self.switch = SNMPSwitch(*args) + + def test_unpacktable(self): + pass + + def test_misc(self): + switch = self.switch + + self.assertEqual(switch.findport('g1'), 1) + self.assertEqual(switch.findport('l1'), 14) + + def test_vlan(self): + switch = self.switch + + existingvlans = set(switch.getvlans()) + + while True: + testvlan = random.randint(1,4095) + if testvlan not in existingvlans: + break + + # Test that getting a non-existant vlans raises an exception + self.assertRaises(ValueError, switch.getvlanname, testvlan) + + self.assertTrue(set(switch.staticvlans()).issubset(existingvlans)) + + testname = 'Sometestname' + + # Create test vlan + switch.createvlan(testvlan, testname) + try: + # make sure the test vlan was created + self.assertIn(testvlan, set(switch.staticvlans())) + + self.assertEqual(testname, switch.getvlanname(testvlan)) + finally: + switch.deletevlan(testvlan)