diff --git a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py
index b6400af95..2deede6a6 100644
--- a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py
+++ b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py
@@ -25,6 +25,7 @@
 logger = logging.getLogger(__name__)
+
 Hosts = Union[str, list[Union[str, Mapping[str, Union[str, int]], NodeConfig]]]
 
 # document scores are essentially unbounded and will be scaled to values between 0 and 1 if scale_score is set to
@@ -410,12 +411,22 @@ def _deserialize_document(hit: dict[str, Any]) -> Document:
         return Document.from_dict(data)
 
-    def write_documents(self, documents: list[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE) -> int:
+    def write_documents(
+        self,
+        documents: list[Document],
+        policy: DuplicatePolicy = DuplicatePolicy.NONE,
+        refresh: Literal["wait_for", True, False] = "wait_for",
+    ) -> int:
         """
         Writes `Document`s to Elasticsearch.
 
         :param documents: List of Documents to write to the document store.
         :param policy: DuplicatePolicy to apply when a document with the same ID already exists in the document store.
+        :param refresh: Controls when changes are made visible to search operations.
+            - `True`: Force refresh immediately after the operation.
+            - `False`: Do not refresh (better performance for bulk operations).
+            - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency).
+            For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter).
         :raises ValueError: If `documents` is not a list of `Document`s.
         :raises DuplicateDocumentError: If a document with the same ID already exists in the document store and
             `policy` is set to `DuplicatePolicy.FAIL` or `DuplicatePolicy.NONE`.
@@ -456,7 +467,7 @@ def write_documents(self, documents: list[Document], policy: DuplicatePolicy = D
             documents_written, errors = helpers.bulk(
                 client=self.client,
                 actions=elasticsearch_actions,
-                refresh="wait_for",
+                refresh=refresh,
                 index=self._index,
                 raise_on_error=False,
                 stats_only=False,
@@ -488,13 +499,21 @@ def write_documents(self, documents: list[Document], policy: DuplicatePolicy = D
         return documents_written
 
     async def write_documents_async(
-        self, documents: list[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE
+        self,
+        documents: list[Document],
+        policy: DuplicatePolicy = DuplicatePolicy.NONE,
+        refresh: Literal["wait_for", True, False] = "wait_for",
     ) -> int:
         """
         Asynchronously writes `Document`s to Elasticsearch.
 
         :param documents: List of Documents to write to the document store.
         :param policy: DuplicatePolicy to apply when a document with the same ID already exists in the document store.
+        :param refresh: Controls when changes are made visible to search operations.
+            - `True`: Force refresh immediately after the operation.
+            - `False`: Do not refresh (better performance for bulk operations).
+            - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency).
+            For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter).
         :raises ValueError: If `documents` is not a list of `Document`s.
         :raises DuplicateDocumentError: If a document with the same ID already exists in the document store and
             `policy` is set to `DuplicatePolicy.FAIL` or `DuplicatePolicy.NONE`.
@@ -537,7 +556,7 @@ async def write_documents_async(
                 client=self.async_client,
                 actions=actions,
                 index=self._index,
-                refresh=True,
+                refresh=refresh,
                 raise_on_error=False,
                 stats_only=False,
             )
@@ -556,33 +575,45 @@ async def write_documents_async(
             msg = f"Failed to write documents to Elasticsearch: {e!s}"
             raise DocumentStoreError(msg) from e
 
-    def delete_documents(self, document_ids: list[str]) -> None:
+    def delete_documents(self, document_ids: list[str], refresh: Literal["wait_for", True, False] = "wait_for") -> None:
         """
         Deletes all documents with a matching document_ids from the document store.
 
         :param document_ids: the document ids to delete
+        :param refresh: Controls when changes are made visible to search operations.
+            - `True`: Force refresh immediately after the operation.
+            - `False`: Do not refresh (better performance for bulk operations).
+            - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency).
+            For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter).
         """
         helpers.bulk(
             client=self.client,
             actions=({"_op_type": "delete", "_id": id_} for id_ in document_ids),
-            refresh="wait_for",
+            refresh=refresh,
             index=self._index,
             raise_on_error=False,
         )
 
-    def _prepare_delete_all_request(self, *, is_async: bool) -> dict[str, Any]:
+    def _prepare_delete_all_request(self, *, is_async: bool, refresh: bool) -> dict[str, Any]:
         return {
             "index": self._index,
             "body": {"query": {"match_all": {}}},  # Delete all documents
             "wait_for_completion": False if is_async else True,  # block until done (set False for async)
-            "refresh": True,  # Ensure changes are visible immediately
+            "refresh": refresh,
         }
 
-    async def delete_documents_async(self, document_ids: list[str]) -> None:
+    async def delete_documents_async(
+        self, document_ids: list[str], refresh: Literal["wait_for", True, False] = "wait_for"
+    ) -> None:
         """
         Asynchronously deletes all documents with a matching document_ids from the document store.
 
         :param document_ids: the document ids to delete
+        :param refresh: Controls when changes are made visible to search operations.
+            - `True`: Force refresh immediately after the operation.
+            - `False`: Do not refresh (better performance for bulk operations).
+            - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency).
+            For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter).
         """
         self._ensure_initialized()
 
@@ -591,13 +622,13 @@ async def delete_documents_async(self, document_ids: list[str]) -> None:
                 client=self.async_client,
                 actions=({"_op_type": "delete", "_id": id_} for id_ in document_ids),
                 index=self._index,
-                refresh=True,
+                refresh=refresh,
             )
         except Exception as e:
             msg = f"Failed to delete documents from Elasticsearch: {e!s}"
             raise DocumentStoreError(msg) from e
 
-    def delete_all_documents(self, recreate_index: bool = False) -> None:
+    def delete_all_documents(self, recreate_index: bool = False, refresh: bool = True) -> None:
         """
         Deletes all documents in the document store.
 
@@ -605,6 +636,9 @@ def delete_all_documents(self, recreate_index: bool = False) -> None:
         :param recreate_index: If True, the index will be deleted and recreated with the original mappings and settings.
             If False, all documents will be deleted using the `delete_by_query` API.
+        :param refresh: If True, Elasticsearch refreshes all shards involved in the delete by query after the request
+            completes. If False, no refresh is performed. For more details, see the
+            [Elasticsearch delete_by_query refresh documentation](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-delete-by-query#operation-delete-by-query-refresh).
         """
         self._ensure_initialized()
         # _ensure_initialized ensures _client is not None and an index exists
@@ -630,20 +664,23 @@ def delete_all_documents(self, recreate_index: bool = False) -> None:
             self._client.indices.create(index=self._index, mappings=mappings)  # type: ignore
         else:
-            result = self._client.delete_by_query(**self._prepare_delete_all_request(is_async=False))  # type: ignore
+            result = self._client.delete_by_query(**self._prepare_delete_all_request(is_async=False, refresh=refresh))  # type: ignore
             logger.info(
                 "Deleted all the {n_docs} documents from the index '{index}'.",
                 index=self._index,
                 n_docs=result["deleted"],
             )
 
-    async def delete_all_documents_async(self, recreate_index: bool = False) -> None:
+    async def delete_all_documents_async(self, recreate_index: bool = False, refresh: bool = True) -> None:
         """
         Asynchronously deletes all documents in the document store.
 
         A fast way to clear all documents from the document store while preserving any index settings and mappings.
 
         :param recreate_index: If True, the index will be deleted and recreated with the original mappings and settings.
             If False, all documents will be deleted using the `delete_by_query` API.
+        :param refresh: If True, Elasticsearch refreshes all shards involved in the delete by query after the request
+            completes. If False, no refresh is performed. For more details, see the
+            [Elasticsearch delete_by_query refresh documentation](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-delete-by-query#operation-delete-by-query-refresh).
         """
         self._ensure_initialized()
         # ensures _async_client is not None
@@ -670,7 +707,7 @@ async def delete_all_documents_async(self, recreate_index: bool = False) -> None
         else:
             # use delete_by_query for more efficient deletion without index recreation
             # For async, we need to wait for completion to get the deleted count
-            delete_request = self._prepare_delete_all_request(is_async=True)
+            delete_request = self._prepare_delete_all_request(is_async=True, refresh=refresh)
             delete_request["wait_for_completion"] = True  # Override to wait for completion in async
             result = await self._async_client.delete_by_query(**delete_request)  # type: ignore
             logger.info(
@@ -683,12 +720,15 @@ async def delete_all_documents_async(self, recreate_index: bool = False) -> None
             msg = f"Failed to delete all documents from Elasticsearch: {e!s}"
             raise DocumentStoreError(msg) from e
 
-    def delete_by_filter(self, filters: dict[str, Any]) -> int:
+    def delete_by_filter(self, filters: dict[str, Any], refresh: bool = False) -> int:
         """
         Deletes all documents that match the provided filters.
 
         :param filters: The filters to apply to select documents for deletion. For filter syntax, see
             [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
+        :param refresh: If True, Elasticsearch refreshes all shards involved in the delete by query after the request
+            completes. If False, no refresh is performed. For more details, see the
+            [Elasticsearch delete_by_query refresh documentation](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-delete-by-query#operation-delete-by-query-refresh).
         :returns: The number of documents deleted.
         """
         self._ensure_initialized()
@@ -696,7 +736,7 @@ def delete_by_filter(self, filters: dict[str, Any]) -> int:
         try:
             normalized_filters = _normalize_filters(filters)
             body = {"query": {"bool": {"filter": normalized_filters}}}
-            result = self.client.delete_by_query(index=self._index, body=body)  # type: ignore
+            result = self.client.delete_by_query(index=self._index, body=body, refresh=refresh)  # type: ignore
             deleted_count = result.get("deleted", 0)
             logger.info(
                 "Deleted {n_docs} documents from index '{index}' using filters.",
@@ -708,12 +748,15 @@ def delete_by_filter(self, filters: dict[str, Any]) -> int:
             msg = f"Failed to delete documents by filter from Elasticsearch: {e!s}"
             raise DocumentStoreError(msg) from e
 
-    async def delete_by_filter_async(self, filters: dict[str, Any]) -> int:
+    async def delete_by_filter_async(self, filters: dict[str, Any], refresh: bool = False) -> int:
         """
         Asynchronously deletes all documents that match the provided filters.
 
         :param filters: The filters to apply to select documents for deletion. For filter syntax, see
             [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
+        :param refresh: If True, Elasticsearch refreshes all shards involved in the delete by query after the request
+            completes. If False, no refresh is performed. For more details, see the
+            [Elasticsearch delete_by_query refresh documentation](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-delete-by-query#operation-delete-by-query-refresh).
         :returns: The number of documents deleted.
         """
         self._ensure_initialized()
@@ -721,7 +764,7 @@ async def delete_by_filter_async(self, filters: dict[str, Any]) -> int:
         try:
             normalized_filters = _normalize_filters(filters)
             body = {"query": {"bool": {"filter": normalized_filters}}}
-            result = await self.async_client.delete_by_query(index=self._index, body=body)  # type: ignore
+            result = await self.async_client.delete_by_query(index=self._index, body=body, refresh=refresh)  # type: ignore
             deleted_count = result.get("deleted", 0)
             logger.info(
                 "Deleted {n_docs} documents from index '{index}' using filters.",
@@ -733,13 +776,16 @@ async def delete_by_filter_async(self, filters: dict[str, Any]) -> int:
             msg = f"Failed to delete documents by filter from Elasticsearch: {e!s}"
             raise DocumentStoreError(msg) from e
 
-    def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
+    def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any], refresh: bool = False) -> int:
         """
         Updates the metadata of all documents that match the provided filters.
 
         :param filters: The filters to apply to select documents for updating. For filter syntax, see
             [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
         :param meta: The metadata fields to update.
+        :param refresh: If True, Elasticsearch refreshes all shards involved in the update by query after the request
+            completes. If False, no refresh is performed. For more details, see the
+            [Elasticsearch update_by_query refresh documentation](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-update-by-query#operation-update-by-query-refresh).
         :returns: The number of documents updated.
""" self._ensure_initialized() @@ -753,7 +799,7 @@ def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int "script": {"source": UPDATE_SCRIPT, "params": meta, "lang": "painless"}, } - result = self.client.update_by_query(index=self._index, body=body) # type: ignore + result = self.client.update_by_query(index=self._index, body=body, refresh=refresh) # type: ignore updated_count = result.get("updated", 0) logger.info( "Updated {n_docs} documents in index '{index}' using filters.", @@ -765,13 +811,16 @@ def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int msg = f"Failed to update documents by filter in Elasticsearch: {e!s}" raise DocumentStoreError(msg) from e - async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str, Any]) -> int: + async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str, Any], refresh: bool = False) -> int: """ Asynchronously updates the metadata of all documents that match the provided filters. :param filters: The filters to apply to select documents for updating. For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) :param meta: The metadata fields to update. + :param refresh: If True, Elasticsearch refreshes all shards involved in the update by query after the request + completes. If False, no refresh is performed. For more details, see the + [Elasticsearch update_by_query refresh documentation](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-update-by-query#operation-update-by-query-refresh). :returns: The number of documents updated. """ self._ensure_initialized() @@ -785,7 +834,7 @@ async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str, "script": {"source": UPDATE_SCRIPT, "params": meta, "lang": "painless"}, } - result = await self.async_client.update_by_query(index=self._index, body=body) # type: ignore + result = await self.async_client.update_by_query(index=self._index, body=body, refresh=refresh) # type: ignore updated_count = result.get("updated", 0) logger.info( "Updated {n_docs} documents in index '{index}' using filters.", diff --git a/integrations/elasticsearch/tests/test_document_store.py b/integrations/elasticsearch/tests/test_document_store.py index ed7c763a1..0ded2d619 100644 --- a/integrations/elasticsearch/tests/test_document_store.py +++ b/integrations/elasticsearch/tests/test_document_store.py @@ -2,9 +2,7 @@ # # SPDX-License-Identifier: Apache-2.0 -import asyncio import random -import time from unittest.mock import Mock, patch import pytest @@ -525,8 +523,7 @@ def test_delete_all_documents_no_index_recreation(self, document_store: Elastics document_store.write_documents(docs) assert document_store.count_documents() == 2 - document_store.delete_all_documents(recreate_index=False) - time.sleep(2) # need to wait for the deletion to be reflected in count_documents + document_store.delete_all_documents(recreate_index=False, refresh=True) assert document_store.count_documents() == 0 new_doc = Document(id="3", content="New document after delete all") @@ -547,8 +544,9 @@ def test_delete_by_filter(self, document_store: ElasticsearchDocumentStore): assert document_store.count_documents() == 3 # Delete documents with category="A" - deleted_count = document_store.delete_by_filter(filters={"field": "category", "operator": "==", "value": "A"}) - time.sleep(2) # wait for deletion to be reflected + deleted_count = document_store.delete_by_filter( + filters={"field": 
"category", "operator": "==", "value": "A"}, refresh=True + ) assert deleted_count == 2 assert document_store.count_documents() == 1 @@ -568,9 +566,10 @@ def test_update_by_filter(self, document_store: ElasticsearchDocumentStore): # Update status for category="A" documents updated_count = document_store.update_by_filter( - filters={"field": "category", "operator": "==", "value": "A"}, meta={"status": "published"} + filters={"field": "category", "operator": "==", "value": "A"}, + meta={"status": "published"}, + refresh=True, ) - time.sleep(2) # wait for update to be reflected assert updated_count == 2 # Verify the updates @@ -803,9 +802,7 @@ async def test_delete_all_documents_async_no_index_recreation(self, document_sto await document_store.write_documents_async(docs) assert await document_store.count_documents_async() == 2 - await document_store.delete_all_documents_async(recreate_index=False) - # Need to wait for the deletion to be reflected in count_documents - await asyncio.sleep(2) + await document_store.delete_all_documents_async(recreate_index=False, refresh=True) assert await document_store.count_documents_async() == 0 new_doc = Document(id="3", content="New document after delete all") @@ -824,9 +821,8 @@ async def test_delete_by_filter_async(self, document_store): # Delete documents with category="A" deleted_count = await document_store.delete_by_filter_async( - filters={"field": "category", "operator": "==", "value": "A"} + filters={"field": "category", "operator": "==", "value": "A"}, refresh=True ) - await asyncio.sleep(2) # wait for deletion to be reflected assert deleted_count == 2 assert await document_store.count_documents_async() == 1 @@ -848,9 +844,10 @@ async def test_update_by_filter_async(self, document_store): # Update status for category="A" documents updated_count = await document_store.update_by_filter_async( - filters={"field": "category", "operator": "==", "value": "A"}, meta={"status": "published"} + filters={"field": "category", "operator": "==", "value": "A"}, + meta={"status": "published"}, + refresh=True, ) - await asyncio.sleep(2) # wait for update to be reflected assert updated_count == 2 # Verify the updates