Skip to content

Commit 91f7caf

Browse files
feat: expose refresh parameter in ElasticsearchDocumentStore (#2622)
* feat: expose refresh parameter in ElasticsearchDocumentStore

  Add configurable refresh parameter to write, delete, and update methods in ElasticsearchDocumentStore. This allows users to control when index changes become visible to search operations, enabling read-your-writes consistency without relying on time.sleep() workarounds.

  Methods updated:
  - write_documents / write_documents_async
  - delete_documents / delete_documents_async
  - delete_all_documents / delete_all_documents_async
  - delete_by_filter / delete_by_filter_async
  - update_by_filter / update_by_filter_async

  The refresh parameter accepts:
  - True: Force immediate refresh
  - False: No refresh (best for bulk performance)
  - 'wait_for': Wait for next refresh cycle (default)

  Closes #2065

* docs: add Elasticsearch refresh documentation link to all refresh parameters
* fix: restrict refresh param to bool for delete_by_query and update_by_query methods (per ES docs and review)
* chore: remove RefreshType alias and fix docstring line length (E501)
* docs: update refresh param doc links to operation-specific Elasticsearch docs
* style: format document_store.py with ruff
* chore: remove obsolete refresh type alias comment per review
* refactor: set delete_documents refresh param to Literal and default to 'wait_for' per review
* style: format document_store.py after refresh param type change
* refactor: set delete_documents_async refresh param to Literal and default to 'wait_for' per review
* Set refresh=False as default for update_by_filter and delete_by_filter (sync/async) to maintain backward compatibility per reviewer consensus.

---------

Co-authored-by: Michele Pangrazzi <[email protected]>
1 parent 4792bc5 commit 91f7caf

File tree

2 files changed

+83
-37
lines changed

2 files changed

+83
-37
lines changed

integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py

Lines changed: 71 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,7 @@
2525

2626
logger = logging.getLogger(__name__)
2727

28+
2829
Hosts = Union[str, list[Union[str, Mapping[str, Union[str, int]], NodeConfig]]]
2930

3031
# document scores are essentially unbounded and will be scaled to values between 0 and 1 if scale_score is set to
@@ -410,12 +411,22 @@ def _deserialize_document(hit: dict[str, Any]) -> Document:
410411

411412
return Document.from_dict(data)
412413

413-
def write_documents(self, documents: list[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE) -> int:
414+
def write_documents(
415+
self,
416+
documents: list[Document],
417+
policy: DuplicatePolicy = DuplicatePolicy.NONE,
418+
refresh: Literal["wait_for", True, False] = "wait_for",
419+
) -> int:
414420
"""
415421
Writes `Document`s to Elasticsearch.
416422
417423
:param documents: List of Documents to write to the document store.
418424
:param policy: DuplicatePolicy to apply when a document with the same ID already exists in the document store.
425+
:param refresh: Controls when changes are made visible to search operations.
426+
- `True`: Force refresh immediately after the operation.
427+
- `False`: Do not refresh (better performance for bulk operations).
428+
- `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency).
429+
For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter).
419430
:raises ValueError: If `documents` is not a list of `Document`s.
420431
:raises DuplicateDocumentError: If a document with the same ID already exists in the document store and
421432
`policy` is set to `DuplicatePolicy.FAIL` or `DuplicatePolicy.NONE`.
@@ -456,7 +467,7 @@ def write_documents(self, documents: list[Document], policy: DuplicatePolicy = D
456467
documents_written, errors = helpers.bulk(
457468
client=self.client,
458469
actions=elasticsearch_actions,
459-
refresh="wait_for",
470+
refresh=refresh,
460471
index=self._index,
461472
raise_on_error=False,
462473
stats_only=False,
@@ -488,13 +499,21 @@ def write_documents(self, documents: list[Document], policy: DuplicatePolicy = D
488499
return documents_written
489500

490501
async def write_documents_async(
491-
self, documents: list[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE
502+
self,
503+
documents: list[Document],
504+
policy: DuplicatePolicy = DuplicatePolicy.NONE,
505+
refresh: Literal["wait_for", True, False] = "wait_for",
492506
) -> int:
493507
"""
494508
Asynchronously writes `Document`s to Elasticsearch.
495509
496510
:param documents: List of Documents to write to the document store.
497511
:param policy: DuplicatePolicy to apply when a document with the same ID already exists in the document store.
512+
:param refresh: Controls when changes are made visible to search operations.
513+
- `True`: Force refresh immediately after the operation.
514+
- `False`: Do not refresh (better performance for bulk operations).
515+
- `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency).
516+
For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter).
498517
:raises ValueError: If `documents` is not a list of `Document`s.
499518
:raises DuplicateDocumentError: If a document with the same ID already exists in the document store and
500519
`policy` is set to `DuplicatePolicy.FAIL` or `DuplicatePolicy.NONE`.
@@ -537,7 +556,7 @@ async def write_documents_async(
537556
client=self.async_client,
538557
actions=actions,
539558
index=self._index,
540-
refresh=True,
559+
refresh=refresh,
541560
raise_on_error=False,
542561
stats_only=False,
543562
)
@@ -556,33 +575,45 @@ async def write_documents_async(
556575
msg = f"Failed to write documents to Elasticsearch: {e!s}"
557576
raise DocumentStoreError(msg) from e
558577

559-
def delete_documents(self, document_ids: list[str]) -> None:
578+
def delete_documents(self, document_ids: list[str], refresh: Literal["wait_for", True, False] = "wait_for") -> None:
560579
"""
561580
Deletes all documents with matching document_ids from the document store.
562581
563582
:param document_ids: the document ids to delete
583+
:param refresh: Controls when changes are made visible to search operations.
584+
- `True`: Force refresh immediately after the operation.
585+
- `False`: Do not refresh (better performance for bulk operations).
586+
- `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency).
587+
For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter).
564588
"""
565589
helpers.bulk(
566590
client=self.client,
567591
actions=({"_op_type": "delete", "_id": id_} for id_ in document_ids),
568-
refresh="wait_for",
592+
refresh=refresh,
569593
index=self._index,
570594
raise_on_error=False,
571595
)
572596

573-
def _prepare_delete_all_request(self, *, is_async: bool) -> dict[str, Any]:
597+
def _prepare_delete_all_request(self, *, is_async: bool, refresh: bool) -> dict[str, Any]:
574598
return {
575599
"index": self._index,
576600
"body": {"query": {"match_all": {}}}, # Delete all documents
577601
"wait_for_completion": False if is_async else True, # block until done (set False for async)
578-
"refresh": True, # Ensure changes are visible immediately
602+
"refresh": refresh,
579603
}
580604

581-
async def delete_documents_async(self, document_ids: list[str]) -> None:
605+
async def delete_documents_async(
606+
self, document_ids: list[str], refresh: Literal["wait_for", True, False] = "wait_for"
607+
) -> None:
582608
"""
583609
Asynchronously deletes all documents with matching document_ids from the document store.
584610
585611
:param document_ids: the document ids to delete
612+
:param refresh: Controls when changes are made visible to search operations.
613+
- `True`: Force refresh immediately after the operation.
614+
- `False`: Do not refresh (better performance for bulk operations).
615+
- `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency).
616+
For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter).
586617
"""
587618
self._ensure_initialized()
588619

@@ -591,20 +622,23 @@ async def delete_documents_async(self, document_ids: list[str]) -> None:
591622
client=self.async_client,
592623
actions=({"_op_type": "delete", "_id": id_} for id_ in document_ids),
593624
index=self._index,
594-
refresh=True,
625+
refresh=refresh,
595626
)
596627
except Exception as e:
597628
msg = f"Failed to delete documents from Elasticsearch: {e!s}"
598629
raise DocumentStoreError(msg) from e
599630

600-
def delete_all_documents(self, recreate_index: bool = False) -> None:
631+
def delete_all_documents(self, recreate_index: bool = False, refresh: bool = True) -> None:
601632
"""
602633
Deletes all documents in the document store.
603634
604635
A fast way to clear all documents from the document store while preserving any index settings and mappings.
605636
606637
:param recreate_index: If True, the index will be deleted and recreated with the original mappings and
607638
settings. If False, all documents will be deleted using the `delete_by_query` API.
639+
:param refresh: If True, Elasticsearch refreshes all shards involved in the delete by query after the request
640+
completes. If False, no refresh is performed. For more details, see the
641+
[Elasticsearch delete_by_query refresh documentation](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-delete-by-query#operation-delete-by-query-refresh).
608642
"""
609643
self._ensure_initialized() # _ensure_initialized ensures _client is not None and an index exists
610644

@@ -630,20 +664,23 @@ def delete_all_documents(self, recreate_index: bool = False) -> None:
630664
self._client.indices.create(index=self._index, mappings=mappings) # type: ignore
631665

632666
else:
633-
result = self._client.delete_by_query(**self._prepare_delete_all_request(is_async=False)) # type: ignore
667+
result = self._client.delete_by_query(**self._prepare_delete_all_request(is_async=False, refresh=refresh)) # type: ignore
634668
logger.info(
635669
"Deleted all the {n_docs} documents from the index '{index}'.",
636670
index=self._index,
637671
n_docs=result["deleted"],
638672
)
639673

640-
async def delete_all_documents_async(self, recreate_index: bool = False) -> None:
674+
async def delete_all_documents_async(self, recreate_index: bool = False, refresh: bool = True) -> None:
641675
"""
642676
Asynchronously deletes all documents in the document store.
643677
644678
A fast way to clear all documents from the document store while preserving any index settings and mappings.
645679
:param recreate_index: If True, the index will be deleted and recreated with the original mappings and
646680
settings. If False, all documents will be deleted using the `delete_by_query` API.
681+
:param refresh: If True, Elasticsearch refreshes all shards involved in the delete by query after the request
682+
completes. If False, no refresh is performed. For more details, see the
683+
[Elasticsearch delete_by_query refresh documentation](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-delete-by-query#operation-delete-by-query-refresh).
647684
"""
648685
self._ensure_initialized() # ensures _async_client is not None
649686

@@ -670,7 +707,7 @@ async def delete_all_documents_async(self, recreate_index: bool = False) -> None
670707
else:
671708
# use delete_by_query for more efficient deletion without index recreation
672709
# For async, we need to wait for completion to get the deleted count
673-
delete_request = self._prepare_delete_all_request(is_async=True)
710+
delete_request = self._prepare_delete_all_request(is_async=True, refresh=refresh)
674711
delete_request["wait_for_completion"] = True # Override to wait for completion in async
675712
result = await self._async_client.delete_by_query(**delete_request) # type: ignore
676713
logger.info(
@@ -683,20 +720,23 @@ async def delete_all_documents_async(self, recreate_index: bool = False) -> None
683720
msg = f"Failed to delete all documents from Elasticsearch: {e!s}"
684721
raise DocumentStoreError(msg) from e
685722

686-
def delete_by_filter(self, filters: dict[str, Any]) -> int:
723+
def delete_by_filter(self, filters: dict[str, Any], refresh: bool = False) -> int:
687724
"""
688725
Deletes all documents that match the provided filters.
689726
690727
:param filters: The filters to apply to select documents for deletion.
691728
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
729+
:param refresh: If True, Elasticsearch refreshes all shards involved in the delete by query after the request
730+
completes. If False, no refresh is performed. For more details, see the
731+
[Elasticsearch delete_by_query refresh documentation](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-delete-by-query#operation-delete-by-query-refresh).
692732
:returns: The number of documents deleted.
693733
"""
694734
self._ensure_initialized()
695735

696736
try:
697737
normalized_filters = _normalize_filters(filters)
698738
body = {"query": {"bool": {"filter": normalized_filters}}}
699-
result = self.client.delete_by_query(index=self._index, body=body) # type: ignore
739+
result = self.client.delete_by_query(index=self._index, body=body, refresh=refresh) # type: ignore
700740
deleted_count = result.get("deleted", 0)
701741
logger.info(
702742
"Deleted {n_docs} documents from index '{index}' using filters.",
@@ -708,20 +748,23 @@ def delete_by_filter(self, filters: dict[str, Any]) -> int:
708748
msg = f"Failed to delete documents by filter from Elasticsearch: {e!s}"
709749
raise DocumentStoreError(msg) from e
710750

711-
async def delete_by_filter_async(self, filters: dict[str, Any]) -> int:
751+
async def delete_by_filter_async(self, filters: dict[str, Any], refresh: bool = False) -> int:
712752
"""
713753
Asynchronously deletes all documents that match the provided filters.
714754
715755
:param filters: The filters to apply to select documents for deletion.
716756
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
757+
:param refresh: If True, Elasticsearch refreshes all shards involved in the delete by query after the request
758+
completes. If False, no refresh is performed. For more details, see the
759+
[Elasticsearch delete_by_query refresh documentation](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-delete-by-query#operation-delete-by-query-refresh).
717760
:returns: The number of documents deleted.
718761
"""
719762
self._ensure_initialized()
720763

721764
try:
722765
normalized_filters = _normalize_filters(filters)
723766
body = {"query": {"bool": {"filter": normalized_filters}}}
724-
result = await self.async_client.delete_by_query(index=self._index, body=body) # type: ignore
767+
result = await self.async_client.delete_by_query(index=self._index, body=body, refresh=refresh) # type: ignore
725768
deleted_count = result.get("deleted", 0)
726769
logger.info(
727770
"Deleted {n_docs} documents from index '{index}' using filters.",
@@ -733,13 +776,16 @@ async def delete_by_filter_async(self, filters: dict[str, Any]) -> int:
733776
msg = f"Failed to delete documents by filter from Elasticsearch: {e!s}"
734777
raise DocumentStoreError(msg) from e
735778

736-
def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
779+
def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any], refresh: bool = False) -> int:
737780
"""
738781
Updates the metadata of all documents that match the provided filters.
739782
740783
:param filters: The filters to apply to select documents for updating.
741784
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
742785
:param meta: The metadata fields to update.
786+
:param refresh: If True, Elasticsearch refreshes all shards involved in the update by query after the request
787+
completes. If False, no refresh is performed. For more details, see the
788+
[Elasticsearch update_by_query refresh documentation](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-update-by-query#operation-update-by-query-refresh).
743789
:returns: The number of documents updated.
744790
"""
745791
self._ensure_initialized()
@@ -753,7 +799,7 @@ def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int
753799
"script": {"source": UPDATE_SCRIPT, "params": meta, "lang": "painless"},
754800
}
755801

756-
result = self.client.update_by_query(index=self._index, body=body) # type: ignore
802+
result = self.client.update_by_query(index=self._index, body=body, refresh=refresh) # type: ignore
757803
updated_count = result.get("updated", 0)
758804
logger.info(
759805
"Updated {n_docs} documents in index '{index}' using filters.",
@@ -765,13 +811,16 @@ def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int
765811
msg = f"Failed to update documents by filter in Elasticsearch: {e!s}"
766812
raise DocumentStoreError(msg) from e
767813

768-
async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
814+
async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str, Any], refresh: bool = False) -> int:
769815
"""
770816
Asynchronously updates the metadata of all documents that match the provided filters.
771817
772818
:param filters: The filters to apply to select documents for updating.
773819
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
774820
:param meta: The metadata fields to update.
821+
:param refresh: If True, Elasticsearch refreshes all shards involved in the update by query after the request
822+
completes. If False, no refresh is performed. For more details, see the
823+
[Elasticsearch update_by_query refresh documentation](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-update-by-query#operation-update-by-query-refresh).
775824
:returns: The number of documents updated.
776825
"""
777826
self._ensure_initialized()
@@ -785,7 +834,7 @@ async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str,
785834
"script": {"source": UPDATE_SCRIPT, "params": meta, "lang": "painless"},
786835
}
787836

788-
result = await self.async_client.update_by_query(index=self._index, body=body) # type: ignore
837+
result = await self.async_client.update_by_query(index=self._index, body=body, refresh=refresh) # type: ignore
789838
updated_count = result.get("updated", 0)
790839
logger.info(
791840
"Updated {n_docs} documents in index '{index}' using filters.",

0 commit comments

Comments
 (0)