Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 51 additions & 8 deletions verdin/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,42 @@
import json
import logging
import os
from typing import Any, Dict, List, Optional, Tuple, Union
from typing import Any, Dict, List, Optional, Tuple, Union, TYPE_CHECKING

import requests

from . import config

if TYPE_CHECKING:
from _typeshed import SupportsWrite

LOG = logging.getLogger(__name__)

Record = Union[Tuple, List[Any]]
Records = List[Record]


def to_csv(records: Records, **kwargs) -> str:
    """
    Convert the given records to CSV using a CSV writer, and return them as a single string.

    The records are written into an in-memory text buffer via ``write_csv`` and the buffer's content is returned.

    :param records: The records to convert to CSV.
    :param kwargs: Args to be passed to ``csv.writer`` (forwarded unchanged to ``write_csv``).
    :return: A string representing the CSV
    """
    output = io.StringIO()
    write_csv(output, records, **kwargs)
    return output.getvalue()


def write_csv(file, records: Records, **kwargs):
def write_csv(file: "SupportsWrite[str]", records: Records, **kwargs):
"""
Converts the given records to CSV and writes them to the given file.

:param file: The file passed to the CSV writer.
:param records: The records to convert to CSV.
:param kwargs: Args to be passed to ``csv.writer``.
"""
# TODO: do proper type conversion here to optimize for CSV input
# see: https://guides.tinybird.co/guide/fine-tuning-csvs-for-fast-ingestion

Expand All @@ -45,24 +62,39 @@ class Datasource:
name: str
version: Optional[int]

def __init__(self, name, token, version: Optional[int] = None, api: Optional[str] = None) -> None:
    """
    Create a handle for a datasource.

    :param name: The datasource name (without version suffix).
    :param token: Stored on the instance as ``token``; presumably the API auth token (its use is not
        visible in this part of the file).
    :param version: Optional version, used by ``canonical_name`` to build the ``__v<version>`` suffix.
    :param api: Optional base URL of the API. Falls back to ``config.API_URL`` when omitted or empty.
    """
    self.name = name
    self.token = token
    self.version = version
    # strip trailing slashes from the base URL so that appending the endpoint path never yields "//"
    self.api = (api or config.API_URL).rstrip("/") + self.endpoint

@property
def canonical_name(self) -> str:
    """
    Returns the name of the table that can be queried. If a version is specified, the name will be suffixed with
    ``__v<version>``. Otherwise, this just returns the name. Note that versions are discouraged in the current
    tinybird workflows.

    :return: The canonical name of the table that can be used in queries
    """
    # compare against None explicitly so that version 0 still produces a suffix
    if self.version is not None:
        return f"{self.name}__v{self.version}"
    return self.name

def append(self, records: Records, *args, **kwargs) -> requests.Response:
    """
    Calls ``append_csv``.

    :param records: The records to append.
    :param args: Positional args forwarded unchanged to ``append_csv``.
    :param kwargs: Keyword args forwarded unchanged to ``append_csv``.
    :return: Whatever ``append_csv`` returns
    """
    # TODO: replicate tinybird API concepts instead of returning Response
    return self.append_csv(records, *args, **kwargs)

def append_csv(self, records: List[Record], delimiter: str = ",") -> requests.Response:
def append_csv(self, records: Records, delimiter: str = ",") -> requests.Response:
"""
Makes a POST request to the datasource using mode=append with CSV data. This appends data to the table.

:param records: List of records to append. They will be converted to CSV using the provided delimiter.
:param delimiter: Optional delimiter (defaults to ",")
:return: The HTTP response
"""
params = {"name": self.canonical_name, "mode": "append"}
if delimiter:
params["dialect_delimiter"] = delimiter
Expand All @@ -84,6 +116,12 @@ def append_csv(self, records: List[Record], delimiter: str = ",") -> requests.Re
return requests.post(url=self.api, params=params, headers=headers, data=data)

def append_ndjson(self, records: List[Dict]) -> requests.Response:
"""
Makes a POST request to the datasource using mode=append with ndjson data. This appends data to the table.

:param records: List of JSON records to append. They will be converted to NDJSON using ``json.dumps``
:return: The HTTP response
"""
# TODO: generalize appending in different formats
query = {"name": self.canonical_name, "mode": "append", "format": "ndjson"}

Expand Down Expand Up @@ -130,14 +168,16 @@ def __repr__(self):


class FileDatasource(Datasource):
# for debugging/development purposes
"""
Datasource that writes into a file, used for testing and development purposes.
"""

def __init__(self, path: str):
    """
    Create a datasource backed by the file at ``path``.

    The datasource name is the base name of ``path``; the parent constructor is called with ``None`` as token.

    :param path: Path of the file used as backing store.
    """
    name = os.path.basename(path)
    super().__init__(name, None)
    self.path = path

def append(self, records: Records) -> requests.Response:
def append_csv(self, records: Records, *args, **kwargs) -> requests.Response:
if records:
with open(self.path, "a") as fd:
write_csv(fd, records)
Expand All @@ -152,3 +192,6 @@ def append_ndjson(self, records: List[Dict]) -> requests.Response:
def readlines(self) -> List[str]:
    """
    Read the backing file and return its content line by line.

    :return: All lines of the file at ``self.path``, including their line endings
    """
    with open(self.path, "r") as fd:
        return fd.readlines()

def truncate(self):
    """
    Not supported for this datasource.

    :raises NotImplementedError: always
    """
    raise NotImplementedError
88 changes: 74 additions & 14 deletions verdin/pipe.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,74 @@
import logging
from typing import Any, Dict, Iterator, List, Optional, Tuple
from typing import Any, Iterator, Optional

import requests

from . import config

LOG = logging.getLogger(__name__)

# (column name, column type) pairs, as built by ``PipeJsonResponse.meta``
PipeMetadata = list[tuple[str, str]]
# the rows of a query result, one dictionary per row
PipeJsonData = list[dict[str, Any]]


class PipeError(Exception):
    """
    Wrapper of the HTTP response returned by a Pipe query if the HTTP response is not a 2XX code.

    The parsed JSON body is kept in ``json``; its ``"error"`` field is used as the exception message.
    """

    response: requests.Response

    def __init__(self, response: requests.Response) -> None:
        """
        :param response: The HTTP response of the failed query.
        """
        self.response = response
        try:
            self.json: dict = response.json()
        except ValueError:
            # The body is not valid JSON (JSON decode errors are ValueErrors). Fall back to an empty
            # document so that the decode error does not mask the PipeError being constructed.
            self.json = {}
        super().__init__(self.description)

    @property
    def description(self) -> Optional[str]:
        """
        The error description reported in the response body.

        :return: The value of the ``"error"`` field of the JSON body, or None if there is none
        """
        return self.json.get("error")


class PipeJsonResponse:
    """
    Wrapper of the HTTP response returned by a Pipe query.
    """

    response: requests.Response
    result: dict

    def __init__(self, response: requests.Response):
        """
        :param response: The HTTP response of the query; its body is parsed as JSON into ``result``.
        """
        self.response = response
        self.result = response.json()

    @property
    def empty(self) -> bool:
        """
        A property to check if the data in the result is empty.

        This property evaluates whether the "data" field within the "result"
        attribute is empty.

        :return: Returns True if the "data" field in "result" is missing or empty,
            otherwise returns False.
        """
        return not self.result.get("data")

    @property
    def meta(self) -> PipeMetadata:
        """
        Returns the PipeMetadata from the query, which includes attributes and their types.

        :return: The PipeMetadata, an empty list if the result has no "meta" field
        """
        return [(t["name"], t["type"]) for t in self.result.get("meta", [])]

    @property
    def data(self) -> PipeJsonData:
        """
        Returns the data from the query, which is a list of dictionaries representing the rows of the query result.

        :return: The PipeJsonData (None if the result has no "data" field)
        """
        return self.result.get("data")


Expand Down Expand Up @@ -83,26 +110,47 @@ class Pipe:
version: Optional[int]
resource: str

def __init__(self, name, token, version: Optional[int] = None, api: Optional[str] = None) -> None:
    """
    Create a handle for a pipe.

    :param name: The pipe name (without version suffix).
    :param token: Token stored on the instance; ``sql`` sends it as a Bearer token when it is set.
    :param version: Optional version, used by ``canonical_name`` to build the ``__v<version>`` suffix.
    :param api: Optional base URL of the API. Falls back to ``config.API_URL`` when omitted or empty.
    """
    super().__init__()
    self.name = name
    self.token = token
    self.version = version
    # strip trailing slashes from the base URL so that appending the endpoint path never yields "//"
    self.resource = (api or config.API_URL).rstrip("/") + self.endpoint

@property
def canonical_name(self) -> str:
    """
    Returns the name of the pipe that can be queried. If a version is specified, the name will be suffixed with
    ``__v<version>``. Otherwise, this just returns the name. Note that versions are discouraged in the current
    tinybird workflows.

    :return: The canonical name of the pipe that can be used in queries
    """
    # compare against None explicitly so that version 0 still produces a suffix
    if self.version is not None:
        return f"{self.name}__v{self.version}"
    return self.name

@property
def pipe_url(self) -> str:
    """
    Returns the API URL of this pipe. It's something like ``https://api.tinybird.co/v0/pipes/my_pipe.json``.

    The URL is ``resource``, a slash, the canonical name of the pipe and the ``.json`` suffix.

    :return: The Pipe API URL
    """
    return f"{self.resource}/{self.canonical_name}.json"

def query(self, params=None) -> PipeJsonResponse:
params = params or dict()
def query(self, params: dict[str, str] = None) -> PipeJsonResponse:
"""
Query the pipe endpoint using the given dynamic parameters. Note that the pipe needs to be exposed as an
endpoint.

See: https://www.tinybird.co/docs/forward/work-with-data/query-parameters#use-pipes-api-endpoints-with-dynamic-parameters

:param params: The dynamic parameters of the pipe and the values for your query
:return: a PipeJsonResponse containing the result of the query
"""
params = params or {}
if "token" not in params and self.token:
params["token"] = self.token

Expand All @@ -114,6 +162,15 @@ def query(self, params=None) -> PipeJsonResponse:
raise PipeError(response)

def pages(self, page_size: int = 50, start_at: int = 0) -> PipePageIterator:
    """
    Returns an iterator over the pipe's data pages. Each page contains ``page_size`` records.

    TODO: currently we don't support dynamic parameters with paged queries

    :param page_size: The size of each page (default 50)
    :param start_at: The offset at which to start (default 0)
    :return: A ``PagedPipeQuery`` bound to this pipe, configured with the given page size and start offset
    """
    return PagedPipeQuery(pipe=self, page_size=page_size, start_at=start_at)

def sql(self, query: str) -> PipeJsonResponse:
Expand All @@ -123,6 +180,9 @@ def sql(self, query: str) -> PipeJsonResponse:
pipe.sql("select count() from _")

See https://docs.tinybird.co/api-reference/query-api.html

:param query: The SQL query to run
:return: The result of the query
"""
headers = {"Authorization": f"Bearer {self.token}"} if self.token else {}
params = {"q": query}
Expand Down
Loading