Browse Source

add docs.. refactor to use a helper function instead of rolling

it's own..  also make sure getegress fully works.. more tests need
to be written..
ssh-lenovo
John-Mark Gurney 5 years ago
parent
commit
55d9a7cda8
1 changed files with 78 additions and 14 deletions
  1. +78
    -14
      vlanmang.py

+ 78
- 14
vlanmang.py View File

@@ -35,6 +35,29 @@ _mvc = MibViewController(_mbuilder)
# 1.0.8802.1.1.2.1.4.1.1 aka LLDP-MIB, lldpRemTable

class SwitchConfig(object):
'''This is a simple object to store switch configuration for
the checkchanges() function.

host -- The host of the switch you are maintaining configuration of.

community -- Either the SNMPv1 community name or a
pysnmp.hlapi.UsmUserData object, either of which can write to the
necessary MIBs to configure the VLANs of the switch.

vlanconf -- This is a dictionary w/ vlans as the key. Each value has
a dictionary that contains keys, 'u' or 't', each of which
contains the port that traffic should be sent untagged ('u') or
tagged ('t'). Note that the Pvid (vlan of traffic that is
received when untagged), is set to match the 'u' definition. The
port is either an integer, which maps directly to the switch's
index number, or it can be a string, which will be looked up via
the IF-MIB::ifName table.

ignports -- Ports that will be ignored and not required to be
configured. List any ports that will not be active here, such as
any unused lag ports.
'''

def __init__(self, host, community, vlanconf, ignports):
self._host = host
self._community = community
@@ -58,33 +81,40 @@ class SwitchConfig(object):
return self._ignports

def getportlist(self, lookupfun):
'''Return a set of all the ports indexes in data.'''
'''Return a set of all the ports indexes in data. This
includes, both vlanconf and ignports. Any ports using names
will be resolved by being passed to the provided lookupfun.'''

res = set()
res = []

for id in self._vlanconf:
res.update(self._vlanconf[id].get('u', []))
res.update(self._vlanconf[id].get('t', []))
res.extend(self._vlanconf[id].get('u', []))
res.extend(self._vlanconf[id].get('t', []))

# add in the ignore ports
res.update(self.ignports)

# filter out the strings
strports = set(x for x in res if isinstance(x, str))
res.extend(self.ignports)

res.update(lookupfun(x) for x in strports)
res.difference_update(strports)
# eliminate dups so that lookupfun isn't called as often
res = set(res)

return res
return set(getidxs(res, lookupfun))

def _octstrtobits(os):
num = 1
'''Convert a string into a list of bits. Easier to figure out what
ports are set.'''

num = 1 # leading 1 to make sure leading zeros are not stripped
for i in str(os):
num = (num << 8) | ord(i)

return bin(num)[3:]

def _intstobits(*ints):
'''Convert the int args to a string of bits in the expected format
that SNMP expects for them. The results will be a string of '1's
and '0's where the first one represents 1, and second one
representing 2 and so on.'''

v = 0
for i in ints:
v |= 1 << i
@@ -95,6 +125,9 @@ def _intstobits(*ints):
return ''.join(r)

def _cmpbits(a, b):
'''Compare two strings of bits to make sure they are equal.
Trailing 0's are ignored.'''

try:
last1a = a.rindex('1')
except ValueError:
@@ -116,6 +149,20 @@ def _cmpbits(a, b):
import vlanmang

def checkchanges(module):
'''Function to check for any differences between the switch, and the
configured state.

The parameter module is a string to the name of a python module. It
will be imported, and any names that reference a vlanmang.SwitchConfig
class will be validate that the configuration matches. If it does not,
the returned list will contain a set of tuples, each one containing
(verb, arg1, arg2, switcharg2). verb is what needs to be changed.
arg1 is either the port (for setting Pvid), or the VLAN that needs to
be configured. arg2 is what it needs to be set to. switcharg2 is
what the switch is currently configured to, so that you can easily
see what the effect of the configuration change is.
'''

mod = importlib.import_module(module)
mods = [ i for i in mod.__dict__.itervalues() if isinstance(i, vlanmang.SwitchConfig) ]

@@ -159,11 +206,20 @@ def checkchanges(module):
return res, switch

def getidxs(lst, lookupfun):
'''Take a list of ports, and if any are a string, replace them w/
the value returned by lookupfun(s).

Note that duplicates are not detected or removed, both in the
original list, and the values returned by the lookup function
may duplicate other values in the list.'''

return [ lookupfun(i) if isinstance(i, str) else i for i in lst ]

def getpvidmapping(data, lookupfun):
'''Return a mapping from vlan based table to a port: vlan
dictionary.'''
dictionary. This only looks at that untagged part of the vlan
configuration, and is used for finding what a port's Pvid should
be.'''

res = []
for id in data:
@@ -175,6 +231,10 @@ def getpvidmapping(data, lookupfun):
return dict(res)

def getegress(data, lookupfun):
'''Return a dictionary, keyed by VLAN id with a string of the ports
that need to be enagled for egress. This include both tagged and
untagged traffic.'''

r = {}
for id in data:
r[id] = _intstobits(*(getidxs(data[id].get('u', []),
@@ -561,6 +621,8 @@ class _TestSNMPSwitch(unittest.TestCase):
# that a switch
switch = SNMPSwitch(None, None)

lookup = { x: chr(x) for x in xrange(1, 10) }

# when getCmd returns tooBig when too many oids are asked for
def custgetcmd(eng, cd, targ, contextdata, *oids):
# induce a too big error
@@ -569,6 +631,8 @@ class _TestSNMPSwitch(unittest.TestCase):
else:
#import pdb; pdb.set_trace()
[ oid.resolveWithMib(_mvc) for oid in oids ]
oids = [ ObjectType(x[0], OctetString(lookup[x[0][-1]])) for x in oids ]
[ oid.resolveWithMib(_mvc) for oid in oids ]
res = ( None, None, None, oids )

return iter([res])
@@ -579,7 +643,7 @@ class _TestSNMPSwitch(unittest.TestCase):
res = switch.getegress(*xrange(1, 10))

# will still return the complete set of results
self.assertEqual(res, { x: '' for x in xrange(1, 10) })
self.assertEqual(res, { x: _octstrtobits(lookup[x]) for x in xrange(1, 10) })

_skipSwitchTests = True



Loading…
Cancel
Save