This will need to be fixed up a bit to handle when a bit gets doubled due to when two frames have the same bit as the frame length will be around 33.3ms and the bit length around 34ms, which means around every 52s, the same bit will be split over two frames.blinkled
@@ -0,0 +1,24 @@ | |||
Blink LED | |||
========= | |||
This project is to blink a LED, and have a standard cell phone camera | |||
decode the data. | |||
8b10b code | |||
---------- | |||
Adding: | |||
``` | |||
(cd ..; git subtree add -P blinkled/encdec8b10b --squash https://github.com/olagrottvik/encdec8b10b.git master) | |||
``` | |||
Updating: | |||
``` | |||
(cd ..; git subtree pull -P blinkled/encdec8b10b --squash https://github.com/olagrottvik/encdec8b10b.git master) | |||
``` | |||
Info | |||
---- | |||
[8b/10b encoding](https://en.wikipedia.org/wiki/8b/10b_encoding) |
@@ -1 +1 @@ | |||
from .core import EncDec_8B10B as EncDec8B10B | |||
from .core import EncDec_8B10B as EncDec8B10B |
@@ -1,6 +1,39 @@ | |||
import random | |||
# https://en.wikipedia.org/wiki/8b/10b_encoding | |||
import re | |||
class ControlSymbol(int): | |||
pass | |||
class EncDec_8B10B(object): | |||
# Table 36-3 of 802.3-2018 has symbols for 1000Base-X coding | |||
COMMA = ControlSymbol(0xbc) | |||
COMMA_ALTA = ControlSymbol(0x3c) | |||
COMMA_ALTB = ControlSymbol(0xfc) | |||
K_28_0 = ControlSymbol(0x1c) | |||
K_28_2 = ControlSymbol(0x5c) | |||
K_28_3 = ControlSymbol(0x7c) | |||
K_28_4 = ControlSymbol(0x9c) | |||
K_28_6 = ControlSymbol(0xdc) | |||
K_23_7 = ControlSymbol(0xf7) | |||
K_27_7 = ControlSymbol(0xfb) | |||
K_29_7 = ControlSymbol(0xfd) | |||
K_30_7 = ControlSymbol(0xfe) | |||
# Where disparity of 0 means more 0's, 1 means more 1's | |||
# | |||
# index of lookup strings: | |||
# <ctrl><disparity><data> | |||
# | |||
# data is lowest/rightmost bit maps to A, and high bit is H | |||
# | |||
# format of data: | |||
# <newdisparity><data> | |||
# data is lowest/rightmost bit transmitted first | |||
# | |||
# comment of each line is: | |||
# # "<data bitstring>" <start disp><encoding><ending> [<dec val>] | |||
enc_lookup = [ | |||
"00010111001", # "00000000" -D00.0- [0] | |||
"00010101110", # "00000001" -D01.0- [1] | |||
@@ -2055,8 +2088,109 @@ class EncDec_8B10B(object): | |||
"DEC8b10bERR" # "1111111111" | |||
] | |||
def __init__(self): | |||
self.__rd = 0 | |||
self.__syncd = False | |||
self.__pendingdata = '' | |||
@staticmethod | |||
def _to10b(v): | |||
'''convert integer to bits, in the correct order, which is | |||
first (high) bit to be sent first.''' | |||
binstr = bin(v)[2:] | |||
return ('0' * (10 - len(binstr)) + binstr)[10:-11:-1] | |||
def issyncd(self): | |||
return self.__syncd | |||
__syncre = re.compile('11000001..|00111110..') | |||
def decode(self, bstr): | |||
'''Decode bstr, a string of 0's, and 1's to bytes. This | |||
consumes all the bits, and stores any excess bits if some are | |||
missing. | |||
Returns either None, or a byte string w/ the data. | |||
If a control byte is received, it is returned by itself as an | |||
instance of ControlSymbol. | |||
Note that it needs to receive an EncDec_8B10B.COMMA control byte | |||
first to establish sync, and then will data after that. | |||
''' | |||
if set(bstr) - set('01'): | |||
raise ValueError('invalid characters in: %s' % repr(bstr)) | |||
data = self.__pendingdata + bstr | |||
if not self.__syncd: | |||
m = self.__syncre.search(data) | |||
if not m: | |||
self.__pendingdata = data[-9:] | |||
return | |||
self.__syncd = True | |||
data = data[m.start():] | |||
r = bytearray() | |||
for idx in range(0, len(data), 10): | |||
i = data[idx:idx + 10] | |||
if len(i) != 10: | |||
self.__pendingdata = i | |||
break | |||
try: | |||
ctrl, d = self.dec_8b10b(int(i[::-1], 2)) | |||
except Exception: | |||
self.__syncd = False | |||
self.__pendingdata = data[idx + 1:] | |||
return | |||
#print(repr(ctrl), repr(d), repr(r)) | |||
if ctrl and r: | |||
self.__pendingdata = data[idx:] | |||
return r | |||
if ctrl: | |||
self.__pendingdata = data[idx + 10:] | |||
return ControlSymbol(d) | |||
r += d.to_bytes(1, byteorder='big') | |||
else: | |||
self.__pendingdata = '' | |||
if r: | |||
return r | |||
def encode(self, bstr): | |||
'''Encode bstr into a string representing the binary (unicode | |||
string represented by 0 and 1). The first bit to transmit is | |||
at index 0, and so on.''' | |||
r = [] | |||
rd = self.__rd | |||
if isinstance(bstr, ControlSymbol): | |||
rd, c = self.enc_8b10b(bstr, rd, ctrl=1) | |||
r.append(c) | |||
else: | |||
for i in bstr: | |||
rd, c = self.enc_8b10b(i, rd) | |||
r.append(c) | |||
self.__rd = rd | |||
return ''.join(self._to10b(x) for x in r) | |||
@staticmethod | |||
def enc_8b10b(data_in, running_disparity, ctrl=0, verbose=False): | |||
'''Encode a single byte to 10 bits.''' | |||
assert data_in <= 0xFF, "Data in must be maximum one byte" | |||
encoded = int(EncDec_8B10B.enc_lookup[( | |||
ctrl << 9) + (running_disparity << 8) + data_in], 2) | |||
@@ -2068,6 +2202,8 @@ class EncDec_8B10B(object): | |||
@staticmethod | |||
def dec_8b10b(data_in, verbose=False): | |||
'''Decode 10 bits into a byte.''' | |||
assert data_in <= 0x3FF, "Data in must be maximum 10 bits" | |||
decoded = EncDec_8B10B.dec_lookup[(data_in)] | |||
if decoded == "DEC8b10bERR": | |||
@@ -2080,4 +2216,3 @@ class EncDec_8B10B(object): | |||
print("Decoded: {:02X} - Control: {:01b}".format(decoded, ctrl)) | |||
return ctrl, decoded | |||
@@ -0,0 +1,110 @@ | |||
# -*- coding: utf-8 -*- | |||
from .context import encdec8b10b | |||
from encdec8b10b import EncDec8B10B | |||
import random | |||
import unittest | |||
import sys | |||
if sys.version_info.major != 3: | |||
raise RuntimeError('invalid major version of python') | |||
class TestClass(unittest.TestCase): | |||
def test_to10b(self): | |||
to10b = EncDec8B10B._to10b | |||
self.assertEqual(to10b(0), '0' * 10) | |||
self.assertEqual(to10b(2**10-1), '1' * 10) | |||
self.assertEqual(to10b(0x20f), '1' * 4 + '000001') | |||
self.assertEqual(to10b(0x0ff), '1' * 8 + '00') | |||
dispminmax = [ 2, 3, 2, 3, 2, 1, 2, 1, 2, 1 ] | |||
def test_disparity(self): | |||
failures = [] | |||
for i in range(512): | |||
disp = i >> 8 | |||
din = i & 0xff | |||
ndisp, out = EncDec8B10B.enc_8b10b(din, disp) | |||
cnt = +1 if disp else -1 | |||
for j in range(10): | |||
out, bit = divmod(out, 2) | |||
cnt += +1 if bit else -1 | |||
minmax = self.dispminmax[j] | |||
if cnt < -minmax or cnt > minmax: | |||
failures.append((disp, din)) | |||
if cnt != 1 if ndisp else cnt != -1: | |||
failures.append((disp, din)) | |||
if failures: | |||
raise RuntimeError('failures(%d): %s' % (len(failures), repr(failures))) | |||
def test_bitdecoding(self): | |||
coder = EncDec8B10B() | |||
with self.assertRaises(ValueError): | |||
coder.decode('asioj') | |||
self.assertIsNone(coder.decode('')) | |||
self.assertFalse(coder.issyncd()) | |||
self.assertIsNone(coder.decode('101011011010101101')) | |||
self.assertFalse(coder.issyncd()) | |||
self.assertEqual(coder.decode(coder.encode(EncDec8B10B.COMMA)), EncDec8B10B.COMMA) | |||
self.assertTrue(coder.issyncd()) | |||
astr = coder.encode(b'a') | |||
self.assertIsNone(coder.decode(astr[:5])) | |||
self.assertEqual(coder.decode(astr[5:]), b'a') | |||
self.assertEqual(coder.decode(coder.encode(b'abc123')), b'abc123') | |||
chrctrlstr = coder.encode(b'xx') + coder.encode(EncDec8B10B.K_28_0) | |||
self.assertIsNone(coder.decode(chrctrlstr[:5])) | |||
self.assertEqual(coder.decode(chrctrlstr[5:]), b'xx') | |||
self.assertEqual(coder.decode(''), EncDec8B10B.K_28_0) | |||
self.assertEqual(coder.decode(coder.encode(EncDec8B10B.K_28_2)), EncDec8B10B.K_28_2) | |||
# that when junk is delivered, it is ignored | |||
self.assertIsNone(coder.decode('111111111000011111')) | |||
# and is no longer synced | |||
self.assertFalse(coder.issyncd()) | |||
commaastr = coder.encode(EncDec8B10B.COMMA) + \ | |||
coder.encode(b'a') | |||
# But it will sync back up | |||
self.assertEqual(coder.decode(commaastr), EncDec8B10B.COMMA) | |||
self.assertEqual(coder.decode(''), b'a') | |||
self.assertIsNone(coder.decode('')) | |||
self.assertEqual(coder.decode(coder.encode(EncDec8B10B.COMMA)), EncDec8B10B.COMMA) | |||
def test_bitencoding(self): | |||
coder = EncDec8B10B() | |||
self.assertEqual(coder.encode(b'a'), '0111010011') | |||
# + | |||
s = ''.join(( | |||
'1000101100', # a - | |||
'1011010011', # b + | |||
'1100010011', # c + | |||
'1000111001', # 1 + | |||
'0100111001', # 2 + | |||
'1100101001', # 3 + | |||
)) | |||
self.assertEqual(coder.encode(b'abc123'), s) | |||
self.assertEqual(coder.encode(EncDec8B10B.COMMA), '1100000101') | |||
self.assertEqual(coder.encode(EncDec8B10B.COMMA), '0011111010') |
@@ -0,0 +1,3 @@ | |||
from encdec8b10b import EncDec8B10B | |||
print('done') |
@@ -0,0 +1 @@ | |||
-e ./encdec8b10b |