diff --git a/blinkled/README.md b/blinkled/README.md new file mode 100644 index 0000000..2e63a41 --- /dev/null +++ b/blinkled/README.md @@ -0,0 +1,24 @@ +Blink LED +========= + +This project is to blink a LED, and have a standard cell phone camera +decode the data. + + +8b10b code +---------- + +Adding: +``` +(cd ..; git subtree add -P blinkled/encdec8b10b --squash https://github.com/olagrottvik/encdec8b10b.git master) +``` + +Updating: +``` +(cd ..; git subtree pull -P blinkled/encdec8b10b --squash https://github.com/olagrottvik/encdec8b10b.git master) +``` + +Info +---- + +[8b/10b encoding](https://en.wikipedia.org/wiki/8b/10b_encoding) diff --git a/blinkled/encdec8b10b/encdec8b10b/__init__.py b/blinkled/encdec8b10b/encdec8b10b/__init__.py index 9b50930..80a74ca 100644 --- a/blinkled/encdec8b10b/encdec8b10b/__init__.py +++ b/blinkled/encdec8b10b/encdec8b10b/__init__.py @@ -1 +1 @@ -from .core import EncDec_8B10B as EncDec8B10B \ No newline at end of file +from .core import EncDec_8B10B as EncDec8B10B diff --git a/blinkled/encdec8b10b/encdec8b10b/core.py b/blinkled/encdec8b10b/encdec8b10b/core.py index 59d8625..078f5aa 100644 --- a/blinkled/encdec8b10b/encdec8b10b/core.py +++ b/blinkled/encdec8b10b/encdec8b10b/core.py @@ -1,6 +1,39 @@ -import random +# https://en.wikipedia.org/wiki/8b/10b_encoding + +import re + +class ControlSymbol(int): + pass class EncDec_8B10B(object): + # Table 36-3 of 802.3-2018 has symbols for 1000Base-X coding + + COMMA = ControlSymbol(0xbc) + COMMA_ALTA = ControlSymbol(0x3c) + COMMA_ALTB = ControlSymbol(0xfc) + K_28_0 = ControlSymbol(0x1c) + K_28_2 = ControlSymbol(0x5c) + K_28_3 = ControlSymbol(0x7c) + K_28_4 = ControlSymbol(0x9c) + K_28_6 = ControlSymbol(0xdc) + K_23_7 = ControlSymbol(0xf7) + K_27_7 = ControlSymbol(0xfb) + K_29_7 = ControlSymbol(0xfd) + K_30_7 = ControlSymbol(0xfe) + + # Where disparity of 0 means more 0's, 1 means more 1's + # + # index of lookup strings: + # + # + # data is lowest/rightmost bit maps to A, and high bit is H + # + # format of data: + # + # data is lowest/rightmost bit transmitted first + # + # comment of each line is: + # # "" [] enc_lookup = [ "00010111001", # "00000000" -D00.0- [0] "00010101110", # "00000001" -D01.0- [1] @@ -2055,8 +2088,109 @@ class EncDec_8B10B(object): "DEC8b10bERR" # "1111111111" ] + def __init__(self): + self.__rd = 0 + self.__syncd = False + self.__pendingdata = '' + + @staticmethod + def _to10b(v): + '''convert integer to bits, in the correct order, which is + first (high) bit to be sent first.''' + + binstr = bin(v)[2:] + + return ('0' * (10 - len(binstr)) + binstr)[10:-11:-1] + + def issyncd(self): + return self.__syncd + + __syncre = re.compile('11000001..|00111110..') + + def decode(self, bstr): + '''Decode bstr, a string of 0's, and 1's to bytes. This + consumes all the bits, and stores any excess bits if some are + missing. + + Returns either None, or a byte string w/ the data. + + If a control byte is received, it is returned by itself as an + instance of ControlSymbol. + + Note that it needs to receive an EncDec_8B10B.COMMA control byte + first to establish sync, and then will data after that. + ''' + + if set(bstr) - set('01'): + raise ValueError('invalid characters in: %s' % repr(bstr)) + + data = self.__pendingdata + bstr + + if not self.__syncd: + m = self.__syncre.search(data) + if not m: + self.__pendingdata = data[-9:] + return + + self.__syncd = True + + data = data[m.start():] + + r = bytearray() + + for idx in range(0, len(data), 10): + i = data[idx:idx + 10] + if len(i) != 10: + self.__pendingdata = i + break + + try: + ctrl, d = self.dec_8b10b(int(i[::-1], 2)) + except Exception: + self.__syncd = False + self.__pendingdata = data[idx + 1:] + return + + #print(repr(ctrl), repr(d), repr(r)) + if ctrl and r: + self.__pendingdata = data[idx:] + return r + + if ctrl: + self.__pendingdata = data[idx + 10:] + return ControlSymbol(d) + + r += d.to_bytes(1, byteorder='big') + else: + self.__pendingdata = '' + + if r: + return r + + def encode(self, bstr): + '''Encode bstr into a string representing the binary (unicode + string represented by 0 and 1). The first bit to transmit is + at index 0, and so on.''' + + r = [] + rd = self.__rd + + if isinstance(bstr, ControlSymbol): + rd, c = self.enc_8b10b(bstr, rd, ctrl=1) + r.append(c) + else: + for i in bstr: + rd, c = self.enc_8b10b(i, rd) + r.append(c) + + self.__rd = rd + + return ''.join(self._to10b(x) for x in r) + @staticmethod def enc_8b10b(data_in, running_disparity, ctrl=0, verbose=False): + '''Encode a single byte to 10 bits.''' + assert data_in <= 0xFF, "Data in must be maximum one byte" encoded = int(EncDec_8B10B.enc_lookup[( ctrl << 9) + (running_disparity << 8) + data_in], 2) @@ -2068,6 +2202,8 @@ class EncDec_8B10B(object): @staticmethod def dec_8b10b(data_in, verbose=False): + '''Decode 10 bits into a byte.''' + assert data_in <= 0x3FF, "Data in must be maximum 10 bits" decoded = EncDec_8B10B.dec_lookup[(data_in)] if decoded == "DEC8b10bERR": @@ -2080,4 +2216,3 @@ class EncDec_8B10B(object): print("Decoded: {:02X} - Control: {:01b}".format(decoded, ctrl)) return ctrl, decoded - diff --git a/blinkled/encdec8b10b/tests/test_class.py b/blinkled/encdec8b10b/tests/test_class.py new file mode 100644 index 0000000..c8da818 --- /dev/null +++ b/blinkled/encdec8b10b/tests/test_class.py @@ -0,0 +1,110 @@ +# -*- coding: utf-8 -*- + +from .context import encdec8b10b +from encdec8b10b import EncDec8B10B +import random + +import unittest + +import sys +if sys.version_info.major != 3: + raise RuntimeError('invalid major version of python') + +class TestClass(unittest.TestCase): + def test_to10b(self): + to10b = EncDec8B10B._to10b + + self.assertEqual(to10b(0), '0' * 10) + self.assertEqual(to10b(2**10-1), '1' * 10) + self.assertEqual(to10b(0x20f), '1' * 4 + '000001') + self.assertEqual(to10b(0x0ff), '1' * 8 + '00') + + dispminmax = [ 2, 3, 2, 3, 2, 1, 2, 1, 2, 1 ] + + def test_disparity(self): + failures = [] + for i in range(512): + disp = i >> 8 + din = i & 0xff + ndisp, out = EncDec8B10B.enc_8b10b(din, disp) + cnt = +1 if disp else -1 + + for j in range(10): + out, bit = divmod(out, 2) + cnt += +1 if bit else -1 + minmax = self.dispminmax[j] + if cnt < -minmax or cnt > minmax: + failures.append((disp, din)) + + if cnt != 1 if ndisp else cnt != -1: + failures.append((disp, din)) + + if failures: + raise RuntimeError('failures(%d): %s' % (len(failures), repr(failures))) + + def test_bitdecoding(self): + coder = EncDec8B10B() + + with self.assertRaises(ValueError): + coder.decode('asioj') + + self.assertIsNone(coder.decode('')) + + self.assertFalse(coder.issyncd()) + self.assertIsNone(coder.decode('101011011010101101')) + + self.assertFalse(coder.issyncd()) + + self.assertEqual(coder.decode(coder.encode(EncDec8B10B.COMMA)), EncDec8B10B.COMMA) + self.assertTrue(coder.issyncd()) + + astr = coder.encode(b'a') + + self.assertIsNone(coder.decode(astr[:5])) + + self.assertEqual(coder.decode(astr[5:]), b'a') + + self.assertEqual(coder.decode(coder.encode(b'abc123')), b'abc123') + + chrctrlstr = coder.encode(b'xx') + coder.encode(EncDec8B10B.K_28_0) + + self.assertIsNone(coder.decode(chrctrlstr[:5])) + + self.assertEqual(coder.decode(chrctrlstr[5:]), b'xx') + self.assertEqual(coder.decode(''), EncDec8B10B.K_28_0) + self.assertEqual(coder.decode(coder.encode(EncDec8B10B.K_28_2)), EncDec8B10B.K_28_2) + + # that when junk is delivered, it is ignored + self.assertIsNone(coder.decode('111111111000011111')) + + # and is no longer synced + self.assertFalse(coder.issyncd()) + + commaastr = coder.encode(EncDec8B10B.COMMA) + \ + coder.encode(b'a') + + # But it will sync back up + self.assertEqual(coder.decode(commaastr), EncDec8B10B.COMMA) + self.assertEqual(coder.decode(''), b'a') + + self.assertIsNone(coder.decode('')) + + self.assertEqual(coder.decode(coder.encode(EncDec8B10B.COMMA)), EncDec8B10B.COMMA) + + def test_bitencoding(self): + coder = EncDec8B10B() + + self.assertEqual(coder.encode(b'a'), '0111010011') + # + + s = ''.join(( + '1000101100', # a - + '1011010011', # b + + '1100010011', # c + + '1000111001', # 1 + + '0100111001', # 2 + + '1100101001', # 3 + + )) + + self.assertEqual(coder.encode(b'abc123'), s) + self.assertEqual(coder.encode(EncDec8B10B.COMMA), '1100000101') + self.assertEqual(coder.encode(EncDec8B10B.COMMA), '0011111010') diff --git a/blinkled/genseq.py b/blinkled/genseq.py new file mode 100644 index 0000000..0295bdc --- /dev/null +++ b/blinkled/genseq.py @@ -0,0 +1,3 @@ +from encdec8b10b import EncDec8B10B + +print('done') diff --git a/blinkled/requirements.txt b/blinkled/requirements.txt new file mode 100644 index 0000000..deb64dc --- /dev/null +++ b/blinkled/requirements.txt @@ -0,0 +1 @@ +-e ./encdec8b10b