diff --git a/pyproject.toml b/pyproject.toml
index 55d8dee..016a499 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -37,6 +37,7 @@ dev = [
"coverage[toml]>=5.0",
"pytest-cov>=2.7.1",
"coveralls",
+ "tinybird",
]
[tool.hatch.version]
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
new file mode 100644
index 0000000..aab739c
--- /dev/null
+++ b/tests/integration/conftest.py
@@ -0,0 +1,49 @@
+import os
+import time
+
+import pytest
+
+from verdin.client import Client
+from verdin.test.cli import TinybirdCli
+from verdin.test.container import TinybirdLocalContainer
+
+
+@pytest.fixture(scope="session")
+def client(tinybird_local_container) -> Client:
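+    """Verdin Client connected to the local tinybird container, authenticated with the admin token."""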
+ return tinybird_local_container.client()
+
+
+@pytest.fixture(scope="session")
+def cli(tinybird_local_container) -> TinybirdCli:
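+    """Tinybird CLI wrapper targeting the local container, with the integration test project as working directory."""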
+ project_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "project"))
+
+ return TinybirdCli(
+ host=tinybird_local_container.url,
+ local=True,
+ cwd=project_dir,
+ )
+
+
+@pytest.fixture(scope="session", autouse=True)
+def tinybird_local_container():
+ """
+ Starts a tinybird local container in the background and waits until it becomes available.
+ """
+ project_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "project"))
+
+ container = TinybirdLocalContainer(cwd=project_dir)
+
+ container.start()
+ container.wait_is_up()
+
+ yield container
+
+ # cleanup
+ container.stop()
+
+
+@pytest.fixture(scope="session", autouse=True)
+def deployed_project(cli):
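+    """
+    Deploys the integration test project into the local tinybird container via the CLI.
+    """
+    # brief pause to let tinybird-local settle before creating the deployment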
+ time.sleep(5)
+ cli.deploy(wait=True, auto=True)
+ yield
diff --git a/tests/integration/project/.cursorrules b/tests/integration/project/.cursorrules
new file mode 100644
index 0000000..a8744de
--- /dev/null
+++ b/tests/integration/project/.cursorrules
@@ -0,0 +1,374 @@
+
+You are an expert in SQL and Tinybird. Follow these instructions when working with .datasource and .pipe files:
+
+
+You have commands at your disposal to develop a tinybird project:
+- tb build: to build the project locally and check it works.
+- tb deployment create --wait --auto: to create a deployment and promote it automatically
+- tb test run: to run existing tests
+- tb endpoint url : to get the url of an endpoint, token included.
+- tb endpoint data : to get the data of an endpoint. You can pass parameters to the endpoint like this: tb endpoint data --param1 value1 --param2 value2
+- tb token ls: to list all the tokens
+There are other commands that you can use, but these are the most common ones. Run `tb -h` to see all the commands if needed.
+When you need to work with resources or data in cloud, add always the --cloud flag before the command. Example: tb --cloud datasource ls
+
+
+- When asking to create a tinybird data project, if the needed folders are not already created, use the following structure:
+├── connections
+├── copies
+├── sinks
+├── datasources
+├── endpoints
+├── fixtures
+├── materializations
+├── pipes
+└── tests
+- The local development server will be available at http://localhost:7181. Even if some response uses another base url, use always http://localhost:7181.
+- After every change in your .datasource, .pipe or .ndjson files, run `tb build` to build the project locally.
+- When you need to ingest data locally in a datasource, create a .ndjson file with the same name as the datasource containing the data you want, and run `tb build` so the data is ingested.
+- The format of the generated api endpoint urls is: http://localhost:7181/v0/pipe/.json?token=
+- Before running the tests, remember to have the project built with `tb build` with the latest changes.
+
+When asked to ingest, add, or append data, do the following depending on the environment you want to work with:
+
+- When building locally, create a .ndjson file with the data you want to ingest and do `tb build` to ingest the data in the build env.
+- We call `cloud` the production environment.
+- When appending data in cloud, use `tb --cloud datasource append `
+- When you have a response that says “there are rows in quarantine”, do `tb [--cloud] datasource data _quarantine` to understand what the problem is.
+
+
+Follow these instructions when creating or updating .datasource files:
+
+
+ - Content cannot be empty.
+ - The datasource names must be unique.
+ - No indentation is allowed for property names: DESCRIPTION, SCHEMA, ENGINE, ENGINE_PARTITION_KEY, ENGINE_SORTING_KEY, etc.
+ - Use MergeTree engine by default.
+ - Use AggregatingMergeTree engine when the datasource is the target of a materialized pipe.
+ - Use always json paths to define the schema. Example: `user_id` String `json:$.user_id`,
+ - Array columns are supported with a special syntax. Example: `items` Array(String) `json:$.items[:]`
+ - If the datasource is using an S3 or GCS connection, they need to set IMPORT_CONNECTION_NAME, IMPORT_BUCKET_URI and IMPORT_SCHEDULE (GCS @on-demand only, S3 supports @auto too)
+ - If the datasource is using a Kafka connection, they need to set KAFKA_CONNECTION_NAME as the name of the .connection file, KAFKA_TOPIC topic_name and KAFKA_GROUP_ID as the group id for the datasource
+ - Unless the user asks for them, do not include ENGINE_PARTITION_KEY and ENGINE_PRIMARY_KEY.
+ - DateTime64 type without precision is not supported. Use DateTime64(3) instead.
+
+
+
+
+
+Follow these instructions when creating or updating .pipe files:
+
+Follow these instructions when creating or updating any type of .pipe file:
+
+ - The pipe names must be unique.
+ - Nodes do NOT use the same name as the Pipe they belong to. So if the pipe name is "my_pipe", the nodes must be named differently, like "my_pipe_node_1", "my_pipe_node_2", etc.
+ - Node names MUST be different from the resource names in the project.
+ - No indentation is allowed for property names: DESCRIPTION, NODE, SQL, TYPE, etc.
+ - Allowed TYPE values are: endpoint, copy, materialized, sink.
+ - Add always the output node in the TYPE section or in the last node of the pipe.
+
+
+
+
+ - The SQL query must be a valid ClickHouse SQL query that mixes ClickHouse syntax and Tinybird templating syntax (Tornado templating language under the hood).
+ - SQL queries with parameters must start with "%" character and a newline on top of every query to be able to use the parameters. Examples:
+
+ SELECT * FROM events WHERE session_id={{String(my_param, "default_value")}}
+
+
+ %
+ SELECT * FROM events WHERE session_id={{String(my_param, "default_value")}}
+
+ - The Parameter functions like this one {{String(my_param_name,default_value)}} can be one of the following: String, DateTime, Date, Float32, Float64, Int, Integer, UInt8, UInt16, UInt32, UInt64, UInt128, UInt256, Int8, Int16, Int32, Int64, Int128, Int256
+ - Parameter names must be different from column names. Pass always the param name and a default value to the function.
+ - Use ALWAYS hardcoded values for default values for parameters.
+ - Code inside the template {{template_expression}} follows the rules of Tornado templating language so no module is allowed to be imported. So for example you can't use now() as default value for a DateTime parameter. You need an if else block like this:
+
+ AND timestamp BETWEEN {DateTime(start_date, now() - interval 30 day)} AND {DateTime(end_date, now())}
+
+
+ {%if not defined(start_date)%}
+ timestamp BETWEEN now() - interval 30 day
+ {%else%}
+ timestamp BETWEEN {{DateTime(start_date)}}
+ {%end%}
+ {%if not defined(end_date)%}
+ AND now()
+ {%else%}
+ AND {{DateTime(end_date)}}
+ {%end%}
+
+ - Parameters must not be quoted.
+ - When you use the defined function with a parameter inside, do NOT add quotes around the parameter:
+ {% if defined('my_param') %}
+ {% if defined(my_param) %}
+ - Use datasource names as table names when doing SELECT statements.
+ - Do not use pipe names as table names.
+ - The available datasource names to use in the SQL are the ones present in the existing_resources section or the ones you will create.
+ - Use node names as table names only when nodes are present in the same file.
+ - Do not reference the current node name in the SQL.
+ - SQL queries only accept SELECT statements with conditions, aggregations, joins, etc.
+ - Do NOT use CREATE TABLE, INSERT INTO, CREATE DATABASE, etc.
+ - Use ONLY SELECT statements in the SQL section.
+ - INSERT INTO is not supported in SQL section.
+ - ClickHouse functions supported are:
+ - General functions supported are: ['BLAKE3', 'CAST', 'CHARACTER_LENGTH', 'CHAR_LENGTH', 'CRC32', 'CRC32IEEE', 'CRC64', 'DATABASE', 'DATE', 'DATE_DIFF', 'DATE_FORMAT', 'DATE_TRUNC', 'DAY', 'DAYOFMONTH', 'DAYOFWEEK', 'DAYOFYEAR', 'FORMAT_BYTES', 'FQDN', 'FROM_BASE64', 'FROM_DAYS', 'FROM_UNIXTIME', 'HOUR', 'INET6_ATON', 'INET6_NTOA', 'INET_ATON', 'INET_NTOA', 'IPv4CIDRToRange', 'IPv4NumToString', 'IPv4NumToStringClassC', 'IPv4StringToNum', 'IPv4StringToNumOrDefault', 'IPv4StringToNumOrNull', 'IPv4ToIPv6', 'IPv6CIDRToRange', 'IPv6NumToString', 'IPv6StringToNum', 'IPv6StringToNumOrDefault', 'IPv6StringToNumOrNull', 'JSONArrayLength', 'JSONExtract', 'JSONExtractArrayRaw', 'JSONExtractBool', 'JSONExtractFloat', 'JSONExtractInt', 'JSONExtractKeys', 'JSONExtractKeysAndValues', 'JSONExtractKeysAndValuesRaw', 'JSONExtractRaw', 'JSONExtractString', 'JSONExtractUInt', 'JSONHas', 'JSONKey', 'JSONLength', 'JSONRemoveDynamoDBAnnotations', 'JSONType', 'JSON_ARRAY_LENGTH', 'JSON_EXISTS', 'JSON_QUERY', 'JSON_VALUE', 'L1Distance', 'L1Norm', 'L1Normalize', 'L2Distance', 'L2Norm', 'L2Normalize', 'L2SquaredDistance', 'L2SquaredNorm', 'LAST_DAY', 'LinfDistance', 'LinfNorm', 'LinfNormalize', 'LpDistance', 'LpNorm', 'LpNormalize', 'MACNumToString', 'MACStringToNum', 'MACStringToOUI', 'MAP_FROM_ARRAYS', 'MD4', 'MD5', 'MILLISECOND', 'MINUTE', 'MONTH', 'OCTET_LENGTH', 'QUARTER', 'REGEXP_EXTRACT', 'REGEXP_MATCHES', 'REGEXP_REPLACE', 'SCHEMA', 'SECOND', 'SHA1', 'SHA224', 'SHA256', 'SHA384', 'SHA512', 'SHA512_256', 'SUBSTRING_INDEX', 'SVG', 'TIMESTAMP_DIFF', 'TO_BASE64', 'TO_DAYS', 'TO_UNIXTIME', 'ULIDStringToDateTime', 'URLHash', 'URLHierarchy', 'URLPathHierarchy', 'UTCTimestamp', 'UTC_timestamp', 'UUIDNumToString', 'UUIDStringToNum', 'UUIDToNum', 'UUIDv7ToDateTime', 'YEAR', 'YYYYMMDDToDate', 'YYYYMMDDToDate32', 'YYYYMMDDhhmmssToDateTime', 'YYYYMMDDhhmmssToDateTime64']
+ - Character insensitive functions supported are: ['cast', 'character_length', 'char_length', 'crc32', 'crc32ieee', 'crc64', 'database', 'date', 'date_format', 'date_trunc', 'day', 'dayofmonth', 'dayofweek', 'dayofyear', 'format_bytes', 'fqdn', 'from_base64', 'from_days', 'from_unixtime', 'hour', 'inet6_aton', 'inet6_ntoa', 'inet_aton', 'inet_ntoa', 'json_array_length', 'last_day', 'millisecond', 'minute', 'month', 'octet_length', 'quarter', 'regexp_extract', 'regexp_matches', 'regexp_replace', 'schema', 'second', 'substring_index', 'to_base64', 'to_days', 'to_unixtime', 'utctimestamp', 'utc_timestamp', 'year']
+ - Aggregate functions supported are: ['BIT_AND', 'BIT_OR', 'BIT_XOR', 'COVAR_POP', 'COVAR_SAMP', 'STD', 'STDDEV_POP', 'STDDEV_SAMP', 'VAR_POP', 'VAR_SAMP', 'aggThrow', 'analysisOfVariance', 'anova', 'any', 'anyHeavy', 'anyLast', 'anyLast_respect_nulls', 'any_respect_nulls', 'any_value', 'any_value_respect_nulls', 'approx_top_count', 'approx_top_k', 'approx_top_sum', 'argMax', 'argMin', 'array_agg', 'array_concat_agg', 'avg', 'avgWeighted', 'boundingRatio', 'categoricalInformationValue', 'contingency', 'corr', 'corrMatrix', 'corrStable', 'count', 'covarPop', 'covarPopMatrix', 'covarPopStable', 'covarSamp', 'covarSampMatrix', 'covarSampStable', 'cramersV', 'cramersVBiasCorrected', 'deltaSum', 'deltaSumTimestamp', 'dense_rank', 'entropy', 'exponentialMovingAverage', 'exponentialTimeDecayedAvg', 'exponentialTimeDecayedCount', 'exponentialTimeDecayedMax', 'exponentialTimeDecayedSum', 'first_value', 'first_value_respect_nulls', 'flameGraph', 'groupArray', 'groupArrayInsertAt', 'groupArrayIntersect', 'groupArrayLast', 'groupArrayMovingAvg', 'groupArrayMovingSum', 'groupArraySample', 'groupArraySorted', 'groupBitAnd', 'groupBitOr', 'groupBitXor', 'groupBitmap', 'groupBitmapAnd', 'groupBitmapOr', 'groupBitmapXor', 'groupUniqArray', 'histogram', 'intervalLengthSum', 'kolmogorovSmirnovTest', 'kurtPop', 'kurtSamp', 'lagInFrame', 'largestTriangleThreeBuckets', 'last_value', 'last_value_respect_nulls', 'leadInFrame', 'lttb', 'mannWhitneyUTest', 'max', 'maxIntersections', 'maxIntersectionsPosition', 'maxMappedArrays', 'meanZTest', 'median', 'medianBFloat16', 'medianBFloat16Weighted', 'medianDD', 'medianDeterministic', 'medianExact', 'medianExactHigh', 'medianExactLow', 'medianExactWeighted', 'medianGK', 'medianInterpolatedWeighted', 'medianTDigest', 'medianTDigestWeighted', 'medianTiming', 'medianTimingWeighted', 'min', 'minMappedArrays', 'nonNegativeDerivative', 'nothing', 'nothingNull', 'nothingUInt64', 'nth_value', 'ntile', 'quantile', 'quantileBFloat16', 'quantileBFloat16Weighted', 'quantileDD', 'quantileDeterministic', 'quantileExact', 'quantileExactExclusive', 'quantileExactHigh', 'quantileExactInclusive', 'quantileExactLow', 'quantileExactWeighted', 'quantileGK', 'quantileInterpolatedWeighted', 'quantileTDigest', 'quantileTDigestWeighted', 'quantileTiming', 'quantileTimingWeighted', 'quantiles', 'quantilesBFloat16', 'quantilesBFloat16Weighted', 'quantilesDD', 'quantilesDeterministic', 'quantilesExact', 'quantilesExactExclusive', 'quantilesExactHigh', 'quantilesExactInclusive', 'quantilesExactLow', 'quantilesExactWeighted', 'quantilesGK', 'quantilesInterpolatedWeighted', 'quantilesTDigest', 'quantilesTDigestWeighted', 'quantilesTiming', 'quantilesTimingWeighted', 'rank', 'rankCorr', 'retention', 'row_number', 'sequenceCount', 'sequenceMatch', 'sequenceNextNode', 'simpleLinearRegression', 'singleValueOrNull', 'skewPop', 'skewSamp', 'sparkBar', 'sparkbar', 'stddevPop', 'stddevPopStable', 'stddevSamp', 'stddevSampStable', 'stochasticLinearRegression', 'stochasticLogisticRegression', 'studentTTest', 'sum', 'sumCount', 'sumKahan', 'sumMapFiltered', 'sumMapFilteredWithOverflow', 'sumMapWithOverflow', 'sumMappedArrays', 'sumWithOverflow', 'theilsU', 'topK', 'topKWeighted', 'uniq', 'uniqCombined', 'uniqCombined64', 'uniqExact', 'uniqHLL12', 'uniqTheta', 'uniqUpTo', 'varPop', 'varPopStable', 'varSamp', 'varSampStable', 'welchTTest', 'windowFunnel']
+ - How to use ClickHouse supported functions:
+ - When using functions try always ClickHouse functions first, then SQL functions.
+ - Do not use any ClickHouse function that is not present in the list of general functions, character insensitive functions and aggregate functions.
+ - If the function is not present in the list, the SQL query will fail, so avoid at all costs using any function that is not in the list.
+ - When aliasing a column, use first the column name and then the alias.
+ - General functions and aggregate functions are case sensitive.
+ - Character insensitive functions are case insensitive.
+ - Parameters are never quoted in any case.
+ - Use the following syntax in the SQL section for the iceberg table function: iceberg('s3://bucket/path/to/table', {{tb_secret('aws_access_key_id')}}, {{tb_secret('aws_secret_access_key')}})
+ - Use the following syntax in the SQL section for the postgres table function: postgresql('host:port', 'database', 'table', {{tb_secret('db_username')}}, {{tb_secret('db_password')}}, 'schema')
+
+
+
+
+DESCRIPTION >
+ Some meaningful description of the datasource
+
+SCHEMA >
+ `column_name_1` clickhouse_tinybird_compatible_data_type `json:$.column_name_1`,
+ `column_name_2` clickhouse_tinybird_compatible_data_type `json:$.column_name_2`,
+ ...
+ `column_name_n` clickhouse_tinybird_compatible_data_type `json:$.column_name_n`
+
+ENGINE "MergeTree"
+ENGINE_PARTITION_KEY "partition_key"
+ENGINE_SORTING_KEY "sorting_key_1, sorting_key_2, ..."
+
+
+
+
+DESCRIPTION >
+ Some meaningful description of the pipe
+
+NODE node_1
+SQL >
+ [sql query using clickhouse syntax and tinybird templating syntax and starting always with SELECT or %
+SELECT]
+TYPE endpoint
+
+
+
+
+
+- Do not create copy pipes by default, unless the user asks for it.
+- Copy pipes should be created in the /copies folder.
+- In a .pipe file you can define how to export the result of a Pipe to a Data Source, optionally with a schedule.
+- Do not include COPY_SCHEDULE in the .pipe file unless it is specifically requested by the user.
+- COPY_SCHEDULE is a cron expression that defines the schedule of the copy pipe.
+- COPY_SCHEDULE is optional and if not provided, the copy pipe will be executed only once.
+- TARGET_DATASOURCE is the name of the Data Source to export the result to.
+- TYPE COPY is the type of the pipe and it is mandatory for copy pipes.
+- If the copy pipe uses parameters, you must include the % character and a newline on top of every query to be able to use the parameters.
+- The content of the .pipe file must follow this format:
+DESCRIPTION Copy Pipe to export sales hour every hour to the sales_hour_copy Data Source
+
+NODE daily_sales
+SQL >
+ %
+ SELECT toStartOfDay(starting_date) day, country, sum(sales) as total_sales
+ FROM teams
+ WHERE
+ day BETWEEN toStartOfDay(now()) - interval 1 day AND toStartOfDay(now())
+ and country = {{ String(country, 'US')}}
+ GROUP BY day, country
+
+TYPE COPY
+TARGET_DATASOURCE sales_hour_copy
+COPY_SCHEDULE 0 * * * *
+
+
+
+
+- Do not create materialized pipes by default, unless the user asks for it.
+- Materialized pipes should be created in the /materializations folder.
+- In a .pipe file you can define how to materialize each row ingested in the earliest Data Source in the Pipe query to a materialized Data Source. Materialization happens at ingest.
+- DATASOURCE: Required when TYPE is MATERIALIZED. Sets the target Data Source for materialized nodes.
+- TYPE MATERIALIZED is the type of the pipe and it is mandatory for materialized pipes.
+- The content of the .pipe file must follow the materialized_pipe_content format.
+- Use State modifier for the aggregated columns in the pipe.
+
+
+NODE daily_sales
+SQL >
+ SELECT toStartOfDay(starting_date) day, country, sumState(sales) as total_sales
+ FROM teams
+ GROUP BY day, country
+
+TYPE MATERIALIZED
+DATASOURCE sales_by_hour
+
+
+- The target datasource of a materialized pipe must have an AggregatingMergeTree engine.
+- Use AggregateFunction for the aggregated columns in the pipe.
+- Pipes using a materialized data source must use the Merge modifier in the SQL query for the aggregated columns. Example: sumMerge(total_sales)
+- Put all dimensions in the ENGINE_SORTING_KEY, sorted from least to most cardinality.
+
+
+SCHEMA >
+ `total_sales` AggregateFunction(sum, Float64),
+ `sales_count` AggregateFunction(count, UInt64),
+ `column_name_2` AggregateFunction(avg, Float64),
+ `dimension_1` String,
+ `dimension_2` String,
+ ...
+ `date` DateTime
+
+ENGINE "AggregatingMergeTree"
+ENGINE_PARTITION_KEY "toYYYYMM(date)"
+ENGINE_SORTING_KEY "date, dimension_1, dimension_2, ..."
+
+
+
+
+- Do not create sink pipes by default, unless the user asks for it.
+- Sink pipes should be created in the /sinks folder.
+- In a .pipe file you can define how to export the result of a Pipe to an external system, optionally with a schedule.
+- Valid external systems are Kafka, S3, GCS.
+- Sink pipes depend on a connection; if no connection is provided, search for an existing connection that suits the request. If none exists, create a new connection.
+- Do not include EXPORT_SCHEDULE in the .pipe file unless it is specifically requested by the user.
+- EXPORT_SCHEDULE is a cron expression that defines the schedule of the sink pipe.
+- EXPORT_SCHEDULE is optional and if not provided, the sink pipe will be executed only once.
+- EXPORT_CONNECTION_NAME is the name of the connection used to export.
+- TYPE SINK is the type of the pipe and it is mandatory for sink pipes.
+- If the sink pipe uses parameters, you must include the % character and a newline on top of every query to be able to use the parameters.
+- The content of the .pipe file must follow this format:
+DESCRIPTION Sink Pipe to export sales hour every hour using my_connection
+
+NODE daily_sales
+SQL >
+ %
+ SELECT toStartOfDay(starting_date) day, country, sum(sales) as total_sales
+ FROM teams
+ WHERE
+ day BETWEEN toStartOfDay(now()) - interval 1 day AND toStartOfDay(now())
+ and country = {{ String(country, 'US')}}
+ GROUP BY day, country
+
+TYPE sink
+EXPORT_CONNECTION_NAME "my_connection"
+EXPORT_BUCKET_URI "s3://tinybird-sinks"
+EXPORT_FILE_TEMPLATE "daily_prices"
+EXPORT_SCHEDULE "*/5 * * * *"
+EXPORT_FORMAT "csv"
+EXPORT_COMPRESSION "gz"
+EXPORT_STRATEGY "truncate"
+
+
+
+
+ - Content cannot be empty.
+ - The connection names must be unique.
+ - No indentation is allowed for property names
+ - We support kafka, gcs and s3 connections for now
+
+
+
+
+TYPE kafka
+KAFKA_BOOTSTRAP_SERVERS {{ tb_secret("PRODUCTION_KAFKA_SERVERS", "localhost:9092") }}
+KAFKA_SECURITY_PROTOCOL SASL_SSL
+KAFKA_SASL_MECHANISM PLAIN
+KAFKA_KEY {{ tb_secret("PRODUCTION_KAFKA_USERNAME", "") }}
+KAFKA_SECRET {{ tb_secret("PRODUCTION_KAFKA_PASSWORD", "") }}
+
+
+
+
+TYPE gcs
+GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON {{ tb_secret("PRODUCTION_GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON", "") }}
+
+
+
+
+TYPE gcs
+GCS_HMAC_ACCESS_ID {{ tb_secret("gcs_hmac_access_id") }}
+GCS_HMAC_SECRET {{ tb_secret("gcs_hmac_secret") }}
+
+
+
+
+TYPE s3
+S3_REGION {{ tb_secret("PRODUCTION_S3_REGION", "") }}
+S3_ARN {{ tb_secret("PRODUCTION_S3_ARN", "") }}
+
+
+
+
+Follow these instructions when creating or updating .yaml files for tests:
+
+- The test file name must match the name of the pipe it is testing.
+- Every scenario name must be unique inside the test file.
+- When looking for the parameters available, you will find them in the pipes in the following format: {{String(my_param_name, default_value)}}.
+- If there are no parameters, you can omit parameters and generate a single test.
+- The format of the parameters is the following: param1=value1&param2=value2&param3=value3
+- If some parameters are provided by the user and you need to use them, preserve them in the same format as they were provided, including case sensitivity.
+- Test as many scenarios as possible.
+- The format of the test file is the following:
+
+- name: kpis_single_day
+ description: Test hourly granularity for a single day
+ parameters: date_from=2024-01-01&date_to=2024-01-01
+ expected_result: |
+ {"date":"2024-01-01 00:00:00","visits":0,"pageviews":0,"bounce_rate":null,"avg_session_sec":0}
+ {"date":"2024-01-01 01:00:00","visits":0,"pageviews":0,"bounce_rate":null,"avg_session_sec":0}
+
+- name: kpis_date_range
+ description: Test daily granularity for a date range
+ parameters: date_from=2024-01-01&date_to=2024-01-31
+ expected_result: |
+ {"date":"2024-01-01","visits":0,"pageviews":0,"bounce_rate":null,"avg_session_sec":0}
+ {"date":"2024-01-02","visits":0,"pageviews":0,"bounce_rate":null,"avg_session_sec":0}
+
+- name: kpis_default_range
+ description: Test default behavior without date parameters (last 7 days)
+ parameters: ''
+ expected_result: |
+ {"date":"2025-01-10","visits":0,"pageviews":0,"bounce_rate":null,"avg_session_sec":0}
+ {"date":"2025-01-11","visits":0,"pageviews":0,"bounce_rate":null,"avg_session_sec":0}
+
+- name: kpis_fixed_time
+ description: Test with fixed timestamp for consistent testing
+ parameters: fixed_time=2024-01-15T12:00:00
+ expected_result: ''
+
+- name: kpis_single_day
+ description: Test single day with hourly granularity
+ parameters: date_from=2024-01-01&date_to=2024-01-01
+ expected_result: |
+ {"date":"2024-01-01 00:00:00","visits":0,"pageviews":0,"bounce_rate":null,"avg_session_sec":0}
+ {"date":"2024-01-01 01:00:00","visits":0,"pageviews":0,"bounce_rate":null,"avg_session_sec":0}
+
+
+
+
+
+Follow these instructions when evolving a datasource schema:
+
+- When you make schema changes that are incompatible with the old schema, you must use a forward query in your data source. Forward queries are necessary when introducing breaking changes. Otherwise, your deployment will fail due to a schema mismatch.
+- Forward queries translate the old schema to a new one that you define in the .datasource file. This helps you evolve your schema while continuing to ingest data.
+Follow these steps to evolve your schema using a forward query:
+- Edit the .datasource file to add a forward query.
+- Run tb deploy --check to validate the deployment before creating it.
+- Deploy and promote your changes in Tinybird Cloud using {base_command} --cloud deploy.
+
+SCHEMA >
+ `timestamp` DateTime `json:$.timestamp`,
+ `session_id` UUID `json:$.session_id`,
+ `action` String `json:$.action`,
+ `version` String `json:$.version`,
+ `payload` String `json:$.payload`
+
+FORWARD_QUERY >
+ select timestamp, toUUID(session_id) as session_id, action, version, payload
+
+
+
+
diff --git a/tests/integration/project/.env.local b/tests/integration/project/.env.local
new file mode 100644
index 0000000..e69de29
diff --git a/tests/integration/project/.gitignore b/tests/integration/project/.gitignore
new file mode 100644
index 0000000..464c1bb
--- /dev/null
+++ b/tests/integration/project/.gitignore
@@ -0,0 +1,2 @@
+.tinyb
+.terraform
diff --git a/tests/integration/project/CLAUDE.md b/tests/integration/project/CLAUDE.md
new file mode 100644
index 0000000..ffcff51
--- /dev/null
+++ b/tests/integration/project/CLAUDE.md
@@ -0,0 +1,373 @@
+
+# Tinybird CLI rules
+
+## Commands
+You have commands at your disposal to develop a tinybird project:
+- tb build: to build the project locally and check it works.
+- tb deployment create --wait --auto: to create a deployment and promote it automatically
+- tb test run: to run existing tests
+- tb endpoint url : to get the url of an endpoint, token included.
+- tb endpoint data : to get the data of an endpoint. You can pass parameters to the endpoint like this: tb endpoint data --param1 value1 --param2 value2
+- tb token ls: to list all the tokens
+There are other commands that you can use, but these are the most common ones. Run `tb -h` to see all the commands if needed.
+When you need to work with resources or data in cloud, add always the --cloud flag before the command. Example: tb --cloud datasource ls
+
+## Development instructions
+- When asking to create a tinybird data project, if the needed folders are not already created, use the following structure:
+├── connections
+├── copies
+├── sinks
+├── datasources
+├── endpoints
+├── fixtures
+├── materializations
+├── pipes
+└── tests
+- The local development server will be available at http://localhost:7181. Even if some response uses another base url, use always http://localhost:7181.
+- After every change in your .datasource, .pipe or .ndjson files, run `tb build` to build the project locally.
+- When you need to ingest data locally in a datasource, create a .ndjson file with the same name as the datasource containing the data you want, and run `tb build` so the data is ingested.
+- The format of the generated api endpoint urls is: http://localhost:7181/v0/pipe/.json?token=
+- Before running the tests, remember to have the project built with `tb build` with the latest changes.
+
+When asked to ingest, add, or append data, do the following depending on the environment you want to work with:
+
+## Ingestion instructions
+- When building locally, create a .ndjson file with the data you want to ingest and do `tb build` to ingest the data in the build env.
+- We call `cloud` the production environment.
+- When appending data in cloud, use `tb --cloud datasource append `
+- When you have a response that says “there are rows in quarantine”, do `tb [--cloud] datasource data _quarantine` to understand what the problem is.
+
+## .datasource file instructions
+Follow these instructions when creating or updating .datasource files:
+
+
+ - Content cannot be empty.
+ - The datasource names must be unique.
+ - No indentation is allowed for property names: DESCRIPTION, SCHEMA, ENGINE, ENGINE_PARTITION_KEY, ENGINE_SORTING_KEY, etc.
+ - Use MergeTree engine by default.
+ - Use AggregatingMergeTree engine when the datasource is the target of a materialized pipe.
+ - Use always json paths to define the schema. Example: `user_id` String `json:$.user_id`,
+ - Array columns are supported with a special syntax. Example: `items` Array(String) `json:$.items[:]`
+ - If the datasource is using an S3 or GCS connection, they need to set IMPORT_CONNECTION_NAME, IMPORT_BUCKET_URI and IMPORT_SCHEDULE (GCS @on-demand only, S3 supports @auto too)
+ - If the datasource is using a Kafka connection, they need to set KAFKA_CONNECTION_NAME as the name of the .connection file, KAFKA_TOPIC topic_name and KAFKA_GROUP_ID as the group id for the datasource
+ - Unless the user asks for them, do not include ENGINE_PARTITION_KEY and ENGINE_PRIMARY_KEY.
+ - DateTime64 type without precision is not supported. Use DateTime64(3) instead.
+
+
+
+## .pipe file instructions
+Follow these instructions when creating or updating .pipe files:
+
+Follow these instructions when creating or updating any type of .pipe file:
+
+ - The pipe names must be unique.
+ - Nodes do NOT use the same name as the Pipe they belong to. So if the pipe name is "my_pipe", the nodes must be named differently, like "my_pipe_node_1", "my_pipe_node_2", etc.
+ - Node names MUST be different from the resource names in the project.
+ - No indentation is allowed for property names: DESCRIPTION, NODE, SQL, TYPE, etc.
+ - Allowed TYPE values are: endpoint, copy, materialized, sink.
+ - Add always the output node in the TYPE section or in the last node of the pipe.
+
+
+
+
+ - The SQL query must be a valid ClickHouse SQL query that mixes ClickHouse syntax and Tinybird templating syntax (Tornado templating language under the hood).
+ - SQL queries with parameters must start with "%" character and a newline on top of every query to be able to use the parameters. Examples:
+
+ SELECT * FROM events WHERE session_id={{String(my_param, "default_value")}}
+
+
+ %
+ SELECT * FROM events WHERE session_id={{String(my_param, "default_value")}}
+
+ - The Parameter functions like this one {{String(my_param_name,default_value)}} can be one of the following: String, DateTime, Date, Float32, Float64, Int, Integer, UInt8, UInt16, UInt32, UInt64, UInt128, UInt256, Int8, Int16, Int32, Int64, Int128, Int256
+ - Parameter names must be different from column names. Pass always the param name and a default value to the function.
+ - Use ALWAYS hardcoded values for default values for parameters.
+ - Code inside the template {{template_expression}} follows the rules of Tornado templating language so no module is allowed to be imported. So for example you can't use now() as default value for a DateTime parameter. You need an if else block like this:
+
+ AND timestamp BETWEEN {DateTime(start_date, now() - interval 30 day)} AND {DateTime(end_date, now())}
+
+
+ {%if not defined(start_date)%}
+ timestamp BETWEEN now() - interval 30 day
+ {%else%}
+ timestamp BETWEEN {{DateTime(start_date)}}
+ {%end%}
+ {%if not defined(end_date)%}
+ AND now()
+ {%else%}
+ AND {{DateTime(end_date)}}
+ {%end%}
+
+ - Parameters must not be quoted.
+ - When you use the defined function with a parameter inside, do NOT add quotes around the parameter:
+ {% if defined('my_param') %}
+ {% if defined(my_param) %}
+ - Use datasource names as table names when doing SELECT statements.
+ - Do not use pipe names as table names.
+ - The available datasource names to use in the SQL are the ones present in the existing_resources section or the ones you will create.
+ - Use node names as table names only when nodes are present in the same file.
+ - Do not reference the current node name in the SQL.
+ - SQL queries only accept SELECT statements with conditions, aggregations, joins, etc.
+ - Do NOT use CREATE TABLE, INSERT INTO, CREATE DATABASE, etc.
+ - Use ONLY SELECT statements in the SQL section.
+ - INSERT INTO is not supported in SQL section.
+ - ClickHouse functions supported are:
+ - General functions supported are: ['BLAKE3', 'CAST', 'CHARACTER_LENGTH', 'CHAR_LENGTH', 'CRC32', 'CRC32IEEE', 'CRC64', 'DATABASE', 'DATE', 'DATE_DIFF', 'DATE_FORMAT', 'DATE_TRUNC', 'DAY', 'DAYOFMONTH', 'DAYOFWEEK', 'DAYOFYEAR', 'FORMAT_BYTES', 'FQDN', 'FROM_BASE64', 'FROM_DAYS', 'FROM_UNIXTIME', 'HOUR', 'INET6_ATON', 'INET6_NTOA', 'INET_ATON', 'INET_NTOA', 'IPv4CIDRToRange', 'IPv4NumToString', 'IPv4NumToStringClassC', 'IPv4StringToNum', 'IPv4StringToNumOrDefault', 'IPv4StringToNumOrNull', 'IPv4ToIPv6', 'IPv6CIDRToRange', 'IPv6NumToString', 'IPv6StringToNum', 'IPv6StringToNumOrDefault', 'IPv6StringToNumOrNull', 'JSONArrayLength', 'JSONExtract', 'JSONExtractArrayRaw', 'JSONExtractBool', 'JSONExtractFloat', 'JSONExtractInt', 'JSONExtractKeys', 'JSONExtractKeysAndValues', 'JSONExtractKeysAndValuesRaw', 'JSONExtractRaw', 'JSONExtractString', 'JSONExtractUInt', 'JSONHas', 'JSONKey', 'JSONLength', 'JSONRemoveDynamoDBAnnotations', 'JSONType', 'JSON_ARRAY_LENGTH', 'JSON_EXISTS', 'JSON_QUERY', 'JSON_VALUE', 'L1Distance', 'L1Norm', 'L1Normalize', 'L2Distance', 'L2Norm', 'L2Normalize', 'L2SquaredDistance', 'L2SquaredNorm', 'LAST_DAY', 'LinfDistance', 'LinfNorm', 'LinfNormalize', 'LpDistance', 'LpNorm', 'LpNormalize', 'MACNumToString', 'MACStringToNum', 'MACStringToOUI', 'MAP_FROM_ARRAYS', 'MD4', 'MD5', 'MILLISECOND', 'MINUTE', 'MONTH', 'OCTET_LENGTH', 'QUARTER', 'REGEXP_EXTRACT', 'REGEXP_MATCHES', 'REGEXP_REPLACE', 'SCHEMA', 'SECOND', 'SHA1', 'SHA224', 'SHA256', 'SHA384', 'SHA512', 'SHA512_256', 'SUBSTRING_INDEX', 'SVG', 'TIMESTAMP_DIFF', 'TO_BASE64', 'TO_DAYS', 'TO_UNIXTIME', 'ULIDStringToDateTime', 'URLHash', 'URLHierarchy', 'URLPathHierarchy', 'UTCTimestamp', 'UTC_timestamp', 'UUIDNumToString', 'UUIDStringToNum', 'UUIDToNum', 'UUIDv7ToDateTime', 'YEAR', 'YYYYMMDDToDate', 'YYYYMMDDToDate32', 'YYYYMMDDhhmmssToDateTime', 'YYYYMMDDhhmmssToDateTime64']
+ - Character insensitive functions supported are: ['cast', 'character_length', 'char_length', 'crc32', 'crc32ieee', 'crc64', 'database', 'date', 'date_format', 'date_trunc', 'day', 'dayofmonth', 'dayofweek', 'dayofyear', 'format_bytes', 'fqdn', 'from_base64', 'from_days', 'from_unixtime', 'hour', 'inet6_aton', 'inet6_ntoa', 'inet_aton', 'inet_ntoa', 'json_array_length', 'last_day', 'millisecond', 'minute', 'month', 'octet_length', 'quarter', 'regexp_extract', 'regexp_matches', 'regexp_replace', 'schema', 'second', 'substring_index', 'to_base64', 'to_days', 'to_unixtime', 'utctimestamp', 'utc_timestamp', 'year']
+ - Aggregate functions supported are: ['BIT_AND', 'BIT_OR', 'BIT_XOR', 'COVAR_POP', 'COVAR_SAMP', 'STD', 'STDDEV_POP', 'STDDEV_SAMP', 'VAR_POP', 'VAR_SAMP', 'aggThrow', 'analysisOfVariance', 'anova', 'any', 'anyHeavy', 'anyLast', 'anyLast_respect_nulls', 'any_respect_nulls', 'any_value', 'any_value_respect_nulls', 'approx_top_count', 'approx_top_k', 'approx_top_sum', 'argMax', 'argMin', 'array_agg', 'array_concat_agg', 'avg', 'avgWeighted', 'boundingRatio', 'categoricalInformationValue', 'contingency', 'corr', 'corrMatrix', 'corrStable', 'count', 'covarPop', 'covarPopMatrix', 'covarPopStable', 'covarSamp', 'covarSampMatrix', 'covarSampStable', 'cramersV', 'cramersVBiasCorrected', 'deltaSum', 'deltaSumTimestamp', 'dense_rank', 'entropy', 'exponentialMovingAverage', 'exponentialTimeDecayedAvg', 'exponentialTimeDecayedCount', 'exponentialTimeDecayedMax', 'exponentialTimeDecayedSum', 'first_value', 'first_value_respect_nulls', 'flameGraph', 'groupArray', 'groupArrayInsertAt', 'groupArrayIntersect', 'groupArrayLast', 'groupArrayMovingAvg', 'groupArrayMovingSum', 'groupArraySample', 'groupArraySorted', 'groupBitAnd', 'groupBitOr', 'groupBitXor', 'groupBitmap', 'groupBitmapAnd', 'groupBitmapOr', 'groupBitmapXor', 'groupUniqArray', 'histogram', 'intervalLengthSum', 'kolmogorovSmirnovTest', 'kurtPop', 'kurtSamp', 'lagInFrame', 'largestTriangleThreeBuckets', 'last_value', 'last_value_respect_nulls', 'leadInFrame', 'lttb', 'mannWhitneyUTest', 'max', 'maxIntersections', 'maxIntersectionsPosition', 'maxMappedArrays', 'meanZTest', 'median', 'medianBFloat16', 'medianBFloat16Weighted', 'medianDD', 'medianDeterministic', 'medianExact', 'medianExactHigh', 'medianExactLow', 'medianExactWeighted', 'medianGK', 'medianInterpolatedWeighted', 'medianTDigest', 'medianTDigestWeighted', 'medianTiming', 'medianTimingWeighted', 'min', 'minMappedArrays', 'nonNegativeDerivative', 'nothing', 'nothingNull', 'nothingUInt64', 'nth_value', 'ntile', 'quantile', 'quantileBFloat16', 'quantileBFloat16Weighted', 'quantileDD', 'quantileDeterministic', 'quantileExact', 'quantileExactExclusive', 'quantileExactHigh', 'quantileExactInclusive', 'quantileExactLow', 'quantileExactWeighted', 'quantileGK', 'quantileInterpolatedWeighted', 'quantileTDigest', 'quantileTDigestWeighted', 'quantileTiming', 'quantileTimingWeighted', 'quantiles', 'quantilesBFloat16', 'quantilesBFloat16Weighted', 'quantilesDD', 'quantilesDeterministic', 'quantilesExact', 'quantilesExactExclusive', 'quantilesExactHigh', 'quantilesExactInclusive', 'quantilesExactLow', 'quantilesExactWeighted', 'quantilesGK', 'quantilesInterpolatedWeighted', 'quantilesTDigest', 'quantilesTDigestWeighted', 'quantilesTiming', 'quantilesTimingWeighted', 'rank', 'rankCorr', 'retention', 'row_number', 'sequenceCount', 'sequenceMatch', 'sequenceNextNode', 'simpleLinearRegression', 'singleValueOrNull', 'skewPop', 'skewSamp', 'sparkBar', 'sparkbar', 'stddevPop', 'stddevPopStable', 'stddevSamp', 'stddevSampStable', 'stochasticLinearRegression', 'stochasticLogisticRegression', 'studentTTest', 'sum', 'sumCount', 'sumKahan', 'sumMapFiltered', 'sumMapFilteredWithOverflow', 'sumMapWithOverflow', 'sumMappedArrays', 'sumWithOverflow', 'theilsU', 'topK', 'topKWeighted', 'uniq', 'uniqCombined', 'uniqCombined64', 'uniqExact', 'uniqHLL12', 'uniqTheta', 'uniqUpTo', 'varPop', 'varPopStable', 'varSamp', 'varSampStable', 'welchTTest', 'windowFunnel']
+ - How to use ClickHouse supported functions:
+ - When using functions try always ClickHouse functions first, then SQL functions.
+ - Do not use any ClickHouse function that is not present in the list of general functions, character insensitive functions and aggregate functions.
+ - If the function is not present in the list, the SQL query will fail, so avoid at all costs using any function that is not in the list.
+ - When aliasing a column, use first the column name and then the alias.
+ - General functions and aggregate functions are case sensitive.
+ - Character insensitive functions are case insensitive.
+ - Parameters are never quoted in any case.
+ - Use the following syntax in the SQL section for the iceberg table function: iceberg('s3://bucket/path/to/table', {{tb_secret('aws_access_key_id')}}, {{tb_secret('aws_secret_access_key')}})
+ - Use the following syntax in the SQL section for the postgres table function: postgresql('host:port', 'database', 'table', {{tb_secret('db_username')}}, {{tb_secret('db_password')}}, 'schema')
+
+
+
+
+DESCRIPTION >
+ Some meaningful description of the datasource
+
+SCHEMA >
+ `column_name_1` clickhouse_tinybird_compatible_data_type `json:$.column_name_1`,
+ `column_name_2` clickhouse_tinybird_compatible_data_type `json:$.column_name_2`,
+ ...
+ `column_name_n` clickhouse_tinybird_compatible_data_type `json:$.column_name_n`
+
+ENGINE "MergeTree"
+ENGINE_PARTITION_KEY "partition_key"
+ENGINE_SORTING_KEY "sorting_key_1, sorting_key_2, ..."
+
+
+
+
+DESCRIPTION >
+ Some meaningful description of the pipe
+
+NODE node_1
+SQL >
+ [sql query using clickhouse syntax and tinybird templating syntax and starting always with SELECT or %
+SELECT]
+TYPE endpoint
+
+
+
+
+
+- Do not create copy pipes by default, unless the user asks for it.
+- Copy pipes should be created in the /copies folder.
+- In a .pipe file you can define how to export the result of a Pipe to a Data Source, optionally with a schedule.
+- Do not include COPY_SCHEDULE in the .pipe file unless it is specifically requested by the user.
+- COPY_SCHEDULE is a cron expression that defines the schedule of the copy pipe.
+- COPY_SCHEDULE is optional and if not provided, the copy pipe will be executed only once.
+- TARGET_DATASOURCE is the name of the Data Source to export the result to.
+- TYPE COPY is the type of the pipe and it is mandatory for copy pipes.
+- If the copy pipe uses parameters, you must include the % character and a newline on top of every query to be able to use the parameters.
+- The content of the .pipe file must follow this format:
+DESCRIPTION Copy Pipe to export sales hour every hour to the sales_hour_copy Data Source
+
+NODE daily_sales
+SQL >
+ %
+ SELECT toStartOfDay(starting_date) day, country, sum(sales) as total_sales
+ FROM teams
+ WHERE
+ day BETWEEN toStartOfDay(now()) - interval 1 day AND toStartOfDay(now())
+ and country = {{ String(country, 'US')}}
+ GROUP BY day, country
+
+TYPE COPY
+TARGET_DATASOURCE sales_hour_copy
+COPY_SCHEDULE 0 * * * *
+
+
+
+
+- Do not create materialized pipes by default, unless the user asks for it.
+- Materialized pipes should be created in the /materializations folder.
+- In a .pipe file you can define how to materialize each row ingested in the earliest Data Source in the Pipe query to a materialized Data Source. Materialization happens at ingest.
+- DATASOURCE: Required when TYPE is MATERIALIZED. Sets the target Data Source for materialized nodes.
+- TYPE MATERIALIZED is the type of the pipe and it is mandatory for materialized pipes.
+- The content of the .pipe file must follow the materialized_pipe_content format.
+- Use State modifier for the aggregated columns in the pipe.
+
+
+NODE daily_sales
+SQL >
+ SELECT toStartOfDay(starting_date) day, country, sumState(sales) as total_sales
+ FROM teams
+ GROUP BY day, country
+
+TYPE MATERIALIZED
+DATASOURCE sales_by_hour
+
+
+- The target datasource of a materialized pipe must have an AggregatingMergeTree engine.
+- Use AggregateFunction for the aggregated columns in the pipe.
+- Pipes using a materialized data source must use the Merge modifier in the SQL query for the aggregated columns. Example: sumMerge(total_sales)
+- Put all dimensions in the ENGINE_SORTING_KEY, sorted from least to most cardinality.
+
+
+SCHEMA >
+ `total_sales` AggregateFunction(sum, Float64),
+ `sales_count` AggregateFunction(count, UInt64),
+ `column_name_2` AggregateFunction(avg, Float64),
+ `dimension_1` String,
+ `dimension_2` String,
+ ...
+ `date` DateTime
+
+ENGINE "AggregatingMergeTree"
+ENGINE_PARTITION_KEY "toYYYYMM(date)"
+ENGINE_SORTING_KEY "date, dimension_1, dimension_2, ..."
+
+
+
+
+- Do not create sink pipes by default, unless the user asks for it.
+- Sink pipes should be created in the /sinks folder.
+- In a .pipe file you can define how to export the result of a Pipe to an external system, optionally with a schedule.
+- Valid external systems are Kafka, S3, GCS.
+- Sink pipes depend on a connection; if no connection is provided, search for an existing connection that suits the request. If none exists, create a new connection.
+- Do not include EXPORT_SCHEDULE in the .pipe file unless it is specifically requested by the user.
+- EXPORT_SCHEDULE is a cron expression that defines the schedule of the sink pipe.
+- EXPORT_SCHEDULE is optional and if not provided, the sink pipe will be executed only once.
+- EXPORT_CONNECTION_NAME is the name of the connection used to export.
+- TYPE SINK is the type of the pipe and it is mandatory for sink pipes.
+- If the sink pipe uses parameters, you must include the % character and a newline on top of every query to be able to use the parameters.
+- The content of the .pipe file must follow this format:
+DESCRIPTION Sink Pipe to export sales hour every hour using my_connection
+
+NODE daily_sales
+SQL >
+ %
+ SELECT toStartOfDay(starting_date) day, country, sum(sales) as total_sales
+ FROM teams
+ WHERE
+ day BETWEEN toStartOfDay(now()) - interval 1 day AND toStartOfDay(now())
+ and country = {{ String(country, 'US')}}
+ GROUP BY day, country
+
+TYPE sink
+EXPORT_CONNECTION_NAME "my_connection"
+EXPORT_BUCKET_URI "s3://tinybird-sinks"
+EXPORT_FILE_TEMPLATE "daily_prices"
+EXPORT_SCHEDULE "*/5 * * * *"
+EXPORT_FORMAT "csv"
+EXPORT_COMPRESSION "gz"
+EXPORT_STRATEGY "truncate"
+
+
+
+
+ - Content cannot be empty.
+ - The connection names must be unique.
+ - No indentation is allowed for property names
+ - We support kafka, gcs and s3 connections for now
+
+
+
+
+TYPE kafka
+KAFKA_BOOTSTRAP_SERVERS {{ tb_secret("PRODUCTION_KAFKA_SERVERS", "localhost:9092") }}
+KAFKA_SECURITY_PROTOCOL SASL_SSL
+KAFKA_SASL_MECHANISM PLAIN
+KAFKA_KEY {{ tb_secret("PRODUCTION_KAFKA_USERNAME", "") }}
+KAFKA_SECRET {{ tb_secret("PRODUCTION_KAFKA_PASSWORD", "") }}
+
+
+
+
+TYPE gcs
+GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON {{ tb_secret("PRODUCTION_GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON", "") }}
+
+
+
+
+TYPE gcs
+GCS_HMAC_ACCESS_ID {{ tb_secret("gcs_hmac_access_id") }}
+GCS_HMAC_SECRET {{ tb_secret("gcs_hmac_secret") }}
+
+
+
+
+TYPE s3
+S3_REGION {{ tb_secret("PRODUCTION_S3_REGION", "") }}
+S3_ARN {{ tb_secret("PRODUCTION_S3_ARN", "") }}
+
+
+
+## .test file instructions
+Follow these instructions when creating or updating .yaml files for tests:
+
+- The test file name must match the name of the pipe it is testing.
+- Every scenario name must be unique inside the test file.
+- When looking for the parameters available, you will find them in the pipes in the following format: {{String(my_param_name, default_value)}}.
+- If there are no parameters, you can omit parameters and generate a single test.
+- The format of the parameters is the following: param1=value1&param2=value2&param3=value3
+- If some parameters are provided by the user and you need to use them, preserve them in the same format as they were provided, including case sensitivity.
+- Test as many scenarios as possible.
+- The format of the test file is the following:
+
+- name: kpis_single_day
+ description: Test hourly granularity for a single day
+ parameters: date_from=2024-01-01&date_to=2024-01-01
+ expected_result: |
+ {"date":"2024-01-01 00:00:00","visits":0,"pageviews":0,"bounce_rate":null,"avg_session_sec":0}
+ {"date":"2024-01-01 01:00:00","visits":0,"pageviews":0,"bounce_rate":null,"avg_session_sec":0}
+
+- name: kpis_date_range
+ description: Test daily granularity for a date range
+ parameters: date_from=2024-01-01&date_to=2024-01-31
+ expected_result: |
+ {"date":"2024-01-01","visits":0,"pageviews":0,"bounce_rate":null,"avg_session_sec":0}
+ {"date":"2024-01-02","visits":0,"pageviews":0,"bounce_rate":null,"avg_session_sec":0}
+
+- name: kpis_default_range
+ description: Test default behavior without date parameters (last 7 days)
+ parameters: ''
+ expected_result: |
+ {"date":"2025-01-10","visits":0,"pageviews":0,"bounce_rate":null,"avg_session_sec":0}
+ {"date":"2025-01-11","visits":0,"pageviews":0,"bounce_rate":null,"avg_session_sec":0}
+
+- name: kpis_fixed_time
+ description: Test with fixed timestamp for consistent testing
+ parameters: fixed_time=2024-01-15T12:00:00
+ expected_result: ''
+
+- name: kpis_single_day
+ description: Test single day with hourly granularity
+ parameters: date_from=2024-01-01&date_to=2024-01-01
+ expected_result: |
+ {"date":"2024-01-01 00:00:00","visits":0,"pageviews":0,"bounce_rate":null,"avg_session_sec":0}
+ {"date":"2024-01-01 01:00:00","visits":0,"pageviews":0,"bounce_rate":null,"avg_session_sec":0}
+
+
+
+
+## Deployment instructions
+Follow these instructions when evolving a datasource schema:
+
+- When you make schema changes that are incompatible with the old schema, you must use a forward query in your data source. Forward queries are necessary when introducing breaking changes. Otherwise, your deployment will fail due to a schema mismatch.
+- Forward queries translate the old schema to a new one that you define in the .datasource file. This helps you evolve your schema while continuing to ingest data.
+Follow these steps to evolve your schema using a forward query:
+- Edit the .datasource file to add a forward query.
+- Run tb deploy --check to validate the deployment before creating it.
+- Deploy and promote your changes in Tinybird Cloud using {base_command} --cloud deploy.
+
+SCHEMA >
+ `timestamp` DateTime `json:$.timestamp`,
+ `session_id` UUID `json:$.session_id`,
+ `action` String `json:$.action`,
+ `version` String `json:$.version`,
+ `payload` String `json:$.payload`
+
+FORWARD_QUERY >
+ select timestamp, toUUID(session_id) as session_id, action, version, payload
+
+
+
diff --git a/tests/integration/project/datasources/simple.datasource b/tests/integration/project/datasources/simple.datasource
new file mode 100644
index 0000000..218227b
--- /dev/null
+++ b/tests/integration/project/datasources/simple.datasource
@@ -0,0 +1,10 @@
+DESCRIPTION >
+ Simple Key-Value Data Source
+
+SCHEMA >
+ id UUID `json:$.Id`,
+ timestamp DateTime64(6) `json:$.Timestamp`,
+ key String `json:$.Key`,
+ value String `json:$.Value`
+
+ENGINE "MergeTree"
diff --git a/tests/integration/project/endpoints/simple_kv.pipe b/tests/integration/project/endpoints/simple_kv.pipe
new file mode 100644
index 0000000..c95e8e8
--- /dev/null
+++ b/tests/integration/project/endpoints/simple_kv.pipe
@@ -0,0 +1,14 @@
+VERSION 0
+
+DESCRIPTION >
+ Endpoint to select unique key/value pairs from simple
+
+NODE endpoint
+SQL >
+ %
+ SELECT key, value
+ FROM simple
+ ORDER BY key, timestamp desc
+ LIMIT 1 by key
+
+TYPE ENDPOINT
diff --git a/tests/integration/test_client.py b/tests/integration/test_client.py
new file mode 100644
index 0000000..5de8c04
--- /dev/null
+++ b/tests/integration/test_client.py
@@ -0,0 +1,3 @@
+def test_client_has_token(client):
+ """Makes sure the client fixture loaded the admin token correctly"""
+ assert client.token.startswith("p.e")
diff --git a/tests/integration/test_datasource.py b/tests/integration/test_datasource.py
new file mode 100644
index 0000000..508f9e9
--- /dev/null
+++ b/tests/integration/test_datasource.py
@@ -0,0 +1,60 @@
+import logging
+
+
+LOG = logging.getLogger(__name__)
+
+
+class TestDatasource:
+ def test_append_ndjson_query_truncate(self, client):
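+        # append two records to a clean datasource, verify them via SQL, then truncate and confirm it is empty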
+ ds = client.datasource("simple")
+ ds.truncate()
+
+ ds.append_ndjson(
+ [
+ {
+ "Id": "e7f2af3e-99d1-4d4f-8a8c-d6aee4ab89b0",
+ "Timestamp": "2024-01-23T10:30:00.123456",
+ "Key": "foo",
+ "Value": "bar",
+ },
+ {
+ "Id": "d7792957-21d8-46e6-a4e0-188eb36e2758",
+ "Timestamp": "2024-02-23T11:45:00.234567",
+ "Key": "baz",
+ "Value": "ed",
+ },
+ ]
+ )
+
+ query = client.sql("SELECT * FROM simple")
+ response = query.json()
+ assert response.data == [
+ {
+ "id": "e7f2af3e-99d1-4d4f-8a8c-d6aee4ab89b0",
+ "timestamp": "2024-01-23 10:30:00.123456",
+ "key": "foo",
+ "value": "bar",
+ },
+ {
+ "id": "d7792957-21d8-46e6-a4e0-188eb36e2758",
+ "timestamp": "2024-02-23 11:45:00.234567",
+ "key": "baz",
+ "value": "ed",
+ },
+ ]
+
+ query = client.sql("SELECT count(*) as cnt FROM simple")
+ response = query.json()
+ assert response.data == [{"cnt": 2}]
+
+ # remove all records from the table
+ ds.truncate()
+
+ # check that the table is empty
+ query = client.sql("SELECT count(*) as cnt FROM simple")
+ response = query.json()
+ assert response.data == [{"cnt": 0}]
+
+ query = client.sql("SELECT * FROM simple")
+ response = query.json()
+ assert response.data == []
diff --git a/tests/integration/test_pipe.py b/tests/integration/test_pipe.py
new file mode 100644
index 0000000..16b7592
--- /dev/null
+++ b/tests/integration/test_pipe.py
@@ -0,0 +1,35 @@
+class TestPipe:
+ def test_pipe_query(self, client):
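+        # append records with a duplicate key; the simple_kv endpoint should return only the latest value per key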
+ ds = client.datasource("simple")
+ ds.truncate()
+
+ ds.append_ndjson(
+ [
+ {
+ "Id": "e7f2af3e-99d1-4d4f-8a8c-d6aee4ab89b0",
+ "Timestamp": "2024-01-23T10:30:00.123456",
+ "Key": "foo",
+ "Value": "bar",
+ },
+ {
+ "Id": "d7792957-21d8-46e6-a4e0-188eb36e2758",
+ "Timestamp": "2024-02-23T11:45:00.234567",
+ "Key": "baz",
+ "Value": "ed",
+ },
+ {
+ "Id": "fc71d4d5-7e0c-492a-9e3f-8f1cde9bcfaf",
+ "Timestamp": "2024-03-23T11:45:00.234567",
+ "Key": "foo",
+ "Value": "bar2",
+ },
+ ]
+ )
+
+ pipe = client.pipe("simple_kv")
+
+ response = pipe.query()
+ assert response.data == [
+ {"key": "baz", "value": "ed"},
+ {"key": "foo", "value": "bar2"},
+ ]
diff --git a/verdin/datasource.py b/verdin/datasource.py
index 2342c38..6fc6b6d 100644
--- a/verdin/datasource.py
+++ b/verdin/datasource.py
@@ -103,6 +103,21 @@ def append_ndjson(self, records: List[Dict]) -> requests.Response:
)
return requests.post(url=self.api, params=query, headers=headers, data=data)
+    def truncate(self) -> requests.Response:
+        """
+        Truncate the datasource, removing all records from the underlying table.
+        """
+        headers = {}
+        if self.token:
+            headers["Authorization"] = f"Bearer {self.token}"
+
+        url = f"{self.api}/{self.canonical_name}/truncate"
+        LOG.debug(
+            "truncating table %s",
+            self.canonical_name,
+        )
+        return requests.post(url=url, headers=headers)
+
@staticmethod
def to_csv(records: List[List[Any]], **kwargs):
return to_csv(records, **kwargs)
diff --git a/verdin/test/__init__.py b/verdin/test/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/verdin/test/cli.py b/verdin/test/cli.py
new file mode 100644
index 0000000..b0defe9
--- /dev/null
+++ b/verdin/test/cli.py
@@ -0,0 +1,153 @@
+"""Wrapper around the Tinybird CLI to make available the main commands programmatically."""
+
+import dataclasses
+import logging
+import os
+import re
+import subprocess
+
+LOG = logging.getLogger(__name__)
+
+
+@dataclasses.dataclass
+class Token:
+ id: str
+ name: str
+ token: str
+
+
+class CliError(Exception):
+ def __init__(self, output: str, orig: subprocess.SubprocessError) -> None:
+ super().__init__(output)
+ self.orig = orig
+
+
+class TinybirdCli:
+ """Interface around the Tinybird CLI"""
+
+    def __init__(self, host: str | None = None, token: str | None = None, cwd: str | None = None, local: bool = False):
+ self.host = host
+ self.token = token
+ self.cwd = cwd
+ self.local = local
+
+ def _env(self) -> dict:
+ """
+ Returns a dictionary of environment variables to be used when calling tb CLI commands.
+ """
+ _env = dict(os.environ)
+
+ if self.host:
+ _env["TB_HOST"] = self.host
+ if self.token:
+ _env["TB_TOKEN"] = self.token
+
+ return _env
+
+ def _get_base_args(self) -> list[str]:
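+        """Base ``tb`` invocation, including the ``--local`` flag when targeting tinybird local."""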
+ args = ["tb"]
+ if self.local:
+ args.append("--local")
+ return args
+
+ def token_ls(self) -> list[Token]:
+ """
+ List all tokens.
+
+ :return: List of Token instances
+ """
+ args = [*self._get_base_args(), "token", "ls"]
+
+ output = subprocess.check_output(
+ args,
+ encoding="utf-8",
+ cwd=self.cwd,
+ env=self._env(),
+ )
+ """
+ output looks like this (unfortunately --output=json doesn't work)
+
+ ** Tokens:
+ --------------------------------------------------------------------------------
+ id: 63678691-7e28-4f2d-8ef7-243ab19ad7cb
+ name: workspace admin token
+ token: p.eyJ1IjogIjU2ZThhYmMzLWRjNmYtNDcyYi05Yzg1LTdkZjFiZmUyNjU5YyIsICJpZCI6ICI2MzY3ODY5MS03ZTI4LTRmMmQtOGVmNy0yNDNhYjE5YWQ3Y2IiLCAiaG9zdCI6ICJsb2NhbCJ9.4gzsbiG1cnrIDUfHTxfQd0ZN57YkiOKEIyvuTlnLiaM
+ --------------------------------------------------------------------------------
+ id: 489c8ca1-195b-4383-a388-d84068ff1b2c
+ name: admin local_testing@tinybird.co
+ token: p.eyJ1IjogIjU2ZThhYmMzLWRjNmYtNDcyYi05Yzg1LTdkZjFiZmUyNjU5YyIsICJpZCI6ICI0ODljOGNhMS0xOTViLTQzODMtYTM4OC1kODQwNjhmZjFiMmMiLCAiaG9zdCI6ICJsb2NhbCJ9.MmcBjRTCg6dX53sWsZAv6QzHRHKxwu-pEWkqx8opLHA
+ --------------------------------------------------------------------------------
+ """
+ tokens = []
+ current_token = {}
+
+ for line in output.splitlines():
+ # remove color codes
+ line = re.sub(r"\x1b\[[0-9;]*m", "", line)
+ line = line.strip()
+ if line.startswith("id: "):
+ current_token = {}
+ current_token["id"] = line[4:]
+ elif line.startswith("name: "):
+ current_token["name"] = line[6:]
+ elif line.startswith("token: "):
+ current_token["token"] = line[7:]
+ tokens.append(Token(**current_token))
+
+ return tokens
+
+ def local_start(
+        self, daemon: bool = False, skip_new_version: bool = False, volumes_path: str | None = None
+ ) -> subprocess.Popen:
+ """
+ Run ``tb local start`` and return the subprocess.
+ """
+ args = ["tb", "local", "start"]
+ if daemon:
+ args.append("-d")
+ if skip_new_version:
+ args.append("--skip-new-version")
+ if volumes_path:
+ args.append("--volumes-path")
+ args.append(volumes_path)
+
+ return subprocess.Popen(args, cwd=self.cwd, env=self._env())
+
+ def local_stop(self):
+ """
+ Run ``tb local stop``.
+ """
+ subprocess.check_output(["tb", "local", "stop"])
+
+ def local_remove(self):
+ """
+ Run ``tb local remove``.
+ """
+ subprocess.check_output(
+ ["tb", "local", "remove"],
+ input=b"y\n",
+ )
+
+ def deploy(
+ self, wait: bool = False, auto: bool = False, allow_destructive_operations: bool = False
+ ):
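+        """
+        Run ``tb deploy`` for the project and return the CLI output.
+
+        :raises CliError: if the deployment command fails
+        """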
+ args = [*self._get_base_args(), "deploy"]
+
+ if wait:
+ args.append("--wait")
+ if auto:
+ args.append("--auto")
+ if allow_destructive_operations:
+ args.append("--allow-destructive-operations")
+
+ try:
+ output = subprocess.check_output(
+ args,
+ encoding="utf-8",
+ cwd=self.cwd,
+ env=self._env(),
+ )
+ except subprocess.CalledProcessError as e:
+ raise CliError(f"Failed to deploy project:\n{e.output}", e) from e
+
+ return output
diff --git a/verdin/test/container.py b/verdin/test/container.py
new file mode 100644
index 0000000..6a645fb
--- /dev/null
+++ b/verdin/test/container.py
@@ -0,0 +1,82 @@
+import subprocess
+import time
+
+import requests
+
+from verdin.test.cli import TinybirdCli
+from verdin.client import Client
+
+
+class TinybirdLocalContainer:
+    def __init__(self, cwd: str | None = None):
+ """
+ Creates a new TinybirdLocalContainer instance.
+
+ :param cwd: The current working directory to use for the tinybird local container.
+ """
+ self.cwd = cwd
+ self.url = "http://localhost:7181"
+ self.proc: None | subprocess.Popen = None
+
+ def start(self):
+ """
+ Start the tinybird local container in a background process.
+ """
+ cli = TinybirdCli(cwd=self.cwd, local=True)
+ self.proc = cli.local_start(daemon=True, skip_new_version=True)
+
+ def client(self) -> Client:
+ """
+        Returns a tinybird Client that connects to this container with admin privileges.
+
+ :return: Tinybird Client
+ """
+ cli = TinybirdCli(host=self.url, cwd=self.cwd, local=True)
+
+ cli_tokens = cli.token_ls()
+
+        # I'm not really sure why this is needed, but when we use a token returned by the /tokens API, the
+        # client cannot find datasources created through ``tb deploy``.
+ token_to_use = None
+ for token in cli_tokens:
+ if token.name == "admin local_testing@tinybird.co":
+ token_to_use = token.token
+ break
+
+ return Client(
+ token=token_to_use,
+ api=self.url,
+ )
+
+ def wait_is_up(self, timeout: int = 120):
+ """
+        Wait for the container to become available by polling the tokens endpoint.
+
+ :param timeout: Timeout in seconds
+ :raises TimeoutError: If the container does not appear within the timeout
+ """
+ # Wait for the service to become available
+ start_time = time.time()
+ while time.time() - start_time < timeout:
+ try:
+ response = requests.get(f"{self.url}/tokens")
+ if response.status_code == 200:
+ break
+ except requests.RequestException:
+ pass
+ time.sleep(1)
+ else:
+ raise TimeoutError("Tinybird container failed to start within timeout")
+
+ def stop(self):
+ """
+ Stops and removes the tinybird local container.
+ """
+ cli = TinybirdCli(cwd=self.cwd, local=True)
+ cli.local_stop()
+
+ if self.proc:
+ self.proc.kill()
+ self.proc = None
+
+ cli.local_remove()