|
- #!/usr/local/bin/python3
-
- import email.mime.multipart
- import os
- import sys
- import traceback
- import unittest
-
- from email.message import Message
- from urllib.parse import parse_qs
-
- def _deb(*args):
- if True:
- print(*args, file=sys.stderr)
-
- def getdata(ct, bstr):
- _deb('ct:', repr(ct))
- _deb('bstr:', repr(bstr))
- m = Message()
- m['content-type'] = ct
- boundary = '--' + m.get_param('boundary')
-
- # trim the end:
- _deb('bstr:', repr(bstr))
-
- # pretend we had the previous record
- body = b'\r\n' + bstr.split(('\r\n' + boundary + '--\r\n').encode('ASCII'), 1)[0]
-
- _deb('body:', repr(body))
-
- parts = body.split(('\r\n' + boundary + '\r\n').encode('ASCII'))[1:]
-
- msgs = [ x for x in map(email.message_from_bytes, parts) if x.get_param('name', header='content-disposition') == 'sub' ]
-
- return msgs[0].get_payload()
-
- if __name__ == '__main__':
- _deb('env:', repr(os.environ))
- contentlen = int(os.environ.get('CONTENT_LENGTH', '0'))
-
- body = sys.stdin.buffer.read(contentlen)
-
- try:
- sub = getdata(os.environ.get('CONTENT_TYPE'), body)
-
- os.umask(0o66)
- with open('/tmp/subinfo.txt', 'w') as fp:
- print(sub, file=fp)
-
- print('Content-Type: text/plain\r')
- print('\r')
- print('OK\r')
- except:
- _deb(traceback.format_exc())
-
- print('status: 500 Server Error\r')
- print('Content-Type: text/plain\r')
- print('\r')
- print('ERROR\r')
-
- class Test(unittest.TestCase):
- _testdata = '''
- 0x0040: 504f 5354 202f 7075 7368 2048 5454 ..POST./push.HTT
- 0x0050: 502f 312e 310d 0a48 6f73 743a 2031 3932 P/1.1..Host:.192
- 0x0060: 2e31 3638 2e30 2e33 0d0a 5573 6572 2d41 .168.0.3..User-A
- 0x0070: 6765 6e74 3a20 4d6f 7a69 6c6c 612f 352e gent:.Mozilla/5.
- 0x0080: 3020 284d 6163 696e 746f 7368 3b20 496e 0.(Macintosh;.In
- 0x0090: 7465 6c20 4d61 6320 4f53 2058 2031 302e tel.Mac.OS.X.10.
- 0x00a0: 3135 3b20 7276 3a31 3039 2e30 2920 4765 15;.rv:109.0).Ge
- 0x00b0: 636b 6f2f 3230 3130 3031 3031 2046 6972 cko/20100101.Fir
- 0x00c0: 6566 6f78 2f31 3135 2e30 0d0a 4163 6365 efox/115.0..Acce
- 0x00d0: 7074 3a20 2a2f 2a0d 0a41 6363 6570 742d pt:.*/*..Accept-
- 0x00e0: 4c61 6e67 7561 6765 3a20 656e 2d55 532c Language:.en-US,
- 0x00f0: 656e 3b71 3d30 2e35 0d0a 4163 6365 7074 en;q=0.5..Accept
- 0x0100: 2d45 6e63 6f64 696e 673a 2067 7a69 702c -Encoding:.gzip,
- 0x0110: 2064 6566 6c61 7465 2c20 6272 0d0a 5265 .deflate,.br..Re
- 0x0120: 6665 7265 723a 2068 7474 7073 3a2f 2f77 ferer:.https://w
- 0x0130: 7777 2e66 756e 6b74 6861 742e 636f 6d2f ww.funkthat.com/
- 0x0140: 7765 6270 7573 682f 0d0a 436f 6e74 656e webpush/..Conten
- 0x0150: 742d 5479 7065 3a20 6d75 6c74 6970 6172 t-Type:.multipar
- 0x0160: 742f 666f 726d 2d64 6174 613b 2062 6f75 t/form-data;.bou
- 0x0170: 6e64 6172 793d 2d2d 2d2d 2d2d 2d2d 2d2d ndary=----------
- 0x0180: 2d2d 2d2d 2d2d 2d2d 2d2d 2d2d 2d2d 2d2d ----------------
- 0x0190: 2d33 3435 3837 3235 3136 3133 3033 3132 -345872516130312
- 0x01a0: 3635 3935 3233 3135 3033 3739 3130 310d 659523150379101.
- 0x01b0: 0a4f 7269 6769 6e3a 2068 7474 7073 3a2f .Origin:.https:/
- 0x01c0: 2f77 7777 2e66 756e 6b74 6861 742e 636f /www.funkthat.co
- 0x01d0: 6d0d 0a44 4e54 3a20 310d 0a53 6563 2d46 m..DNT:.1..Sec-F
- 0x01e0: 6574 6368 2d44 6573 743a 2065 6d70 7479 etch-Dest:.empty
- 0x01f0: 0d0a 5365 632d 4665 7463 682d 4d6f 6465 ..Sec-Fetch-Mode
- 0x0200: 3a20 636f 7273 0d0a 5365 632d 4665 7463 :.cors..Sec-Fetc
- 0x0210: 682d 5369 7465 3a20 7361 6d65 2d6f 7269 h-Site:.same-ori
- 0x0220: 6769 6e0d 0a53 6563 2d47 5043 3a20 310d gin..Sec-GPC:.1.
- 0x0230: 0a58 2d46 6f72 7761 7264 6564 2d46 6f72 .X-Forwarded-For
- 0x0240: 3a20 3139 322e 3136 382e 302e 330d 0a58 :.192.168.0.3..X
- 0x0250: 2d46 6f72 7761 7264 6564 2d48 6f73 743a -Forwarded-Host:
- 0x0260: 2077 7777 2e66 756e 6b74 6861 742e 636f .www.funkthat.co
- 0x0270: 6d0d 0a58 2d46 6f72 7761 7264 6564 2d53 m..X-Forwarded-S
- 0x0280: 6572 7665 723a 2077 7777 2e66 756e 6b74 erver:.www.funkt
- 0x0290: 6861 742e 636f 6d0d 0a43 6f6e 6e65 6374 hat.com..Connect
- 0x02a0: 696f 6e3a 204b 6565 702d 416c 6976 650d ion:.Keep-Alive.
- 0x02b0: 0a43 6f6e 7465 6e74 2d4c 656e 6774 683a .Content-Length:
- 0x02c0: 2035 3833 0d0a 0d0a 2d2d 2d2d 2d2d 2d2d .583....--------
- 0x02d0: 2d2d 2d2d 2d2d 2d2d 2d2d 2d2d 2d2d 2d2d ----------------
- 0x02e0: 2d2d 2d2d 2d33 3435 3837 3235 3136 3133 -----34587251613
- 0x02f0: 3033 3132 3635 3935 3233 3135 3033 3739 0312659523150379
- 0x0300: 3130 310d 0a43 6f6e 7465 6e74 2d44 6973 101..Content-Dis
- 0x0310: 706f 7369 7469 6f6e 3a20 666f 726d 2d64 position:.form-d
- 0x0320: 6174 613b 206e 616d 653d 2273 7562 220d ata;.name="sub".
- 0x0330: 0a0d 0a7b 2265 6e64 706f 696e 7422 3a22 ...{"endpoint":"
- 0x0340: 6874 7470 733a 2f2f 7570 6461 7465 732e https://updates.
- 0x0350: 7075 7368 2e73 6572 7669 6365 732e 6d6f push.services.mo
- 0x0360: 7a69 6c6c 612e 636f 6d2f 7770 7573 682f zilla.com/wpush/
- 0x04c0: 5a4e 565a 6441 535f 4954 3822 7d7d 0d0a ZNVZdAS_IT8"}}..
- 0x04d0: 2d2d 2d2d 2d2d 2d2d 2d2d 2d2d 2d2d 2d2d ----------------
- 0x04e0: 2d2d 2d2d 2d2d 2d2d 2d2d 2d2d 2d33 3435 -------------345
- 0x04f0: 3837 3235 3136 3133 3033 3132 3635 3935 8725161303126595
- 0x0500: 3233 3135 3033 3739 3130 312d 2d0d 0a 23150379101--..
- '''
-
- @staticmethod
- def _process_hexdump(data):
- lines = (x.split(':', 1)[1].strip().split(' ', 1)[0] for x in data.split('\n') if x.strip())
-
- return bytes.fromhex(''.join(lines))
-
- def test_basic(self):
- bstr = self._process_hexdump(self._testdata)
-
- # drop post line
- bstr = bstr.split(b'\r\n', 1)[1]
-
- _deb(repr(bstr))
- msg = email.message_from_bytes(bstr)
- _deb('msg hdrs:', repr(msg.items()))
-
- # get ct
- ct = msg['content-type']
- _deb('ct:', repr(ct))
-
- # get body:
- body = bstr.split(b'\r\n\r\n', 1)[1]
-
- # do the actual test:
- res = getdata(ct, body)
- self.assertEqual(res, '{"endpoint":"https://updates.push.services.mozilla.com/wpush/ZNVZdAS_IT8"}}')
|