From e61de6bbbcd13c560288b1347859808ee3ae83f7 Mon Sep 17 00:00:00 2001 From: Vinay Parakala Date: Mon, 8 Jun 2026 18:12:57 -0400 Subject: [PATCH 1/3] Add native pHash deduplication acceleration --- .circleci/config.yml | 167 ++++++++- CHANGELOG.md | 11 +- build.py | 24 ++ nucleus/_native_dedup.c | 595 ++++++++++++++++++++++++++++++ nucleus/_native_dedup.pyi | 5 + nucleus/local_deduplication.py | 29 +- pyproject.toml | 9 +- tests/test_local_deduplication.py | 57 +++ 8 files changed, 884 insertions(+), 13 deletions(-) create mode 100644 build.py create mode 100644 nucleus/_native_dedup.c create mode 100644 nucleus/_native_dedup.pyi diff --git a/.circleci/config.yml b/.circleci/config.yml index 19b13e17..739e1e04 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -4,6 +4,7 @@ version: 2.1 orbs: slack: circleci/slack@4.4.2 python: circleci/python@2.1.0 + win: circleci/windows@5.0 jobs: build_test: @@ -75,11 +76,16 @@ jobs: event: fail template: basic_fail_1 - pypi_publish: + build_sdist: docker: - image: cimg/python:3.10 steps: - checkout # checkout source code to working directory + - run: + name: Install Build Tools + command: | + pip install --upgrade pip + pip install poetry - run: name: Validate Tag Version # Check if the tag name matches the package version command: | @@ -99,9 +105,121 @@ jobs: exit 1; fi - run: - name: Build + name: Build sdist command: | # install env dependencies - poetry build + rm -rf dist + poetry build --format sdist + - persist_to_workspace: + root: . + paths: + - dist + + build_linux_wheels: + docker: + - image: cimg/python:3.10 + steps: + - checkout # checkout source code to working directory + - setup_remote_docker + - run: + name: Build Linux wheels + command: | + pip install --upgrade pip + pip install cibuildwheel + + export CIBW_ARCHS_LINUX="x86_64" + export CIBW_BUILD="cp310-manylinux_x86_64 cp311-manylinux_x86_64 cp312-manylinux_x86_64 cp313-manylinux_x86_64 cp314-manylinux_x86_64" + export CIBW_SKIP="pp* *-musllinux_*" + export CIBW_TEST_COMMAND='python -c "import nucleus._native_dedup as native; assert native.deduplicate_phashes([0, 1023, 2047], 10) == [0, 2]"' + python -m cibuildwheel --platform linux --output-dir dist + + ls -lh dist + - persist_to_workspace: + root: . + paths: + - dist + + build_macos_wheels: + macos: + xcode: 16.4.0 + resource_class: m4pro.medium + steps: + - checkout # checkout source code to working directory + - run: + name: Build macOS wheels + command: | + python3 -m pip install --upgrade pip + python3 -m pip install cibuildwheel + softwareupdate --install-rosetta --agree-to-license || true + + export CIBW_ARCHS_MACOS="universal2" + export CIBW_BUILD="cp310-macosx_* cp311-macosx_* cp312-macosx_* cp313-macosx_* cp314-macosx_*" + export CIBW_SKIP="pp*" + export CIBW_TEST_COMMAND='python -c "import nucleus._native_dedup as native; assert native.deduplicate_phashes([0, 1023, 2047], 10) == [0, 2]"' + python3 -m cibuildwheel --platform macos --output-dir dist + + ls -lh dist + - persist_to_workspace: + root: . + paths: + - dist + + build_windows_wheels: + executor: + name: win/default + size: medium + steps: + - checkout # checkout source code to working directory + - run: + name: Build Windows wheels + command: | + python -m pip install --upgrade pip + python -m pip install cibuildwheel + + $env:CIBW_BUILD = "cp310-win_amd64 cp311-win_amd64 cp312-win_amd64 cp313-win_amd64 cp314-win_amd64" + $env:CIBW_SKIP = "pp*" + $env:CIBW_TEST_COMMAND = 'python -c "import nucleus._native_dedup as native; assert native.deduplicate_phashes([0, 1023, 2047], 10) == [0, 2]"' + python -m cibuildwheel --platform windows --output-dir dist + + Get-ChildItem dist + - persist_to_workspace: + root: . + paths: + - dist + + pypi_publish: + docker: + - image: cimg/python:3.10 + steps: + - checkout # checkout source code to working directory + - attach_workspace: + at: . + - run: + name: Install Publish Tools + command: | + pip install --upgrade pip + pip install poetry + - run: + name: Validate Tag Version # Check if the tag name matches the package version + command: | + PKG_VERSION=$(sed -n 's/^version = //p' pyproject.toml | sed -e 's/^"//' -e 's/"$//') + if [[ "$CIRCLE_TAG" != "v${PKG_VERSION}" ]]; then + echo "ERROR: Tag name ($CIRCLE_TAG) must match package version (v${PKG_VERSION})." + exit 1; + fi + - run: + name: Validate SDK Version Increment # Check if the version is already on PyPI + command: | + PKG_VERSION=$(sed -n 's/^version = //p' pyproject.toml | sed -e 's/^"//' -e 's/"$//') + if pip install "scale-nucleus>=${PKG_VERSION}" > /dev/null 2>&1; + then + echo "ERROR: You need to increment to a new version before publishing!" + echo "Version (${PKG_VERSION}) already exists on PyPI." + exit 1; + fi + - run: + name: List artifacts to publish + command: | + ls -lh dist - run: name: Publish to PyPI command: | @@ -165,7 +283,8 @@ workflows: filters: tags: only: /^v\d+\.\d+\.\d+$/ # Runs only for tags with the format [v1.2.3] - - pypi_publish: + - build_sdist: + context: Nucleus requires: - build_test filters: @@ -173,4 +292,42 @@ workflows: ignore: /.*/ # Runs for none of the branches tags: only: /^v\d+\.\d+\.\d+$/ # Runs only for tags with the format [v1.2.3] - + - build_linux_wheels: + context: Nucleus + requires: + - build_test + filters: + branches: + ignore: /.*/ # Runs for none of the branches + tags: + only: /^v\d+\.\d+\.\d+$/ # Runs only for tags with the format [v1.2.3] + - build_macos_wheels: + context: Nucleus + requires: + - build_test + filters: + branches: + ignore: /.*/ # Runs for none of the branches + tags: + only: /^v\d+\.\d+\.\d+$/ # Runs only for tags with the format [v1.2.3] + - build_windows_wheels: + context: Nucleus + requires: + - build_test + filters: + branches: + ignore: /.*/ # Runs for none of the branches + tags: + only: /^v\d+\.\d+\.\d+$/ # Runs only for tags with the format [v1.2.3] + - pypi_publish: + context: Nucleus + requires: + - build_sdist + - build_linux_wheels + - build_macos_wheels + - build_windows_wheels + filters: + branches: + ignore: /.*/ # Runs for none of the branches + tags: + only: /^v\d+\.\d+\.\d+$/ # Runs only for tags with the format [v1.2.3] diff --git a/CHANGELOG.md b/CHANGELOG.md index 132a7ef4..7d904ced 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,12 +5,19 @@ All notable changes to the [Nucleus Python Client](https://github.com/scaleapi/n The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [0.18.5](https://github.com/scaleapi/nucleus-python-client/releases/tag/v0.18.4) - 2026-05-28 +## [0.18.6](https://github.com/scaleapi/nucleus-python-client/releases/tag/v0.18.6) - 2026-06-15 ### Added +- Native C acceleration for `deduplicate_by_phash`. When the compiled extension is available, all threshold values are handled in native code: thresholds `0` through `11` use the chunked Hamming index, thresholds `12` through `63` use a native linear scan, and threshold `64` uses the keep-first fast path. The public Python API is unchanged and falls back to the pure-Python implementation when the native extension is unavailable. -- **Evaluations V2** client support for COCO-style metrics on model runs via stored `evaluation_match_v2` rows. `NucleusClient` exposes `create_evaluation_v2()`, `get_evaluation_v2()`, and `list_evaluations_v2()`. The `EvaluationV2` resource supports `wait_for_completion()`, `charts()` (mAP, confusion matrix, PR curve, TIDE, and related aggregates), `examples()` (paginated TP/FP/FN rows), `delete()`, and `refresh()`. `AllowedLabelMatch` configures allowed ground-truth / prediction label pairs; filter and response types include `EvaluationV2FilterArgs`, `EvaluationV2Charts`, `EvaluationV2ExamplesPage`, and `EvaluationV2MatchExample`. Sphinx docs cover the workflow under Evaluations V2. +### Tooling / CI +- Publish Linux `x86_64`, macOS `universal2`, and Windows `amd64` wheels for Python 3.10 through 3.14 using `cibuildwheel`, alongside the source distribution. + +## [0.18.5](https://github.com/scaleapi/nucleus-python-client/releases/tag/v0.18.5) - 2026-05-28 +### Added + +- **Evaluations V2** client support for COCO-style metrics on model runs via stored `evaluation_match_v2` rows. `NucleusClient` exposes `create_evaluation_v2()`, `get_evaluation_v2()`, and `list_evaluations_v2()`. The `EvaluationV2` resource supports `wait_for_completion()`, `charts()` (mAP, confusion matrix, PR curve, TIDE, and related aggregates), `examples()` (paginated TP/FP/FN rows), `delete()`, and `refresh()`. `AllowedLabelMatch` configures allowed ground-truth / prediction label pairs; filter and response types include `EvaluationV2FilterArgs`, `EvaluationV2Charts`, `EvaluationV2ExamplesPage`, and `EvaluationV2MatchExample`. Sphinx docs cover the workflow under Evaluations V2. ## [0.18.4](https://github.com/scaleapi/nucleus-python-client/releases/tag/v0.18.4) - 2026-06-08 diff --git a/build.py b/build.py new file mode 100644 index 00000000..cca5aa44 --- /dev/null +++ b/build.py @@ -0,0 +1,24 @@ +from __future__ import annotations + +import sys + +from setuptools import Extension + + +def build(setup_kwargs): + extra_compile_args = [] + if sys.platform != "win32": + extra_compile_args.extend(["-std=c11", "-O3"]) + + setup_kwargs.update( + { + "ext_modules": [ + Extension( + "nucleus._native_dedup", + ["nucleus/_native_dedup.c"], + extra_compile_args=extra_compile_args, + optional=True, + ) + ], + } + ) diff --git a/nucleus/_native_dedup.c b/nucleus/_native_dedup.c new file mode 100644 index 00000000..48cf31b5 --- /dev/null +++ b/nucleus/_native_dedup.c @@ -0,0 +1,595 @@ +#define PY_SSIZE_T_CLEAN +#include + +#include +#include +#include + +#if defined(_MSC_VER) +#include +#endif + +#define DEDUP_THRESHOLD_MAX 64 +#define INDEX_MAX_THRESHOLD 11 +#define PARTITION_COUNT 2 +#define CHUNK_COUNT 6 +#define ROTATED_PARTITION_BITS 8 + +static const int CHUNK_BITS[CHUNK_COUNT] = {11, 11, 11, 11, 10, 10}; + +typedef struct { + Py_ssize_t *values; + Py_ssize_t size; + Py_ssize_t capacity; +} Bucket; + +typedef struct { + Bucket *buckets; + Py_ssize_t bucket_count; +} ChunkIndex; + +typedef struct { + int threshold; + int chunk_radius; + uint64_t *hashes; + uint8_t *candidate_marks; + Py_ssize_t kept_count; + Py_ssize_t hash_capacity; + Py_ssize_t *touched_indexes; + Py_ssize_t touched_count; + Py_ssize_t touched_capacity; + ChunkIndex indexes[PARTITION_COUNT][CHUNK_COUNT]; +} HammingIndex; + +static int popcount64(uint64_t value) { +#if defined(_MSC_VER) + return (int)__popcnt64(value); +#elif defined(__GNUC__) || defined(__clang__) + return __builtin_popcountll(value); +#else + int count = 0; + while (value != 0) { + value &= value - 1; + count++; + } + return count; +#endif +} + +static uint64_t rotate_right_64(uint64_t value, int bits) { + return (value >> bits) | (value << (64 - bits)); +} + +static void partition_chunks( + uint64_t phash_value, int partition_index, uint64_t chunks[CHUNK_COUNT] +) { + int chunk_index; + int shift = 64; + + if (partition_index == 1) { + phash_value = rotate_right_64(phash_value, ROTATED_PARTITION_BITS); + } + + for (chunk_index = 0; chunk_index < CHUNK_COUNT; chunk_index++) { + shift -= CHUNK_BITS[chunk_index]; + chunks[chunk_index] = + (phash_value >> shift) & + ((((uint64_t)1) << CHUNK_BITS[chunk_index]) - 1); + } +} + +static int append_py_ssize( + Py_ssize_t **values, + Py_ssize_t *size, + Py_ssize_t *capacity, + Py_ssize_t value +) { + Py_ssize_t new_capacity; + Py_ssize_t *new_values; + + if (*size == *capacity) { + new_capacity = *capacity == 0 ? 4 : *capacity * 2; + if (new_capacity < *capacity) { + PyErr_NoMemory(); + return -1; + } + new_values = (Py_ssize_t *)realloc( + *values, (size_t)new_capacity * sizeof(Py_ssize_t) + ); + if (new_values == NULL) { + PyErr_NoMemory(); + return -1; + } + *values = new_values; + *capacity = new_capacity; + } + + (*values)[*size] = value; + *size += 1; + return 0; +} + +static int bucket_append(Bucket *bucket, Py_ssize_t kept_index) { + return append_py_ssize( + &bucket->values, &bucket->size, &bucket->capacity, kept_index + ); +} + +static int touched_append(HammingIndex *index, Py_ssize_t kept_index) { + return append_py_ssize( + &index->touched_indexes, + &index->touched_count, + &index->touched_capacity, + kept_index + ); +} + +static void clear_candidate_marks(HammingIndex *index) { + Py_ssize_t i; + + for (i = 0; i < index->touched_count; i++) { + index->candidate_marks[index->touched_indexes[i]] = 0; + } + index->touched_count = 0; +} + +static int hamming_index_init( + HammingIndex *index, Py_ssize_t hash_capacity, int threshold +) { + int partition_index; + int chunk_index; + Py_ssize_t bucket_count; + Py_ssize_t allocation_capacity = hash_capacity > 0 ? hash_capacity : 1; + + memset(index, 0, sizeof(*index)); + index->threshold = threshold; + index->chunk_radius = threshold / CHUNK_COUNT; + index->hash_capacity = allocation_capacity; + index->hashes = (uint64_t *)calloc( + (size_t)allocation_capacity, sizeof(uint64_t) + ); + index->candidate_marks = (uint8_t *)calloc( + (size_t)allocation_capacity, sizeof(uint8_t) + ); + if (index->hashes == NULL || index->candidate_marks == NULL) { + PyErr_NoMemory(); + return -1; + } + + for (partition_index = 0; partition_index < PARTITION_COUNT; + partition_index++) { + for (chunk_index = 0; chunk_index < CHUNK_COUNT; chunk_index++) { + bucket_count = ((Py_ssize_t)1) << CHUNK_BITS[chunk_index]; + index->indexes[partition_index][chunk_index].bucket_count = + bucket_count; + index->indexes[partition_index][chunk_index].buckets = + (Bucket *)calloc((size_t)bucket_count, sizeof(Bucket)); + if (index->indexes[partition_index][chunk_index].buckets == NULL) { + PyErr_NoMemory(); + return -1; + } + } + } + + return 0; +} + +static void hamming_index_free(HammingIndex *index) { + int partition_index; + int chunk_index; + Py_ssize_t bucket_index; + ChunkIndex *chunk_index_ptr; + + for (partition_index = 0; partition_index < PARTITION_COUNT; + partition_index++) { + for (chunk_index = 0; chunk_index < CHUNK_COUNT; chunk_index++) { + chunk_index_ptr = &index->indexes[partition_index][chunk_index]; + if (chunk_index_ptr->buckets == NULL) { + continue; + } + for (bucket_index = 0; bucket_index < chunk_index_ptr->bucket_count; + bucket_index++) { + free(chunk_index_ptr->buckets[bucket_index].values); + } + free(chunk_index_ptr->buckets); + } + } + + free(index->hashes); + free(index->candidate_marks); + free(index->touched_indexes); +} + +static int mark_bucket_candidates(HammingIndex *index, Bucket *bucket) { + Py_ssize_t i; + Py_ssize_t kept_index; + + for (i = 0; i < bucket->size; i++) { + kept_index = bucket->values[i]; + if (index->candidate_marks[kept_index] == 0) { + index->candidate_marks[kept_index] = 1; + if (touched_append(index, kept_index) < 0) { + return -1; + } + } + } + return 0; +} + +static int mark_partition_zero_candidates( + HammingIndex *index, uint64_t phash_value +) { + uint64_t chunks[CHUNK_COUNT]; + int chunk_index; + int bit_index; + uint64_t chunk_value; + uint64_t variant_value; + ChunkIndex *chunk_index_ptr; + + partition_chunks(phash_value, 0, chunks); + for (chunk_index = 0; chunk_index < CHUNK_COUNT; chunk_index++) { + chunk_index_ptr = &index->indexes[0][chunk_index]; + chunk_value = chunks[chunk_index]; + if (mark_bucket_candidates( + index, &chunk_index_ptr->buckets[chunk_value] + ) < 0) { + return -1; + } + + if (index->chunk_radius == 0) { + continue; + } + + for (bit_index = 0; bit_index < CHUNK_BITS[chunk_index]; + bit_index++) { + variant_value = chunk_value ^ (((uint64_t)1) << bit_index); + if (mark_bucket_candidates( + index, &chunk_index_ptr->buckets[variant_value] + ) < 0) { + return -1; + } + } + } + return 0; +} + +static int find_partition_one_duplicate( + HammingIndex *index, uint64_t phash_value +) { + uint64_t chunks[CHUNK_COUNT]; + int chunk_index; + int bit_index; + uint64_t chunk_value; + uint64_t variant_value; + ChunkIndex *chunk_index_ptr; + Bucket *bucket; + Py_ssize_t bucket_offset; + Py_ssize_t kept_index; + + partition_chunks(phash_value, 1, chunks); + for (chunk_index = 0; chunk_index < CHUNK_COUNT; chunk_index++) { + chunk_index_ptr = &index->indexes[1][chunk_index]; + chunk_value = chunks[chunk_index]; + + for (bit_index = -1; bit_index < CHUNK_BITS[chunk_index]; + bit_index++) { + if (bit_index == -1) { + variant_value = chunk_value; + } else { + if (index->chunk_radius == 0) { + break; + } + variant_value = chunk_value ^ (((uint64_t)1) << bit_index); + } + + bucket = &chunk_index_ptr->buckets[variant_value]; + for (bucket_offset = 0; bucket_offset < bucket->size; + bucket_offset++) { + kept_index = bucket->values[bucket_offset]; + if (index->candidate_marks[kept_index] != 1) { + continue; + } + if (popcount64(phash_value ^ index->hashes[kept_index]) <= + index->threshold) { + return 1; + } + index->candidate_marks[kept_index] = 2; + } + } + } + return 0; +} + +static int hamming_index_find_duplicate( + HammingIndex *index, uint64_t phash_value +) { + int found_duplicate; + + if (index->kept_count == 0) { + return 0; + } + + index->touched_count = 0; + if (mark_partition_zero_candidates(index, phash_value) < 0) { + clear_candidate_marks(index); + return -1; + } + + if (index->touched_count == 0) { + return 0; + } + + found_duplicate = find_partition_one_duplicate(index, phash_value); + clear_candidate_marks(index); + return found_duplicate; +} + +static int hamming_index_add(HammingIndex *index, uint64_t phash_value) { + uint64_t chunks[CHUNK_COUNT]; + int partition_index; + int chunk_index; + Py_ssize_t kept_index = index->kept_count; + ChunkIndex *chunk_index_ptr; + + if (kept_index >= index->hash_capacity) { + PyErr_NoMemory(); + return -1; + } + + index->hashes[kept_index] = phash_value; + index->candidate_marks[kept_index] = 0; + for (partition_index = 0; partition_index < PARTITION_COUNT; + partition_index++) { + partition_chunks(phash_value, partition_index, chunks); + for (chunk_index = 0; chunk_index < CHUNK_COUNT; chunk_index++) { + chunk_index_ptr = &index->indexes[partition_index][chunk_index]; + if (bucket_append( + &chunk_index_ptr->buckets[chunks[chunk_index]], kept_index + ) < 0) { + return -1; + } + } + } + + index->kept_count += 1; + return 0; +} + +static PyObject *build_kept_index_list( + const Py_ssize_t *kept_indexes, Py_ssize_t kept_count +) { + PyObject *result = PyList_New(0); + PyObject *index_obj; + Py_ssize_t i; + + if (result == NULL) { + return NULL; + } + + for (i = 0; i < kept_count; i++) { + index_obj = PyLong_FromSsize_t(kept_indexes[i]); + if (index_obj == NULL) { + Py_DECREF(result); + return NULL; + } + if (PyList_Append(result, index_obj) < 0) { + Py_DECREF(index_obj); + Py_DECREF(result); + return NULL; + } + Py_DECREF(index_obj); + } + + return result; +} + +static PyObject *native_deduplicate_phashes( + PyObject *self, PyObject *args +); + +static PyObject *deduplicate_with_index( + const uint64_t *phashes, Py_ssize_t input_count, int threshold +) { + HammingIndex index; + Py_ssize_t *kept_indexes = NULL; + Py_ssize_t i; + Py_ssize_t kept_count = 0; + int is_duplicate; + PyObject *result = NULL; + + memset(&index, 0, sizeof(index)); + + if (input_count > 0) { + kept_indexes = (Py_ssize_t *)malloc( + (size_t)input_count * sizeof(Py_ssize_t) + ); + if (kept_indexes == NULL) { + PyErr_NoMemory(); + return NULL; + } + } + + if (hamming_index_init(&index, input_count, threshold) < 0) { + goto cleanup; + } + + for (i = 0; i < input_count; i++) { + is_duplicate = hamming_index_find_duplicate(&index, phashes[i]); + if (is_duplicate < 0) { + goto cleanup; + } + if (!is_duplicate) { + if (hamming_index_add(&index, phashes[i]) < 0) { + goto cleanup; + } + kept_indexes[kept_count] = i; + kept_count += 1; + } + + if ((i & 0x3fff) == 0 && PyErr_CheckSignals() < 0) { + goto cleanup; + } + } + + result = build_kept_index_list(kept_indexes, kept_count); + +cleanup: + hamming_index_free(&index); + free(kept_indexes); + return result; +} + +static PyObject *deduplicate_with_linear_scan( + const uint64_t *phashes, Py_ssize_t input_count, int threshold +) { + Py_ssize_t *kept_indexes = NULL; + Py_ssize_t i; + Py_ssize_t kept_offset; + Py_ssize_t kept_count = 0; + int is_duplicate; + PyObject *result; + + if (input_count > 0) { + kept_indexes = (Py_ssize_t *)malloc( + (size_t)input_count * sizeof(Py_ssize_t) + ); + if (kept_indexes == NULL) { + PyErr_NoMemory(); + return NULL; + } + } + + for (i = 0; i < input_count; i++) { + is_duplicate = 0; + for (kept_offset = 0; kept_offset < kept_count; kept_offset++) { + if (popcount64(phashes[i] ^ phashes[kept_indexes[kept_offset]]) <= + threshold) { + is_duplicate = 1; + break; + } + } + + if (!is_duplicate) { + kept_indexes[kept_count] = i; + kept_count += 1; + } + + if ((i & 0x3fff) == 0 && PyErr_CheckSignals() < 0) { + free(kept_indexes); + return NULL; + } + } + + result = build_kept_index_list(kept_indexes, kept_count); + free(kept_indexes); + return result; +} + +static PyObject *deduplicate_keep_first(Py_ssize_t input_count) { + PyObject *result; + PyObject *index_obj; + + result = PyList_New(0); + if (result == NULL || input_count == 0) { + return result; + } + + index_obj = PyLong_FromLong(0); + if (index_obj == NULL) { + Py_DECREF(result); + return NULL; + } + if (PyList_Append(result, index_obj) < 0) { + Py_DECREF(index_obj); + Py_DECREF(result); + return NULL; + } + Py_DECREF(index_obj); + return result; +} + +static PyObject *native_deduplicate_phashes( + PyObject *self, PyObject *args +) { + PyObject *phashes_obj; + PyObject *phashes_sequence; + PyObject **phash_items; + PyObject *result = NULL; + uint64_t *phashes = NULL; + Py_ssize_t input_count; + Py_ssize_t i; + int threshold; + + (void)self; + + if (!PyArg_ParseTuple(args, "Oi", &phashes_obj, &threshold)) { + return NULL; + } + if (threshold < 0 || threshold > DEDUP_THRESHOLD_MAX) { + PyErr_SetString( + PyExc_ValueError, + "native pHash deduplication supports thresholds between 0 and 64" + ); + return NULL; + } + + phashes_sequence = PySequence_Fast( + phashes_obj, "phashes must be an iterable of 64-bit integer pHashes" + ); + if (phashes_sequence == NULL) { + return NULL; + } + + input_count = PySequence_Fast_GET_SIZE(phashes_sequence); + phash_items = PySequence_Fast_ITEMS(phashes_sequence); + + if (input_count > 0) { + phashes = (uint64_t *)malloc((size_t)input_count * sizeof(uint64_t)); + if (phashes == NULL) { + PyErr_NoMemory(); + goto cleanup; + } + } + + for (i = 0; i < input_count; i++) { + phashes[i] = (uint64_t)PyLong_AsUnsignedLongLong(phash_items[i]); + if (PyErr_Occurred()) { + goto cleanup; + } + } + + if (threshold == DEDUP_THRESHOLD_MAX) { + result = deduplicate_keep_first(input_count); + } else if (threshold <= INDEX_MAX_THRESHOLD) { + result = deduplicate_with_index(phashes, input_count, threshold); + } else { + result = deduplicate_with_linear_scan(phashes, input_count, threshold); + } + +cleanup: + Py_DECREF(phashes_sequence); + free(phashes); + return result; +} + +static PyMethodDef NativeDedupMethods[] = { + { + "deduplicate_phashes", + native_deduplicate_phashes, + METH_VARARGS, + "Return indexes of pHashes kept by exact Hamming-distance deduplication.", + }, + {NULL, NULL, 0, NULL}, +}; + +static struct PyModuleDef NativeDedupModule = { + PyModuleDef_HEAD_INIT, + "_native_dedup", + "Native pHash deduplication helpers.", + -1, + NativeDedupMethods, +}; + +PyMODINIT_FUNC PyInit__native_dedup(void) { + return PyModule_Create(&NativeDedupModule); +} diff --git a/nucleus/_native_dedup.pyi b/nucleus/_native_dedup.pyi new file mode 100644 index 00000000..d8e32ba7 --- /dev/null +++ b/nucleus/_native_dedup.pyi @@ -0,0 +1,5 @@ +from collections.abc import Iterable + +def deduplicate_phashes( + phashes: Iterable[int], threshold: int +) -> list[int]: ... diff --git a/nucleus/local_deduplication.py b/nucleus/local_deduplication.py index 46643345..c48e5f74 100644 --- a/nucleus/local_deduplication.py +++ b/nucleus/local_deduplication.py @@ -2,23 +2,30 @@ from __future__ import annotations +import importlib import itertools import re from collections.abc import Callable, Iterable, Mapping from dataclasses import dataclass -from typing import Generic, List, Optional, Sequence, TypeVar, Union +from typing import Any, Generic, List, Optional, Sequence, TypeVar, Union from .constants import ITEM_KEY from .dataset_item import DatasetItem from .deduplication import DeduplicationStats +_NATIVE_DEDUP: Optional[Any] +try: + _NATIVE_DEDUP = importlib.import_module("nucleus._native_dedup") +except ImportError: + _NATIVE_DEDUP = None + InputT = TypeVar("InputT") _TieBreakKey = tuple[int, Union[int, str]] PHASH_REGEX = re.compile(r"^[01]{64}$") DEDUP_THRESHOLD_MIN = 0 DEDUP_THRESHOLD_MAX = 64 -LOCAL_INDEX_MAX_THRESHOLD = 10 +LOCAL_INDEX_MAX_THRESHOLD = 11 PARTITION_COUNT = 2 INDEX_CHUNK_BITS = (11, 11, 11, 11, 10, 10) INDEX_CHUNK_COUNT = len(INDEX_CHUNK_BITS) @@ -63,7 +70,7 @@ class _HammingIndex: chunking. If two hashes are within threshold ``t``, then each partition has at least one chunk within ``floor(t / 6)``. Querying both partitions and intersecting the candidates keeps the result exact while avoiding a full - scan for the intended ``t <= 10`` case. + scan for the intended ``t <= 11`` case. The chunk tolerance and the rotation are separate mechanisms. For threshold 10, ``floor(10 / 6) == 1``, so a query chunk ``x`` probes the @@ -203,7 +210,10 @@ def deduplicate_by_phash( records.sort(key=lambda record: (record.phash_value, record.stable_id)) - if threshold == 0: + native_unique_records = _deduplicate_with_native(records, threshold) + if native_unique_records is not None: + unique_records = native_unique_records + elif threshold == 0: unique_records = _deduplicate_exact(records) elif threshold == DEDUP_THRESHOLD_MAX: unique_records = [records[0]] @@ -323,6 +333,17 @@ def _deduplicate_with_index( return unique_records +def _deduplicate_with_native( + records: Sequence[_DeduplicationRecord[InputT]], threshold: int +) -> Optional[List[_DeduplicationRecord[InputT]]]: + if _NATIVE_DEDUP is None: + return None + + phash_values = [record.phash_value for record in records] + kept_indexes = _NATIVE_DEDUP.deduplicate_phashes(phash_values, threshold) + return [records[index] for index in kept_indexes] + + def _deduplicate_with_linear_scan( records: Sequence[_DeduplicationRecord[InputT]], threshold: int ) -> List[_DeduplicationRecord[InputT]]: diff --git a/pyproject.toml b/pyproject.toml index 9cc4dfd5..0bc7448d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,7 +25,7 @@ ignore = ["E501", "E741", "E731", "F401"] # Easy ignore for getting it running [tool.poetry] name = "scale-nucleus" -version = "0.18.5" +version = "0.18.6" description = "The official Python client library for Nucleus, the Data Platform for AI" license = "MIT" authors = ["Scale AI Nucleus Team "] @@ -34,6 +34,11 @@ homepage = "https://scale.com/nucleus" repository = "https://github.com/scaleapi/nucleus-python-client" documentation = "https://dashboard.scale.com/nucleus/docs/api" packages = [{include="nucleus"}, {include="cli"}] +include = [ + { path = "nucleus/_native_dedup.c", format = "sdist" }, + { path = "nucleus/_native_dedup.pyi", format = ["sdist", "wheel"] }, +] +build = { script = "build.py", generate-setup-file = true } [tool.poetry.dependencies] python = ">=3.10,<4.0" @@ -94,5 +99,5 @@ markers = [ [build-system] -requires = ["poetry-core>=1.0.0"] +requires = ["poetry-core>=1.0.0", "setuptools>=68"] build-backend = "poetry.core.masonry.api" diff --git a/tests/test_local_deduplication.py b/tests/test_local_deduplication.py index 2f9278db..64dc0c86 100644 --- a/tests/test_local_deduplication.py +++ b/tests/test_local_deduplication.py @@ -2,9 +2,15 @@ import pytest +import nucleus.local_deduplication as local_deduplication from nucleus import DatasetItem, deduplicate_by_phash from nucleus.local_deduplication import LocalDeduplicationResult +try: + from nucleus import _native_dedup +except ImportError: + _native_dedup = None + def _item(reference_id, phash): return DatasetItem( @@ -223,3 +229,54 @@ def test_deduplicate_by_phash_preserves_missing_reference_ids(): ) assert result.unique_reference_ids == [None, "b"] + + +@pytest.mark.parametrize("threshold", [0, 10, 12, 64]) +def test_deduplicate_by_phash_uses_native_when_available( + monkeypatch, threshold +): + row_a = _row("a", "0" * 64) + row_b = _row("b", "1" * 64) + + class FakeNativeDedup: + @staticmethod + def deduplicate_phashes(phashes, actual_threshold): + assert phashes == [0, (1 << 64) - 1] + assert actual_threshold == threshold + return [1] + + monkeypatch.setattr(local_deduplication, "_NATIVE_DEDUP", FakeNativeDedup) + + result = deduplicate_by_phash([row_a, row_b], threshold=threshold) + + assert result.unique == [row_b] + + +@pytest.mark.skipif( + _native_dedup is None, + reason="native deduplication extension is not built in this environment", +) +def test_native_deduplication_matches_python_index(): + phashes = [ + int("0" * 64, 2), + int(_flip_bits("0" * 64, range(10)), 2), + int(_flip_bits("0" * 64, range(11)), 2), + int("1" * 64, 2), + ] + + assert _native_dedup.deduplicate_phashes(phashes, 10) == [0, 2, 3] + + +@pytest.mark.skipif( + _native_dedup is None, + reason="native deduplication extension is not built in this environment", +) +def test_native_deduplication_handles_linear_scan_threshold(): + phashes = [ + int("0" * 64, 2), + int(_flip_bits("0" * 64, range(12)), 2), + int(_flip_bits("0" * 64, range(13)), 2), + int("1" * 64, 2), + ] + + assert _native_dedup.deduplicate_phashes(phashes, 12) == [0, 2, 3] From 539909f4a7d93ab98059855cb23338f5cb5ce047 Mon Sep 17 00:00:00 2001 From: Vinay Parakala Date: Tue, 9 Jun 2026 16:07:09 -0400 Subject: [PATCH 2/3] Name candidate mark states in native dedup index. Replace magic 0/1/2 literals with a documented CandidateMark enum so Hamming-index query state is easier to follow. Co-authored-by: Cursor --- nucleus/_native_dedup.c | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/nucleus/_native_dedup.c b/nucleus/_native_dedup.c index 48cf31b5..8340f650 100644 --- a/nucleus/_native_dedup.c +++ b/nucleus/_native_dedup.c @@ -17,6 +17,12 @@ static const int CHUNK_BITS[CHUNK_COUNT] = {11, 11, 11, 11, 10, 10}; +typedef enum { + CANDIDATE_MARK_UNMARKED = 0, /* default; not involved in this query */ + CANDIDATE_MARK_CANDIDATE = 1, /* partition 0 hit; check in partition 1 */ + CANDIDATE_MARK_REJECTED = 2, /* partition 1 checked; not within threshold */ +} CandidateMark; + typedef struct { Py_ssize_t *values; Py_ssize_t size; @@ -128,7 +134,8 @@ static void clear_candidate_marks(HammingIndex *index) { Py_ssize_t i; for (i = 0; i < index->touched_count; i++) { - index->candidate_marks[index->touched_indexes[i]] = 0; + index->candidate_marks[index->touched_indexes[i]] = + CANDIDATE_MARK_UNMARKED; } index->touched_count = 0; } @@ -206,8 +213,8 @@ static int mark_bucket_candidates(HammingIndex *index, Bucket *bucket) { for (i = 0; i < bucket->size; i++) { kept_index = bucket->values[i]; - if (index->candidate_marks[kept_index] == 0) { - index->candidate_marks[kept_index] = 1; + if (index->candidate_marks[kept_index] == CANDIDATE_MARK_UNMARKED) { + index->candidate_marks[kept_index] = CANDIDATE_MARK_CANDIDATE; if (touched_append(index, kept_index) < 0) { return -1; } @@ -286,14 +293,15 @@ static int find_partition_one_duplicate( for (bucket_offset = 0; bucket_offset < bucket->size; bucket_offset++) { kept_index = bucket->values[bucket_offset]; - if (index->candidate_marks[kept_index] != 1) { + if (index->candidate_marks[kept_index] != + CANDIDATE_MARK_CANDIDATE) { continue; } if (popcount64(phash_value ^ index->hashes[kept_index]) <= index->threshold) { return 1; } - index->candidate_marks[kept_index] = 2; + index->candidate_marks[kept_index] = CANDIDATE_MARK_REJECTED; } } } @@ -337,7 +345,7 @@ static int hamming_index_add(HammingIndex *index, uint64_t phash_value) { } index->hashes[kept_index] = phash_value; - index->candidate_marks[kept_index] = 0; + index->candidate_marks[kept_index] = CANDIDATE_MARK_UNMARKED; for (partition_index = 0; partition_index < PARTITION_COUNT; partition_index++) { partition_chunks(phash_value, partition_index, chunks); From fc377e0b4f0c20d4d583c371210f8c150f88a610 Mon Sep 17 00:00:00 2001 From: Vinay Parakala Date: Mon, 15 Jun 2026 16:34:03 -0400 Subject: [PATCH 3/3] Address native dedup review nits --- nucleus/_native_dedup.c | 9 +++------ tests/test_local_deduplication.py | 14 +++++++++++--- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/nucleus/_native_dedup.c b/nucleus/_native_dedup.c index 8340f650..a94e92d9 100644 --- a/nucleus/_native_dedup.c +++ b/nucleus/_native_dedup.c @@ -55,6 +55,7 @@ static int popcount64(uint64_t value) { #else int count = 0; while (value != 0) { + /* Kernighan's algorithm drops one set bit per iteration. */ value &= value - 1; count++; } @@ -392,7 +393,7 @@ static PyObject *build_kept_index_list( } static PyObject *native_deduplicate_phashes( - PyObject *self, PyObject *args + PyObject *Py_UNUSED(self), PyObject *args ); static PyObject *deduplicate_with_index( @@ -405,8 +406,6 @@ static PyObject *deduplicate_with_index( int is_duplicate; PyObject *result = NULL; - memset(&index, 0, sizeof(index)); - if (input_count > 0) { kept_indexes = (Py_ssize_t *)malloc( (size_t)input_count * sizeof(Py_ssize_t) @@ -517,7 +516,7 @@ static PyObject *deduplicate_keep_first(Py_ssize_t input_count) { } static PyObject *native_deduplicate_phashes( - PyObject *self, PyObject *args + PyObject *Py_UNUSED(self), PyObject *args ) { PyObject *phashes_obj; PyObject *phashes_sequence; @@ -528,8 +527,6 @@ static PyObject *native_deduplicate_phashes( Py_ssize_t i; int threshold; - (void)self; - if (!PyArg_ParseTuple(args, "Oi", &phashes_obj, &threshold)) { return NULL; } diff --git a/tests/test_local_deduplication.py b/tests/test_local_deduplication.py index 64dc0c86..ed6c8c5a 100644 --- a/tests/test_local_deduplication.py +++ b/tests/test_local_deduplication.py @@ -231,7 +231,7 @@ def test_deduplicate_by_phash_preserves_missing_reference_ids(): assert result.unique_reference_ids == [None, "b"] -@pytest.mark.parametrize("threshold", [0, 10, 12, 64]) +@pytest.mark.parametrize("threshold", [0, 10, 11, 12, 64]) def test_deduplicate_by_phash_uses_native_when_available( monkeypatch, threshold ): @@ -256,7 +256,12 @@ def deduplicate_phashes(phashes, actual_threshold): _native_dedup is None, reason="native deduplication extension is not built in this environment", ) -def test_native_deduplication_matches_python_index(): +@pytest.mark.parametrize( + ("threshold", "expected_indexes"), [(10, [0, 2, 3]), (11, [0, 3])] +) +def test_native_deduplication_matches_python_index( + threshold, expected_indexes +): phashes = [ int("0" * 64, 2), int(_flip_bits("0" * 64, range(10)), 2), @@ -264,7 +269,10 @@ def test_native_deduplication_matches_python_index(): int("1" * 64, 2), ] - assert _native_dedup.deduplicate_phashes(phashes, 10) == [0, 2, 3] + assert ( + _native_dedup.deduplicate_phashes(phashes, threshold) + == expected_indexes + ) @pytest.mark.skipif(