|
|
@@ -27,10 +27,14 @@ |
|
|
|
# |
|
|
|
|
|
|
|
from pysnmp.hlapi import * |
|
|
|
from pysnmp.proto.rfc1905 import NoSuchInstance |
|
|
|
from pysnmp.proto.rfc1905 import NoSuchInstance, NoSuchObject |
|
|
|
from pysnmp.smi.builder import MibBuilder |
|
|
|
from pysnmp.smi.view import MibViewController |
|
|
|
|
|
|
|
if False: |
|
|
|
from pysnmp import debug |
|
|
|
debug.setLogger(debug.Debug('mibbuild')) |
|
|
|
|
|
|
|
import importlib |
|
|
|
import itertools |
|
|
|
import mock |
|
|
@@ -50,6 +54,11 @@ __all__ = [ |
|
|
|
_mbuilder = MibBuilder() |
|
|
|
_mvc = MibViewController(_mbuilder) |
|
|
|
|
|
|
|
# Doesn't work because internally uses a different MibBuilder(), and |
|
|
|
# therefore the class object is different. |
|
|
|
#for m, n in {('SNMPv2-TC', 'DisplayString')}: |
|
|
|
# locals()[n] = _mbuilder.importSymbols(m, n)[0] |
|
|
|
|
|
|
|
# received packages |
|
|
|
# pvid: dot1qPvid |
|
|
|
# |
|
|
@@ -109,11 +118,12 @@ class SwitchConfig(object): |
|
|
|
any unused lag ports. |
|
|
|
''' |
|
|
|
|
|
|
|
def __init__(self, host, authargs, vlanconf, ignports): |
|
|
|
def __init__(self, host, authargs, vlanconf, ignports, settings=[]): |
|
|
|
self._host = host |
|
|
|
self._authargs = authargs |
|
|
|
self._vlanconf = vlanconf |
|
|
|
self._ignports = ignports |
|
|
|
self._settings = settings |
|
|
|
|
|
|
|
@property |
|
|
|
def host(self): |
|
|
@@ -131,6 +141,10 @@ class SwitchConfig(object): |
|
|
|
def ignports(self): |
|
|
|
return self._ignports |
|
|
|
|
|
|
|
@property |
|
|
|
def settings(self): |
|
|
|
return self._settings |
|
|
|
|
|
|
|
def getportlist(self, lookupfun): |
|
|
|
'''Return a set of all the ports indexes in data. This |
|
|
|
includes, both vlanconf and ignports. Any ports using names |
|
|
@@ -239,6 +253,13 @@ def checkchanges(module): |
|
|
|
raise ValueError('missing or extra ports found: %s' % |
|
|
|
repr(ports.symmetric_difference(portlist))) |
|
|
|
|
|
|
|
# compare settings |
|
|
|
settings = list(i.settings) |
|
|
|
switchsettings = list(switch.getsettings(*(x[0] for x in settings))) |
|
|
|
|
|
|
|
res.extend((switch, name, 'setsetting', s, tobeval, curval) for (s, tobeval), |
|
|
|
curval in zip(settings, switchsettings) if tobeval != curval) |
|
|
|
|
|
|
|
# compare pvid |
|
|
|
pvidmap = getpvidmapping(i.vlanconf, lufun) |
|
|
|
switchpvid = switch.getpvid() |
|
|
@@ -389,6 +410,13 @@ class SNMPSwitch(object): |
|
|
|
if len(varBinds) != len(oids): # pragma: no cover |
|
|
|
raise ValueError('too many return values') |
|
|
|
|
|
|
|
nsi = [ x for x in varBinds if isinstance(x[1], |
|
|
|
(NoSuchInstance, NoSuchObject)) ] |
|
|
|
if nsi: |
|
|
|
raise ValueError( |
|
|
|
'No such instance/object: %s' % |
|
|
|
repr(nsi[0][0].getMibSymbol())) |
|
|
|
|
|
|
|
return varBinds |
|
|
|
|
|
|
|
def _get(self, oid): |
|
|
@@ -396,16 +424,19 @@ class SNMPSwitch(object): |
|
|
|
|
|
|
|
var = varBinds[0][1] |
|
|
|
|
|
|
|
if isinstance(var, NoSuchInstance): |
|
|
|
raise ValueError(repr(var)) |
|
|
|
|
|
|
|
return var |
|
|
|
|
|
|
|
def _set(self, oid, value): |
|
|
|
oid = ObjectIdentity(*oid) |
|
|
|
oid.resolveWithMib(_mvc) |
|
|
|
|
|
|
|
if isinstance(value, int): |
|
|
|
#print(repr(tuple(oid))) |
|
|
|
#if tuple(oid) == (1, 3, 6, 1, 4, 1, 4526, 11, 1, 2, 15, 10, 1, 2, 0): |
|
|
|
if True: |
|
|
|
#print('yes') |
|
|
|
#import pdb; pdb.set_trace() |
|
|
|
value = oid.getMibNode().getSyntax().clone(value) |
|
|
|
elif isinstance(value, int): |
|
|
|
value = Integer(value) |
|
|
|
elif isinstance(value, bytes): |
|
|
|
value = OctetString(value) |
|
|
@@ -448,6 +479,20 @@ class SNMPSwitch(object): |
|
|
|
for varBind in varBinds: |
|
|
|
yield varBind |
|
|
|
|
|
|
|
_convfun = dict(DisplayString=str, Integer32=int, Integer=int, Gauge32=int) |
|
|
|
|
|
|
|
def getsettings(self, *oids): |
|
|
|
'''Return the values for the passed in oids. It is best |
|
|
|
to pass in as many oids as possible, as a bulk walk get |
|
|
|
will be used for effeciency.''' |
|
|
|
|
|
|
|
#a = list(self._getmany(*oids)) |
|
|
|
#print(repr(a)) |
|
|
|
return (self._convfun[y.__class__.__name__](y) for x, y in self._getmany(*oids)) |
|
|
|
|
|
|
|
def setsetting(self, oid, value): |
|
|
|
return self._set(oid, value) |
|
|
|
|
|
|
|
def getportmapping(self): |
|
|
|
'''Return a port name mapping. Keys are the port index |
|
|
|
and the value is the name from the IF-MIB::ifName entry.''' |
|
|
@@ -778,12 +823,13 @@ class _TestMisc(unittest.TestCase): |
|
|
|
privProtocol=usmDESPrivProtocol) |
|
|
|
|
|
|
|
#@unittest.skip('foo') |
|
|
|
@mock.patch('vlanmang.SNMPSwitch.getsettings') |
|
|
|
@mock.patch('vlanmang.SNMPSwitch.getuntagged') |
|
|
|
@mock.patch('vlanmang.SNMPSwitch.getegress') |
|
|
|
@mock.patch('vlanmang.SNMPSwitch.getpvid') |
|
|
|
@mock.patch('vlanmang.SNMPSwitch.getportmapping') |
|
|
|
@mock.patch('importlib.import_module') |
|
|
|
def test_checkchanges(self, imprt, portmapping, gpvid, gegress, guntagged): |
|
|
|
def test_checkchanges(self, imprt, portmapping, gpvid, gegress, guntagged, gsettings): |
|
|
|
# that import returns the test data |
|
|
|
imprt.side_effect = itertools.repeat(self._test_data) |
|
|
|
|
|
|
@@ -825,8 +871,21 @@ class _TestMisc(unittest.TestCase): |
|
|
|
283: '00000000111111111110011', |
|
|
|
} ] |
|
|
|
|
|
|
|
# that the switch's settings provided |
|
|
|
settings = [ |
|
|
|
('somesettingtrue', False), |
|
|
|
('annumberset', 6), |
|
|
|
('nochange', 100), |
|
|
|
] |
|
|
|
|
|
|
|
gsettings.side_effect = itertools.repeat(tuple(x[1] for x in |
|
|
|
settings)) |
|
|
|
|
|
|
|
res = checkchanges('data') |
|
|
|
|
|
|
|
# That gsettings was called |
|
|
|
gsettings.assert_called_with(*(x[0] for x in settings)) |
|
|
|
|
|
|
|
# Make sure that the first one are all instances of SNMPSwitch |
|
|
|
# XXX make sure args for them are correct. |
|
|
|
self.assertTrue(all(isinstance(x[0], SNMPSwitch) for x in res)) |
|
|
@@ -835,15 +894,17 @@ class _TestMisc(unittest.TestCase): |
|
|
|
self.assertTrue(all(x[1] == 'distswitch' for x in res)) |
|
|
|
|
|
|
|
res = [ x[2:] for x in res ] |
|
|
|
validres = [ ('setpvid', x, 5, 283) for x in range(1, 9) ] + \ |
|
|
|
validres = [ ('setsetting', 'somesettingtrue', True, False), |
|
|
|
('setsetting', 'annumberset', 42, 6) ] + \ |
|
|
|
[ ('setpvid', x, 5, 283) for x in range(1, 9) ] + \ |
|
|
|
[ ('setpvid', 20, 1, 283), |
|
|
|
('setpvid', 21, 1, 283), |
|
|
|
('setpvid', 30, 1, 5), |
|
|
|
('setegress', 1, '0' * 19 + '11' + '0' * 8 + '1', |
|
|
|
('setpvid', 21, 1, 283), |
|
|
|
('setpvid', 30, 1, 5), |
|
|
|
('setegress', 1, '0' * 19 + '11' + '0' * 8 + '1', |
|
|
|
'1' * 10), |
|
|
|
('setuntagged', 1, '0' * 19 + '11' + '0' * 8 + '1', |
|
|
|
('setuntagged', 1, '0' * 19 + '11' + '0' * 8 + '1', |
|
|
|
'1' * 10), |
|
|
|
('setegress', 5, '1' * 8 + '0' * 11 + '11' + '0' * 8 + |
|
|
|
('setegress', 5, '1' * 8 + '0' * 11 + '11' + '0' * 8 + |
|
|
|
'1', '1' * 10), |
|
|
|
] |
|
|
|
|
|
|
@@ -867,6 +928,32 @@ class _TestSNMPSwitch(unittest.TestCase): |
|
|
|
# and call _getmany w/ the correct arg |
|
|
|
gm.assert_called_with(arg) |
|
|
|
|
|
|
|
@mock.patch('pysnmp.hlapi.ContextData') |
|
|
|
@mock.patch('vlanmang.getCmd') |
|
|
|
def test_getmany_nosuchinstance(self, gc, cd): |
|
|
|
# that a switch |
|
|
|
switch = SNMPSwitch(None, community=None) |
|
|
|
|
|
|
|
lookup = { x: chr(x) for x in range(1, 10) } |
|
|
|
|
|
|
|
# when getCmd returns NoSuchInstance |
|
|
|
gc.side_effect = [iter([[None, None, None, [ None, NoSuchInstance() ]]])] |
|
|
|
|
|
|
|
self.assertRaises(ValueError, switch.getsettings, ('IF-MIB', 'ifName')) |
|
|
|
|
|
|
|
@mock.patch('pysnmp.hlapi.ContextData') |
|
|
|
@mock.patch('vlanmang.getCmd') |
|
|
|
def test_getmany_nosuchobject(self, gc, cd): |
|
|
|
# that a switch |
|
|
|
switch = SNMPSwitch(None, community=None) |
|
|
|
|
|
|
|
lookup = { x: chr(x) for x in range(1, 10) } |
|
|
|
|
|
|
|
# when getCmd returns NoSuchInstance |
|
|
|
gc.side_effect = [iter([[None, None, None, [ None, NoSuchObject() ]]])] |
|
|
|
|
|
|
|
self.assertRaises(ValueError, switch.getsettings, ('IF-MIB', 'ifName')) |
|
|
|
|
|
|
|
@mock.patch('pysnmp.hlapi.ContextData') |
|
|
|
@mock.patch('vlanmang.getCmd') |
|
|
|
def test_getmany(self, gc, cd): |
|
|
@@ -928,6 +1015,19 @@ class _TestSwitch(unittest.TestCase): |
|
|
|
self.assertEqual(switch.findport('g1'), 1) |
|
|
|
self.assertEqual(switch.findport('l1'), 14) |
|
|
|
|
|
|
|
def test_settings(self): |
|
|
|
switch = self.switch |
|
|
|
|
|
|
|
vals = [ |
|
|
|
('SNMPv2-MIB', 'sysDescr', 0), |
|
|
|
('SNMPv2-MIB', 'sysServices', 0), |
|
|
|
] |
|
|
|
|
|
|
|
a, b = switch.getsettings(*vals) |
|
|
|
#raise Exception('%s %s' % (repr(a), repr(b))) |
|
|
|
self.assertEqual(a, 'GS108Tv2') |
|
|
|
self.assertEqual(b, 2) |
|
|
|
|
|
|
|
def test_portnames(self): |
|
|
|
switch = self.switch |
|
|
|
|
|
|
|