import os import sys import unittest from ctypes import * __all__ = [ 'KVM' ] _kvm = CDLL('libkvm.so') class kvm_t(Structure): pass class timespec(Structure): _fields_ = [ ('tv_sec', c_ulong), ('tv_nsec', c_long), ] KVM_K_UNSIGNED_INT = 1 KVM_K_SIGNED_INT = 2 _bytesizes = [ 1, 2, 4, 8 ] _ttvlookup = [ ((KVM_K_UNSIGNED_INT, x), globals()['c_uint%d' % (x * 8)]) for x in _bytesizes ] + [ ((KVM_K_SIGNED_INT, x), globals()['c_int%d' % (x * 8)]) for x in _bytesizes ] _ttvlookup = { k: POINTER(v) for k, v in _ttvlookup } kvm_t_p = POINTER(kvm_t) kvm_iter_struct_t = CFUNCTYPE(c_int, c_char_p, c_int, c_size_t, POINTER(c_char), c_void_p) _funs = dict( kvm_open=(kvm_t_p, (c_char_p, c_char_p, c_char_p, c_int, c_char_p)), kvm_close=(c_int, (kvm_t_p,)), kvm_geterr=(c_char_p, (kvm_t_p,)), kvm_structsize=(c_ssize_t, (kvm_t_p, c_char_p)), kvm_iterstruct=(c_int, (kvm_t_p, c_char_p, c_void_p, kvm_iter_struct_t, c_void_p)), ) for k, v in _funs.items(): f = getattr(_kvm, k) f.restype, f.argtypes = v def _fetchmembers(kd, typ, obj): res = [] def func(memb, type, len, buf, arg): t = _ttvlookup[(type, len)] res.append((memb, cast(buf, t)[0])) return 0 cbfun = kvm_iter_struct_t(func) r = _kvm.kvm_iterstruct(kd, typ, byref(obj), cbfun, None) if r == -1: err = _kvm.kvm_geterr(kd) raise RuntimeError(err.decode('us-ascii')) return res class KVM(object): def __init__(self): self.kd = _kvm.kvm_open(None, None, None, os.O_RDONLY, None) def __enter__(self): return self def close(self): if self.kd is not None: _kvm.kvm_close(self.kd) self.kd = None def __exit__(self, a, b, c): self.close() def _iferr(self, fun, *args): res = fun(*args) if res == -1: err = _kvm.kvm_geterr(self.kd) raise RuntimeError(err.decode('us-ascii')) return res def structsize(self, typ): return self._iferr(_kvm.kvm_structsize, self.kd, typ.encode('us-ascii')) def getstruct(self, typ, obj): res = _fetchmembers(self.kd, typ.encode('us-ascii'), obj) return { k.decode('us-ascii'): v for k, v in res } def __del__(self): self.close() def deb(*args): if True: #pragma: no cover print(*args) sys.stdout.flush() class _TestCase(unittest.TestCase): def setUp(self): self.kd = _kvm.kvm_open(None, None, None, os.O_RDONLY, None) def tearDown(self): _kvm.kvm_close(self.kd) self.kd = None def test_ss(self): self.assertEqual(_kvm.kvm_structsize(self.kd, b'struct timespec'), 16) def test_iter(self): exp = [ (b'tv_sec', 0x1234), (b'tv_nsec', 0xabcd), ] ts = timespec(0x1234, 0xabcd) res = _fetchmembers(self.kd, b'struct timespec', ts) self.assertEqual(res, exp) def test_class_errs(self): kd = KVM() self.assertEqual(kd.structsize('struct timespec'), 16) sec = 1839238 nsec = 19849873 ts = timespec(sec, nsec) res = dict( tv_sec=sec, tv_nsec=nsec, ) self.assertEqual(kd.getstruct('struct timespec', ts), res) def test_class(self): kd = KVM() with KVM() as kd: with self.assertRaisesRegex(RuntimeError, 'unable to find kernel type: struct flksjdi'): kd.structsize('struct flksjdi') ts = timespec(0, 0) with self.assertRaisesRegex(RuntimeError, 'unable to find kernel type: struct weoiud'): kd.getstruct('struct weoiud', ts)