Add a ResourceBudget mechanism which keeps disk usage in check during syncs #7649
| @@ -0,0 +1 @@ | ||
| Add a configurable ResourceBudget that properly prevents over-subscription of the disk during syncs. Adds a backpressure mechanism and a flushing mechanism to ensure that batches get fully processed even if `minsize` hasn't yet been reached. Allows previous performance-reducing mitigations to be removed. |
|
|
@@ -474,10 +474,41 @@ Defaults to `/var/lib/pulp/tmp/`. | |
|
|
||
| ### MAX\_CONCURRENT\_CONTENT | ||
|
|
||
| The size of the batch of content processed in one go when syncing content from | ||
| a remote. | ||
| The maximum number of concurrent downloads during sync. Controls how many HTTP | ||
| download tasks can run in parallel within the sync pipeline. | ||
|
|
||
| Defaults to 25. | ||
| Defaults to 200. | ||
|
|
||
| !!! warning "Deprecated" | ||
| This setting is deprecated and may be removed in a future release. | ||
| Use `SYNC_MAX_IN_FLIGHT_ITEMS` instead, which provides similar | ||
| functionality. If `MAX_CONCURRENT_CONTENT` is set by the user and | ||
| `SYNC_MAX_IN_FLIGHT_ITEMS` is not, its value will be used as | ||
| `SYNC_MAX_IN_FLIGHT_ITEMS` automatically. | ||
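As a concrete illustration of the fallback described in the warning above (the values here are illustrative only):

```python
# settings.py -- only the deprecated setting is defined
MAX_CONCURRENT_CONTENT = 50
# SYNC_MAX_IN_FLIGHT_ITEMS is left unset (None), so the sync pipeline behaves
# as if SYNC_MAX_IN_FLIGHT_ITEMS = 50 had been configured.
```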
|
|
||
|
|
||
| ### SYNC\_MAX\_IN\_FLIGHT\_MB | ||
|
|
||
| The maximum total size (in megabytes) of downloaded artifacts that are waiting to be | ||
| saved. This limits the temporary disk space consumed by artifacts that have been | ||
| downloaded but not yet persisted. | ||
|
|
||
| When set, small artifacts will download with high concurrency while large artifacts | ||
| will automatically throttle to avoid exhausting disk space. This does not prevent | ||
| individual artifacts larger than the threshold from being synced, only limits | ||
| concurrency in such situations. | ||
|
|
||
| Defaults to 5120 (5 GB). |
|
dralley marked this conversation as resolved.
|
||
|
|
||
| ### SYNC\_MAX\_IN\_FLIGHT\_ITEMS | ||
|
Contributor (Author):
Actually the unit of measurement here is content, not artifacts, so this description is a little misleading - but it's a little difficult to describe.
||
|
|
||
| The maximum number of downloaded artifacts that are waiting to be saved. Like | ||
| `SYNC_MAX_IN_FLIGHT_MB`, this limits unpersisted artifacts, but counts items | ||
| rather than bytes. | ||
|
|
||
| This is useful as a fallback when artifact sizes are not known ahead of time. | ||
|
|
||
| Defaults to `None` (no limit). | ||
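To show how the two in-flight limits interact, a hedged example configuration (the values are arbitrary, not recommendations):

```python
# settings.py -- both in-flight limits enabled
SYNC_MAX_IN_FLIGHT_MB = 2048    # at most ~2 GB of downloaded-but-unsaved artifact data
SYNC_MAX_IN_FLIGHT_ITEMS = 500  # and at most 500 such content items

# Whichever limit is reached first throttles further downloads. With this budget,
# roughly two hundred 10 MB packages can be in flight at once, but only one 3 GB
# image at a time; an artifact larger than the whole budget is still synced, just
# with nothing else in flight alongside it.
```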
|
|
||
| ## Redis Settings | ||
|
|
||
|
|
||
|
|
@@ -403,7 +403,17 @@ | |
|
|
||
| DOMAIN_ENABLED = False | ||
|
|
||
| MAX_CONCURRENT_CONTENT = 25 | ||
| MAX_CONCURRENT_CONTENT = 200 | ||
|
Contributor (Author):
Reset this back to where it was before we reduced it to limit worst-case scenarios.
||
|
|
||
| # Resource budget for sync pipeline: limits total in-flight artifact data between | ||
| # the ArtifactDownloader and ArtifactSaver stages. When set, these allow higher download | ||
| # concurrency for small artifacts while preventing disk exhaustion for large ones. | ||
| # None means no limit for that dimension. | ||
|
|
||
| # Maximum megabytes of in-flight downloaded artifacts | ||
| SYNC_MAX_IN_FLIGHT_MB = 5120 | ||
| # Maximum number of items (content) able to be buffered between downloading and saving | ||
| SYNC_MAX_IN_FLIGHT_ITEMS = None | ||
|
|
||
| SHELL_PLUS_IMPORTS = [ | ||
| "from pulpcore.app.util import get_domain, get_domain_pk, set_domain, get_url, extract_pk", | ||
|
|
||
|
|
@@ -77,16 +77,25 @@ async def run(self): | |
| break | ||
| yield content | ||
|
|
||
| async def batches(self, minsize=500): | ||
| async def batches(self, minsize=500, flush_event=None): | ||
| """ | ||
| Asynchronous iterator yielding batches of [DeclarativeContent][] from `self._in_q`. | ||
|
|
||
| The iterator will try to get as many instances of | ||
| [DeclarativeContent][] as possible without blocking, but | ||
| at least `minsize` instances. | ||
|
|
||
| A batch may be yielded early (before reaching `minsize`) in two cases: | ||
|
|
||
| - A queued item's `resolution()` is called while it sits in the pending batch, | ||
| signaling that another task is waiting on it. This prevents deadlock in the | ||
| content-futures pattern where related items depend on each other being saved. | ||
| - A `flush_event` is set by an external signal (e.g. resource budget pressure). | ||
|
|
||
| Args: | ||
| minsize (int): The minimum batch size to yield (unless it is the final batch) | ||
| flush_event (asyncio.Event): Optional event that triggers an early batch yield | ||
| when set. Cleared after yielding. Used for pressure-based flushing. | ||
|
|
||
| Yields: | ||
| A list of [DeclarativeContent][] instances | ||
|
|
@@ -124,13 +133,20 @@ def add_to_batch(content): | |
|
|
||
| get_listener = asyncio.ensure_future(self._in_q.get()) | ||
| thaw_event_listener = asyncio.ensure_future(thaw_queue_event.wait()) | ||
| flush_event_listener = asyncio.ensure_future(flush_event.wait()) if flush_event else None | ||
| while not shutdown: | ||
| done, pending = await asyncio.wait( | ||
| [thaw_event_listener, get_listener], return_when=asyncio.FIRST_COMPLETED | ||
| ) | ||
| waitables = [thaw_event_listener, get_listener] | ||
| if flush_event_listener: | ||
| waitables.append(flush_event_listener) | ||
| done, pending = await asyncio.wait(waitables, return_when=asyncio.FIRST_COMPLETED) | ||
| if thaw_event_listener in done: | ||
| thaw_event_listener = asyncio.ensure_future(thaw_queue_event.wait()) | ||
| no_block = True | ||
| if flush_event_listener and flush_event_listener in done: | ||
| # Don't re-arm until after we yield a batch, to avoid a spin loop | ||
| # when the event stays set but the batch is empty. | ||
| flush_event_listener = None | ||
| no_block = True | ||
|
Comment on lines +145 to +149
Member:
What is the fundamental difference here between the two events?
Contributor (Author):
I'm not sure they can, I think they have different lifetimes and the way they get re-armed is different.
||
| if get_listener in done: | ||
| content = await get_listener | ||
| add_to_batch(content) | ||
|
|
@@ -153,8 +169,13 @@ def add_to_batch(content): | |
| yield batch | ||
| batch = [] | ||
| no_block = False | ||
| # Re-arm the flush listener after yielding | ||
| if flush_event and flush_event_listener is None: | ||
| flush_event_listener = asyncio.ensure_future(flush_event.wait()) | ||
|
Member:
Don't you need to
Contributor (Author):
It gets cleared in
||
| thaw_event_listener.cancel() | ||
| get_listener.cancel() | ||
| if flush_event_listener: | ||
| flush_event_listener.cancel() | ||
|
|
||
| async def put(self, item): | ||
| """ | ||
|
|
||
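For stage authors, a minimal usage sketch of the new `batches()` signature; `stage`, `persist`, and `flush` here are hypothetical stand-ins, not names from this PR:

```python
import asyncio

async def consume(stage, persist, flush: asyncio.Event):
    # Normally waits until roughly `minsize` items are queued; yields early with
    # whatever is pending as soon as `flush` is set (or when the stage shuts down).
    async for batch in stage.batches(minsize=500, flush_event=flush):
        await persist(batch)

# Elsewhere, a producer under resource pressure can force a partial batch through:
#     flush.set()
# batches() re-arms its flush listener only after yielding, so a set event with an
# empty pending batch does not turn into a busy loop.
```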
|
|
@@ -23,6 +23,107 @@ | |
| log = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class ArtifactResourceBudget: | ||
| """Tracks aggregate resource consumption of in-flight artifacts. | ||
|
|
||
| Coordinates between `ArtifactDownloader` (acquires budget) and | ||
| `ArtifactSaver` (releases budget) to limit total temporary disk usage | ||
| from downloaded-but-not-yet-saved artifacts. | ||
|
|
||
| This allows higher download concurrency for small artifacts while still | ||
| protecting against disk exhaustion when syncing large artifacts. | ||
|
|
||
| Args: | ||
| max_bytes (int): Maximum total bytes of in-flight downloaded artifacts. | ||
| `None` means no byte limit (only item limit applies). | ||
| max_items (int): Maximum number of in-flight downloaded artifacts. | ||
| `None` means no item limit (only byte limit applies). | ||
| """ | ||
|
|
||
| def __init__(self, max_bytes=None, max_items=None): | ||
| self.max_bytes = max_bytes | ||
| self.max_items = max_items | ||
| self._current_bytes = 0 # total bytes currently acquired and not yet released | ||
| self._current_items = 0 # total items currently acquired and not yet released | ||
| self._available = asyncio.Event() # cleared when budget is exhausted; set on release | ||
| self._available.set() | ||
| self._lock = asyncio.Lock() # serializes acquire checks to prevent races | ||
| # set when acquire is blocked; signals the stage to flush its batch early | ||
| self.pressure = asyncio.Event() | ||
|
|
||
| @classmethod | ||
| def from_settings(cls): | ||
| """Create an `ArtifactResourceBudget` from Django settings, or return `None`. | ||
|
|
||
| Reads `SYNC_MAX_IN_FLIGHT_MB` and `SYNC_MAX_IN_FLIGHT_ITEMS` from settings. | ||
| Falls back to the deprecated `MAX_CONCURRENT_CONTENT` for `max_items` if the | ||
| user set it and `SYNC_MAX_IN_FLIGHT_ITEMS` is not configured. | ||
| Returns `None` if no settings are configured. | ||
| """ | ||
| max_mb = settings.SYNC_MAX_IN_FLIGHT_MB | ||
| max_items = settings.SYNC_MAX_IN_FLIGHT_ITEMS | ||
|
|
||
| # Backward compatibility: honor deprecated MAX_CONCURRENT_CONTENT | ||
| if max_items is None: | ||
| max_items = settings.MAX_CONCURRENT_CONTENT | ||
|
|
||
| if max_mb is None and max_items is None: | ||
| return None | ||
| return cls( | ||
| max_bytes=max_mb * 1024 * 1024 if max_mb is not None else None, | ||
| max_items=max_items, | ||
| ) | ||
|
|
||
| async def acquire(self, nbytes=0): | ||
| """Block until resource budget is available. | ||
|
|
||
| Always allows at least one item through (even if over budget) when nothing | ||
| is currently in flight, to prevent deadlock. | ||
|
|
||
| When the budget is exhausted and `acquire` must wait, the `pressure` event | ||
| is set to signal downstream stages (e.g. `ArtifactSaver`) to flush their | ||
| batches early and free up budget. | ||
|
|
||
| Args: | ||
| nbytes (int): Size in bytes of the artifact(s) to be downloaded. | ||
| When 0 (e.g. because the remote did not advertise a size), the | ||
| artifact is invisible to the byte budget but still counts toward | ||
| the item limit (`SYNC_MAX_IN_FLIGHT_ITEMS`). | ||
| """ | ||
| while True: | ||
| async with self._lock: | ||
| # Always allow if nothing is in flight (prevents deadlock) | ||
| if self._current_items == 0: | ||
| self._current_bytes += nbytes | ||
| self._current_items += 1 | ||
| return | ||
|
|
||
| bytes_ok = self.max_bytes is None or ( | ||
| self._current_bytes + nbytes <= self.max_bytes | ||
| ) | ||
| items_ok = self.max_items is None or self._current_items < self.max_items | ||
|
|
||
| if bytes_ok and items_ok: | ||
| self._current_bytes += nbytes | ||
| self._current_items += 1 | ||
| return | ||
|
|
||
| self._available.clear() | ||
| self.pressure.set() | ||
| await self._available.wait() | ||
|
|
||
| def release(self, nbytes=0): | ||
| """Release resources after an artifact is saved and its temp file deleted. | ||
|
|
||
| Args: | ||
| nbytes (int): Size in bytes of the artifact that was saved. | ||
| """ | ||
| self._current_bytes = max(0, self._current_bytes - nbytes) | ||
| self._current_items = max(0, self._current_items - 1) | ||
| self.pressure.clear() | ||
| self._available.set() | ||
|
|
||
|
|
||
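Putting the pieces together: the downloader acquires budget before fetching, the saver releases it once temp files are deleted, and `pressure` asks the saver to flush its batch early. A condensed sketch of the flow; the import path for `ArtifactResourceBudget` and the stage wiring below are assumptions, since the pipeline construction is not part of this diff:

```python
from pulpcore.plugin.stages import ArtifactDownloader, ArtifactSaver
from pulpcore.plugin.stages import ArtifactResourceBudget  # assumed export; class is added in this PR

budget = ArtifactResourceBudget.from_settings()  # None if neither setting is configured

# Downloader side, per content unit (simplified):
#     size = sum of advertised artifact sizes (0 when the remote gives no size)
#     await budget.acquire(size)   # blocks when over budget and sets budget.pressure
#     ... download to temp files ...
#
# Saver side, per persisted batch (simplified):
#     ... bulk-save artifacts, delete temp files ...
#     budget.release(size)         # clears pressure and wakes blocked acquire() calls

# Hypothetical wiring that shares one budget between the two stages:
stages = [
    ArtifactDownloader(resource_budget=budget),
    ArtifactSaver(resource_budget=budget),
]
```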
| def _check_for_forbidden_checksum_type(artifact): | ||
| """Check if content doesn't have forbidden checksum type. | ||
|
|
||
|
|
@@ -122,7 +223,7 @@ class GenericDownloader(Stage): | |
| downloads completed. Since it's a stream the total count isn't known until it's finished. | ||
|
|
||
| This stage drains all available items from `self._in_q` and starts as many concurrent | ||
| downloading tasks as possible, up to the limit defined by ``self.max_concurrent_content``. | ||
| downloading tasks as possible, up to the limit defined by `self.max_concurrent_content`. | ||
|
|
||
| Each [pulpcore.plugin.stages.DeclarativeContent][] is sent to `_handle_content_unit`, | ||
| which must be implemented by the subclass, to handle processing the content unit and starting | ||
|
|
@@ -220,28 +321,55 @@ class ArtifactDownloader(GenericDownloader): | |
|
|
||
| Each [pulpcore.plugin.stages.DeclarativeContent][] is sent to `self._out_q` after all of | ||
| its [pulpcore.plugin.stages.DeclarativeArtifact][] objects have been handled. | ||
|
|
||
| Args: | ||
| resource_budget (ArtifactResourceBudget): Optional shared resource budget that | ||
| limits total in-flight artifact bytes/items between download and save. | ||
| args: unused positional arguments passed along to | ||
| [pulpcore.plugin.stages.GenericDownloader][]. | ||
| kwargs: unused keyword arguments passed along to | ||
| [pulpcore.plugin.stages.GenericDownloader][]. | ||
| """ | ||
|
|
||
| PROGRESS_REPORTING_MESSAGE = "Downloading Artifacts" | ||
| PROGRESS_REPORTING_CODE = "sync.downloading.artifacts" | ||
|
|
||
| def __init__(self, resource_budget=None, *args, **kwargs): | ||
| super().__init__(*args, **kwargs) | ||
| self.resource_budget = resource_budget | ||
|
|
||
| async def _handle_content_unit(self, d_content): | ||
| """Handle one content unit. | ||
|
|
||
| Returns: | ||
| The number of downloads | ||
| """ | ||
| downloaders_for_content = [ | ||
| d_artifact.download() | ||
| d_artifacts_to_download = [ | ||
| d_artifact | ||
| for d_artifact in d_content.d_artifacts | ||
| if d_artifact.artifact._state.adding | ||
| and not d_artifact.deferred_download | ||
| and not d_artifact.artifact.file | ||
| ] | ||
| if downloaders_for_content: | ||
| await asyncio.gather(*downloaders_for_content) | ||
| await self.put(d_content) | ||
| return len(downloaders_for_content) | ||
|
|
||
| budget_bytes = 0 | ||
| if d_artifacts_to_download and self.resource_budget: | ||
| budget_bytes = sum( | ||
| d_artifact.artifact.size or 0 for d_artifact in d_artifacts_to_download | ||
| ) | ||
| await self.resource_budget.acquire(budget_bytes) | ||
|
|
||
| try: | ||
| if d_artifacts_to_download: | ||
| await asyncio.gather(*(da.download() for da in d_artifacts_to_download)) | ||
|
|
||
| await self.put(d_content) | ||
| except BaseException: | ||
| if budget_bytes and self.resource_budget: | ||
| self.resource_budget.release(budget_bytes) | ||
| raise | ||
|
|
||
| return len(d_artifacts_to_download) | ||
|
|
||
|
|
||
| class ArtifactSaver(Stage): | ||
|
|
@@ -259,16 +387,28 @@ class ArtifactSaver(Stage): | |
|
|
||
| This stage drains all available items from `self._in_q` and batches everything into one large | ||
| call to the db for efficiency. | ||
|
|
||
| Args: | ||
| resource_budget (ArtifactResourceBudget): Optional shared resource budget. | ||
| When provided, budget is released after artifacts are saved and temp files deleted. | ||
| args: unused positional arguments passed along to [pulpcore.plugin.stages.Stage][]. | ||
| kwargs: unused keyword arguments passed along to [pulpcore.plugin.stages.Stage][]. | ||
| """ | ||
|
|
||
| def __init__(self, resource_budget=None, *args, **kwargs): | ||
| super().__init__(*args, **kwargs) | ||
| self.resource_budget = resource_budget | ||
|
|
||
| async def run(self): | ||
| """ | ||
| The coroutine for this stage. | ||
|
|
||
| Returns: | ||
| The coroutine for this stage. | ||
| """ | ||
| async for batch in self.batches(minsize=settings.MAX_CONCURRENT_CONTENT): | ||
| flush_event = self.resource_budget.pressure if self.resource_budget else None | ||
| minsize = self.resource_budget.max_items if self.resource_budget else 200 | ||
|
Contributor (Author):
This is a little weird because, on the one hand, the resource budget ought to manage this completely; on the other hand, it's hard to see the downside of setting a reasonably large batch size by default. It's going to exist anyway (defaults to 500 if you don't set it manually).
||
| async for batch in self.batches(minsize=minsize, flush_event=flush_event): | ||
| da_to_save = [] | ||
| for d_content in batch: | ||
| for d_artifact in d_content.d_artifacts: | ||
|
|
@@ -291,6 +431,16 @@ async def run(self): | |
| if await aos.path.exists(tmp_file_path): | ||
| await aos.remove(tmp_file_path) | ||
|
|
||
| # Release budget after temp files are cleaned up so the downloader can proceed | ||
| if self.resource_budget: | ||
| for d_content in batch: | ||
| budget_bytes = sum( | ||
| d_artifact.artifact.size or 0 | ||
| for d_artifact in d_content.d_artifacts | ||
| if not d_artifact.deferred_download | ||
| ) | ||
| self.resource_budget.release(budget_bytes) | ||
|
|
||
| for d_content in batch: | ||
| await self.put(d_content) | ||
|
|
||
|
|
||