-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwebhook_receiver.py
More file actions
264 lines (208 loc) · 10.3 KB
/
webhook_receiver.py
File metadata and controls
264 lines (208 loc) · 10.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
#!/usr/bin/env python3
"""
Simple webhook receiver for testing Microsoft Graph notifications.
This creates a local HTTP server to receive webhook notifications.
"""
import http.server
import socketserver
import json
import urllib.parse
import os
from datetime import datetime, timezone
import threading
import webbrowser
def get_utc_timestamp() -> str:
    """Return the current time in UTC as an ISO 8601 string.

    The value is timezone-aware, so the string carries an explicit
    ``+00:00`` offset.
    """
    now_utc = datetime.now(tz=timezone.utc)
    return now_utc.isoformat()
def get_utc_timestamp_filename() -> str:
    """Return the current UTC time as ``YYYYMMDD_HHMMSS`` for use in filenames.

    The resolution is one second; two calls within the same second yield
    the same string.
    """
    moment = datetime.now(tz=timezone.utc)
    return format(moment, "%Y%m%d_%H%M%S")
def get_utc_display_time() -> str:
    """Return the current UTC time as a human-readable string.

    The result looks like ``2024-01-31 13:45:10 UTC``.
    """
    display_format = "%Y-%m-%d %H:%M:%S UTC"
    current = datetime.now(tz=timezone.utc)
    return current.strftime(display_format)
class WebhookHandler(http.server.BaseHTTPRequestHandler):
    """HTTP request handler for webhook notifications.

    Behaviour:

    * GET or POST carrying a ``validationToken`` query parameter: the decoded
      token is echoed back as ``text/plain`` with status 200.
    * Any other GET: a small HTML status page is returned.
    * POST with an empty body: answered with ``OK``.
    * POST with a JSON body: the payload is printed, summarised, saved to a
      file via :meth:`log_to_file`, and acknowledged with a JSON response.
    * POST with an invalid ``Content-Length`` or a body that is not valid
      UTF-8 JSON: answered with status 400.
    """

    # Landing page served for plain GET requests; ``{}`` receives the
    # current UTC display time.
    _INDEX_PAGE = (
        "\n"
        "<html>\n"
        "<head><title>Webhook Receiver</title></head>\n"
        "<body>\n"
        "<h1>Microsoft Graph Webhook Receiver</h1>\n"
        "<p>This server is ready to receive webhook notifications.</p>\n"
        "<p>Current time: {}</p>\n"
        "</body>\n"
        "</html>\n"
    )

    def do_GET(self):
        """Handle GET requests (validation handshake or status page)."""
        self.log_request_details("GET")

        validation_token = self._get_validation_token()
        if validation_token is not None:
            print("INFO: Webhook validation request received!")
            print(f"INFO: Validation token: {validation_token}")
            # Respond with validation token in plain text
            self._send_body(200, 'text/plain', validation_token.encode('utf-8'))
            print("INFO: Validation response sent successfully!")
            return

        # Regular GET request
        html = self._INDEX_PAGE.format(get_utc_display_time())
        self._send_body(200, 'text/html', html.encode('utf-8'))

    def do_POST(self):
        """Handle POST requests (notifications and validation)."""
        self.log_request_details("POST")

        # Some webhooks send the validation handshake as a POST.
        validation_token = self._get_validation_token()
        if validation_token is not None:
            print("Webhook validation request received via POST!")
            print(f"Validation token: {validation_token}")
            self._send_body(200, 'text/plain', validation_token.encode('utf-8'))
            print("Validation response sent successfully!")
            return

        # Validate Content-Length before reading: a non-numeric value would
        # raise, and a negative value would make read() consume the stream
        # until EOF (i.e. block on a live socket).
        raw_length = self.headers.get('Content-Length', 0)
        try:
            content_length = int(raw_length)
        except (TypeError, ValueError):
            content_length = -1
        if content_length < 0:
            print(f"Invalid Content-Length header: {raw_length!r}")
            self._send_json(400, {"error": "Invalid Content-Length", "details": str(raw_length)})
            return

        post_data = self.rfile.read(content_length)

        # Handle empty body (validation requests sometimes have empty body)
        if content_length == 0 or not post_data.strip():
            print("Empty POST request received (likely validation)")
            self._send_body(200, 'text/plain', b"OK")
            return

        # A body that is not UTF-8 or not JSON is a client error (400).
        try:
            notification_data = json.loads(post_data.decode('utf-8'))
        except (UnicodeDecodeError, json.JSONDecodeError) as e:
            print(f"Error parsing JSON: {e}")
            print(f"Raw data: {post_data}")
            self._send_json(400, {"error": "Invalid JSON", "details": str(e)})
            return

        # Only the processing step is guarded, so a failure while *sending*
        # the success response can never trigger a second (500) response.
        try:
            print("\nWEBHOOK NOTIFICATION RECEIVED!")
            print(f"Time: {get_utc_display_time()}")
            print("Raw payload:")
            print(json.dumps(notification_data, indent=2))
            self._print_notification_summary(notification_data)
            self.log_to_file(notification_data)
        except Exception as e:
            print(f"Error processing notification: {e}")
            self._send_json(500, {"error": "Processing failed", "details": str(e)})
            return

        self._send_json(200, {"status": "received", "timestamp": get_utc_timestamp()})
        print("Notification processed successfully!")

    def log_request_details(self, method: str) -> None:
        """Print the path, client address and headers of the current request.

        Args:
            method: HTTP method name used as a label in the output.
        """
        print(f"\n{method} request received:")
        print(f"Path: {self.path}")
        print(f"Client: {self.client_address}")
        print("Headers:")
        for header, value in self.headers.items():
            print(f" {header}: {value}")

    def log_to_file(self, data: dict) -> None:
        """Save a notification payload as JSON next to this script.

        Files go to a ``webhook_notifications`` directory located beside this
        source file (absolute path, independent of the working directory).
        The file is created exclusively; if a file for the same second
        already exists a numeric suffix is appended instead of overwriting.

        Failures are reported on stdout and otherwise ignored (best effort).

        Args:
            data: Parsed notification payload to store.
        """
        try:
            script_dir = os.path.dirname(os.path.abspath(__file__))
            notifications_dir = os.path.join(script_dir, "webhook_notifications")
            # exist_ok avoids the check-then-create race of exists()+makedirs().
            os.makedirs(notifications_dir, exist_ok=True)

            timestamp = get_utc_timestamp_filename()
            record = {
                "timestamp": get_utc_timestamp(),
                "notification": data,
            }
            filepath, handle = self._open_new_file(
                notifications_dir, f"webhook_notification_{timestamp}"
            )
            with handle:
                json.dump(record, handle, indent=2)
            print(f"Notification saved to: {filepath}")
        except Exception as e:
            print(f"Error saving notification to file: {e}")

    def log_message(self, format, *args):
        """Override to reduce server logging noise."""
        pass

    def _get_validation_token(self):
        """Return the first ``validationToken`` query value, or ``None``."""
        query = urllib.parse.urlparse(self.path).query
        values = urllib.parse.parse_qs(query).get('validationToken')
        return values[0] if values else None

    def _send_body(self, status: int, content_type: str, body: bytes) -> None:
        """Send a complete response: status line, Content-type header, body."""
        self.send_response(status)
        self.send_header('Content-type', content_type)
        self.end_headers()
        self.wfile.write(body)

    def _send_json(self, status: int, payload: dict) -> None:
        """Send *payload* serialised as JSON with the given status code."""
        self._send_body(status, 'application/json', json.dumps(payload).encode('utf-8'))

    def _print_notification_summary(self, payload) -> None:
        """Print the well-known fields of each entry in ``payload['value']``.

        Payloads that are not JSON objects, or that have no ``value`` key,
        produce no output. Entries of an unexpected type are printed raw
        rather than raising.
        """
        if not isinstance(payload, dict) or 'value' not in payload:
            return
        notifications = payload['value']
        if not isinstance(notifications, list):
            print("\n'value' is not a list; skipping per-notification summary")
            return

        print(f"\nFound {len(notifications)} notification(s):")
        for i, notification in enumerate(notifications, 1):
            print(f"\n--- Notification {i} ---")
            if not isinstance(notification, dict):
                print(f"Unexpected notification entry: {notification!r}")
                continue
            print(f"Subscription ID: {notification.get('subscriptionId', 'N/A')}")
            print(f"Change Type: {notification.get('changeType', 'N/A')}")
            print(f"Resource: {notification.get('resource', 'N/A')}")
            print(f"Client State: {notification.get('clientState', 'N/A')}")
            print(f"Subscription Expiration: {notification.get('subscriptionExpirationDateTime', 'N/A')}")

            resource_data = notification.get('resourceData')
            if isinstance(resource_data, dict):
                print("Resource Data:")
                print(f" ID: {resource_data.get('id', 'N/A')}")
                print(f" Type: {resource_data.get('@odata.type', 'N/A')}")
                print(f" ETag: {resource_data.get('@odata.etag', 'N/A')}")

    @staticmethod
    def _open_new_file(directory: str, stem: str):
        """Exclusively create ``<stem>.json`` (or ``<stem>_N.json``) for writing.

        Returns:
            A ``(filepath, open_file)`` tuple for the first name that did not
            already exist in *directory*.
        """
        attempt = 0
        while True:
            suffix = f"_{attempt}" if attempt else ""
            filepath = os.path.join(directory, f"{stem}{suffix}.json")
            try:
                # Mode 'x' fails if the file exists, so nothing is overwritten.
                return filepath, open(filepath, 'x', encoding='utf-8')
            except FileExistsError:
                attempt += 1
def start_webhook_server(port: int = 8000):
    """Start the webhook server and serve requests until interrupted.

    Binds a ``socketserver.TCPServer`` on all interfaces at *port*, opens the
    local URL in a web browser from a background thread, then blocks in
    ``serve_forever()``. Ctrl+C stops the server. Errors are reported on
    stdout rather than raised.

    Args:
        port: TCP port to listen on (default: 8000).
    """
    # Local imports: the block needs these and the file's import section
    # does not provide them.
    import errno
    import time

    print("Starting Microsoft Graph Webhook Receiver...")
    print(f"Server will run on: http://localhost:{port}")
    print(f"Webhook URL: http://localhost:{port}")
    print("Press Ctrl+C to stop the server\n")
    try:
        with socketserver.TCPServer(("", port), WebhookHandler) as httpd:
            print(f"Server started successfully on port {port}")

            # Open browser to show the server is running
            def open_browser():
                time.sleep(1)  # Wait a moment for server to start
                try:
                    webbrowser.open(f"http://localhost:{port}")
                except Exception as e:
                    # Opening a browser is a convenience only; report the
                    # failure but never let it affect the server.
                    print(f"Could not open browser: {e}")

            threading.Thread(target=open_browser, daemon=True).start()
            print("Use this URL as your notification URL in the Graph webhook tester:")
            print(f" http://localhost:{port}")
            print("\nWaiting for webhook notifications...\n")
            httpd.serve_forever()
    except KeyboardInterrupt:
        print("\nServer stopped by user")
    except OSError as e:
        # errno.EADDRINUSE is the platform's own "address in use" code;
        # 10048 (WSAEADDRINUSE) is kept explicitly for Windows.
        if e.errno in (errno.EADDRINUSE, 10048):  # Port already in use
            print(f"Port {port} is already in use. Try a different port:")
            print(f" python webhook_receiver.py --port {port + 1}")
        else:
            print(f"Error starting server: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")
def main():
    """Command-line entry point.

    Reads the optional ``--port`` argument (default 8000) and starts the
    webhook server on that port.
    """
    import argparse

    cli = argparse.ArgumentParser(description="Microsoft Graph Webhook Receiver")
    cli.add_argument(
        "--port",
        type=int,
        default=8000,
        help="Port to run the server on (default: 8000)",
    )
    chosen_port = cli.parse_args().port
    start_webhook_server(chosen_port)
# Run the command-line entry point only when executed as a script,
# not when this module is imported.
if __name__ == "__main__":
    main()