@@ -719,13 +719,23 @@ def test(self, providers=providers): # default arg to capture loop value
719719 ** yaml_transform .SafeLineLoader .strip_metadata (
720720 fixture .get ('config' , {}))))
721721 for pipeline_spec in spec ['pipelines' ]:
722- with beam .Pipeline (options = PipelineOptions (
723- pickle_library = 'cloudpickle' ,
724- ** replace_recursive (yaml_transform .SafeLineLoader .strip_metadata (
725- pipeline_spec .get ('options' , {})),
726- vars ))) as p :
727- yaml_transform .expand_pipeline (
728- p , replace_recursive (pipeline_spec , vars ))
722+ try :
723+ with beam .Pipeline (options = PipelineOptions (
724+ pickle_library = 'cloudpickle' ,
725+ ** replace_recursive (
726+ yaml_transform .SafeLineLoader .strip_metadata (
727+ pipeline_spec .get ('options' , {})),
728+ vars ))) as p :
729+ yaml_transform .expand_pipeline (
730+ p , replace_recursive (pipeline_spec , vars ))
731+ except ValueError as exn :
732+ # FnApiRunner currently does not support this requirement in
733+ # some xlang scenarios (e.g. Iceberg YAML pipelines).
734+ if 'beam:requirement:pardo:on_window_expiration:v1' in str (exn ):
735+ self .skipTest (
736+ 'Runner does not support '
737+ 'beam:requirement:pardo:on_window_expiration:v1' )
738+ raise
729739
730740 yield f'test_{ suffix } ' , test
731741
0 commit comments