diff --git a/docs/data-operate/import/streaming-job/continuous-load-mysql-multi.md b/docs/data-operate/import/streaming-job/continuous-load-mysql-database.md similarity index 82% rename from docs/data-operate/import/streaming-job/continuous-load-mysql-multi.md rename to docs/data-operate/import/streaming-job/continuous-load-mysql-database.md index 54161db48bac2..87e93d0531101 100644 --- a/docs/data-operate/import/streaming-job/continuous-load-mysql-multi.md +++ b/docs/data-operate/import/streaming-job/continuous-load-mysql-database.md @@ -1,17 +1,17 @@ --- { - "title": "MySQL Full Database Sync", - "sidebar_label": "Full Database Sync", + "title": "MySQL Database-level Sync", + "sidebar_label": "Database-level Sync", "language": "en", - "description": "Doris can continuously synchronize full and incremental data from an entire database or selected tables in MySQL into Doris using Streaming Job." + "description": "Doris can continuously sync full and incremental data of a group of MySQL tables into Doris at the database level via Streaming Job, auto-creating downstream tables on first sync." } --- ## Overview -Supports using Job to continuously synchronize full and incremental data from an entire database or selected tables in a MySQL database to Doris via Stream Load. Suitable for scenarios requiring real-time full database sync to Doris. +Database-level Sync is implemented via the native `FROM MYSQL (...) TO DATABASE (...)` DDL, **using a database as the sync unit with a Doris database as the target container**. You can sync one, several, or all tables via `include_tables`; on first sync Doris automatically creates downstream primary-key tables and keeps primary keys consistent with the upstream. Suitable for mirror replication scenarios where downstream schema should track upstream automatically and no SQL processing is needed. 
-By integrating [Flink CDC](https://github.com/apache/flink-cdc), Doris supports reading change logs from MySQL databases, enabling full and incremental full database sync. When synchronizing for the first time, Doris automatically creates downstream tables (primary key tables) and keeps the primary key consistent with the upstream. +By integrating [Flink CDC](https://github.com/apache/flink-cdc), Doris reads change logs from MySQL and continuously writes full + incremental data of a group of tables into Doris via Stream Load. If you need column mapping, filtering, or data transformation during sync, see [MySQL Table-level Sync](./continuous-load-mysql-table.md). **Notes:** @@ -99,7 +99,7 @@ For more common operations (pause, resume, delete, check Task, etc.), see [Conti ### Import Command -Syntax for creating a full database sync job: +Syntax for creating a database-level sync job: ```sql CREATE JOB diff --git a/versioned_docs/version-4.x/data-operate/import/streaming-job/continuous-load-mysql-single.md b/docs/data-operate/import/streaming-job/continuous-load-mysql-table.md similarity index 74% rename from versioned_docs/version-4.x/data-operate/import/streaming-job/continuous-load-mysql-single.md rename to docs/data-operate/import/streaming-job/continuous-load-mysql-table.md index 823a262aa1a8b..2a0058c7c25ca 100644 --- a/versioned_docs/version-4.x/data-operate/import/streaming-job/continuous-load-mysql-single.md +++ b/docs/data-operate/import/streaming-job/continuous-load-mysql-table.md @@ -1,9 +1,9 @@ --- { - "title": "MySQL Single Table Sync", - "sidebar_label": "Single Table Sync", + "title": "MySQL Table-level Sync", + "sidebar_label": "Table-level Sync", "language": "en", - "description": "Doris supports continuously synchronizing full and incremental data from a single MySQL table into Doris using Job + CDC Stream TVF." 
+ "description": "Doris can continuously sync MySQL data into a specified Doris table using Job + CDC Stream TVF at the table level, with support for column mapping and data transformation." } --- @@ -13,9 +13,9 @@ This feature is supported since version 4.1.0. ## Overview -Doris supports continuously synchronizing full and incremental data from a single MySQL table into a specified Doris table using Job + [CDC Stream TVF](../../../sql-manual/sql-functions/table-valued-functions/cdc-stream.md). This is suitable for real-time synchronization scenarios that require flexible column mapping and data transformation on a single table. +Table-level Sync is implemented via Job + [CDC Stream TVF](../../../sql-manual/sql-functions/table-valued-functions/cdc-stream.md), targeting an existing Doris table (`INSERT INTO tbl SELECT * FROM cdc_stream(...)`). It leverages Doris SQL to support column mapping, filtering and data transformation, with exactly-once semantics. Suitable for real-time sync scenarios that require data processing. -By integrating [Flink CDC](https://github.com/apache/flink-cdc) reading capabilities, Doris supports reading change logs (Binlog) from MySQL databases, enabling full and incremental data synchronization for a single table. +By integrating [Flink CDC](https://github.com/apache/flink-cdc), Doris reads change logs (Binlog) from MySQL, enabling full + incremental sync from the source table to the target table. If you prefer Doris to auto-create downstream tables or sync a group of tables at the database granularity, see [MySQL Database-level Sync](./continuous-load-mysql-database.md). 
**Notes:** diff --git a/docs/data-operate/import/streaming-job/continuous-load-overview.md b/docs/data-operate/import/streaming-job/continuous-load-overview.md index f1f610a78b74b..533264cb71c33 100644 --- a/docs/data-operate/import/streaming-job/continuous-load-overview.md +++ b/docs/data-operate/import/streaming-job/continuous-load-overview.md @@ -13,16 +13,53 @@ Doris supports continuously loading data from multiple data sources into Doris t Continuous Load supports the following data sources and import modes: -| Data Source | Supported Versions | Single Table Sync | Full Database Sync | Setup Guide | +| Data Source | Supported Versions | Table-level Sync | Database-level Sync | Setup Guide | |:------|:--------|:--------|:--------|:--------| -| MySQL | 5.6, 5.7, 8.0.x | [MySQL Single Table Sync](./continuous-load-mysql-single.md) | [MySQL Full Database Sync](./continuous-load-mysql-multi.md) | [Amazon RDS MySQL](./prerequisites/amazon-rds-mysql.md) · [Amazon Aurora MySQL](./prerequisites/amazon-aurora-mysql.md) | -| PostgreSQL | 14, 15, 16, 17 | [PostgreSQL Single Table Sync](./continuous-load-postgresql-single.md) | [PostgreSQL Full Database Sync](./continuous-load-postgresql-multi.md) | [Amazon RDS PostgreSQL](./prerequisites/amazon-rds-postgresql.md) · [Amazon Aurora PostgreSQL](./prerequisites/amazon-aurora-postgresql.md) | +| MySQL | 5.6, 5.7, 8.0.x | [MySQL Table-level Sync](./continuous-load-mysql-table.md) | [MySQL Database-level Sync](./continuous-load-mysql-database.md) | [Amazon RDS MySQL](./prerequisites/amazon-rds-mysql.md) · [Amazon Aurora MySQL](./prerequisites/amazon-aurora-mysql.md) | +| PostgreSQL | 14, 15, 16, 17 | [PostgreSQL Table-level Sync](./continuous-load-postgresql-table.md) | [PostgreSQL Database-level Sync](./continuous-load-postgresql-database.md) | [Amazon RDS PostgreSQL](./prerequisites/amazon-rds-postgresql.md) · [Amazon Aurora PostgreSQL](./prerequisites/amazon-aurora-postgresql.md) | | S3 | - | [S3 Continuous 
Load](./continuous-load-s3.md) | - | - |

-:::tip
-- **Single Table Sync**: Uses CDC Stream TVF or S3 TVF to continuously load data into a specific Doris table, supporting flexible column mapping and data transformation.
-- **Full Database Sync**: Uses native multi-table CDC capability to continuously sync an entire database or selected tables from the source to Doris, automatically creating downstream tables on first sync.
-:::
+## How to Choose
+
+Table-level Sync and Database-level Sync are **two fundamentally different mechanisms**, not merely a split by the number of tables synced. **Database-level Sync can also sync just one table via `include_tables`**, so the choice should be driven by capability requirements:
+
+| Capability | Table-level Sync | Database-level Sync |
+|:--------|:--------|:--------|
+| Underlying mechanism | Job + TVF (`INSERT INTO tbl SELECT * FROM tvf()`) | Job + native database DDL (`FROM src TO DATABASE db`) |
+| Target granularity | One existing Doris table | A Doris database container |
+| Sync scope | A single table | One, several, or all tables (controlled by `include_tables`) |
+| Auto-create tables | ❌ Requires pre-creation | ✅ Automatically creates primary-key tables on first sync |
+| SQL expressiveness | ✅ Column mapping, filtering, transformation (via SELECT) | ❌ Direct replication, no ETL |
+| Delivery semantics | exactly-once | at-least-once |
+| Required privileges | Load | Load + Create (when auto-creating) |
+| Typical scenarios | Real-time sync needing column pruning, renaming, type conversion, or conditional filtering | Mirror replication of a database or group of tables, where downstream schema should track upstream automatically |
+
+- **Need SQL transformations or strict exactly-once semantics** → Choose **Table-level Sync**
+- **Want Doris to auto-create tables and sync a group of tables with one config** → Choose **Database-level Sync**
+- **Source is S3 object storage** → Only Table-level Sync is supported (via S3 TVF)
+
+## Job Lifecycle
+
+A Streaming Job transitions between the following states during its lifecycle. Both Table-level Sync and Database-level Sync follow the same state machine:
+
+```mermaid
+stateDiagram-v2
+    [*] --> PENDING: create job
+    PENDING --> RUNNING: createStreamingTask()
+    RUNNING --> FINISHED: source consumed
+    RUNNING --> PAUSED: task failed (with failReason)
+    PAUSED --> PENDING: autoResume after exponential backoff
+    FINISHED --> [*]
+```
+
+| State | Description |
+|:----|:----|
+| **PENDING** | The job has been created but no `StreamingTask` has been dispatched yet; awaiting the next scheduling round |
+| **RUNNING** | A child task has been dispatched and is running, reading incremental data from the source and writing into Doris |
+| **FINISHED** | The source has been fully consumed and the job has terminated. S3 TVF jobs enter this state once all files have been imported |
+| **PAUSED** | A child task failed; the job is automatically paused and a `failReason` is recorded. Check the `ErrorMsg` column in `select * from jobs(...)` for details |
+
+**Auto-resume:** After entering `PAUSED`, the scheduler periodically retries with an exponential backoff strategy and transitions the job back to `PENDING` to dispatch a new task. **Transient failures (network jitter, brief upstream unavailability, etc.) are absorbed automatically without manual intervention.** To resume immediately after diagnosing a failure, use [`RESUME JOB`](#resume-import-job); to stop scheduling entirely, use [`PAUSE JOB`](#pause-import-job) (manually paused jobs are NOT woken up by auto-resume) or [`DROP JOB`](#delete-import-job).
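+
+The two mechanisms contrasted above can be sketched in abridged DDL. This is only an orientation sketch: `target_tbl` and `target_test_db` are placeholder names, and the `(...)` option lists are intentionally elided; see the per-source pages above for complete, runnable syntax.
+
+```sql
+-- Table-level Sync: the job body is ordinary SQL writing into an existing
+-- Doris table, so the SELECT can project, filter, and transform columns.
+INSERT INTO target_tbl SELECT * FROM cdc_stream(...);
+
+-- Database-level Sync: the job maps a source database (or an include_tables
+-- subset of it) onto a Doris database; downstream tables are auto-created.
+CREATE JOB ...
+FROM MYSQL (...)
+TO DATABASE target_test_db
+```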
## Common Operations diff --git a/docs/data-operate/import/streaming-job/continuous-load-postgresql-multi.md b/docs/data-operate/import/streaming-job/continuous-load-postgresql-database.md similarity index 79% rename from docs/data-operate/import/streaming-job/continuous-load-postgresql-multi.md rename to docs/data-operate/import/streaming-job/continuous-load-postgresql-database.md index 6b22f51145586..7d3fa361e99f9 100644 --- a/docs/data-operate/import/streaming-job/continuous-load-postgresql-multi.md +++ b/docs/data-operate/import/streaming-job/continuous-load-postgresql-database.md @@ -1,17 +1,17 @@ --- { - "title": "PostgreSQL Full Database Sync", - "sidebar_label": "Full Database Sync", + "title": "PostgreSQL Database-level Sync", + "sidebar_label": "Database-level Sync", "language": "en", - "description": "Doris can continuously synchronize full and incremental data from an entire database or selected tables in PostgreSQL into Doris using Streaming Job." + "description": "Doris can continuously sync full and incremental data of a group of PostgreSQL tables into Doris at the database level via Streaming Job, auto-creating downstream tables on first sync." } --- ## Overview -Supports using Job to continuously synchronize full and incremental data from an entire database or selected tables in a PostgreSQL database to Doris via Stream Load. Suitable for scenarios requiring real-time full database sync to Doris. +Database-level Sync is implemented via the native `FROM POSTGRES (...) TO DATABASE (...)` DDL, **using a database as the sync unit with a Doris database as the target container**. You can sync one, several, or all tables via `include_tables`; on first sync Doris automatically creates downstream primary-key tables and keeps primary keys consistent with the upstream. Suitable for mirror replication scenarios where downstream schema should track upstream automatically and no SQL processing is needed. 
-By integrating [Flink CDC](https://github.com/apache/flink-cdc), Doris supports reading change logs from PostgreSQL databases, enabling full and incremental full database sync. When synchronizing for the first time, Doris automatically creates downstream tables (primary key tables) and keeps the primary key consistent with the upstream. +By integrating [Flink CDC](https://github.com/apache/flink-cdc), Doris reads change logs from PostgreSQL and continuously writes full + incremental data of a group of tables into Doris via Stream Load. If you need column mapping, filtering, or data transformation during sync, see [PostgreSQL Table-level Sync](./continuous-load-postgresql-table.md). **Notes:** @@ -73,7 +73,7 @@ For more common operations (pause, resume, delete, check Task, etc.), see [Conti ### Import Command -Syntax for creating a full database sync job: +Syntax for creating a database-level sync job: ```sql CREATE JOB diff --git a/versioned_docs/version-4.x/data-operate/import/streaming-job/continuous-load-postgresql-single.md b/docs/data-operate/import/streaming-job/continuous-load-postgresql-table.md similarity index 75% rename from versioned_docs/version-4.x/data-operate/import/streaming-job/continuous-load-postgresql-single.md rename to docs/data-operate/import/streaming-job/continuous-load-postgresql-table.md index fd30ec8ff59cb..9335a4c4dd16f 100644 --- a/versioned_docs/version-4.x/data-operate/import/streaming-job/continuous-load-postgresql-single.md +++ b/docs/data-operate/import/streaming-job/continuous-load-postgresql-table.md @@ -1,9 +1,9 @@ --- { - "title": "PostgreSQL Single Table Sync", - "sidebar_label": "Single Table Sync", + "title": "PostgreSQL Table-level Sync", + "sidebar_label": "Table-level Sync", "language": "en", - "description": "Doris supports continuously synchronizing full and incremental data from a single PostgreSQL table into Doris using Job + CDC Stream TVF." 
+ "description": "Doris can continuously sync PostgreSQL data into a specified Doris table using Job + CDC Stream TVF at the table level, with support for column mapping and data transformation." } --- @@ -13,9 +13,9 @@ This feature is supported since version 4.1.0. ## Overview -Doris supports continuously synchronizing full and incremental data from a single PostgreSQL table into a specified Doris table using Job + [CDC Stream TVF](../../../sql-manual/sql-functions/table-valued-functions/cdc-stream.md). This is suitable for real-time synchronization scenarios that require flexible column mapping and data transformation on a single table. +Table-level Sync is implemented via Job + [CDC Stream TVF](../../../sql-manual/sql-functions/table-valued-functions/cdc-stream.md), targeting an existing Doris table (`INSERT INTO tbl SELECT * FROM cdc_stream(...)`). It leverages Doris SQL to support column mapping, filtering and data transformation, with exactly-once semantics. Suitable for real-time sync scenarios that require data processing. -By integrating [Flink CDC](https://github.com/apache/flink-cdc) reading capabilities, Doris supports reading change logs (WAL) from PostgreSQL databases, enabling full and incremental data synchronization for a single table. +By integrating [Flink CDC](https://github.com/apache/flink-cdc), Doris reads change logs (WAL) from PostgreSQL, enabling full + incremental sync from the source table to the target table. If you prefer Doris to auto-create downstream tables or sync a group of tables at the database granularity, see [PostgreSQL Database-level Sync](./continuous-load-postgresql-database.md). 
**Notes:**

diff --git a/docs/ecosystem/flink-doris-connector/flink-doris-connector.md b/docs/ecosystem/flink-doris-connector/flink-doris-connector.md
index c448329988463..2a01fb29833b5 100644
--- a/docs/ecosystem/flink-doris-connector/flink-doris-connector.md
+++ b/docs/ecosystem/flink-doris-connector/flink-doris-connector.md
@@ -1148,4 +1148,13 @@ In the whole database synchronization tool provided by the Connector, no additio
 7. **stream load error: HTTP/1.1 307 Temporary Redirect**
 
-   Flink will first request FE, and after receiving 307, it will request BE after redirection. When FE is in FullGC/high pressure/network delay, HttpClient will send data without waiting for a response within a certain period of time (3 seconds) by default. Since the request body is InputStream by default, when a 307 response is received, the data cannot be replayed and an error will be reported directly. There are three ways to solve this problem: 1. Upgrade to Connector25.1.0 or above to increase the default time; 2. Modify auto-redirect=false to directly initiate a request to BE (not applicable to some cloud scenarios); 3. The unique key model can enable batch mode.
\ No newline at end of file
+   Flink first sends the request to FE and, after receiving a 307, retries against the redirected BE. When FE is under FullGC, heavy load, or network latency, HttpClient by default waits at most 3 seconds for a response before it starts sending data. Since the request body is an InputStream by default, the data cannot be replayed once the 307 response arrives, and an error is reported directly. There are three ways to solve this problem: 1. Upgrade to Connector 25.1.0 or above, which lengthens the default wait time; 2. Set auto-redirect=false to send requests directly to BE (not applicable to some cloud scenarios); 3. For the unique key model, enable batch mode.
+
+8. 
**When using Flink CDC to sync large tables from databases such as Oracle, an `I/O exception (java.net.SocketException) ... Broken pipe` error is reported. How should it be handled?**
+
+   This error usually means the data volume of a single Stream Load request exceeded a limit on the BE side. You can address it in the following ways:
+   - Increase the `streaming_load_max_mb` parameter in `be.conf` on the BE side (default 10240, in MB) so that a single Stream Load can carry more data. Restarting the BE is required for the change to take effect.
+   - Enable batch mode (`sink.enable.batch-mode=true`) so that the Connector automatically splits writes into batches internally, avoiding oversized single Stream Load requests.
+   - Increase the parallelism of Oracle CDC by adding `--oracle-conf scan.incremental.snapshot.enabled=true` (experimental) to the startup command, which enables parallel reading of the Oracle full snapshot.
+
+   For more Flink CDC related issues, see the [Flink CDC FAQ](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.6/docs/faq/faq/).
\ No newline at end of file
diff --git a/docs/sql-manual/sql-functions/table-valued-functions/cdc-stream.md b/docs/sql-manual/sql-functions/table-valued-functions/cdc-stream.md
index 687281d0a6ab3..dc94c843ce5d4 100644
--- a/docs/sql-manual/sql-functions/table-valued-functions/cdc-stream.md
+++ b/docs/sql-manual/sql-functions/table-valued-functions/cdc-stream.md
@@ -13,7 +13,7 @@
 
 The CDC Stream table-valued-function (TVF) enables users to read change data from relational databases (such as MySQL, PostgreSQL) via CDC. By integrating [Flink CDC](https://github.com/apache/flink-cdc) reading capabilities, it supports full and incremental data synchronization.
 
-It is typically used with `CREATE JOB ON STREAMING` to achieve continuous single-table data synchronization. 
For detailed usage, see [MySQL Single-table Import](../../../data-operate/import/streaming-job/continuous-load-mysql-single.md) and [PostgreSQL Single-table Import](../../../data-operate/import/streaming-job/continuous-load-postgresql-single.md). +It is typically used with `CREATE JOB ON STREAMING` to achieve continuous table-level data synchronization. For detailed usage, see [MySQL Table-level Sync](../../../data-operate/import/streaming-job/continuous-load-mysql-table.md) and [PostgreSQL Table-level Sync](../../../data-operate/import/streaming-job/continuous-load-postgresql-table.md). ## Syntax diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/continuous-load-mysql-multi.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/continuous-load-mysql-database.md similarity index 85% rename from i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/continuous-load-mysql-multi.md rename to i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/continuous-load-mysql-database.md index 101bd8601af8e..8eeb65cffb53c 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/continuous-load-mysql-multi.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/continuous-load-mysql-database.md @@ -1,17 +1,17 @@ --- { - "title": "MySQL 整库同步", - "sidebar_label": "整库同步", + "title": "MySQL 库级同步", "language": "zh-CN", - "description": "Doris 可以通过 Streaming Job 的方式,将 MySQL 整库的全量和增量数据持续同步到 Doris 中。" + "sidebar_label": "库级同步", + "description": "Doris 可以通过 Streaming Job 的方式,以库为单位将 MySQL 一组表的全量和增量数据持续同步到 Doris 中,首次同步自动创建下游表。" } --- ## 概述 -支持通过 Job 将 MySQL 整库或指定多张表的全量和增量数据,通过 Stream Load 的方式持续同步到 Doris 中。适用于需要实时同步整库数据到 Doris 的场景。 +库级同步通过原生 `FROM MYSQL (...) 
TO DATABASE (...)` DDL 实现,**以库为同步单位,目标是一个 Doris database 容器**;可以通过 `include_tables` 控制同步一张、多张或全部表,首次同步时 Doris 会自动创建下游主键表,并保持主键与上游一致。适用于不需要对数据做 SQL 加工、希望下游表结构自动跟随上游的镜像复制场景。 -通过集成 [Flink CDC](https://github.com/apache/flink-cdc) 能力,Doris 支持从 MySQL 数据库读取变更日志,实现整库的全量和增量数据同步。首次同步时会自动创建 Doris 下游表(主键表),并保持主键与上游一致。 +通过集成 [Flink CDC](https://github.com/apache/flink-cdc) 能力,Doris 从 MySQL 读取变更日志,将一组表的全量 + 增量数据通过 Stream Load 持续写入 Doris。若需要在同步过程中做列映射、过滤或数据转换,请参考 [MySQL 表级同步](./continuous-load-mysql-table.md)。 **注意事项:** @@ -99,7 +99,7 @@ TO DATABASE target_test_db ### 导入命令 -创建整库同步作业语法如下: +创建库级同步作业语法如下: ```sql CREATE JOB diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/continuous-load-mysql-single.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/continuous-load-mysql-table.md similarity index 79% rename from i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/continuous-load-mysql-single.md rename to i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/continuous-load-mysql-table.md index c2522dd02c155..c2536b3b1ffee 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/continuous-load-mysql-single.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/continuous-load-mysql-table.md @@ -1,9 +1,9 @@ --- { - "title": "MySQL 单表同步", - "sidebar_label": "单表同步", + "title": "MySQL 表级同步", "language": "zh-CN", - "description": "Doris 可以通过 Job + CDC Stream TVF 的方式,将 MySQL 单张表的全量和增量数据持续同步到 Doris 中。" + "sidebar_label": "表级同步", + "description": "Doris 可以通过 Job + CDC Stream TVF 的方式,以表为单位将 MySQL 数据持续同步到指定的 Doris 表中,支持列映射与数据转换。" } --- @@ -13,9 +13,9 @@ ## 概述 -Doris 支持通过 Job + [CDC Stream TVF](../../../sql-manual/sql-functions/table-valued-functions/cdc-stream.md) 的方式,将 MySQL 单张表的全量和增量数据持续同步到指定的 Doris 表中。适用于需要对单张表进行灵活列映射和数据转换的实时同步场景。 +表级同步通过 Job + [CDC 
Stream TVF](../../../sql-manual/sql-functions/table-valued-functions/cdc-stream.md) 实现,目标是一张已存在的 Doris 表(`INSERT INTO tbl SELECT * FROM cdc_stream(...)`),借助 Doris SQL 的表达能力支持列映射、过滤和数据转换,保证 exactly-once 语义。适用于对数据需要做加工的实时同步场景。 -通过集成 [Flink CDC](https://github.com/apache/flink-cdc) 的读取能力,Doris 支持从 MySQL 数据库读取变更日志(Binlog),实现单表的全量和增量数据同步。 +通过集成 [Flink CDC](https://github.com/apache/flink-cdc) 的读取能力,Doris 从 MySQL 读取变更日志(Binlog),实现源表到目标表的全量 + 增量同步。若希望 Doris 自动创建下游表、按库为单位同步一组表,请参考 [MySQL 库级同步](./continuous-load-mysql-database.md)。 **注意事项:** diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/continuous-load-overview.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/continuous-load-overview.md index 5e64812d41d52..81a177075f12f 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/continuous-load-overview.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/continuous-load-overview.md @@ -13,16 +13,53 @@ Doris 支持通过 Streaming Job 的方式,从多种数据源持续导入数 持续导入支持以下数据源和导入模式: -| 数据源 | 支持版本 | 单表同步 | 整库同步 | 配置指南 | +| 数据源 | 支持版本 | 表级同步 | 库级同步 | 配置指南 | |:------|:--------|:--------|:--------|:--------| -| MySQL | 5.6、5.7、8.0.x | [MySQL 单表同步](./continuous-load-mysql-single.md) | [MySQL 整库同步](./continuous-load-mysql-multi.md) | [Amazon RDS MySQL](./prerequisites/amazon-rds-mysql.md) · [Amazon Aurora MySQL](./prerequisites/amazon-aurora-mysql.md) | -| PostgreSQL | 14、15、16、17 | [PostgreSQL 单表同步](./continuous-load-postgresql-single.md) | [PostgreSQL 整库同步](./continuous-load-postgresql-multi.md) | [Amazon RDS PostgreSQL](./prerequisites/amazon-rds-postgresql.md) · [Amazon Aurora PostgreSQL](./prerequisites/amazon-aurora-postgresql.md) | +| MySQL | 5.6、5.7、8.0.x | [MySQL 表级同步](./continuous-load-mysql-table.md) | [MySQL 库级同步](./continuous-load-mysql-database.md) | [Amazon RDS MySQL](./prerequisites/amazon-rds-mysql.md) · [Amazon Aurora 
MySQL](./prerequisites/amazon-aurora-mysql.md) |
+| PostgreSQL | 14、15、16、17 | [PostgreSQL 表级同步](./continuous-load-postgresql-table.md) | [PostgreSQL 库级同步](./continuous-load-postgresql-database.md) | [Amazon RDS PostgreSQL](./prerequisites/amazon-rds-postgresql.md) · [Amazon Aurora PostgreSQL](./prerequisites/amazon-aurora-postgresql.md) |
 | S3 | - | [S3 持续导入](./continuous-load-s3.md) | - | - |
 
-:::tip
-- **单表同步**:通过 CDC Stream TVF 或 S3 TVF,将数据持续导入到指定的单张 Doris 表中,支持灵活的列映射和数据转换。
-- **整库同步**:通过原生 CDC 能力,将源端整库或指定多张表的全量和增量数据持续同步到 Doris 中,首次同步时自动创建下游表。
-:::
+## 如何选择
+
+表级同步和库级同步是两种**实现机制完全不同**的持续导入方式,并非"表数量"的区别。**库级同步也支持通过 `include_tables` 只同步一张表**,因此选型应以能力需求为准:
+
+| 能力维度 | 表级同步 | 库级同步 |
+|:--------|:--------|:--------|
+| 底层机制 | Job + TVF(`INSERT INTO tbl SELECT * FROM tvf()`) | Job + 原生整库 DDL(`FROM src TO DATABASE db`) |
+| 目标层级 | 一张已存在的 Doris 表 | 一个 Doris database 容器 |
+| 同步范围 | 单张表 | 一张、多张或整库(由 `include_tables` 控制) |
+| 自动建表 | ❌ 需预建 | ✅ 首次同步自动创建主键表 |
+| SQL 灵活表达 | ✅ 支持列映射、过滤、转换(SELECT 子句) | ❌ 原样复制,不支持 ETL |
+| 语义保证 | exactly-once | at-least-once |
+| 所需权限 | Load | Load + Create(自动建表时) |
+| 典型适用场景 | 需要列裁剪、字段重命名、类型转换、条件过滤的实时同步 | 整库或一组表的镜像复制,希望下游表结构自动跟随上游 |
+
+- **需要对数据做 SQL 加工,或对精确一次语义有严格要求** → 选 **表级同步**
+- **希望 Doris 自动建表、一次配置同步一组表** → 选 **库级同步**
+- **数据源是 S3 对象存储** → 只支持表级同步(S3 TVF 方式)
+
+## 作业状态流转
+
+Streaming Job 在运行过程中会在以下状态之间迁移,表级同步和库级同步遵循同一套状态机:
+
+```mermaid
+stateDiagram-v2
+    [*] --> PENDING: create job
+    PENDING --> RUNNING: createStreamingTask()
+    RUNNING --> FINISHED: 源消费完成
+    RUNNING --> PAUSED: 执行失败(记录 failReason)
+    PAUSED --> PENDING: autoResume 指数退避到期
+    FINISHED --> [*]
+```
+
+| 状态 | 含义 |
+|:----|:----|
+| **PENDING** | 作业已创建但尚未调度出子任务;等待下一次调度创建 `StreamingTask` |
+| **RUNNING** | 已派生子任务并在执行中,从源端读取增量数据并写入 Doris |
+| **FINISHED** | 源消费完成,作业终止。S3 TVF 文件全部导入完成后会进入该状态 |
+| **PAUSED** | 子任务执行失败,作业自动暂停并记录 `failReason`;可通过 `select * from jobs(...)` 的 `ErrorMsg` 字段查看原因 |
+
+**自动恢复(autoResume):** 作业进入 `PAUSED` 后,调度器会按指数退避策略定时尝试恢复,恢复时回到 `PENDING` 
继续创建子任务。**临时故障(网络抖动、上游短暂不可用等)会被自动消化,无需人工介入。** 排查故障后若需立即恢复,使用 [`RESUME JOB`](#恢复导入作业);若需彻底停止、不再调度,则使用 [`PAUSE JOB`](#暂停导入作业)(手动暂停不会被 autoResume 唤醒)或 [`DROP JOB`](#删除导入作业)。

## 通用操作

diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/continuous-load-postgresql-multi.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/continuous-load-postgresql-database.md
similarity index 82%
rename from i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/continuous-load-postgresql-multi.md
rename to i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/continuous-load-postgresql-database.md
index 2fa6b9b172c71..b5f652d4c06ff 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/continuous-load-postgresql-multi.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/continuous-load-postgresql-database.md
@@ -1,17 +1,17 @@
 ---
 {
-    "title": "PostgreSQL 整库同步",
-    "sidebar_label": "整库同步",
+    "title": "PostgreSQL 库级同步",
     "language": "zh-CN",
-    "description": "Doris 可以通过 Streaming Job 的方式,将 PostgreSQL 整库的全量和增量数据持续同步到 Doris 中。"
+    "sidebar_label": "库级同步",
+    "description": "Doris 可以通过 Streaming Job 的方式,以库为单位将 PostgreSQL 一组表的全量和增量数据持续同步到 Doris 中,首次同步自动创建下游表。"
 }
---

## 概述

-支持通过 Job 将 PostgreSQL 整库或指定多张表的全量和增量数据,通过 Stream Load 的方式持续同步到 Doris 中。适用于需要实时同步整库数据到 Doris 的场景。
+库级同步通过原生 `FROM POSTGRES (...) 
TO DATABASE (...)` DDL 实现,**以库为同步单位,目标是一个 Doris database 容器**;可以通过 `include_tables` 控制同步一张、多张或全部表,首次同步时 Doris 会自动创建下游主键表,并保持主键与上游一致。适用于不需要对数据做 SQL 加工、希望下游表结构自动跟随上游的镜像复制场景。 -通过集成 [Flink CDC](https://github.com/apache/flink-cdc) 能力,Doris 支持从 PostgreSQL 数据库读取变更日志,实现整库的全量和增量数据同步。首次同步时会自动创建 Doris 下游表(主键表),并保持主键与上游一致。 +通过集成 [Flink CDC](https://github.com/apache/flink-cdc) 能力,Doris 从 PostgreSQL 读取变更日志,将一组表的全量 + 增量数据通过 Stream Load 持续写入 Doris。若需要在同步过程中做列映射、过滤或数据转换,请参考 [PostgreSQL 表级同步](./continuous-load-postgresql-table.md)。 **注意事项:** @@ -73,7 +73,7 @@ select * from jobs("type"="insert") where ExecuteType = "STREAMING"; ### 导入命令 -创建整库同步作业语法如下: +创建库级同步作业语法如下: ```sql CREATE JOB diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/continuous-load-postgresql-single.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/continuous-load-postgresql-table.md similarity index 80% rename from i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/continuous-load-postgresql-single.md rename to i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/continuous-load-postgresql-table.md index a74a81a327443..6a3eb9ade271e 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/continuous-load-postgresql-single.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/continuous-load-postgresql-table.md @@ -1,9 +1,9 @@ --- { - "title": "PostgreSQL 单表同步", - "sidebar_label": "单表同步", + "title": "PostgreSQL 表级同步", "language": "zh-CN", - "description": "Doris 可以通过 Job + CDC Stream TVF 的方式,将 PostgreSQL 单张表的全量和增量数据持续同步到 Doris 中。" + "sidebar_label": "表级同步", + "description": "Doris 可以通过 Job + CDC Stream TVF 的方式,以表为单位将 PostgreSQL 数据持续同步到指定的 Doris 表中,支持列映射与数据转换。" } --- @@ -13,9 +13,9 @@ ## 概述 -Doris 支持通过 Job + [CDC Stream 
TVF](../../../sql-manual/sql-functions/table-valued-functions/cdc-stream.md) 的方式,将 PostgreSQL 单张表的全量和增量数据持续同步到指定的 Doris 表中。适用于需要对单张表进行灵活列映射和数据转换的实时同步场景。 +表级同步通过 Job + [CDC Stream TVF](../../../sql-manual/sql-functions/table-valued-functions/cdc-stream.md) 实现,目标是一张已存在的 Doris 表(`INSERT INTO tbl SELECT * FROM cdc_stream(...)`),借助 Doris SQL 的表达能力支持列映射、过滤和数据转换,保证 exactly-once 语义。适用于对数据需要做加工的实时同步场景。 -通过集成 [Flink CDC](https://github.com/apache/flink-cdc) 的读取能力,Doris 支持从 PostgreSQL 数据库读取变更日志(WAL),实现单表的全量和增量数据同步。 +通过集成 [Flink CDC](https://github.com/apache/flink-cdc) 的读取能力,Doris 从 PostgreSQL 读取变更日志(WAL),实现源表到目标表的全量 + 增量同步。若希望 Doris 自动创建下游表、按库为单位同步一组表,请参考 [PostgreSQL 库级同步](./continuous-load-postgresql-database.md)。 **注意事项:** diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/flink-doris-connector/flink-doris-connector.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/flink-doris-connector/flink-doris-connector.md index ea6727f383b9c..76166faf9cdcf 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/flink-doris-connector/flink-doris-connector.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/flink-doris-connector/flink-doris-connector.md @@ -1149,3 +1149,12 @@ from KAFKA_SOURCE; 7. **stream load error: HTTP/1.1 307 Temporary Redirect** Flink 会先向 FE 请求,收到 307 后会向重定向后的 BE 请求。当 FE 在 FullGC/压力大/网络延迟的时候,HttpClient 默认会在一定时间 (3 秒) 没有等到响应会发送数据,由于默认情况下请求体是 InputStream,当收到 307 响应时,数据无法重放,会直接报错。有三种方式可以解决:1.升级到 Connector25.1.0 以上,调长了默认时间;2.修改 auto-redirect=false,直接向 BE 发起请求(不适用部分云上场景);3.主键模型可以开启攒批模式。 + +8. **使用 Flink CDC 同步 Oracle 等数据库的大表时,出现 `I/O exception (java.net.SocketException) ... 
Broken pipe` 报错怎么办?** + + 该报错通常是单次 Stream Load 写入的数据量超过了 BE 端的限制导致的。可以从以下几个方面进行调整: + - 调大 BE 端 `be.conf` 中的 `streaming_load_max_mb` 参数(默认 10240,单位 MB),允许单次 Stream Load 写入更多数据,修改后需要重启 BE 生效。 + - 开启攒批模式(`sink.enable.batch-mode=true`),由 Connector 内部按批次大小自动切分写入,避免单次写入数据量过大。 + - 尝试调高 Oracle CDC 的并发度,在启动命令中增加 `--oracle-conf scan.incremental.snapshot.enabled=true`(实验性功能),开启后可以多并发读取 Oracle 全量数据。 + + 更多 Flink CDC 相关问题可以参考 [Flink CDC FAQ](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.6/docs/faq/faq/)。 diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-functions/table-valued-functions/cdc-stream.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-functions/table-valued-functions/cdc-stream.md index 9f9638cbc273c..356992153d9c9 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-functions/table-valued-functions/cdc-stream.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-functions/table-valued-functions/cdc-stream.md @@ -13,7 +13,7 @@ CDC Stream 表函数(table-valued-function, tvf)可以让用户通过 CDC 方式读取关系型数据库(如 MySQL、PostgreSQL)的增量变更数据。通过集成 [Flink CDC](https://github.com/apache/flink-cdc) 的读取能力,持续读取数据库的变更日志(Binlog/WAL)并写入 Doris。 -通常与 `CREATE JOB ON STREAMING` 配合使用,实现持续的单表增量数据同步。详细使用方式请参考 [MySQL 单表导入](../../../data-operate/import/streaming-job/continuous-load-mysql-single.md) 和 [PostgreSQL 单表导入](../../../data-operate/import/streaming-job/continuous-load-postgresql-single.md)。 +通常与 `CREATE JOB ON STREAMING` 配合使用,实现持续的表级增量数据同步。详细使用方式请参考 [MySQL 表级同步](../../../data-operate/import/streaming-job/continuous-load-mysql-table.md) 和 [PostgreSQL 表级同步](../../../data-operate/import/streaming-job/continuous-load-postgresql-table.md)。 :::note CDC Stream TVF 单独使用时仅支持增量数据同步,不支持全量快照读取。配合 [CREATE STREAMING JOB](../../sql-statements/job/CREATE-STREAMING-JOB.md) 使用时支持全量 + 增量同步。 diff --git 
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/continuous-load-mysql-multi.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/continuous-load-mysql-database.md similarity index 85% rename from i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/continuous-load-mysql-multi.md rename to i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/continuous-load-mysql-database.md index 7c676d73e9c15..1e0388fe3f5ae 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/continuous-load-mysql-multi.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/continuous-load-mysql-database.md @@ -1,17 +1,17 @@ --- { - "title": "MySQL 整库同步", + "title": "MySQL 库级同步", + "sidebar_label": "库级同步", "language": "zh-CN", - "sidebar_label": "整库同步", - "description": "Doris 可以通过 Streaming Job 的方式,将 MySQL 整库的全量和增量数据持续同步到 Doris 中。" + "description": "Doris 可以通过 Streaming Job 的方式,以库为单位将 MySQL 一组表的全量和增量数据持续同步到 Doris 中,首次同步自动创建下游表。" } --- ## 概述 -支持通过 Job 将 MySQL 整库或指定多张表的全量和增量数据,通过 Stream Load 的方式持续同步到 Doris 中。适用于需要实时同步整库数据到 Doris 的场景。 +库级同步通过原生 `FROM MYSQL (...) 
TO DATABASE (...)` DDL 实现,**以库为同步单位,目标是一个 Doris database 容器**;可以通过 `include_tables` 控制同步一张、多张或全部表,首次同步时 Doris 会自动创建下游主键表,并保持主键与上游一致。适用于不需要对数据做 SQL 加工、希望下游表结构自动跟随上游的镜像复制场景。 -通过集成 [Flink CDC](https://github.com/apache/flink-cdc) 能力,Doris 支持从 MySQL 数据库读取变更日志,实现整库的全量和增量数据同步。首次同步时会自动创建 Doris 下游表(主键表),并保持主键与上游一致。 +通过集成 [Flink CDC](https://github.com/apache/flink-cdc) 能力,Doris 从 MySQL 读取变更日志,将一组表的全量 + 增量数据通过 Stream Load 持续写入 Doris。若需要在同步过程中做列映射、过滤或数据转换,请参考 [MySQL 表级同步](./continuous-load-mysql-table.md)。 **注意事项:** @@ -99,7 +99,7 @@ TO DATABASE target_test_db ### 导入命令 -创建整库同步作业语法如下: +创建库级同步作业语法如下: ```sql CREATE JOB diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/continuous-load-mysql-single.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/continuous-load-mysql-table.md similarity index 79% rename from i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/continuous-load-mysql-single.md rename to i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/continuous-load-mysql-table.md index c43178eb6a432..89473b2e99759 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/continuous-load-mysql-single.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/continuous-load-mysql-table.md @@ -1,9 +1,9 @@ --- { - "title": "MySQL 单表同步", + "title": "MySQL 表级同步", + "sidebar_label": "表级同步", "language": "zh-CN", - "sidebar_label": "单表同步", - "description": "Doris 可以通过 Job + CDC Stream TVF 的方式,将 MySQL 单张表的全量和增量数据持续同步到 Doris 中。" + "description": "Doris 可以通过 Job + CDC Stream TVF 的方式,以表为单位将 MySQL 数据持续同步到指定的 Doris 表中,支持列映射与数据转换。" } --- @@ -13,9 +13,9 @@ ## 概述 -Doris 支持通过 Job + [CDC Stream TVF](../../../sql-manual/sql-functions/table-valued-functions/cdc-stream.md) 的方式,将 MySQL 单张表的全量和增量数据持续同步到指定的 Doris 表中。适用于需要对单张表进行灵活列映射和数据转换的实时同步场景。 +表级同步通过 Job + [CDC 
Stream TVF](../../../sql-manual/sql-functions/table-valued-functions/cdc-stream.md) 实现,目标是一张已存在的 Doris 表(`INSERT INTO tbl SELECT * FROM cdc_stream(...)`),借助 Doris SQL 的表达能力支持列映射、过滤和数据转换,保证 exactly-once 语义。适用于对数据需要做加工的实时同步场景。 -通过集成 [Flink CDC](https://github.com/apache/flink-cdc) 的读取能力,Doris 支持从 MySQL 数据库读取变更日志(Binlog),实现单表的全量和增量数据同步。 +通过集成 [Flink CDC](https://github.com/apache/flink-cdc) 的读取能力,Doris 从 MySQL 读取变更日志(Binlog),实现源表到目标表的全量 + 增量同步。若希望 Doris 自动创建下游表、按库为单位同步一组表,请参考 [MySQL 库级同步](./continuous-load-mysql-database.md)。 **注意事项:** diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/continuous-load-overview.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/continuous-load-overview.md index 22596203bbc72..679218180e635 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/continuous-load-overview.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/continuous-load-overview.md @@ -13,16 +13,53 @@ Doris 支持通过 Streaming Job 的方式,从多种数据源持续导入数 持续导入支持以下数据源和导入模式: -| 数据源 | 支持版本 | 单表同步 | 整库同步 | 配置指南 | +| 数据源 | 支持版本 | 表级同步 | 库级同步 | 配置指南 | |:------|:--------|:--------|:--------|:--------| -| MySQL | 5.6、5.7、8.0.x | [MySQL 单表同步](./continuous-load-mysql-single.md) | [MySQL 整库同步](./continuous-load-mysql-multi.md) | [Amazon RDS MySQL](./prerequisites/amazon-rds-mysql.md) · [Amazon Aurora MySQL](./prerequisites/amazon-aurora-mysql.md) | -| PostgreSQL | 14、15、16、17 | [PostgreSQL 单表同步](./continuous-load-postgresql-single.md) | [PostgreSQL 整库同步](./continuous-load-postgresql-multi.md) | [Amazon RDS PostgreSQL](./prerequisites/amazon-rds-postgresql.md) · [Amazon Aurora PostgreSQL](./prerequisites/amazon-aurora-postgresql.md) | +| MySQL | 5.6、5.7、8.0.x | [MySQL 表级同步](./continuous-load-mysql-table.md) | [MySQL 库级同步](./continuous-load-mysql-database.md) | [Amazon RDS MySQL](./prerequisites/amazon-rds-mysql.md) · 
[Amazon Aurora MySQL](./prerequisites/amazon-aurora-mysql.md) | +| PostgreSQL | 14、15、16、17 | [PostgreSQL 表级同步](./continuous-load-postgresql-table.md) | [PostgreSQL 库级同步](./continuous-load-postgresql-database.md) | [Amazon RDS PostgreSQL](./prerequisites/amazon-rds-postgresql.md) · [Amazon Aurora PostgreSQL](./prerequisites/amazon-aurora-postgresql.md) | | S3 | - | [S3 持续导入](./continuous-load-s3.md) | - | - | -:::tip -- **单表同步**:通过 CDC Stream TVF 或 S3 TVF,将数据持续导入到指定的单张 Doris 表中,支持灵活的列映射和数据转换。 -- **整库同步**:通过原生 CDC 能力,将源端整库或指定多张表的全量和增量数据持续同步到 Doris 中,首次同步时自动创建下游表。 -::: +## 如何选择 + +表级同步和库级同步是两种**实现机制完全不同**的持续导入方式,并非"表数量"的区别。**库级同步也支持通过 `include_tables` 只同步一张表**,因此选型应以能力需求为准: + +| 能力维度 | 表级同步 | 库级同步 | +|:--------|:--------|:--------| +| 底层机制 | Job + TVF(`INSERT INTO tbl SELECT * FROM tvf()`) | Job + 原生整库 DDL(`FROM src TO DATABASE db`) | +| 目标层级 | 一张已存在的 Doris 表 | 一个 Doris database 容器 | +| 同步范围 | 单张表 | 一张到多张到整库(由 `include_tables` 控制) | +| 自动建表 | ❌ 需预建 | ✅ 首次同步自动创建主键表 | +| SQL 灵活表达 | ✅ 支持列映射、过滤、转换(SELECT 子句) | ❌ 原样复制,不支持 ETL | +| 语义保证 | exactly-once | at-least-once | +| 所需权限 | Load | Load + Create(自动建表时) | +| 典型适用场景 | 需要列裁剪、字段重命名、类型转换、条件过滤的实时同步 | 整库或一组表的镜像复制,希望下游表结构自动跟随上游 | + +- **需要对数据做 SQL 加工,或对精确一次语义有严格要求** → 选 **表级同步** +- **希望 Doris 自动建表、一次配置同步一组表** → 选 **库级同步** +- **数据源是 S3 对象存储** → 只支持表级同步(S3 TVF 方式) + +## 作业状态流转 + +Streaming Job 在运行过程中会在以下状态之间迁移,表级同步和库级同步遵循同一套状态机: + +```mermaid +stateDiagram-v2 + [*] --> PENDING: create job + PENDING --> RUNNING: createStreamingTask() + RUNNING --> FINISHED: 源消费完成 + RUNNING --> PAUSED: 执行失败(记录 failReason) + PAUSED --> PENDING: autoResume 指数退避到期 + FINISHED --> [*] +``` + +| 状态 | 含义 | +|:----|:----| +| **PENDING** | 作业已创建但尚未调度出子任务;等待下一次调度创建 `StreamingTask` | +| **RUNNING** | 已派生子任务并在执行中,从源端读取增量数据并写入 Doris | +| **FINISHED** | 源消费完成,作业终止。S3 TVF 文件全部导入完成后会进入该状态 | +| **PAUSED** | 子任务执行失败,作业自动暂停并记录 `failReason`;可通过 `select * from jobs(...)` 的 `ErrorMsg` 字段查看原因 | + +**自动恢复(autoResume):** 作业进入 `PAUSED` 后,调度器会按指数退避策略定时尝试恢复,恢复时回到 `PENDING` 
继续创建子任务。**临时故障(网络抖动、上游短暂不可用等)会被自动消化,无需人工介入。** 若需立即恢复,或在排查故障后手动启动,使用 [`RESUME JOB`](#恢复导入作业);若需彻底停止、不再调度,则使用 [`PAUSE JOB`](#暂停导入作业)(手动暂停不会被 autoResume 唤醒)或 [`DROP JOB`](#删除导入作业)。 ## 通用操作 diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/continuous-load-postgresql-multi.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/continuous-load-postgresql-database.md similarity index 82% rename from i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/continuous-load-postgresql-multi.md rename to i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/continuous-load-postgresql-database.md index 608318f1204f7..d7627f4f063ee 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/continuous-load-postgresql-multi.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/continuous-load-postgresql-database.md @@ -1,17 +1,17 @@ --- { - "title": "PostgreSQL 整库同步", + "title": "PostgreSQL 库级同步", + "sidebar_label": "库级同步", "language": "zh-CN", - "sidebar_label": "整库同步", - "description": "Doris 可以通过 Streaming Job 的方式,将 PostgreSQL 整库的全量和增量数据持续同步到 Doris 中。" + "description": "Doris 可以通过 Streaming Job 的方式,以库为单位将 PostgreSQL 一组表的全量和增量数据持续同步到 Doris 中,首次同步自动创建下游表。" } --- ## 概述 -支持通过 Job 将 PostgreSQL 整库或指定多张表的全量和增量数据,通过 Stream Load 的方式持续同步到 Doris 中。适用于需要实时同步整库数据到 Doris 的场景。 +库级同步通过原生 `FROM POSTGRES (...)
TO DATABASE (...)` DDL 实现,**以库为同步单位,目标是一个 Doris database 容器**;可以通过 `include_tables` 控制同步一张、多张或全部表,首次同步时 Doris 会自动创建下游主键表,并保持主键与上游一致。适用于不需要对数据做 SQL 加工、希望下游表结构自动跟随上游的镜像复制场景。 -通过集成 [Flink CDC](https://github.com/apache/flink-cdc) 能力,Doris 支持从 PostgreSQL 数据库读取变更日志,实现整库的全量和增量数据同步。首次同步时会自动创建 Doris 下游表(主键表),并保持主键与上游一致。 +通过集成 [Flink CDC](https://github.com/apache/flink-cdc) 能力,Doris 从 PostgreSQL 读取变更日志,将一组表的全量 + 增量数据通过 Stream Load 持续写入 Doris。若需要在同步过程中做列映射、过滤或数据转换,请参考 [PostgreSQL 表级同步](./continuous-load-postgresql-table.md)。 **注意事项:** @@ -73,7 +73,7 @@ select * from jobs("type"="insert") where ExecuteType = "STREAMING"; ### 导入命令 -创建整库同步作业语法如下: +创建库级同步作业语法如下: ```sql CREATE JOB diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/continuous-load-postgresql-single.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/continuous-load-postgresql-table.md similarity index 80% rename from i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/continuous-load-postgresql-single.md rename to i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/continuous-load-postgresql-table.md index 36b0be0b90d77..aa73b8163405e 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/continuous-load-postgresql-single.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/continuous-load-postgresql-table.md @@ -1,9 +1,9 @@ --- { - "title": "PostgreSQL 单表同步", + "title": "PostgreSQL 表级同步", + "sidebar_label": "表级同步", "language": "zh-CN", - "sidebar_label": "单表同步", - "description": "Doris 可以通过 Job + CDC Stream TVF 的方式,将 PostgreSQL 单张表的全量和增量数据持续同步到 Doris 中。" + "description": "Doris 可以通过 Job + CDC Stream TVF 的方式,以表为单位将 PostgreSQL 数据持续同步到指定的 Doris 表中,支持列映射与数据转换。" } --- @@ -13,9 +13,9 @@ ## 概述 -Doris 支持通过 Job + [CDC Stream 
TVF](../../../sql-manual/sql-functions/table-valued-functions/cdc-stream.md) 的方式,将 PostgreSQL 单张表的全量和增量数据持续同步到指定的 Doris 表中。适用于需要对单张表进行灵活列映射和数据转换的实时同步场景。 +表级同步通过 Job + [CDC Stream TVF](../../../sql-manual/sql-functions/table-valued-functions/cdc-stream.md) 实现,目标是一张已存在的 Doris 表(`INSERT INTO tbl SELECT * FROM cdc_stream(...)`),借助 Doris SQL 的表达能力支持列映射、过滤和数据转换,保证 exactly-once 语义。适用于对数据需要做加工的实时同步场景。 -通过集成 [Flink CDC](https://github.com/apache/flink-cdc) 的读取能力,Doris 支持从 PostgreSQL 数据库读取变更日志(WAL),实现单表的全量和增量数据同步。 +通过集成 [Flink CDC](https://github.com/apache/flink-cdc) 的读取能力,Doris 从 PostgreSQL 读取变更日志(WAL),实现源表到目标表的全量 + 增量同步。若希望 Doris 自动创建下游表、按库为单位同步一组表,请参考 [PostgreSQL 库级同步](./continuous-load-postgresql-database.md)。 **注意事项:** diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/ecosystem/flink-doris-connector/flink-doris-connector.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/ecosystem/flink-doris-connector/flink-doris-connector.md index ea6727f383b9c..76166faf9cdcf 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/ecosystem/flink-doris-connector/flink-doris-connector.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/ecosystem/flink-doris-connector/flink-doris-connector.md @@ -1149,3 +1149,12 @@ from KAFKA_SOURCE; 7. **stream load error: HTTP/1.1 307 Temporary Redirect** Flink 会先向 FE 请求,收到 307 后会向重定向后的 BE 请求。当 FE 在 FullGC/压力大/网络延迟的时候,HttpClient 默认会在一定时间 (3 秒) 没有等到响应会发送数据,由于默认情况下请求体是 InputStream,当收到 307 响应时,数据无法重放,会直接报错。有三种方式可以解决:1.升级到 Connector25.1.0 以上,调长了默认时间;2.修改 auto-redirect=false,直接向 BE 发起请求(不适用部分云上场景);3.主键模型可以开启攒批模式。 + +8. **使用 Flink CDC 同步 Oracle 等数据库的大表时,出现 `I/O exception (java.net.SocketException) ... 
Broken pipe` 报错怎么办?** + + 该报错通常是单次 Stream Load 写入的数据量超过了 BE 端的限制导致的。可以从以下几个方面进行调整: + - 调大 BE 端 `be.conf` 中的 `streaming_load_max_mb` 参数(默认 10240,单位 MB),允许单次 Stream Load 写入更多数据,修改后需要重启 BE 生效。 + - 开启攒批模式(`sink.enable.batch-mode=true`),由 Connector 内部按批次大小自动切分写入,避免单次写入数据量过大。 + - 尝试调高 Oracle CDC 的并发度,在启动命令中增加 `--oracle-conf scan.incremental.snapshot.enabled=true`(实验性功能),开启后可以多并发读取 Oracle 全量数据。 + + 更多 Flink CDC 相关问题可以参考 [Flink CDC FAQ](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.6/docs/faq/faq/)。 diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-functions/table-valued-functions/cdc-stream.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-functions/table-valued-functions/cdc-stream.md index 9f9638cbc273c..356992153d9c9 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-functions/table-valued-functions/cdc-stream.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-functions/table-valued-functions/cdc-stream.md @@ -13,7 +13,7 @@ CDC Stream 表函数(table-valued-function, tvf)可以让用户通过 CDC 方式读取关系型数据库(如 MySQL、PostgreSQL)的增量变更数据。通过集成 [Flink CDC](https://github.com/apache/flink-cdc) 的读取能力,持续读取数据库的变更日志(Binlog/WAL)并写入 Doris。 -通常与 `CREATE JOB ON STREAMING` 配合使用,实现持续的单表增量数据同步。详细使用方式请参考 [MySQL 单表导入](../../../data-operate/import/streaming-job/continuous-load-mysql-single.md) 和 [PostgreSQL 单表导入](../../../data-operate/import/streaming-job/continuous-load-postgresql-single.md)。 +通常与 `CREATE JOB ON STREAMING` 配合使用,实现持续的表级增量数据同步。详细使用方式请参考 [MySQL 表级同步](../../../data-operate/import/streaming-job/continuous-load-mysql-table.md) 和 [PostgreSQL 表级同步](../../../data-operate/import/streaming-job/continuous-load-postgresql-table.md)。 :::note CDC Stream TVF 单独使用时仅支持增量数据同步,不支持全量快照读取。配合 [CREATE STREAMING JOB](../../sql-statements/job/CREATE-STREAMING-JOB.md) 使用时支持全量 + 增量同步。 diff --git a/sidebars.ts b/sidebars.ts index 4d2a5e3613077..7a1e4ac4c51b3 100644 --- a/sidebars.ts +++ 
b/sidebars.ts @@ -231,16 +231,16 @@ const sidebars: SidebarsConfig = { type: 'category', label: 'MySQL', items: [ - 'data-operate/import/streaming-job/continuous-load-mysql-single', - 'data-operate/import/streaming-job/continuous-load-mysql-multi', + 'data-operate/import/streaming-job/continuous-load-mysql-table', + 'data-operate/import/streaming-job/continuous-load-mysql-database', ], }, { type: 'category', label: 'PostgreSQL', items: [ - 'data-operate/import/streaming-job/continuous-load-postgresql-single', - 'data-operate/import/streaming-job/continuous-load-postgresql-multi', + 'data-operate/import/streaming-job/continuous-load-postgresql-table', + 'data-operate/import/streaming-job/continuous-load-postgresql-database', ], }, 'data-operate/import/streaming-job/continuous-load-s3', diff --git a/versioned_docs/version-4.x/data-operate/import/streaming-job/continuous-load-mysql-multi.md b/versioned_docs/version-4.x/data-operate/import/streaming-job/continuous-load-mysql-database.md similarity index 82% rename from versioned_docs/version-4.x/data-operate/import/streaming-job/continuous-load-mysql-multi.md rename to versioned_docs/version-4.x/data-operate/import/streaming-job/continuous-load-mysql-database.md index 5336b2e8c53be..87e93d0531101 100644 --- a/versioned_docs/version-4.x/data-operate/import/streaming-job/continuous-load-mysql-multi.md +++ b/versioned_docs/version-4.x/data-operate/import/streaming-job/continuous-load-mysql-database.md @@ -1,17 +1,17 @@ --- { - "title": "MySQL Full Database Sync", - "sidebar_label": "Full Database Sync", + "title": "MySQL Database-level Sync", + "sidebar_label": "Database-level Sync", "language": "en", - "description": "Doris can continuously synchronize full and incremental data from an entire MySQL database or selected tables into Doris using Streaming Job." 
+ "description": "Doris can continuously sync full and incremental data of a group of MySQL tables into Doris at the database level via Streaming Job, auto-creating downstream tables on first sync." } --- ## Overview -Supports using Job to continuously synchronize full and incremental data from an entire MySQL database or selected tables to Doris via Stream Load. Suitable for scenarios requiring real-time full database sync to Doris. +Database-level Sync is implemented via the native `FROM MYSQL (...) TO DATABASE (...)` DDL, **using a database as the sync unit with a Doris database as the target container**. You can sync one, several, or all tables via `include_tables`; on first sync Doris automatically creates downstream primary-key tables and keeps primary keys consistent with the upstream. Suitable for mirror replication scenarios where downstream schema should track upstream automatically and no SQL processing is needed. -By integrating [Flink CDC](https://github.com/apache/flink-cdc), Doris supports reading change logs from MySQL databases, enabling full and incremental full database sync. When synchronizing for the first time, Doris automatically creates downstream tables (primary key tables) and keeps the primary key consistent with the upstream. +By integrating [Flink CDC](https://github.com/apache/flink-cdc), Doris reads change logs from MySQL and continuously writes full + incremental data of a group of tables into Doris via Stream Load. If you need column mapping, filtering, or data transformation during sync, see [MySQL Table-level Sync](./continuous-load-mysql-table.md). 
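The one-table/several-tables/whole-database selection that `include_tables` provides can be pictured with a small sketch. Python is used purely for illustration; the regex-style full matching and the table names are assumptions, and the actual `include_tables` semantics are defined by Doris itself:

```python
import re

# Hypothetical upstream schema -- table names are made up for illustration.
UPSTREAM = ["orders", "order_items", "users", "audit_log"]

def select_tables(upstream_tables, include_tables):
    """Return the subset of upstream tables a database-level sync job would
    pick up. Regex-style full matching is an assumption for illustration."""
    patterns = [re.compile(pat) for pat in include_tables]
    return [t for t in upstream_tables
            if any(p.fullmatch(t) for p in patterns)]

print(select_tables(UPSTREAM, ["orders"]))           # a single table
print(select_tables(UPSTREAM, ["orders", "users"]))  # several tables
print(select_tables(UPSTREAM, ["order.*"]))          # pattern-based selection
print(select_tables(UPSTREAM, [".*"]))               # the whole database
```

Whatever the matching rule, the key point stands: the same database-level DDL covers one table, a subset, or the full upstream database.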
**Notes:** @@ -99,7 +99,7 @@ For more common operations (pause, resume, delete, check Task, etc.), see [Conti ### Import Command -Syntax for creating a full database sync job: +Syntax for creating a database-level sync job: ```sql CREATE JOB diff --git a/docs/data-operate/import/streaming-job/continuous-load-mysql-single.md b/versioned_docs/version-4.x/data-operate/import/streaming-job/continuous-load-mysql-table.md similarity index 74% rename from docs/data-operate/import/streaming-job/continuous-load-mysql-single.md rename to versioned_docs/version-4.x/data-operate/import/streaming-job/continuous-load-mysql-table.md index 823a262aa1a8b..2a0058c7c25ca 100644 --- a/docs/data-operate/import/streaming-job/continuous-load-mysql-single.md +++ b/versioned_docs/version-4.x/data-operate/import/streaming-job/continuous-load-mysql-table.md @@ -1,9 +1,9 @@ --- { - "title": "MySQL Single Table Sync", - "sidebar_label": "Single Table Sync", + "title": "MySQL Table-level Sync", + "sidebar_label": "Table-level Sync", "language": "en", - "description": "Doris supports continuously synchronizing full and incremental data from a single MySQL table into Doris using Job + CDC Stream TVF." + "description": "Doris can continuously sync MySQL data into a specified Doris table using Job + CDC Stream TVF at the table level, with support for column mapping and data transformation." } --- @@ -13,9 +13,9 @@ This feature is supported since version 4.1.0. ## Overview -Doris supports continuously synchronizing full and incremental data from a single MySQL table into a specified Doris table using Job + [CDC Stream TVF](../../../sql-manual/sql-functions/table-valued-functions/cdc-stream.md). This is suitable for real-time synchronization scenarios that require flexible column mapping and data transformation on a single table. 
+Table-level Sync is implemented via Job + [CDC Stream TVF](../../../sql-manual/sql-functions/table-valued-functions/cdc-stream.md), targeting an existing Doris table (`INSERT INTO tbl SELECT * FROM cdc_stream(...)`). It leverages Doris SQL to support column mapping, filtering and data transformation, with exactly-once semantics. Suitable for real-time sync scenarios that require data processing. -By integrating [Flink CDC](https://github.com/apache/flink-cdc) reading capabilities, Doris supports reading change logs (Binlog) from MySQL databases, enabling full and incremental data synchronization for a single table. +By integrating [Flink CDC](https://github.com/apache/flink-cdc), Doris reads change logs (Binlog) from MySQL, enabling full + incremental sync from the source table to the target table. If you prefer Doris to auto-create downstream tables or sync a group of tables at the database granularity, see [MySQL Database-level Sync](./continuous-load-mysql-database.md). **Notes:** diff --git a/versioned_docs/version-4.x/data-operate/import/streaming-job/continuous-load-overview.md b/versioned_docs/version-4.x/data-operate/import/streaming-job/continuous-load-overview.md index 170be458b04de..d80a46f7c4883 100644 --- a/versioned_docs/version-4.x/data-operate/import/streaming-job/continuous-load-overview.md +++ b/versioned_docs/version-4.x/data-operate/import/streaming-job/continuous-load-overview.md @@ -13,16 +13,53 @@ Doris supports continuously loading data from multiple data sources into Doris t Continuous Load supports the following data sources and import modes: -| Data Source | Supported Versions | Single Table Sync | Full Database Sync | Setup Guide | +| Data Source | Supported Versions | Table-level Sync | Database-level Sync | Setup Guide | |:------|:--------|:--------|:--------|:--------| -| MySQL | 5.6, 5.7, 8.0.x | [MySQL Single Table Sync](./continuous-load-mysql-single.md) | [MySQL Full Database Sync](./continuous-load-mysql-multi.md) | [Amazon RDS 
MySQL](./prerequisites/amazon-rds-mysql.md) · [Amazon Aurora MySQL](./prerequisites/amazon-aurora-mysql.md) | -| PostgreSQL | 14, 15, 16, 17 | [PostgreSQL Single Table Sync](./continuous-load-postgresql-single.md) | [PostgreSQL Full Database Sync](./continuous-load-postgresql-multi.md) | [Amazon RDS PostgreSQL](./prerequisites/amazon-rds-postgresql.md) · [Amazon Aurora PostgreSQL](./prerequisites/amazon-aurora-postgresql.md) | +| MySQL | 5.6, 5.7, 8.0.x | [MySQL Table-level Sync](./continuous-load-mysql-table.md) | [MySQL Database-level Sync](./continuous-load-mysql-database.md) | [Amazon RDS MySQL](./prerequisites/amazon-rds-mysql.md) · [Amazon Aurora MySQL](./prerequisites/amazon-aurora-mysql.md) | +| PostgreSQL | 14, 15, 16, 17 | [PostgreSQL Table-level Sync](./continuous-load-postgresql-table.md) | [PostgreSQL Database-level Sync](./continuous-load-postgresql-database.md) | [Amazon RDS PostgreSQL](./prerequisites/amazon-rds-postgresql.md) · [Amazon Aurora PostgreSQL](./prerequisites/amazon-aurora-postgresql.md) | | S3 | - | [S3 Continuous Load](./continuous-load-s3.md) | - | - | -:::tip -- **Single Table Sync**: Uses CDC Stream TVF or S3 TVF to continuously load data into a specific Doris table, supporting flexible column mapping and data transformation. -- **Full Database Sync**: Uses native CDC capability to continuously synchronize full and incremental data from an entire database or selected tables into Doris, automatically creating downstream tables on first sync. -::: +## How to Choose + +Table-level Sync and Database-level Sync are **two fundamentally different mechanisms**, not a distinction by "number of tables". 
**Database-level Sync can also sync just one table via `include_tables`**, so the choice should be driven by capability requirements: + +| Capability | Table-level Sync | Database-level Sync | +|:--------|:--------|:--------| +| Underlying mechanism | Job + TVF (`INSERT INTO tbl SELECT * FROM tvf()`) | Job + native database DDL (`FROM src TO DATABASE db`) | +| Target granularity | One existing Doris table | A Doris database container | +| Sync scope | A single table | One, several, or all tables (controlled by `include_tables`) | +| Auto-create tables | ❌ Requires pre-creation | ✅ Auto-creates primary-key tables on first sync | +| SQL expressiveness | ✅ Column mapping, filtering, transformation (via SELECT) | ❌ Direct replication, no ETL | +| Delivery semantics | exactly-once | at-least-once | +| Required privileges | Load | Load + Create (when auto-creating) | +| Typical scenarios | Real-time sync needing column pruning, renaming, type conversion, or conditional filtering | Mirror replication of a database or group of tables, where downstream schema should track upstream automatically | + +- **Need SQL transformations or strict exactly-once semantics** → Choose **Table-level Sync** +- **Want Doris to auto-create tables and sync a group of tables with one config** → Choose **Database-level Sync** +- **Source is S3 object storage** → Only Table-level Sync is supported (via S3 TVF) + +## Job Lifecycle + +A Streaming Job transitions between the following states during its lifecycle.
Both Table-level Sync and Database-level Sync follow the same state machine: + +```mermaid +stateDiagram-v2 + [*] --> PENDING: create job + PENDING --> RUNNING: createStreamingTask() + RUNNING --> FINISHED: source consumed + RUNNING --> PAUSED: task failed (with failReason) + PAUSED --> PENDING: autoResume after exponential backoff + FINISHED --> [*] +``` + +| State | Description | +|:----|:----| +| **PENDING** | The job has been created but no `StreamingTask` has been dispatched yet; awaiting the next scheduling round | +| **RUNNING** | A child task has been dispatched and is running, reading incremental data from the source and writing into Doris | +| **FINISHED** | The source has been fully consumed and the job has terminated. S3 TVF jobs enter this state once all files have been imported | +| **PAUSED** | A child task failed; the job is automatically paused and a `failReason` is recorded. Check the `ErrorMsg` column in `select * from jobs(...)` for details | + +**Auto-resume:** After entering `PAUSED`, the scheduler periodically retries with an exponential backoff strategy and transitions the job back to `PENDING` to dispatch a new task. **Transient failures (network jitter, brief upstream unavailability, etc.) are absorbed automatically without manual intervention.** To resume immediately after diagnosing a failure, use [`RESUME JOB`](#resume-import-job); to stop scheduling entirely, use [`PAUSE JOB`](#pause-import-job) (manually paused jobs are NOT woken up by auto-resume) or [`DROP JOB`](#delete-import-job). 
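The state machine and auto-resume behavior above can be sketched in Python. State names follow the docs; the backoff base and cap values are illustrative assumptions, not Doris scheduler constants:

```python
class StreamingJob:
    """Toy model of the Streaming Job state machine described above."""

    def __init__(self):
        self.state = "PENDING"   # job created, no task dispatched yet
        self.failures = 0
        self.fail_reason = None

    def schedule(self):
        # PENDING -> RUNNING: the scheduler calls createStreamingTask()
        assert self.state == "PENDING"
        self.state = "RUNNING"

    def task_failed(self, reason):
        # RUNNING -> PAUSED: the failure reason is recorded (failReason)
        assert self.state == "RUNNING"
        self.state, self.fail_reason = "PAUSED", reason
        self.failures += 1

    def backoff_seconds(self, base=1, cap=300):
        # Exponential backoff between resume attempts: 1s, 2s, 4s, ... capped
        return min(base * 2 ** (self.failures - 1), cap)

    def auto_resume(self):
        # PAUSED -> PENDING once the backoff delay expires
        assert self.state == "PAUSED"
        self.state = "PENDING"

job = StreamingJob()
job.schedule()
job.task_failed("network jitter")
print(job.state, job.backoff_seconds())  # PAUSED 1
job.auto_resume()
print(job.state)                         # PENDING
```

Repeated failures lengthen the delay between retries until the cap, which is why transient faults self-heal while a persistent fault leaves the job observable in `PAUSED` with its `failReason`.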
 ## Common Operations

diff --git a/versioned_docs/version-4.x/data-operate/import/streaming-job/continuous-load-postgresql-multi.md b/versioned_docs/version-4.x/data-operate/import/streaming-job/continuous-load-postgresql-database.md
similarity index 79%
rename from versioned_docs/version-4.x/data-operate/import/streaming-job/continuous-load-postgresql-multi.md
rename to versioned_docs/version-4.x/data-operate/import/streaming-job/continuous-load-postgresql-database.md
index a8332b365a182..7d3fa361e99f9 100644
--- a/versioned_docs/version-4.x/data-operate/import/streaming-job/continuous-load-postgresql-multi.md
+++ b/versioned_docs/version-4.x/data-operate/import/streaming-job/continuous-load-postgresql-database.md
@@ -1,17 +1,17 @@
 ---
 {
-    "title": "PostgreSQL Full Database Sync",
-    "sidebar_label": "Full Database Sync",
+    "title": "PostgreSQL Database-level Sync",
+    "sidebar_label": "Database-level Sync",
     "language": "en",
-    "description": "Doris can continuously synchronize full and incremental data from an entire PostgreSQL database or selected tables into Doris using Streaming Job."
+    "description": "Doris can continuously sync full and incremental data of a group of PostgreSQL tables into Doris at the database level via Streaming Job, auto-creating downstream tables on first sync."
 }
 ---

 ## Overview

-Supports using Job to continuously synchronize full and incremental data from an entire PostgreSQL database or selected tables to Doris via Stream Load. Suitable for scenarios requiring real-time full database sync to Doris.
+Database-level Sync is implemented via the native `FROM POSTGRES (...) TO DATABASE (...)` DDL, **using a database as the sync unit with a Doris database as the target container**. You can sync one, several, or all tables via `include_tables`; on first sync Doris automatically creates downstream primary-key tables and keeps primary keys consistent with the upstream. Suitable for mirror replication scenarios where downstream schema should track upstream automatically and no SQL processing is needed.

-By integrating [Flink CDC](https://github.com/apache/flink-cdc), Doris supports reading change logs from PostgreSQL databases, enabling full and incremental full database sync. When synchronizing for the first time, Doris automatically creates downstream tables (primary key tables) and keeps the primary key consistent with the upstream.
+By integrating [Flink CDC](https://github.com/apache/flink-cdc), Doris reads change logs from PostgreSQL and continuously writes full + incremental data of a group of tables into Doris via Stream Load. If you need column mapping, filtering, or data transformation during sync, see [PostgreSQL Table-level Sync](./continuous-load-postgresql-table.md).

 **Notes:**
@@ -73,7 +73,7 @@ For more common operations (pause, resume, delete, check Task, etc.), see [Conti

 ### Import Command

-Syntax for creating a full database sync job:
+Syntax for creating a database-level sync job:

 ```sql
 CREATE JOB
diff --git a/docs/data-operate/import/streaming-job/continuous-load-postgresql-single.md b/versioned_docs/version-4.x/data-operate/import/streaming-job/continuous-load-postgresql-table.md
similarity index 75%
rename from docs/data-operate/import/streaming-job/continuous-load-postgresql-single.md
rename to versioned_docs/version-4.x/data-operate/import/streaming-job/continuous-load-postgresql-table.md
index fd30ec8ff59cb..9335a4c4dd16f 100644
--- a/docs/data-operate/import/streaming-job/continuous-load-postgresql-single.md
+++ b/versioned_docs/version-4.x/data-operate/import/streaming-job/continuous-load-postgresql-table.md
@@ -1,9 +1,9 @@
 ---
 {
-    "title": "PostgreSQL Single Table Sync",
-    "sidebar_label": "Single Table Sync",
+    "title": "PostgreSQL Table-level Sync",
+    "sidebar_label": "Table-level Sync",
     "language": "en",
-    "description": "Doris supports continuously synchronizing full and incremental data from a single PostgreSQL table into Doris using Job + CDC Stream TVF."
+    "description": "Doris can continuously sync PostgreSQL data into a specified Doris table using Job + CDC Stream TVF at the table level, with support for column mapping and data transformation."
 }
 ---
@@ -13,9 +13,9 @@ This feature is supported since version 4.1.0.

 ## Overview

-Doris supports continuously synchronizing full and incremental data from a single PostgreSQL table into a specified Doris table using Job + [CDC Stream TVF](../../../sql-manual/sql-functions/table-valued-functions/cdc-stream.md). This is suitable for real-time synchronization scenarios that require flexible column mapping and data transformation on a single table.
+Table-level Sync is implemented via Job + [CDC Stream TVF](../../../sql-manual/sql-functions/table-valued-functions/cdc-stream.md), targeting an existing Doris table (`INSERT INTO tbl SELECT * FROM cdc_stream(...)`). It leverages Doris SQL to support column mapping, filtering and data transformation, with exactly-once semantics. Suitable for real-time sync scenarios that require data processing.

-By integrating [Flink CDC](https://github.com/apache/flink-cdc) reading capabilities, Doris supports reading change logs (WAL) from PostgreSQL databases, enabling full and incremental data synchronization for a single table.
+By integrating [Flink CDC](https://github.com/apache/flink-cdc), Doris reads change logs (WAL) from PostgreSQL, enabling full + incremental sync from the source table to the target table. If you prefer Doris to auto-create downstream tables or sync a group of tables at the database granularity, see [PostgreSQL Database-level Sync](./continuous-load-postgresql-database.md).

 **Notes:**
diff --git a/versioned_docs/version-4.x/ecosystem/flink-doris-connector/flink-doris-connector.md b/versioned_docs/version-4.x/ecosystem/flink-doris-connector/flink-doris-connector.md
index c448329988463..2a01fb29833b5 100644
--- a/versioned_docs/version-4.x/ecosystem/flink-doris-connector/flink-doris-connector.md
+++ b/versioned_docs/version-4.x/ecosystem/flink-doris-connector/flink-doris-connector.md
@@ -1148,4 +1148,13 @@ In the whole database synchronization tool provided by the Connector, no additio

 7. **stream load error: HTTP/1.1 307 Temporary Redirect**

-   Flink will first request FE, and after receiving 307, it will request BE after redirection. When FE is in FullGC/high pressure/network delay, HttpClient will send data without waiting for a response within a certain period of time (3 seconds) by default. Since the request body is InputStream by default, when a 307 response is received, the data cannot be replayed and an error will be reported directly. There are three ways to solve this problem: 1. Upgrade to Connector25.1.0 or above to increase the default time; 2. Modify auto-redirect=false to directly initiate a request to BE (not applicable to some cloud scenarios); 3. The unique key model can enable batch mode.
\ No newline at end of file
+   Flink will first request FE, and after receiving 307, it will request BE after redirection. When FE is in FullGC/high pressure/network delay, HttpClient will send data without waiting for a response within a certain period of time (3 seconds) by default. Since the request body is InputStream by default, when a 307 response is received, the data cannot be replayed and an error will be reported directly. There are three ways to solve this problem: 1. Upgrade to Connector 25.1.0 or above to increase the default time; 2. Modify auto-redirect=false to directly initiate a request to BE (not applicable to some cloud scenarios); 3. The unique key model can enable batch mode.
+
+8. **When using Flink CDC to sync large tables from databases such as Oracle, an `I/O exception (java.net.SocketException) ... Broken pipe` error is reported. How to handle it?**
+
+   This error usually occurs when the data volume of a single Stream Load request exceeds the limit on the BE side. You can adjust it from the following aspects:
+   - Increase the `streaming_load_max_mb` parameter in `be.conf` on the BE side (default 10240, in MB), so that a single Stream Load can carry more data. The BE needs to be restarted to take effect.
+   - Enable batch mode (`sink.enable.batch-mode=true`), so that the Connector automatically splits the data into batches internally, avoiding too much data in a single Stream Load.
+   - Try to increase the parallelism of Oracle CDC by adding `--oracle-conf scan.incremental.snapshot.enabled=true` (experimental feature) to the startup command, which enables parallel reading of Oracle full data.
+
+   For more Flink CDC related issues, please refer to [Flink CDC FAQ](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.6/docs/faq/faq/).
\ No newline at end of file
diff --git a/versioned_docs/version-4.x/sql-manual/sql-functions/table-valued-functions/cdc-stream.md b/versioned_docs/version-4.x/sql-manual/sql-functions/table-valued-functions/cdc-stream.md
index 687281d0a6ab3..dc94c843ce5d4 100644
--- a/versioned_docs/version-4.x/sql-manual/sql-functions/table-valued-functions/cdc-stream.md
+++ b/versioned_docs/version-4.x/sql-manual/sql-functions/table-valued-functions/cdc-stream.md
@@ -13,7 +13,7 @@

 The CDC Stream table-valued-function (TVF) enables users to read change data from relational databases (such as MySQL, PostgreSQL) via CDC. By integrating [Flink CDC](https://github.com/apache/flink-cdc) reading capabilities, it supports full and incremental data synchronization.

-It is typically used with `CREATE JOB ON STREAMING` to achieve continuous single-table data synchronization. For detailed usage, see [MySQL Single-table Import](../../../data-operate/import/streaming-job/continuous-load-mysql-single.md) and [PostgreSQL Single-table Import](../../../data-operate/import/streaming-job/continuous-load-postgresql-single.md).
+It is typically used with `CREATE JOB ON STREAMING` to achieve continuous table-level data synchronization. For detailed usage, see [MySQL Table-level Sync](../../../data-operate/import/streaming-job/continuous-load-mysql-table.md) and [PostgreSQL Table-level Sync](../../../data-operate/import/streaming-job/continuous-load-postgresql-table.md).

 ## Syntax
diff --git a/versioned_sidebars/version-4.x-sidebars.json b/versioned_sidebars/version-4.x-sidebars.json
index 67a6e701858db..b74ae60a42338 100644
--- a/versioned_sidebars/version-4.x-sidebars.json
+++ b/versioned_sidebars/version-4.x-sidebars.json
@@ -234,16 +234,16 @@
                         "type": "category",
                         "label": "MySQL",
                         "items": [
-                            "data-operate/import/streaming-job/continuous-load-mysql-single",
-                            "data-operate/import/streaming-job/continuous-load-mysql-multi"
+                            "data-operate/import/streaming-job/continuous-load-mysql-table",
+                            "data-operate/import/streaming-job/continuous-load-mysql-database"
                         ]
                     },
                     {
                         "type": "category",
                         "label": "PostgreSQL",
                         "items": [
-                            "data-operate/import/streaming-job/continuous-load-postgresql-single",
-                            "data-operate/import/streaming-job/continuous-load-postgresql-multi"
+                            "data-operate/import/streaming-job/continuous-load-postgresql-table",
+                            "data-operate/import/streaming-job/continuous-load-postgresql-database"
                         ]
                     },
                     "data-operate/import/streaming-job/continuous-load-s3",