diff --git a/api-doc.yaml b/api-doc.yaml index bd27199d..55782be9 100644 --- a/api-doc.yaml +++ b/api-doc.yaml @@ -530,15 +530,16 @@ paths: required: true schema: $ref: "#/components/schemas/IdentifierPattern" - - name: fetchTableSchema + - name: tableNames in: query description: | - When `false`, returns tables with `resource` set but does not load per-table column metadata (no Malloy `fetchTableSchema`). - Omitted or `null` is treated as `true`. Use `false` for fast explorer listing; call get-table with `fetchTableSchema=true` for details. + List of table names to filter results. When provided, only returns metadata + for the specified tables. When omitted, returns all tables in the schema. required: false schema: - type: boolean - default: true + type: array + items: + type: string responses: "200": description: A list of table names available in the specified schema @@ -758,6 +759,7 @@ paths: $ref: "#/components/responses/InternalServerError" "503": $ref: "#/components/responses/ServiceUnavailable" + /projects/{projectName}/connections/{connectionName}/sqlTemporaryTable: post: tags: @@ -855,59 +857,6 @@ paths: "503": $ref: "#/components/responses/ServiceUnavailable" - # TODO: Remove this endpoint. This is deprecated and replaced by /projects/{projectName}/connections/{connectionName}/table resource. - /projects/{projectName}/connections/{connectionName}/tableSource: - get: - tags: - - connections - operationId: get-tablesource - summary: Get table source information - deprecated: true - description: | - Retrieves information about a specific table or view from the database connection. - This includes table schema, column definitions, and metadata. The table can be specified - by either tableKey or tablePath parameters, depending on the database type. - parameters: - - name: projectName - in: path - description: Name of the project - required: true - schema: - $ref: "#/components/schemas/IdentifierPattern" - - name: connectionName - in: path - description: Name of the connection - required: true - schema: - $ref: "#/components/schemas/IdentifierPattern" - - name: tableKey - in: query - description: Table key - required: false - schema: - type: string - - name: tablePath - in: query - description: Table path - required: false - schema: - type: string - responses: - "200": - description: Table source information - content: - application/json: - schema: - $ref: "#/components/schemas/TableSource" - "401": - $ref: "#/components/responses/Unauthorized" - "404": - $ref: "#/components/responses/NotFound" - "500": - $ref: "#/components/responses/InternalServerError" - "503": - $ref: "#/components/responses/ServiceUnavailable" - # TODO: Remove this endpoint. /projects/{projectName}/connections/{connectionName}/queryData: get: diff --git a/packages/sdk/src/components/Project/ConnectionExplorer.tsx b/packages/sdk/src/components/Project/ConnectionExplorer.tsx index 0efe19d1..cd60d5d2 100644 --- a/packages/sdk/src/components/Project/ConnectionExplorer.tsx +++ b/packages/sdk/src/components/Project/ConnectionExplorer.tsx @@ -329,7 +329,6 @@ function TablesInSchema({ projectName, connectionName, schemaName, - false, ), }); diff --git a/packages/server/src/controller/connection.controller.ts b/packages/server/src/controller/connection.controller.ts index d16a3f05..bc13e551 100644 --- a/packages/server/src/controller/connection.controller.ts +++ b/packages/server/src/controller/connection.controller.ts @@ -1,4 +1,4 @@ -import { Connection, RunSQLOptions } from "@malloydata/malloy"; +import { Connection, RunSQLOptions, TableSourceDef } from "@malloydata/malloy"; import { PersistSQLResults } from "@malloydata/malloy/connection"; import { components } from "../api"; import { BadRequestError, ConnectionError } from "../errors"; @@ -6,16 +6,14 @@ import { logger } from "../logger"; import { testConnectionConfig } from "../service/connection"; import { ConnectionService } from "../service/connection_service"; import { - getConnectionTableSource, getSchemasForConnection, - getTablesForSchema, + listTablesForSchema, } from "../service/db_utils"; import { ProjectStore } from "../service/project_store"; type ApiConnection = components["schemas"]["Connection"]; type ApiConnectionStatus = components["schemas"]["ConnectionStatus"]; type ApiSqlSource = components["schemas"]["SqlSource"]; -type ApiTableSource = components["schemas"]["TableSource"]; type ApiTable = components["schemas"]["Table"]; type ApiQueryData = components["schemas"]["QueryData"]; type ApiTemporaryTable = components["schemas"]["TemporaryTable"]; @@ -29,28 +27,6 @@ const AZURE_DATA_EXTENSIONS = [ ".ndjson", ]; -/** - * `fetchTableSchema` query param: default true when omitted, null, or empty. - * Only explicit false/0 disables schema fetch. - */ -export function parseFetchTableSchemaQueryParam(raw: unknown): boolean { - if (raw === undefined || raw === null) { - return true; - } - const v = Array.isArray(raw) ? raw[0] : raw; - if (v === "" || v === undefined) { - return true; - } - if (typeof v === "boolean") { - return v; - } - const s = String(v).trim().toLowerCase(); - if (s === "false" || s === "0") { - return false; - } - return true; -} - /** * Validates an Azure URL against the three supported patterns: * 1. Single file: path/file.parquet @@ -144,6 +120,52 @@ export class ConnectionController { } } + /** + * Fetches a table's schema via the Malloy connection's fetchTableSchema, + * returning an ApiTable with columns and the raw source JSON. + */ + private async fetchTable( + malloyConnection: Connection, + tableKey: string, + tablePath: string, + ): Promise { + try { + const source = await ( + malloyConnection as Connection & { + fetchTableSchema: ( + tableKey: string, + tablePath: string, + ) => Promise; + } + ).fetchTableSchema(tableKey, tablePath); + if (!source) { + throw new ConnectionError(`Table ${tablePath} not found`); + } + + return { + source: JSON.stringify(source), + resource: tablePath, + columns: (source.fields || []).map((f) => ({ + name: f.name, + type: f.type, + })), + }; + } catch (error) { + const errorMessage = + error instanceof Error + ? error.message + : typeof error === "string" + ? error + : JSON.stringify(error); + logger.error("fetchTableSchema error", { + error, + tableKey, + tablePath, + }); + throw new ConnectionError(errorMessage); + } + } + public async getConnection( projectName: string, connectionName: string, @@ -183,7 +205,7 @@ export class ConnectionController { projectName: string, connectionName: string, schemaName: string, - fetchTableSchema = true, + tableNames?: string[], ): Promise { const project = await this.projectStore.getProject(projectName, false); const connection = project.getApiConnection(connectionName); @@ -192,11 +214,11 @@ export class ConnectionController { connectionName, ); - return getTablesForSchema( + return listTablesForSchema( connection, schemaName, malloyConnection, - fetchTableSchema, + tableNames, ); } @@ -231,20 +253,6 @@ export class ConnectionController { } } - public async getConnectionTableSource( - projectName: string, - connectionName: string, - tableKey: string, - tablePath: string, - ): Promise { - const malloyConnection = await this.getMalloyConnection( - projectName, - connectionName, - ); - - return getConnectionTableSource(malloyConnection, tableKey, tablePath); - } - public async getTable( projectName: string, connectionName: string, @@ -259,6 +267,8 @@ export class ConnectionController { const project = await this.projectStore.getProject(projectName, false); const connection = project.getApiConnection(connectionName); + // TODO: Move this database connection logic to the db_utils.ts file -- and + // ultimately into a connection-specific class. if (connection.type === "ducklake") { if (tablePath.split(".").length === 1) { // tablePath is just the table name, construct full path @@ -306,16 +316,12 @@ export class ConnectionController { ); const fullFileUrl = `${dirPath}${fileName}${queryString}`; - const tableSource = await getConnectionTableSource( + const table = await this.fetchTable( malloyConnection, fileName, fullFileUrl, ); - return { - resource: tablePath, - columns: tableSource.columns, - source: tableSource.source, - }; + return { ...table, resource: tablePath }; } } } @@ -325,17 +331,7 @@ export class ConnectionController { throw new Error(`Invalid tablePath: ${tablePath}`); } - const tableSource = await getConnectionTableSource( - malloyConnection, - tableKey, // tableKey is the table name - tablePath, - ); - - return { - resource: tablePath, - columns: tableSource.columns, - source: tableSource.source, - }; + return this.fetchTable(malloyConnection, tableKey, tablePath); } public async getConnectionQueryData( diff --git a/packages/server/src/server.ts b/packages/server/src/server.ts index 48ac36b6..feacaefb 100644 --- a/packages/server/src/server.ts +++ b/packages/server/src/server.ts @@ -14,10 +14,7 @@ import { createProxyMiddleware } from "http-proxy-middleware"; import { AddressInfo } from "net"; import * as path from "path"; import { CompileController } from "./controller/compile.controller"; -import { - ConnectionController, - parseFetchTableSchemaQueryParam, -} from "./controller/connection.controller"; +import { ConnectionController } from "./controller/connection.controller"; import { DatabaseController } from "./controller/database.controller"; import { ModelController } from "./controller/model.controller"; import { PackageController } from "./controller/package.controller"; @@ -33,6 +30,14 @@ import { logger, loggerMiddleware } from "./logger"; import { initializeMcpServer } from "./mcp/server"; import { ProjectStore } from "./service/project_store"; + +/** Normalize an Express query param into a string[] or undefined. */ +export function normalizeQueryArray(value: unknown): string[] | undefined { + if (value === undefined || value === null) return undefined; + if (Array.isArray(value)) return value.map(String); + return [String(value)]; +} + // Parse command line arguments function parseArgs() { const args = process.argv.slice(2); @@ -470,7 +475,7 @@ app.get( req.params.projectName, req.params.connectionName, req.params.schemaName, - parseFetchTableSchemaQueryParam(req.query.fetchTableSchema), + normalizeQueryArray(req.query.tableNames), ); res.status(200).json(results); } catch (error) { @@ -542,26 +547,6 @@ app.post( }, ); -app.get( - `${API_PREFIX}/projects/:projectName/connections/:connectionName/tableSource`, - async (req, res) => { - try { - res.status(200).json( - await connectionController.getConnectionTableSource( - req.params.projectName, - req.params.connectionName, - req.query.tableKey as string, - req.query.tablePath as string, - ), - ); - } catch (error) { - logger.error(error); - const { json, status } = internalErrorToHttpError(error as Error); - res.status(status).json(json); - } - }, -); - /** * @deprecated Use /projects/:projectName/connections/:connectionName/queryData POST method instead */ diff --git a/packages/server/src/service/db_utils.spec.ts b/packages/server/src/service/db_utils.spec.ts new file mode 100644 index 00000000..86e1b857 --- /dev/null +++ b/packages/server/src/service/db_utils.spec.ts @@ -0,0 +1,712 @@ +import { describe, expect, it, mock } from "bun:test"; + +// Stub the missing optional dependency so db_utils.ts can be imported +mock.module("@azure/identity", () => ({ + ClientSecretCredential: class {}, +})); +mock.module("@azure/storage-blob", () => ({ + ContainerClient: class {}, +})); +mock.module("@google-cloud/bigquery", () => ({ + BigQuery: class {}, +})); + +import { Connection } from "@malloydata/malloy"; +import { normalizeQueryArray } from "../server"; +import { + extractErrorDataFromError, + getSchemasForConnection, + listTablesForSchema, + sqlInFilter, +} from "./db_utils"; +import { components } from "../api"; + +type ApiConnection = components["schemas"]["Connection"]; + +/** + * Minimal mock Connection whose runSQL captures the SQL string + * and returns configurable rows. + */ +function mockConnection(rows: unknown[] = []) { + let lastSQL = ""; + return { + get lastSQL() { + return lastSQL; + }, + conn: { + runSQL: async (sql: string) => { + lastSQL = sql; + return { rows }; + }, + } as unknown as Connection, + }; +} + +// --------------------------------------------------------------------------- +// sqlInFilter +// --------------------------------------------------------------------------- +describe("sqlInFilter", () => { + it("returns empty string for undefined", () => { + expect(sqlInFilter("col", undefined)).toBe(""); + }); + + it("returns empty string for empty array", () => { + expect(sqlInFilter("col", [])).toBe(""); + }); + + it("builds single-value IN clause", () => { + expect(sqlInFilter("TABLE_NAME", ["orders"])).toBe( + "AND TABLE_NAME IN ('orders')", + ); + }); + + it("builds multi-value IN clause", () => { + expect(sqlInFilter("t", ["a", "b", "c"])).toBe( + "AND t IN ('a', 'b', 'c')", + ); + }); + + it("escapes single quotes in values", () => { + expect(sqlInFilter("t", ["it's", "a'b"])).toBe( + "AND t IN ('it''s', 'a''b')", + ); + }); +}); + +// --------------------------------------------------------------------------- +// normalizeQueryArray +// --------------------------------------------------------------------------- +describe("normalizeQueryArray", () => { + it("returns undefined for undefined", () => { + expect(normalizeQueryArray(undefined)).toBeUndefined(); + }); + + it("returns undefined for null", () => { + expect(normalizeQueryArray(null)).toBeUndefined(); + }); + + it("wraps a single string in an array", () => { + expect(normalizeQueryArray("table1")).toEqual(["table1"]); + }); + + it("passes through an array of strings", () => { + expect(normalizeQueryArray(["a", "b"])).toEqual(["a", "b"]); + }); + + it("converts non-string array elements to strings", () => { + expect(normalizeQueryArray([1, true])).toEqual(["1", "true"]); + }); + + it("converts a numeric value to a string array", () => { + expect(normalizeQueryArray(42)).toEqual(["42"]); + }); +}); + +// --------------------------------------------------------------------------- +// listTablesForSchema – SQL generation & result grouping +// --------------------------------------------------------------------------- +describe("listTablesForSchema", () => { + const columnRows = [ + { TABLE_NAME: "orders", COLUMN_NAME: "id", DATA_TYPE: "INTEGER" }, + { TABLE_NAME: "orders", COLUMN_NAME: "total", DATA_TYPE: "DECIMAL" }, + { TABLE_NAME: "customers", COLUMN_NAME: "id", DATA_TYPE: "INTEGER" }, + { TABLE_NAME: "customers", COLUMN_NAME: "name", DATA_TYPE: "VARCHAR" }, + ]; + + describe("mysql", () => { + const conn: ApiConnection = { + name: "test", + type: "mysql", + mysqlConnection: { + host: "localhost", + port: 3306, + user: "root", + password: "", + database: "testdb", + }, + }; + + it("queries INFORMATION_SCHEMA.COLUMNS and groups into ApiTable[]", async () => { + const m = mockConnection(columnRows); + const tables = await listTablesForSchema(conn, "testdb", m.conn); + + expect(m.lastSQL).toContain("information_schema.columns"); + expect(m.lastSQL).toContain("table_schema = 'testdb'"); + expect(tables).toHaveLength(2); + expect(tables[0].resource).toBe("testdb.orders"); + expect(tables[0].columns).toEqual([ + { name: "id", type: "integer" }, + { name: "total", type: "decimal" }, + ]); + expect(tables[1].resource).toBe("testdb.customers"); + }); + + it("includes IN filter when tableNames provided", async () => { + const m = mockConnection(columnRows.slice(0, 2)); + await listTablesForSchema(conn, "testdb", m.conn, ["orders"]); + expect(m.lastSQL).toContain("AND TABLE_NAME IN ('orders')"); + }); + + it("omits IN filter when tableNames is undefined", async () => { + const m = mockConnection(columnRows); + await listTablesForSchema(conn, "testdb", m.conn); + expect(m.lastSQL).not.toContain("IN ("); + }); + }); + + describe("postgres", () => { + const conn: ApiConnection = { + name: "test", + type: "postgres", + postgresConnection: { + host: "localhost", + port: 5432, + userName: "postgres", + password: "", + databaseName: "testdb", + }, + }; + + it("queries information_schema.columns with correct schema", async () => { + const m = mockConnection(columnRows); + const tables = await listTablesForSchema(conn, "public", m.conn); + + expect(m.lastSQL).toContain("information_schema.columns"); + expect(m.lastSQL).toContain("table_schema = 'public'"); + expect(tables).toHaveLength(2); + expect(tables[0].resource).toBe("public.orders"); + }); + + it("includes IN filter when tableNames provided", async () => { + const m = mockConnection([]); + await listTablesForSchema(conn, "public", m.conn, ["orders"]); + expect(m.lastSQL).toContain("AND table_name IN ('orders')"); + }); + }); + + describe("snowflake", () => { + const conn: ApiConnection = { + name: "test", + type: "snowflake", + snowflakeConnection: { + account: "test_account", + username: "user", + password: "pass", + database: "MY_DB", + schema: "PUBLIC", + }, + }; + + it("queries DATABASE.INFORMATION_SCHEMA.COLUMNS", async () => { + const m = mockConnection(columnRows); + const tables = await listTablesForSchema(conn, "MY_DB.PUBLIC", m.conn); + + expect(m.lastSQL).toContain("MY_DB.INFORMATION_SCHEMA.COLUMNS"); + expect(m.lastSQL).toContain("TABLE_SCHEMA = 'PUBLIC'"); + expect(tables).toHaveLength(2); + expect(tables[0].resource).toBe("MY_DB.PUBLIC.orders"); + }); + + it("falls back to connection database when schema is unqualified", async () => { + const m = mockConnection(columnRows); + const tables = await listTablesForSchema(conn, "PUBLIC", m.conn); + + expect(m.lastSQL).toContain("MY_DB.INFORMATION_SCHEMA.COLUMNS"); + expect(tables[0].resource).toBe("MY_DB.PUBLIC.orders"); + }); + + it("includes IN filter when tableNames provided", async () => { + const m = mockConnection([]); + await listTablesForSchema(conn, "MY_DB.PUBLIC", m.conn, [ + "orders", + "customers", + ]); + expect(m.lastSQL).toContain( + "AND TABLE_NAME IN ('orders', 'customers')", + ); + }); + }); + + describe("trino", () => { + it("uses catalog-prefixed information_schema.columns", async () => { + const conn: ApiConnection = { + name: "test", + type: "trino", + trinoConnection: { + server: "localhost", + port: 8080, + catalog: "hive", + }, + }; + const m = mockConnection(columnRows); + const tables = await listTablesForSchema(conn, "default", m.conn); + + expect(m.lastSQL).toContain("hive.information_schema.columns"); + expect(m.lastSQL).toContain("table_schema = 'default'"); + expect(tables[0].resource).toBe("hive.default.orders"); + }); + + it("extracts catalog from schemaName when no explicit catalog", async () => { + const conn: ApiConnection = { + name: "test", + type: "trino", + trinoConnection: { server: "localhost", port: 8080 }, + }; + const m = mockConnection(columnRows); + const tables = await listTablesForSchema(conn, "hive.default", m.conn); + + expect(m.lastSQL).toContain("hive.information_schema.columns"); + expect(m.lastSQL).toContain("table_schema = 'default'"); + expect(tables[0].resource).toBe("hive.default.orders"); + }); + }); + + describe("duckdb", () => { + const conn: ApiConnection = { + name: "test", + type: "duckdb", + duckdbConnection: {}, + }; + + it("queries information_schema.columns with catalog and schema", async () => { + const rows = columnRows.map((r) => ({ + table_name: r.TABLE_NAME, + column_name: r.COLUMN_NAME, + data_type: r.DATA_TYPE, + })); + const m = mockConnection(rows); + const tables = await listTablesForSchema(conn, "memory.main", m.conn); + + expect(m.lastSQL).toContain("information_schema.columns"); + expect(m.lastSQL).toContain("table_schema = 'main'"); + expect(m.lastSQL).toContain("table_catalog = 'memory'"); + expect(tables).toHaveLength(2); + expect(tables[0].resource).toBe("memory.main.orders"); + }); + }); + + describe("motherduck", () => { + const conn: ApiConnection = { + name: "test", + type: "motherduck", + motherduckConnection: { accessToken: "fake" }, + }; + + it("queries information_schema.columns", async () => { + const m = mockConnection(columnRows); + const tables = await listTablesForSchema(conn, "main", m.conn); + + expect(m.lastSQL).toContain("information_schema.columns"); + expect(m.lastSQL).toContain("table_schema = 'main'"); + expect(tables).toHaveLength(2); + expect(tables[0].resource).toBe("main.orders"); + }); + }); + + describe("ducklake", () => { + const conn: ApiConnection = { + name: "test", + type: "ducklake", + ducklakeConnection: { + catalog: { + postgresConnection: { + host: "localhost", + port: 5432, + userName: "postgres", + password: "", + databaseName: "testdb", + }, + }, + storage: { bucketUrl: "s3://bucket" }, + }, + }; + + it("queries information_schema.columns with catalog and schema", async () => { + const m = mockConnection(columnRows); + const tables = await listTablesForSchema( + conn, + "mycat.myschema", + m.conn, + ); + + expect(m.lastSQL).toContain("information_schema.columns"); + expect(m.lastSQL).toContain("table_schema = 'myschema'"); + expect(m.lastSQL).toContain("table_catalog = 'mycat'"); + expect(tables[0].resource).toBe("mycat.myschema.orders"); + }); + }); + + describe("column grouping", () => { + it("lowercases data types", async () => { + const conn: ApiConnection = { + name: "test", + type: "mysql", + mysqlConnection: { + host: "localhost", + port: 3306, + user: "root", + password: "", + database: "testdb", + }, + }; + const m = mockConnection([ + { + TABLE_NAME: "t", + COLUMN_NAME: "col", + DATA_TYPE: "VARCHAR(255)", + }, + ]); + const tables = await listTablesForSchema(conn, "testdb", m.conn); + expect(tables[0]?.columns?.[0]?.type).toBe("varchar(255)"); + }); + + it("returns empty array when no rows", async () => { + const conn: ApiConnection = { + name: "test", + type: "postgres", + postgresConnection: { + host: "localhost", + port: 5432, + userName: "postgres", + password: "", + databaseName: "testdb", + }, + }; + const m = mockConnection([]); + const tables = await listTablesForSchema(conn, "public", m.conn); + expect(tables).toEqual([]); + }); + }); + + describe("error handling", () => { + it("throws for unsupported connection type", async () => { + const conn = { + name: "test", + type: "unsupported", + } as unknown as ApiConnection; + const m = mockConnection(); + await expect( + listTablesForSchema(conn, "schema", m.conn), + ).rejects.toThrow("Unsupported connection type"); + }); + + it("throws when duckdb schema is not qualified", async () => { + const conn: ApiConnection = { + name: "test", + type: "duckdb", + duckdbConnection: {}, + }; + const m = mockConnection(); + await expect( + listTablesForSchema(conn, "main", m.conn), + ).rejects.toThrow('must be qualified as "catalog.schema"'); + }); + + it("throws when snowflake schema is unqualified and no database configured", async () => { + const conn: ApiConnection = { + name: "test", + type: "snowflake", + snowflakeConnection: { + account: "test_account", + username: "user", + password: "pass", + }, + }; + const m = mockConnection(); + await expect( + listTablesForSchema(conn, "PUBLIC", m.conn), + ).rejects.toThrow("Cannot resolve database"); + }); + }); + + describe("ducklake schema prefixing", () => { + const conn: ApiConnection = { + name: "myconn", + type: "ducklake", + ducklakeConnection: { + catalog: { + postgresConnection: { + host: "localhost", + port: 5432, + userName: "postgres", + password: "", + databaseName: "testdb", + }, + }, + storage: { bucketUrl: "s3://bucket" }, + }, + }; + + it("prefixes bare schema name with connection name", async () => { + const m = mockConnection([]); + await listTablesForSchema(conn, "main", m.conn); + expect(m.lastSQL).toContain("table_catalog = 'myconn'"); + expect(m.lastSQL).toContain("table_schema = 'main'"); + }); + + it("uses provided catalog when schema is already qualified", async () => { + const m = mockConnection([]); + await listTablesForSchema(conn, "othercat.myschema", m.conn); + expect(m.lastSQL).toContain("table_catalog = 'othercat'"); + expect(m.lastSQL).toContain("table_schema = 'myschema'"); + }); + }); +}); + +// --------------------------------------------------------------------------- +// getSchemasForConnection – schema listing +// --------------------------------------------------------------------------- +describe("getSchemasForConnection", () => { + describe("postgres", () => { + const conn: ApiConnection = { + name: "test", + type: "postgres", + postgresConnection: { + host: "localhost", + port: 5432, + userName: "postgres", + password: "", + databaseName: "testdb", + }, + }; + + it("queries information_schema.schemata", async () => { + const rows = [ + { schema_name: "public" }, + { schema_name: "information_schema" }, + { schema_name: "pg_catalog" }, + { schema_name: "app" }, + ]; + const m = mockConnection(rows); + const schemas = await getSchemasForConnection(conn, m.conn); + + expect(m.lastSQL).toContain("information_schema.schemata"); + expect(schemas).toHaveLength(4); + expect(schemas.find((s) => s.name === "public")?.isDefault).toBe(true); + expect( + schemas.find((s) => s.name === "information_schema")?.isHidden, + ).toBe(true); + expect(schemas.find((s) => s.name === "pg_catalog")?.isHidden).toBe( + true, + ); + expect(schemas.find((s) => s.name === "app")?.isHidden).toBe(false); + }); + }); + + describe("mysql", () => { + it("returns a single schema from the connection database", async () => { + const conn: ApiConnection = { + name: "test", + type: "mysql", + mysqlConnection: { + host: "localhost", + port: 3306, + user: "root", + password: "", + database: "mydb", + }, + }; + const m = mockConnection(); + const schemas = await getSchemasForConnection(conn, m.conn); + + expect(schemas).toHaveLength(1); + expect(schemas[0].name).toBe("mydb"); + expect(schemas[0].isDefault).toBe(true); + }); + }); + + describe("snowflake", () => { + it("queries INFORMATION_SCHEMA.SCHEMATA with database filter", async () => { + const conn: ApiConnection = { + name: "test", + type: "snowflake", + snowflakeConnection: { + account: "test_account", + username: "user", + password: "pass", + database: "MY_DB", + schema: "PUBLIC", + }, + }; + const rows = [ + { + CATALOG_NAME: "MY_DB", + SCHEMA_NAME: "PUBLIC", + SCHEMA_OWNER: "SYSADMIN", + }, + { + CATALOG_NAME: "MY_DB", + SCHEMA_NAME: "INFORMATION_SCHEMA", + SCHEMA_OWNER: "", + }, + ]; + const m = mockConnection(rows); + const schemas = await getSchemasForConnection(conn, m.conn); + + expect(m.lastSQL).toContain("INFORMATION_SCHEMA.SCHEMATA"); + expect(m.lastSQL).toContain("CATALOG_NAME = 'MY_DB'"); + expect(schemas).toHaveLength(2); + expect(schemas.find((s) => s.name === "MY_DB.PUBLIC")?.isDefault).toBe( + true, + ); + expect( + schemas.find((s) => s.name === "MY_DB.INFORMATION_SCHEMA") + ?.isHidden, + ).toBe(true); + }); + }); + + describe("duckdb", () => { + it("queries information_schema.schemata and hides system schemas", async () => { + const conn: ApiConnection = { + name: "test", + type: "duckdb", + duckdbConnection: {}, + }; + const rows = [ + { catalog_name: "main", schema_name: "main" }, + { catalog_name: "main", schema_name: "information_schema" }, + { catalog_name: "system", schema_name: "main" }, + ]; + const m = mockConnection(rows); + const schemas = await getSchemasForConnection(conn, m.conn); + + expect(schemas).toHaveLength(3); + const mainMain = schemas.find((s) => s.name === "main.main"); + expect(mainMain?.isDefault).toBe(true); + expect(mainMain?.isHidden).toBe(false); + expect( + schemas.find((s) => s.name === "main.information_schema")?.isHidden, + ).toBe(true); + expect(schemas.find((s) => s.name === "system.main")?.isHidden).toBe( + true, + ); + }); + }); + + describe("motherduck", () => { + it("queries information_schema.schemata with optional database filter", async () => { + const conn: ApiConnection = { + name: "test", + type: "motherduck", + motherduckConnection: { accessToken: "fake", database: "mydb" }, + }; + const rows = [ + { schema_name: "main" }, + { schema_name: "information_schema" }, + ]; + const m = mockConnection(rows); + const schemas = await getSchemasForConnection(conn, m.conn); + + expect(m.lastSQL).toContain("catalog_name = 'mydb'"); + expect(schemas).toHaveLength(2); + expect(schemas.find((s) => s.name === "main")?.isDefault).toBe(true); + expect( + schemas.find((s) => s.name === "information_schema")?.isHidden, + ).toBe(true); + }); + }); + + describe("ducklake", () => { + it("queries information_schema.schemata filtered by connection name", async () => { + const conn: ApiConnection = { + name: "myconn", + type: "ducklake", + ducklakeConnection: { + catalog: { + postgresConnection: { + host: "localhost", + port: 5432, + userName: "postgres", + password: "", + databaseName: "testdb", + }, + }, + storage: { bucketUrl: "s3://bucket" }, + }, + }; + const rows = [ + { schema_name: "main" }, + { schema_name: "public" }, + { schema_name: "internal" }, + ]; + const m = mockConnection(rows); + const schemas = await getSchemasForConnection(conn, m.conn); + + expect(m.lastSQL).toContain("catalog_name = 'myconn'"); + expect(schemas).toHaveLength(3); + expect(schemas.find((s) => s.name === "main")?.isHidden).toBe(false); + expect(schemas.find((s) => s.name === "public")?.isHidden).toBe(false); + expect(schemas.find((s) => s.name === "internal")?.isHidden).toBe( + true, + ); + }); + }); + + describe("trino", () => { + it("queries catalog.information_schema.schemata when catalog is set", async () => { + const conn: ApiConnection = { + name: "test", + type: "trino", + trinoConnection: { + server: "localhost", + port: 8080, + catalog: "hive", + }, + }; + const rows = [ + { schema_name: "default" }, + { schema_name: "information_schema" }, + ]; + const m = mockConnection(rows); + const schemas = await getSchemasForConnection(conn, m.conn); + + expect(m.lastSQL).toContain("hive.information_schema.schemata"); + expect(schemas).toHaveLength(2); + expect(schemas.find((s) => s.name === "default")?.isHidden).toBe( + false, + ); + expect( + schemas.find((s) => s.name === "information_schema")?.isHidden, + ).toBe(true); + }); + }); + + it("throws for unsupported connection type", async () => { + const conn = { + name: "test", + type: "unsupported", + } as unknown as ApiConnection; + const m = mockConnection(); + await expect(getSchemasForConnection(conn, m.conn)).rejects.toThrow( + "Unsupported connection type", + ); + }); +}); + +// --------------------------------------------------------------------------- +// extractErrorDataFromError +// --------------------------------------------------------------------------- +describe("extractErrorDataFromError", () => { + it("extracts message from Error instance", () => { + const result = extractErrorDataFromError(new Error("boom")); + expect(result.error).toBe("boom"); + }); + + it("converts string errors", () => { + const result = extractErrorDataFromError("something went wrong"); + expect(result.error).toBe("something went wrong"); + }); + + it("converts non-string non-Error values", () => { + const result = extractErrorDataFromError(42); + expect(result.error).toBe("42"); + }); + + it("extracts task property when present", () => { + const err = Object.assign(new Error("fail"), { task: { id: 1 } }); + const result = extractErrorDataFromError(err); + expect(result.error).toBe("fail"); + expect(result.task).toEqual({ id: 1 }); + }); +}); diff --git a/packages/server/src/service/db_utils.ts b/packages/server/src/service/db_utils.ts index 63d753e6..11b0d616 100644 --- a/packages/server/src/service/db_utils.ts +++ b/packages/server/src/service/db_utils.ts @@ -3,7 +3,6 @@ import { ContainerClient } from "@azure/storage-blob"; import { BigQuery } from "@google-cloud/bigquery"; import { Connection, TableSourceDef } from "@malloydata/malloy"; import { components } from "../api"; -import { ConnectionError } from "../errors"; import { logger } from "../logger"; import { CloudStorageCredentials, @@ -18,9 +17,43 @@ import { ApiConnection } from "./model"; type ApiSchema = components["schemas"]["Schema"]; type ApiTable = components["schemas"]["Table"]; -type ApiTableSource = components["schemas"]["TableSource"]; type ApiAzureConnection = components["schemas"]["AzureConnection"]; +/** + * Build a SQL `AND column IN (...)` fragment for optional table-name filtering. + * Returns an empty string when `values` is undefined or empty. + */ +export function sqlInFilter(columnName: string, values?: string[]): string { + if (!values || values.length === 0) return ""; + const escaped = values.map((v) => `'${v.replace(/'/g, "''")}'`); + return `AND ${columnName} IN (${escaped.join(", ")})`; +} + +/** + * Group INFORMATION_SCHEMA.COLUMNS rows into ApiTable objects. + * Handles both upper-case (Snowflake) and lower-case (Postgres/DuckDB) column names. + */ +function groupColumnRowsIntoTables( + rows: unknown[], + buildResource: (tableName: string) => string, +): ApiTable[] { + const tableMap = new Map(); + for (const row of rows) { + const r = row as Record; + const tableName = String(r.TABLE_NAME ?? r.table_name ?? ""); + const columnName = String(r.COLUMN_NAME ?? r.column_name ?? ""); + const dataType = String(r.DATA_TYPE ?? r.data_type ?? "").toLowerCase(); + if (!tableName) continue; + if (!tableMap.has(tableName)) tableMap.set(tableName, []); + tableMap.get(tableName)!.push({ name: columnName, type: dataType }); + } + const tables: ApiTable[] = []; + for (const [tableName, columns] of tableMap) { + tables.push({ resource: buildResource(tableName), columns }); + } + return tables; +} + function createBigQueryClient(connection: ApiConnection): BigQuery { if (!connection.bigqueryConnection) { throw new Error("BigQuery connection is required"); @@ -36,26 +69,25 @@ function createBigQueryClient(connection: ApiConnection): BigQuery { // Add service account key if provided if (connection.bigqueryConnection.serviceAccountKeyJson) { + let credentials: Record; try { - const credentials = JSON.parse( + credentials = JSON.parse( connection.bigqueryConnection.serviceAccountKeyJson, ); - config.credentials = credentials; + } catch (parseError) { + throw new Error( + `Failed to parse BigQuery service account key JSON: ${(parseError as Error).message}`, + ); + } + config.credentials = credentials; - // Use project_id from credentials if defaultProjectId is not set - if (!config.projectId && credentials.project_id) { - config.projectId = credentials.project_id; - } + if (!config.projectId && credentials.project_id) { + config.projectId = credentials.project_id as string; + } - if (!config.projectId) { - throw new Error( - "BigQuery project ID is required. Either set the defaultProjectId in the connection configuration or the project_id in the service account key JSON.", - ); - } - } catch (error) { - logger.warn( - "Failed to parse service account key JSON, using default credentials", - { error }, + if (!config.projectId) { + throw new Error( + "BigQuery project ID is required. Either set the defaultProjectId in the connection configuration or the project_id in the service account key JSON.", ); } } else if ( @@ -103,358 +135,410 @@ function getCloudCredentialsFromAttachedDatabases( return null; } -export async function getSchemasForConnection( +async function getSchemasForBigQuery( + connection: ApiConnection, +): Promise { + if (!connection.bigqueryConnection) { + throw new Error("BigQuery connection is required"); + } + try { + const bigquery = createBigQueryClient(connection); + const [datasets] = await bigquery.getDatasets(); + + return await Promise.all( + datasets.map(async (dataset) => { + const [metadata] = await dataset.getMetadata(); + return { + name: dataset.id, + isHidden: false, + isDefault: false, + description: (metadata as { description?: string })?.description, + }; + }), + ); + } catch (error) { + logger.error( + `Error getting schemas for BigQuery connection ${connection.name}`, + { error }, + ); + throw new Error( + `Failed to get schemas for BigQuery connection ${connection.name}: ${(error as Error).message}`, + ); + } +} + +async function getSchemasForPostgres( connection: ApiConnection, malloyConnection: Connection, ): Promise { - if (connection.type === "bigquery") { - if (!connection.bigqueryConnection) { - throw new Error("BigQuery connection is required"); - } - try { - const bigquery = createBigQueryClient(connection); - const [datasets] = await bigquery.getDatasets(); - - const schemas = await Promise.all( - datasets.map(async (dataset) => { - const [metadata] = await dataset.getMetadata(); - return { - name: dataset.id, - isHidden: false, - isDefault: false, - // Include description from dataset metadata if available - description: (metadata as { description?: string }) - ?.description, - }; - }), - ); - return schemas; - } catch (error) { - console.error( - `Error getting schemas for BigQuery connection ${connection.name}:`, - error, - ); - throw new Error( - `Failed to get schemas for BigQuery connection ${connection.name}: ${(error as Error).message}`, + if (!connection.postgresConnection) { + throw new Error("Postgres connection is required"); + } + try { + const result = await malloyConnection.runSQL( + "SELECT schema_name FROM information_schema.schemata ORDER BY schema_name", + ); + const rows = standardizeRunSQLResult(result); + return rows.map((row: unknown) => { + const typedRow = row as Record; + const schemaName = String( + typedRow.schema_name ?? typedRow.SCHEMA_NAME ?? "", ); + return { + name: schemaName, + isHidden: ["information_schema", "pg_catalog", "pg_toast"].includes( + schemaName, + ), + isDefault: schemaName === "public", + }; + }); + } catch (error) { + logger.error( + `Error getting schemas for Postgres connection ${connection.name}`, + { error }, + ); + throw new Error( + `Failed to get schemas for Postgres connection ${connection.name}: ${(error as Error).message}`, + ); + } +} + +async function getSchemasForMySQL( + connection: ApiConnection, +): Promise { + if (!connection.mysqlConnection) { + throw new Error("Mysql connection is required"); + } + return [ + { + name: connection.mysqlConnection.database || "mysql", + isHidden: false, + isDefault: true, + }, + ]; +} + +async function getSchemasForSnowflake( + connection: ApiConnection, + malloyConnection: Connection, +): Promise { + if (!connection.snowflakeConnection) { + throw new Error("Snowflake connection is required"); + } + try { + const database = connection.snowflakeConnection.database; + const schema = connection.snowflakeConnection.schema; + + const filters: string[] = []; + if (database) { + filters.push(`CATALOG_NAME = '${database}'`); } - } else if (connection.type === "postgres") { - if (!connection.postgresConnection) { - throw new Error("Postgres connection is required"); + if (schema) { + filters.push(`SCHEMA_NAME = '${schema}'`); } - try { - // Use the connection's runSQL method to query schemas - const result = await malloyConnection.runSQL( - "SELECT schema_name as row FROM information_schema.schemata ORDER BY schema_name", - ); + const whereClause = + filters.length > 0 ? `WHERE ${filters.join(" AND ")}` : ""; - const rows = standardizeRunSQLResult(result); - return rows.map((row: unknown) => { - const schemaName = row as string; - return { - name: schemaName, - isHidden: [ - "information_schema", - "pg_catalog", - "pg_toast", - ].includes(schemaName), - isDefault: schemaName === "public", - }; - }); - } catch (error) { - console.error( - `Error getting schemas for Postgres connection ${connection.name}:`, - error, + const result = await malloyConnection.runSQL( + `SELECT CATALOG_NAME, SCHEMA_NAME, SCHEMA_OWNER FROM ${database ? `${database}.` : ""}INFORMATION_SCHEMA.SCHEMATA ${whereClause} ORDER BY SCHEMA_NAME`, + ); + const rows = standardizeRunSQLResult(result); + return rows.map((row: unknown) => { + const typedRow = row as Record; + const catalogName = String( + typedRow.CATALOG_NAME ?? typedRow.catalog_name ?? "", ); - throw new Error( - `Failed to get schemas for Postgres connection ${connection.name}: ${(error as Error).message}`, + const schemaName = String( + typedRow.SCHEMA_NAME ?? typedRow.schema_name ?? "", ); - } - } else if (connection.type === "mysql") { - if (!connection.mysqlConnection) { - throw new Error("Mysql connection is required"); - } - try { - // For MySQL, return the database name as the schema - return [ - { - name: connection.mysqlConnection.database || "mysql", - isHidden: false, - isDefault: true, - }, - ]; - } catch (error) { - console.error( - `Error getting schemas for MySQL connection ${connection.name}:`, - error, + const owner = String( + typedRow.SCHEMA_OWNER ?? typedRow.schema_owner ?? "", ); - throw new Error( - `Failed to get schemas for MySQL connection ${connection.name}: ${(error as Error).message}`, + return { + name: `${catalogName}.${schemaName}`, + isHidden: + ["SNOWFLAKE", ""].includes(owner) || + schemaName === "INFORMATION_SCHEMA", + isDefault: schema ? schemaName === schema : false, + }; + }); + } catch (error) { + logger.error( + `Error getting schemas for Snowflake connection ${connection.name}`, + { error }, + ); + throw new Error( + `Failed to get schemas for Snowflake connection ${connection.name}: ${(error as Error).message}`, + ); + } +} + +async function getSchemasForTrino( + connection: ApiConnection, + malloyConnection: Connection, +): Promise { + if (!connection.trinoConnection) { + throw new Error("Trino connection is required"); + } + try { + const configuredSchema = connection.trinoConnection.schema; + let allRows: { catalog: string; schema: string }[] = []; + + if (connection.trinoConnection.catalog) { + const catalog = connection.trinoConnection.catalog; + const result = await malloyConnection.runSQL( + `SELECT schema_name FROM ${catalog}.information_schema.schemata ORDER BY schema_name`, ); - } - } else if (connection.type === "snowflake") { - if (!connection.snowflakeConnection) { - throw new Error("Snowflake connection is required"); - } - try { - // Use the connection's runSQL method to query schemas - const result = await malloyConnection.runSQL("SHOW SCHEMAS"); const rows = standardizeRunSQLResult(result); - return rows.map((row: unknown) => { - const typedRow = row as Record; - const databaseName = String( - typedRow.database_name ?? typedRow.DATABASE_NAME ?? "", - ); - const name = String(typedRow.name ?? typedRow.NAME ?? ""); - const owner = String(typedRow.owner ?? typedRow.OWNER ?? ""); - const isDefaultVal = - typedRow.is_default ?? typedRow.isDefault ?? typedRow.IS_DEFAULT; + allRows = rows.map((row: unknown) => { + const r = row as Record; return { - name: `${databaseName}.${name}`, - isHidden: ["SNOWFLAKE", ""].includes(owner), - isDefault: isDefaultVal === "Y", + catalog, + schema: String(r.schema_name ?? r.Schema ?? ""), }; }); - } catch (error) { - console.error( - `Error getting schemas for Snowflake connection ${connection.name}:`, - error, - ); - throw new Error( - `Failed to get schemas for Snowflake connection ${connection.name}: ${(error as Error).message}`, + } else { + const catalogsResult = await malloyConnection.runSQL(`SHOW CATALOGS`); + const catalogNames = standardizeRunSQLResult(catalogsResult).map( + (row: unknown) => { + const r = row as Record; + return String(r.Catalog ?? r.catalog ?? ""); + }, ); - } - } else if (connection.type === "trino") { - if (!connection.trinoConnection) { - throw new Error("Trino connection is required"); - } - try { - let result: unknown; - // Use the connection's runSQL method to query schemas - if (connection.trinoConnection.catalog) { - result = await malloyConnection.runSQL( - `SHOW SCHEMAS FROM ${connection.trinoConnection.catalog}`, - ); - } else { - const catalogs = await malloyConnection.runSQL(`SHOW CATALOGS`); - console.log("catalogs", catalogs); - let catalogNames = standardizeRunSQLResult(catalogs); - catalogNames = catalogNames.map((catalog: unknown) => { - const typedCatalog = catalog as Record; - return typedCatalog.Catalog as string; - }); - const schemas: unknown[] = []; - - console.log("catalogNames", catalogNames); - for (const catalog of catalogNames) { - const schemasResult = await malloyConnection.runSQL( - `SHOW SCHEMAS FROM ${catalog}`, + for (const catalog of catalogNames) { + try { + const result = await malloyConnection.runSQL( + `SELECT schema_name FROM ${catalog}.information_schema.schemata ORDER BY schema_name`, ); - const schemasResultRows = standardizeRunSQLResult(schemasResult); - console.log("schemasResultRows", schemasResultRows); - - // Concat catalog name to schema name for each schema row - const schemasWithCatalog = schemasResultRows.map( - (row: unknown) => { - const typedRow = row as Record; - // For display, use the convention "catalog.schema" - return { - ...typedRow, - Schema: `${catalog}.${typedRow.Schema ?? typedRow.schema ?? ""}`, - }; - }, + const rows = standardizeRunSQLResult(result); + for (const row of rows) { + const r = row as Record; + allRows.push({ + catalog, + schema: String(r.schema_name ?? r.Schema ?? ""), + }); + } + } catch (catalogError) { + logger.warn( + `Failed to list schemas for Trino catalog ${catalog}`, + { error: catalogError }, ); - schemas.push(...schemasWithCatalog); - console.log("schemas", schemas); } - result = schemas; } - - const rows = standardizeRunSQLResult(result); - return rows.map((row: unknown) => { - const typedRow = row as Record; - return { - name: typedRow.Schema as string, - isHidden: ["information_schema", "performance_schema"].includes( - typedRow.Schema as string, - ), - isDefault: - typedRow.Schema === connection.trinoConnection?.schema, - }; - }); - } catch (error) { - console.error( - `Error getting schemas for Trino connection ${connection.name}:`, - error, - ); - throw new Error( - `Failed to get schemas for Trino connection ${connection.name}: ${(error as Error).message}`, - ); - } - } else if (connection.type === "duckdb") { - if (!connection.duckdbConnection) { - throw new Error("DuckDB connection is required"); } - try { - // Use DuckDB's INFORMATION_SCHEMA.SCHEMATA to list schemas - // Use DISTINCT to avoid duplicates from attached databases - const result = await malloyConnection.runSQL( - "SELECT DISTINCT schema_name,catalog_name FROM information_schema.schemata ORDER BY catalog_name,schema_name", - { rowLimit: 1000 }, - ); - const rows = standardizeRunSQLResult(result); + return allRows.map(({ catalog, schema }) => { + const name = connection.trinoConnection?.catalog + ? schema + : `${catalog}.${schema}`; + return { + name, + isHidden: ["information_schema", "performance_schema"].includes( + schema, + ), + isDefault: configuredSchema ? schema === configuredSchema : false, + }; + }); + } catch (error) { + logger.error( + `Error getting schemas for Trino connection ${connection.name}`, + { error }, + ); + throw new Error( + `Failed to get schemas for Trino connection ${connection.name}: ${(error as Error).message}`, + ); + } +} - const schemas: ApiSchema[] = rows.map((row: unknown) => { - const typedRow = row as Record; - const schemaName = typedRow.schema_name as string; - const catalogName = typedRow.catalog_name as string; +async function getSchemasForDuckDB( + connection: ApiConnection, + malloyConnection: Connection, +): Promise { + if (!connection.duckdbConnection) { + throw new Error("DuckDB connection is required"); + } + try { + const result = await malloyConnection.runSQL( + "SELECT DISTINCT schema_name,catalog_name FROM information_schema.schemata ORDER BY catalog_name,schema_name", + { rowLimit: 1000 }, + ); - return { - name: `${catalogName}.${schemaName}`, - isHidden: - [ - "information_schema", - "performance_schema", - "", - "SNOWFLAKE", - "information_schema", - "pg_catalog", - "pg_toast", - ].includes(schemaName as string) || - ["md_information_schema", "system"].includes( - catalogName as string, - ), - isDefault: catalogName === "main", - }; - }); + const rows = standardizeRunSQLResult(result); - const attachedDatabases = - connection.duckdbConnection.attachedDatabases || []; + const schemas: ApiSchema[] = rows.map((row: unknown) => { + const typedRow = row as Record; + const schemaName = String(typedRow.schema_name ?? ""); + const catalogName = String(typedRow.catalog_name ?? ""); - // Process all cloud storage connections in parallel - const cloudDatabases = attachedDatabases.filter( - (attachedDb) => - (attachedDb.type === "gcs" || attachedDb.type === "s3") && - (attachedDb.gcsConnection || attachedDb.s3Connection), - ); + return { + name: `${catalogName}.${schemaName}`, + isHidden: + [ + "information_schema", + "performance_schema", + "pg_catalog", + "pg_toast", + "", + ].includes(schemaName) || + ["md_information_schema", "system"].includes(catalogName), + isDefault: catalogName === "main", + }; + }); - const cloudDbPromises = cloudDatabases.map(async (attachedDb) => { - const dbType = attachedDb.type as "gcs" | "s3"; - const credentials = - dbType === "gcs" - ? gcsConnectionToCredentials(attachedDb.gcsConnection!) - : s3ConnectionToCredentials(attachedDb.s3Connection!); + const attachedDatabases = + connection.duckdbConnection.attachedDatabases || []; - try { - return await listCloudDirectorySchemas(credentials); - } catch (cloudError) { - logger.warn( - `Failed to list ${dbType.toUpperCase()} directory schemas for ${attachedDb.name}`, - { error: cloudError }, - ); - return []; - } - }); + const cloudDatabases = attachedDatabases.filter( + (attachedDb) => + (attachedDb.type === "gcs" || attachedDb.type === "s3") && + (attachedDb.gcsConnection || attachedDb.s3Connection), + ); - const cloudSchemaArrays = await Promise.all(cloudDbPromises); - for (const cloudSchemas of cloudSchemaArrays) { - schemas.push(...cloudSchemas); - } + const cloudDbPromises = cloudDatabases.map(async (attachedDb) => { + const dbType = attachedDb.type as "gcs" | "s3"; + const credentials = + dbType === "gcs" + ? gcsConnectionToCredentials(attachedDb.gcsConnection!) + : s3ConnectionToCredentials(attachedDb.s3Connection!); - // Add Azure ADLS attached databases as schemas (by name) - const azureDatabases = attachedDatabases.filter( - (attachedDb) => - attachedDb.type === "azure" && attachedDb.azureConnection, - ); - for (const attachedDb of azureDatabases) { - if (attachedDb.name) { - schemas.push({ - name: attachedDb.name, - isHidden: false, - isDefault: false, - }); - } + try { + return await listCloudDirectorySchemas(credentials); + } catch (cloudError) { + logger.warn( + `Failed to list ${dbType.toUpperCase()} directory schemas for ${attachedDb.name}`, + { error: cloudError }, + ); + return []; } + }); - return schemas; - } catch (error) { - console.error( - `Error getting schemas for DuckDB connection ${connection.name}:`, - error, - ); - throw new Error( - `Failed to get schemas for DuckDB connection ${connection.name}: ${(error as Error).message}`, - ); + const cloudSchemaArrays = await Promise.all(cloudDbPromises); + for (const cloudSchemas of cloudSchemaArrays) { + schemas.push(...cloudSchemas); } - } else if (connection.type === "motherduck") { - if (!connection.motherduckConnection) { - throw new Error("MotherDuck connection is required"); - } - try { - // Use MotherDuck's INFORMATION_SCHEMA.SCHEMATA to list schemas - const result = await malloyConnection.runSQL( - "SELECT DISTINCT schema_name as row FROM information_schema.schemata ORDER BY schema_name", - { rowLimit: 1000 }, - ); - const rows = standardizeRunSQLResult(result); - console.log(rows); - return rows.map((row: unknown) => { - const typedRow = row as { row: string }; - return { - name: typedRow.row, - isHidden: [ - "information_schema", - "performance_schema", - "", - ].includes(typedRow.row), + + const azureDatabases = attachedDatabases.filter( + (attachedDb) => + attachedDb.type === "azure" && attachedDb.azureConnection, + ); + for (const attachedDb of azureDatabases) { + if (attachedDb.name) { + schemas.push({ + name: attachedDb.name, + isHidden: false, isDefault: false, - }; - }); - } catch (error) { - console.error( - `Error getting schemas for MotherDuck connection ${connection.name}:`, - error, - ); - throw new Error( - `Failed to get schemas for MotherDuck connection ${connection.name}: ${(error as Error).message}`, - ); + }); + } } - } else if (connection.type === "ducklake") { - try { - // Filter by catalog_name to only get schemas from the attached DuckLake catalog - // The catalog is attached with the connection name (see attachDuckLake in connection.ts) - const catalogName = connection.name; - const result = await malloyConnection.runSQL( - `SELECT schema_name FROM information_schema.schemata WHERE catalog_name = '${catalogName}' ORDER BY schema_name`, - { rowLimit: 1000 }, + + return schemas; + } catch (error) { + logger.error( + `Error getting schemas for DuckDB connection ${connection.name}`, + { error }, + ); + throw new Error( + `Failed to get schemas for DuckDB connection ${connection.name}: ${(error as Error).message}`, + ); + } +} + +async function getSchemasForMotherDuck( + connection: ApiConnection, + malloyConnection: Connection, +): Promise { + if (!connection.motherduckConnection) { + throw new Error("MotherDuck connection is required"); + } + try { + const database = connection.motherduckConnection.database; + const whereClause = database ? `WHERE catalog_name = '${database}'` : ""; + const result = await malloyConnection.runSQL( + `SELECT DISTINCT schema_name FROM information_schema.schemata ${whereClause} ORDER BY schema_name`, + ); + const rows = standardizeRunSQLResult(result); + return rows.map((row: unknown) => { + const typedRow = row as Record; + const schemaName = String( + typedRow.schema_name ?? typedRow.SCHEMA_NAME ?? "", ); - const rows = standardizeRunSQLResult(result); + return { + name: schemaName, + isHidden: ["information_schema", "performance_schema", ""].includes( + schemaName, + ), + isDefault: schemaName === "main", + }; + }); + } catch (error) { + logger.error( + `Error getting schemas for MotherDuck connection ${connection.name}`, + { error }, + ); + throw new Error( + `Failed to get schemas for MotherDuck connection ${connection.name}: ${(error as Error).message}`, + ); + } +} - return rows.map((row: unknown) => { - const typedRow = row as Record; - const schemaName = typedRow.schema_name as string; +async function getSchemasForDuckLake( + connection: ApiConnection, + malloyConnection: Connection, +): Promise { + try { + // The catalog is attached with the connection name (see attachDuckLake in connection.ts) + const catalogName = connection.name; + const result = await malloyConnection.runSQL( + `SELECT schema_name FROM information_schema.schemata WHERE catalog_name = '${catalogName}' ORDER BY schema_name`, + { rowLimit: 1000 }, + ); + const rows = standardizeRunSQLResult(result); - const shouldShow = schemaName === "main" || schemaName === "public"; + return rows.map((row: unknown) => { + const typedRow = row as Record; + const schemaName = typedRow.schema_name as string; + const shouldShow = schemaName === "main" || schemaName === "public"; + return { + name: schemaName, + isHidden: !shouldShow, + isDefault: false, + }; + }); + } catch (error) { + logger.error( + `Error getting schemas for DuckLake connection ${connection.name}`, + { error }, + ); + throw new Error( + `Failed to get schemas for DuckLake connection ${connection.name}: ${(error as Error).message}`, + ); + } +} - return { - name: schemaName, - isHidden: !shouldShow, - isDefault: false, - }; - }); - } catch (error) { - logger.error( - `Error getting schemas for DuckLake connection ${connection.name}`, - { error }, - ); - throw new Error( - `Failed to get schemas for DuckLake connection ${connection.name}: ${(error as Error).message}`, - ); - } - } else { - throw new Error(`Unsupported connection type: ${connection.type}`); +export async function getSchemasForConnection( + connection: ApiConnection, + malloyConnection: Connection, +): Promise { + switch (connection.type) { + case "bigquery": + return getSchemasForBigQuery(connection); + case "postgres": + return getSchemasForPostgres(connection, malloyConnection); + case "mysql": + return getSchemasForMySQL(connection); + case "snowflake": + return getSchemasForSnowflake(connection, malloyConnection); + case "trino": + return getSchemasForTrino(connection, malloyConnection); + case "duckdb": + return getSchemasForDuckDB(connection, malloyConnection); + case "motherduck": + return getSchemasForMotherDuck(connection, malloyConnection); + case "ducklake": + return getSchemasForDuckLake(connection, malloyConnection); + default: + throw new Error(`Unsupported connection type: ${connection.type}`); } } @@ -695,481 +779,428 @@ async function describeAzureFile( } } -export async function getTablesForSchema( +export async function listTablesForSchema( connection: ApiConnection, schemaName: string, malloyConnection: Connection, - fetchTableSchema = true, + tableNames?: string[], ): Promise { - // Check if schemaName matches an Azure attached database name - if (connection.type === "duckdb") { - const attachedDbs = connection.duckdbConnection?.attachedDatabases || []; - const azureDb = attachedDbs.find( - (db) => - db.type === "azure" && db.name === schemaName && db.azureConnection, - ); - if (azureDb) { - const azureConn = azureDb.azureConnection!; - const fileUrl = - azureConn.authType === "sas_token" - ? azureConn.sasUrl - : azureConn.fileUrl; - if (fileUrl) { - return await describeAzureFile( - malloyConnection, - fileUrl, - azureConn, - ); - } - } + switch (connection.type) { + case "bigquery": + return listTablesForBigQuery( + connection, + schemaName, + malloyConnection, + tableNames, + ); + case "mysql": + return listTablesForMySQL( + connection, + schemaName, + malloyConnection, + tableNames, + ); + case "postgres": + return listTablesForPostgres( + connection, + schemaName, + malloyConnection, + tableNames, + ); + case "snowflake": + return listTablesForSnowflake( + connection, + schemaName, + malloyConnection, + tableNames, + ); + case "trino": + return listTablesForTrino( + connection, + schemaName, + malloyConnection, + tableNames, + ); + case "duckdb": + return listTablesForDuckDB( + connection, + schemaName, + malloyConnection, + tableNames, + ); + case "motherduck": + return listTablesForMotherDuck( + connection, + schemaName, + malloyConnection, + tableNames, + ); + case "ducklake": + return listTablesForDuckLake( + connection, + schemaName, + malloyConnection, + tableNames, + ); + default: + throw new Error(`Unsupported connection type: ${connection.type}`); } +} - // Check if this is an Azure ADLS file path (abfss:// or HTTPS SAS URL) +/** + * BigQuery: list tables via API client, then fetch each table's schema + * individually since BigQuery's INFORMATION_SCHEMA is region-scoped. + */ +async function listTablesForBigQuery( + connection: ApiConnection, + schemaName: string, + malloyConnection: Connection, + tableNames?: string[], +): Promise { + try { + const bigquery = createBigQueryClient(connection); + const dataset = bigquery.dataset(schemaName); + const [tables] = await dataset.getTables(); + + let names = tables + .map((table) => table.id) + .filter((id): id is string => id !== undefined); + if (tableNames) { + const allowed = new Set(tableNames); + names = names.filter((id) => allowed.has(id)); + } + + const results = await Promise.all( + names.map(async (tableName) => { + const tablePath = `${schemaName}.${tableName}`; + try { + const source = await ( + malloyConnection as Connection & { + fetchTableSchema: ( + tableKey: string, + tablePath: string, + ) => Promise; + } + ).fetchTableSchema(tableName, tablePath); + const columns = + source?.fields?.map((field) => ({ + name: field.name, + type: field.type, + })) || []; + return { resource: tablePath, columns }; + } catch (error) { + logger.warn(`Failed to get schema for table ${tableName}`, { + error: extractErrorDataFromError(error), + schemaName, + tableName, + }); + return { resource: tablePath, columns: [] }; + } + }), + ); + return results; + } catch (error) { + logger.error( + `Error getting tables for BigQuery schema ${schemaName} in connection ${connection.name}`, + { error }, + ); + throw new Error( + `Failed to get tables for BigQuery schema ${schemaName} in connection ${connection.name}: ${(error as Error).message}`, + ); + } +} + +async function listTablesForMySQL( + connection: ApiConnection, + schemaName: string, + malloyConnection: Connection, + tableNames?: string[], +): Promise { + if (!connection.mysqlConnection) { + throw new Error("Mysql connection is required"); + } + try { + const result = await malloyConnection.runSQL( + `SELECT TABLE_NAME, COLUMN_NAME, DATA_TYPE FROM information_schema.columns WHERE table_schema = '${schemaName}' ${sqlInFilter("TABLE_NAME", tableNames)} ORDER BY TABLE_NAME, ORDINAL_POSITION`, + ); + const rows = standardizeRunSQLResult(result); + return groupColumnRowsIntoTables(rows, (t) => `${schemaName}.${t}`); + } catch (error) { + logger.error( + `Error getting tables for MySQL schema ${schemaName} in connection ${connection.name}`, + { error }, + ); + throw new Error( + `Failed to get tables for MySQL schema ${schemaName} in connection ${connection.name}: ${(error as Error).message}`, + ); + } +} + +async function listTablesForPostgres( + connection: ApiConnection, + schemaName: string, + malloyConnection: Connection, + tableNames?: string[], +): Promise { + if (!connection.postgresConnection) { + throw new Error("Postgres connection is required"); + } + try { + const result = await malloyConnection.runSQL( + `SELECT table_name, column_name, data_type FROM information_schema.columns WHERE table_schema = '${schemaName}' ${sqlInFilter("table_name", tableNames)} ORDER BY table_name, ordinal_position`, + ); + const rows = standardizeRunSQLResult(result); + return groupColumnRowsIntoTables(rows, (t) => `${schemaName}.${t}`); + } catch (error) { + logger.error( + `Error getting tables for Postgres schema ${schemaName} in connection ${connection.name}`, + { error }, + ); + throw new Error( + `Failed to get tables for Postgres schema ${schemaName} in connection ${connection.name}: ${(error as Error).message}`, + ); + } +} + +async function listTablesForSnowflake( + connection: ApiConnection, + schemaName: string, + malloyConnection: Connection, + tableNames?: string[], +): Promise { + if (!connection.snowflakeConnection) { + throw new Error("Snowflake connection is required"); + } + try { + const parts = schemaName.split("."); + let databaseName: string; + let schemaOnly: string; + + if (parts.length >= 2) { + databaseName = parts[0]; + schemaOnly = parts[1]; + } else { + databaseName = connection.snowflakeConnection.database ?? ""; + schemaOnly = parts[0]; + } + + if (!databaseName) { + throw new Error( + `Cannot resolve database for schema "${schemaName}": provide DATABASE.SCHEMA or configure a database on the connection`, + ); + } + + const qualifiedSchema = `${databaseName}.${schemaOnly}`; + const result = await malloyConnection.runSQL( + `SELECT TABLE_NAME, COLUMN_NAME, DATA_TYPE FROM ${databaseName}.INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '${schemaOnly}' ${sqlInFilter("TABLE_NAME", tableNames)} ORDER BY TABLE_NAME, ORDINAL_POSITION`, + ); + const rows = standardizeRunSQLResult(result); + return groupColumnRowsIntoTables(rows, (t) => `${qualifiedSchema}.${t}`); + } catch (error) { + logger.error( + `Error getting tables for Snowflake schema ${schemaName} in connection ${connection.name}`, + { error }, + ); + throw new Error( + `Failed to get tables for Snowflake schema ${schemaName} in connection ${connection.name}: ${(error as Error).message}`, + ); + } +} + +async function listTablesForTrino( + connection: ApiConnection, + schemaName: string, + malloyConnection: Connection, + tableNames?: string[], +): Promise { + if (!connection.trinoConnection) { + throw new Error("Trino connection is required"); + } + try { + let catalogPrefix: string; + let schemaOnly: string; + let resourcePrefix: string; + + if (connection.trinoConnection.catalog) { + catalogPrefix = `${connection.trinoConnection.catalog}.`; + schemaOnly = schemaName; + resourcePrefix = `${connection.trinoConnection.catalog}.${schemaName}`; + } else { + const dotIdx = schemaName.indexOf("."); + if (dotIdx > 0) { + catalogPrefix = `${schemaName.substring(0, dotIdx)}.`; + schemaOnly = schemaName.substring(dotIdx + 1); + } else { + catalogPrefix = ""; + schemaOnly = schemaName; + } + resourcePrefix = schemaName; + } + + const result = await malloyConnection.runSQL( + `SELECT table_name, column_name, data_type FROM ${catalogPrefix}information_schema.columns WHERE table_schema = '${schemaOnly}' ${sqlInFilter("table_name", tableNames)} ORDER BY table_name, ordinal_position`, + ); + const rows = standardizeRunSQLResult(result); + return groupColumnRowsIntoTables(rows, (t) => `${resourcePrefix}.${t}`); + } catch (error) { + logger.error( + `Error getting tables for Trino schema ${schemaName} in connection ${connection.name}`, + { error }, + ); + throw new Error( + `Failed to get tables for Trino schema ${schemaName} in connection ${connection.name}: ${(error as Error).message}`, + ); + } +} + +async function listTablesForDuckDB( + connection: ApiConnection, + schemaName: string, + malloyConnection: Connection, + tableNames?: string[], +): Promise { + if (!connection.duckdbConnection) { + throw new Error("DuckDB connection is required"); + } + + const attachedDbs = connection.duckdbConnection.attachedDatabases || []; + + // Azure attached database matched by name + const azureDb = attachedDbs.find( + (db) => + db.type === "azure" && db.name === schemaName && db.azureConnection, + ); + if (azureDb) { + const azureConn = azureDb.azureConnection!; + const fileUrl = + azureConn.authType === "sas_token" + ? azureConn.sasUrl + : azureConn.fileUrl; + if (fileUrl) { + return describeAzureFile(malloyConnection, fileUrl, azureConn); + } + } + + // Azure ADLS file path (abfss://, https://, az://) if ( - connection.type === "duckdb" && - (schemaName.startsWith("abfss://") || - schemaName.startsWith("https://") || - schemaName.startsWith("az://")) + schemaName.startsWith("abfss://") || + schemaName.startsWith("https://") || + schemaName.startsWith("az://") ) { - return await describeAzureFile(malloyConnection, schemaName); + return describeAzureFile(malloyConnection, schemaName); } - // Check if this is a cloud storage file path (gs://bucket/path/file.ext or s3://bucket/path/file.ext) + // Cloud storage (GCS/S3) const parsedUri = parseCloudUri(schemaName); - - if (parsedUri && connection.type === "duckdb") { + if (parsedUri) { const { type: cloudType, bucket: bucketName, path: directoryPath, } = parsedUri; - - const attachedDatabases = - connection.duckdbConnection?.attachedDatabases || []; const credentials = getCloudCredentialsFromAttachedDatabases( - attachedDatabases, + attachedDbs, cloudType, ); - if (!credentials) { throw new Error( `${cloudType.toUpperCase()} credentials not found in attached databases`, ); } - const fileKeys = await listDataFilesInDirectory( credentials, bucketName, directoryPath, ); - - return await getCloudTablesWithColumns( + return getCloudTablesWithColumns( malloyConnection, credentials, bucketName, fileKeys, ); - } else if (connection.type === "ducklake") { - if (schemaName.split(".").length == 2) { - schemaName = `${connection.name}.${schemaName}`; - } else if (schemaName.split(".").length === 1) { - schemaName = `${connection.name}.${schemaName}`; - } } - const tableNames = await listTablesForSchema( - connection, - schemaName, - malloyConnection, - ); - // Fetch all table sources in parallel - const tableSourcePromises = tableNames.map(async (tableName) => { - try { - let tablePath: string; - if (connection.type === "trino") { - if (connection.trinoConnection?.catalog) { - tablePath = `${connection.trinoConnection?.catalog}.${schemaName}.${tableName}`; - } else { - // Catalog name is included in the schema name - tablePath = `${schemaName}.${tableName}`; - } - } else if (connection.type === "ducklake") { - // For ducklake, schemaName already includes connection name prefix from above - // So tablePath should be schemaName.tableName (which is connectionName.schemaName.tableName) - tablePath = `${schemaName}.${tableName}`; - } else { - tablePath = `${schemaName}.${tableName}`; - } - - logger.info( - `Processing table: ${tableName} in schema: ${schemaName}`, - { tablePath, connectionType: connection.type }, - ); - let tableSource: ApiTableSource | undefined; - if (fetchTableSchema) { - tableSource = await getConnectionTableSource( - malloyConnection, - tableName, - tablePath, - ); - } - return { - resource: tablePath, - columns: tableSource?.columns || [], - }; - } catch (error) { - logger.warn(`Failed to get schema for table ${tableName}`, { - error: extractErrorDataFromError(error), - schemaName, - tableName, - }); - // Return table without columns if schema fetch fails - return { - resource: `${schemaName}.${tableName}`, - columns: [], - }; - } - }); - - // Wait for all table sources to be fetched - const tableResults = await Promise.all(tableSourcePromises); + // Regular DuckDB schema — query information_schema.columns + const dotIdx = schemaName.indexOf("."); + if (dotIdx < 0) { + throw new Error( + `DuckDB schema name must be qualified as "catalog.schema", got "${schemaName}"`, + ); + } + const catalogName = schemaName.substring(0, dotIdx); + const actualSchemaName = schemaName.substring(dotIdx + 1); - return tableResults; + try { + const result = await malloyConnection.runSQL( + `SELECT table_name, column_name, data_type FROM information_schema.columns WHERE table_schema = '${actualSchemaName}' AND table_catalog = '${catalogName}' ${sqlInFilter("table_name", tableNames)} ORDER BY table_name, ordinal_position`, + ); + const rows = standardizeRunSQLResult(result); + return groupColumnRowsIntoTables(rows, (t) => `${schemaName}.${t}`); + } catch (error) { + logger.error( + `Error getting tables for DuckDB schema ${schemaName} in connection ${connection.name}`, + { error }, + ); + throw new Error( + `Failed to get tables for DuckDB schema ${schemaName} in connection ${connection.name}: ${(error as Error).message}`, + ); + } } -export async function getConnectionTableSource( +async function listTablesForMotherDuck( + connection: ApiConnection, + schemaName: string, malloyConnection: Connection, - tableKey: string, - tablePath: string, -): Promise { + tableNames?: string[], +): Promise { + if (!connection.motherduckConnection) { + throw new Error("MotherDuck connection is required"); + } try { - logger.info(`Attempting to fetch table schema for: ${tablePath}`, { - tableKey, - tablePath, - }); - const source = await ( - malloyConnection as Connection & { - fetchTableSchema: ( - tableKey: string, - tablePath: string, - ) => Promise; - } - ).fetchTableSchema(tableKey, tablePath); - if (source === undefined) { - throw new ConnectionError( - `Table ${tablePath} not found: ${JSON.stringify(source)}`, - ); - } - - // Validate that source has the expected structure - if (!source) { - throw new ConnectionError( - `Invalid table source returned for ${tablePath}`, - ); - } else if (typeof source !== "object") { - throw new ConnectionError(JSON.stringify(source)); - } - - const malloyFields = (source as TableSourceDef).fields; - if (!malloyFields || !Array.isArray(malloyFields)) { - throw new ConnectionError( - `Table ${tablePath} has no fields or invalid field structure`, - ); - } - - //This is for the Trino connection. The connection will not throw an error if the table is not found. - // Instead it will return an empty fields array. So we need to check for that. - // But it is fine to have it for all other connections as well. - if (malloyFields.length === 0) { - throw new ConnectionError(`Table ${tablePath} not found`); - } - - const fields = malloyFields.map((field) => { - return { - name: field.name, - type: field.type, - }; - }); - logger.debug(`Successfully fetched schema for ${tablePath}`, { - fieldCount: fields.length, - }); - return { - source: JSON.stringify(source), - resource: tablePath, - columns: fields, - }; + const result = await malloyConnection.runSQL( + `SELECT table_name, column_name, data_type FROM information_schema.columns WHERE table_schema = '${schemaName}' ${sqlInFilter("table_name", tableNames)} ORDER BY table_name, ordinal_position`, + ); + const rows = standardizeRunSQLResult(result); + return groupColumnRowsIntoTables(rows, (t) => `${schemaName}.${t}`); } catch (error) { - const errorMessage = - error instanceof Error - ? error.message - : typeof error === "string" - ? error - : JSON.stringify(error); - logger.error("fetchTableSchema error", { - error, - tableKey, - tablePath, - }); - throw new ConnectionError(errorMessage); + logger.error( + `Error getting tables for MotherDuck schema ${schemaName} in connection ${connection.name}`, + { error }, + ); + throw new Error( + `Failed to get tables for MotherDuck schema ${schemaName} in connection ${connection.name}: ${(error as Error).message}`, + ); } } -export async function listTablesForSchema( +async function listTablesForDuckLake( connection: ApiConnection, schemaName: string, malloyConnection: Connection, -): Promise { - if (connection.type === "bigquery") { - try { - // Use BigQuery client directly for efficient table listing - // This is much faster than querying all regions - const bigquery = createBigQueryClient(connection); - const dataset = bigquery.dataset(schemaName); - const [tables] = await dataset.getTables(); - - // Return table names, filtering out any undefined values - return tables - .map((table) => table.id) - .filter((id): id is string => id !== undefined); - } catch (error) { - logger.error( - `Error getting tables for BigQuery schema ${schemaName} in connection ${connection.name}`, - { error }, - ); - throw new Error( - `Failed to get tables for BigQuery schema ${schemaName} in connection ${connection.name}: ${(error as Error).message}`, - ); - } - } else if (connection.type === "mysql") { - if (!connection.mysqlConnection) { - throw new Error("Mysql connection is required"); - } - try { - const result = await malloyConnection.runSQL( - `SELECT TABLE_NAME FROM information_schema.tables WHERE table_schema = '${schemaName}' AND table_type = 'BASE TABLE'`, - ); - const rows = standardizeRunSQLResult(result); - return rows.map((row: unknown) => { - const typedRow = row as Record; - return typedRow.TABLE_NAME as string; - }); - } catch (error) { - logger.error( - `Error getting tables for MySQL schema ${schemaName} in connection ${connection.name}`, - { error }, - ); - throw new Error( - `Failed to get tables for MySQL schema ${schemaName} in connection ${connection.name}: ${(error as Error).message}`, - ); - } - } else if (connection.type === "postgres") { - if (!connection.postgresConnection) { - throw new Error("Postgres connection is required"); - } - try { - const result = await malloyConnection.runSQL( - `SELECT table_name as row FROM information_schema.tables WHERE table_schema = '${schemaName}' ORDER BY table_name`, - ); - const rows = standardizeRunSQLResult(result); - return rows as string[]; - } catch (error) { - logger.error( - `Error getting tables for Postgres schema ${schemaName} in connection ${connection.name}`, - { error }, - ); - throw new Error( - `Failed to get tables for Postgres schema ${schemaName} in connection ${connection.name}: ${(error as Error).message}`, - ); - } - } else if (connection.type === "snowflake") { - if (!connection.snowflakeConnection) { - throw new Error("Snowflake connection is required"); - } - try { - // TODO: Switch to INFORMATION_SCHEMA.TABLES and INFORMATION_SCHEMA.VIEWS, with pagination support implemented in both backend and the frontend. - // Note: LIMIT 1000 is a temporary workaround to avoid pagination. - const tablesResult = await malloyConnection.runSQL( - `SHOW TABLES IN SCHEMA ${schemaName} LIMIT 1000`, - ); - const viewsResult = await malloyConnection.runSQL( - `SHOW VIEWS IN SCHEMA ${schemaName} LIMIT 1000`, - ); - const tableRows = standardizeRunSQLResult(tablesResult); - const viewRows = standardizeRunSQLResult(viewsResult); - logger.debug("Snowflake Tables Listed", { tableRows }); - logger.debug("Snowflake Views Listed", { viewRows }); - const rows = [...tableRows, ...viewRows]; - return rows - .map((row: unknown) => { - const typedRow = row as Record; - const name = typedRow.name ?? typedRow.NAME; - return typeof name === "string" ? name : String(name); - }) - .filter((id) => id.length > 0); - } catch (error) { - logger.error( - `Error getting tables for Snowflake schema ${schemaName} in connection ${connection.name}`, - { error }, - ); - throw new Error( - `Failed to get tables for Snowflake schema ${schemaName} in connection ${connection.name}: ${(error as Error).message}`, - ); - } - } else if (connection.type === "trino") { - if (!connection.trinoConnection) { - throw new Error("Trino connection is required"); - } - try { - let result: unknown; - - if (connection.trinoConnection?.catalog) { - result = await malloyConnection.runSQL( - `SHOW TABLES FROM ${connection.trinoConnection.catalog}.${schemaName}`, - ); - } else { - // Catalog name is included in the schema name - result = await malloyConnection.runSQL( - `SHOW TABLES FROM ${schemaName}`, - ); - } - const rows = standardizeRunSQLResult(result); - return rows.map((row: unknown) => { - const typedRow = row as Record; - return typedRow.Table as string; - }); - } catch (error) { - logger.error( - `Error getting tables for Trino schema ${schemaName} in connection ${connection.name}`, - { error }, - ); - throw new Error( - `Failed to get tables for Trino schema ${schemaName} in connection ${connection.name}: ${(error as Error).message}`, - ); - } - } else if (connection.type === "duckdb") { - if (!connection.duckdbConnection) { - throw new Error("DuckDB connection is required"); - } - - const parsedUri = parseCloudUri(schemaName); - - if (parsedUri) { - const { - type: cloudType, - bucket: bucketName, - path: directoryPath, - } = parsedUri; - - const attachedDatabases = - connection.duckdbConnection.attachedDatabases || []; - - const credentials = getCloudCredentialsFromAttachedDatabases( - attachedDatabases, - cloudType, - ); - - if (!credentials) { - throw new Error( - `${cloudType.toUpperCase()} credentials not found in attached databases`, - ); - } - - try { - const fileKeys = await listDataFilesInDirectory( - credentials, - bucketName, - directoryPath, - ); - return fileKeys.map((key) => { - const lastSlash = key.lastIndexOf("/"); - return lastSlash > 0 ? key.substring(lastSlash + 1) : key; - }); - } catch (error) { - logger.error( - `Error listing ${cloudType.toUpperCase()} objects in ${schemaName}`, - { - error, - }, - ); - throw new Error( - `Failed to list files in ${schemaName}: ${(error as Error).message}`, - ); - } - } - - const catalogName = schemaName.split(".")[0]; - const actualSchemaName = schemaName.split(".")[1]; - - // Regular DuckDB table listing - try { - const result = await malloyConnection.runSQL( - `SELECT table_name FROM information_schema.tables WHERE table_schema = '${actualSchemaName}' and table_catalog = '${catalogName}' ORDER BY table_name`, - { rowLimit: 1000 }, - ); + tableNames?: string[], +): Promise { + // Prefix bare schema names with the catalog (connection) name. + // Two-part names like "catalog.schema" are already qualified. + if (!schemaName.includes(".")) { + schemaName = `${connection.name}.${schemaName}`; + } - const rows = standardizeRunSQLResult(result); - return rows.map((row: unknown) => { - const typedRow = row as Record; - return typedRow.table_name as string; - }); - } catch (error) { - logger.error( - `Error getting tables for DuckDB schema ${schemaName} in connection ${connection.name}`, - { error }, - ); - throw new Error( - `Failed to get tables for DuckDB schema ${schemaName} in connection ${connection.name}: ${(error as Error).message}`, - ); - } - } else if (connection.type === "motherduck") { - if (!connection.motherduckConnection) { - throw new Error("MotherDuck connection is required"); - } - try { - const result = await malloyConnection.runSQL( - `SELECT table_name as row FROM information_schema.tables WHERE table_schema = '${schemaName}' ORDER BY table_name`, - { rowLimit: 1000 }, - ); - const rows = standardizeRunSQLResult(result); - return rows.map((row: unknown) => { - const typedRow = row as { row: string }; - return typedRow.row; - }); - } catch (error) { - logger.error( - `Error getting tables for MotherDuck schema ${schemaName} in connection ${connection.name}`, - { error }, - ); - throw new Error( - `Failed to get tables for MotherDuck schema ${schemaName} in connection ${connection.name}: ${(error as Error).message}`, - ); - } - } else if (connection.type === "ducklake") { - const catalogName = schemaName.split(".")[0]; - const actualSchemaName = schemaName.split(".")[1]; - console.error("catalogName", catalogName); - console.error("actualSchemaName", actualSchemaName); - try { - const result = await malloyConnection.runSQL( - `SELECT table_name FROM information_schema.tables WHERE table_schema = '${actualSchemaName}' AND table_catalog = '${catalogName}' ORDER BY table_name`, - { rowLimit: 1000 }, - ); - const rows = standardizeRunSQLResult(result); - return rows.map((row: unknown) => { - const typedRow = row as Record; - return typedRow.table_name as string; - }); - } catch (error) { - logger.error( - `Error getting tables for DuckLake schema ${schemaName} in connection ${connection.name}`, - { error }, - ); - throw new Error( - `Failed to get tables for DuckLake schema ${schemaName} in connection ${connection.name}: ${(error as Error).message}`, - ); - } - } else { - throw new Error(`Unsupported connection type: ${connection.type}`); + const catalogName = schemaName.split(".")[0]; + const actualSchemaName = schemaName.split(".")[1]; + try { + const result = await malloyConnection.runSQL( + `SELECT table_name, column_name, data_type FROM information_schema.columns WHERE table_schema = '${actualSchemaName}' AND table_catalog = '${catalogName}' ${sqlInFilter("table_name", tableNames)} ORDER BY table_name, ordinal_position`, + ); + const rows = standardizeRunSQLResult(result); + return groupColumnRowsIntoTables(rows, (t) => `${schemaName}.${t}`); + } catch (error) { + logger.error( + `Error getting tables for DuckLake schema ${schemaName} in connection ${connection.name}`, + { error }, + ); + throw new Error( + `Failed to get tables for DuckLake schema ${schemaName} in connection ${connection.name}: ${(error as Error).message}`, + ); } } diff --git a/packages/server/tests/unit/ducklake/ducklake.test.ts b/packages/server/tests/unit/ducklake/ducklake.test.ts index 62daddf1..a7ae490f 100644 --- a/packages/server/tests/unit/ducklake/ducklake.test.ts +++ b/packages/server/tests/unit/ducklake/ducklake.test.ts @@ -10,7 +10,7 @@ import { } from "../../../src/service/connection"; import { getSchemasForConnection, - getTablesForSchema, + listTablesForSchema, } from "../../../src/service/db_utils"; type ApiConnection = components["schemas"]["Connection"]; @@ -421,8 +421,8 @@ describe("DuckLake Connection Tests", () => { } // Test table listing for first schema - const schemaName = schemas[0].name; - const tables = await getTablesForSchema( + const schemaName = schemas[0].name!; + const tables = await listTablesForSchema( ducklakeConnection, schemaName, connection,