diff --git a/pyproject.toml b/pyproject.toml
index 55d8dee..016a499 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -37,6 +37,7 @@ dev = [
     "coverage[toml]>=5.0",
     "pytest-cov>=2.7.1",
     "coveralls",
+    "tinybird",
 ]
 
 [tool.hatch.version]
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
new file mode 100644
index 0000000..aab739c
--- /dev/null
+++ b/tests/integration/conftest.py
@@ -0,0 +1,49 @@
+import os
+import time
+
+import pytest
+
+from verdin.client import Client
+from verdin.test.cli import TinybirdCli
+from verdin.test.container import TinybirdLocalContainer
+
+
+@pytest.fixture(scope="session")
+def client(tinybird_local_container) -> Client:
+    return tinybird_local_container.client()
+
+
+@pytest.fixture(scope="session")
+def cli(tinybird_local_container) -> TinybirdCli:
+    project_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "project"))
+
+    return TinybirdCli(
+        host=tinybird_local_container.url,
+        local=True,
+        cwd=project_dir,
+    )
+
+
+@pytest.fixture(scope="session", autouse=True)
+def tinybird_local_container():
+    """
+    Starts a tinybird local container in the background and waits until it becomes available.
+    """
+    project_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "project"))
+
+    container = TinybirdLocalContainer(cwd=project_dir)
+
+    container.start()
+    container.wait_is_up()
+
+    yield container
+
+    # cleanup
+    container.stop()
+
+
+@pytest.fixture(scope="session", autouse=True)
+def deployed_project(cli):
+    time.sleep(5)
+    cli.deploy(wait=True, auto=True)
+    yield
diff --git a/tests/integration/project/.cursorrules b/tests/integration/project/.cursorrules
new file mode 100644
index 0000000..a8744de
--- /dev/null
+++ b/tests/integration/project/.cursorrules
@@ -0,0 +1,374 @@
+
+You are an expert in SQL and Tinybird. Follow these instructions when working with .datasource and .pipe files:
+
+
+You have commands at your disposal to develop a tinybird project:
+- tb build: to build the project locally and check it works.
+- tb deployment create --wait --auto: to create a deployment and promote it automatically
+- tb test run: to run existing tests
+- tb endpoint url : to get the url of an endpoint, token included.
+- tb endpoint data : to get the data of an endpoint. You can pass parameters to the endpoint like this: tb endpoint data --param1 value1 --param2 value2
+- tb token ls: to list all the tokens
+There are other commands that you can use, but these are the most common ones. Run `tb -h` to see all the commands if needed.
+When you need to work with resources or data in cloud, add always the --cloud flag before the command. Example: tb --cloud datasource ls
+
+
+- When asking to create a tinybird data project, if the needed folders are not already created, use the following structure:
+├── connections
+├── copies
+├── sinks
+├── datasources
+├── endpoints
+├── fixtures
+├── materializations
+├── pipes
+└── tests
+- The local development server will be available at http://localhost:7181. Even if some response uses another base url, use always http://localhost:7181.
+- After every change in your .datasource, .pipe or .ndjson files, run `tb build` to build the project locally.
+- When you need to ingest data locally in a datasource, create a .ndjson file with the same name of the datasource and the data you want and run `tb build` so the data is ingested. +- The format of the generated api endpoint urls is: http://localhost:7181/v0/pipe/.json?token= +- Before running the tests, remember to have the project built with `tb build` with the latest changes. + +When asking for ingesting data, adding data or appending data do the following depending on the environment you want to work with: + +- When building locally, create a .ndjson file with the data you want to ingest and do `tb build` to ingest the data in the build env. +- We call `cloud` the production environment. +- When appending data in cloud, use `tb --cloud datasource append ` +- When you have a response that says “there are rows in quarantine”, do `tb [--cloud] datasource data _quarantine` to understand what is the problem. + + +Follow these instructions when creating or updating .datasource files: + + + - Content cannot be empty. + - The datasource names must be unique. + - No indentation is allowed for property names: DESCRIPTION, SCHEMA, ENGINE, ENGINE_PARTITION_KEY, ENGINE_SORTING_KEY, etc. + - Use MergeTree engine by default. + - Use AggregatingMergeTree engine when the datasource is the target of a materialized pipe. + - Use always json paths to define the schema. Example: `user_id` String `json:$.user_id`, + - Array columns are supported with a special syntax. Example: `items` Array(String) `json:$.items[:]` + - If the datasource is using an S3 or GCS connection, they need to set IMPORT_CONNECTION_NAME, IMPORT_BUCKET_URI and IMPORT_SCHEDULE (GCS @on-demand only, S3 supports @auto too) + - If the datasource is using a Kafka connection, they need to set KAFKA_CONNECTION_NAME as the name of the .connection file, KAFKA_TOPIC topic_name and KAFKA_GROUP_ID as the group id for the datasource + - Unless the user asks for them, do not include ENGINE_PARTITION_KEY and ENGINE_PRIMARY_KEY. + - DateTime64 type without precision is not supported. Use DateTime64(3) instead. + + + + + +Follow these instructions when creating or updating .pipe files: + +Follow these instructions when creating or updating any type of .pipe file: + + - The pipe names must be unique. + - Nodes do NOT use the same name as the Pipe they belong to. So if the pipe name is "my_pipe", the nodes must be named different like "my_pipe_node_1", "my_pipe_node_2", etc. + - Node names MUST be different from the resource names in the project. + - No indentation is allowed for property names: DESCRIPTION, NODE, SQL, TYPE, etc. + - Allowed TYPE values are: endpoint, copy, materialized, sink. + - Add always the output node in the TYPE section or in the last node of the pipe. + + + + + - The SQL query must be a valid ClickHouse SQL query that mixes ClickHouse syntax and Tinybird templating syntax (Tornado templating language under the hood). + - SQL queries with parameters must start with "%" character and a newline on top of every query to be able to use the parameters. 
Examples: + + SELECT * FROM events WHERE session_id={{String(my_param, "default_value")}} + + + % + SELECT * FROM events WHERE session_id={{String(my_param, "default_value")}} + + - The Parameter functions like this one {{String(my_param_name,default_value)}} can be one of the following: String, DateTime, Date, Float32, Float64, Int, Integer, UInt8, UInt16, UInt32, UInt64, UInt128, UInt256, Int8, Int16, Int32, Int64, Int128, Int256 + - Parameter names must be different from column names. Pass always the param name and a default value to the function. + - Use ALWAYS hardcoded values for default values for parameters. + - Code inside the template {{template_expression}} follows the rules of Tornado templating language so no module is allowed to be imported. So for example you can't use now() as default value for a DateTime parameter. You need an if else block like this: + + AND timestamp BETWEEN {DateTime(start_date, now() - interval 30 day)} AND {DateTime(end_date, now())} + + + {%if not defined(start_date)%} + timestamp BETWEEN now() - interval 30 day + {%else%} + timestamp BETWEEN {{DateTime(start_date)}} + {%end%} + {%if not defined(end_date)%} + AND now() + {%else%} + AND {{DateTime(end_date)}} + {%end%} + + - Parameters must not be quoted. + - When you use defined function with a paremeter inside, do NOT add quotes around the parameter: + {% if defined('my_param') %} + {% if defined(my_param) %} + - Use datasource names as table names when doing SELECT statements. + - Do not use pipe names as table names. + - The available datasource names to use in the SQL are the ones present in the existing_resources section or the ones you will create. + - Use node names as table names only when nodes are present in the same file. + - Do not reference the current node name in the SQL. + - SQL queries only accept SELECT statements with conditions, aggregations, joins, etc. + - Do NOT use CREATE TABLE, INSERT INTO, CREATE DATABASE, etc. + - Use ONLY SELECT statements in the SQL section. + - INSERT INTO is not supported in SQL section. 
+ - ClickHouse functions supported are: + - General functions supported are: ['BLAKE3', 'CAST', 'CHARACTER_LENGTH', 'CHAR_LENGTH', 'CRC32', 'CRC32IEEE', 'CRC64', 'DATABASE', 'DATE', 'DATE_DIFF', 'DATE_FORMAT', 'DATE_TRUNC', 'DAY', 'DAYOFMONTH', 'DAYOFWEEK', 'DAYOFYEAR', 'FORMAT_BYTES', 'FQDN', 'FROM_BASE64', 'FROM_DAYS', 'FROM_UNIXTIME', 'HOUR', 'INET6_ATON', 'INET6_NTOA', 'INET_ATON', 'INET_NTOA', 'IPv4CIDRToRange', 'IPv4NumToString', 'IPv4NumToStringClassC', 'IPv4StringToNum', 'IPv4StringToNumOrDefault', 'IPv4StringToNumOrNull', 'IPv4ToIPv6', 'IPv6CIDRToRange', 'IPv6NumToString', 'IPv6StringToNum', 'IPv6StringToNumOrDefault', 'IPv6StringToNumOrNull', 'JSONArrayLength', 'JSONExtract', 'JSONExtractArrayRaw', 'JSONExtractBool', 'JSONExtractFloat', 'JSONExtractInt', 'JSONExtractKeys', 'JSONExtractKeysAndValues', 'JSONExtractKeysAndValuesRaw', 'JSONExtractRaw', 'JSONExtractString', 'JSONExtractUInt', 'JSONHas', 'JSONKey', 'JSONLength', 'JSONRemoveDynamoDBAnnotations', 'JSONType', 'JSON_ARRAY_LENGTH', 'JSON_EXISTS', 'JSON_QUERY', 'JSON_VALUE', 'L1Distance', 'L1Norm', 'L1Normalize', 'L2Distance', 'L2Norm', 'L2Normalize', 'L2SquaredDistance', 'L2SquaredNorm', 'LAST_DAY', 'LinfDistance', 'LinfNorm', 'LinfNormalize', 'LpDistance', 'LpNorm', 'LpNormalize', 'MACNumToString', 'MACStringToNum', 'MACStringToOUI', 'MAP_FROM_ARRAYS', 'MD4', 'MD5', 'MILLISECOND', 'MINUTE', 'MONTH', 'OCTET_LENGTH', 'QUARTER', 'REGEXP_EXTRACT', 'REGEXP_MATCHES', 'REGEXP_REPLACE', 'SCHEMA', 'SECOND', 'SHA1', 'SHA224', 'SHA256', 'SHA384', 'SHA512', 'SHA512_256', 'SUBSTRING_INDEX', 'SVG', 'TIMESTAMP_DIFF', 'TO_BASE64', 'TO_DAYS', 'TO_UNIXTIME', 'ULIDStringToDateTime', 'URLHash', 'URLHierarchy', 'URLPathHierarchy', 'UTCTimestamp', 'UTC_timestamp', 'UUIDNumToString', 'UUIDStringToNum', 'UUIDToNum', 'UUIDv7ToDateTime', 'YEAR', 'YYYYMMDDToDate', 'YYYYMMDDToDate32', 'YYYYMMDDhhmmssToDateTime', 'YYYYMMDDhhmmssToDateTime64'] + - Character insensitive functions supported are: ['cast', 'character_length', 'char_length', 'crc32', 'crc32ieee', 'crc64', 'database', 'date', 'date_format', 'date_trunc', 'day', 'dayofmonth', 'dayofweek', 'dayofyear', 'format_bytes', 'fqdn', 'from_base64', 'from_days', 'from_unixtime', 'hour', 'inet6_aton', 'inet6_ntoa', 'inet_aton', 'inet_ntoa', 'json_array_length', 'last_day', 'millisecond', 'minute', 'month', 'octet_length', 'quarter', 'regexp_extract', 'regexp_matches', 'regexp_replace', 'schema', 'second', 'substring_index', 'to_base64', 'to_days', 'to_unixtime', 'utctimestamp', 'utc_timestamp', 'year'] + - Aggregate functions supported are: ['BIT_AND', 'BIT_OR', 'BIT_XOR', 'COVAR_POP', 'COVAR_SAMP', 'STD', 'STDDEV_POP', 'STDDEV_SAMP', 'VAR_POP', 'VAR_SAMP', 'aggThrow', 'analysisOfVariance', 'anova', 'any', 'anyHeavy', 'anyLast', 'anyLast_respect_nulls', 'any_respect_nulls', 'any_value', 'any_value_respect_nulls', 'approx_top_count', 'approx_top_k', 'approx_top_sum', 'argMax', 'argMin', 'array_agg', 'array_concat_agg', 'avg', 'avgWeighted', 'boundingRatio', 'categoricalInformationValue', 'contingency', 'corr', 'corrMatrix', 'corrStable', 'count', 'covarPop', 'covarPopMatrix', 'covarPopStable', 'covarSamp', 'covarSampMatrix', 'covarSampStable', 'cramersV', 'cramersVBiasCorrected', 'deltaSum', 'deltaSumTimestamp', 'dense_rank', 'entropy', 'exponentialMovingAverage', 'exponentialTimeDecayedAvg', 'exponentialTimeDecayedCount', 'exponentialTimeDecayedMax', 'exponentialTimeDecayedSum', 'first_value', 'first_value_respect_nulls', 'flameGraph', 'groupArray', 'groupArrayInsertAt', 'groupArrayIntersect', 
'groupArrayLast', 'groupArrayMovingAvg', 'groupArrayMovingSum', 'groupArraySample', 'groupArraySorted', 'groupBitAnd', 'groupBitOr', 'groupBitXor', 'groupBitmap', 'groupBitmapAnd', 'groupBitmapOr', 'groupBitmapXor', 'groupUniqArray', 'histogram', 'intervalLengthSum', 'kolmogorovSmirnovTest', 'kurtPop', 'kurtSamp', 'lagInFrame', 'largestTriangleThreeBuckets', 'last_value', 'last_value_respect_nulls', 'leadInFrame', 'lttb', 'mannWhitneyUTest', 'max', 'maxIntersections', 'maxIntersectionsPosition', 'maxMappedArrays', 'meanZTest', 'median', 'medianBFloat16', 'medianBFloat16Weighted', 'medianDD', 'medianDeterministic', 'medianExact', 'medianExactHigh', 'medianExactLow', 'medianExactWeighted', 'medianGK', 'medianInterpolatedWeighted', 'medianTDigest', 'medianTDigestWeighted', 'medianTiming', 'medianTimingWeighted', 'min', 'minMappedArrays', 'nonNegativeDerivative', 'nothing', 'nothingNull', 'nothingUInt64', 'nth_value', 'ntile', 'quantile', 'quantileBFloat16', 'quantileBFloat16Weighted', 'quantileDD', 'quantileDeterministic', 'quantileExact', 'quantileExactExclusive', 'quantileExactHigh', 'quantileExactInclusive', 'quantileExactLow', 'quantileExactWeighted', 'quantileGK', 'quantileInterpolatedWeighted', 'quantileTDigest', 'quantileTDigestWeighted', 'quantileTiming', 'quantileTimingWeighted', 'quantiles', 'quantilesBFloat16', 'quantilesBFloat16Weighted', 'quantilesDD', 'quantilesDeterministic', 'quantilesExact', 'quantilesExactExclusive', 'quantilesExactHigh', 'quantilesExactInclusive', 'quantilesExactLow', 'quantilesExactWeighted', 'quantilesGK', 'quantilesInterpolatedWeighted', 'quantilesTDigest', 'quantilesTDigestWeighted', 'quantilesTiming', 'quantilesTimingWeighted', 'rank', 'rankCorr', 'retention', 'row_number', 'sequenceCount', 'sequenceMatch', 'sequenceNextNode', 'simpleLinearRegression', 'singleValueOrNull', 'skewPop', 'skewSamp', 'sparkBar', 'sparkbar', 'stddevPop', 'stddevPopStable', 'stddevSamp', 'stddevSampStable', 'stochasticLinearRegression', 'stochasticLogisticRegression', 'studentTTest', 'sum', 'sumCount', 'sumKahan', 'sumMapFiltered', 'sumMapFilteredWithOverflow', 'sumMapWithOverflow', 'sumMappedArrays', 'sumWithOverflow', 'theilsU', 'topK', 'topKWeighted', 'uniq', 'uniqCombined', 'uniqCombined64', 'uniqExact', 'uniqHLL12', 'uniqTheta', 'uniqUpTo', 'varPop', 'varPopStable', 'varSamp', 'varSampStable', 'welchTTest', 'windowFunnel'] + - How to use ClickHouse supported functions: + - When using functions try always ClickHouse functions first, then SQL functions. + - Do not use any ClickHouse function that is not present in the list of general functions, character insensitive functions and aggregate functions. + - If the function is not present in the list, the sql query will fail, so avoid at all costs to use any function that is not present in the list. + - When aliasing a column, use first the column name and then the alias. + - General functions and aggregate functions are case sensitive. + - Character insensitive functions are case insensitive. + - Parameters are never quoted in any case. 
+ - Use the following syntax in the SQL section for the iceberg table function: iceberg('s3://bucket/path/to/table', {{tb_secret('aws_access_key_id')}}, {{tb_secret('aws_secret_access_key')}}) + - Use the following syntax in the SQL section for the postgres table function: postgresql('host:port', 'database', 'table', {{tb_secret('db_username')}}, {{tb_secret('db_password')}}), 'schema') + + + + +DESCRIPTION > + Some meaningful description of the datasource + +SCHEMA > + `column_name_1` clickhouse_tinybird_compatible_data_type `json:$.column_name_1`, + `column_name_2` clickhouse_tinybird_compatible_data_type `json:$.column_name_2`, + ... + `column_name_n` clickhouse_tinybird_compatible_data_type `json:$.column_name_n` + +ENGINE "MergeTree" +ENGINE_PARTITION_KEY "partition_key" +ENGINE_SORTING_KEY "sorting_key_1, sorting_key_2, ..." + + + + +DESCRIPTION > + Some meaningful description of the pipe + +NODE node_1 +SQL > + [sql query using clickhouse syntax and tinybird templating syntax and starting always with SELECT or % +SELECT] +TYPE endpoint + + + + + +- Do not create copy pipes by default, unless the user asks for it. +- Copy pipes should be created in the /copies folder. +- In a .pipe file you can define how to export the result of a Pipe to a Data Source, optionally with a schedule. +- Do not include COPY_SCHEDULE in the .pipe file unless is specifically requested by the user. +- COPY_SCHEDULE is a cron expression that defines the schedule of the copy pipe. +- COPY_SCHEDULE is optional and if not provided, the copy pipe will be executed only once. +- TARGET_DATASOURCE is the name of the Data Source to export the result to. +- TYPE COPY is the type of the pipe and it is mandatory for copy pipes. +- If the copy pipe uses parameters, you must include the % character and a newline on top of every query to be able to use the parameters. +- The content of the .pipe file must follow this format: +DESCRIPTION Copy Pipe to export sales hour every hour to the sales_hour_copy Data Source + +NODE daily_sales +SQL > + % + SELECT toStartOfDay(starting_date) day, country, sum(sales) as total_sales + FROM teams + WHERE + day BETWEEN toStartOfDay(now()) - interval 1 day AND toStartOfDay(now()) + and country = {{ String(country, 'US')}} + GROUP BY day, country + +TYPE COPY +TARGET_DATASOURCE sales_hour_copy +COPY_SCHEDULE 0 * * * * + + + + +- Do not create materialized pipes by default, unless the user asks for it. +- Materialized pipes should be created in the /materializations folder. +- In a .pipe file you can define how to materialize each row ingested in the earliest Data Source in the Pipe query to a materialized Data Source. Materialization happens at ingest. +- DATASOURCE: Required when TYPE is MATERIALIZED. Sets the target Data Source for materialized nodes. +- TYPE MATERIALIZED is the type of the pipe and it is mandatory for materialized pipes. +- The content of the .pipe file must follow the materialized_pipe_content format. +- Use State modifier for the aggregated columns in the pipe. + + +NODE daily_sales +SQL > + SELECT toStartOfDay(starting_date) day, country, sumState(sales) as total_sales + FROM teams + GROUP BY day, country + +TYPE MATERIALIZED +DATASOURCE sales_by_hour + + +- The target datasource of a materialized pipe must have an AggregatingMergeTree engine. +- Use AggregateFunction for the aggregated columns in the pipe. +- Pipes using a materialized data source must use the Merge modifier in the SQL query for the aggregated columns. 
Example: sumMerge(total_sales) +- Put all dimensions in the ENGINE_SORTING_KEY, sorted from least to most cardinality. + + +SCHEMA > + `total_sales` AggregateFunction(sum, Float64), + `sales_count` AggregateFunction(count, UInt64), + `column_name_2` AggregateFunction(avg, Float64), + `dimension_1` String, + `dimension_2` String, + ... + `date` DateTime + +ENGINE "AggregatingMergeTree" +ENGINE_PARTITION_KEY "toYYYYMM(date)" +ENGINE_SORTING_KEY "date, dimension_1, dimension_2, ..." + + + + +- Do not create sink pipes by default, unless the user asks for it. +- Sink pipes should be created in the /sinks folder. +- In a .pipe file you can define how to export the result of a Pipe to an external system, optionally with a schedule. +- Valid external systems are Kafka, S3, GCS. +- Sink pipes depend on a connection, if no connection is provided, search for an existing connection that suits the request. If none, create a new connection. +- Do not include EXPORT_SCHEDULE in the .pipe file unless is specifically requested by the user. +- EXPORT_SCHEDULE is a cron expression that defines the schedule of the sink pipe. +- EXPORT_SCHEDULE is optional and if not provided, the sink pipe will be executed only once. +- EXPORT_CONNECTION_NAME is the name of the connection used to export. +- TYPE SINK is the type of the pipe and it is mandatory for sink pipes. +- If the sink pipe uses parameters, you must include the % character and a newline on top of every query to be able to use the parameters. +- The content of the .pipe file must follow this format: +DESCRIPTION Sink Pipe to export sales hour every hour using my_connection + +NODE daily_sales +SQL > + % + SELECT toStartOfDay(starting_date) day, country, sum(sales) as total_sales + FROM teams + WHERE + day BETWEEN toStartOfDay(now()) - interval 1 day AND toStartOfDay(now()) + and country = {{ String(country, 'US')}} + GROUP BY day, country + +TYPE sink +EXPORT_CONNECTION_NAME "my_connection" +EXPORT_BUCKET_URI "s3://tinybird-sinks" +EXPORT_FILE_TEMPLATE "daily_prices" +EXPORT_SCHEDULE "*/5 * * * *" +EXPORT_FORMAT "csv" +EXPORT_COMPRESSION "gz" +EXPORT_STRATEGY "truncate" + + + + + - Content cannot be empty. + - The connection names must be unique. + - No indentation is allowed for property names + - We support kafka, gcs and s3 connections for now + + + + +TYPE kafka +KAFKA_BOOTSTRAP_SERVERS {{ tb_secret("PRODUCTION_KAFKA_SERVERS", "localhost:9092") }} +KAFKA_SECURITY_PROTOCOL SASL_SSL +KAFKA_SASL_MECHANISM PLAIN +KAFKA_KEY {{ tb_secret("PRODUCTION_KAFKA_USERNAME", "") }} +KAFKA_SECRET {{ tb_secret("PRODUCTION_KAFKA_PASSWORD", "") }} + + + + +TYPE gcs +GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON {{ tb_secret("PRODUCTION_GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON", "") }} + + + + +TYPE gcs +GCS_HMAC_ACCESS_ID {{ tb_secret("gcs_hmac_access_id") }} +GCS_HMAC_SECRET {{ tb_secret("gcs_hmac_secret") }} + + + + +TYPE s3 +S3_REGION {{ tb_secret("PRODUCTION_S3_REGION", "") }} +S3_ARN {{ tb_secret("PRODUCTION_S3_ARN", "") }} + + + + +Follow these instructions when creating or updating .yaml files for tests: + +- The test file name must match the name of the pipe it is testing. +- Every scenario name must be unique inside the test file. +- When looking for the parameters available, you will find them in the pipes in the following format: {{{{String(my_param_name, default_value)}}}}. +- If there are no parameters, you can omit parameters and generate a single test. 
+- The format of the parameters is the following: param1=value1¶m2=value2¶m3=value3 +- If some parameters are provided by the user and you need to use them, preserve in the same format as they were provided, like case sensitive +- Test as many scenarios as possible. +- The format of the test file is the following: + +- name: kpis_single_day + description: Test hourly granularity for a single day + parameters: date_from=2024-01-01&date_to=2024-01-01 + expected_result: | + {"date":"2024-01-01 00:00:00","visits":0,"pageviews":0,"bounce_rate":null,"avg_session_sec":0} + {"date":"2024-01-01 01:00:00","visits":0,"pageviews":0,"bounce_rate":null,"avg_session_sec":0} + +- name: kpis_date_range + description: Test daily granularity for a date range + parameters: date_from=2024-01-01&date_to=2024-01-31 + expected_result: | + {"date":"2024-01-01","visits":0,"pageviews":0,"bounce_rate":null,"avg_session_sec":0} + {"date":"2024-01-02","visits":0,"pageviews":0,"bounce_rate":null,"avg_session_sec":0} + +- name: kpis_default_range + description: Test default behavior without date parameters (last 7 days) + parameters: '' + expected_result: | + {"date":"2025-01-10","visits":0,"pageviews":0,"bounce_rate":null,"avg_session_sec":0} + {"date":"2025-01-11","visits":0,"pageviews":0,"bounce_rate":null,"avg_session_sec":0} + +- name: kpis_fixed_time + description: Test with fixed timestamp for consistent testing + parameters: fixed_time=2024-01-15T12:00:00 + expected_result: '' + +- name: kpis_single_day + description: Test single day with hourly granularity + parameters: date_from=2024-01-01&date_to=2024-01-01 + expected_result: | + {"date":"2024-01-01 00:00:00","visits":0,"pageviews":0,"bounce_rate":null,"avg_session_sec":0} + {"date":"2024-01-01 01:00:00","visits":0,"pageviews":0,"bounce_rate":null,"avg_session_sec":0} + + + + + +Follow these instructions when evolving a datasource schema: + +- When you make schema changes that are incompatible with the old schema, you must use a forward query in your data source. Forward queries are necessary when introducing breaking changes. Otherwise, your deployment will fail due to a schema mismatch. +- Forward queries translate the old schema to a new one that you define in the .datasource file. This helps you evolve your schema while continuing to ingest data. +Follow these steps to evolve your schema using a forward query: +- Edit the .datasource file to add a forward query. +- Run tb deploy --check to validate the deployment before creating it. +- Deploy and promote your changes in Tinybird Cloud using {base_command} --cloud deploy. 
+ +SCHEMA > + `timestamp` DateTime `json:$.timestamp`, + `session_id` UUID `json:$.session_id`, + `action` String `json:$.action`, + `version` String `json:$.version`, + `payload` String `json:$.payload` + +FORWARD_QUERY > + select timestamp, toUUID(session_id) as session_id, action, version, payload + + + + diff --git a/tests/integration/project/.env.local b/tests/integration/project/.env.local new file mode 100644 index 0000000..e69de29 diff --git a/tests/integration/project/.gitignore b/tests/integration/project/.gitignore new file mode 100644 index 0000000..464c1bb --- /dev/null +++ b/tests/integration/project/.gitignore @@ -0,0 +1,2 @@ +.tinyb +.terraform diff --git a/tests/integration/project/CLAUDE.md b/tests/integration/project/CLAUDE.md new file mode 100644 index 0000000..ffcff51 --- /dev/null +++ b/tests/integration/project/CLAUDE.md @@ -0,0 +1,373 @@ + +# Tinybird CLI rules + +## Commands +You have commands at your disposal to develop a tinybird project: +- tb build: to build the project locally and check it works. +- tb deployment create --wait --auto: to create a deployment and promote it automatically +- tb test run: to run existing tests +- tb endpoint url : to get the url of an endpoint, token included. +- tb endpoint data : to get the data of an endpoint. You can pass parameters to the endpoint like this: tb endpoint data --param1 value1 --param2 value2 +- tb token ls: to list all the tokens +There are other commands that you can use, but these are the most common ones. Run `tb -h` to see all the commands if needed. +When you need to work with resources or data in cloud, add always the --cloud flag before the command. Example: tb --cloud datasource ls + +## Development instructions +- When asking to create a tinybird data project, if the needed folders are not already created, use the following structure: +├── connections +├── copies +├── sinks +├── datasources +├── endpoints +├── fixtures +├── materializations +├── pipes +└── tests +- The local development server will be available at http://localhost:7181. Even if some response uses another base url, use always http://localhost:7181. +- After every change in your .datasource, .pipe or .ndjson files, run `tb build` to build the project locally. +- When you need to ingest data locally in a datasource, create a .ndjson file with the same name of the datasource and the data you want and run `tb build` so the data is ingested. +- The format of the generated api endpoint urls is: http://localhost:7181/v0/pipe/.json?token= +- Before running the tests, remember to have the project built with `tb build` with the latest changes. + +When asking for ingesting data, adding data or appending data do the following depending on the environment you want to work with: + +## Ingestion instructions +- When building locally, create a .ndjson file with the data you want to ingest and do `tb build` to ingest the data in the build env. +- We call `cloud` the production environment. +- When appending data in cloud, use `tb --cloud datasource append ` +- When you have a response that says “there are rows in quarantine”, do `tb [--cloud] datasource data _quarantine` to understand what is the problem. + +## .datasource file instructions +Follow these instructions when creating or updating .datasource files: + + + - Content cannot be empty. + - The datasource names must be unique. + - No indentation is allowed for property names: DESCRIPTION, SCHEMA, ENGINE, ENGINE_PARTITION_KEY, ENGINE_SORTING_KEY, etc. + - Use MergeTree engine by default. 
+ - Use AggregatingMergeTree engine when the datasource is the target of a materialized pipe. + - Use always json paths to define the schema. Example: `user_id` String `json:$.user_id`, + - Array columns are supported with a special syntax. Example: `items` Array(String) `json:$.items[:]` + - If the datasource is using an S3 or GCS connection, they need to set IMPORT_CONNECTION_NAME, IMPORT_BUCKET_URI and IMPORT_SCHEDULE (GCS @on-demand only, S3 supports @auto too) + - If the datasource is using a Kafka connection, they need to set KAFKA_CONNECTION_NAME as the name of the .connection file, KAFKA_TOPIC topic_name and KAFKA_GROUP_ID as the group id for the datasource + - Unless the user asks for them, do not include ENGINE_PARTITION_KEY and ENGINE_PRIMARY_KEY. + - DateTime64 type without precision is not supported. Use DateTime64(3) instead. + + + +## .pipe file instructions +Follow these instructions when creating or updating .pipe files: + +Follow these instructions when creating or updating any type of .pipe file: + + - The pipe names must be unique. + - Nodes do NOT use the same name as the Pipe they belong to. So if the pipe name is "my_pipe", the nodes must be named different like "my_pipe_node_1", "my_pipe_node_2", etc. + - Node names MUST be different from the resource names in the project. + - No indentation is allowed for property names: DESCRIPTION, NODE, SQL, TYPE, etc. + - Allowed TYPE values are: endpoint, copy, materialized, sink. + - Add always the output node in the TYPE section or in the last node of the pipe. + + + + + - The SQL query must be a valid ClickHouse SQL query that mixes ClickHouse syntax and Tinybird templating syntax (Tornado templating language under the hood). + - SQL queries with parameters must start with "%" character and a newline on top of every query to be able to use the parameters. Examples: + + SELECT * FROM events WHERE session_id={{String(my_param, "default_value")}} + + + % + SELECT * FROM events WHERE session_id={{String(my_param, "default_value")}} + + - The Parameter functions like this one {{String(my_param_name,default_value)}} can be one of the following: String, DateTime, Date, Float32, Float64, Int, Integer, UInt8, UInt16, UInt32, UInt64, UInt128, UInt256, Int8, Int16, Int32, Int64, Int128, Int256 + - Parameter names must be different from column names. Pass always the param name and a default value to the function. + - Use ALWAYS hardcoded values for default values for parameters. + - Code inside the template {{template_expression}} follows the rules of Tornado templating language so no module is allowed to be imported. So for example you can't use now() as default value for a DateTime parameter. You need an if else block like this: + + AND timestamp BETWEEN {DateTime(start_date, now() - interval 30 day)} AND {DateTime(end_date, now())} + + + {%if not defined(start_date)%} + timestamp BETWEEN now() - interval 30 day + {%else%} + timestamp BETWEEN {{DateTime(start_date)}} + {%end%} + {%if not defined(end_date)%} + AND now() + {%else%} + AND {{DateTime(end_date)}} + {%end%} + + - Parameters must not be quoted. + - When you use defined function with a paremeter inside, do NOT add quotes around the parameter: + {% if defined('my_param') %} + {% if defined(my_param) %} + - Use datasource names as table names when doing SELECT statements. + - Do not use pipe names as table names. + - The available datasource names to use in the SQL are the ones present in the existing_resources section or the ones you will create. 
+ - Use node names as table names only when nodes are present in the same file. + - Do not reference the current node name in the SQL. + - SQL queries only accept SELECT statements with conditions, aggregations, joins, etc. + - Do NOT use CREATE TABLE, INSERT INTO, CREATE DATABASE, etc. + - Use ONLY SELECT statements in the SQL section. + - INSERT INTO is not supported in SQL section. + - ClickHouse functions supported are: + - General functions supported are: ['BLAKE3', 'CAST', 'CHARACTER_LENGTH', 'CHAR_LENGTH', 'CRC32', 'CRC32IEEE', 'CRC64', 'DATABASE', 'DATE', 'DATE_DIFF', 'DATE_FORMAT', 'DATE_TRUNC', 'DAY', 'DAYOFMONTH', 'DAYOFWEEK', 'DAYOFYEAR', 'FORMAT_BYTES', 'FQDN', 'FROM_BASE64', 'FROM_DAYS', 'FROM_UNIXTIME', 'HOUR', 'INET6_ATON', 'INET6_NTOA', 'INET_ATON', 'INET_NTOA', 'IPv4CIDRToRange', 'IPv4NumToString', 'IPv4NumToStringClassC', 'IPv4StringToNum', 'IPv4StringToNumOrDefault', 'IPv4StringToNumOrNull', 'IPv4ToIPv6', 'IPv6CIDRToRange', 'IPv6NumToString', 'IPv6StringToNum', 'IPv6StringToNumOrDefault', 'IPv6StringToNumOrNull', 'JSONArrayLength', 'JSONExtract', 'JSONExtractArrayRaw', 'JSONExtractBool', 'JSONExtractFloat', 'JSONExtractInt', 'JSONExtractKeys', 'JSONExtractKeysAndValues', 'JSONExtractKeysAndValuesRaw', 'JSONExtractRaw', 'JSONExtractString', 'JSONExtractUInt', 'JSONHas', 'JSONKey', 'JSONLength', 'JSONRemoveDynamoDBAnnotations', 'JSONType', 'JSON_ARRAY_LENGTH', 'JSON_EXISTS', 'JSON_QUERY', 'JSON_VALUE', 'L1Distance', 'L1Norm', 'L1Normalize', 'L2Distance', 'L2Norm', 'L2Normalize', 'L2SquaredDistance', 'L2SquaredNorm', 'LAST_DAY', 'LinfDistance', 'LinfNorm', 'LinfNormalize', 'LpDistance', 'LpNorm', 'LpNormalize', 'MACNumToString', 'MACStringToNum', 'MACStringToOUI', 'MAP_FROM_ARRAYS', 'MD4', 'MD5', 'MILLISECOND', 'MINUTE', 'MONTH', 'OCTET_LENGTH', 'QUARTER', 'REGEXP_EXTRACT', 'REGEXP_MATCHES', 'REGEXP_REPLACE', 'SCHEMA', 'SECOND', 'SHA1', 'SHA224', 'SHA256', 'SHA384', 'SHA512', 'SHA512_256', 'SUBSTRING_INDEX', 'SVG', 'TIMESTAMP_DIFF', 'TO_BASE64', 'TO_DAYS', 'TO_UNIXTIME', 'ULIDStringToDateTime', 'URLHash', 'URLHierarchy', 'URLPathHierarchy', 'UTCTimestamp', 'UTC_timestamp', 'UUIDNumToString', 'UUIDStringToNum', 'UUIDToNum', 'UUIDv7ToDateTime', 'YEAR', 'YYYYMMDDToDate', 'YYYYMMDDToDate32', 'YYYYMMDDhhmmssToDateTime', 'YYYYMMDDhhmmssToDateTime64'] + - Character insensitive functions supported are: ['cast', 'character_length', 'char_length', 'crc32', 'crc32ieee', 'crc64', 'database', 'date', 'date_format', 'date_trunc', 'day', 'dayofmonth', 'dayofweek', 'dayofyear', 'format_bytes', 'fqdn', 'from_base64', 'from_days', 'from_unixtime', 'hour', 'inet6_aton', 'inet6_ntoa', 'inet_aton', 'inet_ntoa', 'json_array_length', 'last_day', 'millisecond', 'minute', 'month', 'octet_length', 'quarter', 'regexp_extract', 'regexp_matches', 'regexp_replace', 'schema', 'second', 'substring_index', 'to_base64', 'to_days', 'to_unixtime', 'utctimestamp', 'utc_timestamp', 'year'] + - Aggregate functions supported are: ['BIT_AND', 'BIT_OR', 'BIT_XOR', 'COVAR_POP', 'COVAR_SAMP', 'STD', 'STDDEV_POP', 'STDDEV_SAMP', 'VAR_POP', 'VAR_SAMP', 'aggThrow', 'analysisOfVariance', 'anova', 'any', 'anyHeavy', 'anyLast', 'anyLast_respect_nulls', 'any_respect_nulls', 'any_value', 'any_value_respect_nulls', 'approx_top_count', 'approx_top_k', 'approx_top_sum', 'argMax', 'argMin', 'array_agg', 'array_concat_agg', 'avg', 'avgWeighted', 'boundingRatio', 'categoricalInformationValue', 'contingency', 'corr', 'corrMatrix', 'corrStable', 'count', 'covarPop', 'covarPopMatrix', 'covarPopStable', 'covarSamp', 
'covarSampMatrix', 'covarSampStable', 'cramersV', 'cramersVBiasCorrected', 'deltaSum', 'deltaSumTimestamp', 'dense_rank', 'entropy', 'exponentialMovingAverage', 'exponentialTimeDecayedAvg', 'exponentialTimeDecayedCount', 'exponentialTimeDecayedMax', 'exponentialTimeDecayedSum', 'first_value', 'first_value_respect_nulls', 'flameGraph', 'groupArray', 'groupArrayInsertAt', 'groupArrayIntersect', 'groupArrayLast', 'groupArrayMovingAvg', 'groupArrayMovingSum', 'groupArraySample', 'groupArraySorted', 'groupBitAnd', 'groupBitOr', 'groupBitXor', 'groupBitmap', 'groupBitmapAnd', 'groupBitmapOr', 'groupBitmapXor', 'groupUniqArray', 'histogram', 'intervalLengthSum', 'kolmogorovSmirnovTest', 'kurtPop', 'kurtSamp', 'lagInFrame', 'largestTriangleThreeBuckets', 'last_value', 'last_value_respect_nulls', 'leadInFrame', 'lttb', 'mannWhitneyUTest', 'max', 'maxIntersections', 'maxIntersectionsPosition', 'maxMappedArrays', 'meanZTest', 'median', 'medianBFloat16', 'medianBFloat16Weighted', 'medianDD', 'medianDeterministic', 'medianExact', 'medianExactHigh', 'medianExactLow', 'medianExactWeighted', 'medianGK', 'medianInterpolatedWeighted', 'medianTDigest', 'medianTDigestWeighted', 'medianTiming', 'medianTimingWeighted', 'min', 'minMappedArrays', 'nonNegativeDerivative', 'nothing', 'nothingNull', 'nothingUInt64', 'nth_value', 'ntile', 'quantile', 'quantileBFloat16', 'quantileBFloat16Weighted', 'quantileDD', 'quantileDeterministic', 'quantileExact', 'quantileExactExclusive', 'quantileExactHigh', 'quantileExactInclusive', 'quantileExactLow', 'quantileExactWeighted', 'quantileGK', 'quantileInterpolatedWeighted', 'quantileTDigest', 'quantileTDigestWeighted', 'quantileTiming', 'quantileTimingWeighted', 'quantiles', 'quantilesBFloat16', 'quantilesBFloat16Weighted', 'quantilesDD', 'quantilesDeterministic', 'quantilesExact', 'quantilesExactExclusive', 'quantilesExactHigh', 'quantilesExactInclusive', 'quantilesExactLow', 'quantilesExactWeighted', 'quantilesGK', 'quantilesInterpolatedWeighted', 'quantilesTDigest', 'quantilesTDigestWeighted', 'quantilesTiming', 'quantilesTimingWeighted', 'rank', 'rankCorr', 'retention', 'row_number', 'sequenceCount', 'sequenceMatch', 'sequenceNextNode', 'simpleLinearRegression', 'singleValueOrNull', 'skewPop', 'skewSamp', 'sparkBar', 'sparkbar', 'stddevPop', 'stddevPopStable', 'stddevSamp', 'stddevSampStable', 'stochasticLinearRegression', 'stochasticLogisticRegression', 'studentTTest', 'sum', 'sumCount', 'sumKahan', 'sumMapFiltered', 'sumMapFilteredWithOverflow', 'sumMapWithOverflow', 'sumMappedArrays', 'sumWithOverflow', 'theilsU', 'topK', 'topKWeighted', 'uniq', 'uniqCombined', 'uniqCombined64', 'uniqExact', 'uniqHLL12', 'uniqTheta', 'uniqUpTo', 'varPop', 'varPopStable', 'varSamp', 'varSampStable', 'welchTTest', 'windowFunnel'] + - How to use ClickHouse supported functions: + - When using functions try always ClickHouse functions first, then SQL functions. + - Do not use any ClickHouse function that is not present in the list of general functions, character insensitive functions and aggregate functions. + - If the function is not present in the list, the sql query will fail, so avoid at all costs to use any function that is not present in the list. + - When aliasing a column, use first the column name and then the alias. + - General functions and aggregate functions are case sensitive. + - Character insensitive functions are case insensitive. + - Parameters are never quoted in any case. 
+ - Use the following syntax in the SQL section for the iceberg table function: iceberg('s3://bucket/path/to/table', {{tb_secret('aws_access_key_id')}}, {{tb_secret('aws_secret_access_key')}}) + - Use the following syntax in the SQL section for the postgres table function: postgresql('host:port', 'database', 'table', {{tb_secret('db_username')}}, {{tb_secret('db_password')}}), 'schema') + + + + +DESCRIPTION > + Some meaningful description of the datasource + +SCHEMA > + `column_name_1` clickhouse_tinybird_compatible_data_type `json:$.column_name_1`, + `column_name_2` clickhouse_tinybird_compatible_data_type `json:$.column_name_2`, + ... + `column_name_n` clickhouse_tinybird_compatible_data_type `json:$.column_name_n` + +ENGINE "MergeTree" +ENGINE_PARTITION_KEY "partition_key" +ENGINE_SORTING_KEY "sorting_key_1, sorting_key_2, ..." + + + + +DESCRIPTION > + Some meaningful description of the pipe + +NODE node_1 +SQL > + [sql query using clickhouse syntax and tinybird templating syntax and starting always with SELECT or % +SELECT] +TYPE endpoint + + + + + +- Do not create copy pipes by default, unless the user asks for it. +- Copy pipes should be created in the /copies folder. +- In a .pipe file you can define how to export the result of a Pipe to a Data Source, optionally with a schedule. +- Do not include COPY_SCHEDULE in the .pipe file unless is specifically requested by the user. +- COPY_SCHEDULE is a cron expression that defines the schedule of the copy pipe. +- COPY_SCHEDULE is optional and if not provided, the copy pipe will be executed only once. +- TARGET_DATASOURCE is the name of the Data Source to export the result to. +- TYPE COPY is the type of the pipe and it is mandatory for copy pipes. +- If the copy pipe uses parameters, you must include the % character and a newline on top of every query to be able to use the parameters. +- The content of the .pipe file must follow this format: +DESCRIPTION Copy Pipe to export sales hour every hour to the sales_hour_copy Data Source + +NODE daily_sales +SQL > + % + SELECT toStartOfDay(starting_date) day, country, sum(sales) as total_sales + FROM teams + WHERE + day BETWEEN toStartOfDay(now()) - interval 1 day AND toStartOfDay(now()) + and country = {{ String(country, 'US')}} + GROUP BY day, country + +TYPE COPY +TARGET_DATASOURCE sales_hour_copy +COPY_SCHEDULE 0 * * * * + + + + +- Do not create materialized pipes by default, unless the user asks for it. +- Materialized pipes should be created in the /materializations folder. +- In a .pipe file you can define how to materialize each row ingested in the earliest Data Source in the Pipe query to a materialized Data Source. Materialization happens at ingest. +- DATASOURCE: Required when TYPE is MATERIALIZED. Sets the target Data Source for materialized nodes. +- TYPE MATERIALIZED is the type of the pipe and it is mandatory for materialized pipes. +- The content of the .pipe file must follow the materialized_pipe_content format. +- Use State modifier for the aggregated columns in the pipe. + + +NODE daily_sales +SQL > + SELECT toStartOfDay(starting_date) day, country, sumState(sales) as total_sales + FROM teams + GROUP BY day, country + +TYPE MATERIALIZED +DATASOURCE sales_by_hour + + +- The target datasource of a materialized pipe must have an AggregatingMergeTree engine. +- Use AggregateFunction for the aggregated columns in the pipe. +- Pipes using a materialized data source must use the Merge modifier in the SQL query for the aggregated columns. 
Example: sumMerge(total_sales) +- Put all dimensions in the ENGINE_SORTING_KEY, sorted from least to most cardinality. + + +SCHEMA > + `total_sales` AggregateFunction(sum, Float64), + `sales_count` AggregateFunction(count, UInt64), + `column_name_2` AggregateFunction(avg, Float64), + `dimension_1` String, + `dimension_2` String, + ... + `date` DateTime + +ENGINE "AggregatingMergeTree" +ENGINE_PARTITION_KEY "toYYYYMM(date)" +ENGINE_SORTING_KEY "date, dimension_1, dimension_2, ..." + + + + +- Do not create sink pipes by default, unless the user asks for it. +- Sink pipes should be created in the /sinks folder. +- In a .pipe file you can define how to export the result of a Pipe to an external system, optionally with a schedule. +- Valid external systems are Kafka, S3, GCS. +- Sink pipes depend on a connection, if no connection is provided, search for an existing connection that suits the request. If none, create a new connection. +- Do not include EXPORT_SCHEDULE in the .pipe file unless is specifically requested by the user. +- EXPORT_SCHEDULE is a cron expression that defines the schedule of the sink pipe. +- EXPORT_SCHEDULE is optional and if not provided, the sink pipe will be executed only once. +- EXPORT_CONNECTION_NAME is the name of the connection used to export. +- TYPE SINK is the type of the pipe and it is mandatory for sink pipes. +- If the sink pipe uses parameters, you must include the % character and a newline on top of every query to be able to use the parameters. +- The content of the .pipe file must follow this format: +DESCRIPTION Sink Pipe to export sales hour every hour using my_connection + +NODE daily_sales +SQL > + % + SELECT toStartOfDay(starting_date) day, country, sum(sales) as total_sales + FROM teams + WHERE + day BETWEEN toStartOfDay(now()) - interval 1 day AND toStartOfDay(now()) + and country = {{ String(country, 'US')}} + GROUP BY day, country + +TYPE sink +EXPORT_CONNECTION_NAME "my_connection" +EXPORT_BUCKET_URI "s3://tinybird-sinks" +EXPORT_FILE_TEMPLATE "daily_prices" +EXPORT_SCHEDULE "*/5 * * * *" +EXPORT_FORMAT "csv" +EXPORT_COMPRESSION "gz" +EXPORT_STRATEGY "truncate" + + + + + - Content cannot be empty. + - The connection names must be unique. + - No indentation is allowed for property names + - We support kafka, gcs and s3 connections for now + + + + +TYPE kafka +KAFKA_BOOTSTRAP_SERVERS {{ tb_secret("PRODUCTION_KAFKA_SERVERS", "localhost:9092") }} +KAFKA_SECURITY_PROTOCOL SASL_SSL +KAFKA_SASL_MECHANISM PLAIN +KAFKA_KEY {{ tb_secret("PRODUCTION_KAFKA_USERNAME", "") }} +KAFKA_SECRET {{ tb_secret("PRODUCTION_KAFKA_PASSWORD", "") }} + + + + +TYPE gcs +GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON {{ tb_secret("PRODUCTION_GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON", "") }} + + + + +TYPE gcs +GCS_HMAC_ACCESS_ID {{ tb_secret("gcs_hmac_access_id") }} +GCS_HMAC_SECRET {{ tb_secret("gcs_hmac_secret") }} + + + + +TYPE s3 +S3_REGION {{ tb_secret("PRODUCTION_S3_REGION", "") }} +S3_ARN {{ tb_secret("PRODUCTION_S3_ARN", "") }} + + + +## .test file instructions +Follow these instructions when creating or updating .yaml files for tests: + +- The test file name must match the name of the pipe it is testing. +- Every scenario name must be unique inside the test file. +- When looking for the parameters available, you will find them in the pipes in the following format: {{{{String(my_param_name, default_value)}}}}. +- If there are no parameters, you can omit parameters and generate a single test. 
+- The format of the parameters is the following: param1=value1¶m2=value2¶m3=value3 +- If some parameters are provided by the user and you need to use them, preserve in the same format as they were provided, like case sensitive +- Test as many scenarios as possible. +- The format of the test file is the following: + +- name: kpis_single_day + description: Test hourly granularity for a single day + parameters: date_from=2024-01-01&date_to=2024-01-01 + expected_result: | + {"date":"2024-01-01 00:00:00","visits":0,"pageviews":0,"bounce_rate":null,"avg_session_sec":0} + {"date":"2024-01-01 01:00:00","visits":0,"pageviews":0,"bounce_rate":null,"avg_session_sec":0} + +- name: kpis_date_range + description: Test daily granularity for a date range + parameters: date_from=2024-01-01&date_to=2024-01-31 + expected_result: | + {"date":"2024-01-01","visits":0,"pageviews":0,"bounce_rate":null,"avg_session_sec":0} + {"date":"2024-01-02","visits":0,"pageviews":0,"bounce_rate":null,"avg_session_sec":0} + +- name: kpis_default_range + description: Test default behavior without date parameters (last 7 days) + parameters: '' + expected_result: | + {"date":"2025-01-10","visits":0,"pageviews":0,"bounce_rate":null,"avg_session_sec":0} + {"date":"2025-01-11","visits":0,"pageviews":0,"bounce_rate":null,"avg_session_sec":0} + +- name: kpis_fixed_time + description: Test with fixed timestamp for consistent testing + parameters: fixed_time=2024-01-15T12:00:00 + expected_result: '' + +- name: kpis_single_day + description: Test single day with hourly granularity + parameters: date_from=2024-01-01&date_to=2024-01-01 + expected_result: | + {"date":"2024-01-01 00:00:00","visits":0,"pageviews":0,"bounce_rate":null,"avg_session_sec":0} + {"date":"2024-01-01 01:00:00","visits":0,"pageviews":0,"bounce_rate":null,"avg_session_sec":0} + + + + +## Deployment instructions +Follow these instructions when evolving a datasource schema: + +- When you make schema changes that are incompatible with the old schema, you must use a forward query in your data source. Forward queries are necessary when introducing breaking changes. Otherwise, your deployment will fail due to a schema mismatch. +- Forward queries translate the old schema to a new one that you define in the .datasource file. This helps you evolve your schema while continuing to ingest data. +Follow these steps to evolve your schema using a forward query: +- Edit the .datasource file to add a forward query. +- Run tb deploy --check to validate the deployment before creating it. +- Deploy and promote your changes in Tinybird Cloud using {base_command} --cloud deploy. 
+
+SCHEMA >
+    `timestamp` DateTime `json:$.timestamp`,
+    `session_id` UUID `json:$.session_id`,
+    `action` String `json:$.action`,
+    `version` String `json:$.version`,
+    `payload` String `json:$.payload`
+
+FORWARD_QUERY >
+    select timestamp, toUUID(session_id) as session_id, action, version, payload
+
+
+
diff --git a/tests/integration/project/datasources/simple.datasource b/tests/integration/project/datasources/simple.datasource
new file mode 100644
index 0000000..218227b
--- /dev/null
+++ b/tests/integration/project/datasources/simple.datasource
@@ -0,0 +1,10 @@
+DESCRIPTION >
+    Simple Key-Value Data Source
+
+SCHEMA >
+    id UUID `json:$.Id`,
+    timestamp DateTime64(6) `json:$.Timestamp`,
+    key String `json:$.Key`,
+    value String `json:$.Value`
+
+ENGINE "MergeTree"
diff --git a/tests/integration/project/endpoints/simple_kv.pipe b/tests/integration/project/endpoints/simple_kv.pipe
new file mode 100644
index 0000000..c95e8e8
--- /dev/null
+++ b/tests/integration/project/endpoints/simple_kv.pipe
@@ -0,0 +1,14 @@
+VERSION 0
+
+DESCRIPTION >
+    Endpoint to select unique key/value pairs from simple
+
+NODE endpoint
+SQL >
+    %
+    SELECT key, value
+    FROM simple
+    ORDER BY key, timestamp desc
+    LIMIT 1 by key
+
+TYPE ENDPOINT
diff --git a/tests/integration/test_client.py b/tests/integration/test_client.py
new file mode 100644
index 0000000..5de8c04
--- /dev/null
+++ b/tests/integration/test_client.py
@@ -0,0 +1,3 @@
+def test_client_has_token(client):
+    """Makes sure the client fixture loaded the admin token correctly"""
+    assert client.token.startswith("p.e")
diff --git a/tests/integration/test_datasource.py b/tests/integration/test_datasource.py
new file mode 100644
index 0000000..508f9e9
--- /dev/null
+++ b/tests/integration/test_datasource.py
@@ -0,0 +1,60 @@
+import logging
+
+
+LOG = logging.getLogger(__name__)
+
+
+class TestDatasource:
+    def test_append_ndjson_query_truncate(self, client):
+        ds = client.datasource("simple")
+        ds.truncate()
+
+        ds.append_ndjson(
+            [
+                {
+                    "Id": "e7f2af3e-99d1-4d4f-8a8c-d6aee4ab89b0",
+                    "Timestamp": "2024-01-23T10:30:00.123456",
+                    "Key": "foo",
+                    "Value": "bar",
+                },
+                {
+                    "Id": "d7792957-21d8-46e6-a4e0-188eb36e2758",
+                    "Timestamp": "2024-02-23T11:45:00.234567",
+                    "Key": "baz",
+                    "Value": "ed",
+                },
+            ]
+        )
+
+        query = client.sql("SELECT * FROM simple")
+        response = query.json()
+        assert response.data == [
+            {
+                "id": "e7f2af3e-99d1-4d4f-8a8c-d6aee4ab89b0",
+                "timestamp": "2024-01-23 10:30:00.123456",
+                "key": "foo",
+                "value": "bar",
+            },
+            {
+                "id": "d7792957-21d8-46e6-a4e0-188eb36e2758",
+                "timestamp": "2024-02-23 11:45:00.234567",
+                "key": "baz",
+                "value": "ed",
+            },
+        ]
+
+        query = client.sql("SELECT count(*) as cnt FROM simple")
+        response = query.json()
+        assert response.data == [{"cnt": 2}]
+
+        # remove all records from the table
+        ds.truncate()
+
+        # check that the table is empty
+        query = client.sql("SELECT count(*) as cnt FROM simple")
+        response = query.json()
+        assert response.data == [{"cnt": 0}]
+
+        query = client.sql("SELECT * FROM simple")
+        response = query.json()
+        assert response.data == []
diff --git a/tests/integration/test_pipe.py b/tests/integration/test_pipe.py
new file mode 100644
index 0000000..16b7592
--- /dev/null
+++ b/tests/integration/test_pipe.py
@@ -0,0 +1,35 @@
+class TestPipe:
+    def test_pipe_query(self, client):
+        ds = client.datasource("simple")
+        ds.truncate()
+
+        ds.append_ndjson(
+            [
+                {
+                    "Id": "e7f2af3e-99d1-4d4f-8a8c-d6aee4ab89b0",
+                    "Timestamp": "2024-01-23T10:30:00.123456",
+                    "Key": "foo",
+                    "Value": "bar",
+                },
+ { + "Id": "d7792957-21d8-46e6-a4e0-188eb36e2758", + "Timestamp": "2024-02-23T11:45:00.234567", + "Key": "baz", + "Value": "ed", + }, + { + "Id": "fc71d4d5-7e0c-492a-9e3f-8f1cde9bcfaf", + "Timestamp": "2024-03-23T11:45:00.234567", + "Key": "foo", + "Value": "bar2", + }, + ] + ) + + pipe = client.pipe("simple_kv") + + response = pipe.query() + assert response.data == [ + {"key": "baz", "value": "ed"}, + {"key": "foo", "value": "bar2"}, + ] diff --git a/verdin/datasource.py b/verdin/datasource.py index 2342c38..6fc6b6d 100644 --- a/verdin/datasource.py +++ b/verdin/datasource.py @@ -103,6 +103,21 @@ def append_ndjson(self, records: List[Dict]) -> requests.Response: ) return requests.post(url=self.api, params=query, headers=headers, data=data) + def truncate(self): + """ + Truncate the datasource which removes all records in the table. + """ + headers = {} + if self.token: + headers["Authorization"] = f"Bearer {self.token}" + + url = f"{self.api}/{self.canonical_name}/truncate" + LOG.debug( + "truncating table %s", + self.canonical_name, + ) + requests.post(url=url, headers=headers) + @staticmethod def to_csv(records: List[List[Any]], **kwargs): return to_csv(records, **kwargs) diff --git a/verdin/test/__init__.py b/verdin/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/verdin/test/cli.py b/verdin/test/cli.py new file mode 100644 index 0000000..b0defe9 --- /dev/null +++ b/verdin/test/cli.py @@ -0,0 +1,153 @@ +"""Wrapper around the Tinybird CLI to make available the main commands programmatically.""" + +import dataclasses +import logging +import os +import re +import subprocess + +LOG = logging.getLogger(__name__) + + +@dataclasses.dataclass +class Token: + id: str + name: str + token: str + + +class CliError(Exception): + def __init__(self, output: str, orig: subprocess.SubprocessError) -> None: + super().__init__(output) + self.orig = orig + + +class TinybirdCli: + """Interface around the Tinybird CLI""" + + def __init__(self, host: str = None, token: str = None, cwd: str = None, local: bool = False): + self.host = host + self.token = token + self.cwd = cwd + self.local = local + + def _env(self) -> dict: + """ + Returns a dictionary of environment variables to be used when calling tb CLI commands. + """ + _env = dict(os.environ) + + if self.host: + _env["TB_HOST"] = self.host + if self.token: + _env["TB_TOKEN"] = self.token + + return _env + + def _get_base_args(self) -> list[str]: + args = ["tb"] + if self.local: + args.append("--local") + return args + + def token_ls(self) -> list[Token]: + """ + List all tokens. 
+
+        :return: List of Token instances
+        """
+        args = [*self._get_base_args(), "token", "ls"]
+
+        output = subprocess.check_output(
+            args,
+            encoding="utf-8",
+            cwd=self.cwd,
+            env=self._env(),
+        )
+        """
+        output looks like this (unfortunately --output=json doesn't work)
+
+        ** Tokens:
+        --------------------------------------------------------------------------------
+        id: 63678691-7e28-4f2d-8ef7-243ab19ad7cb
+        name: workspace admin token
+        token: p.eyJ1IjogIjU2ZThhYmMzLWRjNmYtNDcyYi05Yzg1LTdkZjFiZmUyNjU5YyIsICJpZCI6ICI2MzY3ODY5MS03ZTI4LTRmMmQtOGVmNy0yNDNhYjE5YWQ3Y2IiLCAiaG9zdCI6ICJsb2NhbCJ9.4gzsbiG1cnrIDUfHTxfQd0ZN57YkiOKEIyvuTlnLiaM
+        --------------------------------------------------------------------------------
+        id: 489c8ca1-195b-4383-a388-d84068ff1b2c
+        name: admin local_testing@tinybird.co
+        token: p.eyJ1IjogIjU2ZThhYmMzLWRjNmYtNDcyYi05Yzg1LTdkZjFiZmUyNjU5YyIsICJpZCI6ICI0ODljOGNhMS0xOTViLTQzODMtYTM4OC1kODQwNjhmZjFiMmMiLCAiaG9zdCI6ICJsb2NhbCJ9.MmcBjRTCg6dX53sWsZAv6QzHRHKxwu-pEWkqx8opLHA
+        --------------------------------------------------------------------------------
+        """
+        tokens = []
+        current_token = {}
+
+        for line in output.splitlines():
+            # remove color codes
+            line = re.sub(r"\x1b\[[0-9;]*m", "", line)
+            line = line.strip()
+            if line.startswith("id: "):
+                current_token = {}
+                current_token["id"] = line[4:]
+            elif line.startswith("name: "):
+                current_token["name"] = line[6:]
+            elif line.startswith("token: "):
+                current_token["token"] = line[7:]
+                tokens.append(Token(**current_token))
+
+        return tokens
+
+    def local_start(
+        self, daemon: bool = False, skip_new_version: bool = False, volumes_path: str = None
+    ) -> subprocess.Popen:
+        """
+        Run ``tb local start`` and return the subprocess.
+        """
+        args = ["tb", "local", "start"]
+        if daemon:
+            args.append("-d")
+        if skip_new_version:
+            args.append("--skip-new-version")
+        if volumes_path:
+            args.append("--volumes-path")
+            args.append(volumes_path)
+
+        return subprocess.Popen(args, cwd=self.cwd, env=self._env())
+
+    def local_stop(self):
+        """
+        Run ``tb local stop``.
+        """
+        subprocess.check_output(["tb", "local", "stop"])
+
+    def local_remove(self):
+        """
+        Run ``tb local remove``.
+        """
+        subprocess.check_output(
+            ["tb", "local", "remove"],
+            input=b"y\n",
+        )
+
+    def deploy(
+        self, wait: bool = False, auto: bool = False, allow_destructive_operations: bool = False
+    ):
+        args = [*self._get_base_args(), "deploy"]
+
+        if wait:
+            args.append("--wait")
+        if auto:
+            args.append("--auto")
+        if allow_destructive_operations:
+            args.append("--allow-destructive-operations")
+
+        try:
+            output = subprocess.check_output(
+                args,
+                encoding="utf-8",
+                cwd=self.cwd,
+                env=self._env(),
+            )
+        except subprocess.CalledProcessError as e:
+            raise CliError(f"Failed to deploy project:\n{e.output}", e) from e
+
+        return output
diff --git a/verdin/test/container.py b/verdin/test/container.py
new file mode 100644
index 0000000..6a645fb
--- /dev/null
+++ b/verdin/test/container.py
@@ -0,0 +1,82 @@
+import subprocess
+import time
+
+import requests
+
+from verdin.test.cli import TinybirdCli
+from verdin.client import Client
+
+
+class TinybirdLocalContainer:
+    def __init__(self, cwd: str = None):
+        """
+        Creates a new TinybirdLocalContainer instance.
+
+        :param cwd: The current working directory to use for the tinybird local container.
+        """
+        self.cwd = cwd
+        self.url = "http://localhost:7181"
+        self.proc: None | subprocess.Popen = None
+
+    def start(self):
+        """
+        Start the tinybird local container in a background process.
+        """
+        cli = TinybirdCli(cwd=self.cwd, local=True)
+        self.proc = cli.local_start(daemon=True, skip_new_version=True)
+
+    def client(self) -> Client:
+        """
+        Returns a tinybird Client that connects to this container with admin privileges.
+
+        :return: Tinybird Client
+        """
+        cli = TinybirdCli(host=self.url, cwd=self.cwd, local=True)
+
+        cli_tokens = cli.token_ls()
+
+        # I'm not really sure why this is needed, but when we use a token returned by the /tokens api, the
+        # client cannot find datasources created through ``tb deploy``.
+        token_to_use = None
+        for token in cli_tokens:
+            if token.name == "admin local_testing@tinybird.co":
+                token_to_use = token.token
+                break
+
+        return Client(
+            token=token_to_use,
+            api=self.url,
+        )
+
+    def wait_is_up(self, timeout: int = 120):
+        """
+        Wait for the container to become available by querying the tokens endpoint.
+
+        :param timeout: Timeout in seconds
+        :raises TimeoutError: If the container does not become available within the timeout
+        """
+        # Wait for the service to become available
+        start_time = time.time()
+        while time.time() - start_time < timeout:
+            try:
+                response = requests.get(f"{self.url}/tokens")
+                if response.status_code == 200:
+                    break
+            except requests.RequestException:
+                pass
+            time.sleep(1)
+        else:
+            raise TimeoutError("Tinybird container failed to start within timeout")
+
+    def stop(self):
+        """
+        Stops and removes the tinybird local container.
+        """
+        cli = TinybirdCli(cwd=self.cwd, local=True)
+        cli.local_stop()
+
+        if self.proc:
+            self.proc.kill()
+            self.proc = None
+
+        cli.local_remove()