Skip to content

Commit a95c7a9

Browse files
committed
Updated code example
1 parent 4c20159 commit a95c7a9

4 files changed

Lines changed: 64 additions & 60 deletions

File tree

2026/cqrs/EXAMPLES.md

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,38 @@ source .venv/bin/activate
1212
uvicorn before:app --reload # to run the before version of the code
1313
```
1414

15-
Here are a few example requests you can use to test the FastAPI app:
15+
Here are a few example requests you can use to test the FastAPI app:
16+
17+
# Create a ticket
18+
19+
curl -X POST http://localhost:8000/tickets \
20+
-H "Content-Type: application/json" \
21+
-d '{
22+
"customer_id": "cust-123",
23+
"subject": "Login not working",
24+
"message": "I cannot log in after resetting my password."
25+
}'
26+
27+
# Update ticket status
28+
29+
curl -X PATCH http://localhost:8000/tickets/69722457531892ec7a576522 \
30+
-H "Content-Type: application/json" \
31+
-d '{
32+
"status": "triaged"
33+
}'
34+
35+
# Update ticket status (after version)
36+
37+
curl -X POST http://localhost:8000/tickets/697239ff4e89771fad8ed94f/status \
38+
-H "Content-Type: application/json" \
39+
-d '{
40+
"new_status": "triaged"
41+
}'
42+
43+
# Add a note
44+
45+
curl -X POST http://localhost:8000/tickets/697239ff4e89771fad8ed94f/agent-note \
46+
-H "Content-Type: application/json" \
47+
-d '{
48+
"note": "Customer reset their password twice; investigating auth service."
49+
}'

2026/cqrs/after.py

Lines changed: 10 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,9 @@
33
from datetime import datetime, timezone
44
from typing import Any, Literal
55

6-
from bson import ObjectId
6+
from db import Database, get_db, oid_str, parse_object_id, shutdown_db
77
from fastapi import FastAPI, HTTPException
88
from pydantic import BaseModel
9-
from pymongo import AsyncMongoClient
10-
from pymongo.asynchronous.database import AsyncDatabase
119

1210
# ----------------------------
1311
# Types / helpers
@@ -22,17 +20,6 @@ def utcnow() -> datetime:
2220
return datetime.now(timezone.utc)
2321

2422

25-
def oid_str(oid: ObjectId) -> str:
26-
return str(oid)
27-
28-
29-
def parse_object_id(ticket_id: str) -> ObjectId:
30-
try:
31-
return ObjectId(ticket_id)
32-
except Exception as e:
33-
raise HTTPException(status_code=400, detail="Invalid ticket id") from e
34-
35-
3623
def make_preview(message: str) -> str:
3724
msg = message.strip().replace("\n", " ")
3825
return msg if len(msg) <= PREVIEW_LEN else msg[: PREVIEW_LEN - 1] + "…"
@@ -42,8 +29,6 @@ def make_preview(message: str) -> str:
4229
# MongoDB configuration (local + auth)
4330
# ----------------------------
4431

45-
MONGODB_URI = "mongodb://root:example@localhost:27017/?authSource=admin"
46-
DATABASE_NAME = "cqrs_demo"
4732

4833
COMMANDS_COLL = "ticket_commands" # source of truth
4934
READS_COLL = "ticket_reads" # read projection for list/dashboard
@@ -54,16 +39,10 @@ def make_preview(message: str) -> str:
5439

5540
app = FastAPI()
5641

57-
mongo_client: AsyncMongoClient | None = None
58-
db: AsyncDatabase | None = None
59-
6042

6143
@app.on_event("startup")
6244
async def startup() -> None:
63-
global mongo_client, db
64-
65-
mongo_client = AsyncMongoClient(MONGODB_URI)
66-
db = mongo_client[DATABASE_NAME]
45+
db = get_db()
6746

6847
# Write-side indexes (source of truth)
6948
await db[COMMANDS_COLL].create_index("status")
@@ -77,13 +56,7 @@ async def startup() -> None:
7756

7857
@app.on_event("shutdown")
7958
async def shutdown() -> None:
80-
if mongo_client is not None:
81-
mongo_client.close()
82-
83-
84-
def get_db() -> AsyncDatabase:
85-
assert db is not None
86-
return db
59+
await shutdown_db()
8760

8861

8962
# ----------------------------
@@ -135,7 +108,7 @@ class TicketDetails(BaseModel):
135108
# ----------------------------
136109

137110

138-
async def cmd_create_ticket(db: AsyncDatabase, cmd: CreateTicket) -> str:
111+
async def cmd_create_ticket(db: Database, cmd: CreateTicket) -> str:
139112
now = utcnow()
140113
doc: dict[str, Any] = {
141114
"customer_id": cmd.customer_id,
@@ -150,9 +123,7 @@ async def cmd_create_ticket(db: AsyncDatabase, cmd: CreateTicket) -> str:
150123
return oid_str(res.inserted_id)
151124

152125

153-
async def cmd_update_status(
154-
db: AsyncDatabase, ticket_id: str, cmd: UpdateStatus
155-
) -> None:
126+
async def cmd_update_status(db: Database, ticket_id: str, cmd: UpdateStatus) -> None:
156127
_id = parse_object_id(ticket_id)
157128

158129
existing = await db[COMMANDS_COLL].find_one({"_id": _id})
@@ -168,9 +139,7 @@ async def cmd_update_status(
168139
)
169140

170141

171-
async def cmd_add_agent_note(
172-
db: AsyncDatabase, ticket_id: str, cmd: AddAgentNote
173-
) -> None:
142+
async def cmd_add_agent_note(db: Database, ticket_id: str, cmd: AddAgentNote) -> None:
174143
_id = parse_object_id(ticket_id)
175144

176145
existing = await db[COMMANDS_COLL].find_one({"_id": _id})
@@ -188,7 +157,7 @@ async def cmd_add_agent_note(
188157
# ----------------------------
189158

190159

191-
async def project_ticket(db: AsyncDatabase, ticket_id: str) -> None:
160+
async def project_ticket(db: Database, ticket_id: str) -> None:
192161
"""
193162
Read model goal:
194163
- store exactly what the list/dashboard needs
@@ -345,6 +314,8 @@ async def dashboard() -> dict[str, int]:
345314
pipeline = [{"$group": {"_id": "$status", "count": {"$sum": 1}}}]
346315

347316
counts: dict[str, int] = {"open": 0, "triaged": 0, "closed": 0}
348-
async for row in db[READS_COLL].aggregate(pipeline):
317+
318+
cursor = await db[READS_COLL].aggregate(pipeline)
319+
async for row in cursor:
349320
counts[str(row["_id"])] = int(row["count"])
350321
return counts

2026/cqrs/before.py

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,31 +3,20 @@
33
from datetime import datetime, timezone
44
from typing import Any, Literal
55

6-
from bson import ObjectId
7-
from db import get_db, shutdown_db
6+
from db import get_db, oid_str, shutdown_db
87
from fastapi import FastAPI, HTTPException
98
from pydantic import BaseModel
109

1110
Status = Literal["open", "triaged", "closed"]
1211

1312
PREVIEW_LEN = 80
13+
TICKETS_COLL = "tickets"
1414

1515

1616
def utcnow() -> datetime:
1717
return datetime.now(timezone.utc)
1818

1919

20-
def oid_str(oid: ObjectId) -> str:
21-
return str(oid)
22-
23-
24-
def parse_object_id(ticket_id: str) -> ObjectId:
25-
try:
26-
return ObjectId(ticket_id)
27-
except Exception as e:
28-
raise HTTPException(status_code=400, detail="Invalid ticket id") from e
29-
30-
3120
def make_preview(message: str) -> str:
3221
msg = message.strip().replace("\n", " ")
3322
return msg if len(msg) <= PREVIEW_LEN else msg[: PREVIEW_LEN - 1] + "…"

2026/cqrs/db.py

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,28 @@
1-
from __future__ import annotations
1+
from typing import Any, Mapping
22

3+
from bson import ObjectId
34
from pymongo.asynchronous.database import AsyncDatabase
5+
from pymongo.asynchronous.mongo_client import AsyncMongoClient
46

57
MONGODB_URI = "mongodb://root:example@localhost:27017/?authSource=admin"
68
DATABASE_NAME = "cqrs_demo"
7-
TICKETS_COLL = "tickets"
89

9-
mongo_client = AsyncMongoClient(MONGODB_URI)
10-
db: AsyncDatabase | None = None
10+
type Database = AsyncDatabase[Mapping[str, Any]]
11+
mongo_client = AsyncMongoClient[Mapping[str, Any]](MONGODB_URI)
12+
db: Database | None = None
1113

1214

13-
def get_db() -> AsyncDatabase:
15+
def get_db() -> Database:
1416
return mongo_client[DATABASE_NAME]
1517

1618

17-
def shutdown_db() -> None:
18-
mongo_client.close()
19+
async def shutdown_db() -> None:
20+
await mongo_client.close()
21+
22+
23+
def oid_str(oid: ObjectId) -> str:
24+
return str(oid)
25+
26+
27+
def parse_object_id(ticket_id: str) -> ObjectId:
28+
return ObjectId(ticket_id)

0 commit comments

Comments
 (0)