BQ to SquashFS download pipeline with test suite#79
BQ to SquashFS download pipeline with test suite#79mohamedelabbas1996 wants to merge 26 commits into
Conversation
Five-stage pipeline for building a WebDataset from the BigQuery
training_images table on the fir cluster:
1. download - parallel SLURM array job (×10), downloads images from
iNaturalist, verifies with PIL, writes results back to BQ,
packs into per-chunk sqfs files
2. bq_export - streams qualifying images metadata from BQ to CSV
3. split - stratified train/val/test split with occurrence-level
grouping to prevent data leakage
4. webdataset - two implementations: fir-specific NVMe-optimised packer
(create_webdataset.py) and a generic version for
pre-mounted directories (create_webdataset_generic.py)
5. train - ResNet-50 classifier with auto-resume from checkpoint
README documents each stage, the task_0…task_9 sqfs scheme, why two
webdataset implementations exist, and a one-liner to chain the full pipeline.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…hfs error handling Scale issues fixed from real 10M-image run logs: - Retry with exponential backoff + jitter (fixes Errno 16, 429, 503, timeouts) - Per-thread requests.Session to reduce socket churn - BQ write retry (3 attempts, 30s backoff) — previously silent loss on failure - mksquashfs failures now raise RuntimeError so SLURM marks task failed cleanly - Chunk accumulation warning when >20 chunk sqfs files build up in staging New behaviour: - Inline MERGE into training_images after each chunk write via temp table — no separate update job needed; status is current by end of each chunk - --table-prefix flag for testing against test_training_images tables - Per-chunk throughput logging (img/s) to detect throttling Documentation: - Mid-chunk restart behaviour documented in module docstring (re-download cost, no data loss) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Creates test_training_images (50 rows, fetch_status=pending) and test_training_images_downloads (empty) from production table samples. Use --table-prefix test_ with download_images.py to run against these. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
22 tests covering all failure paths identified from real 10M-image run logs: Retry logic (_fetch_with_retry): - 429 rate limit → retries then succeeds - 503 server error → retries then succeeds - ConnectionError Errno 16 (too many sockets) → retries then succeeds - Timeout → retries then succeeds - 404 → raises immediately, no retry - Exhausted retries (connection error / timeout) → raises Download + verify: - Valid JPEG → downloaded, dimensions populated - Network failure → fetch_status=failed - Corrupted/truncated image → fetch_status=corrupted BQ write retry: - Transient BQ error → retries then succeeds - All retries exhausted → raises SquashFS packing: - mksquashfs non-zero exit → RuntimeError (SLURM marks task failed) - Empty staging dir → returns None without calling mksquashfs Inline MERGE: - Only failed results → skips merge entirely - Downloaded/corrupted results → load temp table + MERGE + cleanup - MERGE failure → temp table still deleted (finally block) Accumulation warning: - Below threshold → no warning - At or above threshold → prints WARNING Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
9 new tests covering the task partitioning logic: - No overlap between tasks across 10-way split (100 images) - Full coverage: union of all task subsets == complete image set - Task 3 of 10 receives only photo_ids where id % 10 == 3 - Uneven split (101 images / 10 tasks): sizes differ by at most 1 - Single job (num_jobs=1): task 0 receives all images - Resumability: LEFT JOIN correctly excludes already-attempted images - force_redownload=True: query has no LEFT JOIN - Normal query: LEFT JOIN present with correct MOD clause - --limit N: LIMIT clause present in generated SQL Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
4 fixtures available to all tests under tests/ with no imports:
mock_bq_client — create_autospec(bigquery.Client) with defaults:
query().result() → [], load_table_from_dataframe → None
Rejects typos in method names unlike plain MagicMock
small_df — raw DataFrame: 5 species × 10 images, photo_ids 0-49
small_csv — same written to a CSV file (direct input for split/export)
photo_ids span all 10 tasks, 2 images share each gbif_id,
all species have >= 5 images for min_instances tests
small_sqfs — session-scoped real sqfs with 10 PIL JPEGs built via
mksquashfs; skipped automatically if binary unavailable
sample_sql_file — minimal .sql file for bq_export.py tests
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Each flag now explains: --staging-dir : use scratch not home, inode quota warning, chunk files persist here --num-jobs : must match SLURM --array range, typical value 10 --task-id : set to $SLURM_ARRAY_TASK_ID in array jobs --num-workers : 320 concurrent connections at scale, Errno 16 context --chunk-size : inode impact, one chunk_NNNN.sqfs per chunk --limit : test-only, pair with --table-prefix --force-redownload: when to use it (staging deleted after failed pack) --table-prefix : create test tables first with create_test_tables.py Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
8 new tests in TestMultiTaskDistributionAndMerge covering how work is split across parallel jobs and how results are merged back: Partitioning: - num_jobs=2: task 0 + task 1 cover all images, no overlap - num_jobs=10: all 10 tasks together cover every image exactly once - task 0 completing does not affect task 1's pending query (disjoint UUIDs) - non-sequential real iNat photo_ids (large ints) partition correctly - empty task (0 images assigned) handled gracefully BQ writes: - both tasks append to downloads table independently (WRITE_APPEND, no conflict) - each task's MERGE touches only its own rows, one temp table per task - after all tasks complete, every image is accounted for Also verified with real 2-task simulation against test BQ tables: - task 0: 26 images (even photo_ids), chunk_0001.sqfs 7.7MB - task 1: 24 images (odd photo_ids), chunk_0001.sqfs 6.1MB - BQ: 50/50 downloaded, 0 pending, 50 rows in downloads table - resumability: re-run both tasks → 0 pending each Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Streams chunk sqfs files as a single continuous tar to stdout for piping
to sqfstar. Processes chunks one at a time (mount → stream → unmount) to
keep peak scratch usage manageable at scale.
Key behaviours tested:
stream_dir_to_tar:
- files added with paths relative to mount dir (bucket/filename.jpg)
- dirs included, only files counted
- multiple bucket dirs all streamed
squashfuse_mount/unmount:
- successful mount returns True
- failed mount returns False + logs ERROR
- unmount calls fusermount -u
main flow:
- empty/missing staging dir → exit 1
- --dry-run lists chunks in sorted order without mounting
- single chunk → valid tar stream
- two chunks → one continuous stream (same tar object, one EOF)
- --delete-after-stream removes each chunk after streaming
- without flag, chunks preserved on disk
error handling:
- failed mount skipped, remaining chunks continue, exit 1
- all mounts fail → exit 1
- chunks always processed in sorted order
Verified with real simulation:
2 tasks × 1 chunk each → 2 merged sqfs files
task_0_test.sqfs: 26 images (7.7MB) | task_1_test.sqfs: 24 images (6.1MB)
Total: 50/50 images ✓
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Previously BrokenPipeError produced a Python traceback that made it hard to diagnose the root cause in SLURM logs. Now prints a clear message: FATAL: BrokenPipeError — the downstream process (sqfstar) died unexpectedly. This usually means sqfstar was OOM killed. Check sqfstar exit code and increase --mem in the job script. Then exits with code 1 so the SLURM job is correctly marked failed. Root cause context: in the real 10M-image run, sqfstar was OOM killed when merging all 10 tasks globally (~10M files, inode table needs 4-8TB). The BrokenPipeError appeared in every failed pack log alongside exit=137 from sqfstar. Fix was to merge per task (~1M files, 83-101GB RAM) instead. Simulated and confirmed: - Global merge OOM → BrokenPipeError → exit=1 (correct, clear message) - Per-task merge success → 200/200 images across 4 task sqfs files Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Failures simulated from actual 10M-image run logs + new scenarios:
Corrupt chunk (invalid sqfs bytes):
- squashfuse rejects it with clear error + retries once
- Skipped, remaining valid chunks continue streaming
- exit=1 so job script detects partial failure
Empty chunk (squashfuse mounts, 0 images found):
- Previously: silent, exit=0 — could mask download failures
- Now: WARNING logged, empty_chunks counter, exit=1
SIGKILL crash (OOM / kill -9):
- Previously: /tmp/sqfs_stream_* dirs leaked on disk
- Now: atexit handler cleans up on any exit including SIGKILL
SIGTERM (SLURM wall-time timeout):
- Previously: unhandled, temp dirs leaked
- Now: signal handler calls cleanup + exits 1
squashfuse mount transient failure:
- Previously: single attempt, permanent failure on transient error
- Now: 1 retry with 5s delay before giving up
squashfuse unmount failure (busy mount):
- Previously: silently ignored via capture_output=True
- Now: 3 retries with 2s delay, warning logged on all failures
- Does not raise — node exit will clean up the mount
sqfstar not in PATH (exit=127):
- BrokenPipeError message now lists all common causes including
"sqfstar not found (exit=127): check module load and PATH"
--delete-after-stream data loss risk:
- Warning printed before starting when flag is active
New summary line: total_images=N errors=M empty_chunks=K
exit=1 if errors>0 OR empty_chunks>0
23 tests total (18 original + 5 new hardening tests)
Verified with real 4-task simulation: 200/200 images across all scenarios
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…dening Renamed for clarity — the script merges sqfs chunks, not just streams to tar. Changes from stream_chunks_to_tar.py: - --delete-after-stream removed entirely: chunks are never deleted by this script. Deletion is the job script's responsibility after verification, preventing all data loss on OOM or other failures. - Full --help with WHAT IT DOES / INPUT / OUTPUT / DISK SPACE / EXIT CODES / TYPICAL USAGE sections including copy-paste SLURM examples - staging_dir argument (was staging_base) with clear description - Log prefix [stream] → [merge], temp dir prefix sqfs_stream_ → sqfs_merge_ - BrokenPipeError message updated: "Chunk files are preserved — fix the cause and re-submit the pack job" (removed the --delete-after-stream caveat) - Empty chunk WARNING now mentions download_images.py as likely cause - SIGTERM handler message updated to [merge] prefix Hardening carried over: squashfuse retry, unmount retry, empty chunk detection, BrokenPipeError exit code 120 fix, SIGTERM cleanup, atexit cleanup Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Renamed test_stream_chunks_to_tar.py → test_merge_sqfs_chunks.py.
Updated references:
- import merge_sqfs_chunks as sct (was stream_chunks_to_tar)
- sys.argv uses merge_sqfs_chunks.py filename
- log assertions check [merge] prefix (was [stream])
Test changes for --delete-after-stream removal:
- test_delete_after_stream_removes_chunk → test_chunks_always_preserved_after_stream
Verifies chunks remain on disk after streaming (deletion is job script's job)
- test_delete_after_stream_warning_printed → test_delete_after_stream_flag_removed
Verifies argparse rejects the removed flag with exit=2
22 tests, all passing.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Previously failed rows (404, 403, exhausted retries) were written to training_images_downloads as 'failed' but training_images stayed 'pending' permanently. This meant: - 'pending' was ambiguous: "not tried" vs "tried and failed" - retry_failed_downloads.py would waste time retrying 404s Now all three outcomes are merged into training_images: downloaded → fetch_status='downloaded', dims + corrupted populated corrupted → fetch_status='corrupted', corrupted=True, dims NULL failed → fetch_status='failed', all extra fields NULL Permanent failures are now excluded from future re-runs via WHERE fetch_status='pending' without needing the LEFT JOIN check. Retrying is still possible intentionally via retry_failed_downloads.py. 7 new/updated tests in TestMergeChunkIntoTrainingImages: - downloaded, corrupted, failed each trigger merge independently - all three statuses merged in one temp table + MERGE call - failed rows confirmed present in temp table dataframe - temp table cleanup on MERGE failure still works Verified end-to-end against real BQ: - 5 scenarios: clean, corrupt, 404, 403, exhausted retries - all 5 statuses correct in training_images after merge - 0 rows re-queued on re-run (WHERE fetch_status='pending') - 400-image scale test: all downloaded, merged, verified ✓ Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Three-stage merge job replacing the old version from backup branch:
Stage 1 — Count expected images across all chunks (unsquashfs -l)
Stage 2 — Stream chunks → sqfstar → task_N.sqfs
Stage 3 — Verify: ACTUAL == EXPECTED before deleting anything
Chunks only deleted when all three conditions pass:
stream_exit=0, sqfstar_exit=0, image count matches
Safety design:
- Chunks are NEVER deleted on failure — always preserved for retry
- PIPESTATUS captured atomically: PIPE_STATUS=("${PIPESTATUS[@]}")
(assigning PIPESTATUS[0] resets PIPESTATUS — common bash trap)
- References merge_sqfs_chunks.py (renamed from stream_chunks_to_tar)
- Updated --array=0-9 (was 2-9 from old incident where tasks 0+1
had staging files deleted by a failed global pack run)
- Uses merge_sqfs_chunks.py which never deletes chunks itself
Verified with 1000-image simulation:
- Clean merge: verify passes, chunks deleted ✓
- OOM: chunks preserved, re-submit succeeds ✓
- Corrupt chunk: stream exits 1, chunks preserved ✓
- Empty chunk: stream exits 1, chunks preserved ✓
- sqfstar missing: BrokenPipeError caught, chunks preserved ✓
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Cleans training_images table in-place by removing 3 duplicate types: 1. Exact duplicate rows (same photo+taxon+gbif) 2. Same photo mapped to multiple taxa — drop strategy by default 3. Same photo+taxon with multiple gbif_ids — keep MIN gbif_id Supports --dry-run, --min-images-per-taxon, --multi-taxon-strategy, and --log-file for JSON output. Operates on any dataset via --dataset. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Two layers: - Mock-based: SQL generation, BQ client calls, log arithmetic, main() wiring - DuckDB integration: executes real CTE SQL against in-memory rows, covers all 3 dup types + edge cases (overlapping types, 3+ copies, empty table) No real BQ connection needed — all tests run locally in ~2.5s. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Add --dataset CLI arg (default: global_butterflies_2604, backwards compatible) - Handle NULL fetch_status in get_pending_rows and MERGE condition for global_all_leps_2605 - Derive tmp_table dataset from training_table string, not hardcoded BQ_DATASET constant - Add 6 new tests covering dataset routing, NULL query handling, and tmp_table derivation Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Pass --dataset global_all_leps_2605 to download_images.py
- Use separate staging dir /scratch/melabbas/global_all_leps_2605/task_{N}/
- Rename job to bq_dl_2605 for squeue clarity
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Concurrent array tasks each MERGE chunk results into training_images; BigQuery aborts colliding DML with "Could not serialize access ... due to concurrent update" (400), which the client does not retry. Observed in production with 8 concurrent tasks (job 43176702_0). Retry only serialization conflicts, up to 10 attempts with jittered exponential backoff (2s base, 60s cap). Other BadRequest errors still raise immediately. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
- --dataset flag replaces hardcoded global_butterflies_2604 - sample WHERE includes fetch_status IS NULL (new datasets have no 'downloaded' rows yet) and uses ORDER BY RAND() for unbiased size/distribution estimates - test tables renamed with leading underscore (_test_training_images, _test_training_images_downloads) so test artifacts sort together, separate from production tables; use with --table-prefix _test_ Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…eline Each array task now performs the full lifecycle for its MOD(photo_id, 60) slice — download (BQ-checkpointed 10k chunks) → count-verify → stream-merge to task_N.sqfs → count-verify → upload to Arbutus object store → byte-size verify → only then delete local files. Peak scratch is bounded by the array throttle (%K × ~2× task archive), not the dataset size. Replaces the previous all-at-once design which required the whole dataset on scratch before merging and never propagated failure exit codes (last command was notify, so afterok chaining and FAIL mail were broken). Tasks are idempotent (skip if remote archive exists) and resume-safe (prior chunks stashed to a resume_* subdir to avoid the chunk-numbering overwrite; already-attempted images skipped via the downloads table). Validated in production: jobs 43176702/43176730 (global_all_leps_2605). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
📝 WalkthroughWalkthroughThis PR introduces a complete end-to-end pipeline for preparing training datasets: BigQuery exports are deduplicated and filtered, split into train/val/test sets, organized into WebDataset tar shards with class IDs, images are downloaded from cloud storage, verified, and chunked into SquashFS archives, then passed to a ResNet-50 training job. SLURM orchestration scripts chain the stages together. ChangesBigQuery to WebDataset training pipeline
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes The PR introduces a large, multi-component data pipeline with significant heterogeneity: BigQuery SQL generation with three duplicate-resolution strategies, diverse Python modules handling filesystem, cloud storage, and compression operations, bash orchestration scripts, and comprehensive test coverage. Logic density is highest in the download, merge, and WebDataset creation modules, and coordination complexity spans SLURM script integration points and BQ transaction semantics (MERGE with retry/backoff). Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
⚔️ Resolve merge conflicts
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
…b_bq_download.sh) The staged per-task lifecycle in job_bq_download.sh now handles download -> merge -> verify -> upload -> delete inline. The standalone pack job is no longer needed for new runs. Local copy kept for reference/repacking old global_butterflies_2604 chunks if needed. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Required by tests/conftest.py and download_images.py. Missing from dev group caused CI test runner to fail with ModuleNotFoundError. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Fixes CI lint failure (pre-commit run on changed files). No logic changes — formatting only. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 13
🧹 Nitpick comments (7)
src/dataset_tools/bq_squashfs/create_test_tables.py (1)
74-79: 💤 Low valueRemove unnecessary f-string prefixes.
Lines 75-79 use f-strings without any placeholders.
Clean up f-strings
- print(f" python download_images.py \\") - print(f" --staging-dir /scratch/$USER/test_download \\") - print(f" --num-jobs 1 --task-id 0 \\") + print(" python download_images.py \\") + print(" --staging-dir /scratch/$USER/test_download \\") + print(" --num-jobs 1 --task-id 0 \\") print(f" --num-workers 8 --chunk-size {args.n_rows} \\") print(f" --limit {args.n_rows} --table-prefix _test_")🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/dataset_tools/bq_squashfs/create_test_tables.py` around lines 74 - 79, The print statements in create_test_tables.py use f-strings where there are no placeholders; remove the unnecessary f prefix on the static print lines (the ones printing " python download_images.py \\", "--staging-dir ... \\", "--num-jobs 1 --task-id 0 \\", and the "--num-workers 8 --chunk-size ..." line that does not include {args.n_rows}) so they become normal string literals, but keep the f-string only for the final print that actually uses the {args.n_rows} placeholder; update the print calls around the args.n_rows reference accordingly.src/dataset_tools/bq_squashfs/queries/global_max2000img.sql (1)
1-31: ⚡ Quick winHardcoded project/dataset references in
global_max2000img.sqlandglobal_min25occ.sqllimit reusability.Both query files embed
leps-ai.global_butterflies_2604table references, preventing reuse with other datasets. Sincebq_export.pyexecutes these SQL files as-is without variable substitution, users must manually edit the queries for different datasets. Consider either:
- Adding BigQuery scripting variables (e.g.,
DECLARE project STRING DEFAULT 'leps-ai';) at the top of each query file- Documenting in the query file comments that users should copy and customize for their dataset
- Extending
bq_export.pyto support template variable substitution🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/dataset_tools/bq_squashfs/queries/global_max2000img.sql` around lines 1 - 31, The SQL files hardcode `leps-ai.global_butterflies_2604` making them non-reusable; add BigQuery script variables at the top (e.g., DECLARE project STRING DEFAULT 'leps-ai'; DECLARE dataset STRING DEFAULT 'global_butterflies_2604';) and replace literal table references (`leps-ai.global_butterflies_2604.training_images`, `leps-ai.global_butterflies_2604.inat_taxa`) in the CTE `ranked` with a table name built from those variables via FORMAT/CONCAT and run the query with EXECUTE IMMEDIATE (or document that users must edit the variables), so the query in global_max2000img.sql (and mirror change in global_min25occ.sql) can be reused across datasets.src/dataset_tools/bq_squashfs/bq_export.py (1)
33-93: 💤 Low valueRemove unnecessary f-string prefixes.
Lines 43 and 85 use f-strings without any placeholders.
Clean up f-strings
- log(f"Submitting query ...") + log("Submitting query ...") - log(f"\n=== Export complete ===") + log("\n=== Export complete ===")🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/dataset_tools/bq_squashfs/bq_export.py` around lines 33 - 93, In export(), remove unnecessary f-string prefixes for log calls that contain no placeholders: replace log(f"Submitting query ...") and log(f"\n=== Export complete ===") with plain string calls (log("Submitting query ...") and log("\n=== Export complete ===")); also scan other log(...) calls in export to ensure only strings with placeholders use f-strings (e.g., keep f-strings for Query file, Output, Job ID, and formatted stats).src/dataset_tools/bq_squashfs/README.md (2)
8-27: ⚡ Quick winAdd language identifier to fenced code block.
The code block should specify a language for proper syntax highlighting. Since this is a text diagram, use
textorplaintext.📝 Suggested fix
-``` +```text [BigQuery: training_images] │🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/dataset_tools/bq_squashfs/README.md` around lines 8 - 27, The fenced code block in README.md (the ASCII diagram starting with “[BigQuery: training_images]”) lacks a language identifier; update the opening triple-backtick to include a language such as text (e.g., change ``` to ```text) so the diagram is rendered with proper plaintext syntax highlighting.Source: Linters/SAST tools
53-55: ⚡ Quick winAdd language identifier to fenced code block.
Specify the language for proper rendering. Use
pythonortextdepending on the intended representation.📝 Suggested fix
-``` +```python task_id = photo_id % 10</details> <details> <summary>🤖 Prompt for AI Agents</summary>Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.In
@src/dataset_tools/bq_squashfs/README.mdaround lines 53 - 55, The fenced
code block containing "task_id = photo_id % 10" in README.md lacks a language
identifier; update the opening fence fromtopython (or ```text if
intended otherwise) so the snippet is rendered with proper syntax highlighting
and clarity for the expression involving task_id and photo_id.</details> <!-- cr-comment:v1:f5b946516ae1296fd08e11f7 --> _Source: Linters/SAST tools_ </blockquote></details> <details> <summary>tests/dataset_tools/test_download_images.py (1)</summary><blockquote> `484-494`: _⚡ Quick win_ **Remove dead code with closure issue.** Lines 484-491 construct a complex MagicMock with lambda functions that have a closure bug (they don't bind the loop variable `row` correctly). This code is replaced by line 494, which correctly returns `matching` directly. Remove the dead code to avoid confusion. <details> <summary>🗑️ Proposed cleanup</summary> ```diff client = MagicMock() - client.query.return_value.result.return_value = [ - MagicMock( - **{k: v for k, v in row.items()}, - **{ - "__iter__": lambda self: iter(row.items()), - "keys": lambda self: row.keys(), - }, - ) - for row in matching - ] - # Simpler: just return dicts directly via side_effect client.query.return_value.result.return_value = matching return client ``` </details> <details> <summary>🤖 Prompt for AI Agents</summary> ``` Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/dataset_tools/test_download_images.py` around lines 484 - 494, Remove the unused MagicMock construction that iterates over matching (the block creating MagicMock(**{...}, **{"__iter__": ..., "keys": ...}) for row in matching) because it’s dead and contains a closure bug; keep the simpler line that sets client.query.return_value.result.return_value = matching. Edit the test in test_download_images.py to delete the MagicMock list comprehension and any references to that variable so the test only uses the direct matching return via client.query.return_value.result.return_value. ``` </details> <!-- cr-comment:v1:f276aee41534c5bf7f35886a --> _Source: Linters/SAST tools_ </blockquote></details> <details> <summary>tests/dataset_tools/test_merge_sqfs_chunks.py (1)</summary><blockquote> `186-187`: _⚡ Quick win_ **Replace ambiguous variable name `l`.** The single-character variable `l` is difficult to distinguish from the digit `1` or uppercase `I`. Use `line` for clarity. <details> <summary>📝 Suggested fix</summary> ```diff - lines = [l for l in capsys.readouterr().out.strip().splitlines() if l] - names = [Path(l).name for l in lines] + lines = [line for line in capsys.readouterr().out.strip().splitlines() if line] + names = [Path(line).name for line in lines] assert names == ["chunk_0001.sqfs", "chunk_0002.sqfs", "chunk_0003.sqfs"] ``` </details> <details> <summary>🤖 Prompt for AI Agents</summary> ``` Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/dataset_tools/test_merge_sqfs_chunks.py` around lines 186 - 187, Replace the ambiguous single-character loop variable `l` in the list comprehension that collects output lines and computes `names` with a clearer identifier (e.g., `line`): change the comprehension "lines = [l for l in capsys.readouterr().out.strip().splitlines() if l]" and the next line "names = [Path(l).name for l in lines]" to use `line` instead of `l` so they read "lines = [line for line in capsys.readouterr().out.strip().splitlines() if line]" and "names = [Path(line).name for line in lines]". ``` </details> <!-- cr-comment:v1:6589979e4df04e8a4f938e60 --> _Source: Linters/SAST tools_ </blockquote></details> </blockquote></details> <details> <summary>🤖 Prompt for all review comments with AI agents</summary>Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.Inline comments:
In@pyproject.toml:
- Around line 56-57: The two BigQuery packages (google-cloud-bigquery and
pandas-gbq) are currently listed under the dev dependency group but are imported
by production scripts (clean.py, bq_export.py, create_test_table.py,
create_test_tables.py); move the entries for google-cloud-bigquery = "^3.0" and
pandas-gbq = "^0.19" from [tool.poetry.group.dev.dependencies] into the main
[tool.poetry.dependencies] section, remove them from the dev group, then update
the lockfile / reinstall (poetry lock && poetry install) so production installs
include these libraries.In
@scripts/job_bq_download.sh:
- Line 67: The shell invocation uses "set -uo pipefail" which omits "-e" so the
script won't exit on command failures; update the set command to include "-e"
(i.e., "set -euo pipefail") so any failing command causes an immediate exit,
ensuring staged failure handling works (this affects commands such as mkdir -p
"${TASK_DIR}" and downstream steps); locate the top-of-script invocation "set
-uo pipefail" and change it to include "-e", and run a quick smoke test to
verify failures now abort the script as expected.- Line 105: The script currently runs the command "cd
/project/6068129/melabbas/ami-ml" without checking failure; add error handling
immediately after that cd (or enable safe-fail behavior earlier) so the script
aborts with a clear error message if the directory change fails. Specifically,
detect the cd failure (non-zero exit status) and print a descriptive error and
exit non-zero (or wrap the cd with a guard) so subsequent Python/module commands
do not run in the wrong directory.In
@scripts/job_bq_webdataset.sh:
- Line 87: The script blindly runs the directory change command "cd
/project/6068129/melabbas/ami-ml" which can fail and let the script continue in
the wrong directory; update the script to check the cd exit status and abort on
failure (e.g. test the result and call exit with a clear error message) or
enable safe-fail behavior (set -e) before that command so subsequent
Python/module commands don't run if the cd fails.- Line 43: The echo references an undefined dead variable CSV_PATH in
scripts/job_bq_webdataset.sh; remove the dead reference or replace it with a
valid variable (e.g., one of the split CSV vars used later) to avoid
unset-variable errors. Locate the echo line that mentions CSV_PATH and either
delete that line or change it to echo an existing variable used by the script
(such as the split CSV array/name) so the script no longer prints an undefined
variable.In
@src/dataset_tools/bq_squashfs/bq_export.py:
- Around line 29-30: The helper function log currently defined as log(msg: str)
-> None is incompatible with calls that pass file=sys.stderr; update the log
function (symbol: log) to accept an optional file parameter (e.g., def log(msg:
str, file: Optional[IO[str]] = sys.stdout) -> None) or accept **kwargs/*args and
forward them to print so calls that pass file=sys.stderr work; ensure the
implementation calls print(msg, file=file, flush=True) and keep the type
hint/import for sys if needed.In
@src/dataset_tools/bq_squashfs/create_webdataset.py:
- Around line 329-365: The shared rng = random.Random(seed) is captured by
pack_one and used concurrently by ThreadPoolExecutor, which is thread-unsafe;
change to create a per-shard RNG inside pack_one (e.g., instantiate rng =
random.Random(seed + shard_id) at the start of pack_one) and then call
rng.shuffle(keys) so each worker has its own deterministic RNG; remove the outer
rng variable and apply the same change in the analogous function in
create_webdataset_generic.py (use shard_id to derive the per-shard seed).- Around line 285-288: The log calls use len(all_paths) which fails when
all_paths is a generator (limit>0); fix by computing a total_paths value when
all_paths is created and use that in the log: when you materialize a list (the
code path that currently makes all_paths a list) set total_paths =
len(all_paths), otherwise set total_paths = None (or a placeholder like "?" or
"unknown"), then replace len(all_paths) in the logging line with total_paths
(formatted if not None, otherwise the placeholder). Refer to the all_paths
variable and the logging block that references written to implement this guarded
total_paths usage.In
@src/dataset_tools/bq_squashfs/download_images.py:
- Line 616: The print call print(f" Staging cleared (chunk sqfs kept)",
flush=True) uses an unnecessary f-string; change it to a plain string by
removing the leading 'f' so it reads print(" Staging cleared (chunk sqfs
kept)", flush=True). Update this occurrence in the same scope where the print is
located (search for the exact print(...) statement) and ensure no other
identical extraneous f-prefixes remain in the surrounding function or block.- Around line 104-119: The response returned by session.get(..., stream=True)
isn't closed when a retryable status is detected, risking connection pool leaks;
in the retry branch (the block that checks resp.status_code in _RETRY_STATUSES
and attempt < _MAX_RETRIES) call resp.close() before sleeping/continuing, and
ensure any early-exit paths (e.g., after resp.raise_for_status() failures) also
close resp or use a context manager for the successful download. Update the
logic around session.get, resp, _RETRY_STATUSES, and the retry loop to close
resp on retry and on error to avoid leaking connections.In
@src/dataset_tools/bq_squashfs/split.py:
- Around line 168-172: The derived caps for val/test can truncate to zero (e.g.,
int(max_instances * val_frac) == 0) causing empty splits; change the derived
caps passed to _subsample so they use at least 1 when the fraction is > 0 (e.g.,
val_cap = 1 if val_frac > 0 and int(max_instances * val_frac) == 0 else
int(...)), then call _subsample(train_set, max_instances, category_key),
_subsample(val_set, val_cap, category_key), and _subsample(test_set, test_cap,
category_key); update the print message if needed to reflect the adjusted caps.In
@tests/dataset_tools/test_clean.py:
- Around line 18-20: Remove the duplicate import of pytest: keep a single
"import pytest" statement and delete the redundant one so the module only
imports pytest once (search for the duplicate "import pytest" lines in
tests/dataset_tools/test_clean.py and remove the extra occurrence).In
@tests/dataset_tools/test_merge_sqfs_chunks.py:
- Around line 196-198: The _run_stream function uses a mutable default
extra_args: list[str] = [], which can lead to shared-state bugs; change the
signature to accept extra_args: list[str] | None = None (or extra_args:
Optional[list[str]] = None) and inside _run_stream set extra_args = extra_args
or [] (or if extra_args is None: extra_args = []) before using it so each call
gets a fresh list; update any type annotations/imports as needed to reference
the new optional type while keeping the rest of the function unchanged.
Nitpick comments:
In@src/dataset_tools/bq_squashfs/bq_export.py:
- Around line 33-93: In export(), remove unnecessary f-string prefixes for log
calls that contain no placeholders: replace log(f"Submitting query ...") and
log(f"\n=== Export complete ===") with plain string calls (log("Submitting query
...") and log("\n=== Export complete ===")); also scan other log(...) calls in
export to ensure only strings with placeholders use f-strings (e.g., keep
f-strings for Query file, Output, Job ID, and formatted stats).In
@src/dataset_tools/bq_squashfs/create_test_tables.py:
- Around line 74-79: The print statements in create_test_tables.py use f-strings
where there are no placeholders; remove the unnecessary f prefix on the static
print lines (the ones printing " python download_images.py \", "--staging-dir
... \", "--num-jobs 1 --task-id 0 \", and the "--num-workers 8 --chunk-size
..." line that does not include {args.n_rows}) so they become normal string
literals, but keep the f-string only for the final print that actually uses the
{args.n_rows} placeholder; update the print calls around the args.n_rows
reference accordingly.In
@src/dataset_tools/bq_squashfs/queries/global_max2000img.sql:
- Around line 1-31: The SQL files hardcode
leps-ai.global_butterflies_2604
making them non-reusable; add BigQuery script variables at the top (e.g.,
DECLARE project STRING DEFAULT 'leps-ai'; DECLARE dataset STRING DEFAULT
'global_butterflies_2604';) and replace literal table references
(leps-ai.global_butterflies_2604.training_images,
leps-ai.global_butterflies_2604.inat_taxa) in the CTErankedwith a table
name built from those variables via FORMAT/CONCAT and run the query with EXECUTE
IMMEDIATE (or document that users must edit the variables), so the query in
global_max2000img.sql (and mirror change in global_min25occ.sql) can be reused
across datasets.In
@src/dataset_tools/bq_squashfs/README.md:
- Around line 8-27: The fenced code block in README.md (the ASCII diagram
starting with “[BigQuery: training_images]”) lacks a language identifier; update
the opening triple-backtick to include a language such as text (e.g., changetotext) so the diagram is rendered with proper plaintext syntax
highlighting.- Around line 53-55: The fenced code block containing "task_id = photo_id % 10"
in README.md lacks a language identifier; update the opening fence fromtopython (or ```text if intended otherwise) so the snippet is rendered with
proper syntax highlighting and clarity for the expression involving task_id and
photo_id.In
@tests/dataset_tools/test_download_images.py:
- Around line 484-494: Remove the unused MagicMock construction that iterates
over matching (the block creating MagicMock(**{...}, **{"iter": ..., "keys":
...}) for row in matching) because it’s dead and contains a closure bug; keep
the simpler line that sets client.query.return_value.result.return_value =
matching. Edit the test in test_download_images.py to delete the MagicMock list
comprehension and any references to that variable so the test only uses the
direct matching return via client.query.return_value.result.return_value.In
@tests/dataset_tools/test_merge_sqfs_chunks.py:
- Around line 186-187: Replace the ambiguous single-character loop variable
l
in the list comprehension that collects output lines and computesnameswith a
clearer identifier (e.g.,line): change the comprehension "lines = [l for l in
capsys.readouterr().out.strip().splitlines() if l]" and the next line "names =
[Path(l).name for l in lines]" to uselineinstead oflso they read "lines
= [line for line in capsys.readouterr().out.strip().splitlines() if line]" and
"names = [Path(line).name for line in lines]".</details> <details> <summary>🪄 Autofix (Beta)</summary> Fix all unresolved CodeRabbit comments on this PR: - [ ] <!-- {"checkboxId": "4b0d0e0a-96d7-4f10-b296-3a18ea78f0b9"} --> Push a commit to this branch (recommended) - [ ] <!-- {"checkboxId": "ff5b1114-7d8c-49e6-8ac1-43f82af23a33"} --> Create a new PR with the fixes </details> --- <details> <summary>ℹ️ Review info</summary> <details> <summary>⚙️ Run configuration</summary> **Configuration used**: defaults **Review profile**: CHILL **Plan**: Pro **Run ID**: `35fa1c3c-417a-47e9-a29c-b0b4918430c8` </details> <details> <summary>📥 Commits</summary> Reviewing files that changed from the base of the PR and between 0c2e9816ce7094c026557cba6eaa31d5366abc9d and caa04de9737f20412f93f08f1e45f6fa83301561. </details> <details> <summary>📒 Files selected for processing (23)</summary> * `.gitignore` * `pyproject.toml` * `scripts/job_bq_download.sh` * `scripts/job_bq_export.sh` * `scripts/job_bq_split.sh` * `scripts/job_bq_train.sh` * `scripts/job_bq_webdataset.sh` * `src/dataset_tools/bigquery_pipeline/clean.py` * `src/dataset_tools/bigquery_pipeline/create_test_table.py` * `src/dataset_tools/bq_squashfs/README.md` * `src/dataset_tools/bq_squashfs/bq_export.py` * `src/dataset_tools/bq_squashfs/create_test_tables.py` * `src/dataset_tools/bq_squashfs/create_webdataset.py` * `src/dataset_tools/bq_squashfs/create_webdataset_generic.py` * `src/dataset_tools/bq_squashfs/download_images.py` * `src/dataset_tools/bq_squashfs/merge_sqfs_chunks.py` * `src/dataset_tools/bq_squashfs/queries/global_max2000img.sql` * `src/dataset_tools/bq_squashfs/queries/global_min25occ.sql` * `src/dataset_tools/bq_squashfs/split.py` * `tests/conftest.py` * `tests/dataset_tools/test_clean.py` * `tests/dataset_tools/test_download_images.py` * `tests/dataset_tools/test_merge_sqfs_chunks.py` </details> </details> <!-- This is an auto-generated comment by CodeRabbit for review status -->
| google-cloud-bigquery = "^3.0" | ||
| pandas-gbq = "^0.19" |
There was a problem hiding this comment.
Move BigQuery dependencies to main dependencies section.
The google-cloud-bigquery and pandas-gbq libraries are imported by production pipeline scripts (clean.py, bq_export.py, create_test_table.py, create_test_tables.py), not just dev/test tools. Placing them in [tool.poetry.group.dev.dependencies] means they won't be installed in production environments, causing import failures.
📦 Move to main dependencies
[tool.poetry.dependencies]
python = "^3.9"
click = "^8.1.6"
numpy = "^1.25.1"
torch = "^2.0,!=2.0.1"
tqdm = "^4.65.0"
segment-anything = "^1.0"
torchvision = "^0.15.1,!=0.15.2"
torchmetrics = "^0.11.4"
pillow = "^10.0.0"
matplotlib = "^3.7.2"
python-dwca-reader = "^0.15.1"
pandas = "^2.1.0"
scikit-learn = "^1.3.0"
webdataset = "^0.2.48"
timm = "^0.9.8"
typer = "^0.12.3"
python-dotenv = "^1.0.1"
wandb = "^0.17.5"
pygbif = "^0.6.5"
opencv-python = "^4.11.0.86"
+google-cloud-bigquery = "^3.0"
+pandas-gbq = "^0.19"
[tool.poetry.group.research]
optional = true
[tool.poetry.group.research.dependencies]
awscli = "^1.33.44"
awscli-plugin-endpoint = "^0.4"
absl-py = "^2.1.0"
[tool.poetry.group.dev.dependencies]
pre-commit = "^3.3.3"
black = "^23.7.0"
flake8 = "^6.0.0"
Flake8-pyproject = "^1.2.3"
isort = "^5.12.0"
mypy = "^1.4.1"
ipdb = "^0.13.13"
python-devtools = "^2"
ipykernel = "^6.29.4"
pytest = "^8.1.1"
-google-cloud-bigquery = "^3.0"
-pandas-gbq = "^0.19"📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| google-cloud-bigquery = "^3.0" | |
| pandas-gbq = "^0.19" | |
| [tool.poetry.group.research] | |
| optional = true | |
| [tool.poetry.group.research.dependencies] | |
| awscli = "^1.33.44" | |
| awscli-plugin-endpoint = "^0.4" | |
| absl-py = "^2.1.0" | |
| [tool.poetry.group.dev.dependencies] | |
| pre-commit = "^3.3.3" | |
| black = "^23.7.0" | |
| flake8 = "^6.0.0" | |
| Flake8-pyproject = "^1.2.3" | |
| isort = "^5.12.0" | |
| mypy = "^1.4.1" | |
| ipdb = "^0.13.13" | |
| python-devtools = "^2" | |
| ipykernel = "^6.29.4" | |
| pytest = "^8.1.1" |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@pyproject.toml` around lines 56 - 57, The two BigQuery packages
(google-cloud-bigquery and pandas-gbq) are currently listed under the dev
dependency group but are imported by production scripts (clean.py, bq_export.py,
create_test_table.py, create_test_tables.py); move the entries for
google-cloud-bigquery = "^3.0" and pandas-gbq = "^0.19" from
[tool.poetry.group.dev.dependencies] into the main [tool.poetry.dependencies]
section, remove them from the dev group, then update the lockfile / reinstall
(poetry lock && poetry install) so production installs include these libraries.
| #SBATCH --mail-type=END,FAIL | ||
| #SBATCH --mail-user=hack1996man@gmail.com | ||
|
|
||
| set -uo pipefail |
There was a problem hiding this comment.
Critical: Missing -e flag in set command breaks error handling.
The script uses set -uo pipefail but is missing the -e flag that causes the shell to exit immediately on any non-zero exit status. Without -e, errors in commands can be silently ignored, undermining the staged failure handling and potentially corrupting data or wasting resources.
For example, if mkdir -p "${TASK_DIR}" on line 93 fails, the script would continue and try to operate on a nonexistent directory.
🔧 Proposed fix
-set -uo pipefail
+set -euo pipefail📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| set -uo pipefail | |
| set -euo pipefail |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@scripts/job_bq_download.sh` at line 67, The shell invocation uses "set -uo
pipefail" which omits "-e" so the script won't exit on command failures; update
the set command to include "-e" (i.e., "set -euo pipefail") so any failing
command causes an immediate exit, ensuring staged failure handling works (this
affects commands such as mkdir -p "${TASK_DIR}" and downstream steps); locate
the top-of-script invocation "set -uo pipefail" and change it to include "-e",
and run a quick smoke test to verify failures now abort the script as expected.
| echo "Resume: moved $(ls ${RESUME_DIR} | wc -l) existing chunks to ${RESUME_DIR}" | ||
| fi | ||
|
|
||
| cd /project/6068129/melabbas/ami-ml |
There was a problem hiding this comment.
Add error handling for directory change.
If cd fails, the script will continue executing in the wrong directory, causing module loading and Python invocation to fail in confusing ways.
🛡️ Proposed fix
-cd /project/6068129/melabbas/ami-ml
+cd /project/6068129/melabbas/ami-ml || exit 1📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| cd /project/6068129/melabbas/ami-ml | |
| cd /project/6068129/melabbas/ami-ml || exit 1 |
🧰 Tools
🪛 Shellcheck (0.11.0)
[warning] 105-105: Use 'cd ... || exit' or 'cd ... || return' in case cd fails.
(SC2164)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@scripts/job_bq_download.sh` at line 105, The script currently runs the
command "cd /project/6068129/melabbas/ami-ml" without checking failure; add
error handling immediately after that cd (or enable safe-fail behavior earlier)
so the script aborts with a clear error message if the directory change fails.
Specifically, detect the cd failure (non-zero exit status) and print a
descriptive error and exit non-zero (or wrap the cd with a guard) so subsequent
Python/module commands do not run in the wrong directory.
| echo "=== build_wds_global started at $(date) ===" | ||
| echo "Node: $(hostname)" | ||
| echo "NVMe: ${NVME} ($(df -h ${NVME} | tail -1 | awk '{print $4}') free)" | ||
| echo "CSV: ${CSV_PATH} ($(du -sh ${CSV_PATH} | cut -f1))" |
There was a problem hiding this comment.
Undefined variable CSV_PATH referenced but never set.
Line 43 echoes ${CSV_PATH} but the variable is never defined in the script. This will either print an empty value or cause an error with set -u if enabled. Based on the script logic (lines 46-52 validate split CSVs, not CSV_PATH), this appears to be dead code from an earlier version.
🐛 Proposed fix: remove the dead reference
echo "=== build_wds_global started at $(date) ==="
echo "Node: $(hostname)"
echo "NVMe: ${NVME} ($(df -h ${NVME} | tail -1 | awk '{print $4}') free)"
-echo "CSV: ${CSV_PATH} ($(du -sh ${CSV_PATH} | cut -f1))"
echo ""📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| echo "CSV: ${CSV_PATH} ($(du -sh ${CSV_PATH} | cut -f1))" | |
| echo "=== build_wds_global started at $(date) ===" | |
| echo "Node: $(hostname)" | |
| echo "NVMe: ${NVME} ($(df -h ${NVME} | tail -1 | awk '{print $4}') free)" | |
| echo "" |
🧰 Tools
🪛 Shellcheck (0.11.0)
[info] 43-43: Double quote to prevent globbing and word splitting.
(SC2086)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@scripts/job_bq_webdataset.sh` at line 43, The echo references an undefined
dead variable CSV_PATH in scripts/job_bq_webdataset.sh; remove the dead
reference or replace it with a valid variable (e.g., one of the split CSV vars
used later) to avoid unset-variable errors. Locate the echo line that mentions
CSV_PATH and either delete that line or change it to echo an existing variable
used by the script (such as the split CSV array/name) so the script no longer
prints an undefined variable.
| echo "" | ||
|
|
||
| # ── Python environment ──────────────────────────────────────────────────────── | ||
| cd /project/6068129/melabbas/ami-ml |
There was a problem hiding this comment.
Add error handling for directory change.
If cd fails, the script will continue executing in the wrong directory, causing Python and module commands to fail in confusing ways.
🛡️ Proposed fix
-cd /project/6068129/melabbas/ami-ml
+cd /project/6068129/melabbas/ami-ml || exit 1📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| cd /project/6068129/melabbas/ami-ml | |
| cd /project/6068129/melabbas/ami-ml || exit 1 |
🧰 Tools
🪛 Shellcheck (0.11.0)
[warning] 87-87: Use 'cd ... || exit' or 'cd ... || return' in case cd fails.
(SC2164)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@scripts/job_bq_webdataset.sh` at line 87, The script blindly runs the
directory change command "cd /project/6068129/melabbas/ami-ml" which can fail
and let the script continue in the wrong directory; update the script to check
the cd exit status and abort on failure (e.g. test the result and call exit with
a clear error message) or enable safe-fail behavior (set -e) before that command
so subsequent Python/module commands don't run if the cd fails.
| resp = session.get(url, timeout=30, stream=True) | ||
| if resp.status_code in _RETRY_STATUSES and attempt < _MAX_RETRIES: | ||
| delay = min(_BACKOFF_BASE * (2**attempt), _BACKOFF_MAX) | ||
| delay += random.uniform(0, delay * 0.25) | ||
| print( | ||
| f" HTTP {resp.status_code} {url} — retry {attempt+1}/{_MAX_RETRIES} " | ||
| f"in {delay:.1f}s", | ||
| flush=True, | ||
| ) | ||
| time.sleep(delay) | ||
| continue | ||
| resp.raise_for_status() | ||
| with open(dest, "wb") as f: | ||
| for chunk in resp.iter_content(chunk_size=8192): | ||
| f.write(chunk) | ||
| return |
There was a problem hiding this comment.
Close response before retry to avoid connection leaks.
When stream=True and a retryable status is returned, the response body is not consumed or closed before sleeping and continuing. This can exhaust the connection pool over many retries.
Proposed fix
resp = session.get(url, timeout=30, stream=True)
if resp.status_code in _RETRY_STATUSES and attempt < _MAX_RETRIES:
+ resp.close()
delay = min(_BACKOFF_BASE * (2**attempt), _BACKOFF_MAX)
delay += random.uniform(0, delay * 0.25)
print(📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| resp = session.get(url, timeout=30, stream=True) | |
| if resp.status_code in _RETRY_STATUSES and attempt < _MAX_RETRIES: | |
| delay = min(_BACKOFF_BASE * (2**attempt), _BACKOFF_MAX) | |
| delay += random.uniform(0, delay * 0.25) | |
| print( | |
| f" HTTP {resp.status_code} {url} — retry {attempt+1}/{_MAX_RETRIES} " | |
| f"in {delay:.1f}s", | |
| flush=True, | |
| ) | |
| time.sleep(delay) | |
| continue | |
| resp.raise_for_status() | |
| with open(dest, "wb") as f: | |
| for chunk in resp.iter_content(chunk_size=8192): | |
| f.write(chunk) | |
| return | |
| resp = session.get(url, timeout=30, stream=True) | |
| if resp.status_code in _RETRY_STATUSES and attempt < _MAX_RETRIES: | |
| resp.close() | |
| delay = min(_BACKOFF_BASE * (2**attempt), _BACKOFF_MAX) | |
| delay += random.uniform(0, delay * 0.25) | |
| print( | |
| f" HTTP {resp.status_code} {url} — retry {attempt+1}/{_MAX_RETRIES} " | |
| f"in {delay:.1f}s", | |
| flush=True, | |
| ) | |
| time.sleep(delay) | |
| continue | |
| resp.raise_for_status() | |
| with open(dest, "wb") as f: | |
| for chunk in resp.iter_content(chunk_size=8192): | |
| f.write(chunk) | |
| return |
🧰 Tools
🪛 ast-grep (0.43.0)
[info] 106-106: use secrets package over random package
Context: random.uniform(0, delay * 0.25)
Note: [CWE-330].
(avoid-random-python)
🪛 Ruff (0.15.15)
[error] 107-107: Standard pseudo-random generators are not suitable for cryptographic purposes
(S311)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/dataset_tools/bq_squashfs/download_images.py` around lines 104 - 119, The
response returned by session.get(..., stream=True) isn't closed when a retryable
status is detected, risking connection pool leaks; in the retry branch (the
block that checks resp.status_code in _RETRY_STATUSES and attempt <
_MAX_RETRIES) call resp.close() before sleeping/continuing, and ensure any
early-exit paths (e.g., after resp.raise_for_status() failures) also close resp
or use a context manager for the successful download. Update the logic around
session.get, resp, _RETRY_STATUSES, and the retry loop to close resp on retry
and on error to avoid leaking connections.
| pack_chunk_to_sqfs(staging_dir, chunk_num, num_workers=4) | ||
| clear_staging(staging_dir) | ||
| warn_chunk_accumulation(staging_dir) | ||
| print(f" Staging cleared (chunk sqfs kept)", flush=True) |
There was a problem hiding this comment.
Remove extraneous f prefix from string literal.
This f-string has no placeholders.
Proposed fix
- print(f" Staging cleared (chunk sqfs kept)", flush=True)
+ print(" Staging cleared (chunk sqfs kept)", flush=True)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| print(f" Staging cleared (chunk sqfs kept)", flush=True) | |
| print(" Staging cleared (chunk sqfs kept)", flush=True) |
🧰 Tools
🪛 Ruff (0.15.15)
[error] 616-616: f-string without any placeholders
Remove extraneous f prefix
(F541)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/dataset_tools/bq_squashfs/download_images.py` at line 616, The print call
print(f" Staging cleared (chunk sqfs kept)", flush=True) uses an unnecessary
f-string; change it to a plain string by removing the leading 'f' so it reads
print(" Staging cleared (chunk sqfs kept)", flush=True). Update this occurrence
in the same scope where the print is located (search for the exact print(...)
statement) and ensure no other identical extraneous f-prefixes remain in the
surrounding function or block.
Source: Linters/SAST tools
| if max_instances > 0: | ||
| print(f"Capping at max_instances={max_instances} per category ...") | ||
| train_set = _subsample(train_set, max_instances, category_key) | ||
| val_set = _subsample(val_set, int(max_instances * val_frac), category_key) | ||
| test_set = _subsample(test_set, int(max_instances * test_frac), category_key) |
There was a problem hiding this comment.
Derived max_instances cap can become zero, emptying val/test splits.
When max_instances is small relative to val_frac/test_frac, the derived cap truncates to zero. For example, --max-instances 5 --val-frac 0.1 yields int(5 * 0.1) = 0, causing _subsample to produce an empty DataFrame for that split.
🐛 Proposed fix to ensure non-zero caps
if max_instances > 0:
print(f"Capping at max_instances={max_instances} per category ...")
train_set = _subsample(train_set, max_instances, category_key)
- val_set = _subsample(val_set, int(max_instances * val_frac), category_key)
- test_set = _subsample(test_set, int(max_instances * test_frac), category_key)
+ val_set = _subsample(val_set, max(1, int(max_instances * val_frac)), category_key)
+ test_set = _subsample(test_set, max(1, int(max_instances * test_frac)), category_key)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if max_instances > 0: | |
| print(f"Capping at max_instances={max_instances} per category ...") | |
| train_set = _subsample(train_set, max_instances, category_key) | |
| val_set = _subsample(val_set, int(max_instances * val_frac), category_key) | |
| test_set = _subsample(test_set, int(max_instances * test_frac), category_key) | |
| if max_instances > 0: | |
| print(f"Capping at max_instances={max_instances} per category ...") | |
| train_set = _subsample(train_set, max_instances, category_key) | |
| val_set = _subsample(val_set, max(1, int(max_instances * val_frac)), category_key) | |
| test_set = _subsample(test_set, max(1, int(max_instances * test_frac)), category_key) |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/dataset_tools/bq_squashfs/split.py` around lines 168 - 172, The derived
caps for val/test can truncate to zero (e.g., int(max_instances * val_frac) ==
0) causing empty splits; change the derived caps passed to _subsample so they
use at least 1 when the fraction is > 0 (e.g., val_cap = 1 if val_frac > 0 and
int(max_instances * val_frac) == 0 else int(...)), then call
_subsample(train_set, max_instances, category_key), _subsample(val_set, val_cap,
category_key), and _subsample(test_set, test_cap, category_key); update the
print message if needed to reflect the adjusted caps.
| import pytest | ||
|
|
||
| import pytest |
There was a problem hiding this comment.
Remove duplicate import.
pytest is imported twice on lines 18 and 20.
🔧 Proposed fix
import pandas as pd
import pytest
-import pytest
-
import src.dataset_tools.bigquery_pipeline.clean as clean📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| import pytest | |
| import pytest | |
| import pandas as pd | |
| import pytest | |
| import src.dataset_tools.bigquery_pipeline.clean as clean |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tests/dataset_tools/test_clean.py` around lines 18 - 20, Remove the duplicate
import of pytest: keep a single "import pytest" statement and delete the
redundant one so the module only imports pytest once (search for the duplicate
"import pytest" lines in tests/dataset_tools/test_clean.py and remove the extra
occurrence).
| def _run_stream( | ||
| self, staging: Path, extra_args: list[str] = [] | ||
| ) -> tuple[bytes, str]: |
There was a problem hiding this comment.
Fix mutable default argument.
Using [] as a default argument creates a single shared list across all calls. If any caller mutates extra_args, it will affect subsequent calls.
🐛 Proposed fix
def _run_stream(
- self, staging: Path, extra_args: list[str] = []
+ self, staging: Path, extra_args: list[str] | None = None
) -> tuple[bytes, str]:
"""Run main(), capture stdout bytes and stderr text."""
+ if extra_args is None:
+ extra_args = []
stdout_buf = io.BytesIO()📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| def _run_stream( | |
| self, staging: Path, extra_args: list[str] = [] | |
| ) -> tuple[bytes, str]: | |
| def _run_stream( | |
| self, staging: Path, extra_args: list[str] | None = None | |
| ) -> tuple[bytes, str]: | |
| """Run main(), capture stdout bytes and stderr text.""" | |
| if extra_args is None: | |
| extra_args = [] |
🧰 Tools
🪛 Ruff (0.15.15)
[warning] 197-197: Do not use mutable data structures for argument defaults
Replace with None; initialize within function
(B006)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tests/dataset_tools/test_merge_sqfs_chunks.py` around lines 196 - 198, The
_run_stream function uses a mutable default extra_args: list[str] = [], which
can lead to shared-state bugs; change the signature to accept extra_args:
list[str] | None = None (or extra_args: Optional[list[str]] = None) and inside
_run_stream set extra_args = extra_args or [] (or if extra_args is None:
extra_args = []) before using it so each call gets a fresh list; update any type
annotations/imports as needed to reference the new optional type while keeping
the rest of the function unchanged.
Source: Linters/SAST tools
Summary
Adds a production-ready pipeline for downloading the leps-ai BigQuery training
image corpus to SquashFS archives on the Compute Canada fir cluster. Validated
on global_all_leps_2605 (23.8M images, ~27 TB).
Each SLURM array task runs a full lifecycle: download images from BQ, pack into
SquashFS chunks, merge into a single task archive, upload to Arbutus object
storage, verify, and delete local files. Peak scratch usage is bounded by the
array concurrency throttle rather than the full dataset size. Jobs are
idempotent and resume-safe.
List of changes
download_images.py
merge_sqfs_chunks.py
job_bq_download.sh
Other pipeline scripts
Tests (155 passing)
Summary by CodeRabbit
New Features
Documentation
Chores
Tests