From a12e059658225a179fda4e36c1817db0433da2c1 Mon Sep 17 00:00:00 2001 From: GunaPalanivel Date: Thu, 18 Dec 2025 21:58:18 +0530 Subject: [PATCH 01/11] feat: expose refresh parameter in ElasticsearchDocumentStore Add configurable refresh parameter to write, delete, and update methods in ElasticsearchDocumentStore. This allows users to control when index changes become visible to search operations, enabling read-your-writes consistency without relying on time.sleep() workarounds. Methods updated: - write_documents / write_documents_async - delete_documents / delete_documents_async - delete_all_documents / delete_all_documents_async - delete_by_filter / delete_by_filter_async - update_by_filter / update_by_filter_async The refresh parameter accepts: - True: Force immediate refresh (default for delete_all_documents / delete_all_documents_async) - False: No refresh (best for bulk performance) - 'wait_for': Wait for next refresh cycle (default for all other methods) Closes #2065 --- .../elasticsearch/document_store.py | 100 ++++++++++++++---- .../tests/test_document_store.py | 27 +++-- 2 files changed, 90 insertions(+), 37 deletions(-) diff --git a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py index b6400af95..06669e461 100644 --- a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py +++ b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py @@ -25,6 +25,12 @@ logger = logging.getLogger(__name__) +# Type alias for the refresh parameter +# - True: Refresh immediately (force refresh) +# - False: Don't refresh (best for bulk performance) +# - "wait_for": Wait for next refresh cycle to make docs visible +RefreshType = Union[bool, Literal["wait_for"]] + Hosts = Union[str, list[Union[str, Mapping[str, Union[str, int]], NodeConfig]]] # document scores are essentially unbounded and will be scaled to values 
between 0 and 1 if scale_score is set to @@ -410,12 +416,21 @@ def _deserialize_document(hit: dict[str, Any]) -> Document: return Document.from_dict(data) - def write_documents(self, documents: list[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE) -> int: + def write_documents( + self, + documents: list[Document], + policy: DuplicatePolicy = DuplicatePolicy.NONE, + refresh: RefreshType = "wait_for", + ) -> int: """ Writes `Document`s to Elasticsearch. :param documents: List of Documents to write to the document store. :param policy: DuplicatePolicy to apply when a document with the same ID already exists in the document store. + :param refresh: Controls when changes are made visible to search operations. + - `True`: Force refresh immediately after the operation. + - `False`: Do not refresh (better performance for bulk operations). + - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency). :raises ValueError: If `documents` is not a list of `Document`s. :raises DuplicateDocumentError: If a document with the same ID already exists in the document store and `policy` is set to `DuplicatePolicy.FAIL` or `DuplicatePolicy.NONE`. @@ -456,7 +471,7 @@ def write_documents(self, documents: list[Document], policy: DuplicatePolicy = D documents_written, errors = helpers.bulk( client=self.client, actions=elasticsearch_actions, - refresh="wait_for", + refresh=refresh, index=self._index, raise_on_error=False, stats_only=False, @@ -488,13 +503,20 @@ def write_documents(self, documents: list[Document], policy: DuplicatePolicy = D return documents_written async def write_documents_async( - self, documents: list[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE + self, + documents: list[Document], + policy: DuplicatePolicy = DuplicatePolicy.NONE, + refresh: RefreshType = "wait_for", ) -> int: """ Asynchronously writes `Document`s to Elasticsearch. :param documents: List of Documents to write to the document store. 
:param policy: DuplicatePolicy to apply when a document with the same ID already exists in the document store. + :param refresh: Controls when changes are made visible to search operations. + - `True`: Force refresh immediately after the operation. + - `False`: Do not refresh (better performance for bulk operations). + - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency). :raises ValueError: If `documents` is not a list of `Document`s. :raises DuplicateDocumentError: If a document with the same ID already exists in the document store and `policy` is set to `DuplicatePolicy.FAIL` or `DuplicatePolicy.NONE`. @@ -537,7 +559,7 @@ async def write_documents_async( client=self.async_client, actions=actions, index=self._index, - refresh=True, + refresh=refresh, raise_on_error=False, stats_only=False, ) @@ -556,33 +578,41 @@ async def write_documents_async( msg = f"Failed to write documents to Elasticsearch: {e!s}" raise DocumentStoreError(msg) from e - def delete_documents(self, document_ids: list[str]) -> None: + def delete_documents(self, document_ids: list[str], refresh: RefreshType = "wait_for") -> None: """ Deletes all documents with a matching document_ids from the document store. :param document_ids: the document ids to delete + :param refresh: Controls when changes are made visible to search operations. + - `True`: Force refresh immediately after the operation. + - `False`: Do not refresh (better performance for bulk operations). + - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency). 
""" helpers.bulk( client=self.client, actions=({"_op_type": "delete", "_id": id_} for id_ in document_ids), - refresh="wait_for", + refresh=refresh, index=self._index, raise_on_error=False, ) - def _prepare_delete_all_request(self, *, is_async: bool) -> dict[str, Any]: + def _prepare_delete_all_request(self, *, is_async: bool, refresh: RefreshType) -> dict[str, Any]: return { "index": self._index, "body": {"query": {"match_all": {}}}, # Delete all documents "wait_for_completion": False if is_async else True, # block until done (set False for async) - "refresh": True, # Ensure changes are visible immediately + "refresh": refresh, } - async def delete_documents_async(self, document_ids: list[str]) -> None: + async def delete_documents_async(self, document_ids: list[str], refresh: RefreshType = "wait_for") -> None: """ Asynchronously deletes all documents with a matching document_ids from the document store. :param document_ids: the document ids to delete + :param refresh: Controls when changes are made visible to search operations. + - `True`: Force refresh immediately after the operation. + - `False`: Do not refresh (better performance for bulk operations). + - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency). """ self._ensure_initialized() @@ -591,13 +621,13 @@ async def delete_documents_async(self, document_ids: list[str]) -> None: client=self.async_client, actions=({"_op_type": "delete", "_id": id_} for id_ in document_ids), index=self._index, - refresh=True, + refresh=refresh, ) except Exception as e: msg = f"Failed to delete documents from Elasticsearch: {e!s}" raise DocumentStoreError(msg) from e - def delete_all_documents(self, recreate_index: bool = False) -> None: + def delete_all_documents(self, recreate_index: bool = False, refresh: RefreshType = True) -> None: """ Deletes all documents in the document store. 
@@ -605,6 +635,10 @@ def delete_all_documents(self, recreate_index: bool = False) -> None: :param recreate_index: If True, the index will be deleted and recreated with the original mappings and settings. If False, all documents will be deleted using the `delete_by_query` API. + :param refresh: Controls when changes are made visible to search operations. + - `True`: Force refresh immediately after the operation (default). + - `False`: Do not refresh (better performance for bulk operations). + - `"wait_for"`: Wait for the next refresh cycle. """ self._ensure_initialized() # _ensure_initialized ensures _client is not None and an index exists @@ -630,20 +664,24 @@ def delete_all_documents(self, recreate_index: bool = False) -> None: self._client.indices.create(index=self._index, mappings=mappings) # type: ignore else: - result = self._client.delete_by_query(**self._prepare_delete_all_request(is_async=False)) # type: ignore + result = self._client.delete_by_query(**self._prepare_delete_all_request(is_async=False, refresh=refresh)) # type: ignore logger.info( "Deleted all the {n_docs} documents from the index '{index}'.", index=self._index, n_docs=result["deleted"], ) - async def delete_all_documents_async(self, recreate_index: bool = False) -> None: + async def delete_all_documents_async(self, recreate_index: bool = False, refresh: RefreshType = True) -> None: """ Asynchronously deletes all documents in the document store. A fast way to clear all documents from the document store while preserving any index settings and mappings. :param recreate_index: If True, the index will be deleted and recreated with the original mappings and settings. If False, all documents will be deleted using the `delete_by_query` API. + :param refresh: Controls when changes are made visible to search operations. + - `True`: Force refresh immediately after the operation (default). + - `False`: Do not refresh (better performance for bulk operations). 
+ - `"wait_for"`: Wait for the next refresh cycle. """ self._ensure_initialized() # ensures _async_client is not None @@ -670,7 +708,7 @@ async def delete_all_documents_async(self, recreate_index: bool = False) -> None else: # use delete_by_query for more efficient deletion without index recreation # For async, we need to wait for completion to get the deleted count - delete_request = self._prepare_delete_all_request(is_async=True) + delete_request = self._prepare_delete_all_request(is_async=True, refresh=refresh) delete_request["wait_for_completion"] = True # Override to wait for completion in async result = await self._async_client.delete_by_query(**delete_request) # type: ignore logger.info( @@ -683,12 +721,16 @@ async def delete_all_documents_async(self, recreate_index: bool = False) -> None msg = f"Failed to delete all documents from Elasticsearch: {e!s}" raise DocumentStoreError(msg) from e - def delete_by_filter(self, filters: dict[str, Any]) -> int: + def delete_by_filter(self, filters: dict[str, Any], refresh: RefreshType = "wait_for") -> int: """ Deletes all documents that match the provided filters. :param filters: The filters to apply to select documents for deletion. For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) + :param refresh: Controls when changes are made visible to search operations. + - `True`: Force refresh immediately after the operation. + - `False`: Do not refresh (better performance for bulk operations). + - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency). :returns: The number of documents deleted. 
""" self._ensure_initialized() @@ -696,7 +738,7 @@ def delete_by_filter(self, filters: dict[str, Any]) -> int: try: normalized_filters = _normalize_filters(filters) body = {"query": {"bool": {"filter": normalized_filters}}} - result = self.client.delete_by_query(index=self._index, body=body) # type: ignore + result = self.client.delete_by_query(index=self._index, body=body, refresh=refresh) # type: ignore deleted_count = result.get("deleted", 0) logger.info( "Deleted {n_docs} documents from index '{index}' using filters.", @@ -708,12 +750,16 @@ def delete_by_filter(self, filters: dict[str, Any]) -> int: msg = f"Failed to delete documents by filter from Elasticsearch: {e!s}" raise DocumentStoreError(msg) from e - async def delete_by_filter_async(self, filters: dict[str, Any]) -> int: + async def delete_by_filter_async(self, filters: dict[str, Any], refresh: RefreshType = "wait_for") -> int: """ Asynchronously deletes all documents that match the provided filters. :param filters: The filters to apply to select documents for deletion. For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) + :param refresh: Controls when changes are made visible to search operations. + - `True`: Force refresh immediately after the operation. + - `False`: Do not refresh (better performance for bulk operations). + - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency). :returns: The number of documents deleted. 
""" self._ensure_initialized() @@ -721,7 +767,7 @@ async def delete_by_filter_async(self, filters: dict[str, Any]) -> int: try: normalized_filters = _normalize_filters(filters) body = {"query": {"bool": {"filter": normalized_filters}}} - result = await self.async_client.delete_by_query(index=self._index, body=body) # type: ignore + result = await self.async_client.delete_by_query(index=self._index, body=body, refresh=refresh) # type: ignore deleted_count = result.get("deleted", 0) logger.info( "Deleted {n_docs} documents from index '{index}' using filters.", @@ -733,13 +779,17 @@ async def delete_by_filter_async(self, filters: dict[str, Any]) -> int: msg = f"Failed to delete documents by filter from Elasticsearch: {e!s}" raise DocumentStoreError(msg) from e - def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int: + def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any], refresh: RefreshType = "wait_for") -> int: """ Updates the metadata of all documents that match the provided filters. :param filters: The filters to apply to select documents for updating. For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) :param meta: The metadata fields to update. + :param refresh: Controls when changes are made visible to search operations. + - `True`: Force refresh immediately after the operation. + - `False`: Do not refresh (better performance for bulk operations). + - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency). :returns: The number of documents updated. 
""" self._ensure_initialized() @@ -753,7 +803,7 @@ def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int "script": {"source": UPDATE_SCRIPT, "params": meta, "lang": "painless"}, } - result = self.client.update_by_query(index=self._index, body=body) # type: ignore + result = self.client.update_by_query(index=self._index, body=body, refresh=refresh) # type: ignore updated_count = result.get("updated", 0) logger.info( "Updated {n_docs} documents in index '{index}' using filters.", @@ -765,13 +815,19 @@ def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int msg = f"Failed to update documents by filter in Elasticsearch: {e!s}" raise DocumentStoreError(msg) from e - async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str, Any]) -> int: + async def update_by_filter_async( + self, filters: dict[str, Any], meta: dict[str, Any], refresh: RefreshType = "wait_for" + ) -> int: """ Asynchronously updates the metadata of all documents that match the provided filters. :param filters: The filters to apply to select documents for updating. For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) :param meta: The metadata fields to update. + :param refresh: Controls when changes are made visible to search operations. + - `True`: Force refresh immediately after the operation. + - `False`: Do not refresh (better performance for bulk operations). + - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency). :returns: The number of documents updated. 
""" self._ensure_initialized() @@ -785,7 +841,7 @@ async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str, "script": {"source": UPDATE_SCRIPT, "params": meta, "lang": "painless"}, } - result = await self.async_client.update_by_query(index=self._index, body=body) # type: ignore + result = await self.async_client.update_by_query(index=self._index, body=body, refresh=refresh) # type: ignore updated_count = result.get("updated", 0) logger.info( "Updated {n_docs} documents in index '{index}' using filters.", diff --git a/integrations/elasticsearch/tests/test_document_store.py b/integrations/elasticsearch/tests/test_document_store.py index ed7c763a1..0ded2d619 100644 --- a/integrations/elasticsearch/tests/test_document_store.py +++ b/integrations/elasticsearch/tests/test_document_store.py @@ -2,9 +2,7 @@ # # SPDX-License-Identifier: Apache-2.0 -import asyncio import random -import time from unittest.mock import Mock, patch import pytest @@ -525,8 +523,7 @@ def test_delete_all_documents_no_index_recreation(self, document_store: Elastics document_store.write_documents(docs) assert document_store.count_documents() == 2 - document_store.delete_all_documents(recreate_index=False) - time.sleep(2) # need to wait for the deletion to be reflected in count_documents + document_store.delete_all_documents(recreate_index=False, refresh=True) assert document_store.count_documents() == 0 new_doc = Document(id="3", content="New document after delete all") @@ -547,8 +544,9 @@ def test_delete_by_filter(self, document_store: ElasticsearchDocumentStore): assert document_store.count_documents() == 3 # Delete documents with category="A" - deleted_count = document_store.delete_by_filter(filters={"field": "category", "operator": "==", "value": "A"}) - time.sleep(2) # wait for deletion to be reflected + deleted_count = document_store.delete_by_filter( + filters={"field": "category", "operator": "==", "value": "A"}, refresh=True + ) assert deleted_count == 2 assert 
document_store.count_documents() == 1 @@ -568,9 +566,10 @@ def test_update_by_filter(self, document_store: ElasticsearchDocumentStore): # Update status for category="A" documents updated_count = document_store.update_by_filter( - filters={"field": "category", "operator": "==", "value": "A"}, meta={"status": "published"} + filters={"field": "category", "operator": "==", "value": "A"}, + meta={"status": "published"}, + refresh=True, ) - time.sleep(2) # wait for update to be reflected assert updated_count == 2 # Verify the updates @@ -803,9 +802,7 @@ async def test_delete_all_documents_async_no_index_recreation(self, document_sto await document_store.write_documents_async(docs) assert await document_store.count_documents_async() == 2 - await document_store.delete_all_documents_async(recreate_index=False) - # Need to wait for the deletion to be reflected in count_documents - await asyncio.sleep(2) + await document_store.delete_all_documents_async(recreate_index=False, refresh=True) assert await document_store.count_documents_async() == 0 new_doc = Document(id="3", content="New document after delete all") @@ -824,9 +821,8 @@ async def test_delete_by_filter_async(self, document_store): # Delete documents with category="A" deleted_count = await document_store.delete_by_filter_async( - filters={"field": "category", "operator": "==", "value": "A"} + filters={"field": "category", "operator": "==", "value": "A"}, refresh=True ) - await asyncio.sleep(2) # wait for deletion to be reflected assert deleted_count == 2 assert await document_store.count_documents_async() == 1 @@ -848,9 +844,10 @@ async def test_update_by_filter_async(self, document_store): # Update status for category="A" documents updated_count = await document_store.update_by_filter_async( - filters={"field": "category", "operator": "==", "value": "A"}, meta={"status": "published"} + filters={"field": "category", "operator": "==", "value": "A"}, + meta={"status": "published"}, + refresh=True, ) - await 
asyncio.sleep(2) # wait for update to be reflected assert updated_count == 2 # Verify the updates From 8ccbdb3853707cccb6d29eafd76f4b2f31899bf4 Mon Sep 17 00:00:00 2001 From: GunaPalanivel Date: Thu, 18 Dec 2025 22:22:15 +0530 Subject: [PATCH 02/11] docs: add Elasticsearch refresh documentation link to all refresh parameters --- .../document_stores/elasticsearch/document_store.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py index 06669e461..88a68cbd0 100644 --- a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py +++ b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py @@ -431,6 +431,7 @@ def write_documents( - `True`: Force refresh immediately after the operation. - `False`: Do not refresh (better performance for bulk operations). - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency). + For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). :raises ValueError: If `documents` is not a list of `Document`s. :raises DuplicateDocumentError: If a document with the same ID already exists in the document store and `policy` is set to `DuplicatePolicy.FAIL` or `DuplicatePolicy.NONE`. @@ -517,6 +518,7 @@ async def write_documents_async( - `True`: Force refresh immediately after the operation. - `False`: Do not refresh (better performance for bulk operations). - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency). + For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). 
:raises ValueError: If `documents` is not a list of `Document`s. :raises DuplicateDocumentError: If a document with the same ID already exists in the document store and `policy` is set to `DuplicatePolicy.FAIL` or `DuplicatePolicy.NONE`. @@ -587,6 +589,7 @@ def delete_documents(self, document_ids: list[str], refresh: RefreshType = "wait - `True`: Force refresh immediately after the operation. - `False`: Do not refresh (better performance for bulk operations). - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency). + For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). """ helpers.bulk( client=self.client, @@ -613,6 +616,7 @@ async def delete_documents_async(self, document_ids: list[str], refresh: Refresh - `True`: Force refresh immediately after the operation. - `False`: Do not refresh (better performance for bulk operations). - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency). + For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). """ self._ensure_initialized() @@ -639,6 +643,7 @@ def delete_all_documents(self, recreate_index: bool = False, refresh: RefreshTyp - `True`: Force refresh immediately after the operation (default). - `False`: Do not refresh (better performance for bulk operations). - `"wait_for"`: Wait for the next refresh cycle. + For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). """ self._ensure_initialized() # _ensure_initialized ensures _client is not None and an index exists @@ -682,6 +687,7 @@ async def delete_all_documents_async(self, recreate_index: bool = False, refresh - `True`: Force refresh immediately after the operation (default). 
- `False`: Do not refresh (better performance for bulk operations). - `"wait_for"`: Wait for the next refresh cycle. + For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). """ self._ensure_initialized() # ensures _async_client is not None @@ -731,6 +737,7 @@ def delete_by_filter(self, filters: dict[str, Any], refresh: RefreshType = "wait - `True`: Force refresh immediately after the operation. - `False`: Do not refresh (better performance for bulk operations). - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency). + For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). :returns: The number of documents deleted. """ self._ensure_initialized() @@ -760,6 +767,7 @@ async def delete_by_filter_async(self, filters: dict[str, Any], refresh: Refresh - `True`: Force refresh immediately after the operation. - `False`: Do not refresh (better performance for bulk operations). - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency). + For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). :returns: The number of documents deleted. """ self._ensure_initialized() @@ -790,6 +798,7 @@ def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any], refres - `True`: Force refresh immediately after the operation. - `False`: Do not refresh (better performance for bulk operations). - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency). + For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). :returns: The number of documents updated. 
""" self._ensure_initialized() @@ -828,6 +837,7 @@ async def update_by_filter_async( - `True`: Force refresh immediately after the operation. - `False`: Do not refresh (better performance for bulk operations). - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency). + For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). :returns: The number of documents updated. """ self._ensure_initialized() From 289b61dcbc9b68979f8cda0eb27aa9dd91917651 Mon Sep 17 00:00:00 2001 From: GunaPalanivel Date: Fri, 19 Dec 2025 16:30:13 +0530 Subject: [PATCH 03/11] fix: restrict refresh param to bool for delete_by_query and update_by_query methods (per ES docs and review) --- .../elasticsearch/document_store.py | 48 +++++-------------- 1 file changed, 12 insertions(+), 36 deletions(-) diff --git a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py index 88a68cbd0..cf22723e5 100644 --- a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py +++ b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py @@ -631,7 +631,7 @@ async def delete_documents_async(self, document_ids: list[str], refresh: Refresh msg = f"Failed to delete documents from Elasticsearch: {e!s}" raise DocumentStoreError(msg) from e - def delete_all_documents(self, recreate_index: bool = False, refresh: RefreshType = True) -> None: + def delete_all_documents(self, recreate_index: bool = False, refresh: bool = True) -> None: """ Deletes all documents in the document store. 
@@ -639,11 +639,7 @@ def delete_all_documents(self, recreate_index: bool = False, refresh: RefreshTyp :param recreate_index: If True, the index will be deleted and recreated with the original mappings and settings. If False, all documents will be deleted using the `delete_by_query` API. - :param refresh: Controls when changes are made visible to search operations. - - `True`: Force refresh immediately after the operation (default). - - `False`: Do not refresh (better performance for bulk operations). - - `"wait_for"`: Wait for the next refresh cycle. - For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). + :param refresh: If True, Elasticsearch refreshes all shards involved in the delete by query after the request completes. If False, no refresh is performed. For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). """ self._ensure_initialized() # _ensure_initialized ensures _client is not None and an index exists @@ -676,18 +672,14 @@ def delete_all_documents(self, recreate_index: bool = False, refresh: RefreshTyp n_docs=result["deleted"], ) - async def delete_all_documents_async(self, recreate_index: bool = False, refresh: RefreshType = True) -> None: + async def delete_all_documents_async(self, recreate_index: bool = False, refresh: bool = True) -> None: """ Asynchronously deletes all documents in the document store. A fast way to clear all documents from the document store while preserving any index settings and mappings. :param recreate_index: If True, the index will be deleted and recreated with the original mappings and settings. If False, all documents will be deleted using the `delete_by_query` API. - :param refresh: Controls when changes are made visible to search operations. - - `True`: Force refresh immediately after the operation (default). 
- - `False`: Do not refresh (better performance for bulk operations). - - `"wait_for"`: Wait for the next refresh cycle. - For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). + :param refresh: If True, Elasticsearch refreshes all shards involved in the delete by query after the request completes. If False, no refresh is performed. For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). """ self._ensure_initialized() # ensures _async_client is not None @@ -727,17 +719,13 @@ async def delete_all_documents_async(self, recreate_index: bool = False, refresh msg = f"Failed to delete all documents from Elasticsearch: {e!s}" raise DocumentStoreError(msg) from e - def delete_by_filter(self, filters: dict[str, Any], refresh: RefreshType = "wait_for") -> int: + def delete_by_filter(self, filters: dict[str, Any], refresh: bool = True) -> int: """ Deletes all documents that match the provided filters. :param filters: The filters to apply to select documents for deletion. For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) - :param refresh: Controls when changes are made visible to search operations. - - `True`: Force refresh immediately after the operation. - - `False`: Do not refresh (better performance for bulk operations). - - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency). - For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). + :param refresh: If True, Elasticsearch refreshes all shards involved in the delete by query after the request completes. If False, no refresh is performed. 
For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). :returns: The number of documents deleted. """ self._ensure_initialized() @@ -757,17 +745,13 @@ def delete_by_filter(self, filters: dict[str, Any], refresh: RefreshType = "wait msg = f"Failed to delete documents by filter from Elasticsearch: {e!s}" raise DocumentStoreError(msg) from e - async def delete_by_filter_async(self, filters: dict[str, Any], refresh: RefreshType = "wait_for") -> int: + async def delete_by_filter_async(self, filters: dict[str, Any], refresh: bool = True) -> int: """ Asynchronously deletes all documents that match the provided filters. :param filters: The filters to apply to select documents for deletion. For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) - :param refresh: Controls when changes are made visible to search operations. - - `True`: Force refresh immediately after the operation. - - `False`: Do not refresh (better performance for bulk operations). - - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency). - For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). + :param refresh: If True, Elasticsearch refreshes all shards involved in the delete by query after the request completes. If False, no refresh is performed. For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). :returns: The number of documents deleted. 
""" self._ensure_initialized() @@ -787,18 +771,14 @@ async def delete_by_filter_async(self, filters: dict[str, Any], refresh: Refresh msg = f"Failed to delete documents by filter from Elasticsearch: {e!s}" raise DocumentStoreError(msg) from e - def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any], refresh: RefreshType = "wait_for") -> int: + def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any], refresh: bool = True) -> int: """ Updates the metadata of all documents that match the provided filters. :param filters: The filters to apply to select documents for updating. For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) :param meta: The metadata fields to update. - :param refresh: Controls when changes are made visible to search operations. - - `True`: Force refresh immediately after the operation. - - `False`: Do not refresh (better performance for bulk operations). - - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency). - For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). + :param refresh: If True, Elasticsearch refreshes all shards involved in the update by query after the request completes. If False, no refresh is performed. For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). :returns: The number of documents updated. 
""" self._ensure_initialized() @@ -825,7 +805,7 @@ def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any], refres raise DocumentStoreError(msg) from e async def update_by_filter_async( - self, filters: dict[str, Any], meta: dict[str, Any], refresh: RefreshType = "wait_for" + self, filters: dict[str, Any], meta: dict[str, Any], refresh: bool = True ) -> int: """ Asynchronously updates the metadata of all documents that match the provided filters. @@ -833,11 +813,7 @@ async def update_by_filter_async( :param filters: The filters to apply to select documents for updating. For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) :param meta: The metadata fields to update. - :param refresh: Controls when changes are made visible to search operations. - - `True`: Force refresh immediately after the operation. - - `False`: Do not refresh (better performance for bulk operations). - - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency). - For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). + :param refresh: If True, Elasticsearch refreshes all shards involved in the update by query after the request completes. If False, no refresh is performed. For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). :returns: The number of documents updated. 
""" self._ensure_initialized() From 55354fc6e2b9c72aa293652211f30c86ce484acf Mon Sep 17 00:00:00 2001 From: GunaPalanivel Date: Fri, 19 Dec 2025 16:38:19 +0530 Subject: [PATCH 04/11] chore: remove RefreshType alias and fix docstring line length (E501) --- .../elasticsearch/document_store.py | 25 +++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py index cf22723e5..01562ed89 100644 --- a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py +++ b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py @@ -29,7 +29,6 @@ # - True: Refresh immediately (force refresh) # - False: Don't refresh (best for bulk performance) # - "wait_for": Wait for next refresh cycle to make docs visible -RefreshType = Union[bool, Literal["wait_for"]] Hosts = Union[str, list[Union[str, Mapping[str, Union[str, int]], NodeConfig]]] @@ -639,7 +638,9 @@ def delete_all_documents(self, recreate_index: bool = False, refresh: bool = Tru :param recreate_index: If True, the index will be deleted and recreated with the original mappings and settings. If False, all documents will be deleted using the `delete_by_query` API. - :param refresh: If True, Elasticsearch refreshes all shards involved in the delete by query after the request completes. If False, no refresh is performed. For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). + :param refresh: If True, Elasticsearch refreshes all shards involved in the delete by query after the request + completes. If False, no refresh is performed. 
For more details, see the + [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). """ self._ensure_initialized() # _ensure_initialized ensures _client is not None and an index exists @@ -679,7 +680,9 @@ async def delete_all_documents_async(self, recreate_index: bool = False, refresh A fast way to clear all documents from the document store while preserving any index settings and mappings. :param recreate_index: If True, the index will be deleted and recreated with the original mappings and settings. If False, all documents will be deleted using the `delete_by_query` API. - :param refresh: If True, Elasticsearch refreshes all shards involved in the delete by query after the request completes. If False, no refresh is performed. For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). + :param refresh: If True, Elasticsearch refreshes all shards involved in the delete by query after the request + completes. If False, no refresh is performed. For more details, see the + [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). """ self._ensure_initialized() # ensures _async_client is not None @@ -725,7 +728,9 @@ def delete_by_filter(self, filters: dict[str, Any], refresh: bool = True) -> int :param filters: The filters to apply to select documents for deletion. For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) - :param refresh: If True, Elasticsearch refreshes all shards involved in the delete by query after the request completes. If False, no refresh is performed. For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). 
+ :param refresh: If True, Elasticsearch refreshes all shards involved in the delete by query after the request + completes. If False, no refresh is performed. For more details, see the + [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). :returns: The number of documents deleted. """ self._ensure_initialized() @@ -751,7 +756,9 @@ async def delete_by_filter_async(self, filters: dict[str, Any], refresh: bool = :param filters: The filters to apply to select documents for deletion. For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) - :param refresh: If True, Elasticsearch refreshes all shards involved in the delete by query after the request completes. If False, no refresh is performed. For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). + :param refresh: If True, Elasticsearch refreshes all shards involved in the delete by query after the request + completes. If False, no refresh is performed. For more details, see the + [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). :returns: The number of documents deleted. """ self._ensure_initialized() @@ -778,7 +785,9 @@ def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any], refres :param filters: The filters to apply to select documents for updating. For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) :param meta: The metadata fields to update. - :param refresh: If True, Elasticsearch refreshes all shards involved in the update by query after the request completes. If False, no refresh is performed. For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). 
+ :param refresh: If True, Elasticsearch refreshes all shards involved in the update by query after the request + completes. If False, no refresh is performed. For more details, see the + [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). :returns: The number of documents updated. """ self._ensure_initialized() @@ -813,7 +822,9 @@ async def update_by_filter_async( :param filters: The filters to apply to select documents for updating. For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) :param meta: The metadata fields to update. - :param refresh: If True, Elasticsearch refreshes all shards involved in the update by query after the request completes. If False, no refresh is performed. For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). + :param refresh: If True, Elasticsearch refreshes all shards involved in the update by query after the request + completes. If False, no refresh is performed. For more details, see the + [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). :returns: The number of documents updated. 
""" self._ensure_initialized() From dea46d0646a69b79f787a2a97bbea57241ce8fe3 Mon Sep 17 00:00:00 2001 From: GunaPalanivel Date: Fri, 19 Dec 2025 16:48:51 +0530 Subject: [PATCH 05/11] docs: update refresh param doc links to operation-specific Elasticsearch docs --- .../elasticsearch/document_store.py | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py index 01562ed89..b8b156db9 100644 --- a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py +++ b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py @@ -419,7 +419,7 @@ def write_documents( self, documents: list[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE, - refresh: RefreshType = "wait_for", + refresh: Literal["wait_for", True, False] = "wait_for", ) -> int: """ Writes `Document`s to Elasticsearch. @@ -506,7 +506,7 @@ async def write_documents_async( self, documents: list[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE, - refresh: RefreshType = "wait_for", + refresh: Literal["wait_for", True, False] = "wait_for", ) -> int: """ Asynchronously writes `Document`s to Elasticsearch. @@ -579,7 +579,7 @@ async def write_documents_async( msg = f"Failed to write documents to Elasticsearch: {e!s}" raise DocumentStoreError(msg) from e - def delete_documents(self, document_ids: list[str], refresh: RefreshType = "wait_for") -> None: + def delete_documents(self, document_ids: list[str], refresh: bool = True) -> None: """ Deletes all documents with a matching document_ids from the document store. 
@@ -598,7 +598,7 @@ def delete_documents(self, document_ids: list[str], refresh: RefreshType = "wait raise_on_error=False, ) - def _prepare_delete_all_request(self, *, is_async: bool, refresh: RefreshType) -> dict[str, Any]: + def _prepare_delete_all_request(self, *, is_async: bool, refresh: bool) -> dict[str, Any]: return { "index": self._index, "body": {"query": {"match_all": {}}}, # Delete all documents @@ -606,7 +606,7 @@ def _prepare_delete_all_request(self, *, is_async: bool, refresh: RefreshType) - "refresh": refresh, } - async def delete_documents_async(self, document_ids: list[str], refresh: RefreshType = "wait_for") -> None: + async def delete_documents_async(self, document_ids: list[str], refresh: bool = True) -> None: """ Asynchronously deletes all documents with a matching document_ids from the document store. @@ -640,7 +640,7 @@ def delete_all_documents(self, recreate_index: bool = False, refresh: bool = Tru settings. If False, all documents will be deleted using the `delete_by_query` API. :param refresh: If True, Elasticsearch refreshes all shards involved in the delete by query after the request completes. If False, no refresh is performed. For more details, see the - [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). + [Elasticsearch delete_by_query refresh documentation](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-delete-by-query#operation-delete-by-query-refresh). """ self._ensure_initialized() # _ensure_initialized ensures _client is not None and an index exists @@ -682,7 +682,7 @@ async def delete_all_documents_async(self, recreate_index: bool = False, refresh settings. If False, all documents will be deleted using the `delete_by_query` API. :param refresh: If True, Elasticsearch refreshes all shards involved in the delete by query after the request completes. If False, no refresh is performed. 
For more details, see the - [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). + [Elasticsearch delete_by_query refresh documentation](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-delete-by-query#operation-delete-by-query-refresh). """ self._ensure_initialized() # ensures _async_client is not None @@ -730,7 +730,7 @@ def delete_by_filter(self, filters: dict[str, Any], refresh: bool = True) -> int For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) :param refresh: If True, Elasticsearch refreshes all shards involved in the delete by query after the request completes. If False, no refresh is performed. For more details, see the - [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). + [Elasticsearch delete_by_query refresh documentation](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-delete-by-query#operation-delete-by-query-refresh). :returns: The number of documents deleted. """ self._ensure_initialized() @@ -787,7 +787,7 @@ def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any], refres :param meta: The metadata fields to update. :param refresh: If True, Elasticsearch refreshes all shards involved in the update by query after the request completes. If False, no refresh is performed. For more details, see the - [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). + [Elasticsearch update_by_query refresh documentation](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-update-by-query#operation-update-by-query-refresh). :returns: The number of documents updated. """ self._ensure_initialized() @@ -824,7 +824,7 @@ async def update_by_filter_async( :param meta: The metadata fields to update. 
:param refresh: If True, Elasticsearch refreshes all shards involved in the update by query after the request completes. If False, no refresh is performed. For more details, see the - [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). + [Elasticsearch update_by_query refresh documentation](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-update-by-query#operation-update-by-query-refresh). :returns: The number of documents updated. """ self._ensure_initialized() From 59ef78a055fab1a57b6d8b39f1b5f3a117dea8be Mon Sep 17 00:00:00 2001 From: GunaPalanivel Date: Fri, 19 Dec 2025 16:53:09 +0530 Subject: [PATCH 06/11] style: format document_store.py with ruff --- .../document_stores/elasticsearch/document_store.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py index b8b156db9..5652bc109 100644 --- a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py +++ b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py @@ -813,9 +813,7 @@ def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any], refres msg = f"Failed to update documents by filter in Elasticsearch: {e!s}" raise DocumentStoreError(msg) from e - async def update_by_filter_async( - self, filters: dict[str, Any], meta: dict[str, Any], refresh: bool = True - ) -> int: + async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str, Any], refresh: bool = True) -> int: """ Asynchronously updates the metadata of all documents that match the provided filters. 
From 05f83baafddfa93b9eb45dfc2b5dba9043a1cc95 Mon Sep 17 00:00:00 2001 From: GunaPalanivel Date: Fri, 19 Dec 2025 16:53:46 +0530 Subject: [PATCH 07/11] chore: remove obsolete refresh type alias comment per review --- .../document_stores/elasticsearch/document_store.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py index 5652bc109..cc9195234 100644 --- a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py +++ b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py @@ -25,10 +25,6 @@ logger = logging.getLogger(__name__) -# Type alias for the refresh parameter -# - True: Refresh immediately (force refresh) -# - False: Don't refresh (best for bulk performance) -# - "wait_for": Wait for next refresh cycle to make docs visible Hosts = Union[str, list[Union[str, Mapping[str, Union[str, int]], NodeConfig]]] From 73fa85cb532fa0e752bc0da52c8b1680ce08fa25 Mon Sep 17 00:00:00 2001 From: GunaPalanivel Date: Fri, 19 Dec 2025 17:21:54 +0530 Subject: [PATCH 08/11] refactor: set delete_documents refresh param to Literal and default to 'wait_for' per review --- .../document_stores/elasticsearch/document_store.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py index cc9195234..c22135a8d 100644 --- a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py +++ b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py @@ -575,7 +575,9 @@ async def write_documents_async( msg = 
f"Failed to write documents to Elasticsearch: {e!s}" raise DocumentStoreError(msg) from e - def delete_documents(self, document_ids: list[str], refresh: bool = True) -> None: + def delete_documents( + self, document_ids: list[str], refresh: Literal["wait_for", True, False] = "wait_for" + ) -> None: """ Deletes all documents with a matching document_ids from the document store. From 842f70634479b112040268cf480a36f637a1f505 Mon Sep 17 00:00:00 2001 From: GunaPalanivel Date: Fri, 19 Dec 2025 17:28:53 +0530 Subject: [PATCH 09/11] style: format document_store.py after refresh param type change --- .../document_stores/elasticsearch/document_store.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py index c22135a8d..a962411b2 100644 --- a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py +++ b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py @@ -575,9 +575,7 @@ async def write_documents_async( msg = f"Failed to write documents to Elasticsearch: {e!s}" raise DocumentStoreError(msg) from e - def delete_documents( - self, document_ids: list[str], refresh: Literal["wait_for", True, False] = "wait_for" - ) -> None: + def delete_documents(self, document_ids: list[str], refresh: Literal["wait_for", True, False] = "wait_for") -> None: """ Deletes all documents with a matching document_ids from the document store. 
From f34d2566172ac5820e66bc9147ecf7cefed2536c Mon Sep 17 00:00:00 2001 From: GunaPalanivel Date: Fri, 19 Dec 2025 17:30:51 +0530 Subject: [PATCH 10/11] refactor: set delete_documents_async refresh param to Literal and default to 'wait_for' per review --- .../document_stores/elasticsearch/document_store.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py index a962411b2..7bd5c1fad 100644 --- a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py +++ b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py @@ -602,7 +602,9 @@ def _prepare_delete_all_request(self, *, is_async: bool, refresh: bool) -> dict[ "refresh": refresh, } - async def delete_documents_async(self, document_ids: list[str], refresh: bool = True) -> None: + async def delete_documents_async( + self, document_ids: list[str], refresh: Literal["wait_for", True, False] = "wait_for" + ) -> None: """ Asynchronously deletes all documents with a matching document_ids from the document store. From 5636620192a72af6dbe077d90fe4748415043245 Mon Sep 17 00:00:00 2001 From: GunaPalanivel Date: Fri, 19 Dec 2025 19:38:26 +0530 Subject: [PATCH 11/11] Set refresh=False as default for update_by_filter and delete_by_filter (sync/async) to maintain backward compatibility per reviewer consensus. 
--- .../document_stores/elasticsearch/document_store.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py index 7bd5c1fad..2deede6a6 100644 --- a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py +++ b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py @@ -720,7 +720,7 @@ async def delete_all_documents_async(self, recreate_index: bool = False, refresh msg = f"Failed to delete all documents from Elasticsearch: {e!s}" raise DocumentStoreError(msg) from e - def delete_by_filter(self, filters: dict[str, Any], refresh: bool = True) -> int: + def delete_by_filter(self, filters: dict[str, Any], refresh: bool = False) -> int: """ Deletes all documents that match the provided filters. @@ -748,7 +748,7 @@ def delete_by_filter(self, filters: dict[str, Any], refresh: bool = True) -> int msg = f"Failed to delete documents by filter from Elasticsearch: {e!s}" raise DocumentStoreError(msg) from e - async def delete_by_filter_async(self, filters: dict[str, Any], refresh: bool = True) -> int: + async def delete_by_filter_async(self, filters: dict[str, Any], refresh: bool = False) -> int: """ Asynchronously deletes all documents that match the provided filters. @@ -776,7 +776,7 @@ async def delete_by_filter_async(self, filters: dict[str, Any], refresh: bool = msg = f"Failed to delete documents by filter from Elasticsearch: {e!s}" raise DocumentStoreError(msg) from e - def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any], refresh: bool = True) -> int: + def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any], refresh: bool = False) -> int: """ Updates the metadata of all documents that match the provided filters. 
@@ -811,7 +811,7 @@ def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any], refres msg = f"Failed to update documents by filter in Elasticsearch: {e!s}" raise DocumentStoreError(msg) from e - async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str, Any], refresh: bool = True) -> int: + async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str, Any], refresh: bool = False) -> int: """ Asynchronously updates the metadata of all documents that match the provided filters.