From e7e742150db73f9ca0e0c15f816d44388d312563 Mon Sep 17 00:00:00 2001 From: John-Mark Gurney Date: Sun, 24 Apr 2022 21:50:48 -0700 Subject: [PATCH] add a working framework for doing 8b10b encoding/decoding.. This will need to be fixed up a bit to handle when a bit gets doubled due to when two frames have the same bit as the frame length will be around 33.3ms and the bit length around 34ms, which means around every 52s, the same bit will be split over two frames. --- blinkled/README.md | 24 ++++ blinkled/encdec8b10b/encdec8b10b/__init__.py | 2 +- blinkled/encdec8b10b/encdec8b10b/core.py | 139 ++++++++++++++++++- blinkled/encdec8b10b/tests/test_class.py | 110 +++++++++++++++ blinkled/genseq.py | 3 + blinkled/requirements.txt | 1 + 6 files changed, 276 insertions(+), 3 deletions(-) create mode 100644 blinkled/README.md create mode 100644 blinkled/encdec8b10b/tests/test_class.py create mode 100644 blinkled/genseq.py create mode 100644 blinkled/requirements.txt diff --git a/blinkled/README.md b/blinkled/README.md new file mode 100644 index 0000000..2e63a41 --- /dev/null +++ b/blinkled/README.md @@ -0,0 +1,24 @@ +Blink LED +========= + +This project is to blink a LED, and have a standard cell phone camera +decode the data. + + +8b10b code +---------- + +Adding: +``` +(cd ..; git subtree add -P blinkled/encdec8b10b --squash https://github.com/olagrottvik/encdec8b10b.git master) +``` + +Updating: +``` +(cd ..; git subtree pull -P blinkled/encdec8b10b --squash https://github.com/olagrottvik/encdec8b10b.git master) +``` + +Info +---- + +[8b/10b encoding](https://en.wikipedia.org/wiki/8b/10b_encoding) diff --git a/blinkled/encdec8b10b/encdec8b10b/__init__.py b/blinkled/encdec8b10b/encdec8b10b/__init__.py index 9b50930..80a74ca 100644 --- a/blinkled/encdec8b10b/encdec8b10b/__init__.py +++ b/blinkled/encdec8b10b/encdec8b10b/__init__.py @@ -1 +1 @@ -from .core import EncDec_8B10B as EncDec8B10B \ No newline at end of file +from .core import EncDec_8B10B as EncDec8B10B diff --git a/blinkled/encdec8b10b/encdec8b10b/core.py b/blinkled/encdec8b10b/encdec8b10b/core.py index 59d8625..078f5aa 100644 --- a/blinkled/encdec8b10b/encdec8b10b/core.py +++ b/blinkled/encdec8b10b/encdec8b10b/core.py @@ -1,6 +1,39 @@ -import random +# https://en.wikipedia.org/wiki/8b/10b_encoding + +import re + +class ControlSymbol(int): + pass class EncDec_8B10B(object): + # Table 36-3 of 802.3-2018 has symbols for 1000Base-X coding + + COMMA = ControlSymbol(0xbc) + COMMA_ALTA = ControlSymbol(0x3c) + COMMA_ALTB = ControlSymbol(0xfc) + K_28_0 = ControlSymbol(0x1c) + K_28_2 = ControlSymbol(0x5c) + K_28_3 = ControlSymbol(0x7c) + K_28_4 = ControlSymbol(0x9c) + K_28_6 = ControlSymbol(0xdc) + K_23_7 = ControlSymbol(0xf7) + K_27_7 = ControlSymbol(0xfb) + K_29_7 = ControlSymbol(0xfd) + K_30_7 = ControlSymbol(0xfe) + + # Where disparity of 0 means more 0's, 1 means more 1's + # + # index of lookup strings: + # + # + # data is lowest/rightmost bit maps to A, and high bit is H + # + # format of data: + # + # data is lowest/rightmost bit transmitted first + # + # comment of each line is: + # # "" [] enc_lookup = [ "00010111001", # "00000000" -D00.0- [0] "00010101110", # "00000001" -D01.0- [1] @@ -2055,8 +2088,109 @@ class EncDec_8B10B(object): "DEC8b10bERR" # "1111111111" ] + def __init__(self): + self.__rd = 0 + self.__syncd = False + self.__pendingdata = '' + + @staticmethod + def _to10b(v): + '''convert integer to bits, in the correct order, which is + first (high) bit to be sent first.''' + + binstr = bin(v)[2:] + + return ('0' * (10 - len(binstr)) + binstr)[10:-11:-1] + + def issyncd(self): + return self.__syncd + + __syncre = re.compile('11000001..|00111110..') + + def decode(self, bstr): + '''Decode bstr, a string of 0's, and 1's to bytes. This + consumes all the bits, and stores any excess bits if some are + missing. + + Returns either None, or a byte string w/ the data. + + If a control byte is received, it is returned by itself as an + instance of ControlSymbol. + + Note that it needs to receive an EncDec_8B10B.COMMA control byte + first to establish sync, and then will data after that. + ''' + + if set(bstr) - set('01'): + raise ValueError('invalid characters in: %s' % repr(bstr)) + + data = self.__pendingdata + bstr + + if not self.__syncd: + m = self.__syncre.search(data) + if not m: + self.__pendingdata = data[-9:] + return + + self.__syncd = True + + data = data[m.start():] + + r = bytearray() + + for idx in range(0, len(data), 10): + i = data[idx:idx + 10] + if len(i) != 10: + self.__pendingdata = i + break + + try: + ctrl, d = self.dec_8b10b(int(i[::-1], 2)) + except Exception: + self.__syncd = False + self.__pendingdata = data[idx + 1:] + return + + #print(repr(ctrl), repr(d), repr(r)) + if ctrl and r: + self.__pendingdata = data[idx:] + return r + + if ctrl: + self.__pendingdata = data[idx + 10:] + return ControlSymbol(d) + + r += d.to_bytes(1, byteorder='big') + else: + self.__pendingdata = '' + + if r: + return r + + def encode(self, bstr): + '''Encode bstr into a string representing the binary (unicode + string represented by 0 and 1). The first bit to transmit is + at index 0, and so on.''' + + r = [] + rd = self.__rd + + if isinstance(bstr, ControlSymbol): + rd, c = self.enc_8b10b(bstr, rd, ctrl=1) + r.append(c) + else: + for i in bstr: + rd, c = self.enc_8b10b(i, rd) + r.append(c) + + self.__rd = rd + + return ''.join(self._to10b(x) for x in r) + @staticmethod def enc_8b10b(data_in, running_disparity, ctrl=0, verbose=False): + '''Encode a single byte to 10 bits.''' + assert data_in <= 0xFF, "Data in must be maximum one byte" encoded = int(EncDec_8B10B.enc_lookup[( ctrl << 9) + (running_disparity << 8) + data_in], 2) @@ -2068,6 +2202,8 @@ class EncDec_8B10B(object): @staticmethod def dec_8b10b(data_in, verbose=False): + '''Decode 10 bits into a byte.''' + assert data_in <= 0x3FF, "Data in must be maximum 10 bits" decoded = EncDec_8B10B.dec_lookup[(data_in)] if decoded == "DEC8b10bERR": @@ -2080,4 +2216,3 @@ class EncDec_8B10B(object): print("Decoded: {:02X} - Control: {:01b}".format(decoded, ctrl)) return ctrl, decoded - diff --git a/blinkled/encdec8b10b/tests/test_class.py b/blinkled/encdec8b10b/tests/test_class.py new file mode 100644 index 0000000..c8da818 --- /dev/null +++ b/blinkled/encdec8b10b/tests/test_class.py @@ -0,0 +1,110 @@ +# -*- coding: utf-8 -*- + +from .context import encdec8b10b +from encdec8b10b import EncDec8B10B +import random + +import unittest + +import sys +if sys.version_info.major != 3: + raise RuntimeError('invalid major version of python') + +class TestClass(unittest.TestCase): + def test_to10b(self): + to10b = EncDec8B10B._to10b + + self.assertEqual(to10b(0), '0' * 10) + self.assertEqual(to10b(2**10-1), '1' * 10) + self.assertEqual(to10b(0x20f), '1' * 4 + '000001') + self.assertEqual(to10b(0x0ff), '1' * 8 + '00') + + dispminmax = [ 2, 3, 2, 3, 2, 1, 2, 1, 2, 1 ] + + def test_disparity(self): + failures = [] + for i in range(512): + disp = i >> 8 + din = i & 0xff + ndisp, out = EncDec8B10B.enc_8b10b(din, disp) + cnt = +1 if disp else -1 + + for j in range(10): + out, bit = divmod(out, 2) + cnt += +1 if bit else -1 + minmax = self.dispminmax[j] + if cnt < -minmax or cnt > minmax: + failures.append((disp, din)) + + if cnt != 1 if ndisp else cnt != -1: + failures.append((disp, din)) + + if failures: + raise RuntimeError('failures(%d): %s' % (len(failures), repr(failures))) + + def test_bitdecoding(self): + coder = EncDec8B10B() + + with self.assertRaises(ValueError): + coder.decode('asioj') + + self.assertIsNone(coder.decode('')) + + self.assertFalse(coder.issyncd()) + self.assertIsNone(coder.decode('101011011010101101')) + + self.assertFalse(coder.issyncd()) + + self.assertEqual(coder.decode(coder.encode(EncDec8B10B.COMMA)), EncDec8B10B.COMMA) + self.assertTrue(coder.issyncd()) + + astr = coder.encode(b'a') + + self.assertIsNone(coder.decode(astr[:5])) + + self.assertEqual(coder.decode(astr[5:]), b'a') + + self.assertEqual(coder.decode(coder.encode(b'abc123')), b'abc123') + + chrctrlstr = coder.encode(b'xx') + coder.encode(EncDec8B10B.K_28_0) + + self.assertIsNone(coder.decode(chrctrlstr[:5])) + + self.assertEqual(coder.decode(chrctrlstr[5:]), b'xx') + self.assertEqual(coder.decode(''), EncDec8B10B.K_28_0) + self.assertEqual(coder.decode(coder.encode(EncDec8B10B.K_28_2)), EncDec8B10B.K_28_2) + + # that when junk is delivered, it is ignored + self.assertIsNone(coder.decode('111111111000011111')) + + # and is no longer synced + self.assertFalse(coder.issyncd()) + + commaastr = coder.encode(EncDec8B10B.COMMA) + \ + coder.encode(b'a') + + # But it will sync back up + self.assertEqual(coder.decode(commaastr), EncDec8B10B.COMMA) + self.assertEqual(coder.decode(''), b'a') + + self.assertIsNone(coder.decode('')) + + self.assertEqual(coder.decode(coder.encode(EncDec8B10B.COMMA)), EncDec8B10B.COMMA) + + def test_bitencoding(self): + coder = EncDec8B10B() + + self.assertEqual(coder.encode(b'a'), '0111010011') + # + + s = ''.join(( + '1000101100', # a - + '1011010011', # b + + '1100010011', # c + + '1000111001', # 1 + + '0100111001', # 2 + + '1100101001', # 3 + + )) + + self.assertEqual(coder.encode(b'abc123'), s) + self.assertEqual(coder.encode(EncDec8B10B.COMMA), '1100000101') + self.assertEqual(coder.encode(EncDec8B10B.COMMA), '0011111010') diff --git a/blinkled/genseq.py b/blinkled/genseq.py new file mode 100644 index 0000000..0295bdc --- /dev/null +++ b/blinkled/genseq.py @@ -0,0 +1,3 @@ +from encdec8b10b import EncDec8B10B + +print('done') diff --git a/blinkled/requirements.txt b/blinkled/requirements.txt new file mode 100644 index 0000000..deb64dc --- /dev/null +++ b/blinkled/requirements.txt @@ -0,0 +1 @@ +-e ./encdec8b10b