diff --git a/README.md b/README.md index 283ccac6..4fe99c5e 100644 --- a/README.md +++ b/README.md @@ -55,7 +55,7 @@ AI-Driven Development (vibe coding) on Databricks just got a whole lot better. T ### Install in existing project -By default this will install at a project level rather than a user level. This is often a good fit, but requires you to run your client from the exact directory that was used for the install. +By default this will install at a project level rather than a user level. This is often a good fit, but requires you to run your client from the exact directory that was used for the install. _Note: Project configuration files can be re-used in other projects. You find these configs under .claude, .cursor, or .gemini_ #### Mac / Linux @@ -164,7 +164,7 @@ Works with LangChain, OpenAI Agents SDK, or any Python framework. See [databrick |-----------|-------------| | [`databricks-tools-core/`](databricks-tools-core/) | Python library with high-level Databricks functions | | [`databricks-mcp-server/`](databricks-mcp-server/) | MCP server exposing 50+ tools for AI assistants | -| [`databricks-skills/`](databricks-skills/) | 19 markdown skills teaching Databricks patterns | +| [`databricks-skills/`](databricks-skills/) | 20 markdown skills teaching Databricks patterns | | [`databricks-builder-app/`](databricks-builder-app/) | Full-stack web app with Claude Code integration | --- diff --git a/databricks-skills/README.md b/databricks-skills/README.md index 29a79ae8..3d011155 100644 --- a/databricks-skills/README.md +++ b/databricks-skills/README.md @@ -33,10 +33,10 @@ cp -r ai-dev-kit/databricks-skills/databricks-agent-bricks .claude/skills/ ## Available Skills ### 🤖 AI & Agents +- **databricks-ai-functions** - Built-in AI Functions (ai_classify, ai_extract, ai_summarize, ai_query, ai_forecast, ai_parse_document, and more) with SQL and PySpark patterns, function selection guidance, document processing pipelines, and custom RAG (parse → chunk → index → query) - **databricks-agent-bricks** - Knowledge Assistants, Genie Spaces, Supervisor Agents - **databricks-genie** - Genie Spaces: create, curate, and query via Conversation API - **databricks-model-serving** - Deploy MLflow models and AI agents to endpoints -- **databricks-parsing** - Parse documents (PDF, DOCX, images) with ai_parse_document for custom RAG pipelines - **databricks-unstructured-pdf-generation** - Generate synthetic PDFs for RAG - **databricks-vector-search** - Vector similarity search for RAG and semantic search diff --git a/databricks-skills/databricks-ai-functions/1-task-functions.md b/databricks-skills/databricks-ai-functions/1-task-functions.md new file mode 100644 index 00000000..eb989904 --- /dev/null +++ b/databricks-skills/databricks-ai-functions/1-task-functions.md @@ -0,0 +1,348 @@ +# Task-Specific AI Functions — Full Reference + +These functions require no model endpoint selection. They call pre-configured Foundation Model APIs optimized for each task. All require DBR 15.1+ (15.4 ML LTS for batch); `ai_parse_document` requires DBR 17.1+. + +--- + +## `ai_analyze_sentiment` + +**Docs:** https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_analyze_sentiment + +Returns one of: `positive`, `negative`, `neutral`, `mixed`, or `NULL`. + +```sql +SELECT ai_analyze_sentiment(review_text) AS sentiment +FROM customer_reviews; +``` + +```python +from pyspark.sql.functions import expr +df = spark.table("customer_reviews") +df.withColumn("sentiment", expr("ai_analyze_sentiment(review_text)")).display() +``` + +--- + +## `ai_classify` + +**Docs:** https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_classify + +**Syntax:** `ai_classify(content, labels)` +- `content`: STRING — text to classify +- `labels`: ARRAY\ — 2 to 20 mutually exclusive categories + +Returns the matching label or `NULL`. + +```sql +SELECT ticket_text, + ai_classify(ticket_text, ARRAY('urgent', 'not urgent', 'spam')) AS priority +FROM support_tickets; +``` + +```python +from pyspark.sql.functions import expr +df = spark.table("support_tickets") +df.withColumn( + "priority", + expr("ai_classify(ticket_text, array('urgent', 'not urgent', 'spam'))") +).display() +``` + +**Tips:** +- Fewer labels = more consistent results (2–5 is optimal) +- Labels should be mutually exclusive and clearly distinguishable +- Not suitable for multi-label classification — run multiple calls if needed + +--- + +## `ai_extract` + +**Docs:** https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_extract + +**Syntax:** `ai_extract(content, labels)` +- `content`: STRING — source text +- `labels`: ARRAY\ — entity types to extract + +Returns a STRUCT where each field name matches a label. Fields are `NULL` if not found. + +```sql +-- Extract and access fields directly +SELECT + entities.person, + entities.location, + entities.date +FROM ( + SELECT ai_extract( + 'John Doe called from New York on 2024-01-15.', + ARRAY('person', 'location', 'date') + ) AS entities + FROM messages +); +``` + +```python +from pyspark.sql.functions import expr +df = spark.table("messages") +df = df.withColumn( + "entities", + expr("ai_extract(message, array('person', 'location', 'date'))") +) +df.select("entities.person", "entities.location", "entities.date").display() +``` + +**Use `ai_query` instead when:** the output has nested arrays or more than ~5 levels of hierarchy. + +--- + +## `ai_fix_grammar` + +**Docs:** https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_fix_grammar + +**Syntax:** `ai_fix_grammar(content)` — Returns corrected STRING. + +Optimized for English. Useful for cleaning user-generated content before downstream processing. + +```sql +SELECT ai_fix_grammar(user_comment) AS corrected FROM user_feedback; +``` + +```python +from pyspark.sql.functions import expr +df = spark.table("user_feedback") +df.withColumn("corrected", expr("ai_fix_grammar(user_comment)")).display() +``` + +--- + +## `ai_gen` + +**Docs:** https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_gen + +**Syntax:** `ai_gen(prompt)` — Returns a generated STRING. + +Use for free-form text generation where the output format doesn't need to be structured. For structured JSON output, use `ai_query` with `responseFormat`. + +```sql +SELECT product_name, + ai_gen(CONCAT('Write a one-sentence marketing tagline for: ', product_name)) AS tagline +FROM products; +``` + +```python +from pyspark.sql.functions import expr +df = spark.table("products") +df.withColumn( + "tagline", + expr("ai_gen(concat('Write a one-sentence marketing tagline for: ', product_name))") +).display() +``` + +--- + +## `ai_mask` + +**Docs:** https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_mask + +**Syntax:** `ai_mask(content, labels)` +- `content`: STRING — text with sensitive data +- `labels`: ARRAY\ — entity types to redact + +Returns text with identified entities replaced by `[MASKED]`. + +Common label values: `'person'`, `'email'`, `'phone'`, `'address'`, `'ssn'`, `'credit_card'` + +```sql +SELECT ai_mask( + message_body, + ARRAY('person', 'email', 'phone', 'address') +) AS message_safe +FROM customer_messages; +``` + +```python +from pyspark.sql.functions import expr +df = spark.table("customer_messages") +df.withColumn( + "message_safe", + expr("ai_mask(message_body, array('person', 'email', 'phone'))") +).write.format("delta").mode("append").saveAsTable("catalog.schema.messages_safe") +``` + +--- + +## `ai_similarity` + +**Docs:** https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_similarity + +**Syntax:** `ai_similarity(expr1, expr2)` — Returns a FLOAT between 0.0 and 1.0. + +Use for fuzzy deduplication, search result ranking, or item matching across datasets. + +```sql +-- Deduplicate company names (similarity > 0.85 = likely duplicate) +SELECT a.id, b.id, a.name, b.name, + ai_similarity(a.name, b.name) AS score +FROM companies a +JOIN companies b ON a.id < b.id +WHERE ai_similarity(a.name, b.name) > 0.85 +ORDER BY score DESC; +``` + +```python +from pyspark.sql.functions import expr +df = spark.table("product_search") +df.withColumn( + "match_score", + expr("ai_similarity(search_query, product_title)") +).orderBy("match_score", ascending=False).display() +``` + +--- + +## `ai_summarize` + +**Docs:** https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_summarize + +**Syntax:** `ai_summarize(content [, max_words])` +- `content`: STRING — text to summarize +- `max_words`: INTEGER (optional) — word limit; default 50; use `0` for uncapped + +```sql +-- Default (50 words) +SELECT ai_summarize(article_body) AS summary FROM news_articles; + +-- Custom word limit +SELECT ai_summarize(article_body, 20) AS brief FROM news_articles; +SELECT ai_summarize(article_body, 0) AS full FROM news_articles; +``` + +```python +from pyspark.sql.functions import expr +df = spark.table("news_articles") +df.withColumn("summary", expr("ai_summarize(article_body, 30)")).display() +``` + +--- + +## `ai_translate` + +**Docs:** https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_translate + +**Syntax:** `ai_translate(content, to_lang)` +- `content`: STRING — source text +- `to_lang`: STRING — target language code + +**Supported languages:** `en`, `de`, `fr`, `it`, `pt`, `hi`, `es`, `th` + +For unsupported languages, use `ai_query` with a multilingual model endpoint. + +```sql +-- Single language +SELECT ai_translate(product_description, 'es') AS description_es FROM products; + +-- Multi-language fanout +SELECT + description, + ai_translate(description, 'fr') AS description_fr, + ai_translate(description, 'de') AS description_de +FROM products; +``` + +```python +from pyspark.sql.functions import expr +df = spark.table("products") +df.withColumn( + "description_es", + expr("ai_translate(product_description, 'es')") +).display() +``` + +--- + +## `ai_parse_document` + +**Docs:** https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_parse_document + +**Requires:** DBR 17.1+ + +**Syntax:** `ai_parse_document(content [, options])` +- `content`: BINARY — document content loaded from `read_files()` or `spark.read.format("binaryFile")` +- `options`: MAP\ (optional) — parsing configuration + +**Supported formats:** PDF, JPG/JPEG, PNG, DOCX, PPTX + +Returns a VARIANT with pages, elements (text paragraphs, tables, figures, headers, footers), bounding boxes, and error metadata. + +**Options:** + +| Key | Values | Description | +|-----|--------|-------------| +| `version` | `'2.0'` | Output schema version | +| `imageOutputPath` | Volume path | Save rendered page images | +| `descriptionElementTypes` | `''`, `'figure'`, `'*'` | AI-generated descriptions (default: `'*'` for all) | + +**Output schema:** + +``` +document +├── pages[] -- page id, image_uri +└── elements[] -- extracted content + ├── type -- "text", "table", "figure", etc. + ├── content -- extracted text + ├── bbox -- bounding box coordinates + └── description -- AI-generated description +metadata -- file info, schema version +error_status[] -- errors per page (if any) +``` + +```sql +-- Parse and extract text blocks +SELECT + path, + parsed:pages[*].elements[*].content AS text_blocks, + parsed:error AS parse_error +FROM ( + SELECT path, ai_parse_document(content) AS parsed + FROM read_files('/Volumes/catalog/schema/landing/docs/', format => 'binaryFile') +); + +-- Parse with options (image output + descriptions) +SELECT ai_parse_document( + content, + map( + 'version', '2.0', + 'imageOutputPath', '/Volumes/catalog/schema/volume/images/', + 'descriptionElementTypes', '*' + ) +) AS parsed +FROM read_files('/Volumes/catalog/schema/volume/invoices/', format => 'binaryFile'); +``` + +```python +from pyspark.sql.functions import expr + +df = ( + spark.read.format("binaryFile") + .load("/Volumes/catalog/schema/landing/docs/") + .withColumn("parsed", expr("ai_parse_document(content)")) + .selectExpr( + "path", + "parsed:pages[*].elements[*].content AS text_blocks", + "parsed:error AS parse_error", + ) + .filter("parse_error IS NULL") +) + +# Chain with task-specific functions on the extracted text +df = ( + df.withColumn("summary", expr("ai_summarize(text_blocks, 50)")) + .withColumn("entities", expr("ai_extract(text_blocks, array('date', 'amount', 'vendor'))")) + .withColumn("category", expr("ai_classify(text_blocks, array('invoice', 'contract', 'report'))")) +) +df.display() +``` + +**Limitations:** +- Processing is slow for dense or low-resolution documents +- Suboptimal for non-Latin alphabets and digitally signed PDFs +- Custom models not supported — always uses the built-in parsing model diff --git a/databricks-skills/databricks-ai-functions/2-ai-query.md b/databricks-skills/databricks-ai-functions/2-ai-query.md new file mode 100644 index 00000000..60d860fa --- /dev/null +++ b/databricks-skills/databricks-ai-functions/2-ai-query.md @@ -0,0 +1,223 @@ +# `ai_query` — Full Reference + +**Docs:** https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_query + +> Use `ai_query` only when no task-specific function fits. See the function selection table in [SKILL.md](SKILL.md). + +## When to Use `ai_query` + +- Output schema has **nested arrays or deeply nested STRUCTs** (e.g., `itens: [{codigo, descricao, qtde}]`) +- Calling a **custom Model Serving endpoint** (your own fine-tuned model) +- **Multimodal input** — passing binary image files via `files =>` +- **Cross-document reasoning** — prompt includes content from multiple sources +- Need **sampling parameters** (`temperature`, `max_tokens`) control + +## Syntax + +```sql +ai_query( + endpoint, + request + [, returnType => ddl_schema] + [, failOnError => boolean] + [, modelParameters => named_struct(...)] + [, responseFormat => json_string] + [, files => binary_column] +) +``` + +## Parameters + +| Parameter | Type | Runtime | Description | +|---|---|---|---| +| `endpoint` | STRING literal | — | Foundation Model name or custom endpoint name. Never guess — use exact names from the [model serving docs](https://docs.databricks.com/aws/en/machine-learning/foundation-models/supported-models.html). | +| `request` | STRING or STRUCT | — | Prompt string for chat models; STRUCT for custom ML endpoints | +| `returnType` | DDL schema (optional) | 15.2+ | Structures the parsed response like `from_json` | +| `failOnError` | BOOLEAN (optional, default `true`) | 15.3+ | If `false`, returns STRUCT `{response, error}` instead of raising on failure | +| `modelParameters` | STRUCT (optional) | 15.3+ | Sampling params: `temperature`, `max_tokens`, `top_p`, etc. | +| `responseFormat` | JSON string (optional) | 15.4+ | Forces structured JSON output: `'{"type":"json_object"}'` | +| `files` | binary column (optional) | — | Pass binary images directly (JPEG/PNG) — no upload step needed | + +## Foundation Model Names (Do Not Guess) + +| Use case | Endpoint name | +|---|---| +| General reasoning / extraction | `databricks-claude-sonnet-4` | +| Fast / cheap tasks | `databricks-meta-llama-3-1-8b-instruct` | +| Large context / complex | `databricks-meta-llama-3-3-70b-instruct` | +| Multimodal (vision + text) | `databricks-llama-4-maverick` | +| Embeddings | `databricks-gte-large-en` | + +## Patterns + +### Basic — single prompt + +```sql +SELECT ai_query( + 'databricks-meta-llama-3-3-70b-instruct', + 'Describe Databricks SQL in 30 words.' +) AS response; +``` + +### Applied to a table column + +```sql +SELECT ticket_id, + ai_query( + 'databricks-meta-llama-3-3-70b-instruct', + CONCAT('Summarize in one sentence: ', ticket_body) + ) AS summary +FROM support_tickets; +``` + +### Structured JSON output (`responseFormat`) + +Preferred over `returnType` for chat models (requires Runtime 15.4+): + +```sql +SELECT ai_query( + 'databricks-claude-sonnet-4', + CONCAT('Extract invoice fields as JSON. Fields: numero, fornecedor, total, ' + 'itens:[{codigo, descricao, qtde, vlrUnit}]. Input: ', text_blocks), + responseFormat => '{"type":"json_object"}', + failOnError => false +) AS ai_response +FROM parsed_documents; +``` + +Then parse with `from_json`: + +```python +from pyspark.sql.functions import from_json, col + +df = df.withColumn( + "invoice", + from_json( + col("ai_response.response"), + "STRUCT>>" + ) +) +# Access fields +df.select("invoice.numero", "invoice.total", "invoice.itens").display() +``` + +### With `failOnError` (always use in batch pipelines) + +```sql +SELECT + id, + ai_response.response, + ai_response.error +FROM ( + SELECT id, + ai_query( + 'databricks-claude-sonnet-4', + CONCAT('Classify: ', text), + failOnError => false + ) AS ai_response + FROM documents +) +-- Route errors to a separate table downstream +``` + +### With `modelParameters` (control sampling) + +```sql +SELECT ai_query( + 'databricks-meta-llama-3-3-70b-instruct', + CONCAT('Extract entities from: ', text), + failOnError => false, + modelParameters => named_struct('temperature', CAST(0.0 AS DOUBLE), 'max_tokens', 500) +) AS result +FROM documents; +``` + +### Multimodal — image files (`files =>`) + +No file upload step needed. Pass the binary column directly: + +```sql +SELECT + path, + ai_query( + 'databricks-llama-4-maverick', + 'Describe what is in this image in detail.', + files => content + ) AS description +FROM read_files('/Volumes/catalog/schema/images/', format => 'binaryFile'); +``` + +```python +from pyspark.sql.functions import expr + +df = ( + spark.read.format("binaryFile") + .load("/Volumes/catalog/schema/images/") + .withColumn("description", expr(""" + ai_query( + 'databricks-llama-4-maverick', + 'Describe the contents of this image.', + files => content + ) + """)) +) +``` + +### As a reusable SQL UDF + +```sql +CREATE FUNCTION catalog.schema.extract_invoice(text STRING) +RETURNS STRING +RETURN ai_query( + 'databricks-claude-sonnet-4', + CONCAT('Extract invoice JSON from: ', text), + responseFormat => '{"type":"json_object"}' +); + +SELECT extract_invoice(document_text) FROM raw_documents; +``` + +### PySpark with `expr` + +```python +from pyspark.sql.functions import expr + +df = spark.table("documents") +df = df.withColumn("result", expr(""" + ai_query( + 'databricks-claude-sonnet-4', + concat('Extract structured data from: ', content), + responseFormat => '{"type":"json_object"}', + failOnError => false + ) +""")) +``` + +## Error Handling Pattern for Batch Pipelines + +Always use `failOnError => false` in batch jobs. Write errors to a sidecar table: + +```python +import dlt +from pyspark.sql.functions import expr, col + +@dlt.table(comment="AI extraction results") +def extracted(): + return ( + dlt.read("raw") + .withColumn("ai_response", expr(""" + ai_query('databricks-claude-sonnet-4', prompt, + responseFormat => '{"type":"json_object"}', + failOnError => false) + """)) + ) + +@dlt.table(comment="Rows that failed AI extraction") +def extraction_errors(): + return ( + dlt.read("extracted") + .filter(col("ai_response.error").isNotNull()) + .select("id", "prompt", col("ai_response.error").alias("error")) + ) +``` diff --git a/databricks-skills/databricks-ai-functions/3-ai-forecast.md b/databricks-skills/databricks-ai-functions/3-ai-forecast.md new file mode 100644 index 00000000..9c1f9b1f --- /dev/null +++ b/databricks-skills/databricks-ai-functions/3-ai-forecast.md @@ -0,0 +1,162 @@ +# `ai_forecast` — Full Reference + +**Docs:** https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_forecast + +> `ai_forecast` is a **table-valued function** — it returns a table of rows, not a scalar. Call it with `SELECT * FROM ai_forecast(...)`. + +## Requirements + +- **Pro or Serverless SQL warehouse** — not available on Classic or Starter +- Input data must have a DATE or TIMESTAMP time column and at least one numeric value column + +## Syntax + +```sql +SELECT * +FROM ai_forecast( + observed => TABLE(...) or query, + horizon => 'YYYY-MM-DD' or TIMESTAMP, + time_col => 'column_name', + value_col => 'column_name', + [group_col => 'column_name'], + [prediction_interval_width => 0.95] +) +``` + +## Parameters + +| Parameter | Type | Description | +|---|---|---| +| `observed` | TABLE reference or subquery | Training data with time + value columns | +| `horizon` | DATE, TIMESTAMP, or STRING | End date/time for the forecast period | +| `time_col` | STRING | Name of the DATE or TIMESTAMP column in `observed` | +| `value_col` | STRING | One or more numeric columns to forecast (up to 100 per group) | +| `group_col` | STRING (optional) | Column to partition forecasts by — produces one forecast series per group value | +| `prediction_interval_width` | DOUBLE (optional, default 0.95) | Confidence interval width between 0 and 1 | + +## Output Columns + +For each `value_col` named `metric`, the output includes: + +| Column | Type | Description | +|---|---|---| +| time_col | DATE or TIMESTAMP | The forecast timestamp (same type as input) | +| `metric_forecast` | DOUBLE | Point forecast | +| `metric_upper` | DOUBLE | Upper confidence bound | +| `metric_lower` | DOUBLE | Lower confidence bound | +| group_col | original type | Present when `group_col` is specified | + +## Patterns + +### Single Metric Forecast + +```sql +SELECT * +FROM ai_forecast( + observed => TABLE(SELECT order_date, revenue FROM daily_revenue), + horizon => '2026-12-31', + time_col => 'order_date', + value_col => 'revenue' +); +-- Returns: order_date, revenue_forecast, revenue_upper, revenue_lower +``` + +### Multi-Group Forecast + +Produces one forecast series per distinct value of `group_col`: + +```sql +SELECT * +FROM ai_forecast( + observed => TABLE(SELECT date, region, sales FROM regional_sales), + horizon => '2026-12-31', + time_col => 'date', + value_col => 'sales', + group_col => 'region' +); +-- Returns: date, region, sales_forecast, sales_upper, sales_lower +-- One row per date per region +``` + +### Multiple Value Columns + +```sql +SELECT * +FROM ai_forecast( + observed => TABLE(SELECT date, units, revenue FROM daily_kpis), + horizon => '2026-06-30', + time_col => 'date', + value_col => 'units,revenue' -- comma-separated +); +-- Returns: date, units_forecast, units_upper, units_lower, +-- revenue_forecast, revenue_upper, revenue_lower +``` + +### Custom Confidence Interval + +```sql +SELECT * +FROM ai_forecast( + observed => TABLE(SELECT ts, sensor_value FROM iot_readings), + horizon => '2026-03-31', + time_col => 'ts', + value_col => 'sensor_value', + prediction_interval_width => 0.80 -- narrower interval = less conservative +); +``` + +### Filtering Input Data (Subquery) + +```sql +SELECT * +FROM ai_forecast( + observed => TABLE( + SELECT date, sales + FROM daily_sales + WHERE region = 'BR' AND date >= '2024-01-01' + ), + horizon => '2026-12-31', + time_col => 'date', + value_col => 'sales' +); +``` + +### PySpark — Use `spark.sql()` + +`ai_forecast` is a table-valued function and must be called through `spark.sql()`: + +```python +result = spark.sql(""" + SELECT * + FROM ai_forecast( + observed => TABLE(SELECT date, sales FROM catalog.schema.daily_sales), + horizon => '2026-12-31', + time_col => 'date', + value_col => 'sales' + ) +""") +result.display() +``` + +### Save Forecast to Delta Table + +```python +result = spark.sql(""" + SELECT * + FROM ai_forecast( + observed => TABLE(SELECT date, region, revenue FROM catalog.schema.sales), + horizon => '2026-12-31', + time_col => 'date', + value_col => 'revenue', + group_col => 'region' + ) +""") +result.write.format("delta").mode("overwrite").saveAsTable("catalog.schema.revenue_forecast") +``` + +## Notes + +- The underlying model is a **prophet-like piecewise linear + seasonality model** — suitable for business time series with trend and weekly/yearly seasonality +- Handles "any number of groups" but up to **100 metrics per group** +- Output time column preserves the input type (DATE stays DATE, TIMESTAMP stays TIMESTAMP) +- Value columns are always cast to DOUBLE in output regardless of input type diff --git a/databricks-skills/databricks-ai-functions/4-document-processing-pipeline.md b/databricks-skills/databricks-ai-functions/4-document-processing-pipeline.md new file mode 100644 index 00000000..cb8afbd6 --- /dev/null +++ b/databricks-skills/databricks-ai-functions/4-document-processing-pipeline.md @@ -0,0 +1,470 @@ +# Document Processing Pipeline with AI Functions + +End-to-end patterns for building batch document processing pipelines using AI Functions in a Lakeflow Declarative Pipeline (DLT). Covers function selection, `config.yml` centralization, error handling, and guidance on near-real-time variants with DSPy or LangChain. + +> For workflow migration context (e.g., migrating from n8n, LangChain, or other orchestration tools), see the companion skill `n8n-to-databricks`. + +--- + +## Function Selection for Document Pipelines + +When processing documents with AI Functions, apply this order of preference for each stage: + +| Stage | Preferred function | Use `ai_query` when... | +|---|---|---| +| Parse binary docs (PDF, DOCX, images) | `ai_parse_document` | Need image-level reasoning | +| Extract flat fields from text | `ai_extract` | Schema has nested arrays | +| Classify document type or status | `ai_classify` | More than 20 categories | +| Score item similarity / matching | `ai_similarity` | Need cross-document reasoning | +| Summarize long sections | `ai_summarize` | — | +| Extract nested JSON (e.g. line items) | `ai_query` with `responseFormat` | (This is the intended use case) | + +--- + +## Centralized Configuration (`config.yml`) + +**Always centralize model names, volume paths, and prompts in a `config.yml`.** This makes model swaps a one-line change and keeps pipeline code free of hardcoded strings. + +```yaml +# config.yml +models: + default: "databricks-claude-sonnet-4" + mini: "databricks-meta-llama-3-1-8b-instruct" + vision: "databricks-llama-4-maverick" + +catalog: + name: "my_catalog" + schema: "document_processing" + +volumes: + input: "/Volumes/my_catalog/document_processing/landing/" + tmp: "/Volumes/my_catalog/document_processing/tmp/" + +output_tables: + results: "my_catalog.document_processing.processed_docs" + errors: "my_catalog.document_processing.processing_errors" + +prompts: + extract_invoice: | + Extract invoice fields and return ONLY valid JSON. + Fields: invoice_number, vendor_name, vendor_tax_id (digits only), + issue_date (dd/mm/yyyy), total_amount (numeric), + line_items: [{item_code, description, quantity, unit_price, total}]. + Return null for missing fields. + + classify_doc: | + Classify this document into exactly one category. +``` + +```python +# config_loader.py +import yaml + +def load_config(path: str = "config.yml") -> dict: + with open(path) as f: + return yaml.safe_load(f) + +CFG = load_config() +ENDPOINT = CFG["models"]["default"] +ENDPOINT_MINI = CFG["models"]["mini"] +VOLUME_INPUT = CFG["volumes"]["input"] +PROMPT_INV = CFG["prompts"]["extract_invoice"] +``` + +--- + +## Batch Pipeline — Lakeflow Declarative Pipeline + +Each logical step in your document workflow maps to a `@dlt.table` stage. Data flows through Delta tables between stages. + +``` +[Landing Volume] → Stage 1: ai_parse_document + → Stage 2: ai_classify (document type) + → Stage 3: ai_extract (flat fields) + ai_query (nested JSON) + → Stage 4: ai_similarity (item matching) + → Stage 5: Final Delta output table +``` + +### `pipeline.py` + +```python +import dlt +import yaml +from pyspark.sql.functions import expr, col, from_json + +CFG = yaml.safe_load(open("/Workspace/path/to/config.yml")) +ENDPOINT = CFG["models"]["default"] +VOL_IN = CFG["volumes"]["input"] +PROMPT = CFG["prompts"]["extract_invoice"] + + +# ── Stage 1: Parse binary documents ────────────────────────────────────────── +# Preferred: ai_parse_document — no model selection, no ai_query needed + +@dlt.table(comment="Parsed document text from all file types in the landing volume") +def raw_parsed(): + return ( + spark.read.format("binaryFile").load(VOL_IN) + .withColumn("parsed", expr("ai_parse_document(content)")) + .selectExpr( + "path", + "parsed:pages[*].elements[*].content AS text_blocks", + "parsed:error AS parse_error", + ) + .filter("parse_error IS NULL") + ) + + +# ── Stage 2: Classify document type ────────────────────────────────────────── +# Preferred: ai_classify — cheap, no endpoint selection + +@dlt.table(comment="Document type classification") +def classified_docs(): + return ( + dlt.read("raw_parsed") + .withColumn( + "doc_type", + expr("ai_classify(text_blocks, array('invoice', 'purchase_order', 'receipt', 'contract', 'other'))") + ) + ) + + +# ── Stage 3a: Flat field extraction ────────────────────────────────────────── +# Preferred: ai_extract for flat fields (vendor, date, total) + +@dlt.table(comment="Flat header fields extracted from documents") +def extracted_flat(): + return ( + dlt.read("classified_docs") + .filter("doc_type = 'invoice'") + .withColumn( + "header", + expr("ai_extract(text_blocks, array('invoice_number', 'vendor_name', 'issue_date', 'total_amount', 'tax_id'))") + ) + .select("path", "doc_type", "text_blocks", col("header")) + ) + + +# ── Stage 3b: Nested JSON extraction (last resort: ai_query) ───────────────── +# Use ai_query only because line_items is a nested array — ai_extract can't handle it + +@dlt.table(comment="Nested line items extracted — ai_query used for array schema only") +def extracted_line_items(): + return ( + dlt.read("extracted_flat") + .withColumn( + "ai_response", + expr(f""" + ai_query( + '{ENDPOINT}', + concat('{PROMPT.strip()}', '\\n\\nDocument text:\\n', LEFT(text_blocks, 6000)), + responseFormat => '{{"type":"json_object"}}', + failOnError => false + ) + """) + ) + .withColumn( + "line_items", + from_json( + col("ai_response.response"), + "STRUCT>>" + ) + ) + .select("path", "doc_type", "header", "line_items", col("ai_response.error").alias("extraction_error")) + ) + + +# ── Stage 4: Similarity matching ───────────────────────────────────────────── +# Preferred: ai_similarity for fuzzy matching between extracted fields + +@dlt.table(comment="Vendor name similarity vs reference master data") +def vendor_matched(): + extracted = dlt.read("extracted_line_items") + # Join against a reference vendor table for fuzzy matching + vendors = spark.table("my_catalog.document_processing.vendor_master").select("vendor_id", "vendor_name") + + return ( + extracted.crossJoin(vendors) + .withColumn( + "name_similarity", + expr("ai_similarity(header.vendor_name, vendor_name)") + ) + .filter("name_similarity > 0.80") + .orderBy("name_similarity", ascending=False) + ) + + +# ── Stage 5: Final output + error sidecar ──────────────────────────────────── + +@dlt.table( + comment="Final processed documents ready for downstream consumption", + table_properties={"delta.enableChangeDataFeed": "true"}, +) +def processed_docs(): + return ( + dlt.read("extracted_line_items") + .filter("extraction_error IS NULL") + .selectExpr( + "path", + "doc_type", + "header.invoice_number", + "header.vendor_name", + "header.issue_date", + "header.total_amount", + "line_items.line_items AS items", + ) + ) + + +@dlt.table(comment="Rows that failed at any extraction stage — review and reprocess") +def processing_errors(): + return ( + dlt.read("extracted_line_items") + .filter("extraction_error IS NOT NULL") + .select("path", "doc_type", col("extraction_error").alias("error")) + ) +``` + +--- + +## Custom RAG Pipeline — Parse → Chunk → Index → Query + +When the goal is retrieval-augmented generation rather than field extraction, use this pipeline to parse documents, chunk them into a Delta table, and index with Vector Search. + +### Step 1 — Parse and Chunk into a Delta Table + +`ai_parse_document` returns a VARIANT. Use `variant_get` with an explicit `ARRAY` cast before calling `explode`, since `explode()` does not accept raw VARIANT values. + +```sql +CREATE OR REPLACE TABLE catalog.schema.parsed_chunks AS +WITH parsed AS ( + SELECT + path, + ai_parse_document(content) AS doc + FROM read_files('/Volumes/catalog/schema/volume/docs/', format => 'binaryFile') +), +elements AS ( + SELECT + path, + explode(variant_get(doc, '$.document.elements', 'ARRAY')) AS element + FROM parsed +) +SELECT + md5(concat(path, variant_get(element, '$.content', 'STRING'))) AS chunk_id, + path AS source_path, + variant_get(element, '$.content', 'STRING') AS content, + variant_get(element, '$.type', 'STRING') AS element_type, + current_timestamp() AS parsed_at +FROM elements +WHERE variant_get(element, '$.content', 'STRING') IS NOT NULL + AND length(trim(variant_get(element, '$.content', 'STRING'))) > 10; +``` + +### Step 1a (Production) — Incremental Parsing with Structured Streaming + +For production pipelines where new documents arrive over time, use Structured Streaming with checkpoints for exactly-once processing. Each run processes only new files, then stops with `trigger(availableNow=True)`. + +See the official bundle example: +[databricks/bundle-examples/contrib/job_with_ai_parse_document](https://github.com/databricks/bundle-examples/tree/main/contrib/job_with_ai_parse_document) + +**Stage 1 — Parse raw documents (streaming):** + +```python +from pyspark.sql.functions import col, current_timestamp, expr + +files_df = ( + spark.readStream.format("binaryFile") + .option("pathGlobFilter", "*.{pdf,jpg,jpeg,png}") + .option("recursiveFileLookup", "true") + .load("/Volumes/catalog/schema/volume/docs/") +) + +parsed_df = ( + files_df + .repartition(8, expr("crc32(path) % 8")) + .withColumn("parsed", expr(""" + ai_parse_document(content, map( + 'version', '2.0', + 'descriptionElementTypes', '*' + )) + """)) + .withColumn("parsed_at", current_timestamp()) + .select("path", "parsed", "parsed_at") +) + +( + parsed_df.writeStream.format("delta") + .outputMode("append") + .option("checkpointLocation", "/Volumes/catalog/schema/checkpoints/01_parse") + .option("mergeSchema", "true") + .trigger(availableNow=True) + .toTable("catalog.schema.parsed_documents_raw") +) +``` + +**Stage 2 — Extract text from parsed VARIANT (streaming):** + +Uses `transform()` to extract element content from the VARIANT array, and `try_cast` for safe access. Error rows are preserved but flagged. + +```python +from pyspark.sql.functions import col, concat_ws, expr, lit, when + +parsed_stream = spark.readStream.format("delta").table("catalog.schema.parsed_documents_raw") + +text_df = ( + parsed_stream + .withColumn("text", + when( + expr("try_cast(parsed:error_status AS STRING)").isNotNull(), lit(None) + ).otherwise( + concat_ws("\n\n", expr(""" + transform( + try_cast(parsed:document:elements AS ARRAY), + element -> try_cast(element:content AS STRING) + ) + """)) + ) + ) + .withColumn("error_status", expr("try_cast(parsed:error_status AS STRING)")) + .select("path", "text", "error_status", "parsed_at") +) + +( + text_df.writeStream.format("delta") + .outputMode("append") + .option("checkpointLocation", "/Volumes/catalog/schema/checkpoints/02_text") + .option("mergeSchema", "true") + .trigger(availableNow=True) + .toTable("catalog.schema.parsed_documents_text") +) +``` + +Key techniques: +- **`repartition` by file hash** — parallelizes `ai_parse_document` across workers +- **`trigger(availableNow=True)`** — processes all pending files then stops (batch-like) +- **Checkpoints** — exactly-once guarantee; no re-parsing on re-runs +- **`transform()` + `try_cast`** — safer than `explode` + `variant_get` for text extraction +- **Separate stages with independent checkpoints** — parse and text extraction can fail/retry independently + +### Step 1b — Enable Change Data Feed + +Required for Vector Search Delta Sync: + +```sql +ALTER TABLE catalog.schema.parsed_chunks +SET TBLPROPERTIES (delta.enableChangeDataFeed = true); +``` + +### Step 2 — Create a Vector Search Index and Query It + +Use the **[databricks-vector-search](../databricks-vector-search/SKILL.md)** skill to create a Delta Sync index on the chunked table and query it. Ensure CDF is enabled first (Step 1b above). + +### RAG-Specific Issues + +| Issue | Solution | +|-------|----------| +| `explode()` fails with VARIANT | `explode()` requires ARRAY, not VARIANT. Use `variant_get(doc, '$.document.elements', 'ARRAY')` to cast before exploding | +| Short/noisy chunks | Filter with `length(trim(...)) > 10` — parsing produces tiny fragments (page numbers, headers) that pollute the index | +| Re-parsing unchanged documents | Use Structured Streaming with checkpoints — see Step 1a above | +| Region not supported | US/EU regions only, or enable cross-geography routing | + +--- + +## Near-Real-Time Variant — DSPy + MLflow Agent + +When the pipeline must respond in seconds (triggered by a user action, API call, or form submission), use DSPy with an MLflow ChatAgent instead of a DLT pipeline. + +**When to use DSPy vs LangChain:** + +| Scenario | Stack | +|---|---| +| Fixed pipeline steps, well-defined I/O, want prompt optimization | **DSPy** | +| Needs tool-calling, memory, or multi-agent coordination | **LangChain LCEL** + MLflow ChatAgent | +| Single LLM call, simple task | Direct AI Function or `ai_query` in a notebook | + +### DSPy Signatures (replace LangChain agent system prompts) + +```python +# pip install dspy-ai mlflow databricks-sdk +import dspy, yaml + +CFG = yaml.safe_load(open("config.yml")) +lm = dspy.LM( + model=f"databricks/{CFG['models']['default']}", + api_base="https:///serving-endpoints", + api_key=dbutils.secrets.get("scope", "databricks-token"), +) +dspy.configure(lm=lm) + + +class ExtractInvoiceHeader(dspy.Signature): + """Extract invoice header fields from document text.""" + document_text: str = dspy.InputField(desc="Raw text from the document") + invoice_number: str = dspy.OutputField(desc="Invoice number, or null") + vendor_name: str = dspy.OutputField(desc="Vendor/supplier name, or null") + issue_date: str = dspy.OutputField(desc="Date as dd/mm/yyyy, or null") + total_amount: float = dspy.OutputField(desc="Total amount as float, or null") + + +class ClassifyDocument(dspy.Signature): + """Classify a document into one of the provided categories.""" + document_text: str = dspy.InputField() + category: str = dspy.OutputField( + desc="One of: invoice, purchase_order, receipt, contract, other" + ) + + +class DocumentPipeline(dspy.Module): + def __init__(self): + self.classify = dspy.Predict(ClassifyDocument) + self.extract = dspy.Predict(ExtractInvoiceHeader) + + def forward(self, document_text: str): + doc_type = self.classify(document_text=document_text).category + if doc_type == "invoice": + header = self.extract(document_text=document_text) + return {"doc_type": doc_type, "header": header.__dict__} + return {"doc_type": doc_type, "header": None} + + +pipeline = DocumentPipeline() +``` + +### Wrap and Register with MLflow + +```python +import mlflow, json + +class DSPyDocumentAgent(mlflow.pyfunc.PythonModel): + def load_context(self, context): + import dspy, yaml + cfg = yaml.safe_load(open(context.artifacts["config"])) + lm = dspy.LM(model=f"databricks/{cfg['models']['default']}") + dspy.configure(lm=lm) + self.pipeline = DocumentPipeline() + + def predict(self, context, model_input): + text = model_input.iloc[0]["document_text"] + return json.dumps(self.pipeline(document_text=text), ensure_ascii=False) + +mlflow.set_registry_uri("databricks-uc") +with mlflow.start_run(): + mlflow.pyfunc.log_model( + artifact_path="document_agent", + python_model=DSPyDocumentAgent(), + artifacts={"config": "config.yml"}, + registered_model_name="my_catalog.document_processing.document_agent", + ) +``` + +--- + +## Tips + +1. **Parse first, enrich second** — always run `ai_parse_document` as the first stage. Feed its text output to task-specific functions; never pass raw binary to `ai_query`. +2. **Flat fields → `ai_extract`; nested arrays → `ai_query`** — this is the clearest decision boundary. +3. **`failOnError => false` is mandatory in batch** — write errors to a sidecar `_errors` table rather than crashing the pipeline. +4. **Truncate before sending to `ai_query`** — use `LEFT(text, 6000)` or chunk long documents to stay within context window limits. +5. **Prompts belong in `config.yml`** — never hardcode prompt strings in pipeline code. A prompt change should be a config change, not a code change. +6. **DSPy for agents** — when migrating from LangChain agent-based tools, DSPy typed `Signature` classes give you structured I/O contracts, testability, and optional prompt compilation/optimization. diff --git a/databricks-skills/databricks-ai-functions/SKILL.md b/databricks-skills/databricks-ai-functions/SKILL.md new file mode 100644 index 00000000..e3fc3fbb --- /dev/null +++ b/databricks-skills/databricks-ai-functions/SKILL.md @@ -0,0 +1,195 @@ +--- +name: databricks-ai-functions +description: "Use Databricks built-in AI Functions (ai_classify, ai_extract, ai_summarize, ai_mask, ai_translate, ai_fix_grammar, ai_gen, ai_analyze_sentiment, ai_similarity, ai_parse_document, ai_query, ai_forecast) to add AI capabilities directly to SQL and PySpark pipelines without managing model endpoints. Also covers document parsing and building custom RAG pipelines (parse → chunk → index → query)." +--- + +# Databricks AI Functions + +> **Official Docs:** https://docs.databricks.com/aws/en/large-language-models/ai-functions +> Individual function reference: https://docs.databricks.com/aws/en/sql/language-manual/functions/ + +## Overview + +Databricks AI Functions are built-in SQL and PySpark functions that call Foundation Model APIs directly from your data pipelines — no model endpoint setup, no API keys, no boilerplate. They operate on table columns as naturally as `UPPER()` or `LENGTH()`, and are optimized for batch inference at scale. + +There are three categories: + +| Category | Functions | Use when | +|---|---|---| +| **Task-specific** | `ai_analyze_sentiment`, `ai_classify`, `ai_extract`, `ai_fix_grammar`, `ai_gen`, `ai_mask`, `ai_similarity`, `ai_summarize`, `ai_translate`, `ai_parse_document` | The task is well-defined — prefer these always | +| **General-purpose** | `ai_query` | Complex nested JSON, custom endpoints, multimodal — **last resort only** | +| **Table-valued** | `ai_forecast` | Time series forecasting | + +**Function selection rule — always prefer a task-specific function over `ai_query`:** + +| Task | Use this | Fall back to `ai_query` when... | +|---|---|---| +| Sentiment scoring | `ai_analyze_sentiment` | Never | +| Fixed-label routing | `ai_classify` (2–20 labels) | Never | +| Flat entity extraction | `ai_extract` | Output schema has nested arrays | +| Summarization | `ai_summarize` | Never — use `max_words=0` for uncapped | +| Grammar correction | `ai_fix_grammar` | Never | +| Translation | `ai_translate` | Target language not in the supported list | +| PII redaction | `ai_mask` | Never | +| Free-form generation | `ai_gen` | Need structured JSON output | +| Semantic similarity | `ai_similarity` | Never | +| PDF / document parsing | `ai_parse_document` | Need image-level reasoning | +| Complex JSON / reasoning | — | **This is the intended use case for `ai_query`** | + +## Prerequisites + +- Databricks SQL warehouse (**not Classic**) or cluster with DBR **15.1+** +- DBR **15.4 ML LTS** recommended for batch workloads +- DBR **17.1+** required for `ai_parse_document` +- `ai_forecast` requires a **Pro or Serverless** SQL warehouse +- Workspace in a supported AWS/Azure region for batch AI inference +- Models run under Apache 2.0 or LLAMA 3.3 Community License — customers are responsible for compliance + +## Quick Start + +Classify, extract, and score sentiment from a text column in a single query: + +```sql +SELECT + ticket_id, + ticket_text, + ai_classify(ticket_text, ARRAY('urgent', 'not urgent', 'spam')) AS priority, + ai_extract(ticket_text, ARRAY('product', 'error_code', 'date')) AS entities, + ai_analyze_sentiment(ticket_text) AS sentiment +FROM support_tickets; +``` + +```python +from pyspark.sql.functions import expr + +df = spark.table("support_tickets") +df = ( + df.withColumn("priority", expr("ai_classify(ticket_text, array('urgent', 'not urgent', 'spam'))")) + .withColumn("entities", expr("ai_extract(ticket_text, array('product', 'error_code', 'date'))")) + .withColumn("sentiment", expr("ai_analyze_sentiment(ticket_text)")) +) +# Access nested STRUCT fields from ai_extract +df.select("ticket_id", "priority", "sentiment", + "entities.product", "entities.error_code", "entities.date").display() +``` + +## Common Patterns + +### Pattern 1: Text Analysis Pipeline + +Chain multiple task-specific functions to enrich a text column in one pass: + +```sql +SELECT + id, + content, + ai_analyze_sentiment(content) AS sentiment, + ai_summarize(content, 30) AS summary, + ai_classify(content, + ARRAY('technical', 'billing', 'other')) AS category, + ai_fix_grammar(content) AS content_clean +FROM raw_feedback; +``` + +### Pattern 2: PII Redaction Before Storage + +```python +from pyspark.sql.functions import expr + +df_clean = ( + spark.table("raw_messages") + .withColumn( + "message_safe", + expr("ai_mask(message, array('person', 'email', 'phone', 'address'))") + ) +) +df_clean.write.format("delta").mode("append").saveAsTable("catalog.schema.messages_safe") +``` + +### Pattern 3: Document Ingestion from a Unity Catalog Volume + +Parse PDFs/Office docs, then enrich with task-specific functions: + +```python +from pyspark.sql.functions import expr + +df = ( + spark.read.format("binaryFile") + .load("/Volumes/catalog/schema/landing/documents/") + .withColumn("parsed", expr("ai_parse_document(content)")) + .selectExpr("path", + "parsed:pages[*].elements[*].content AS text_blocks", + "parsed:error AS parse_error") + .filter("parse_error IS NULL") + .withColumn("summary", expr("ai_summarize(text_blocks, 50)")) + .withColumn("entities", expr("ai_extract(text_blocks, array('date', 'amount', 'vendor'))")) +) +``` + +### Pattern 4: Semantic Matching / Deduplication + +```sql +-- Find near-duplicate company names +SELECT a.id, b.id, ai_similarity(a.name, b.name) AS score +FROM companies a +JOIN companies b ON a.id < b.id +WHERE ai_similarity(a.name, b.name) > 0.85; +``` + +### Pattern 5: Complex JSON Extraction with `ai_query` (last resort) + +Use only when the output schema has nested arrays or requires multi-step reasoning that no task-specific function handles: + +```python +from pyspark.sql.functions import expr, from_json, col + +df = ( + spark.table("parsed_documents") + .withColumn("ai_response", expr(""" + ai_query( + 'databricks-claude-sonnet-4', + concat('Extract invoice as JSON with nested itens array: ', text_blocks), + responseFormat => '{"type":"json_object"}', + failOnError => false + ) + """)) + .withColumn("invoice", from_json( + col("ai_response.response"), + "STRUCT>>" + )) +) +``` + +### Pattern 6: Time Series Forecasting + +```sql +SELECT * +FROM ai_forecast( + observed => TABLE(SELECT date, sales FROM daily_sales), + horizon => '2026-12-31', + time_col => 'date', + value_col => 'sales' +); +-- Returns: date, sales_forecast, sales_upper, sales_lower +``` + +## Reference Files + +- [1-task-functions.md](1-task-functions.md) — Full syntax, parameters, SQL + PySpark examples for all 9 task-specific functions (`ai_analyze_sentiment`, `ai_classify`, `ai_extract`, `ai_fix_grammar`, `ai_gen`, `ai_mask`, `ai_similarity`, `ai_summarize`, `ai_translate`) and `ai_parse_document` +- [2-ai-query.md](2-ai-query.md) — `ai_query` complete reference: all parameters, structured output with `responseFormat`, multimodal `files =>`, UDF patterns, and error handling +- [3-ai-forecast.md](3-ai-forecast.md) — `ai_forecast` parameters, single-metric, multi-group, multi-metric, and confidence interval patterns +- [4-document-processing-pipeline.md](4-document-processing-pipeline.md) — End-to-end batch document processing pipeline using AI Functions in a Lakeflow Declarative Pipeline; includes `config.yml` centralization, function selection logic, custom RAG pipeline (parse → chunk → Vector Search), and DSPy/LangChain guidance for near-real-time variants + +## Common Issues + +| Issue | Solution | +|---|---| +| `ai_parse_document` not found | Requires DBR **17.1+**. Check cluster runtime. | +| `ai_forecast` fails | Requires **Pro or Serverless** SQL warehouse — not available on Classic or Starter. | +| All functions return NULL | Input column is NULL. Filter with `WHERE col IS NOT NULL` before calling. | +| `ai_translate` fails for a language | Supported: English, German, French, Italian, Portuguese, Hindi, Spanish, Thai. Use `ai_query` with a multilingual model for others. | +| `ai_classify` returns unexpected labels | Use clear, mutually exclusive label names. Fewer labels (2–5) produces more reliable results. | +| `ai_query` raises on some rows in a batch job | Add `failOnError => false` — returns a STRUCT with `.response` and `.error` instead of raising. | +| Batch job runs slowly | Use DBR **15.4 ML LTS** cluster (not serverless or interactive) for optimized batch inference throughput. | +| Want to swap models without editing pipeline code | Store all model names and prompts in `config.yml` — see [4-document-processing-pipeline.md](4-document-processing-pipeline.md) for the pattern. | diff --git a/databricks-skills/databricks-parsing/SKILL.md b/databricks-skills/databricks-parsing/SKILL.md deleted file mode 100644 index 96b00b32..00000000 --- a/databricks-skills/databricks-parsing/SKILL.md +++ /dev/null @@ -1,275 +0,0 @@ ---- -name: databricks-parsing -description: "Parse documents (PDF, DOCX, PPTX, images) using ai_parse_document, or build custom RAG pipelines. Use when the user asks to parse documents or build a custom RAG." ---- - -# Databricks Document Parsing - -Parse unstructured documents into structured text using `ai_parse_document` — the foundation for document processing and custom RAG pipelines on Databricks. - -## When to Use - -Use this skill when: -- Parsing PDFs, DOCX, PPTX, or images into text -- Extracting structured data from unstructured documents -- Building a custom RAG pipeline (parse → chunk → index → query) -- Ingesting documents from Unity Catalog Volumes for search or analysis - -## Overview - -`ai_parse_document` is a SQL AI function that extracts content from binary documents. It runs on serverless SQL warehouses and supports PDF, DOC/DOCX, PPT/PPTX, JPG/JPEG, and PNG. - -| Aspect | Detail | -|--------|--------| -| **Function** | `ai_parse_document(content)` or `ai_parse_document(content, options)` | -| **Input** | Binary document content (from `read_files` with `format => 'binaryFile'`) | -| **Output** | VARIANT with `document.pages[]`, `document.elements[]`, `metadata` | -| **Requirements** | Databricks Runtime 17.1+, Serverless SQL Warehouse | -| **Tool** | Use via `execute_sql` — no dedicated MCP tool needed | - -## Quick Start - -Parse all documents in a Volume: - -```sql -SELECT - path, - ai_parse_document(content) AS parsed -FROM read_files('/Volumes/catalog/schema/volume/docs/', format => 'binaryFile'); -``` - -## Common Patterns - -### Pattern 1: Parse with Options - -```sql -SELECT ai_parse_document( - content, - map( - 'version', '2.0', - 'imageOutputPath', '/Volumes/catalog/schema/volume/images/', - 'descriptionElementTypes', '*' - ) -) AS parsed -FROM read_files('/Volumes/catalog/schema/volume/invoices/', format => 'binaryFile'); -``` - -**Options:** - -| Key | Values | Description | -|-----|--------|-------------| -| `version` | `'2.0'` | Output schema version | -| `imageOutputPath` | Volume path | Save rendered page images | -| `descriptionElementTypes` | `''`, `'figure'`, `'*'` | AI-generated descriptions (default: `'*'` for all) | - -### Pattern 2: Parse + Extract Structured Data - -Combine `ai_parse_document` with `ai_query` to extract specific fields. -Use `transform()` + `try_cast()` to concatenate element text, then pass -the full text to `ai_query` with `returnType => 'STRING'`. - -```sql -WITH parsed_documents AS ( - SELECT - path, - ai_parse_document(content) AS parsed - FROM read_files('/Volumes/catalog/schema/volume/invoices/', format => 'binaryFile') -), -parsed_text AS ( - SELECT - path, - concat_ws('\n\n', - transform( - try_cast(parsed:document:elements AS ARRAY), - element -> try_cast(element:content AS STRING) - ) - ) AS text - FROM parsed_documents - WHERE try_cast(parsed:error_status AS STRING) IS NULL -) -SELECT - path, - ai_query( - 'databricks-claude-sonnet-4', - concat( - 'Extract vendor name, invoice number, and total due from this document. ', - 'Return the result as a JSON object with keys: vendor, invoice_number, total_due. ', - text - ), - returnType => 'STRING' - ) AS structured_data -FROM parsed_text -WHERE text IS NOT NULL; -``` - -### Pattern 3: Custom RAG Pipeline - -End-to-end: parse documents → chunk text → store in Delta table → create Vector Search index. - -**Step 1 — Parse and chunk into a Delta table:** - -`ai_parse_document` returns a VARIANT. You must use `variant_get` with an explicit -`ARRAY` cast before calling `explode`, since `explode()` does not accept -raw VARIANT values. - -```sql -CREATE OR REPLACE TABLE catalog.schema.parsed_chunks AS -WITH parsed AS ( - SELECT - path, - ai_parse_document(content) AS doc - FROM read_files('/Volumes/catalog/schema/volume/docs/', format => 'binaryFile') -), -elements AS ( - SELECT - path, - explode(variant_get(doc, '$.document.elements', 'ARRAY')) AS element - FROM parsed -) -SELECT - md5(concat(path, variant_get(element, '$.content', 'STRING'))) AS chunk_id, - path AS source_path, - variant_get(element, '$.content', 'STRING') AS content, - variant_get(element, '$.type', 'STRING') AS element_type, - current_timestamp() AS parsed_at -FROM elements -WHERE variant_get(element, '$.content', 'STRING') IS NOT NULL - AND length(trim(variant_get(element, '$.content', 'STRING'))) > 10; -``` - -**Step 1a (production) — Incremental parsing with Structured Streaming:** - -For production pipelines where new documents arrive over time, use Structured -Streaming with checkpoints for exactly-once processing. Each run processes only -new files (tracked via checkpoints), then stops with `trigger(availableNow=True)`. - -See the official bundle example: -[databricks/bundle-examples/contrib/job_with_ai_parse_document](https://github.com/databricks/bundle-examples/tree/main/contrib/job_with_ai_parse_document) - -**Stage 1 — Parse raw documents (streaming):** - -```python -from pyspark.sql.functions import col, current_timestamp, expr - -files_df = ( - spark.readStream.format("binaryFile") - .option("pathGlobFilter", "*.{pdf,jpg,jpeg,png}") - .option("recursiveFileLookup", "true") - .load("/Volumes/catalog/schema/volume/docs/") -) - -parsed_df = ( - files_df - .repartition(8, expr("crc32(path) % 8")) - .withColumn("parsed", expr(""" - ai_parse_document(content, map( - 'version', '2.0', - 'descriptionElementTypes', '*' - )) - """)) - .withColumn("parsed_at", current_timestamp()) - .select("path", "parsed", "parsed_at") -) - -( - parsed_df.writeStream.format("delta") - .outputMode("append") - .option("checkpointLocation", "/Volumes/catalog/schema/checkpoints/01_parse") - .option("mergeSchema", "true") - .trigger(availableNow=True) - .toTable("catalog.schema.parsed_documents_raw") -) -``` - -**Stage 2 — Extract text from parsed VARIANT (streaming):** - -Uses `transform()` to extract element content from the VARIANT array, and -`try_cast` for safe access. Error rows are preserved but flagged. - -```python -from pyspark.sql.functions import col, concat_ws, expr, lit, when - -parsed_stream = spark.readStream.format("delta").table("catalog.schema.parsed_documents_raw") - -text_df = ( - parsed_stream - .withColumn("text", - when( - expr("try_cast(parsed:error_status AS STRING)").isNotNull(), lit(None) - ).otherwise( - concat_ws("\n\n", expr(""" - transform( - try_cast(parsed:document:elements AS ARRAY), - element -> try_cast(element:content AS STRING) - ) - """)) - ) - ) - .withColumn("error_status", expr("try_cast(parsed:error_status AS STRING)")) - .select("path", "text", "error_status", "parsed_at") -) - -( - text_df.writeStream.format("delta") - .outputMode("append") - .option("checkpointLocation", "/Volumes/catalog/schema/checkpoints/02_text") - .option("mergeSchema", "true") - .trigger(availableNow=True) - .toTable("catalog.schema.parsed_documents_text") -) -``` - -Key techniques from the official example: -- **`repartition` by file hash** — parallelizes `ai_parse_document` across workers -- **`trigger(availableNow=True)`** — processes all pending files then stops (batch-like) -- **Checkpoints** — exactly-once guarantee; no re-parsing on re-runs -- **`transform()` + `try_cast`** — safer than `explode` + `variant_get` for text extraction -- **Three-stage pipeline** — separate parse/text/structured stages with independent checkpoints - -**Step 1b — Enable Change Data Feed (required for Vector Search Delta Sync):** - -```sql -ALTER TABLE catalog.schema.parsed_chunks -SET TBLPROPERTIES (delta.enableChangeDataFeed = true); -``` - -**Step 2 — Create a Vector Search index and query it:** - -Use the **[databricks-vector-search](../databricks-vector-search/SKILL.md)** skill to create a -Delta Sync index on the chunked table and query it. Ensure CDF is enabled first -(Step 1b above). - -## Output Schema - -`ai_parse_document` returns a VARIANT with this structure: - -``` -document -├── pages[] -- page id, image_uri -└── elements[] -- extracted content - ├── type -- "text", "table", "figure", etc. - ├── content -- extracted text - ├── bbox -- bounding box coordinates - └── description -- AI-generated description -metadata -- file info, schema version -error_status[] -- errors per page (if any) -``` - -## Common Issues - -| Issue | Solution | -|-------|----------| -| **Function not available** | Requires Runtime 17.1+ and Serverless SQL Warehouse | -| **Region not supported** | US/EU regions, or enable cross-geography routing | -| **Large documents** | Use `LIMIT` during development to control costs | -| **`explode()` fails with VARIANT** | `explode()` requires ARRAY, not VARIANT. Use `variant_get(doc, '$.document.elements', 'ARRAY')` to cast before exploding | -| **Short/noisy chunks** | Filter with `length(trim(...)) > 10` — parsing produces tiny fragments (page numbers, headers) that pollute the index | -| **`ai_query` returns markdown fences** | Use `returnType => 'STRING'` for clean output. If fences still appear, strip with `regexp_replace(result, '```(json)?\\s*|```', '')` | -| **Re-parsing unchanged documents** | Use Structured Streaming with checkpoints — see Pattern 3, Step 1a | - -## Related Skills - -- **[databricks-vector-search](../databricks-vector-search/SKILL.md)** — Create indexes and query embeddings (Step 2 of RAG) -- **[databricks-agent-bricks](../databricks-agent-bricks/SKILL.md)** — Pre-built Knowledge Assistants (out-of-the-box RAG without custom parsing) -- **[databricks-spark-declarative-pipelines](../databricks-spark-declarative-pipelines/SKILL.md)** — Production pipelines for batch document processing -- **[databricks-dbsql](../databricks-dbsql/SKILL.md)** — Full AI functions reference including `ai_query`, `ai_extract`, `ai_classify` diff --git a/databricks-skills/install_skills.sh b/databricks-skills/install_skills.sh index 6220195e..652f315f 100755 --- a/databricks-skills/install_skills.sh +++ b/databricks-skills/install_skills.sh @@ -42,7 +42,7 @@ MLFLOW_REPO_RAW_URL="https://raw.githubusercontent.com/mlflow/skills" MLFLOW_REPO_REF="main" # Databricks skills (hosted in this repo) -DATABRICKS_SKILLS="databricks-agent-bricks databricks-aibi-dashboards databricks-asset-bundles databricks-app-python databricks-config databricks-dbsql databricks-docs databricks-genie databricks-iceberg databricks-jobs databricks-lakebase-autoscale databricks-lakebase-provisioned databricks-metric-views databricks-mlflow-evaluation databricks-model-serving databricks-parsing databricks-python-sdk databricks-spark-declarative-pipelines databricks-spark-structured-streaming databricks-synthetic-data-gen databricks-unity-catalog databricks-unstructured-pdf-generation databricks-vector-search databricks-zerobus-ingest spark-python-data-source" +DATABRICKS_SKILLS="databricks-agent-bricks databricks-ai-functions databricks-aibi-dashboards databricks-asset-bundles databricks-app-python databricks-config databricks-dbsql databricks-docs databricks-genie databricks-iceberg databricks-jobs databricks-lakebase-autoscale databricks-lakebase-provisioned databricks-metric-views databricks-mlflow-evaluation databricks-model-serving databricks-python-sdk databricks-spark-declarative-pipelines databricks-spark-structured-streaming databricks-synthetic-data-gen databricks-unity-catalog databricks-unstructured-pdf-generation databricks-vector-search databricks-zerobus-ingest spark-python-data-source" # MLflow skills (fetched from mlflow/skills repo) MLFLOW_SKILLS="agent-evaluation analyze-mlflow-chat-session analyze-mlflow-trace instrumenting-with-mlflow-tracing mlflow-onboarding querying-mlflow-metrics retrieving-mlflow-traces searching-mlflow-docs" @@ -63,6 +63,7 @@ get_skill_description() { case "$1" in # Databricks skills "databricks-agent-bricks") echo "Knowledge Assistants, Genie Spaces, Supervisor Agents" ;; + "databricks-ai-functions") echo "Built-in AI Functions (classify, extract, query, forecast, parse, etc.), doc processing & custom RAG" ;; "databricks-aibi-dashboards") echo "Databricks AI/BI Dashboards - create and manage dashboards" ;; "databricks-asset-bundles") echo "Databricks Asset Bundles - deployment and configuration" ;; "databricks-app-python") echo "Databricks Apps with Python (Dash, Streamlit)" ;; @@ -78,7 +79,6 @@ get_skill_description() { "databricks-lakebase-provisioned") echo "Lakebase Provisioned - data connections and reverse ETL" ;; "databricks-metric-views") echo "Unity Catalog Metric Views - governed business metrics in YAML" ;; "databricks-model-serving") echo "Model Serving - deploy MLflow models and AI agents" ;; - "databricks-parsing") echo "Document parsing with ai_parse_document and custom RAG pipelines" ;; "databricks-mlflow-evaluation") echo "MLflow evaluation and trace analysis" ;; "databricks-spark-declarative-pipelines") echo "Spark Declarative Pipelines (SDP/LDP/DLT)" ;; "spark-python-data-source") echo "Spark custom Python data sources" ;; @@ -106,6 +106,7 @@ get_skill_description() { get_skill_extra_files() { case "$1" in "databricks-agent-bricks") echo "1-knowledge-assistants.md 2-supervisor-agents.md" ;; + "databricks-ai-functions") echo "1-task-functions.md 2-ai-query.md 3-ai-forecast.md 4-document-processing-pipeline.md" ;; "databricks-aibi-dashboards") echo "widget-reference.md sql-patterns.md" ;; "databricks-genie") echo "spaces.md conversation.md" ;; "databricks-asset-bundles") echo "alerts_guidance.md SDP_guidance.md" ;; diff --git a/install.ps1 b/install.ps1 index 9b1d9db4..955675e4 100644 --- a/install.ps1 +++ b/install.ps1 @@ -82,7 +82,7 @@ $script:Skills = @( "databricks-agent-bricks", "databricks-aibi-dashboards", "databricks-app-python", "databricks-asset-bundles", "databricks-config", "databricks-dbsql", "databricks-docs", "databricks-genie", "databricks-iceberg", "databricks-jobs", "databricks-lakebase-autoscale", "databricks-lakebase-provisioned", - "databricks-metric-views", "databricks-mlflow-evaluation", "databricks-model-serving", "databricks-parsing", + "databricks-metric-views", "databricks-mlflow-evaluation", "databricks-model-serving", "databricks-ai-functions", "databricks-python-sdk", "databricks-spark-declarative-pipelines", "databricks-spark-structured-streaming", "databricks-synthetic-data-gen", "databricks-unity-catalog", "databricks-unstructured-pdf-generation", "databricks-vector-search", "databricks-zerobus-ingest", "spark-python-data-source" @@ -114,7 +114,7 @@ $script:ProfileAnalyst = @( ) $script:ProfileAiMlEngineer = @( "databricks-agent-bricks", "databricks-vector-search", "databricks-model-serving", - "databricks-genie", "databricks-parsing", "databricks-unstructured-pdf-generation", + "databricks-genie", "databricks-ai-functions", "databricks-unstructured-pdf-generation", "databricks-mlflow-evaluation", "databricks-synthetic-data-gen", "databricks-jobs" ) $script:ProfileAiMlMlflow = @( @@ -1114,7 +1114,7 @@ function Invoke-PromptCustomSkills { @{ Label = "Vector Search"; Value = "databricks-vector-search"; State = ($preselected -contains "databricks-vector-search"); Hint = "Similarity search" } @{ Label = "Model Serving"; Value = "databricks-model-serving"; State = ($preselected -contains "databricks-model-serving"); Hint = "Deploy models/agents" } @{ Label = "MLflow Evaluation"; Value = "databricks-mlflow-evaluation"; State = ($preselected -contains "databricks-mlflow-evaluation"); Hint = "Model evaluation" } - @{ Label = "Parsing"; Value = "databricks-parsing"; State = ($preselected -contains "databricks-parsing"); Hint = "Document parsing for RAG" } + @{ Label = "AI Functions"; Value = "databricks-ai-functions"; State = ($preselected -contains "databricks-ai-functions"); Hint = "AI Functions, document parsing & RAG" } @{ Label = "Unstructured PDF"; Value = "databricks-unstructured-pdf-generation"; State = ($preselected -contains "databricks-unstructured-pdf-generation"); Hint = "Synthetic PDFs for RAG" } @{ Label = "Synthetic Data"; Value = "databricks-synthetic-data-gen"; State = ($preselected -contains "databricks-synthetic-data-gen"); Hint = "Generate test data" } @{ Label = "Lakebase Autoscale"; Value = "databricks-lakebase-autoscale"; State = ($preselected -contains "databricks-lakebase-autoscale"); Hint = "Managed PostgreSQL" } diff --git a/install.sh b/install.sh index e83b5df8..4c36df55 100755 --- a/install.sh +++ b/install.sh @@ -88,7 +88,7 @@ MIN_SDK_VERSION="0.85.0" G='\033[0;32m' Y='\033[1;33m' R='\033[0;31m' BL='\033[0;34m' B='\033[1m' D='\033[2m' N='\033[0m' # Databricks skills (bundled in repo) -SKILLS="databricks-agent-bricks databricks-aibi-dashboards databricks-app-python databricks-asset-bundles databricks-config databricks-dbsql databricks-docs databricks-genie databricks-iceberg databricks-jobs databricks-lakebase-autoscale databricks-lakebase-provisioned databricks-metric-views databricks-mlflow-evaluation databricks-model-serving databricks-parsing databricks-python-sdk databricks-spark-declarative-pipelines databricks-spark-structured-streaming databricks-synthetic-data-gen databricks-unity-catalog databricks-unstructured-pdf-generation databricks-vector-search databricks-zerobus-ingest spark-python-data-source" +SKILLS="databricks-agent-bricks databricks-ai-functions databricks-aibi-dashboards databricks-app-python databricks-asset-bundles databricks-config databricks-dbsql databricks-docs databricks-genie databricks-iceberg databricks-jobs databricks-lakebase-autoscale databricks-lakebase-provisioned databricks-metric-views databricks-mlflow-evaluation databricks-model-serving databricks-python-sdk databricks-spark-declarative-pipelines databricks-spark-structured-streaming databricks-synthetic-data-gen databricks-unity-catalog databricks-unstructured-pdf-generation databricks-vector-search databricks-zerobus-ingest spark-python-data-source" # MLflow skills (fetched from mlflow/skills repo) MLFLOW_SKILLS="agent-evaluation analyze-mlflow-chat-session analyze-mlflow-trace instrumenting-with-mlflow-tracing mlflow-onboarding querying-mlflow-metrics retrieving-mlflow-traces searching-mlflow-docs" @@ -105,7 +105,7 @@ CORE_SKILLS="databricks-config databricks-docs databricks-python-sdk databricks- # Profile definitions (non-core skills only — core skills are always added) PROFILE_DATA_ENGINEER="databricks-spark-declarative-pipelines databricks-spark-structured-streaming databricks-jobs databricks-asset-bundles databricks-dbsql databricks-iceberg databricks-zerobus-ingest spark-python-data-source databricks-metric-views databricks-synthetic-data-gen" PROFILE_ANALYST="databricks-aibi-dashboards databricks-dbsql databricks-genie databricks-metric-views" -PROFILE_AIML_ENGINEER="databricks-agent-bricks databricks-vector-search databricks-model-serving databricks-genie databricks-parsing databricks-unstructured-pdf-generation databricks-mlflow-evaluation databricks-synthetic-data-gen databricks-jobs" +PROFILE_AIML_ENGINEER="databricks-agent-bricks databricks-ai-functions databricks-vector-search databricks-model-serving databricks-genie databricks-unstructured-pdf-generation databricks-mlflow-evaluation databricks-synthetic-data-gen databricks-jobs" PROFILE_AIML_MLFLOW="agent-evaluation analyze-mlflow-chat-session analyze-mlflow-trace instrumenting-with-mlflow-tracing mlflow-onboarding querying-mlflow-metrics retrieving-mlflow-traces searching-mlflow-docs" PROFILE_APP_DEVELOPER="databricks-app-python databricks-app-apx databricks-lakebase-autoscale databricks-lakebase-provisioned databricks-model-serving databricks-dbsql databricks-jobs databricks-asset-bundles" @@ -912,7 +912,7 @@ prompt_custom_skills() { "Vector Search|databricks-vector-search|$(_is_preselected databricks-vector-search)|Similarity search" \ "Model Serving|databricks-model-serving|$(_is_preselected databricks-model-serving)|Deploy models/agents" \ "MLflow Evaluation|databricks-mlflow-evaluation|$(_is_preselected databricks-mlflow-evaluation)|Model evaluation" \ - "Parsing|databricks-parsing|$(_is_preselected databricks-parsing)|Document parsing for RAG" \ + "AI Functions|databricks-ai-functions|$(_is_preselected databricks-ai-functions)|AI Functions, document parsing & RAG" \ "Unstructured PDF|databricks-unstructured-pdf-generation|$(_is_preselected databricks-unstructured-pdf-generation)|Synthetic PDFs for RAG" \ "Synthetic Data|databricks-synthetic-data-gen|$(_is_preselected databricks-synthetic-data-gen)|Generate test data" \ "Lakebase Autoscale|databricks-lakebase-autoscale|$(_is_preselected databricks-lakebase-autoscale)|Managed PostgreSQL" \