
Commit 94569c0

feat: expose refresh parameter in ElasticsearchDocumentStore and OpenSearchDocumentStore
Add a configurable refresh parameter to the write, delete, and update methods in both ElasticsearchDocumentStore and OpenSearchDocumentStore. This allows users to control when index changes become visible to search operations, enabling read-your-writes consistency without relying on time.sleep() workarounds.

Methods updated:
- write_documents / write_documents_async
- delete_documents / delete_documents_async
- delete_all_documents / delete_all_documents_async
- delete_by_filter / delete_by_filter_async
- update_by_filter / update_by_filter_async

The refresh parameter accepts:
- True: Force immediate refresh
- False: No refresh (best for bulk performance)
- "wait_for": Wait for next refresh cycle (default)

Closes #2065
1 parent e708b89 commit 94569c0
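For reference, a minimal usage sketch of the new parameter (illustrative only, not part of the diff; it assumes a local Elasticsearch reachable at http://localhost:9200 and uses the ElasticsearchDocumentStore constructor and Haystack metadata-filter syntax from the existing integration docs):

from haystack.dataclasses import Document
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore

store = ElasticsearchDocumentStore(hosts="http://localhost:9200", index="docs")

# refresh=True: force an immediate refresh so the document is searchable
# as soon as the call returns, with no time.sleep() workaround.
store.write_documents([Document(content="hello world")], refresh=True)

# refresh=False: skip the refresh for large bulk ingests; documents become
# visible only after the index's own refresh interval.
store.write_documents([Document(content=f"doc {i}") for i in range(1_000)], refresh=False)

# Default "wait_for": block until the next refresh cycle has made the change
# visible (read-your-writes consistency).
store.delete_by_filter({"field": "meta.category", "operator": "==", "value": "draft"})

Per the commit message, the same keyword is exposed on the corresponding OpenSearchDocumentStore methods.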

5 files changed: +191 additions, -78 deletions

integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py

Lines changed: 78 additions & 22 deletions
@@ -11,6 +11,12 @@
 from typing import Any, Literal, Optional, Union
 
 import numpy as np
+
+# Type alias for the refresh parameter
+# - True: Refresh immediately (force refresh)
+# - False: Don't refresh (best for bulk performance)
+# - "wait_for": Wait for next refresh cycle to make docs visible
+RefreshType = Union[bool, Literal["wait_for"]]
 from elastic_transport import NodeConfig
 from haystack import default_from_dict, default_to_dict, logging
 from haystack.dataclasses import Document
@@ -410,12 +416,21 @@ def _deserialize_document(hit: dict[str, Any]) -> Document:
 
         return Document.from_dict(data)
 
-    def write_documents(self, documents: list[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE) -> int:
+    def write_documents(
+        self,
+        documents: list[Document],
+        policy: DuplicatePolicy = DuplicatePolicy.NONE,
+        refresh: RefreshType = "wait_for",
+    ) -> int:
         """
         Writes `Document`s to Elasticsearch.
 
         :param documents: List of Documents to write to the document store.
         :param policy: DuplicatePolicy to apply when a document with the same ID already exists in the document store.
+        :param refresh: Controls when changes are made visible to search operations.
+            - `True`: Force refresh immediately after the operation.
+            - `False`: Do not refresh (better performance for bulk operations).
+            - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency).
         :raises ValueError: If `documents` is not a list of `Document`s.
         :raises DuplicateDocumentError: If a document with the same ID already exists in the document store and
             `policy` is set to `DuplicatePolicy.FAIL` or `DuplicatePolicy.NONE`.
@@ -456,7 +471,7 @@ def write_documents(self, documents: list[Document], policy: DuplicatePolicy = D
         documents_written, errors = helpers.bulk(
             client=self.client,
             actions=elasticsearch_actions,
-            refresh="wait_for",
+            refresh=refresh,
             index=self._index,
             raise_on_error=False,
             stats_only=False,
@@ -488,13 +503,20 @@ def write_documents(self, documents: list[Document], policy: DuplicatePolicy = D
         return documents_written
 
     async def write_documents_async(
-        self, documents: list[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE
+        self,
+        documents: list[Document],
+        policy: DuplicatePolicy = DuplicatePolicy.NONE,
+        refresh: RefreshType = "wait_for",
     ) -> int:
         """
         Asynchronously writes `Document`s to Elasticsearch.
 
         :param documents: List of Documents to write to the document store.
         :param policy: DuplicatePolicy to apply when a document with the same ID already exists in the document store.
+        :param refresh: Controls when changes are made visible to search operations.
+            - `True`: Force refresh immediately after the operation.
+            - `False`: Do not refresh (better performance for bulk operations).
+            - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency).
         :raises ValueError: If `documents` is not a list of `Document`s.
         :raises DuplicateDocumentError: If a document with the same ID already exists in the document store and
             `policy` is set to `DuplicatePolicy.FAIL` or `DuplicatePolicy.NONE`.
@@ -537,7 +559,7 @@ async def write_documents_async(
                 client=self.async_client,
                 actions=actions,
                 index=self._index,
-                refresh=True,
+                refresh=refresh,
                 raise_on_error=False,
                 stats_only=False,
             )
@@ -556,33 +578,41 @@ async def write_documents_async(
             msg = f"Failed to write documents to Elasticsearch: {e!s}"
             raise DocumentStoreError(msg) from e
 
-    def delete_documents(self, document_ids: list[str]) -> None:
+    def delete_documents(self, document_ids: list[str], refresh: RefreshType = "wait_for") -> None:
         """
         Deletes all documents with a matching document_ids from the document store.
 
         :param document_ids: the document ids to delete
+        :param refresh: Controls when changes are made visible to search operations.
+            - `True`: Force refresh immediately after the operation.
+            - `False`: Do not refresh (better performance for bulk operations).
+            - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency).
         """
         helpers.bulk(
             client=self.client,
             actions=({"_op_type": "delete", "_id": id_} for id_ in document_ids),
-            refresh="wait_for",
+            refresh=refresh,
             index=self._index,
             raise_on_error=False,
         )
 
-    def _prepare_delete_all_request(self, *, is_async: bool) -> dict[str, Any]:
+    def _prepare_delete_all_request(self, *, is_async: bool, refresh: RefreshType) -> dict[str, Any]:
         return {
             "index": self._index,
             "body": {"query": {"match_all": {}}},  # Delete all documents
             "wait_for_completion": False if is_async else True,  # block until done (set False for async)
-            "refresh": True,  # Ensure changes are visible immediately
+            "refresh": refresh,
         }
 
-    async def delete_documents_async(self, document_ids: list[str]) -> None:
+    async def delete_documents_async(self, document_ids: list[str], refresh: RefreshType = "wait_for") -> None:
         """
         Asynchronously deletes all documents with a matching document_ids from the document store.
 
         :param document_ids: the document ids to delete
+        :param refresh: Controls when changes are made visible to search operations.
+            - `True`: Force refresh immediately after the operation.
+            - `False`: Do not refresh (better performance for bulk operations).
+            - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency).
         """
         self._ensure_initialized()
 
@@ -591,20 +621,24 @@ async def delete_documents_async(self, document_ids: list[str]) -> None:
                 client=self.async_client,
                 actions=({"_op_type": "delete", "_id": id_} for id_ in document_ids),
                 index=self._index,
-                refresh=True,
+                refresh=refresh,
             )
         except Exception as e:
             msg = f"Failed to delete documents from Elasticsearch: {e!s}"
             raise DocumentStoreError(msg) from e
 
-    def delete_all_documents(self, recreate_index: bool = False) -> None:
+    def delete_all_documents(self, recreate_index: bool = False, refresh: RefreshType = True) -> None:
         """
         Deletes all documents in the document store.
 
         A fast way to clear all documents from the document store while preserving any index settings and mappings.
 
         :param recreate_index: If True, the index will be deleted and recreated with the original mappings and
             settings. If False, all documents will be deleted using the `delete_by_query` API.
+        :param refresh: Controls when changes are made visible to search operations.
+            - `True`: Force refresh immediately after the operation (default).
+            - `False`: Do not refresh (better performance for bulk operations).
+            - `"wait_for"`: Wait for the next refresh cycle.
         """
         self._ensure_initialized()  # _ensure_initialized ensures _client is not None and an index exists
 
@@ -630,20 +664,24 @@ def delete_all_documents(self, recreate_index: bool = False) -> None:
             self._client.indices.create(index=self._index, mappings=mappings)  # type: ignore
 
         else:
-            result = self._client.delete_by_query(**self._prepare_delete_all_request(is_async=False))  # type: ignore
+            result = self._client.delete_by_query(**self._prepare_delete_all_request(is_async=False, refresh=refresh))  # type: ignore
             logger.info(
                 "Deleted all the {n_docs} documents from the index '{index}'.",
                 index=self._index,
                 n_docs=result["deleted"],
             )
 
-    async def delete_all_documents_async(self, recreate_index: bool = False) -> None:
+    async def delete_all_documents_async(self, recreate_index: bool = False, refresh: RefreshType = True) -> None:
         """
         Asynchronously deletes all documents in the document store.
 
         A fast way to clear all documents from the document store while preserving any index settings and mappings.
         :param recreate_index: If True, the index will be deleted and recreated with the original mappings and
             settings. If False, all documents will be deleted using the `delete_by_query` API.
+        :param refresh: Controls when changes are made visible to search operations.
+            - `True`: Force refresh immediately after the operation (default).
+            - `False`: Do not refresh (better performance for bulk operations).
+            - `"wait_for"`: Wait for the next refresh cycle.
         """
         self._ensure_initialized()  # ensures _async_client is not None
 
@@ -670,7 +708,7 @@ async def delete_all_documents_async(self, recreate_index: bool = False) -> None
             else:
                 # use delete_by_query for more efficient deletion without index recreation
                 # For async, we need to wait for completion to get the deleted count
-                delete_request = self._prepare_delete_all_request(is_async=True)
+                delete_request = self._prepare_delete_all_request(is_async=True, refresh=refresh)
                 delete_request["wait_for_completion"] = True  # Override to wait for completion in async
                 result = await self._async_client.delete_by_query(**delete_request)  # type: ignore
                 logger.info(
@@ -683,20 +721,24 @@ async def delete_all_documents_async(self, recreate_index: bool = False) -> None
             msg = f"Failed to delete all documents from Elasticsearch: {e!s}"
             raise DocumentStoreError(msg) from e
 
-    def delete_by_filter(self, filters: dict[str, Any]) -> int:
+    def delete_by_filter(self, filters: dict[str, Any], refresh: RefreshType = "wait_for") -> int:
         """
         Deletes all documents that match the provided filters.
 
         :param filters: The filters to apply to select documents for deletion.
             For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
+        :param refresh: Controls when changes are made visible to search operations.
+            - `True`: Force refresh immediately after the operation.
+            - `False`: Do not refresh (better performance for bulk operations).
+            - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency).
         :returns: The number of documents deleted.
         """
         self._ensure_initialized()
 
         try:
             normalized_filters = _normalize_filters(filters)
             body = {"query": {"bool": {"filter": normalized_filters}}}
-            result = self.client.delete_by_query(index=self._index, body=body)  # type: ignore
+            result = self.client.delete_by_query(index=self._index, body=body, refresh=refresh)  # type: ignore
             deleted_count = result.get("deleted", 0)
             logger.info(
                 "Deleted {n_docs} documents from index '{index}' using filters.",
@@ -708,20 +750,24 @@ def delete_by_filter(self, filters: dict[str, Any]) -> int:
             msg = f"Failed to delete documents by filter from Elasticsearch: {e!s}"
             raise DocumentStoreError(msg) from e
 
-    async def delete_by_filter_async(self, filters: dict[str, Any]) -> int:
+    async def delete_by_filter_async(self, filters: dict[str, Any], refresh: RefreshType = "wait_for") -> int:
         """
         Asynchronously deletes all documents that match the provided filters.
 
         :param filters: The filters to apply to select documents for deletion.
             For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
+        :param refresh: Controls when changes are made visible to search operations.
+            - `True`: Force refresh immediately after the operation.
+            - `False`: Do not refresh (better performance for bulk operations).
+            - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency).
         :returns: The number of documents deleted.
         """
         self._ensure_initialized()
 
         try:
             normalized_filters = _normalize_filters(filters)
             body = {"query": {"bool": {"filter": normalized_filters}}}
-            result = await self.async_client.delete_by_query(index=self._index, body=body)  # type: ignore
+            result = await self.async_client.delete_by_query(index=self._index, body=body, refresh=refresh)  # type: ignore
             deleted_count = result.get("deleted", 0)
             logger.info(
                 "Deleted {n_docs} documents from index '{index}' using filters.",
@@ -733,13 +779,17 @@ async def delete_by_filter_async(self, filters: dict[str, Any]) -> int:
             msg = f"Failed to delete documents by filter from Elasticsearch: {e!s}"
             raise DocumentStoreError(msg) from e
 
-    def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
+    def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any], refresh: RefreshType = "wait_for") -> int:
         """
         Updates the metadata of all documents that match the provided filters.
 
         :param filters: The filters to apply to select documents for updating.
             For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
         :param meta: The metadata fields to update.
+        :param refresh: Controls when changes are made visible to search operations.
+            - `True`: Force refresh immediately after the operation.
+            - `False`: Do not refresh (better performance for bulk operations).
+            - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency).
         :returns: The number of documents updated.
         """
         self._ensure_initialized()
@@ -753,7 +803,7 @@ def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int
                 "script": {"source": UPDATE_SCRIPT, "params": meta, "lang": "painless"},
             }
 
-            result = self.client.update_by_query(index=self._index, body=body)  # type: ignore
+            result = self.client.update_by_query(index=self._index, body=body, refresh=refresh)  # type: ignore
             updated_count = result.get("updated", 0)
             logger.info(
                 "Updated {n_docs} documents in index '{index}' using filters.",
@@ -765,13 +815,19 @@ def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int
             msg = f"Failed to update documents by filter in Elasticsearch: {e!s}"
             raise DocumentStoreError(msg) from e
 
-    async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
+    async def update_by_filter_async(
+        self, filters: dict[str, Any], meta: dict[str, Any], refresh: RefreshType = "wait_for"
+    ) -> int:
         """
         Asynchronously updates the metadata of all documents that match the provided filters.
 
         :param filters: The filters to apply to select documents for updating.
            For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
         :param meta: The metadata fields to update.
+        :param refresh: Controls when changes are made visible to search operations.
+            - `True`: Force refresh immediately after the operation.
+            - `False`: Do not refresh (better performance for bulk operations).
+            - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency).
         :returns: The number of documents updated.
         """
         self._ensure_initialized()
@@ -785,7 +841,7 @@ async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str,
                 "script": {"source": UPDATE_SCRIPT, "params": meta, "lang": "painless"},
             }
 
-            result = await self.async_client.update_by_query(index=self._index, body=body)  # type: ignore
+            result = await self.async_client.update_by_query(index=self._index, body=body, refresh=refresh)  # type: ignore
             updated_count = result.get("updated", 0)
             logger.info(
                 "Updated {n_docs} documents in index '{index}' using filters.",
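For the asynchronous variants added in this diff, an equivalent sketch (again illustrative, assuming the same local Elasticsearch and index as the example above; the document ID is a placeholder):

import asyncio

from haystack.dataclasses import Document
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore


async def main() -> None:
    store = ElasticsearchDocumentStore(hosts="http://localhost:9200", index="docs")

    # refresh=True: the written document is searchable as soon as the call returns.
    await store.write_documents_async([Document(content="async hello")], refresh=True)

    # refresh=False: defer visibility to the index's refresh interval (fastest for bulk deletes).
    await store.delete_documents_async(["placeholder-document-id"], refresh=False)


asyncio.run(main())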