Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
.venv/
node_modules/
src/vendor/
.vscode/
.vscode/
.wrangler/
35 changes: 3 additions & 32 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
# Vendoring Packages: FastAPI + Jinja2 Example
# Python Workers: FastAPI-MCP Example

*Note: You must have Python Packages enabled on your account for built-in packages to work. Request Access to our Closed Beta using [This Form](https://forms.gle/FcjjhV3YtPyjRPaL8)*

This is an example of a Python Worker that uses a built-in package (FastAPI) with a vendored package (Jinja2).
This is an example of a Python Worker that uses the FastAPI-MCP package.

## Adding Packages

Built-in packages can be selected from [this list](https://developers.cloudflare.com/workers/languages/python/packages/#supported-packages) and added to your `requirements.txt` file. These can be used with no other explicit install step.

Vendored packages are added to your source files and need to be installed in a special manner. The Python Workers team plans to make this process automatic in the future, but for now, manual steps need to be taken.

### Vendoring Packages

[//]: # (NOTE: when updating the instructions below, be sure to also update the vendoring.yml CI workflow)

First, install Python3.12 and pip for Python 3.12.

*Currently, other versions of Python will not work - use 3.12!*
Expand All @@ -30,34 +24,11 @@ Within our virtual environment, install the pyodide CLI:
.venv/bin/pyodide venv .venv-pyodide
```

Next, add packages to your vendor.txt file. Here we'll add jinja2
```
jinja2
```

Lastly, add these packages to your source files at `src/vendor`. For any additional packages, re-run this command.
Lastly, download the vendored packages. For any additional packages, re-run this command.
```console
.venv-pyodide/bin/pip install -t src/vendor -r vendor.txt
```

### Using Vendored packages

In your wrangler.toml, make the vendor directory available:

```toml
[[rules]]
globs = ["vendor/**"]
type = "Data"
fallthrough = true
```

Now, you can import and use the packages:

```python
import jinja2
# ... etc ...
```

### Developing and Deploying

To develop your Worker, run `npx wrangler@latest dev`.
Expand Down
198 changes: 22 additions & 176 deletions src/asgi.py
Original file line number Diff line number Diff line change
@@ -1,159 +1,6 @@
from asyncio import Future, Event, Queue, ensure_future, sleep, create_task
from asyncio import Event, Future, Queue, create_task, ensure_future, sleep
from contextlib import contextmanager
from inspect import isawaitable
import typing

if typing.TYPE_CHECKING:
from typing import (
Any,
Callable,
Literal,
Optional,
Protocol,
TypedDict,
Union,
NotRequired,
)
from collections.abc import Awaitable, Iterable, MutableMapping

class HTTPRequestEvent(TypedDict):
type: Literal["http.request"]
body: bytes
more_body: bool

class HTTPResponseDebugEvent(TypedDict):
type: Literal["http.response.debug"]
info: dict[str, object]

class HTTPResponseStartEvent(TypedDict):
type: Literal["http.response.start"]
status: int
headers: NotRequired[Iterable[tuple[bytes, bytes]]]
trailers: NotRequired[bool]

class HTTPResponseBodyEvent(TypedDict):
type: Literal["http.response.body"]
body: bytes
more_body: NotRequired[bool]

class HTTPResponseTrailersEvent(TypedDict):
type: Literal["http.response.trailers"]
headers: Iterable[tuple[bytes, bytes]]
more_trailers: bool

class HTTPServerPushEvent(TypedDict):
type: Literal["http.response.push"]
path: str
headers: Iterable[tuple[bytes, bytes]]

class HTTPDisconnectEvent(TypedDict):
type: Literal["http.disconnect"]

class WebSocketConnectEvent(TypedDict):
type: Literal["websocket.connect"]

class WebSocketAcceptEvent(TypedDict):
type: Literal["websocket.accept"]
subprotocol: NotRequired[str | None]
headers: NotRequired[Iterable[tuple[bytes, bytes]]]

class _WebSocketReceiveEventBytes(TypedDict):
type: Literal["websocket.receive"]
bytes: bytes
text: NotRequired[None]

class _WebSocketReceiveEventText(TypedDict):
type: Literal["websocket.receive"]
bytes: NotRequired[None]
text: str

WebSocketReceiveEvent = Union[
_WebSocketReceiveEventBytes, _WebSocketReceiveEventText
]

class _WebSocketSendEventBytes(TypedDict):
type: Literal["websocket.send"]
bytes: bytes
text: NotRequired[None]

class _WebSocketSendEventText(TypedDict):
type: Literal["websocket.send"]
bytes: NotRequired[None]
text: str

WebSocketSendEvent = Union[_WebSocketSendEventBytes, _WebSocketSendEventText]

class WebSocketResponseStartEvent(TypedDict):
type: Literal["websocket.http.response.start"]
status: int
headers: Iterable[tuple[bytes, bytes]]

class WebSocketResponseBodyEvent(TypedDict):
type: Literal["websocket.http.response.body"]
body: bytes
more_body: NotRequired[bool]

class WebSocketDisconnectEvent(TypedDict):
type: Literal["websocket.disconnect"]
code: int
reason: NotRequired[str | None]

class WebSocketCloseEvent(TypedDict):
type: Literal["websocket.close"]
code: NotRequired[int]
reason: NotRequired[str | None]

class LifespanStartupEvent(TypedDict):
type: Literal["lifespan.startup"]

class LifespanShutdownEvent(TypedDict):
type: Literal["lifespan.shutdown"]

class LifespanStartupCompleteEvent(TypedDict):
type: Literal["lifespan.startup.complete"]

class LifespanStartupFailedEvent(TypedDict):
type: Literal["lifespan.startup.failed"]
message: str

class LifespanShutdownCompleteEvent(TypedDict):
type: Literal["lifespan.shutdown.complete"]

class LifespanShutdownFailedEvent(TypedDict):
type: Literal["lifespan.shutdown.failed"]
message: str

WebSocketEvent = Union[
WebSocketReceiveEvent, WebSocketDisconnectEvent, WebSocketConnectEvent
]

ASGIReceiveEvent = Union[
HTTPRequestEvent,
HTTPDisconnectEvent,
WebSocketConnectEvent,
WebSocketReceiveEvent,
WebSocketDisconnectEvent,
LifespanStartupEvent,
LifespanShutdownEvent,
]

ASGISendEvent = Union[
HTTPResponseStartEvent,
HTTPResponseBodyEvent,
HTTPResponseTrailersEvent,
HTTPServerPushEvent,
HTTPDisconnectEvent,
WebSocketAcceptEvent,
WebSocketSendEvent,
WebSocketResponseStartEvent,
WebSocketResponseBodyEvent,
WebSocketCloseEvent,
LifespanStartupCompleteEvent,
LifespanStartupFailedEvent,
LifespanShutdownCompleteEvent,
LifespanShutdownFailedEvent,
]


ASGI = {"spec_version": "2.0", "version": "3.0"}

Expand Down Expand Up @@ -251,8 +98,9 @@ async def send(got):
return shutdown


async def process_request(app, req, env):
async def process_request(app, req, env, ctx):
from js import Object, Response, TransformStream

from pyodide.ffi import create_proxy

status = None
Expand All @@ -274,14 +122,12 @@ async def process_request(app, req, env):
await receive_queue.put({"body": b"", "more_body": False, "type": "http.request"})

async def receive():
print("Receiving")
message = None
if not receive_queue.empty():
message = await receive_queue.get()
else:
await finished_response.wait()
message = {"type": "http.disconnect"}
print(f"Received {message}")
return message

# Create a transform stream for handling streaming responses
Expand All @@ -290,12 +136,11 @@ async def receive():
writable = transform_stream.writable
writer = writable.getWriter()

async def send(got: "ASGISendEvent"):
async def send(got):
nonlocal status
nonlocal headers
nonlocal is_sse

print(got)
if got["type"] == "http.response.start":
status = got["status"]
# Like above, we need to convert byte-pairs into string explicitly.
Expand All @@ -305,20 +150,18 @@ async def send(got: "ASGISendEvent"):
if k.lower() == "content-type" and v.lower().startswith(
"text/event-stream"
):
print("SSE RESPONSE")
is_sse = True

# For SSE, create and return the response immediately after http.response.start
resp = Response.new(
readable, headers=Object.fromEntries(headers), status=status
)
result.set_result(resp)
break
if is_sse:
# For SSE, create and return the response immediately after http.response.start
resp = Response.new(
readable, headers=Object.fromEntries(headers), status=status
)
result.set_result(resp)

elif got["type"] == "http.response.body":
body = got["body"]
more_body = got.get("more_body", False)
print(f"{body=}, {more_body=}")

# Convert body to JS buffer
px = create_proxy(body)
Expand All @@ -337,6 +180,7 @@ async def send(got: "ASGISendEvent"):
buf.data, headers=Object.fromEntries(headers), status=status
)
result.set_result(resp)
await writer.close()
finished_response.set()

# Run the application in the background to handle SSE
Expand All @@ -346,17 +190,13 @@ async def run_app():

# If we get here and no response has been set yet, the app didn't generate a response
if not result.done():
await writer.close() # Close the writer
finished_response.set()
result.set_exception(
RuntimeError("The application did not generate a response")
)
raise RuntimeError("The application did not generate a response") # noqa: TRY301
except Exception as e:
# Handle any errors in the application
if not result.done():
result.set_exception(e)
await writer.close() # Close the writer
finished_response.set()
result.set_exception(e)

# Create task to run the application in the background
app_task = create_task(run_app())
Expand All @@ -367,7 +207,13 @@ async def run_app():
# For non-SSE responses, we need to wait for the application to complete
if not is_sse:
await app_task
print(f"Returning response! {is_sse}")
else: # noqa: PLR5501
if ctx is not None:
ctx.waitUntil(create_proxy(app_task))
else:
raise RuntimeError(
"Server-Side-Events require ctx to be passed to asgi.fetch"
)
return response


Expand Down Expand Up @@ -423,9 +269,9 @@ async def ws_receive():
return Response.new(None, status=101, webSocket=client)


async def fetch(app, req, env):
async def fetch(app, req, env, ctx=None):
shutdown = await start_application(app)
result = await process_request(app, req, env)
result = await process_request(app, req, env, ctx)
await shutdown()
return result

Expand Down
2 changes: 2 additions & 0 deletions src/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from logger import logger
from starlette.exceptions import HTTPException
from starlette.requests import Request
from starlette.responses import PlainTextResponse, Response
async def http_exception(request: Request, exc: Exception) -> Response:
assert isinstance(exc, HTTPException)
logger.exception(exc)
if exc.status_code in {204, 304}:
return Response(status_code=exc.status_code, headers=exc.headers)
return PlainTextResponse(exc.detail, status_code=exc.status_code, headers=exc.headers)
11 changes: 11 additions & 0 deletions src/httpx_patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from httpx._transports.jsfetch import AsyncJavascriptFetchTransport

orig_handle_async_request = AsyncJavascriptFetchTransport.handle_async_request

async def handle_async_request(self, request):
response = await orig_handle_async_request(self, request)
# fix content-encoding headers because the javascript fetch handles that
response.headers.update({"content-encoding": "identity"})
return response

AsyncJavascriptFetchTransport.handle_async_request = handle_async_request
Loading