@@ -25,6 +25,7 @@
 
 logger = logging.getLogger(__name__)
 
+
 Hosts = Union[str, list[Union[str, Mapping[str, Union[str, int]], NodeConfig]]]
 
 # document scores are essentially unbounded and will be scaled to values between 0 and 1 if scale_score is set to
@@ -410,12 +411,22 @@ def _deserialize_document(hit: dict[str, Any]) -> Document:
 
         return Document.from_dict(data)
 
-    def write_documents(self, documents: list[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE) -> int:
+    def write_documents(
+        self,
+        documents: list[Document],
+        policy: DuplicatePolicy = DuplicatePolicy.NONE,
+        refresh: Literal["wait_for", True, False] = "wait_for",
+    ) -> int:
         """
         Writes `Document`s to Elasticsearch.
 
         :param documents: List of Documents to write to the document store.
         :param policy: DuplicatePolicy to apply when a document with the same ID already exists in the document store.
+        :param refresh: Controls when changes are made visible to search operations.
+            - `True`: Force refresh immediately after the operation.
+            - `False`: Do not refresh (better performance for bulk operations).
+            - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency).
+            For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter).
         :raises ValueError: If `documents` is not a list of `Document`s.
         :raises DuplicateDocumentError: If a document with the same ID already exists in the document store and
             `policy` is set to `DuplicatePolicy.FAIL` or `DuplicatePolicy.NONE`.
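A usage sketch of the new `refresh` parameter (the store configuration and document contents are illustrative, not part of this PR): skipping the per-call refresh during bulk ingestion and reserving `"wait_for"` for the last batch keeps indexing fast while still guaranteeing the writes are searchable afterwards.

```python
from haystack import Document
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore

store = ElasticsearchDocumentStore(hosts="http://localhost:9200", index="docs")

batches = [[Document(content=f"doc {i}-{j}") for j in range(500)] for i in range(10)]
for batch in batches[:-1]:
    store.write_documents(batch, refresh=False)  # no refresh: fastest for bulk loads
store.write_documents(batches[-1], refresh="wait_for")  # final batch waits for a refresh
```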
@@ -456,7 +467,7 @@ def write_documents(self, documents: list[Document], policy: DuplicatePolicy = D
         documents_written, errors = helpers.bulk(
             client=self.client,
             actions=elasticsearch_actions,
-            refresh="wait_for",
+            refresh=refresh,
             index=self._index,
             raise_on_error=False,
             stats_only=False,
@@ -488,13 +499,21 @@ def write_documents(self, documents: list[Document], policy: DuplicatePolicy = D
         return documents_written
 
     async def write_documents_async(
-        self, documents: list[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE
+        self,
+        documents: list[Document],
+        policy: DuplicatePolicy = DuplicatePolicy.NONE,
+        refresh: Literal["wait_for", True, False] = "wait_for",
     ) -> int:
         """
         Asynchronously writes `Document`s to Elasticsearch.
 
         :param documents: List of Documents to write to the document store.
         :param policy: DuplicatePolicy to apply when a document with the same ID already exists in the document store.
+        :param refresh: Controls when changes are made visible to search operations.
+            - `True`: Force refresh immediately after the operation.
+            - `False`: Do not refresh (better performance for bulk operations).
+            - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency).
+            For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter).
         :raises ValueError: If `documents` is not a list of `Document`s.
         :raises DuplicateDocumentError: If a document with the same ID already exists in the document store and
             `policy` is set to `DuplicatePolicy.FAIL` or `DuplicatePolicy.NONE`.
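A similar sketch for the async path (the event-loop scaffolding is illustrative): forcing `refresh=True` makes the write immediately visible, which is convenient in tests even though it is the most expensive option.

```python
import asyncio

from haystack import Document
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore

async def main() -> None:
    store = ElasticsearchDocumentStore(hosts="http://localhost:9200", index="docs")
    written = await store.write_documents_async(
        [Document(content="hello async")], refresh=True  # visible to search right away
    )
    print(f"wrote {written} document(s)")

asyncio.run(main())
```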
@@ -537,7 +556,7 @@ async def write_documents_async(
             client=self.async_client,
             actions=actions,
             index=self._index,
-            refresh=True,
+            refresh=refresh,
             raise_on_error=False,
             stats_only=False,
         )
@@ -556,33 +575,45 @@ async def write_documents_async(
             msg = f"Failed to write documents to Elasticsearch: {e!s}"
             raise DocumentStoreError(msg) from e
 
-    def delete_documents(self, document_ids: list[str]) -> None:
+    def delete_documents(self, document_ids: list[str], refresh: Literal["wait_for", True, False] = "wait_for") -> None:
         """
         Deletes all documents with matching document_ids from the document store.
 
         :param document_ids: the document ids to delete
+        :param refresh: Controls when changes are made visible to search operations.
+            - `True`: Force refresh immediately after the operation.
+            - `False`: Do not refresh (better performance for bulk operations).
+            - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency).
+            For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter).
         """
         helpers.bulk(
             client=self.client,
             actions=({"_op_type": "delete", "_id": id_} for id_ in document_ids),
-            refresh="wait_for",
+            refresh=refresh,
             index=self._index,
             raise_on_error=False,
         )
 
-    def _prepare_delete_all_request(self, *, is_async: bool) -> dict[str, Any]:
+    def _prepare_delete_all_request(self, *, is_async: bool, refresh: bool) -> dict[str, Any]:
         return {
             "index": self._index,
             "body": {"query": {"match_all": {}}},  # Delete all documents
             "wait_for_completion": False if is_async else True,  # block until done (set False for async)
-            "refresh": True,  # Ensure changes are visible immediately
+            "refresh": refresh,
         }
 
-    async def delete_documents_async(self, document_ids: list[str]) -> None:
+    async def delete_documents_async(
+        self, document_ids: list[str], refresh: Literal["wait_for", True, False] = "wait_for"
+    ) -> None:
         """
         Asynchronously deletes all documents with matching document_ids from the document store.
 
         :param document_ids: the document ids to delete
+        :param refresh: Controls when changes are made visible to search operations.
+            - `True`: Force refresh immediately after the operation.
+            - `False`: Do not refresh (better performance for bulk operations).
+            - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency).
+            For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter).
         """
         self._ensure_initialized()
 
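The delete path follows the same pattern; a sketch (the IDs are illustrative): with the default `"wait_for"`, a search issued right after the call no longer sees the deleted documents, while `refresh=False` trades that guarantee for throughput.

```python
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore

store = ElasticsearchDocumentStore(hosts="http://localhost:9200", index="docs")

# Default "wait_for": the deletion is observable by the next search.
store.delete_documents(["doc-1", "doc-2"])

# Large cleanup job: skip refreshing on every call.
store.delete_documents([f"stale-{i}" for i in range(10_000)], refresh=False)
```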
@@ -591,20 +622,23 @@ async def delete_documents_async(self, document_ids: list[str]) -> None:
                 client=self.async_client,
                 actions=({"_op_type": "delete", "_id": id_} for id_ in document_ids),
                 index=self._index,
-                refresh=True,
+                refresh=refresh,
             )
         except Exception as e:
             msg = f"Failed to delete documents from Elasticsearch: {e!s}"
             raise DocumentStoreError(msg) from e
 
-    def delete_all_documents(self, recreate_index: bool = False) -> None:
+    def delete_all_documents(self, recreate_index: bool = False, refresh: bool = True) -> None:
         """
         Deletes all documents in the document store.
 
         A fast way to clear all documents from the document store while preserving any index settings and mappings.
 
         :param recreate_index: If True, the index will be deleted and recreated with the original mappings and
             settings. If False, all documents will be deleted using the `delete_by_query` API.
+        :param refresh: If True, Elasticsearch refreshes all shards involved in the delete by query after the request
+            completes. If False, no refresh is performed. For more details, see the
+            [Elasticsearch delete_by_query refresh documentation](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-delete-by-query#operation-delete-by-query-refresh).
         """
         self._ensure_initialized()  # _ensure_initialized ensures _client is not None and an index exists
 
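A sketch of clearing an index, e.g. between integration tests (the index name is illustrative): `recreate_index=True` drops and recreates the index, while the default path uses `delete_by_query` and, with `refresh=True`, leaves the emptied index immediately visible to searches.

```python
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore

store = ElasticsearchDocumentStore(hosts="http://localhost:9200", index="test-docs")

store.delete_all_documents()                     # delete_by_query + refresh
store.delete_all_documents(recreate_index=True)  # drop and recreate the index instead
store.delete_all_documents(refresh=False)        # fastest; visible after the next refresh
```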
@@ -630,20 +664,23 @@ def delete_all_documents(self, recreate_index: bool = False) -> None:
                 self._client.indices.create(index=self._index, mappings=mappings)  # type: ignore
 
         else:
-            result = self._client.delete_by_query(**self._prepare_delete_all_request(is_async=False))  # type: ignore
+            result = self._client.delete_by_query(**self._prepare_delete_all_request(is_async=False, refresh=refresh))  # type: ignore
             logger.info(
                 "Deleted all the {n_docs} documents from the index '{index}'.",
                 index=self._index,
                 n_docs=result["deleted"],
             )
 
-    async def delete_all_documents_async(self, recreate_index: bool = False) -> None:
+    async def delete_all_documents_async(self, recreate_index: bool = False, refresh: bool = True) -> None:
         """
         Asynchronously deletes all documents in the document store.
 
         A fast way to clear all documents from the document store while preserving any index settings and mappings.
         :param recreate_index: If True, the index will be deleted and recreated with the original mappings and
             settings. If False, all documents will be deleted using the `delete_by_query` API.
+        :param refresh: If True, Elasticsearch refreshes all shards involved in the delete by query after the request
+            completes. If False, no refresh is performed. For more details, see the
+            [Elasticsearch delete_by_query refresh documentation](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-delete-by-query#operation-delete-by-query-refresh).
         """
         self._ensure_initialized()  # ensures _async_client is not None
 
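The async variant in the same setting, sketched (the scaffolding is illustrative):

```python
import asyncio

from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore

async def reset_index() -> None:
    store = ElasticsearchDocumentStore(hosts="http://localhost:9200", index="test-docs")
    # refresh defaults to True, so the index reads as empty immediately afterwards
    await store.delete_all_documents_async(recreate_index=False, refresh=True)

asyncio.run(reset_index())
```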
Expand All @@ -670,7 +707,7 @@ async def delete_all_documents_async(self, recreate_index: bool = False) -> None
else:
# use delete_by_query for more efficient deletion without index recreation
# For async, we need to wait for completion to get the deleted count
delete_request = self._prepare_delete_all_request(is_async=True)
delete_request = self._prepare_delete_all_request(is_async=True, refresh=refresh)
delete_request["wait_for_completion"] = True # Override to wait for completion in async
result = await self._async_client.delete_by_query(**delete_request) # type: ignore
logger.info(
@@ -683,20 +720,23 @@ async def delete_all_documents_async(self, recreate_index: bool = False) -> None
             msg = f"Failed to delete all documents from Elasticsearch: {e!s}"
             raise DocumentStoreError(msg) from e
 
-    def delete_by_filter(self, filters: dict[str, Any]) -> int:
+    def delete_by_filter(self, filters: dict[str, Any], refresh: bool = False) -> int:
         """
         Deletes all documents that match the provided filters.
 
         :param filters: The filters to apply to select documents for deletion.
             For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
+        :param refresh: If True, Elasticsearch refreshes all shards involved in the delete by query after the request
+            completes. If False, no refresh is performed. For more details, see the
+            [Elasticsearch delete_by_query refresh documentation](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-delete-by-query#operation-delete-by-query-refresh).
         :returns: The number of documents deleted.
         """
         self._ensure_initialized()
 
         try:
             normalized_filters = _normalize_filters(filters)
             body = {"query": {"bool": {"filter": normalized_filters}}}
-            result = self.client.delete_by_query(index=self._index, body=body)  # type: ignore
+            result = self.client.delete_by_query(index=self._index, body=body, refresh=refresh)  # type: ignore
             deleted_count = result.get("deleted", 0)
             logger.info(
                 "Deleted {n_docs} documents from index '{index}' using filters.",
@@ -708,20 +748,23 @@ def delete_by_filter(self, filters: dict[str, Any]) -> int:
             msg = f"Failed to delete documents by filter from Elasticsearch: {e!s}"
             raise DocumentStoreError(msg) from e
 
-    async def delete_by_filter_async(self, filters: dict[str, Any]) -> int:
+    async def delete_by_filter_async(self, filters: dict[str, Any], refresh: bool = False) -> int:
         """
         Asynchronously deletes all documents that match the provided filters.
 
         :param filters: The filters to apply to select documents for deletion.
             For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
+        :param refresh: If True, Elasticsearch refreshes all shards involved in the delete by query after the request
+            completes. If False, no refresh is performed. For more details, see the
+            [Elasticsearch delete_by_query refresh documentation](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-delete-by-query#operation-delete-by-query-refresh).
         :returns: The number of documents deleted.
         """
         self._ensure_initialized()
 
         try:
             normalized_filters = _normalize_filters(filters)
             body = {"query": {"bool": {"filter": normalized_filters}}}
-            result = await self.async_client.delete_by_query(index=self._index, body=body)  # type: ignore
+            result = await self.async_client.delete_by_query(index=self._index, body=body, refresh=refresh)  # type: ignore
             deleted_count = result.get("deleted", 0)
             logger.info(
                 "Deleted {n_docs} documents from index '{index}' using filters.",
@@ -733,13 +776,16 @@ async def delete_by_filter_async(self, filters: dict[str, Any]) -> int:
             msg = f"Failed to delete documents by filter from Elasticsearch: {e!s}"
             raise DocumentStoreError(msg) from e
 
-    def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
+    def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any], refresh: bool = False) -> int:
         """
         Updates the metadata of all documents that match the provided filters.
 
         :param filters: The filters to apply to select documents for updating.
             For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
         :param meta: The metadata fields to update.
+        :param refresh: If True, Elasticsearch refreshes all shards involved in the update by query after the request
+            completes. If False, no refresh is performed. For more details, see the
+            [Elasticsearch update_by_query refresh documentation](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-update-by-query#operation-update-by-query-refresh).
         :returns: The number of documents updated.
         """
         self._ensure_initialized()
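A sketch of a bulk metadata update (the field names are illustrative): the update runs server-side via the painless script shown below, so only the changed metadata needs to be sent.

```python
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore

store = ElasticsearchDocumentStore(hosts="http://localhost:9200", index="docs")

updated = store.update_by_filter(
    filters={"field": "meta.status", "operator": "==", "value": "draft"},
    meta={"status": "published"},
    refresh=True,  # reflect the new metadata in searches immediately
)
print(f"updated {updated} document(s)")
```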
Expand All @@ -753,7 +799,7 @@ def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int
"script": {"source": UPDATE_SCRIPT, "params": meta, "lang": "painless"},
}

result = self.client.update_by_query(index=self._index, body=body) # type: ignore
result = self.client.update_by_query(index=self._index, body=body, refresh=refresh) # type: ignore
updated_count = result.get("updated", 0)
logger.info(
"Updated {n_docs} documents in index '{index}' using filters.",
@@ -765,13 +811,16 @@ def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int
             msg = f"Failed to update documents by filter in Elasticsearch: {e!s}"
             raise DocumentStoreError(msg) from e
 
-    async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
+    async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str, Any], refresh: bool = False) -> int:
         """
         Asynchronously updates the metadata of all documents that match the provided filters.
 
         :param filters: The filters to apply to select documents for updating.
             For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
         :param meta: The metadata fields to update.
+        :param refresh: If True, Elasticsearch refreshes all shards involved in the update by query after the request
+            completes. If False, no refresh is performed. For more details, see the
+            [Elasticsearch update_by_query refresh documentation](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-update-by-query#operation-update-by-query-refresh).
         :returns: The number of documents updated.
         """
         self._ensure_initialized()
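And the async counterpart, sketched:

```python
import asyncio

from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore

async def publish_drafts() -> int:
    store = ElasticsearchDocumentStore(hosts="http://localhost:9200", index="docs")
    return await store.update_by_filter_async(
        filters={"field": "meta.status", "operator": "==", "value": "draft"},
        meta={"status": "published"},
        refresh=True,
    )

print(asyncio.run(publish_drafts()), "document(s) updated")
```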
@@ -785,7 +834,7 @@ async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str,
                 "script": {"source": UPDATE_SCRIPT, "params": meta, "lang": "painless"},
             }
 
-            result = await self.async_client.update_by_query(index=self._index, body=body)  # type: ignore
+            result = await self.async_client.update_by_query(index=self._index, body=body, refresh=refresh)  # type: ignore
             updated_count = result.get("updated", 0)
             logger.info(
                 "Updated {n_docs} documents in index '{index}' using filters.",