diff --git a/CHANGELOG.md b/CHANGELOG.md index 1e38346d92..ae5fc94b0b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,13 +7,20 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## [Development] + +## [0.53.9 - 2026-03-31] + ### Fixed - **GFQL / GPU traversal**: Added a narrow one-hop undirected `hop()` fast path that avoids doubled edge-pair materialization for the common no-predicate traversal shape. On DGX-backed RAPIDS validation, warm `gplus` pipeline time improved `-39.67%` on `25.02` and `-39.27%` on `26.02`. +- **GFQL / Cypher**: Added bounded direct local Cypher reentry support for the vectorized same-alias `MATCH ... WITH ... MATCH ...` subset, including carried scalar projections and trailing `RETURN` / `ORDER BY` use on the carried alias. Unsupported cross-alias, fresh row-seeded, and prefix-order-dependent shapes now fail fast instead of silently miscompiling. ### Added - **GFQL / Cypher**: Support multi-alias scalar `RETURN` projections in direct Cypher queries. `MATCH (a)-[:R]->(b) RETURN a.id AS a_id, b.id AS b_id` now works by building a bindings table from edges joined with node properties (#981). - **GFQL / Cypher**: Edge alias property access in multi-alias `RETURN`. `MATCH (a)-[r:KNOWS]->(b) RETURN a.id, r.creationDate, b.firstName` now works — edge properties are accessible alongside node properties (#982). +### Tests +- **GFQL / Cypher / cuDF**: Added pandas and cuDF regression coverage for bounded reentry at both the helper and end-to-end lowering layers, including targeted DGX validation on official RAPIDS `26.02` `cuda13`. + ## [0.53.8 - 2026-03-31] ### Added diff --git a/graphistry/compute/gfql/cypher/lowering.py b/graphistry/compute/gfql/cypher/lowering.py index dda5795771..5dc68c899b 100644 --- a/graphistry/compute/gfql/cypher/lowering.py +++ b/graphistry/compute/gfql/cypher/lowering.py @@ -65,6 +65,7 @@ GraphBinding, GraphConstructor, LabelRef, + LimitClause, MatchClause, NodePattern, ParameterRef, @@ -115,7 +116,6 @@ class CompiledCypherQuery: optional_null_fill: Optional["OptionalNullFillPlan"] = None optional_projection_row_guard: Optional["OptionalProjectionRowGuardPlan"] = None start_nodes_query: Optional["CompiledCypherQuery"] = None - start_nodes_output_name: Optional[str] = None graph_bindings: Tuple["CompiledGraphBinding", ...] = () use_ref: Optional[str] = None @@ -299,6 +299,16 @@ def _unsupported(message: str, *, field: str, value: Any, line: int, column: int ) +def _unsupported_at_span(message: str, *, field: str, value: Any, span: SourceSpan) -> GFQLValidationError: + return _unsupported( + message, + field=field, + value=value, + line=span.line, + column=span.column, + ) + + def _rewrite_unquoted_expr_segments( expr_text: str, *, @@ -984,7 +994,7 @@ def _rewrite_cypher_integer_division_ast( ) -def _rewrite_order_expr_to_projected_outputs( +def _rewrite_alias_properties_to_outputs( expr_text: str, *, source_alias: str, @@ -1018,7 +1028,7 @@ def _rewrite(node_in: ExprNode) -> ExprNode: if output_name is not None: return Identifier(output_name) return PropertyAccessExpr(_rewrite(node_in.value), node_in.property) - return _rebuild_expr_node(node_in, rewrite=_rewrite, error_context="order expression rewrite") + return _rebuild_expr_node(node_in, rewrite=_rewrite, error_context="alias property rewrite") return _render_expr_node(_rewrite(node)) @@ -3456,7 +3466,7 @@ def _lower_order_by_clause( ), ) if plan.whole_row_output_names - else _rewrite_order_expr_to_projected_outputs( + else _rewrite_alias_properties_to_outputs( item.expression.text, source_alias=plan.source_alias, property_outputs=plan.projected_property_outputs, @@ -3818,6 +3828,21 @@ def _lower_match_alias_stage( if stage.clause.distinct: row_steps.append(distinct()) if stage.where is not None: + where_expr = stage.where + if not plan.whole_row_output_names: + where_expr = ExpressionText( + text=_rewrite_alias_properties_to_outputs( + stage.where.text, + source_alias=plan.source_alias, + property_outputs=plan.projected_property_outputs, + params=params, + alias_targets=scope.alias_targets, + field="with.where", + line=stage.where.span.line, + column=stage.where.span.column, + ), + span=stage.where.span, + ) _validate_row_expr_scope( stage.where.text, alias_targets=scope.alias_targets, @@ -3831,7 +3856,7 @@ def _lower_match_alias_stage( row_steps.append( where_rows( expr=_row_expr_arg( - stage.where, + where_expr, params=params, alias_targets=scope.alias_targets, field="with.where", @@ -5799,6 +5824,112 @@ def lower_cypher_query( return compiled.chain +def _reentry_hidden_column_name(output_name: str) -> str: + return f"__cypher_reentry_{output_name}__" + + +def _rewrite_reentry_expr_to_hidden_properties( + expr: ExpressionText, + *, + carried_alias: str, + carried_columns: Sequence[str], + field: str, +) -> ExpressionText: + if not carried_columns: + return expr + try: + node = parse_expr(expr.text) + except (GFQLExprParseError, ImportError) as exc: + raise _unsupported( + "Cypher MATCH after WITH carried-column rewrite requires a locally supported scalar expression", + field=field, + value=expr.text, + line=expr.span.line, + column=expr.span.column, + ) from exc + replacements = { + output_name: f"{carried_alias}.{_reentry_hidden_column_name(output_name)}" + for output_name in carried_columns + } + identifiers = collect_identifiers(node) + if not any(identifier in replacements for identifier in identifiers): + return expr + return ExpressionText( + text=_render_expr_node(_rewrite_expr_identifiers(node, replacements)), + span=expr.span, + ) + + +def _bounded_reentry_carry_columns( + prefix_projection: ResultProjectionPlan, + *, + projection_items: Sequence[str], + query: CypherQuery, + prefix_stage: ProjectionStage, +) -> Tuple[str, Tuple[str, ...]]: + whole_row_columns = tuple(column.output_name for column in prefix_projection.columns if column.kind == "whole_row") + if len(whole_row_columns) != 1: + raise _unsupported_at_span( + "Cypher MATCH after WITH currently requires the prefix WITH stage to project exactly one whole-row alias", + field="with", + value=projection_items, + span=prefix_stage.span, + ) + carried_columns = tuple(column.output_name for column in prefix_projection.columns if column.kind != "whole_row") + if not carried_columns: + return whole_row_columns[0], () + seed_alias = _single_node_seed_alias(query.matches[0]) if len(query.matches) == 1 else None + if seed_alias is None or seed_alias != prefix_projection.alias: + raise _unsupported_at_span( + "Cypher MATCH after WITH carried scalar columns currently require a single-node prefix MATCH seed", + field="with", + value=projection_items, + span=prefix_stage.span, + ) + invalid_output = next((name for name in carried_columns if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", name)), None) + if invalid_output is not None: + raise _unsupported_at_span( + "Cypher MATCH after WITH carried scalar columns currently require identifier-style WITH aliases", + field="with", + value=invalid_output, + span=prefix_stage.span, + ) + if len(set(carried_columns)) != len(carried_columns): + raise _unsupported_at_span( + "Cypher MATCH after WITH carried scalar columns currently require distinct WITH aliases", + field="with", + value=carried_columns, + span=prefix_stage.span, + ) + return whole_row_columns[0], carried_columns + + +def _literal_limit_value(limit_clause: Optional[LimitClause]) -> Optional[int]: + if limit_clause is None: + return None + value = limit_clause.value + if isinstance(value, int): + return value + if isinstance(value, ParameterRef): + return None + text = value.text.strip() + if not re.fullmatch(r"\d+", text): + return None + return int(text) + + +def _bounded_reentry_prefix_order_is_safe( + *, + prefix_stage: ProjectionStage, + query: CypherQuery, +) -> bool: + if prefix_stage.order_by is None: + return True + if query.order_by is not None: + return True + return prefix_stage.skip is None and _literal_limit_value(prefix_stage.limit) == 1 + + def _first_pattern_node_alias(clause: MatchClause) -> Optional[str]: if len(clause.patterns) != 1: raise _unsupported( @@ -5814,120 +5945,176 @@ def _first_pattern_node_alias(clause: MatchClause) -> Optional[str]: return pattern[0].variable +def _reentry_query_clone(query: CypherQuery, **overrides: Any) -> CypherQuery: + return replace( + query, call=None, row_sequence=(), reentry_matches=(), reentry_where=None, + graph_bindings=(), use=None, + **overrides, + ) + + def _compile_bounded_reentry_query( query: CypherQuery, *, params: Optional[Mapping[str, Any]] = None, ) -> CompiledCypherQuery: if len(query.with_stages) != 1 or len(query.reentry_matches) != 1: - raise _unsupported( + raise _unsupported_at_span( "Cypher MATCH after WITH is only supported for a single MATCH ... WITH ... MATCH ... RETURN shape in the local compiler", field="match", value=len(query.reentry_matches), - line=query.return_.span.line, - column=query.return_.span.column, + span=query.return_.span, ) prefix_stage = query.with_stages[0] + projection_items = [item.expression.text for item in prefix_stage.clause.items] if prefix_stage.where is not None: - raise _unsupported( + raise _unsupported_at_span( "Cypher MATCH after WITH does not yet support WITH ... WHERE in the prefix stage", field="with.where", value=prefix_stage.where.text, - line=prefix_stage.span.line, - column=prefix_stage.span.column, + span=prefix_stage.span, ) - prefix_query = CypherQuery( - matches=query.matches, - where=query.where, - call=None, - unwinds=query.unwinds, + prefix_query = _reentry_query_clone( + query, with_stages=(), - return_=ReturnClause( - items=prefix_stage.clause.items, - distinct=prefix_stage.clause.distinct, - kind="return", - span=prefix_stage.clause.span, - ), + return_=replace(prefix_stage.clause, kind="return"), order_by=prefix_stage.order_by, skip=prefix_stage.skip, limit=prefix_stage.limit, - row_sequence=(), trailing_semicolon=False, - span=query.span, ) prefix_compiled = compile_cypher_query(prefix_query, params=params) if not isinstance(prefix_compiled, CompiledCypherQuery): - raise _unsupported( + raise _unsupported_at_span( "Cypher MATCH after WITH prefix compilation produced an unexpected UNION program", field="with", value="union", - line=prefix_stage.span.line, - column=prefix_stage.span.column, + span=prefix_stage.span, ) prefix_projection = prefix_compiled.result_projection - if prefix_projection is None or len(prefix_projection.columns) != 1 or prefix_projection.columns[0].kind != "whole_row": - raise _unsupported( + if prefix_projection is None: + raise _unsupported_at_span( "Cypher MATCH after WITH currently requires the prefix WITH stage to project exactly one whole-row alias", field="with", - value=[item.expression.text for item in prefix_stage.clause.items], - line=prefix_stage.span.line, - column=prefix_stage.span.column, + value=projection_items, + span=prefix_stage.span, ) - if prefix_projection.table != "nodes": + reentry_alias, carry_columns = _bounded_reentry_carry_columns( + prefix_projection, + projection_items=projection_items, + query=query, + prefix_stage=prefix_stage, + ) + if not _bounded_reentry_prefix_order_is_safe(prefix_stage=prefix_stage, query=query): raise _unsupported( + "Cypher MATCH after WITH does not yet preserve prefix WITH row ordering across MATCH re-entry for multi-row result shapes", + field="with.order_by", + value=( + [item.expression.text for item in prefix_stage.order_by.items] + if prefix_stage.order_by is not None + else None + ), + line=prefix_stage.order_by.span.line if prefix_stage.order_by is not None else prefix_stage.span.line, + column=prefix_stage.order_by.span.column if prefix_stage.order_by is not None else prefix_stage.span.column, + ) + if prefix_projection.table != "nodes": + raise _unsupported_at_span( "Cypher MATCH after WITH currently supports node re-entry only", field="with", value=prefix_projection.table, - line=prefix_stage.span.line, - column=prefix_stage.span.column, + span=prefix_stage.span, ) if len(query.return_.items) == 1 and query.return_.items[0].expression.text == "*": - raise _unsupported( + raise _unsupported_at_span( "Cypher MATCH after WITH does not yet support RETURN * from the trailing MATCH re-entry stage", field=query.return_.kind, value="*", - line=query.return_.span.line, - column=query.return_.span.column, + span=query.return_.span, ) reentry_match = query.reentry_matches[0] first_alias = _first_pattern_node_alias(reentry_match) - if first_alias is None or first_alias != prefix_projection.alias: - raise _unsupported( + if first_alias is None or first_alias != reentry_alias: + raise _unsupported_at_span( "Cypher MATCH after WITH currently requires the trailing MATCH to start from the same carried node alias", field="match", value=first_alias, - line=reentry_match.span.line, - column=reentry_match.span.column, + span=reentry_match.span, ) - suffix_query = CypherQuery( + hidden_columns = tuple(_reentry_hidden_column_name(output_name) for output_name in carry_columns) + + def rewrite_expr(expr: ExpressionText, field: str) -> ExpressionText: + return _rewrite_reentry_expr_to_hidden_properties( + expr, + carried_alias=reentry_alias, + carried_columns=carry_columns, + field=field, + ) + + reentry_where = query.reentry_where + if reentry_where is not None and reentry_where.expr is not None and hidden_columns: + reentry_where = replace( + reentry_where, + expr=rewrite_expr(reentry_where.expr, "where"), + ) + reentry_return = query.return_ + if hidden_columns: + reentry_return = replace( + query.return_, + items=tuple( + replace( + item, + expression=rewritten_expr, + alias=item.alias or (item.expression.text if rewritten_expr.text != item.expression.text else None), + ) + for item in query.return_.items + for rewritten_expr in (rewrite_expr(item.expression, query.return_.kind),) + ), + ) + reentry_order_by = query.order_by + if reentry_order_by is not None and hidden_columns: + reentry_order_by = replace( + reentry_order_by, + items=tuple( + replace( + item, + expression=rewrite_expr(item.expression, "order_by"), + ) + for item in reentry_order_by.items + ), + ) + suffix_query = _reentry_query_clone( + query, matches=query.reentry_matches, - where=query.reentry_where, - call=None, + where=reentry_where, unwinds=(), with_stages=(), - return_=query.return_, - order_by=query.order_by, - skip=query.skip, - limit=query.limit, - row_sequence=(), - trailing_semicolon=query.trailing_semicolon, - span=query.span, + return_=reentry_return, + order_by=reentry_order_by, ) suffix_compiled = compile_cypher_query(suffix_query, params=params) if not isinstance(suffix_compiled, CompiledCypherQuery): - raise _unsupported( + raise _unsupported_at_span( "Cypher MATCH after WITH suffix compilation produced an unexpected UNION program", field="match", value="union", - line=reentry_match.span.line, - column=reentry_match.span.column, + span=reentry_match.span, + ) + result_projection = suffix_compiled.result_projection + if result_projection is not None and result_projection.alias == reentry_alias and hidden_columns: + result_projection = replace( + result_projection, + exclude_columns=tuple( + dict.fromkeys( + result_projection.exclude_columns + hidden_columns + ) + ), ) return replace( suffix_compiled, + result_projection=result_projection, start_nodes_query=prefix_compiled, - start_nodes_output_name=prefix_projection.columns[0].output_name, ) @@ -6123,19 +6310,7 @@ def compile_cypher_query( def _attach_graph_context(result: CompiledCypherQuery) -> CompiledCypherQuery: if not compiled_bindings and _use_ref is None: return result - return CompiledCypherQuery( - chain=result.chain, - seed_rows=result.seed_rows, - procedure_call=result.procedure_call, - result_projection=result.result_projection, - empty_result_row=result.empty_result_row, - optional_null_fill=result.optional_null_fill, - optional_projection_row_guard=result.optional_projection_row_guard, - start_nodes_query=result.start_nodes_query, - start_nodes_output_name=result.start_nodes_output_name, - graph_bindings=compiled_bindings, - use_ref=_use_ref, - ) + return replace(result, graph_bindings=compiled_bindings, use_ref=_use_ref) _reject_unsupported_variable_length_where_pattern_predicates(query) _reject_variable_length_path_alias_references(query, params=params) diff --git a/graphistry/compute/gfql_unified.py b/graphistry/compute/gfql_unified.py index 2d306b418d..0eb293194b 100644 --- a/graphistry/compute/gfql_unified.py +++ b/graphistry/compute/gfql_unified.py @@ -1,10 +1,11 @@ """GFQL unified entrypoint for chains, DAGs, and local string-compiled queries.""" # ruff: noqa: E501 +from dataclasses import replace import re -from typing import Any, Dict, List, Literal, Mapping, Optional, Sequence, Union, cast +from typing import Any, Dict, List, Literal, Mapping, Optional, Sequence, Tuple, Union, cast from graphistry.Plottable import Plottable -from graphistry.Engine import Engine, EngineAbstract, df_concat, df_cons, resolve_engine +from graphistry.Engine import Engine, EngineAbstract, df_concat, df_cons, resolve_engine, safe_merge from graphistry.util import setup_logger from .ast import ASTObject, ASTLet, ASTNode, ASTEdge, ASTCall from .chain import Chain, chain as chain_impl @@ -25,7 +26,12 @@ ) from graphistry.compute.exceptions import ErrorCode, GFQLValidationError from graphistry.compute.gfql.cypher.api import compile_cypher -from graphistry.compute.gfql.cypher.lowering import CompiledCypherGraphQuery, CompiledCypherQuery, CompiledCypherUnionQuery +from graphistry.compute.gfql.cypher.lowering import ( + CompiledCypherGraphQuery, + CompiledCypherQuery, + CompiledCypherUnionQuery, + _reentry_hidden_column_name, +) from graphistry.compute.gfql.cypher.call_procedures import execute_cypher_call from graphistry.compute.gfql.cypher.result_postprocess import WholeRowProjectionMeta, apply_result_projection from graphistry.compute.gfql.df_executor import ( @@ -41,6 +47,9 @@ logger = setup_logger(__name__) +_REENTRY_WHOLE_ROW_SUGGESTION = "Carry a whole-row node alias through WITH before MATCH re-entry." +_REENTRY_SCALAR_SUGGESTION = "Carry scalar columns through WITH before MATCH re-entry." + _CYPHER_LEAD_RE = re.compile( r"^\s*(?:MATCH|OPTIONAL\s+MATCH|WITH|RETURN|UNWIND|CALL|CREATE|MERGE|DELETE|DETACH\s+DELETE|SET|REMOVE|FOREACH|GRAPH|USE)\b", re.IGNORECASE, @@ -89,6 +98,47 @@ def _apply_empty_result_row( return out +def _entity_projection_meta_entry( + result: Plottable, + *, + output_name: str, + field: str, + message: str, + suggestion: str, +) -> WholeRowProjectionMeta: + entity_meta = cast( + Optional[Dict[str, WholeRowProjectionMeta]], + getattr(result, "_cypher_entity_projection_meta", None), + ) + if not isinstance(entity_meta, dict) or output_name not in entity_meta: + raise GFQLValidationError( + ErrorCode.E108, + message, + field=field, + value=output_name, + suggestion=suggestion, + language="cypher", + ) + return entity_meta[output_name] + + +def _reentry_validation_error( + message: str, + *, + value: Any, + suggestion: str, + field: str = "with", +) -> GFQLValidationError: + return GFQLValidationError( + ErrorCode.E108, + message, + field=field, + value=value, + suggestion=suggestion, + language="cypher", + ) + + def _apply_optional_null_fill( result: Plottable, *, @@ -105,20 +155,13 @@ def _apply_optional_null_fill( rows_df = getattr(result, "_nodes", None) actual_rows = 0 if rows_df is None else len(rows_df) - entity_meta = cast( - Optional[Dict[str, WholeRowProjectionMeta]], - getattr(alignment_result, "_cypher_entity_projection_meta", None), - ) - if not isinstance(entity_meta, dict) or alignment_output_name not in entity_meta: - raise GFQLValidationError( - ErrorCode.E108, - "Cypher OPTIONAL MATCH null-row alignment could not recover matched seed identities", - field="match", - value=alignment_output_name, - suggestion="Use a simpler OPTIONAL MATCH projection shape in the local compiler.", - language="cypher", - ) - matched_ids = entity_meta[alignment_output_name]["ids"] + matched_ids = _entity_projection_meta_entry( + alignment_result, + output_name=alignment_output_name, + field="match", + message="Cypher OPTIONAL MATCH null-row alignment could not recover matched seed identities", + suggestion="Use a simpler OPTIONAL MATCH projection shape in the local compiler.", + )["ids"] if not hasattr(matched_ids, "tolist"): raise GFQLValidationError( ErrorCode.E108, @@ -307,17 +350,7 @@ def _execute_query_with_graph_context( else: target_graph = base_graph # Strip graph context from the compiled query and execute normally - plain_query = CompiledCypherQuery( - chain=compiled.chain, - seed_rows=compiled.seed_rows, - procedure_call=compiled.procedure_call, - result_projection=compiled.result_projection, - empty_result_row=compiled.empty_result_row, - optional_null_fill=compiled.optional_null_fill, - optional_projection_row_guard=compiled.optional_projection_row_guard, - start_nodes_query=compiled.start_nodes_query, - start_nodes_output_name=compiled.start_nodes_output_name, - ) + plain_query = replace(compiled, graph_bindings=(), use_ref=None) return _execute_compiled_query( target_graph, compiled_query=plain_query, @@ -425,51 +458,174 @@ def _execute_compiled_query( return result -def _compiled_query_start_nodes( +def _compiled_query_reentry_state( + base_graph: Plottable, compiled_query: CompiledCypherQuery, prefix_result: Plottable, *, engine: Union[EngineAbstract, str], -) -> DataFrameT: - output_name = compiled_query.start_nodes_output_name - entity_meta = cast( - Optional[Dict[str, WholeRowProjectionMeta]], - getattr(prefix_result, "_cypher_entity_projection_meta", None), +) -> Tuple[Plottable, DataFrameT]: + output_name, carried_columns = _compiled_query_reentry_contract(compiled_query) + meta = _entity_projection_meta_entry( + prefix_result, + output_name=output_name, + field="with", + message="Cypher MATCH after WITH could not recover carried node identities from the prefix stage", + suggestion=_REENTRY_WHOLE_ROW_SUGGESTION, ) - if not isinstance(entity_meta, dict) or output_name not in entity_meta: - raise GFQLValidationError( - ErrorCode.E108, - "Cypher MATCH after WITH could not recover carried node identities from the prefix stage", - field="with", - value=output_name, - suggestion="Carry a whole-row node alias through WITH before MATCH re-entry.", - language="cypher", - ) - meta = entity_meta[output_name] if meta["table"] != "nodes": - raise GFQLValidationError( - ErrorCode.E108, + raise _reentry_validation_error( "Cypher MATCH after WITH currently supports node re-entry only", - field="with", value=output_name, - suggestion="Carry a whole-row node alias through WITH before MATCH re-entry.", - language="cypher", + suggestion=_REENTRY_WHOLE_ROW_SUGGESTION, ) ids = meta["ids"] id_column = meta["id_column"] if not hasattr(ids, "dropna"): - raise GFQLValidationError( - ErrorCode.E108, + raise _reentry_validation_error( "Cypher MATCH after WITH could not recover carried node identities from the prefix stage", - field="with", value=output_name, - suggestion="Carry a whole-row node alias through WITH before MATCH re-entry.", - language="cypher", + suggestion=_REENTRY_WHOLE_ROW_SUGGESTION, ) - concrete_engine = resolve_engine(cast(Any, engine), prefix_result) - df_ctor = df_cons(concrete_engine) - carried_ids = cast(SeriesT, ids.dropna().reset_index(drop=True)) - return df_ctor({id_column: carried_ids}) + base_nodes = getattr(base_graph, "_nodes", None) + if base_nodes is None or id_column not in base_nodes.columns: + raise _reentry_validation_error( + "Cypher MATCH after WITH could not recover the base node table for re-entry", + value=id_column, + suggestion=_REENTRY_WHOLE_ROW_SUGGESTION, + ) + concrete_engine = resolve_engine(cast(Any, engine), base_graph) + carried_ids, aligned_prefix_rows = _aligned_reentry_rows( + ids=cast(SeriesT, ids), + prefix_rows=getattr(prefix_result, "_nodes", None), + output_name=output_name, + ) + carried_node_ids = cast(DataFrameT, df_cons(concrete_engine)({id_column: carried_ids})) + if not carried_columns: + return base_graph, _ordered_reentry_start_nodes( + node_rows=base_nodes, + carried_node_ids=carried_node_ids, + id_column=id_column, + ) + if aligned_prefix_rows is None: + raise _reentry_validation_error( + "Cypher MATCH after WITH could not recover carried row columns from the prefix stage", + value=output_name, + suggestion=_REENTRY_SCALAR_SUGGESTION, + ) + duplicate_mask = carried_ids.duplicated() + if bool(duplicate_mask.any()) if hasattr(duplicate_mask, "any") else False: + raise _reentry_validation_error( + "Cypher MATCH after WITH carried scalar columns currently require unique carried node rows", + value=output_name, + suggestion="Use a single-node seed WITH shape, or avoid carrying scalar columns into MATCH re-entry.", + ) + + carry_payload = _reentry_carry_payload( + carried_node_ids=carried_node_ids, + prefix_rows=aligned_prefix_rows, + carried_columns=carried_columns, + ) + hidden_columns = [name for name in map(_reentry_hidden_column_name, carried_columns) if name in base_nodes.columns] + merge_base = cast(DataFrameT, base_nodes.drop(columns=hidden_columns)) if hidden_columns else base_nodes + node_rows = cast(DataFrameT, safe_merge(merge_base, carry_payload, on=id_column, how="left")) + + dispatch_graph = base_graph.bind() + dispatch_graph._nodes = node_rows + edges_df = getattr(base_graph, "_edges", None) + if edges_df is not None: + dispatch_graph._edges = edges_df + return dispatch_graph, _ordered_reentry_start_nodes( + node_rows=node_rows, + carried_node_ids=carried_node_ids, + id_column=id_column, + ) + + +def _compiled_query_reentry_contract( + compiled_query: CompiledCypherQuery, +) -> Tuple[str, Tuple[str, ...]]: + prefix_query = compiled_query.start_nodes_query + prefix_projection = None if prefix_query is None else prefix_query.result_projection + if prefix_projection is None: + raise _reentry_validation_error( + "Cypher MATCH after WITH could not recover the prefix projection contract for re-entry", + value=None, + suggestion=_REENTRY_WHOLE_ROW_SUGGESTION, + ) + whole_row_columns = tuple( + column.output_name for column in prefix_projection.columns if column.kind == "whole_row" + ) + if len(whole_row_columns) != 1: + raise _reentry_validation_error( + "Cypher MATCH after WITH could not recover exactly one whole-row alias from the prefix projection", + value=whole_row_columns, + suggestion="Carry exactly one whole-row node alias through WITH before MATCH re-entry.", + ) + carried_columns = tuple( + column.output_name for column in prefix_projection.columns if column.kind != "whole_row" + ) + return whole_row_columns[0], carried_columns + + +def _aligned_reentry_rows( + *, + ids: SeriesT, + prefix_rows: Optional[DataFrameT], + output_name: Optional[str], +) -> Tuple[SeriesT, Optional[DataFrameT]]: + if prefix_rows is not None and len(prefix_rows) != len(ids): + raise _reentry_validation_error( + "Cypher MATCH after WITH metadata row counts disagreed with prefix rows during re-entry", + value=output_name, + suggestion="Retry with a direct whole-row carry through WITH or inspect intermediate row-shaping before MATCH re-entry.", + ) + if not hasattr(ids, "notna"): + raise _reentry_validation_error( + "Cypher MATCH after WITH could not align carried node identities from the prefix stage", + value=output_name, + suggestion=_REENTRY_WHOLE_ROW_SUGGESTION, + ) + + non_null_mask = cast(SeriesT, ids.notna()) + carried_ids = cast(SeriesT, ids[non_null_mask].reset_index(drop=True)) + if prefix_rows is None: + return carried_ids, None + return carried_ids, cast(DataFrameT, prefix_rows.loc[non_null_mask].reset_index(drop=True)) + + +def _reentry_carry_payload( + *, + carried_node_ids: DataFrameT, + prefix_rows: DataFrameT, + carried_columns: Sequence[str], +) -> DataFrameT: + missing_column = next((name for name in carried_columns if name not in prefix_rows.columns), None) + if missing_column is not None: + raise _reentry_validation_error( + "Cypher MATCH after WITH could not recover a carried scalar column from the prefix stage", + value=missing_column, + suggestion="Project the scalar column explicitly before MATCH re-entry.", + ) + return cast( + DataFrameT, + carried_node_ids.assign( + **{ + _reentry_hidden_column_name(output_name): cast(SeriesT, prefix_rows[output_name]).reset_index(drop=True) + for output_name in carried_columns + } + ), + ) + + +def _ordered_reentry_start_nodes( + *, + node_rows: DataFrameT, + carried_node_ids: DataFrameT, + id_column: str, +) -> DataFrameT: + # MATCH re-entry must preserve the WITH row order, not the base node-table order. + return cast(DataFrameT, safe_merge(carried_node_ids, node_rows, on=id_column, how="left")) def _materialize_split_alias_columns( @@ -871,6 +1027,7 @@ def policy(context: PolicyContext) -> None: raise ValueError("where provided for Chain that already includes where") query = Chain(query.chain, where=where_param) if compiled_query is not None: + compiled_base_graph = self start_nodes = None if compiled_query.start_nodes_query is not None: prefix_result = _execute_compiled_query( @@ -880,9 +1037,14 @@ def policy(context: PolicyContext) -> None: policy=expanded_policy, context=context, ) - start_nodes = _compiled_query_start_nodes(compiled_query, prefix_result, engine=engine) + compiled_base_graph, start_nodes = _compiled_query_reentry_state( + self, + compiled_query, + prefix_result, + engine=engine, + ) return _execute_compiled_query( - self, + compiled_base_graph, compiled_query=compiled_query, engine=engine, policy=expanded_policy, diff --git a/graphistry/tests/compute/gfql/cypher/test_lowering.py b/graphistry/tests/compute/gfql/cypher/test_lowering.py index 2be3778332..0da3a670f9 100644 --- a/graphistry/tests/compute/gfql/cypher/test_lowering.py +++ b/graphistry/tests/compute/gfql/cypher/test_lowering.py @@ -2,7 +2,7 @@ import pandas as pd import pytest -from typing import Any, cast +from typing import Any, Dict, List, Optional, Tuple, cast from graphistry.compute.ast import ASTCall, ASTNode, ASTEdgeForward, ASTEdgeReverse, ASTEdgeUndirected from graphistry.compute.exceptions import ErrorCode, GFQLSyntaxError, GFQLTypeError, GFQLValidationError @@ -87,6 +87,71 @@ def _mk_empty_graph() -> _CypherTestGraph: return _mk_graph(pd.DataFrame({"id": []}), pd.DataFrame({"s": [], "d": []})) +def _mk_reentry_carried_scalar_graph() -> _CypherTestGraph: + return _mk_graph( + pd.DataFrame( + { + "id": ["a1", "a2", "b1", "b2"], + "label__A": [True, True, False, False], + "num": [1, 2, 1, 3], + } + ), + pd.DataFrame( + { + "s": ["a1", "a2"], + "d": ["b1", "b2"], + "type": ["R", "R"], + } + ), + ) + + +def _mk_reentry_carried_scalar_graph_cudf() -> _CypherTestGraph: + return _mk_cudf_graph( + pd.DataFrame( + { + "id": ["a1", "a2", "b1", "b2"], + "label__A": [True, True, False, False], + "num": [1, 2, 1, 3], + } + ), + pd.DataFrame( + { + "s": ["a1", "a2"], + "d": ["b1", "b2"], + "type": ["R", "R"], + } + ), + ) + + +def _compiled_reentry_projection_outputs(compiled: CompiledCypherQuery) -> Tuple[str, Tuple[str, ...]]: + assert compiled.start_nodes_query is not None + projection = compiled.start_nodes_query.result_projection + assert projection is not None + whole_row_columns = tuple(column.output_name for column in projection.columns if column.kind == "whole_row") + carried_columns = tuple(column.output_name for column in projection.columns if column.kind != "whole_row") + assert len(whole_row_columns) == 1 + return whole_row_columns[0], carried_columns + + +def _reentry_query( + with_clause: str, + *, + return_clause: str, + match_alias: str = "a", + order_by: Optional[str] = None, + where_clause: Optional[str] = None, +) -> str: + parts = ["MATCH (a:A) ", f"WITH {with_clause} ", f"MATCH ({match_alias})-->(b) "] + if where_clause is not None: + parts.append(f"WHERE {where_clause} ") + parts.append(f"RETURN {return_clause}") + if order_by is not None: + parts.append(f" ORDER BY {order_by}") + return "".join(parts) + + def _assert_query_rows( query: str, expected_rows: list[dict[str, object]], @@ -4713,6 +4778,38 @@ def test_string_cypher_supports_return_star_after_with_distinct_row_projection() assert result._nodes.to_dict(orient="records") == [{"name": "C"}] +@pytest.mark.parametrize( + ("query", "expected"), + [ + ( + "MATCH (a) " + "WITH DISTINCT a.name2 AS name " + "WHERE a.name2 = 'B' " + "RETURN *", + [{"name": "B"}], + ), + ( + "MATCH (a) " + "WITH a.name2 AS name " + "WHERE name = 'B' OR a.name2 = 'C' " + "RETURN * " + "ORDER BY name", + [{"name": "B"}, {"name": "C"}], + ), + ], +) +def test_string_cypher_supports_with_where_using_projected_source_properties( + query: str, + expected: List[Dict[str, Any]], +) -> None: + result = _mk_graph( + pd.DataFrame({"id": ["a", "b", "c"], "name2": ["A", "B", "C"]}), + pd.DataFrame({"s": [], "d": []}), + ).gfql(query) + + assert result._nodes.to_dict(orient="records") == expected + + def test_string_cypher_rejects_out_of_scope_order_by_after_multiple_with_stages() -> None: g = _mk_graph(pd.DataFrame({"id": []}), pd.DataFrame({"s": [], "d": []})) @@ -4936,6 +5033,36 @@ def test_string_cypher_executes_with_match_reentry_limit_shape() -> None: assert result._nodes.to_dict(orient="records") == [{"a": "(:A {name: 'alpha'})"}] +def test_string_cypher_executes_with_match_reentry_limit_shape_on_cudf() -> None: + cudf = pytest.importorskip("cudf") + + nodes = cudf.DataFrame.from_pandas( + pd.DataFrame( + { + "id": ["a1", "a2", "b1", "b2"], + "label__A": [True, True, False, False], + "name": ["alpha", "beta", None, None], + } + ) + ) + edges = cudf.DataFrame.from_pandas( + pd.DataFrame( + { + "s": ["a1", "a2"], + "d": ["b1", "b2"], + } + ) + ) + + result = _mk_graph(nodes, edges).gfql( + "MATCH (a:A) WITH a ORDER BY a.name LIMIT 1 MATCH (a)-->(b) RETURN a", + engine="cudf", + ) + + assert type(result._nodes).__module__.startswith("cudf") + assert result._nodes.to_pandas().to_dict(orient="records") == [{"a": "(:A {name: 'alpha'})"}] + + def test_string_cypher_executes_with_match_reentry_multihop_shape() -> None: nodes = pd.DataFrame( { @@ -4958,6 +5085,174 @@ def test_string_cypher_executes_with_match_reentry_multihop_shape() -> None: assert result._nodes.to_dict(orient="records") == [{"id": "c"}] +@pytest.mark.parametrize( + ("query", "expected_whole_row_output", "expected_columns"), + [ + ( + _reentry_query("a, a.num AS property", return_clause="property", order_by="property DESC"), + "a", + ("property",), + ), + ( + _reentry_query( + "a, a.num AS property, a.num + 10 AS property2", + return_clause="property, property2", + order_by="property DESC", + ), + "a", + ("property", "property2"), + ), + ( + _reentry_query( + "a AS x, a.num AS property", + match_alias="x", + return_clause="property", + order_by="property DESC", + ), + "x", + ("property",), + ), + ], +) +def test_compile_cypher_tracks_reentry_carried_scalar_columns( + query: str, + expected_whole_row_output: str, + expected_columns: Tuple[str, ...], +) -> None: + compiled = _compile_query(query) + whole_row_output, carried_columns = _compiled_reentry_projection_outputs(compiled) + + assert whole_row_output == expected_whole_row_output + assert carried_columns == expected_columns + + +@pytest.mark.parametrize( + ("query", "expected"), + [ + ( + _reentry_query("a AS x", match_alias="x", return_clause="b.id AS bid", order_by="bid"), + [{"bid": "b1"}, {"bid": "b2"}], + ), + ( + _reentry_query("a, a.num AS property", return_clause="property", order_by="property DESC"), + [{"property": 2}, {"property": 1}], + ), + ( + _reentry_query("a, a.num AS property", return_clause="a", order_by="property DESC"), + [{"a": "(:A {num: 2})"}, {"a": "(:A {num: 1})"}], + ), + ( + _reentry_query( + "a, a.num AS property, a.num + 10 AS property2", + return_clause="property, property2", + order_by="property DESC", + ), + [{"property": 2, "property2": 12}, {"property": 1, "property2": 11}], + ), + ( + _reentry_query( + "a AS x, a.num AS property", + match_alias="x", + return_clause="x, property", + order_by="property DESC", + ), + [{"x": "(:A {num: 2})", "property": 2}, {"x": "(:A {num: 1})", "property": 1}], + ), + ], +) +def test_string_cypher_executes_with_match_reentry_carried_scalar_shapes(query: str, expected: List[Dict[str, Any]]) -> None: + result = _mk_reentry_carried_scalar_graph().gfql(query) + assert result._nodes.to_dict(orient="records") == expected + + +@pytest.mark.parametrize( + ("query", "expected"), + [ + ( + _reentry_query("a, a.num AS property", return_clause="property", order_by="property DESC"), + [{"property": 2}, {"property": 1}], + ), + ( + _reentry_query( + "a AS x, a.num AS property", + match_alias="x", + return_clause="x, property", + order_by="property DESC", + ), + [{"x": "(:A {num: 2})", "property": 2}, {"x": "(:A {num: 1})", "property": 1}], + ), + ], +) +def test_string_cypher_executes_with_match_reentry_carried_scalar_shapes_on_cudf( + query: str, + expected: List[Dict[str, Any]], +) -> None: + pytest.importorskip("cudf") + + result = _mk_reentry_carried_scalar_graph_cudf().gfql(query, engine="cudf") + + assert type(result._nodes).__module__.startswith("cudf") + assert result._nodes.to_pandas().to_dict(orient="records") == expected + + +def test_string_cypher_reentry_carried_scalars_ignore_internal_hidden_column_collisions() -> None: + g = _mk_reentry_carried_scalar_graph() + g._nodes = g._nodes.assign(__cypher_reentry_property__=["orig1", "orig2", None, None]) + + result = g.gfql(_reentry_query("a, a.num AS property", return_clause="property", order_by="property DESC")) + + assert result._nodes.to_dict(orient="records") == [{"property": 2}, {"property": 1}] + + +def test_string_cypher_reentry_carried_scalars_ignore_internal_hidden_column_collisions_on_cudf() -> None: + pytest.importorskip("cudf") + + g = _mk_reentry_carried_scalar_graph_cudf() + g._nodes = g._nodes.assign(__cypher_reentry_property__=["orig1", "orig2", None, None]) + + result = g.gfql( + _reentry_query("a, a.num AS property", return_clause="property", order_by="property DESC"), + engine="cudf", + ) + + assert type(result._nodes).__module__.startswith("cudf") + assert result._nodes.to_pandas().to_dict(orient="records") == [{"property": 2}, {"property": 1}] + + +@pytest.mark.parametrize( + ("query", "match"), + [ + ( + _reentry_query( + "a, a.num AS property", + return_clause="b.id AS id", + where_clause="property = b.num", + ), + "one MATCH source alias at a time", + ), + ( + "MATCH (a:A) " + "WITH a " + "ORDER BY a.num DESC " + "MATCH (a)-->(b) " + "RETURN b.id AS bid", + "does not yet preserve prefix WITH row ordering", + ), + ( + "MATCH (a:A) " + "WITH a, a.num AS property " + "ORDER BY property DESC " + "MATCH (a)-->(b) " + "RETURN b.id AS bid", + "does not yet preserve prefix WITH row ordering", + ), + ], +) +def test_string_cypher_failfast_rejects_with_match_reentry_unsupported_shapes(query: str, match: str) -> None: + with pytest.raises(GFQLValidationError, match=match): + _mk_reentry_carried_scalar_graph().gfql(query) + + def test_string_cypher_executes_seeded_multihop_then_with_match_reentry_shape() -> None: nodes = pd.DataFrame( { diff --git a/graphistry/tests/compute/test_gfql.py b/graphistry/tests/compute/test_gfql.py index 7bed3cc17a..5a861e82f3 100644 --- a/graphistry/tests/compute/test_gfql.py +++ b/graphistry/tests/compute/test_gfql.py @@ -1,8 +1,12 @@ import pandas as pd import pytest +from typing import Any, Dict, List from graphistry.compute.ast import ASTLet, ASTRef, n, e from graphistry.compute.chain import Chain from graphistry.compute.exceptions import ErrorCode, GFQLSyntaxError, GFQLValidationError +from graphistry.compute.gfql.cypher import compile_cypher +from graphistry.compute.gfql.cypher.lowering import _reentry_hidden_column_name +from graphistry.compute.gfql_unified import _compiled_query_reentry_state from graphistry.tests.test_compute import CGFull # Suppress deprecation warnings for chain() method in this test file @@ -16,6 +20,13 @@ def _mk_graph(ids, types, src, dst): return CGFull().nodes(nodes_df, "id").edges(edges_df, "s", "d") +def _mk_cudf_graph(ids, types, src, dst): + cudf = pytest.importorskip("cudf") + nodes_df = cudf.from_pandas(pd.DataFrame({"id": ids, "type": types})) + edges_df = cudf.from_pandas(pd.DataFrame({"s": src, "d": dst})) + return CGFull().nodes(nodes_df, "id").edges(edges_df, "s", "d") + + def _mk_people_company_graph3(): return _mk_graph( ids=["a", "b", "c"], @@ -38,6 +49,51 @@ def _mk_empty_graph(): return _mk_graph(ids=[], types=[], src=[], dst=[]) +def _mk_reentry_scalar_graph(): + nodes_df = pd.DataFrame( + { + "id": ["a1", "a2", "b1", "b2"], + "label__A": [True, True, False, False], + "num": [1, 2, 1, 3], + } + ) + edges_df = pd.DataFrame( + { + "s": ["a1", "a2"], + "d": ["b1", "b2"], + "type": ["R", "R"], + } + ) + return CGFull().nodes(nodes_df, "id").edges(edges_df, "s", "d") + + +def _mk_reentry_scalar_graph_cudf(): + cudf = pytest.importorskip("cudf") + nodes_df = cudf.from_pandas( + pd.DataFrame( + { + "id": ["a1", "a2", "b1", "b2"], + "label__A": [True, True, False, False], + "num": [1, 2, 1, 3], + } + ) + ) + edges_df = cudf.from_pandas( + pd.DataFrame( + { + "s": ["a1", "a2"], + "d": ["b1", "b2"], + "type": ["R", "R"], + } + ) + ) + return CGFull().nodes(nodes_df, "id").edges(edges_df, "s", "d") + + +def _to_pandas_df(df): + return df.to_pandas() if hasattr(df, "to_pandas") else df + + class TestGFQLAPI: """Test unified GFQL API and migration""" @@ -462,3 +518,285 @@ def test_gfql_chain_dict_envelope(self): result = g.gfql(chain_dict) assert len(result._nodes) == 2 assert all(result._nodes['type'] == 'person') + + +class TestGFQLCypherReentryCarrier: + + _BASE_A_ROWS = { + "a1": {"id": "a1", "label__A": True, "num": 1}, + "a2": {"id": "a2", "label__A": True, "num": 2}, + } + + @staticmethod + def _compile_reentry_query( + with_clause: str = "a, a.num AS property", + *, + match_alias: str = "a", + ): + return compile_cypher( + "MATCH (a:A) " + f"WITH {with_clause} " + f"MATCH ({match_alias})-->(b) " + "RETURN b.id AS bid" + ) + + @staticmethod + def _bind_reentry_prefix_result( + g, + rows: Dict[str, List[Any]], + ids: List[Any], + *, + output_name: str = "a", + engine: str = "pandas", + ): + prefix_result = g.bind() + if engine == "cudf": + cudf = pytest.importorskip("cudf") + prefix_result._nodes = cudf.from_pandas(pd.DataFrame(rows)) + meta_ids = cudf.Series(ids, name="id") + else: + prefix_result._nodes = pd.DataFrame(rows) + meta_ids = pd.Series(ids, name="id") + prefix_result._cypher_entity_projection_meta = { + output_name: { + "table": "nodes", + "alias": output_name, + "id_column": "id", + "ids": meta_ids, + } + } + return prefix_result + + @staticmethod + def _carry_by_id(dispatch_graph): + # Compare hidden carrier columns by node id so pandas and cudf assert the same contract. + return { + row["id"]: {key: value for key, value in row.items() if key.startswith("__cypher_reentry_")} + for row in _to_pandas_df(dispatch_graph._nodes).to_dict(orient="records") + if row["id"] in {"a1", "a2"} and any(key.startswith("__cypher_reentry_") for key in row) + } + + @staticmethod + def _hidden_updates(**values: Any) -> Dict[str, Any]: + return { + _reentry_hidden_column_name(output_name): value + for output_name, value in values.items() + } + + @classmethod + def _expected_reentry_rows(cls, ordered_ids: List[str], carry_values_by_id: Dict[str, Dict[str, Any]]) -> List[Dict[str, Any]]: + return [ + { + **cls._BASE_A_ROWS[node_id], + **cls._hidden_updates(**carry_values_by_id.get(node_id, {})), + } + for node_id in ordered_ids + ] + + @staticmethod + def _run_reentry_state(g, compiled, prefix_result, *, engine: str = "pandas"): + return _compiled_query_reentry_state( + g, + compiled, + prefix_result, + engine=engine, + ) + + @staticmethod + def _assert_hidden_columns_preserved(g, expected_values_by_column: Dict[str, List[Any]]): + for column, expected_values in expected_values_by_column.items(): + series = _to_pandas_df(g._nodes[[column]])[column].reset_index(drop=True) + assert len(series) == len(expected_values) + for actual, expected in zip(series.tolist(), expected_values): + if pd.isna(expected): + assert pd.isna(actual) + else: + assert actual == expected + + def _assert_reentry_state_by_id( + self, + *, + g, + compiled, + prefix_result, + ordered_ids, + carry_values_by_id, + expect_same_graph, + engine: str = "pandas", + ): + dispatch_graph, start_nodes = self._run_reentry_state(g, compiled, prefix_result, engine=engine) + assert (dispatch_graph is g) is expect_same_graph + # Assert rows first so backend-neutral carrier assertions can key off the same ordered ids. + assert _to_pandas_df(start_nodes).to_dict(orient="records") == self._expected_reentry_rows( + ordered_ids, + carry_values_by_id, + ) + assert self._carry_by_id(dispatch_graph) == { + node_id: self._hidden_updates(**values) + for node_id, values in carry_values_by_id.items() + } + + def test_reentry_state_preserves_prefix_order_without_carried_columns(self): + g = _mk_reentry_scalar_graph() + self._assert_reentry_state_by_id( + g=g, + compiled=compile_cypher( + "MATCH (a:A) " + "WITH a " + "MATCH (a)-->(b) " + "RETURN b.id AS bid" + ), + prefix_result=g.gfql("MATCH (a:A) WITH a ORDER BY a.num DESC RETURN a"), + ordered_ids=["a2", "a1"], + carry_values_by_id={}, + expect_same_graph=True, + ) + + def test_reentry_state_preserves_cudf_backend_without_carried_columns(self): + pytest.importorskip("cudf") + g = _mk_reentry_scalar_graph_cudf() + compiled = compile_cypher( + "MATCH (a:A) " + "WITH a " + "MATCH (a)-->(b) " + "RETURN b.id AS bid" + ) + prefix_result = g.gfql("MATCH (a:A) WITH a ORDER BY a.num DESC RETURN a", engine="cudf") + + dispatch_graph, start_nodes = self._run_reentry_state(g, compiled, prefix_result, engine="cudf") + + assert dispatch_graph is g + assert type(dispatch_graph._nodes).__module__.startswith("cudf") + assert type(start_nodes).__module__.startswith("cudf") + assert _to_pandas_df(start_nodes).to_dict(orient="records") == self._expected_reentry_rows( + ["a2", "a1"], + {}, + ) + + @pytest.mark.parametrize( + ("rows", "ids", "match"), + [ + ({"property": [1, 1]}, ["a1", "a1"], "unique carried node rows"), + ({"property": [1]}, ["a1", "a2"], "metadata row counts disagreed"), + ], + ) + def test_reentry_state_rejects_invalid_carried_scalar_rows(self, rows, ids, match): + g = _mk_reentry_scalar_graph() + with pytest.raises(GFQLValidationError, match=match): + self._run_reentry_state( + g, + self._compile_reentry_query(), + self._bind_reentry_prefix_result(g, rows=rows, ids=ids), + ) + + def test_reentry_state_filters_null_carried_ids_before_aligning_scalar_payload(self): + g = _mk_reentry_scalar_graph() + self._assert_reentry_state_by_id( + g=g, + compiled=self._compile_reentry_query(), + prefix_result=self._bind_reentry_prefix_result( + g, + rows={"property": [10, 20, 30]}, + ids=["a1", None, "a2"], + ), + ordered_ids=["a1", "a2"], + carry_values_by_id={ + "a1": {"property": 10}, + "a2": {"property": 30}, + }, + expect_same_graph=False, + ) + + def test_reentry_state_preserves_cudf_backend_with_carried_scalars(self): + pytest.importorskip("cudf") + g = _mk_reentry_scalar_graph_cudf() + compiled = self._compile_reentry_query() + prefix_result = g.gfql( + "MATCH (a:A) WITH a, a.num AS property RETURN a, property ORDER BY property DESC", + engine="cudf", + ) + + dispatch_graph, start_nodes = self._run_reentry_state(g, compiled, prefix_result, engine="cudf") + + assert dispatch_graph is not g + assert type(dispatch_graph._nodes).__module__.startswith("cudf") + assert type(start_nodes).__module__.startswith("cudf") + assert _to_pandas_df(start_nodes).to_dict(orient="records") == self._expected_reentry_rows( + ["a2", "a1"], + {"a1": {"property": 1}, "a2": {"property": 2}}, + ) + + @pytest.mark.parametrize( + ("with_clause", "rows", "carry_values_by_id", "existing_hidden_values"), + [ + ( + "a, a.num AS property", + {"property": [2, 1]}, + { + "a1": {"property": 1}, + "a2": {"property": 2}, + }, + {"__cypher_reentry_property__": ["orig1", "orig2", None, None]}, + ), + ( + "a, a.num AS property, a.num + 10 AS property2", + {"property": [2, 1], "property2": [12, 11]}, + { + "a1": {"property": 1, "property2": 11}, + "a2": {"property": 2, "property2": 12}, + }, + { + "__cypher_reentry_property__": ["orig1", "orig2", None, None], + "__cypher_reentry_property2__": ["keep1", "keep2", None, None], + }, + ), + ], + ) + def test_reentry_state_overrides_internal_hidden_column_collisions( + self, + with_clause, + rows, + carry_values_by_id, + existing_hidden_values, + ): + g = _mk_reentry_scalar_graph() + g._nodes = g._nodes.assign(**existing_hidden_values) + + self._assert_reentry_state_by_id( + g=g, + compiled=self._compile_reentry_query(with_clause), + prefix_result=self._bind_reentry_prefix_result( + g, + rows=rows, + ids=["a2", "a1"], + ), + ordered_ids=["a2", "a1"], + carry_values_by_id=carry_values_by_id, + expect_same_graph=False, + ) + + self._assert_hidden_columns_preserved(g, existing_hidden_values) + + def test_reentry_state_preserves_cudf_backend_when_hidden_columns_collide(self): + pytest.importorskip("cudf") + g = _mk_reentry_scalar_graph_cudf() + g._nodes = g._nodes.assign(__cypher_reentry_property__=["orig1", "orig2", None, None]) + + dispatch_graph, start_nodes = self._run_reentry_state( + g, + self._compile_reentry_query(), + g.gfql( + "MATCH (a:A) WITH a, a.num AS property RETURN a, property ORDER BY property DESC", + engine="cudf", + ), + engine="cudf", + ) + + assert dispatch_graph is not g + assert type(dispatch_graph._nodes).__module__.startswith("cudf") + assert type(start_nodes).__module__.startswith("cudf") + assert _to_pandas_df(start_nodes).to_dict(orient="records") == self._expected_reentry_rows( + ["a2", "a1"], + {"a1": {"property": 1}, "a2": {"property": 2}}, + ) + self._assert_hidden_columns_preserved(g, {"__cypher_reentry_property__": ["orig1", "orig2", None, None]})