Super simple and small Web Push and Notification service
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

97 lines
2.2 KiB

  1. # (A) INIT
  2. # (A1) LOAD MODULES
  3. from flask import Flask, render_template, request, make_response, send_from_directory
  4. from pywebpush import webpush, WebPushException
  5. import json
  6. import sys
  7. import time
  8. import threading
  9. # (A2) FLASK SETTINGS + INIT - CHANGE TO YOUR OWN!
  10. HOST_NAME = "localhost"
  11. HOST_NAME = "0.0.0.0"
  12. HOST_PORT = 80
  13. VAPID_SUBJECT = "mailto:your@email.com"
  14. VAPID_PRIVATE = "YOUR-PRIVATE-KEY"
  15. app = Flask(__name__,
  16. static_folder='../static',
  17. template_folder='.',
  18. )
  19. app.debug = True
  20. #app.config['EXPLAIN_TEMPLATE_LOADING'] = True
  21. # (B) VIEWS
  22. # (B1) "LANDING PAGE"
  23. @app.route("/")
  24. def index():
  25. return render_template("S2_perm_sw.html")
  26. # (B2) SERVICE WORKER
  27. @app.route("/S3_sw.js")
  28. def sw():
  29. import sys
  30. print(repr(app.static_folder), file=sys.stderr)
  31. response = make_response(send_from_directory(app.static_folder, "S3_sw.js"))
  32. return response
  33. threadlist = []
  34. def donotify(sub, sleep=10):
  35. global threadlist
  36. for t in threadlist:
  37. t.join(0)
  38. threadlist = [ t for t in threadlist if t.is_alive() ]
  39. if sleep:
  40. time.sleep(sleep)
  41. print('sending notification...')
  42. try:
  43. # Mozilla appears to not require the vapid key.
  44. # Chrome requires the vapid key.
  45. if True:
  46. kwargs = dict(vapid_private_key=None,
  47. #vapid_claims=None,
  48. )
  49. else:
  50. kwargs = dict(vapid_private_key=VAPID_PRIVATE,
  51. vapid_claims={ "sub": VAPID_SUBJECT },
  52. )
  53. webpush(
  54. subscription_info = sub,
  55. data = json.dumps({
  56. "title" : "Welcome!",
  57. "body" : "Yes, it works!",
  58. "icon" : "static/i-ico.png",
  59. "image" : "static/i-banner.png"
  60. }), **kwargs
  61. )
  62. print('done')
  63. except WebPushException as ex:
  64. print(ex)
  65. t = threading.Thread(target=donotify, args=(sub,))
  66. t.run()
  67. threadlist.append(t)
  68. # (B3) PUSH DEMO
  69. @app.route("/push", methods=["POST"])
  70. def push():
  71. # (B3-1) GET SUBSCRIBER
  72. sub = json.loads(request.form["sub"])
  73. import sys
  74. print('sub:', repr(json.dumps(sub)), file=sys.stderr)
  75. # (B3-2) TEST PUSH NOTIFICATION
  76. result = "OK"
  77. donotify(sub, 1)
  78. return result
  79. # (C) START
  80. if __name__ == "__main__":
  81. if len(sys.argv) == 2:
  82. HOST_PORT = int(sys.argv[1])
  83. app.run(HOST_NAME, HOST_PORT)