@@ -51,6 +51,12 @@ __all__ = [
'SNMPSwitch',
]
if False:
from pysnmp import debug
# use specific flags or 'all' for full debugging
debug.setLogger(debug.Debug('dsp', 'msgproc', 'secmod'))
_mbuilder = MibBuilder()
_mvc = MibViewController(_mbuilder)
@@ -290,9 +296,8 @@ def checkchanges(module):
switchuntagged = switch.getuntagged(*vlans)
untagged = getuntagged(i.vlanconf, lufun)
for i in vlans:
if not _cmpbits(switchegress[i], egress[i]):
if not _cmpbits(switchegress[i], egress[i]) or not _cmpbits(switchuntagged[i], untagged[i]) :
res.append((switch, name, 'setegress', i, egress[i], switchegress[i]))
if not _cmpbits(switchuntagged[i], untagged[i]):
res.append((switch, name, 'setuntagged', i, untagged[i], switchuntagged[i]))
return res
@@ -344,10 +349,14 @@ def getuntagged(data, lookupfun):
return r
_lenovo_ce0128t = (1, 3, 6, 1, 4, 1, 19046, 1, 7, 43) # LENOVO-REF-MIB::ce0128t
class SNMPSwitch(object):
'''A class for manipulating switches via standard SNMP MIBs.'''
def __init__(self, host, community=None, username=None, authKey=None, authProtocol=usmHMACSHAAuthProtocol, privKey=None, privProtocol=None):
def __init__(self, host, community=None, username=None, authKey=None,
authProtocol=usmHMACSHAAuthProtocol, privKey=None,
privProtocol=None, getmanybroken=None):
'''Create a instance to read data and program a switch via
SNMP.
@@ -399,10 +408,26 @@ class SNMPSwitch(object):
self._targ = UdpTransportTarget((host, 161), timeout=10)
self._getmanybroken = False
if getmanybroken is None:
if tuple(self._get(('SNMPv2-MIB',
'sysObjectID', 0))) == _lenovo_ce0128t:
self._getmanybroken = True
else:
self._getmanybroken = bool(getmanybroken)
def __repr__(self): # pragma: no cover
return '<SNMPSwitch: auth=%s, targ=%s>' % (repr(self._auth), repr(self._targ))
def _getmany(self, *oids):
if self._getmanybroken:
return [ self._getmany_real(x)[0] for x in oids ]
else:
return self._getmany_real(*oids)
def _getmany_real(self, *oids):
woids = [ ObjectIdentity(*oid) for oid in oids ]
[ oid.resolveWithMib(_mvc) for oid in woids ]
@@ -790,7 +815,8 @@ class _TestMisc(unittest.TestCase):
# That a switch passed a community string
commstr = 'foobar'
switch = SNMPSwitch(None, community=commstr)
switch = SNMPSwitch(None, community=commstr,
getmanybroken=False)
# That getCmd returns a valid object
vb = [ [ None, None ] ]
@@ -823,7 +849,8 @@ class _TestMisc(unittest.TestCase):
# That a switch passed v3 auth data
username = 'someuser'
authKey = 'authKey'
switch = SNMPSwitch(None, username=username, authKey=authKey)
switch = SNMPSwitch(None, username=username, authKey=authKey,
getmanybroken=False)
# That getCmd returns a valid object
vb = [ [ None, None ] ]
@@ -847,7 +874,7 @@ class _TestMisc(unittest.TestCase):
# that it can be called with a privKey
privKey = 'privKey'
switch = SNMPSwitch(None, username=username, authKey=authKey,
privKey=privKey)
privKey=privKey, getmanybroken=False )
# and that UsmUserData was called w/ the correct args
uud.assert_called_with(username, authKey, privKey,
@@ -859,7 +886,8 @@ class _TestMisc(unittest.TestCase):
# that it can be called with an alternate privProtocol
switch = SNMPSwitch(None, username=username, authKey=authKey,
privKey=privKey, privProtocol=usmDESPrivProtocol)
privKey=privKey, privProtocol=usmDESPrivProtocol,
getmanybroken=False)
# and that UsmUserData was called w/ the correct args
uud.assert_called_with(username, authKey, privKey,
@@ -872,7 +900,7 @@ class _TestMisc(unittest.TestCase):
# that it can be called with an alternate authProtocol
switch = SNMPSwitch(None, username=username, authKey=authKey,
authProtocol=usmHMACMD5AuthProtocol, privKey=privKey,
privProtocol=usmDESPrivProtocol)
privProtocol=usmDESPrivProtocol, getmanybroken=False )
# and that UsmUserData was called w/ the correct args
uud.assert_called_with(username, authKey, privKey,
@@ -907,6 +935,7 @@ class _TestMisc(unittest.TestCase):
gvlans.return_value = iter([ 1, 5 ])
res = checkchanges('data')
validres = [ ('createvlan', 283, '', '') ]
# make sure it needs to get created
@@ -982,6 +1011,7 @@ class _TestMisc(unittest.TestCase):
'1' * 10),
('setegress', 5, '1' * 8 + '0' * 11 + '11' + '0' * 8 +
'1', '1' * 10),
('setuntagged', 5, '1' * 8, '1' * 8 + '0' * 10),
]
self.assertEqual(set(res), set(validres))
@@ -1008,7 +1038,7 @@ class _TestSNMPSwitch(unittest.TestCase):
@mock.patch('vlanmang.getCmd')
def test_getmany_nosuchinstance(self, gc, cd):
# that a switch
switch = SNMPSwitch(None, community=None)
switch = SNMPSwitch(None, community=None, getmanybroken=False )
lookup = { x: chr(x) for x in range(1, 10) }
@@ -1021,7 +1051,7 @@ class _TestSNMPSwitch(unittest.TestCase):
@mock.patch('vlanmang.getCmd')
def test_getmany_nosuchobject(self, gc, cd):
# that a switch
switch = SNMPSwitch(None, community=None)
switch = SNMPSwitch(None, community=None, getmanybroken=False )
lookup = { x: chr(x) for x in range(1, 10) }
@@ -1034,7 +1064,7 @@ class _TestSNMPSwitch(unittest.TestCase):
@mock.patch('vlanmang.getCmd')
def test_getmany(self, gc, cd):
# that a switch
switch = SNMPSwitch(None, community=None)
switch = SNMPSwitch(None, community=None, getmanybroken=False )
lookup = { x: chr(x) for x in range(1, 10) }