#!/usr/local/bin/python3 import email.mime.multipart import os import sys import traceback import unittest from email.message import Message from urllib.parse import parse_qs def _deb(*args): if True: print(*args, file=sys.stderr) def getdata(ct, bstr): _deb('ct:', repr(ct)) _deb('bstr:', repr(bstr)) m = Message() m['content-type'] = ct boundary = '--' + m.get_param('boundary') # trim the end: _deb('bstr:', repr(bstr)) # pretend we had the previous record body = b'\r\n' + bstr.split(('\r\n' + boundary + '--\r\n').encode('ASCII'), 1)[0] _deb('body:', repr(body)) parts = body.split(('\r\n' + boundary + '\r\n').encode('ASCII'))[1:] msgs = [ x for x in map(email.message_from_bytes, parts) if x.get_param('name', header='content-disposition') == 'sub' ] return msgs[0].get_payload() if __name__ == '__main__': _deb('env:', repr(os.environ)) contentlen = int(os.environ.get('CONTENT_LENGTH', '0')) body = sys.stdin.buffer.read(contentlen) try: sub = getdata(os.environ.get('CONTENT_TYPE'), body) os.umask(0o66) with open('/tmp/subinfo.txt', 'w') as fp: print(sub, file=fp) print('Content-Type: text/plain\r') print('\r') print('OK\r') except: _deb(traceback.format_exc()) print('status: 500 Server Error\r') print('Content-Type: text/plain\r') print('\r') print('ERROR\r') class Test(unittest.TestCase): _testdata = ''' 0x0040: 504f 5354 202f 7075 7368 2048 5454 ..POST./push.HTT 0x0050: 502f 312e 310d 0a48 6f73 743a 2031 3932 P/1.1..Host:.192 0x0060: 2e31 3638 2e30 2e33 0d0a 5573 6572 2d41 .168.0.3..User-A 0x0070: 6765 6e74 3a20 4d6f 7a69 6c6c 612f 352e gent:.Mozilla/5. 0x0080: 3020 284d 6163 696e 746f 7368 3b20 496e 0.(Macintosh;.In 0x0090: 7465 6c20 4d61 6320 4f53 2058 2031 302e tel.Mac.OS.X.10. 0x00a0: 3135 3b20 7276 3a31 3039 2e30 2920 4765 15;.rv:109.0).Ge 0x00b0: 636b 6f2f 3230 3130 3031 3031 2046 6972 cko/20100101.Fir 0x00c0: 6566 6f78 2f31 3135 2e30 0d0a 4163 6365 efox/115.0..Acce 0x00d0: 7074 3a20 2a2f 2a0d 0a41 6363 6570 742d pt:.*/*..Accept- 0x00e0: 4c61 6e67 7561 6765 3a20 656e 2d55 532c Language:.en-US, 0x00f0: 656e 3b71 3d30 2e35 0d0a 4163 6365 7074 en;q=0.5..Accept 0x0100: 2d45 6e63 6f64 696e 673a 2067 7a69 702c -Encoding:.gzip, 0x0110: 2064 6566 6c61 7465 2c20 6272 0d0a 5265 .deflate,.br..Re 0x0120: 6665 7265 723a 2068 7474 7073 3a2f 2f77 ferer:.https://w 0x0130: 7777 2e66 756e 6b74 6861 742e 636f 6d2f ww.funkthat.com/ 0x0140: 7765 6270 7573 682f 0d0a 436f 6e74 656e webpush/..Conten 0x0150: 742d 5479 7065 3a20 6d75 6c74 6970 6172 t-Type:.multipar 0x0160: 742f 666f 726d 2d64 6174 613b 2062 6f75 t/form-data;.bou 0x0170: 6e64 6172 793d 2d2d 2d2d 2d2d 2d2d 2d2d ndary=---------- 0x0180: 2d2d 2d2d 2d2d 2d2d 2d2d 2d2d 2d2d 2d2d ---------------- 0x0190: 2d33 3435 3837 3235 3136 3133 3033 3132 -345872516130312 0x01a0: 3635 3935 3233 3135 3033 3739 3130 310d 659523150379101. 0x01b0: 0a4f 7269 6769 6e3a 2068 7474 7073 3a2f .Origin:.https:/ 0x01c0: 2f77 7777 2e66 756e 6b74 6861 742e 636f /www.funkthat.co 0x01d0: 6d0d 0a44 4e54 3a20 310d 0a53 6563 2d46 m..DNT:.1..Sec-F 0x01e0: 6574 6368 2d44 6573 743a 2065 6d70 7479 etch-Dest:.empty 0x01f0: 0d0a 5365 632d 4665 7463 682d 4d6f 6465 ..Sec-Fetch-Mode 0x0200: 3a20 636f 7273 0d0a 5365 632d 4665 7463 :.cors..Sec-Fetc 0x0210: 682d 5369 7465 3a20 7361 6d65 2d6f 7269 h-Site:.same-ori 0x0220: 6769 6e0d 0a53 6563 2d47 5043 3a20 310d gin..Sec-GPC:.1. 0x0230: 0a58 2d46 6f72 7761 7264 6564 2d46 6f72 .X-Forwarded-For 0x0240: 3a20 3139 322e 3136 382e 302e 330d 0a58 :.192.168.0.3..X 0x0250: 2d46 6f72 7761 7264 6564 2d48 6f73 743a -Forwarded-Host: 0x0260: 2077 7777 2e66 756e 6b74 6861 742e 636f .www.funkthat.co 0x0270: 6d0d 0a58 2d46 6f72 7761 7264 6564 2d53 m..X-Forwarded-S 0x0280: 6572 7665 723a 2077 7777 2e66 756e 6b74 erver:.www.funkt 0x0290: 6861 742e 636f 6d0d 0a43 6f6e 6e65 6374 hat.com..Connect 0x02a0: 696f 6e3a 204b 6565 702d 416c 6976 650d ion:.Keep-Alive. 0x02b0: 0a43 6f6e 7465 6e74 2d4c 656e 6774 683a .Content-Length: 0x02c0: 2035 3833 0d0a 0d0a 2d2d 2d2d 2d2d 2d2d .583....-------- 0x02d0: 2d2d 2d2d 2d2d 2d2d 2d2d 2d2d 2d2d 2d2d ---------------- 0x02e0: 2d2d 2d2d 2d33 3435 3837 3235 3136 3133 -----34587251613 0x02f0: 3033 3132 3635 3935 3233 3135 3033 3739 0312659523150379 0x0300: 3130 310d 0a43 6f6e 7465 6e74 2d44 6973 101..Content-Dis 0x0310: 706f 7369 7469 6f6e 3a20 666f 726d 2d64 position:.form-d 0x0320: 6174 613b 206e 616d 653d 2273 7562 220d ata;.name="sub". 0x0330: 0a0d 0a7b 2265 6e64 706f 696e 7422 3a22 ...{"endpoint":" 0x0340: 6874 7470 733a 2f2f 7570 6461 7465 732e https://updates. 0x0350: 7075 7368 2e73 6572 7669 6365 732e 6d6f push.services.mo 0x0360: 7a69 6c6c 612e 636f 6d2f 7770 7573 682f zilla.com/wpush/ 0x04c0: 5a4e 565a 6441 535f 4954 3822 7d7d 0d0a ZNVZdAS_IT8"}}.. 0x04d0: 2d2d 2d2d 2d2d 2d2d 2d2d 2d2d 2d2d 2d2d ---------------- 0x04e0: 2d2d 2d2d 2d2d 2d2d 2d2d 2d2d 2d33 3435 -------------345 0x04f0: 3837 3235 3136 3133 3033 3132 3635 3935 8725161303126595 0x0500: 3233 3135 3033 3739 3130 312d 2d0d 0a 23150379101--.. ''' @staticmethod def _process_hexdump(data): lines = (x.split(':', 1)[1].strip().split(' ', 1)[0] for x in data.split('\n') if x.strip()) return bytes.fromhex(''.join(lines)) def test_basic(self): bstr = self._process_hexdump(self._testdata) # drop post line bstr = bstr.split(b'\r\n', 1)[1] _deb(repr(bstr)) msg = email.message_from_bytes(bstr) _deb('msg hdrs:', repr(msg.items())) # get ct ct = msg['content-type'] _deb('ct:', repr(ct)) # get body: body = bstr.split(b'\r\n\r\n', 1)[1] # do the actual test: res = getdata(ct, body) self.assertEqual(res, '{"endpoint":"https://updates.push.services.mozilla.com/wpush/ZNVZdAS_IT8"}}')