diff --git a/docs/data-sources/source_table.md b/docs/data-sources/source_table.md
new file mode 100644
index 00000000..29a9fa8b
--- /dev/null
+++ b/docs/data-sources/source_table.md
@@ -0,0 +1,65 @@
+---
+# generated by https://github.com/hashicorp/terraform-plugin-docs
+page_title: "materialize_source_table Data Source - terraform-provider-materialize"
+subcategory: ""
+description: |-
+
+---
+
+# materialize_source_table (Data Source)
+
+
+
+## Example Usage
+
+```terraform
+data "materialize_source_table" "all" {}
+
+data "materialize_source_table" "materialize" {
+ database_name = "materialize"
+}
+
+data "materialize_source_table" "materialize_schema" {
+ database_name = "materialize"
+ schema_name = "schema"
+}
+```
+
+
+## Schema
+
+### Optional
+
+- `database_name` (String) Limit tables to a specific database
+- `region` (String) The region in which the resource is located.
+- `schema_name` (String) Limit tables to a specific schema within a specific database
+
+### Read-Only
+
+- `id` (String) The ID of this resource.
+- `tables` (List of Object) The source tables in the account (see [below for nested schema](#nestedatt--tables))
+
+
+### Nested Schema for `tables`
+
+Read-Only:
+
+- `comment` (String)
+- `database_name` (String)
+- `id` (String)
+- `name` (String)
+- `owner_name` (String)
+- `schema_name` (String)
+- `source` (List of Object) (see [below for nested schema](#nestedobjatt--tables--source))
+- `source_type` (String)
+- `upstream_name` (String)
+- `upstream_schema_name` (String)
+
+
+### Nested Schema for `tables.source`
+
+Read-Only:
+
+- `database_name` (String)
+- `name` (String)
+- `schema_name` (String)
diff --git a/examples/data-sources/materialize_source_table/data-source.tf b/examples/data-sources/materialize_source_table/data-source.tf
new file mode 100644
index 00000000..30e5c940
--- /dev/null
+++ b/examples/data-sources/materialize_source_table/data-source.tf
@@ -0,0 +1,10 @@
+data "materialize_source_table" "all" {}
+
+data "materialize_source_table" "materialize" {
+ database_name = "materialize"
+}
+
+data "materialize_source_table" "materialize_schema" {
+ database_name = "materialize"
+ schema_name = "schema"
+}
diff --git a/pkg/datasources/datasource_source_table.go b/pkg/datasources/datasource_source_table.go
new file mode 100644
index 00000000..8d39b625
--- /dev/null
+++ b/pkg/datasources/datasource_source_table.go
@@ -0,0 +1,157 @@
+package datasources
+
+import (
+ "context"
+
+ "github.com/MaterializeInc/terraform-provider-materialize/pkg/materialize"
+ "github.com/MaterializeInc/terraform-provider-materialize/pkg/utils"
+
+ "github.com/hashicorp/terraform-plugin-sdk/v2/diag"
+ "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
+)
+
+func SourceTable() *schema.Resource {
+ return &schema.Resource{
+ ReadContext: sourceTableRead,
+ Schema: map[string]*schema.Schema{
+ "database_name": {
+ Type: schema.TypeString,
+ Optional: true,
+ Description: "Limit tables to a specific database",
+ },
+ "schema_name": {
+ Type: schema.TypeString,
+ Optional: true,
+ Description: "Limit tables to a specific schema within a specific database",
+ RequiredWith: []string{"database_name"},
+ },
+ "tables": {
+ Type: schema.TypeList,
+ Computed: true,
+ Description: "The source tables in the account",
+ Elem: &schema.Resource{
+ Schema: map[string]*schema.Schema{
+ "id": {
+ Type: schema.TypeString,
+ Computed: true,
+ Description: "The ID of the source table",
+ },
+ "name": {
+ Type: schema.TypeString,
+ Computed: true,
+ Description: "The name of the source table",
+ },
+ "schema_name": {
+ Type: schema.TypeString,
+ Computed: true,
+ Description: "The schema name of the source table",
+ },
+ "database_name": {
+ Type: schema.TypeString,
+ Computed: true,
+ Description: "The database name of the source table",
+ },
+ "source": {
+ Type: schema.TypeList,
+ Computed: true,
+ Description: "Information about the source",
+ Elem: &schema.Resource{
+ Schema: map[string]*schema.Schema{
+ "name": {
+ Type: schema.TypeString,
+ Computed: true,
+ Description: "The name of the source",
+ },
+ "schema_name": {
+ Type: schema.TypeString,
+ Computed: true,
+ Description: "The schema name of the source",
+ },
+ "database_name": {
+ Type: schema.TypeString,
+ Computed: true,
+ Description: "The database name of the source",
+ },
+ },
+ },
+ },
+ "source_type": {
+ Type: schema.TypeString,
+ Computed: true,
+ Description: "The type of the source",
+ },
+ "upstream_name": {
+ Type: schema.TypeString,
+ Computed: true,
+ Description: "The name of the upstream table",
+ },
+ "upstream_schema_name": {
+ Type: schema.TypeString,
+ Computed: true,
+ Description: "The schema name of the upstream table",
+ },
+ "comment": {
+ Type: schema.TypeString,
+ Computed: true,
+ Description: "The comment on the source table",
+ },
+ "owner_name": {
+ Type: schema.TypeString,
+ Computed: true,
+ Description: "The name of the owner of the source table",
+ },
+ },
+ },
+ },
+ "region": RegionSchema(),
+ },
+ }
+}
+
+func sourceTableRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
+ schemaName := d.Get("schema_name").(string)
+ databaseName := d.Get("database_name").(string)
+
+ var diags diag.Diagnostics
+
+ metaDb, region, err := utils.GetDBClientFromMeta(meta, d)
+ if err != nil {
+ return diag.FromErr(err)
+ }
+ dataSource, err := materialize.ListSourceTables(metaDb, schemaName, databaseName)
+ if err != nil {
+ return diag.FromErr(err)
+ }
+
+ tableFormats := []map[string]interface{}{}
+ for _, p := range dataSource {
+ tableMap := map[string]interface{}{
+ "id": p.TableId.String,
+ "name": p.TableName.String,
+ "schema_name": p.SchemaName.String,
+ "database_name": p.DatabaseName.String,
+ "source_type": p.SourceType.String,
+ "upstream_name": p.UpstreamName.String,
+ "upstream_schema_name": p.UpstreamSchemaName.String,
+ "comment": p.Comment.String,
+ "owner_name": p.OwnerName.String,
+ }
+
+ sourceMap := map[string]interface{}{
+ "name": p.SourceName.String,
+ "schema_name": p.SourceSchemaName.String,
+ "database_name": p.SourceDatabaseName.String,
+ }
+ tableMap["source"] = []interface{}{sourceMap}
+
+ tableFormats = append(tableFormats, tableMap)
+ }
+
+ if err := d.Set("tables", tableFormats); err != nil {
+ return diag.FromErr(err)
+ }
+
+ SetId(string(region), "source_tables", databaseName, schemaName, d)
+
+ return diags
+}
diff --git a/pkg/datasources/datasource_source_table_test.go b/pkg/datasources/datasource_source_table_test.go
new file mode 100644
index 00000000..59413f73
--- /dev/null
+++ b/pkg/datasources/datasource_source_table_test.go
@@ -0,0 +1,53 @@
+package datasources
+
+import (
+ "context"
+ "testing"
+
+ "github.com/MaterializeInc/terraform-provider-materialize/pkg/testhelpers"
+ "github.com/MaterializeInc/terraform-provider-materialize/pkg/utils"
+
+ "github.com/DATA-DOG/go-sqlmock"
+ "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
+ "github.com/stretchr/testify/require"
+)
+
+func TestSourceTableDatasource(t *testing.T) {
+ r := require.New(t)
+
+ in := map[string]interface{}{
+ "schema_name": "schema",
+ "database_name": "database",
+ }
+ d := schema.TestResourceDataRaw(t, SourceTable().Schema, in)
+ r.NotNil(d)
+
+ testhelpers.WithMockProviderMeta(t, func(db *utils.ProviderMeta, mock sqlmock.Sqlmock) {
+ p := `WHERE mz_databases.name = 'database' AND mz_schemas.name = 'schema'`
+ testhelpers.MockSourceTableScan(mock, p)
+
+ if err := sourceTableRead(context.TODO(), d, db); err != nil {
+ t.Fatal(err)
+ }
+
+ // Verify the results
+ tables := d.Get("tables").([]interface{})
+ r.Equal(1, len(tables))
+
+ table := tables[0].(map[string]interface{})
+ r.Equal("u1", table["id"])
+ r.Equal("table", table["name"])
+ r.Equal("schema", table["schema_name"])
+ r.Equal("database", table["database_name"])
+ r.Equal("KAFKA", table["source_type"])
+ r.Equal("table", table["upstream_name"])
+ r.Equal("schema", table["upstream_schema_name"])
+ r.Equal("comment", table["comment"])
+ r.Equal("materialize", table["owner_name"])
+
+ source := table["source"].([]interface{})[0].(map[string]interface{})
+ r.Equal("source", source["name"])
+ r.Equal("public", source["schema_name"])
+ r.Equal("materialize", source["database_name"])
+ })
+}
diff --git a/pkg/provider/acceptance_datasource_source_table_test.go b/pkg/provider/acceptance_datasource_source_table_test.go
new file mode 100644
index 00000000..43c48589
--- /dev/null
+++ b/pkg/provider/acceptance_datasource_source_table_test.go
@@ -0,0 +1,86 @@
+package provider
+
+import (
+ "fmt"
+ "testing"
+
+ "github.com/hashicorp/terraform-plugin-testing/helper/acctest"
+ "github.com/hashicorp/terraform-plugin-testing/helper/resource"
+)
+
+func TestAccDataSourceSourceTable_basic(t *testing.T) {
+ nameSpace := acctest.RandomWithPrefix("tf_test")
+ resource.ParallelTest(t, resource.TestCase{
+ PreCheck: func() { testAccPreCheck(t) },
+ ProviderFactories: testAccProviderFactories,
+ Steps: []resource.TestStep{
+ {
+ Config: testAccDataSourceSourceTable(nameSpace),
+ Check: resource.ComposeTestCheckFunc(
+ resource.TestCheckResourceAttr("data.materialize_source_table.test", "tables.0.name", fmt.Sprintf("%s_table", nameSpace)),
+ resource.TestCheckResourceAttr("data.materialize_source_table.test", "tables.0.schema_name", "public"),
+ resource.TestCheckResourceAttr("data.materialize_source_table.test", "tables.0.database_name", "materialize"),
+ resource.TestCheckResourceAttr("data.materialize_source_table.test", "tables.0.source.#", "1"),
+ resource.TestCheckResourceAttr("data.materialize_source_table.test", "tables.0.source.0.name", fmt.Sprintf("%s_source", nameSpace)),
+ resource.TestCheckResourceAttr("data.materialize_source_table.test", "tables.0.source.0.schema_name", "public"),
+ resource.TestCheckResourceAttr("data.materialize_source_table.test", "tables.0.source.0.database_name", "materialize"),
+ resource.TestCheckResourceAttr("data.materialize_source_table.test", "tables.0.source_type", "postgres"),
+ resource.TestCheckResourceAttr("data.materialize_source_table.test", "tables.0.comment", "test comment"),
+ resource.TestCheckResourceAttrSet("data.materialize_source_table.test", "tables.0.owner_name"),
+ ),
+ },
+ },
+ })
+}
+
+func testAccDataSourceSourceTable(nameSpace string) string {
+ return fmt.Sprintf(`
+resource "materialize_secret" "postgres_password" {
+ name = "%[1]s_secret_postgres"
+ value = "c2VjcmV0Cg=="
+}
+
+resource "materialize_connection_postgres" "postgres_connection" {
+ name = "%[1]s_connection_postgres"
+ host = "postgres"
+ port = 5432
+ user {
+ text = "postgres"
+ }
+ password {
+ name = materialize_secret.postgres_password.name
+ }
+ database = "postgres"
+}
+
+resource "materialize_source_postgres" "test" {
+ name = "%[1]s_source"
+ cluster_name = "quickstart"
+
+ postgres_connection {
+ name = materialize_connection_postgres.postgres_connection.name
+ }
+ publication = "mz_source"
+}
+
+resource "materialize_source_table_postgres" "test" {
+ name = "%[1]s_table"
+ schema_name = "public"
+ database_name = "materialize"
+ source {
+ name = materialize_source_postgres.test.name
+ schema_name = "public"
+ database_name = "materialize"
+ }
+ upstream_name = "table2"
+ upstream_schema_name = "public"
+ comment = "test comment"
+}
+
+data "materialize_source_table" "test" {
+ schema_name = "public"
+ database_name = "materialize"
+ depends_on = [materialize_source_table_postgres.test]
+}
+`, nameSpace)
+}
diff --git a/pkg/provider/provider.go b/pkg/provider/provider.go
index 9bd02f62..021a71d0 100644
--- a/pkg/provider/provider.go
+++ b/pkg/provider/provider.go
@@ -167,6 +167,7 @@ func Provider(version string) *schema.Provider {
"materialize_scim_groups": datasources.SCIMGroups(),
"materialize_scim_configs": datasources.SCIMConfigs(),
"materialize_sso_config": datasources.SSOConfig(),
+ "materialize_source_table": datasources.SourceTable(),
"materialize_system_parameter": datasources.SystemParameter(),
"materialize_table": datasources.Table(),
"materialize_type": datasources.Type(),