-
Notifications
You must be signed in to change notification settings - Fork 44
Expand file tree
/
Copy pathpipeline.py
More file actions
305 lines (247 loc) · 9.49 KB
/
pipeline.py
File metadata and controls
305 lines (247 loc) · 9.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
from __future__ import annotations
import json
import logging
from typing import Any, Container, Dict, Iterator, List, Optional, Sequence, Union, cast, Callable
import numpy as np
import pathlib
try:
from meshio import Mesh
except ModuleNotFoundError: # pragma: no cover
Mesh = None
try:
from pandas import DataFrame
except ModuleNotFoundError: # pragma: no cover
DataFrame = None
try:
from geopandas import GeoDataFrame, points_from_xy
except ModuleNotFoundError: # pragma: no cover
GeoDataFrame = points_from_xy = None
from . import drivers, libpdalpython
# Map Python `logging` module levels to PDAL's numeric log levels.
LogLevelToPDAL = {
    logging.ERROR: 0,
    logging.WARNING: 1,
    logging.INFO: 2,
    logging.DEBUG: 8,  # pdal::LogLevel::Debug5
}
# Inverse mapping: PDAL numeric level back to the Python `logging` level.
LogLevelFromPDAL = {v: k for k, v in LogLevelToPDAL.items()}
class Pipeline(libpdalpython.Pipeline):
    """An executable PDAL pipeline: an ordered chain of stages plus inputs.

    Extends the C++ binding ``libpdalpython.Pipeline`` with Python-side
    stage bookkeeping, operator composition (``|`` / ``|=``) and conversions
    to meshio / pandas / geopandas objects.
    """

    def __init__(
        self,
        spec: Union[None, str, Sequence[Stage]] = None,
        arrays: Sequence[np.ndarray] = (),
        loglevel: int = logging.ERROR,
        json: Optional[str] = None,
        dataframes: Sequence[DataFrame] = (),
        stream_handlers: Sequence[Callable[[], int]] = (),
    ):
        """
        :param spec: pipeline JSON string, or a sequence of Stages.
        :param arrays: numpy structured arrays used as pipeline inputs.
        :param loglevel: a Python ``logging`` level (mapped to PDAL's scale).
        :param json: alias for a JSON ``spec``; mutually exclusive with ``spec``.
        :param dataframes: pandas DataFrames used as inputs instead of arrays.
        :param stream_handlers: one callable per input array/dataframe.
        :raises ValueError: if both ``spec`` and ``json`` are given.
        :raises RuntimeError: if ``stream_handlers`` and inputs differ in length.
        """
        # NOTE: `json` here is the keyword argument, not the stdlib module.
        if json:
            if spec:
                raise ValueError("provide 'spec' or 'json' arguments, not both")
            spec = json
        # Convert dataframes to numpy structured arrays; a GeoPandas
        # "geometry" column has no numpy dtype equivalent, so drop it first.
        if dataframes:
            arrays = [
                df.to_records()
                if "geometry" not in df.columns
                else df.drop(columns=["geometry"]).to_records()
                for df in dataframes
            ]
        super().__init__()
        self._stages: List[Stage] = []
        if spec:
            stages = _parse_stages(spec) if isinstance(spec, str) else spec
            for stage in stages:
                self |= stage
        if stream_handlers:
            if len(stream_handlers) != len(arrays):
                raise RuntimeError(
                    "stream_handlers must match the number of specified "
                    "input arrays / dataframes"
                )
            self.inputs = [(a, h) for a, h in zip(arrays, stream_handlers)]
        else:
            self.inputs = [(a, None) for a in arrays]
        self.loglevel = loglevel

    def __getstate__(self):
        # Pickle the pipeline as its serialized representation.
        return self.pipeline

    def __setstate__(self, state):
        # Rebuild from the representation produced by __getstate__.
        self.__init__(state)

    @property
    def stages(self) -> List[Stage]:
        """A copy of the stage list; mutating it does not affect the pipeline."""
        return list(self._stages)

    @property
    def streamable(self) -> bool:
        """True when every stage supports PDAL stream mode."""
        return all(stage.streamable for stage in self._stages)

    @property
    def loglevel(self) -> int:
        """Current log level, expressed as a Python ``logging`` level."""
        return LogLevelFromPDAL[super().loglevel]

    @loglevel.setter
    def loglevel(self, value: int) -> None:
        try:
            loglevel = LogLevelToPDAL[value]
        except KeyError:
            # Suppress the KeyError context: the ValueError is the real story.
            raise ValueError(f"Invalid level {value!r}") from None
        # super() property setter is not supported; invoke the base class
        # property descriptor directly.
        libpdalpython.Pipeline.loglevel.__set__(self, loglevel)

    def __ior__(self, other: Union[Stage, Pipeline]) -> Pipeline:
        """Append a Stage, or all stages of another Pipeline, in place."""
        if isinstance(other, Stage):
            self._stages.append(other)
        elif isinstance(other, Pipeline):
            if self._stages and other._has_inputs:
                raise ValueError(
                    "A pipeline with inputs cannot follow another pipeline"
                )
            self._stages.extend(other._stages)
        else:
            raise TypeError(f"Expected Stage or Pipeline, not {other}")
        # The stage list changed, so any cached executor is stale.
        self._del_executor()
        return self

    def __or__(self, other: Union[Stage, Pipeline]) -> Pipeline:
        """Return a new Pipeline consisting of this one followed by *other*."""
        new = self.__copy__()
        new |= other
        return new

    def __copy__(self) -> Pipeline:
        """Shallow copy: shares input arrays and Stage objects, fresh executor."""
        clone = self.__class__(loglevel=self.loglevel)
        clone._copy_inputs(self)
        clone |= self
        return clone

    def get_meshio(self, idx: int) -> Optional[Mesh]:
        """Return the idx-th result as a meshio ``Mesh`` of triangles.

        Returns None when the corresponding mesh is empty.
        :raises RuntimeError: if meshio is not installed.
        """
        if Mesh is None:  # pragma: no cover
            raise RuntimeError(
                "The get_meshio function can only be used if you have installed meshio. "
                "Try pip install meshio"
            )
        array = self.arrays[idx]
        mesh = self.meshes[idx]
        if len(mesh) == 0:
            return None
        return Mesh(
            np.stack((array["X"], array["Y"], array["Z"]), 1),
            [("triangle", np.stack((mesh["A"], mesh["B"], mesh["C"]), 1))],
        )

    def get_dataframe(self, idx: int) -> Optional[DataFrame]:
        """Return the idx-th result array as a pandas DataFrame.

        :raises RuntimeError: if pandas is not installed.
        """
        if DataFrame is None:
            raise RuntimeError("Pandas support requires Pandas to be installed")
        return DataFrame(self.arrays[idx])

    def get_geodataframe(
        self, idx: int, xyz: bool = False, crs: Any = None
    ) -> Optional[GeoDataFrame]:
        """Return the idx-th result array as a GeoDataFrame with point geometry.

        :param xyz: include Z in the point geometry (default is 2D X/Y points).
        :param crs: coordinate reference system forwarded to GeoDataFrame.
        :raises RuntimeError: if geopandas is not installed.
        """
        if GeoDataFrame is None:
            raise RuntimeError("GeoPandas support requires GeoPandas to be installed")
        df = DataFrame(self.arrays[idx])
        coords = [df["X"], df["Y"], df["Z"]] if xyz else [df["X"], df["Y"]]
        geometry = points_from_xy(*coords)
        return GeoDataFrame(
            df,
            geometry=geometry,
            crs=crs,
        )

    def _get_json(self) -> str:
        return self.toJSON()

    def toJSON(self) -> str:
        """Serialize the pipeline's stages to a PDAL pipeline JSON string.

        Every stage is given a tag (generating one if absent) so that
        stage-to-stage input references can be expressed as tag strings.
        """
        options_list = []
        stage2tag: Dict[Stage, str] = {}
        stages = self._stages
        # A pipeline of only readers would be ambiguous to PDAL; append a
        # merge filter so the readers feed a single downstream stage.
        if all(isinstance(stage, Reader) for stage in stages):
            stages = [*stages, Filter.merge()]
        for stage in stages:
            stage2tag[stage] = stage.tag or _generate_tag(stage, stage2tag.values())
            options = stage.options
            # pathlib.Path values are not JSON-serializable; stringify them.
            for option in options:
                if isinstance(options[option], pathlib.Path):
                    options[option] = str(options[option])
            options["tag"] = stage2tag[stage]
            options["type"] = stage.type
            inputs = _get_input_tags(stage, stage2tag)
            if inputs:
                options["inputs"] = inputs
            options_list.append(options)
        return json.dumps(options_list)
class Stage:
    """A single pipeline element (reader, filter, or writer), defined
    entirely by its keyword options."""

    def __init__(self, **options: Any):
        self._options = options

    @property
    def type(self) -> str:
        """The PDAL driver type string, e.g. ``"filters.range"``."""
        return cast(str, self._options["type"])

    @property
    def streamable(self) -> bool:
        """Whether this stage's driver supports PDAL stream mode."""
        return self.type in drivers.StreamableTypes

    @property
    def tag(self) -> Optional[str]:
        """The user-assigned tag, or None when no tag was given."""
        return self._options.get("tag")

    @property
    def inputs(self) -> List[Union[Stage, str]]:
        """The declared inputs (stages or tag strings), normalized to a list."""
        declared = self._options.get("inputs", ())
        if isinstance(declared, (Stage, str)):
            return [declared]
        return list(declared)

    @property
    def options(self) -> Dict[str, Any]:
        """A defensive copy of this stage's option mapping."""
        return dict(self._options)

    def pipeline(self, *arrays: np.ndarray, loglevel: int = logging.ERROR) -> Pipeline:
        """Wrap this single stage into a Pipeline over the given arrays."""
        return Pipeline((self,), arrays, loglevel)

    def __or__(self, other: Union[Stage, Pipeline]) -> Pipeline:
        """Compose this stage with another stage/pipeline into a new Pipeline."""
        return Pipeline((self, other))
class InferableTypeStage(Stage):
    """A Stage whose driver type may be inferred from its filename.

    ``filename`` may be a plain path string or a PDAL "filespec" dict whose
    ``"path"`` key holds the actual path.
    """

    def __init__(self, filename: Union[str, Dict[str, Any], None] = None, **options: Any):
        if filename:
            if isinstance(filename, dict):
                # Filespec object: the actual path lives under "path".
                if "path" not in filename:
                    # BUGFIX: the original f-string had no placeholder, so the
                    # offending filespec was never shown; interpolate it.
                    raise ValueError(
                        f"'path' is missing in the provided filespec: {filename}"
                    )
                options["filename"] = filename["path"]
            else:
                options["filename"] = filename
        super().__init__(**options)

    @property
    def type(self) -> str:
        """The explicit "type" option if set, else a type inferred from the
        filename (empty string when neither is available)."""
        try:
            return super().type
        except KeyError:
            filename = self._options.get("filename")
            return str(self._infer_type(filename) if filename else "")

    # Default inference: no type can be inferred. Subclasses override this
    # with a real driver-inference function.
    _infer_type = staticmethod(lambda filename: "")
class Reader(InferableTypeStage):
    """A ``readers.*`` stage; its driver type can be inferred from the filename."""
    # Driver inference is delegated to the C++ bindings.
    _infer_type = staticmethod(libpdalpython.infer_reader_driver)
class Filter(Stage):
    """A ``filters.*`` stage; unlike readers/writers, its type is always explicit."""
    def __init__(self, type: str, **options: Any):
        super().__init__(type=type, **options)
class Writer(InferableTypeStage):
    """A ``writers.*`` stage; its driver type can be inferred from the filename."""
    # Driver inference is delegated to the C++ bindings.
    _infer_type = staticmethod(libpdalpython.infer_writer_driver)
def _parse_stages(text: str) -> Iterator[Stage]:
json_stages = json.loads(text)
if isinstance(json_stages, dict):
json_stages = json_stages.get("pipeline")
if not isinstance(json_stages, list):
raise ValueError("root element is not a pipeline")
last = len(json_stages) - 1
for i, options in enumerate(json_stages):
if not isinstance(options, dict):
if isinstance(options, str):
options = {"filename": options}
else:
raise ValueError("A stage element must be string or dict")
stage_type = options.get("type")
if stage_type:
is_reader = stage_type.startswith("readers.")
else:
# The type is inferred from a filename as a reader if it's not
# the last stage or if there's only one.
is_reader = i == 0 or i != last
if is_reader:
yield Reader(**options)
elif not stage_type or stage_type.startswith("writers."):
yield Writer(**options)
else:
yield Filter(**options)
def _generate_tag(stage: Stage, tags: Container[str]) -> str:
tag_prefix = stage.type.replace(".", "_")
i = 1
while True:
tag = tag_prefix + str(i)
if tag not in tags:
return tag
i += 1
def _get_input_tags(stage: Stage, stage2tag: Dict[Stage, str]) -> List[str]:
    """Resolve *stage*'s declared inputs to a list of tag strings.

    Inputs may be given either as Stage objects (looked up in *stage2tag*)
    or directly as tag strings, which pass through unchanged.

    :raises RuntimeError: if an input Stage has no entry in *stage2tag*,
        i.e. it precedes this stage but is not part of the pipeline.
    """
    tags = []
    # Renamed the loop variable from `input` to avoid shadowing the builtin.
    for inp in stage.inputs:
        if isinstance(inp, Stage):
            try:
                tags.append(stage2tag[inp])
            except KeyError:
                raise RuntimeError(
                    f"Invalid pipeline: Undefined stage {inp.tag or inp.type!r}"
                )
        else:
            tags.append(inp)
    return tags