1 change: 1 addition & 0 deletions CHANGES/7559.feature
@@ -0,0 +1 @@
Add a configurable ResourceBudget to properly prevent over-subscription of the disk. Adds a backpressure mechanism and a flushing mechanism to ensure that batches get fully processed even if `minsize` hasn't yet been reached, allowing previous performance-reducing mitigations to be removed.
37 changes: 34 additions & 3 deletions docs/admin/reference/settings.md
@@ -474,10 +474,41 @@ Defaults to `/var/lib/pulp/tmp/`.

### MAX\_CONCURRENT\_CONTENT

The size of the batch of content processed in one go when syncing content from
a remote.
The maximum number of concurrent downloads during sync. Controls how many HTTP
download tasks can run in parallel within the sync pipeline.

Defaults to 25.
Defaults to 200.

!!! warning "Deprecated"
This setting is deprecated and may be removed in a future release.
Use `SYNC_MAX_IN_FLIGHT_ITEMS` instead, which provides similar
functionality. If `MAX_CONCURRENT_CONTENT` is set by the user and
`SYNC_MAX_IN_FLIGHT_ITEMS` is not, its value will be used as
`SYNC_MAX_IN_FLIGHT_ITEMS` automatically.
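
For example, a deployment that previously tuned the deprecated setting might migrate like this (the value shown is illustrative, not a recommendation):

```python
# Before (deprecated):
MAX_CONCURRENT_CONTENT = 100

# After:
SYNC_MAX_IN_FLIGHT_ITEMS = 100
```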


### SYNC\_MAX\_IN\_FLIGHT\_MB

The maximum total size (in megabytes) of downloaded artifacts that are waiting to be
saved. This limits the temporary disk space consumed by artifacts that have been
downloaded but not yet persisted.

When set, small artifacts will download with high concurrency while large artifacts
will automatically throttle to avoid exhausting disk space. This does not prevent
individual artifacts larger than the threshold from being synced, only limits
concurrency in such situations.

Defaults to 5120 (5 GB).

### SYNC\_MAX\_IN\_FLIGHT\_ITEMS
dralley (Contributor Author) commented:
Actually the unit of measurement here is content, not artifacts, so this description is a little misleading - but it's a little difficult to describe.


The maximum number of downloaded artifacts that are waiting to be saved. Like
`SYNC_MAX_IN_FLIGHT_MB`, this limits unpersisted artifacts, but counts items
rather than bytes.

This is useful as a fallback when artifact sizes are not known ahead of time.

Defaults to `None` (no limit).
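
For illustration, a deployment syncing very large artifacts might tighten the byte budget while also capping the item count (values here are illustrative, not recommendations):

```python
# settings.py overrides
SYNC_MAX_IN_FLIGHT_MB = 1024      # cap unpersisted downloads at ~1 GiB of disk
SYNC_MAX_IN_FLIGHT_ITEMS = 500    # and at 500 content items, whichever limit hits first
```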

## Redis Settings

12 changes: 11 additions & 1 deletion pulpcore/app/settings.py
@@ -403,7 +403,17 @@

DOMAIN_ENABLED = False

MAX_CONCURRENT_CONTENT = 25
MAX_CONCURRENT_CONTENT = 200
dralley (Contributor Author) commented:
Reset this back to where it was before we reduced it to limit worst-case scenarios


# Resource budget for sync pipeline: limits total in-flight artifact data between
# the ArtifactDownloader and ArtifactSaver stages. When set, these allow higher download
# concurrency for small artifacts while preventing disk exhaustion for large ones.
# None means no limit for that dimension.

# Maximum megabytes of in-flight downloaded artifacts
SYNC_MAX_IN_FLIGHT_MB = 5120
# Maximum number of items (content) able to be buffered between downloading and saving
SYNC_MAX_IN_FLIGHT_ITEMS = None

SHELL_PLUS_IMPORTS = [
"from pulpcore.app.util import get_domain, get_domain_pk, set_domain, get_url, extract_pk",
1 change: 1 addition & 0 deletions pulpcore/plugin/stages/__init__.py
@@ -4,6 +4,7 @@
from .artifact_stages import (
ACSArtifactHandler,
ArtifactDownloader,
ArtifactResourceBudget,
ArtifactSaver,
GenericDownloader,
QueryExistingArtifacts,
29 changes: 25 additions & 4 deletions pulpcore/plugin/stages/api.py
@@ -77,16 +77,25 @@ async def run(self):
break
yield content

async def batches(self, minsize=500):
async def batches(self, minsize=500, flush_event=None):
"""
Asynchronous iterator yielding batches of [DeclarativeContent][] from `self._in_q`.

The iterator will try to get as many instances of
[DeclarativeContent][] as possible without blocking, but
at least `minsize` instances.

A batch may be yielded early (before reaching `minsize`) in two cases:

- A queued item's `resolution()` is called while it sits in the pending batch,
signaling that another task is waiting on it. This prevents deadlock in the
content-futures pattern where related items depend on each other being saved.
- A `flush_event` is set by an external signal (e.g. resource budget pressure).

Args:
minsize (int): The minimum batch size to yield (unless it is the final batch)
flush_event (asyncio.Event): Optional event that triggers an early batch yield
when set. Cleared after yielding. Used for pressure-based flushing.

Yields:
A list of [DeclarativeContent][] instances
@@ -124,13 +133,20 @@ def add_to_batch(content):

get_listener = asyncio.ensure_future(self._in_q.get())
thaw_event_listener = asyncio.ensure_future(thaw_queue_event.wait())
flush_event_listener = asyncio.ensure_future(flush_event.wait()) if flush_event else None
while not shutdown:
done, pending = await asyncio.wait(
[thaw_event_listener, get_listener], return_when=asyncio.FIRST_COMPLETED
)
waitables = [thaw_event_listener, get_listener]
if flush_event_listener:
waitables.append(flush_event_listener)
done, pending = await asyncio.wait(waitables, return_when=asyncio.FIRST_COMPLETED)
if thaw_event_listener in done:
thaw_event_listener = asyncio.ensure_future(thaw_queue_event.wait())
no_block = True
if flush_event_listener and flush_event_listener in done:
# Don't re-arm until after we yield a batch, to avoid a spin loop
# when the event stays set but the batch is empty.
flush_event_listener = None
no_block = True
Comment on lines +145 to +149
Member commented:
What is the fundamental difference here between the two events?
If I read it correctly, the thaw_event is cleared right before yielding.
Can these events be consolidated into one?

dralley (Contributor Author) replied:
I'm not sure they can; I think they have different lifetimes, and the way they get re-armed is different.

if get_listener in done:
content = await get_listener
add_to_batch(content)
@@ -153,8 +169,13 @@ def add_to_batch(content):
yield batch
batch = []
no_block = False
# Re-arm the flush listener after yielding
if flush_event and flush_event_listener is None:
flush_event_listener = asyncio.ensure_future(flush_event.wait())
Member commented:
Don't you need to .clear() the event to "rearm" it?

dralley (Contributor Author) replied:
It gets cleared in `release()`; `self.pressure` and `flush_event` are the same event.

thaw_event_listener.cancel()
get_listener.cancel()
if flush_event_listener:
flush_event_listener.cancel()

async def put(self, item):
"""
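
To illustrate how a consuming stage combines `batches()` with the budget's `pressure` event, here is a rough sketch (the `save_batch` helper is a hypothetical stand-in for the real persistence step, not part of this PR):

```python
async def consume(stage, budget):
    # The budget sets `pressure` while a downloader is blocked waiting for
    # headroom; passing it as flush_event makes batches() yield early, so
    # items can be persisted and their budget released sooner.
    async for batch in stage.batches(minsize=200, flush_event=budget.pressure):
        await save_batch(batch)  # hypothetical persistence step
        for d_content in batch:
            nbytes = sum(da.artifact.size or 0 for da in d_content.d_artifacts)
            budget.release(nbytes)  # clears pressure and wakes blocked acquirers
```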
166 changes: 158 additions & 8 deletions pulpcore/plugin/stages/artifact_stages.py
@@ -23,6 +23,107 @@
log = logging.getLogger(__name__)


class ArtifactResourceBudget:
"""Tracks aggregate resource consumption of in-flight artifacts.

Coordinates between `ArtifactDownloader` (acquires budget) and
`ArtifactSaver` (releases budget) to limit total temporary disk usage
from downloaded-but-not-yet-saved artifacts.

This allows higher download concurrency for small artifacts while still
protecting against disk exhaustion when syncing large artifacts.

Args:
max_bytes (int): Maximum total bytes of in-flight downloaded artifacts.
`None` means no byte limit (only item limit applies).
max_items (int): Maximum number of in-flight downloaded artifacts.
`None` means no item limit (only byte limit applies).
"""

def __init__(self, max_bytes=None, max_items=None):
self.max_bytes = max_bytes
self.max_items = max_items
self._current_bytes = 0 # total bytes currently acquired and not yet released
self._current_items = 0 # total items currently acquired and not yet released
self._available = asyncio.Event() # cleared when budget is exhausted; set on release
self._available.set()
self._lock = asyncio.Lock() # serializes acquire checks to prevent races
# set when acquire is blocked; signals the stage to flush its batch early
self.pressure = asyncio.Event()

@classmethod
def from_settings(cls):
"""Create an `ArtifactResourceBudget` from Django settings, or return `None`.

Reads `SYNC_MAX_IN_FLIGHT_MB` and `SYNC_MAX_IN_FLIGHT_ITEMS` from settings.
Falls back to the deprecated `MAX_CONCURRENT_CONTENT` for `max_items` if the
user set it and `SYNC_MAX_IN_FLIGHT_ITEMS` is not configured.
Returns `None` if no settings are configured.
"""
max_mb = settings.SYNC_MAX_IN_FLIGHT_MB
max_items = settings.SYNC_MAX_IN_FLIGHT_ITEMS

# Backward compatibility: honor deprecated MAX_CONCURRENT_CONTENT
if max_items is None:
max_items = settings.MAX_CONCURRENT_CONTENT

if max_mb is None and max_items is None:
return None
return cls(
max_bytes=max_mb * 1024 * 1024 if max_mb is not None else None,
max_items=max_items,
)

async def acquire(self, nbytes=0):
"""Block until resource budget is available.

Always allows at least one item through (even if over budget) when nothing
is currently in flight, to prevent deadlock.

When the budget is exhausted and `acquire` must wait, the `pressure` event
is set to signal downstream stages (e.g. `ArtifactSaver`) to flush their
batches early and free up budget.

Args:
nbytes (int): Size in bytes of the artifact(s) to be downloaded.
When 0 (e.g. because the remote did not advertise a size), the
artifact is invisible to the byte budget but still counts toward
the item limit (`SYNC_MAX_IN_FLIGHT_ITEMS`).
"""
while True:
async with self._lock:
# Always allow if nothing is in flight (prevents deadlock)
if self._current_items == 0:
self._current_bytes += nbytes
self._current_items += 1
return

bytes_ok = self.max_bytes is None or (
self._current_bytes + nbytes <= self.max_bytes
)
items_ok = self.max_items is None or self._current_items < self.max_items

if bytes_ok and items_ok:
self._current_bytes += nbytes
self._current_items += 1
return

self._available.clear()
self.pressure.set()
await self._available.wait()

def release(self, nbytes=0):
"""Release resources after an artifact is saved and its temp file deleted.

Args:
nbytes (int): Size in bytes of the artifact that was saved.
"""
self._current_bytes = max(0, self._current_bytes - nbytes)
self._current_items = max(0, self._current_items - 1)
self.pressure.clear()
self._available.set()
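
A minimal, self-contained sketch of the acquire/release semantics described above (sizes are illustrative; it assumes `ArtifactResourceBudget` as defined in this file is in scope):

```python
import asyncio

async def demo():
    # 10 MB byte budget, no item limit
    budget = ArtifactResourceBudget(max_bytes=10 * 1024 * 1024)

    await budget.acquire(nbytes=8 * 1024 * 1024)   # nothing in flight: always admitted
    blocked = asyncio.create_task(budget.acquire(nbytes=4 * 1024 * 1024))

    await asyncio.sleep(0.01)                      # let the second acquire run and block
    assert budget.pressure.is_set()                # downstream should flush early now

    budget.release(nbytes=8 * 1024 * 1024)         # frees budget, wakes the waiter
    await blocked
    budget.release(nbytes=4 * 1024 * 1024)         # clean up the second acquisition

asyncio.run(demo())
```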


def _check_for_forbidden_checksum_type(artifact):
"""Check if content doesn't have forbidden checksum type.

@@ -122,7 +223,7 @@ class GenericDownloader(Stage):
downloads completed. Since it's a stream the total count isn't known until it's finished.

This stage drains all available items from `self._in_q` and starts as many concurrent
downloading tasks as possible, up to the limit defined by ``self.max_concurrent_content``.
downloading tasks as possible, up to the limit defined by `self.max_concurrent_content`.

Each [pulpcore.plugin.stages.DeclarativeContent][] is sent to `_handle_content_unit`,
which must be implemented by the subclass, to handle processing the content unit and starting
@@ -220,28 +321,55 @@ class ArtifactDownloader(GenericDownloader):

Each [pulpcore.plugin.stages.DeclarativeContent][] is sent to `self._out_q` after all of
its [pulpcore.plugin.stages.DeclarativeArtifact][] objects have been handled.

Args:
resource_budget (ArtifactResourceBudget): Optional shared resource budget that
limits total in-flight artifact bytes/items between download and save.
args: unused positional arguments passed along to
[pulpcore.plugin.stages.GenericDownloader][].
kwargs: unused keyword arguments passed along to
[pulpcore.plugin.stages.GenericDownloader][].
"""

PROGRESS_REPORTING_MESSAGE = "Downloading Artifacts"
PROGRESS_REPORTING_CODE = "sync.downloading.artifacts"

def __init__(self, resource_budget=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.resource_budget = resource_budget

async def _handle_content_unit(self, d_content):
"""Handle one content unit.

Returns:
The number of downloads
"""
downloaders_for_content = [
d_artifact.download()
d_artifacts_to_download = [
d_artifact
for d_artifact in d_content.d_artifacts
if d_artifact.artifact._state.adding
and not d_artifact.deferred_download
and not d_artifact.artifact.file
]
if downloaders_for_content:
await asyncio.gather(*downloaders_for_content)
await self.put(d_content)
return len(downloaders_for_content)

budget_bytes = 0
budget_acquired = False
if d_artifacts_to_download and self.resource_budget:
budget_bytes = sum(
d_artifact.artifact.size or 0 for d_artifact in d_artifacts_to_download
)
await self.resource_budget.acquire(budget_bytes)
budget_acquired = True

try:
if d_artifacts_to_download:
await asyncio.gather(*(da.download() for da in d_artifacts_to_download))

await self.put(d_content)
except BaseException:
# Release on failure even when budget_bytes is 0 (the item slot was still
# acquired); on success, ArtifactSaver releases after persisting.
if budget_acquired:
self.resource_budget.release(budget_bytes)
raise

return len(d_artifacts_to_download)


class ArtifactSaver(Stage):
@@ -259,16 +387,28 @@ class ArtifactSaver(Stage):

This stage drains all available items from `self._in_q` and batches everything into one large
call to the db for efficiency.

Args:
resource_budget (ArtifactResourceBudget): Optional shared resource budget.
When provided, budget is released after artifacts are saved and temp files deleted.
args: unused positional arguments passed along to [pulpcore.plugin.stages.Stage][].
kwargs: unused keyword arguments passed along to [pulpcore.plugin.stages.Stage][].
"""

def __init__(self, resource_budget=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.resource_budget = resource_budget

async def run(self):
"""
The coroutine for this stage.

Returns:
The coroutine for this stage.
"""
async for batch in self.batches(minsize=settings.MAX_CONCURRENT_CONTENT):
flush_event = self.resource_budget.pressure if self.resource_budget else None
minsize = (self.resource_budget.max_items or 200) if self.resource_budget else 200
dralley (Contributor Author) commented on May 8, 2026:
This is a little weird because, on one hand, the resource budget ought to manage this completely, and on the other hand, it's hard to see the downside of setting a reasonably large batch size by default? It's going to exist anyway (defaults to 500 if you don't set it manually).

async for batch in self.batches(minsize=minsize, flush_event=flush_event):
da_to_save = []
for d_content in batch:
for d_artifact in d_content.d_artifacts:
@@ -291,6 +431,16 @@ async def run(self):
if await aos.path.exists(tmp_file_path):
await aos.remove(tmp_file_path)

# Release budget after temp files are cleaned up so the downloader can proceed
if self.resource_budget:
for d_content in batch:
budget_bytes = sum(
d_artifact.artifact.size or 0
for d_artifact in d_content.d_artifacts
if not d_artifact.deferred_download
)
self.resource_budget.release(budget_bytes)

for d_content in batch:
await self.put(d_content)

7 changes: 5 additions & 2 deletions pulpcore/plugin/stages/declarative_version.py
@@ -6,6 +6,7 @@
from pulpcore.plugin.stages.artifact_stages import (
ACSArtifactHandler,
ArtifactDownloader,
ArtifactResourceBudget,
ArtifactSaver,
QueryExistingArtifacts,
RemoteArtifactSaver,
@@ -129,6 +130,8 @@ def pipeline_stages(self, new_version):
list: List of [pulpcore.plugin.stages.Stage][] instances

"""
resource_budget = ArtifactResourceBudget.from_settings()

pipeline = [
self.first_stage,
QueryExistingArtifacts(),
@@ -137,8 +140,8 @@
pipeline.append(ACSArtifactHandler())
pipeline.extend(
[
ArtifactDownloader(),
ArtifactSaver(),
ArtifactDownloader(resource_budget=resource_budget),
ArtifactSaver(resource_budget=resource_budget),
QueryExistingContents(),
ContentSaver(),
RemoteArtifactSaver(),