diff --git a/verdin/datasource.py b/verdin/datasource.py index 6fc6b6d..b4c1c08 100644 --- a/verdin/datasource.py +++ b/verdin/datasource.py @@ -3,12 +3,15 @@ import json import logging import os -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, Dict, List, Optional, Tuple, Union, TYPE_CHECKING import requests from . import config +if TYPE_CHECKING: + from _typeshed import SupportsWrite + LOG = logging.getLogger(__name__) Record = Union[Tuple, List[Any]] @@ -16,12 +19,26 @@ def to_csv(records: Records, **kwargs) -> str: + """ + Convert the given records to CSV using a CSV writer, and return them as a single string. + + :param records: The records to convert to CSV. + :param kwargs: Args to be passed to ``csv.writer``. + :return: A string representing the CSV + """ output = io.StringIO() write_csv(output, records, **kwargs) return output.getvalue() -def write_csv(file, records: Records, **kwargs): +def write_csv(file: "SupportsWrite[str]", records: Records, **kwargs): + """ + Converts the given records to CSV and writes them to the given file. + + :param file: The file passed to the CSV writer. + :param records: The records to convert to CSV. + :param kwargs: Args to be passed to ``csv.writer``. + """ # TODO: do proper type conversion here to optimize for CSV input # see: https://guides.tinybird.co/guide/fine-tuning-csvs-for-fast-ingestion @@ -45,24 +62,39 @@ class Datasource: name: str version: Optional[int] - def __init__(self, name, token, version: int = None, api=None) -> None: + def __init__(self, name, token, version: int = None, api: str = None) -> None: self.name = name self.token = token self.version = version self.api = (api or config.API_URL).rstrip("/") + self.endpoint @property - def canonical_name(self): + def canonical_name(self) -> str: + """ + Returns the name of the table that can be queried. If a version is specified, the name will be suffixed with + ``__v``. Otherwise, this just returns the name. Note that versions are discouraged in the current + tinybird workflows. + + :return: The canonical name of the table that can be used in queries + """ if self.version is not None: return f"{self.name}__v{self.version}" else: return self.name - def append(self, records: List[Record], *args, **kwargs) -> requests.Response: + def append(self, records: Records, *args, **kwargs) -> requests.Response: + """Calls ``append_csv``.""" # TODO: replicate tinybird API concepts instead of returning Response return self.append_csv(records, *args, **kwargs) - def append_csv(self, records: List[Record], delimiter: str = ",") -> requests.Response: + def append_csv(self, records: Records, delimiter: str = ",") -> requests.Response: + """ + Makes a POST request to the datasource using mode=append with CSV data. This appends data to the table. + + :param records: List of records to append. The will be converted to CSV using the provided delimiter. + :param delimiter: Optional delimiter (defaults to ",") + :return: The HTTP response + """ params = {"name": self.canonical_name, "mode": "append"} if delimiter: params["dialect_delimiter"] = delimiter @@ -84,6 +116,12 @@ def append_csv(self, records: List[Record], delimiter: str = ",") -> requests.Re return requests.post(url=self.api, params=params, headers=headers, data=data) def append_ndjson(self, records: List[Dict]) -> requests.Response: + """ + Makes a POST request to the datasource using mode=append with ndjson data. This appends data to the table. + + :param records: List of JSON records to append. The will be converted to NDJSON using ``json.dumps`` + :return: The HTTP response + """ # TODO: generalize appending in different formats query = {"name": self.canonical_name, "mode": "append", "format": "ndjson"} @@ -130,14 +168,16 @@ def __repr__(self): class FileDatasource(Datasource): - # for debugging/development purposes + """ + Datasource that writes into a file, used for testing and development purposes. + """ def __init__(self, path: str): name = os.path.basename(path) super().__init__(name, None) self.path = path - def append(self, records: Records) -> requests.Response: + def append_csv(self, records: Records, *args, **kwargs) -> requests.Response: if records: with open(self.path, "a") as fd: write_csv(fd, records) @@ -152,3 +192,6 @@ def append_ndjson(self, records: List[Dict]) -> requests.Response: def readlines(self) -> List[str]: with open(self.path, "r") as fd: return fd.readlines() + + def truncate(self): + raise NotImplementedError diff --git a/verdin/pipe.py b/verdin/pipe.py index 163e204..fb200b8 100644 --- a/verdin/pipe.py +++ b/verdin/pipe.py @@ -1,5 +1,5 @@ import logging -from typing import Any, Dict, Iterator, List, Optional, Tuple +from typing import Any, Iterator, Optional import requests @@ -7,41 +7,68 @@ LOG = logging.getLogger(__name__) -PipeMetadata = List[Tuple[str, str]] -PipeJsonData = List[Dict[str, Any]] +PipeMetadata = list[tuple[str, str]] +PipeJsonData = list[dict[str, Any]] class PipeError(Exception): + """ + Wrapper of the HTTP response returned by a Pipe query if the HTTP response is not a 2XX code. + """ + response: requests.Response - def __init__(self, response) -> None: + def __init__(self, response: requests.Response) -> None: self.response = response - self.json: Dict = response.json() + self.json: dict = response.json() super().__init__(self.description) @property - def description(self): + def description(self) -> str: return self.json.get("error") class PipeJsonResponse: + """ + Wrapper of the HTTP response returned by a Pipe query. + """ + response: requests.Response - result: Dict + result: dict - def __init__(self, response): + def __init__(self, response: requests.Response): self.response = response self.result = response.json() @property - def empty(self): + def empty(self) -> bool: + """ + A property to check if the data in the result is empty. + + This property evaluates whether the "data" field within the "result" + attribute is empty. + + :return: Returns True if the "data" field in "result" is missing or empty, + otherwise returns False. + """ return not self.result.get("data") @property def meta(self) -> PipeMetadata: + """ + Returns the PipeMetadata from the query, which includes attributes and their types. + + :return: The PipeMetadata + """ return [(t["name"], t["type"]) for t in self.result.get("meta", [])] @property def data(self) -> PipeJsonData: + """ + Returns the data from the query, which is a list of dictionaries representing the rows of the query result. + + :return: The PipeJsonData + """ return self.result.get("data") @@ -83,7 +110,7 @@ class Pipe: version: Optional[int] resource: str - def __init__(self, name, token, version: int = None, api=None) -> None: + def __init__(self, name, token, version: int = None, api: str = None) -> None: super().__init__() self.name = name self.token = token @@ -91,18 +118,39 @@ def __init__(self, name, token, version: int = None, api=None) -> None: self.resource = (api or config.API_URL).rstrip("/") + self.endpoint @property - def canonical_name(self): + def canonical_name(self) -> str: + """ + Returns the name of the pipe that can be queried. If a version is specified, the name will be suffixed with + ``__v``. Otherwise, this just returns the name. Note that versions are discouraged in the current + tinybird workflows. + + :return: The canonical name of the pipe that can be used in queries + """ if self.version is not None: return f"{self.name}__v{self.version}" else: return self.name @property - def pipe_url(self): + def pipe_url(self) -> str: + """ + Returns the API URL of this pipe. It's something like ``https://api.tinybird.co/v0/pipes/my_pipe.json``. + + :return: The Pipe API URL + """ return self.resource + "/" + self.canonical_name + ".json" - def query(self, params=None) -> PipeJsonResponse: - params = params or dict() + def query(self, params: dict[str, str] = None) -> PipeJsonResponse: + """ + Query the pipe endpoint using the given dynamic parameters. Note that the pipe needs to be exposed as an + endpoint. + + See: https://www.tinybird.co/docs/forward/work-with-data/query-parameters#use-pipes-api-endpoints-with-dynamic-parameters + + :param params: The dynamic parameters of the pipe and the values for your query + :return: a PipeJsonResponse containing the result of the query + """ + params = params or {} if "token" not in params and self.token: params["token"] = self.token @@ -114,6 +162,15 @@ def query(self, params=None) -> PipeJsonResponse: raise PipeError(response) def pages(self, page_size: int = 50, start_at: int = 0) -> PipePageIterator: + """ + Returns an iterator over the pipe's data pages. Each page contains ``page_size`` records. + + TODO: currently we don't support dynamic parameters with paged queries + + :param page_size: The size of each page (default 50) + :param start_at: The offset at which to start (default 0) + :return: + """ return PagedPipeQuery(pipe=self, page_size=page_size, start_at=start_at) def sql(self, query: str) -> PipeJsonResponse: @@ -123,6 +180,9 @@ def sql(self, query: str) -> PipeJsonResponse: pipe.sql("select count() from _") See https://docs.tinybird.co/api-reference/query-api.html + + :param query: The SQL query to run + :return: The result of the query """ headers = {"Authorization": f"Bearer {self.token}"} if self.token else {} params = {"q": query} diff --git a/verdin/query.py b/verdin/query.py index 7945818..4d1f5c5 100644 --- a/verdin/query.py +++ b/verdin/query.py @@ -1,6 +1,6 @@ import enum import logging -from typing import Any, Dict, List, Optional, TypedDict +from typing import Any, Optional, TypedDict import requests @@ -31,12 +31,13 @@ class Statistics(TypedDict): bytes_read: int -JsonData = Dict[str, Any] +JsonData = dict[str, Any] +QueryJsonData = list[dict[str, Any]] class JsonResult(TypedDict): - meta: List[QueryMetadata] - data: List[JsonData] + meta: list[QueryMetadata] + data: QueryJsonData rows: int statistics: Statistics @@ -51,14 +52,33 @@ def __init__(self, response: requests.Response): @property def empty(self): + """ + A property to check if the data in the result is empty. + + This property evaluates whether the "data" field within the "result" + attribute is empty. + + :return: Returns True if the "data" field in "result" is missing or empty, + otherwise returns False. + """ return not self.result.get("data") @property - def meta(self) -> List[QueryMetadata]: + def meta(self) -> list[QueryMetadata]: + """ + Returns the QueryMetadata from the query, which includes attributes and their types. + + :return: The QueryMetadata + """ return self.result.get("meta") @property - def data(self) -> List[JsonData]: + def data(self) -> QueryJsonData: + """ + Returns the data from the query, which is a list of dictionaries representing the rows of the query result. + + :return: The QueryJsonData + """ return self.result.get("data") @@ -85,14 +105,23 @@ class SqlQuery: sql: str format: Optional[OutputFormat] - def __init__(self, sql: str, token, format: Optional[OutputFormat] = None, api=None) -> None: + def __init__( + self, sql: str, token, format: Optional[OutputFormat] = None, api: str = None + ) -> None: self.sql = sql self.format = format or OutputFormat.JSON self.token = token self.api = (api or config.API_URL).rstrip("/") + self.endpoint - def get(self, format: Optional[OutputFormat] = None): - # TODO: replicate tinybird API concepts instead of returning Response + def get(self, format: Optional[OutputFormat] = None) -> requests.Response: + """ + Runs the query and returns the response. + + TODO: replicate tinybird API concepts instead of returning Response + + :param format: Overwrite the default output format set in the constructor. + :return: the HTTP response + """ query = {"q": self._sql_with_format(format or self.format)} headers = {"Content-Type": "text/html; charset=utf-8"} @@ -112,11 +141,23 @@ def get(self, format: Optional[OutputFormat] = None): return response def json(self) -> QueryJsonResult: + """ + Runs the query and returns the result in JSON output format. + + :return: A QueryJsonResult containing the result of the query. + """ response = self.get(OutputFormat.JSON) return QueryJsonResult(response) - def _sql_with_format(self, output_format: Optional[OutputFormat] = None): + def _sql_with_format(self, output_format: Optional[OutputFormat] = None) -> str: + """ + Returns a formatted SQL query with the given output format. If no output format is specified, the query is + returned as is. + + :param output_format: The output format to use (suffixes ``FORMAT `` to the query) + :return: An SQL string + """ # TODO: handle potentially already existing FORMAT string if not output_format: return self.sql diff --git a/verdin/worker.py b/verdin/worker.py index b831955..abc98e7 100644 --- a/verdin/worker.py +++ b/verdin/worker.py @@ -1,3 +1,9 @@ +""" +This module contains a worker that reads batches of records from a Queue and appends them to a Datasource. It provides +an opinionated way to ingest data into tinybird from a python process. Note this worker does not use the ``/v0/events`` +API, but instead uses the datasource's append functionality, which has higher rate limits. +""" + import logging import multiprocessing import time