# DataHub Documents Source
Processes Document entities from DataHub and generates embeddings for semantic search.
## Important Capabilities
| Capability | Status | Notes |
|---|---|---|
| Detect Deleted Entities | ✅ | Enabled by default via stateful ingestion. |
Supports batch mode (GraphQL) and event-driven mode (Kafka MCL) with incremental processing. Automatically fetches embedding configuration from server to ensure alignment.
To enable automatic semantic search indexing for your documents, deploy this source to DataHub with a simple command:
```shell
# Create minimal recipe and deploy with hourly schedule
cat > /tmp/datahub-docs.yml << 'EOF'
source:
  type: datahub-documents
  config: {}
EOF
datahub ingest deploy -c /tmp/datahub-docs.yml --name "document-embeddings" --schedule "0 * * * *"
```
This creates a managed ingestion source in DataHub that automatically processes documents every hour and generates embeddings for semantic search.
What this does:
- ✅ Deploys ingestion recipe to DataHub
- ✅ Runs hourly (cron: `0 * * * *`) to keep embeddings up-to-date
- ✅ Uses event-driven mode (only processes changed documents)
- ✅ Auto-configures from server (no manual embedding setup needed)
Alternative schedules:
```shell
# Every 15 minutes: "*/15 * * * *"
# Every 6 hours: "0 */6 * * *"
# Daily at 2 AM: "0 2 * * *"
```
Note: In future DataHub versions, GMS will run this automatically. For now, manual deployment is required.
## Overview
The DataHub Documents source processes Document entities already stored in DataHub and enriches them with semantic embeddings for semantic search. This source is designed to work with DataHub's native Document entities that have been created via GraphQL, Python SDK, or other ingestion sources (like Notion, Confluence, etc.).
## Key Features

### 1. Smart Defaults with Server Configuration Sync
The source automatically fetches embedding configuration from your DataHub server, ensuring perfect alignment:
- Connects using environment variables (`DATAHUB_GMS_URL`, `DATAHUB_GMS_TOKEN`)
- Fetches embedding provider config (provider, model, region, dimensions)
- Validates local config against server (if provided)
- Works with minimal `config: {}` in your recipe
### 2. Dual Operating Modes
Event-Driven Mode (Recommended):
- Processes documents in real-time from Kafka MCL events
- Only reprocesses when document content changes
- Efficient for continuous updates
- Automatically falls back to batch mode on first run
Batch Mode:
- Fetches all documents via GraphQL
- Good for initial setup or periodic full refreshes
- Processes all matching documents
### 3. Incremental Processing
- Tracks document content hashes to skip unchanged documents
- Uses DataHub's stateful ingestion framework
- Reduces processing time and API costs
- Force reprocess option available
### 4. Flexible Platform Filtering
- Process all NATIVE documents (default)
- Filter by specific platforms (e.g., `["notion", "confluence"]`)
- Process all documents regardless of source type
## Prerequisites

### 1. DataHub Server Configuration
You MUST configure semantic search on your DataHub server before using this source.
See the Semantic Search Configuration Guide for complete setup instructions.
Required server configuration:
- OpenSearch 2.17+ with k-NN plugin enabled
- AWS Bedrock or Cohere embedding provider configured
- Semantic search enabled in `application.yml`
### 2. Document Entities in DataHub
This source processes existing Document entities. Documents can be created through:
- GraphQL API: Create documents programmatically
- Python SDK: Use `datahub.sdk.document.Document`
- External Sources: Notion, Confluence, SharePoint sources that emit Document entities
### 3. AWS Credentials (for Bedrock)
If using AWS Bedrock for embeddings:
- IAM permissions for `bedrock:InvokeModel`
- AWS credentials configured (env vars, instance role, or ECS task role)
- Bedrock model access enabled in AWS Console
### 4. Environment Variables
```shell
# Required for DataHub connection
export DATAHUB_GMS_URL="http://localhost:8080"
export DATAHUB_GMS_TOKEN="your-token-here"

# Optional: AWS credentials (if not using instance/task roles)
export AWS_PROFILE="datahub-dev"
# OR
export AWS_ACCESS_KEY_ID="your-key"
export AWS_SECRET_ACCESS_KEY="your-secret"
export AWS_REGION="us-west-2"
```
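As a quick preflight before deploying, a short script can confirm the required connection variables are set. This is an illustrative sketch, not part of the source itself; the variable names are the ones listed above.

```python
# Preflight check: verify the DataHub connection variables this source
# reads are present in the environment before deploying the recipe.
import os


def check_env() -> list[str]:
    """Return the names of required environment variables that are missing."""
    required = ["DATAHUB_GMS_URL", "DATAHUB_GMS_TOKEN"]
    return [name for name in required if not os.environ.get(name)]


missing = check_env()
if missing:
    print(f"Missing environment variables: {', '.join(missing)}")
else:
    print("DataHub connection variables are set")
```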
## Common Use Cases

### 1. Event-Driven Processing (Production)
Process documents in real-time as they're created or updated:
```yaml
source:
  type: datahub-documents
  config:
    # Event mode enabled by default in recent versions
    event_mode:
      enabled: true
      idle_timeout_seconds: 60

sink:
  type: datahub-rest
  config: {}
```
When to use:
- Production deployments
- Continuous document updates
- Real-time semantic search needs
### 2. Batch Processing (Initial Load)
Process all documents in a single run:
```yaml
source:
  type: datahub-documents
  config:
    event_mode:
      enabled: false
    # Optional: Process specific platforms
    platform_filter: ["notion", "confluence"]

sink:
  type: datahub-rest
  config: {}
```
When to use:
- Initial setup
- Periodic full refreshes
- Backfilling embeddings
### 3. Platform-Specific Processing
Process documents from specific platforms only:
```yaml
source:
  type: datahub-documents
  config:
    # Process NATIVE documents + EXTERNAL from these platforms
    platform_filter: ["notion", "confluence"]
    incremental:
      enabled: true

sink:
  type: datahub-rest
  config: {}
```
### 4. Force Reprocessing
Reprocess all documents regardless of content changes:
```yaml
source:
  type: datahub-documents
  config:
    incremental:
      enabled: true
      force_reprocess: true  # Reprocess everything
      # Useful when:
      # - Changing chunking strategy
      # - Updating embedding model
      # - Fixing processing issues

sink:
  type: datahub-rest
  config: {}
```
### 5. Custom Chunking and Embedding
Override server configuration with local settings:
```yaml
source:
  type: datahub-documents
  config:
    # Custom chunking
    chunking:
      strategy: by_title  # or 'basic'
      max_characters: 1000  # Larger chunks
      combine_text_under_n_chars: 200

    # Override embedding config (validates against server)
    embedding:
      provider: bedrock
      model: cohere.embed-english-v3
      model_embedding_key: cohere_embed_v3
      aws_region: us-west-2
      batch_size: 50

sink:
  type: datahub-rest
  config: {}
```
⚠️ Warning: Custom embedding configs are validated against the server. Mismatches will cause errors.
## How It Works

### Processing Pipeline

```
1. Fetch Mode Selection
   ├─ Event Mode: Subscribe to Kafka MCL events
   └─ Batch Mode: GraphQL query for all documents

2. For Each Document:
   ├─ Check incremental state (skip if unchanged)
   ├─ Partition markdown → structured elements
   ├─ Chunk elements → semantic chunks
   │  ├─ by_title: Preserves document structure
   │  └─ basic: Fixed-size chunks with overlap
   ├─ Generate embeddings via LiteLLM
   │  └─ Batches of 25 (configurable)
   └─ Emit SemanticContent aspect → DataHub

3. State Management
   ├─ Batch Mode: Track document content hashes
   └─ Event Mode: Track Kafka offsets
```
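The per-document steps can be sketched in Python. The helpers below are illustrative stand-ins: the real source uses the `unstructured` library for partitioning and LiteLLM for embeddings, so names and behavior here are simplified assumptions.

```python
# Simplified sketch of the partition → chunk → embed pipeline above.
from typing import Iterable


def partition_markdown(text: str) -> list[str]:
    """Stand-in for markdown partitioning: one element per paragraph."""
    return [p.strip() for p in text.split("\n\n") if p.strip()]


def chunk_elements(elements: list[str], max_characters: int = 500) -> list[str]:
    """Greedily pack elements into chunks of at most max_characters."""
    chunks: list[str] = []
    current = ""
    for element in elements:
        if current and len(current) + len(element) + 1 > max_characters:
            chunks.append(current)
            current = element
        else:
            current = f"{current}\n{element}".strip()
    if current:
        chunks.append(current)
    return chunks


def embed(chunks: Iterable[str]) -> list[list[float]]:
    """Stand-in embedding: the real source batches calls to the provider."""
    return [[float(len(chunk))] for chunk in chunks]


doc = "# Title\n\nFirst paragraph.\n\nSecond paragraph."
vectors = embed(chunk_elements(partition_markdown(doc)))
```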
### Event Mode Flow
First Run (No State):
- Falls back to batch mode
- Captures current Kafka offset BEFORE processing
- Processes all documents
- Saves offset to state
- Next run continues from captured offset
Subsequent Runs:
- Loads last committed offset from state
- Consumes events from last position
- Processes only changed documents
- Updates offset after each batch
- Exits after idle timeout (no new events)
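The offset handling and idle-timeout behavior above can be sketched with a stubbed `poll` function. This is an illustrative loop, not the source's actual code; in reality events come from Kafka and offsets are committed through stateful ingestion.

```python
# Sketch of the event-mode loop: consume batches, advance the offset,
# and exit once no new events arrive within the idle timeout.
import time


def run_event_loop(poll, idle_timeout_seconds: float = 30,
                   poll_timeout_seconds: float = 2) -> int:
    """Consume event batches until idle_timeout_seconds pass with no events.

    Returns the final committed offset.
    """
    offset = 0
    idle_since = time.monotonic()
    while time.monotonic() - idle_since < idle_timeout_seconds:
        events = poll(offset, timeout=poll_timeout_seconds)
        if events:
            # Process only changed documents, then advance the offset.
            offset += len(events)
            idle_since = time.monotonic()
    return offset
```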
### Incremental Processing
Content Hash Calculation:
```python
hash_input = {
    "text": document.text,
    "partition_strategy": config.partition_strategy,
    "chunking_strategy": config.chunking.strategy,
    "max_characters": config.chunking.max_characters,
    # ... other chunking params
}
content_hash = sha256(json.dumps(hash_input))
```
When Documents Are Reprocessed:
- Text content changes
- Chunking configuration changes
- Partition strategy changes
- `force_reprocess: true` is set
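A runnable version of the hash sketch, to make the reprocessing triggers concrete. Field names are illustrative; the real hash covers all chunking parameters.

```python
# Content hash: any change to the text or the chunking configuration
# produces a different hash, which triggers reprocessing.
import hashlib
import json


def content_hash(text: str, chunking: dict, partition_strategy: str = "markdown") -> str:
    hash_input = {
        "text": text,
        "partition_strategy": partition_strategy,
        "chunking_strategy": chunking["strategy"],
        "max_characters": chunking["max_characters"],
    }
    # sort_keys keeps the hash stable regardless of dict insertion order
    return hashlib.sha256(json.dumps(hash_input, sort_keys=True).encode()).hexdigest()


chunking = {"strategy": "by_title", "max_characters": 500}
h1 = content_hash("hello", chunking)
# Changing the chunking config changes the hash, even with identical text:
h2 = content_hash("hello", {"strategy": "by_title", "max_characters": 1000})
```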
## Configuration Deep Dive

### Platform Filtering

The `platform_filter` setting controls which documents are processed:
None (default):
```yaml
platform_filter: null  # or omit the field
```
- Processes all NATIVE documents (sourceType=NATIVE)
- Ignores EXTERNAL documents from other platforms
Specific Platforms:
```yaml
platform_filter: ["notion", "confluence"]
```
- Processes NATIVE documents
- PLUS EXTERNAL documents from specified platforms
All Documents:
```yaml
platform_filter: ["*"]  # or ["ALL"]
```
- Processes ALL documents regardless of source type or platform
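The three modes amount to the following decision logic. This is an illustrative sketch under the semantics described above, not the source's actual code.

```python
# Decision logic for the three platform_filter modes.
from typing import Optional


def should_process(source_type: str, platform: str,
                   platform_filter: Optional[list[str]]) -> bool:
    """Return True if a document passes the platform filter."""
    if platform_filter and platform_filter[0] in ("*", "ALL"):
        return True  # all documents, regardless of source type or platform
    if source_type == "NATIVE":
        return True  # NATIVE documents are always processed
    # EXTERNAL documents pass only when their platform is listed
    return platform_filter is not None and platform in platform_filter
```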
### Event Mode Configuration

```yaml
event_mode:
  enabled: true

  # Consumer ID for offset tracking
  consumer_id: "datahub-documents-{pipeline_name}"  # Default

  # Kafka topics to consume
  topics:
    - "MetadataChangeLog_Versioned_v1"

  # Lookback window for first run
  lookback_days: null  # null = start from latest, or specify days

  # Reset offsets to beginning (DANGEROUS - reprocesses everything)
  reset_offsets: false

  # Exit after N seconds with no new events
  idle_timeout_seconds: 30

  # Kafka poll settings
  poll_timeout_seconds: 2
  poll_limit: 100
```
### Chunking Strategies

by_title (Recommended):

```yaml
chunking:
  strategy: by_title
  max_characters: 500
  combine_text_under_n_chars: 100
```
- Preserves document structure
- Groups text under section headers
- Combines small chunks intelligently
- Better semantic coherence
basic:

```yaml
chunking:
  strategy: basic
  max_characters: 500
  overlap: 50  # Character overlap between chunks
```
- Simple fixed-size chunks
- Configurable overlap
- No structure awareness
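For intuition, the sliding window behind the basic strategy can be sketched as below. This is an assumption-level illustration; the actual chunking comes from the `unstructured` library.

```python
# Fixed-size chunks where each chunk repeats the last `overlap`
# characters of the previous chunk, preserving context across boundaries.
def basic_chunks(text: str, max_characters: int = 500, overlap: int = 50) -> list[str]:
    step = max_characters - overlap  # advance by chunk size minus overlap
    return [text[i:i + max_characters] for i in range(0, len(text), step)]
```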
### Embedding Configuration

Default (Fetch from Server):

```yaml
embedding: {}  # or omit entirely
```
- Automatically fetches config from server
- Ensures alignment with server's semantic search
- Recommended for production
Override (Validated Against Server):

```yaml
embedding:
  provider: bedrock  # bedrock, cohere, openai
  model: cohere.embed-english-v3
  model_embedding_key: cohere_embed_v3  # Must match server!
  aws_region: us-west-2
  batch_size: 25
  input_type: search_document  # Cohere-specific
```
- Validates that config matches server
- Fails if mismatch detected
- Prevents broken semantic search
Break-Glass Override (NOT RECOMMENDED):

```yaml
embedding:
  allow_local_embedding_config: true
  provider: bedrock
  model: cohere.embed-english-v3
  # ... other settings
```
- Bypasses server validation
- May break semantic search
- Only use for debugging or special cases
### Stateful Ingestion

```yaml
stateful_ingestion:
  enabled: true  # Enabled by default

  # State backend configuration
  state_provider:
    type: datahub  # Store state in DataHub
    config:
      datahub_api:
        server: "http://localhost:8080"
        token: "${DATAHUB_TOKEN}"

  # Ignore previous state (fresh start)
  ignore_old_state: false

  # Don't commit new state (dry run)
  ignore_new_state: false
```
## Performance Tuning

### Batch Size

```yaml
embedding:
  batch_size: 25  # Default
  # Increase for faster processing (if provider supports):
  # - Cohere: Up to 96
  # - Bedrock: Up to 100 (but rate-limited)
```
### Event Mode Settings

```yaml
event_mode:
  poll_limit: 100  # Default; fetch up to 100 events per poll
  # Increase for high-volume scenarios, e.g. poll_limit: 500
```
### Filtering

```yaml
# Skip short or empty documents
skip_empty_text: true
min_text_length: 50  # Characters

# Process fewer documents
platform_filter: ["notion"]  # Only one platform
document_urns:  # Specific documents only
  - "urn:li:document:abc123"
```
## Monitoring and Observability

### Report Metrics
The source reports the following metrics:
```python
report = {
    "num_documents_fetched": 100,           # Total documents fetched
    "num_documents_processed": 85,          # Successfully processed
    "num_documents_skipped": 15,            # Skipped (various reasons)
    "num_documents_skipped_unchanged": 10,  # Unchanged content
    "num_documents_skipped_empty": 5,       # Empty or too short
    "num_chunks_created": 425,              # Total chunks generated
    "num_embeddings_generated": 425,        # Total embeddings
    "processing_errors": [],                # List of errors
}
```
### Logging
Enable debug logging for detailed insights:
```yaml
# In your ingestion recipe
source:
  type: datahub-documents
  config:
    # ... your config

# Set log level via environment variable:
# export DATAHUB_DEBUG=true
```
Look for these log messages:
- "Loading embedding configuration from DataHub server..."
- "✓ Loaded embedding configuration from server"
- "Incremental mode enabled, state file: ..."
- "Skipping document {urn} (unchanged content hash)"
## Troubleshooting

### Issue: "Semantic search is not enabled on the DataHub server"
Cause: Server does not have semantic search configured.
Solution:
- Configure semantic search on your DataHub server first
- See Semantic Search Configuration Guide
- Verify `ELASTICSEARCH_SEMANTIC_SEARCH_ENABLED=true` in server config
### Issue: "Server does not support semantic search configuration API"
Cause: Old DataHub server version (pre-v0.14.0).
Solutions:
Option 1 (Recommended): Upgrade DataHub server to v0.14.0+
Option 2: Provide local embedding config:

```yaml
embedding:
  provider: bedrock
  model: cohere.embed-english-v3
  model_embedding_key: cohere_embed_v3
  aws_region: us-west-2
```
### Issue: Embedding Configuration Validation Fails

Error:

```
Embedding configuration mismatch with server:
  - Model: local='cohere.embed-english-v3', server='amazon.titan-embed-text-v1'
```
Cause: Local config doesn't match server configuration.
Solution:
- Either remove local embedding config (use server config)
- Or update server config to match local settings
- Or update local config to match server
### Issue: No Documents Being Processed
Possible Causes:
Platform Filter Too Restrictive:

```yaml
# EXTERNAL documents from platforms not in the filter are skipped:
platform_filter: ["notion"]
# Solution: add the missing platform, remove the filter, or use null
platform_filter: null
```

All Documents Unchanged:
- Check incremental mode is working correctly
- Force reprocess if needed: `incremental.force_reprocess: true`

Documents Have No Text:
- Verify documents have content in the `Document.text` field
- Check the `min_text_length` threshold
### Issue: Event Mode Not Working

Symptoms: Falls back to batch mode every run.

Possible Causes:

Stateful Ingestion Disabled:

```yaml
stateful_ingestion:
  enabled: true  # Must be enabled for event mode
```

Kafka Connection Issues:
- Check DataHub Kafka is accessible
- Verify network connectivity
- Check Kafka broker configuration

State Provider Misconfigured:

```yaml
stateful_ingestion:
  state_provider:
    type: datahub
    config:
      datahub_api:
        server: "http://correct-host:8080"  # Correct URL
```
### Issue: AWS Credentials Error

Error:

```
Unable to load credentials from any provider in the chain
```

Solutions:

Verify AWS_PROFILE:

```shell
export AWS_PROFILE=datahub-dev
cat ~/.aws/credentials  # Check profile exists
```

For EC2 Instance Role:

```shell
# Check instance role is attached
curl http://169.254.169.254/latest/meta-data/iam/security-credentials/
```

For ECS Task Role:
- Verify task definition has correct IAM role
- Check ECS task logs for IAM-related errors
### Issue: Slow Processing

Optimization Strategies:

Increase Batch Size:

```yaml
embedding:
  batch_size: 50  # Up from default 25
```

Use Event Mode:
- Only processes changed documents
- Much faster than batch mode for updates

Filter Documents:

```yaml
platform_filter: ["notion"]  # Process fewer platforms
min_text_length: 100  # Skip short documents
```

Optimize Chunking:

```yaml
chunking:
  max_characters: 1000  # Larger chunks = fewer embeddings
```
## Cost Estimation

### AWS Bedrock Pricing (Cohere Embed v3)
As of December 2024 in us-west-2:
- $0.0001 per 1,000 input tokens (~750 words)
Example Costs:
One-time Processing:
- 1,000 documents × 500 tokens each = 500,000 tokens = $0.05
- 10,000 documents × 500 tokens each = 5M tokens = $0.50
- 100,000 documents × 500 tokens each = 50M tokens = $5.00
Incremental Updates (Event Mode):
- 100 changed documents/day × 500 tokens = 50,000 tokens/day
- Monthly: 1.5M tokens = $0.15/month
Query Embeddings (GMS):
- Separate from this source (handled by GMS at search time)
- ~50 tokens per search query
- 10,000 queries = $0.05
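The arithmetic above can be reproduced with a one-line helper. Pricing figures are the ones quoted above (December 2024, us-west-2); check current AWS pricing before relying on them.

```python
# Back-of-envelope embedding cost at $0.0001 per 1,000 input tokens.
def bedrock_embed_cost(num_documents: int, tokens_per_document: int = 500,
                       usd_per_1k_tokens: float = 0.0001) -> float:
    """Estimated one-time embedding cost in USD."""
    return num_documents * tokens_per_document / 1000 * usd_per_1k_tokens

# Example: 10,000 documents at 500 tokens each → $0.50
```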
## Limitations

### Processing Limitations

- Text Only: Only processes the `Document.text` field (markdown format expected)
- No Binary Content: Images, PDFs, etc. must be converted to text first
- Markdown Partitioning: Uses `unstructured.partition.md`, which may not handle all markdown variants
### Platform Filtering

- Source Type Required: Documents must have a `sourceType` field (defaults to NATIVE if missing)
- Platform Identification: Relies on `dataPlatformInstance` or URL-based platform extraction
### State Management
- State Size: State file grows with number of documents (includes hash for each)
- State Backend: Requires DataHub or file-based state provider
## Related Documentation
- Semantic Search Configuration - Start Here
- Notion Source - Example document source with embeddings
- AWS Bedrock Documentation - Embedding models
- AWS Bedrock Pricing - Cost estimation
- DataHub Developer Guides - Additional developer documentation
## CLI based Ingestion

### Starter Recipe
Check out the following recipe to get started with ingestion! See below for full configuration options.
For general pointers on writing and running a recipe, see our main recipe guide.
```yaml
source:
  type: datahub-documents
  config: {}
  # Minimal configuration - uses smart defaults:
  # - Automatically connects to DataHub using DATAHUB_GMS_URL and DATAHUB_GMS_TOKEN environment variables
  # - Fetches embedding configuration from server
  # - Enables event-driven mode and incremental processing
  # No sink needed when deploying to DataHub (writes back to itself)
```
## Config Details
Note that a . is used to denote nested fields in the YAML recipe.
| Field | Description |
|---|---|
| `max_documents` (integer) | Maximum number of documents to process per ingestion run. The job will stop and fail with an error once this limit is reached. Set to 0 or -1 to disable the limit. Default: `10000` |
| `min_text_length` (integer) | Minimum text length in characters to process (shorter documents are skipped). Default: `50` |
| `partition_strategy` (string) | Text partitioning strategy. Currently only 'markdown' is supported. This field is included in the document hash to trigger reprocessing if the strategy changes. Default: `markdown` |
| `skip_empty_text` (boolean) | Skip documents with no text content. Default: `True` |
| `chunking` (ChunkingConfig) | Chunking strategy configuration. |
| `chunking.combine_text_under_n_chars` (integer) | Combine chunks smaller than this size. Default: `100` |
| `chunking.max_characters` (integer) | Maximum characters per chunk. Default: `500` |
| `chunking.overlap` (integer) | Character overlap between chunks. Default: `0` |
| `chunking.strategy` (enum) | One of: "basic", "by_title". Default: `by_title` |
| `datahub` (DataHubConnectionConfig) | DataHub connection configuration. Defaults are loaded in this priority: 1. Explicitly configured values in the recipe 2. Environment variables (DATAHUB_GMS_URL, DATAHUB_GMS_TOKEN) 3. `~/.datahubenv` file (created by `datahub init`) 4. Hardcoded defaults (http://localhost:8080, no token) |
| `datahub.server` (string) | DataHub GMS server URL (defaults to DATAHUB_GMS_URL env var, `~/.datahubenv`, or localhost:8080) |
| `datahub.token` (string(password) or null) | DataHub API token for authentication (defaults to DATAHUB_GMS_TOKEN env var or `~/.datahubenv`) |
| `document_urns` (array or null) | Specific document URNs to process (if None, process all matching platforms). Default: `None` |
| `document_urns.string` (string) | |
| `embedding` (EmbeddingConfig) | Embedding generation configuration. Default behavior: fetches configuration from the DataHub server automatically. Override behavior: validates local config against the server when explicitly set. |
| `embedding.allow_local_embedding_config` (boolean) | BREAK-GLASS: Allow local config without server validation. NOT RECOMMENDED - may break semantic search. Default: `False` |
| `embedding.api_key` (string(password) or null) | API key for Cohere (not needed for Bedrock with IAM roles). Default: `None` |
| `embedding.aws_region` (string or null) | AWS region for Bedrock. If not set, loads from server. Default: `None` |
| `embedding.batch_size` (integer) | Batch size for embedding API calls. Default: `25` |
| `embedding.documents_per_minute` (integer) | Maximum number of documents to embed per minute when rate_limit is enabled. Default: `300` |
| `embedding.input_type` (string or null) | Input type for Cohere embeddings. Default: `search_document` |
| `embedding.model` (string or null) | Model name. If not set, loads from server. Default: `None` |
| `embedding.model_embedding_key` (string or null) | Storage key for embeddings (e.g., 'cohere_embed_v3'). Required if overriding server config. If not set, loads from server. Default: `None` |
| `embedding.provider` (enum or null) | Embedding provider (bedrock uses AWS, cohere/openai use API key). If not set, loads from server. Default: `None` |
| `embedding.rate_limit` (boolean) | Enable rate limiting for embedding API calls. Default: `True` |
| `event_mode` (EventModeConfig) | Event-driven mode configuration. |
| `event_mode.consumer_id` (string or null) | Consumer ID for offset tracking (defaults to 'datahub-documents-{pipeline_name}'). Default: `None` |
| `event_mode.enabled` (boolean) | Enable event-driven mode (polls MCL events instead of GraphQL batch). Default: `False` |
| `event_mode.idle_timeout_seconds` (integer) | Exit after this many seconds with no new events (incremental batch mode). Default: `30` |
| `event_mode.lookback_days` (integer or null) | Number of days to look back for events on first run (None means start from latest). Default: `None` |
| `event_mode.poll_limit` (integer) | Maximum number of events to fetch per poll. Default: `100` |
| `event_mode.poll_timeout_seconds` (integer) | Timeout for each poll request. Default: `2` |
| `event_mode.reset_offsets` (boolean) | Reset consumer offsets to start from beginning. Default: `False` |
| `event_mode.topics` (array) | Topics to consume for document changes. Default: `['MetadataChangeLog_Versioned_v1']` |
| `event_mode.topics.string` (string) | |
| `incremental` (IncrementalConfig) | Incremental processing configuration. |
| `incremental.enabled` (boolean) | Only process documents whose text content has changed (tracks content hash). Uses stateful ingestion when enabled. The state_file_path option is deprecated and ignored when stateful ingestion is enabled. Default: `True` |
| `incremental.force_reprocess` (boolean) | Force reprocess all documents regardless of content hash. Default: `False` |
| `incremental.state_file_path` (string or null) | [DEPRECATED] Path to state file. This option is ignored when stateful ingestion is enabled. State is now managed through DataHub's stateful ingestion framework. Default: `None` |
| `platform_filter` (array or null) | Filter documents by platforms. Default (None): process all NATIVE documents (sourceType=NATIVE) regardless of platform. To include external documents from specific platforms, add them here (e.g., ['notion', 'confluence']); this processes NATIVE documents plus EXTERNAL documents from the specified platforms. Use ['*'] or ['ALL'] to process all documents regardless of source type or platform. Default: `None` |
| `platform_filter.string` (string) | |
| `stateful_ingestion` (DocumentChunkingStatefulIngestionConfig) | Configuration for document chunking stateful ingestion. |
| `stateful_ingestion.enabled` (boolean) | Whether or not to enable stateful ingest. Default: True if a pipeline_name is set and either a datahub-rest sink or `datahub_api` is specified, otherwise False |
The JSONSchema for this configuration is inlined below.
{
"$defs": {
"ChunkingConfig": {
"additionalProperties": false,
"description": "Chunking strategy configuration.",
"properties": {
"strategy": {
"default": "by_title",
"description": "Chunking strategy to use",
"enum": [
"basic",
"by_title"
],
"title": "Strategy",
"type": "string"
},
"max_characters": {
"default": 500,
"description": "Maximum characters per chunk",
"title": "Max Characters",
"type": "integer"
},
"overlap": {
"default": 0,
"description": "Character overlap between chunks",
"title": "Overlap",
"type": "integer"
},
"combine_text_under_n_chars": {
"default": 100,
"description": "Combine chunks smaller than this size",
"title": "Combine Text Under N Chars",
"type": "integer"
}
},
"title": "ChunkingConfig",
"type": "object"
},
"DataHubConnectionConfig": {
"additionalProperties": false,
"description": "DataHub connection configuration.\n\nDefaults are loaded in this priority:\n1. Explicitly configured values in the recipe\n2. Environment variables (DATAHUB_GMS_URL, DATAHUB_GMS_TOKEN)\n3. ~/.datahubenv file (created by `datahub init`)\n4. Hardcoded defaults (http://localhost:8080, no token)",
"properties": {
"server": {
"description": "DataHub GMS server URL (defaults to DATAHUB_GMS_URL env var, ~/.datahubenv, or localhost:8080)",
"title": "Server",
"type": "string"
},
"token": {
"anyOf": [
{
"format": "password",
"type": "string",
"writeOnly": true
},
{
"type": "null"
}
],
"description": "DataHub API token for authentication (defaults to DATAHUB_GMS_TOKEN env var or ~/.datahubenv)",
"title": "Token"
}
},
"title": "DataHubConnectionConfig",
"type": "object"
},
"DocumentChunkingStatefulIngestionConfig": {
"additionalProperties": false,
"description": "Configuration for document chunking stateful ingestion.",
"properties": {
"enabled": {
"default": false,
"description": "Whether or not to enable stateful ingest. Default: True if a pipeline_name is set and either a datahub-rest sink or `datahub_api` is specified, otherwise False",
"title": "Enabled",
"type": "boolean"
}
},
"title": "DocumentChunkingStatefulIngestionConfig",
"type": "object"
},
"EmbeddingConfig": {
"additionalProperties": false,
"description": "Embedding generation configuration.\n\nDefault behavior: Fetches configuration from DataHub server automatically.\nOverride behavior: Validates local config against server when explicitly set.",
"properties": {
"provider": {
"anyOf": [
{
"enum": [
"bedrock",
"cohere",
"openai"
],
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Embedding provider (bedrock uses AWS, cohere/openai use API key). If not set, loads from server.",
"title": "Provider"
},
"model": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Model name. If not set, loads from server.",
"title": "Model"
},
"model_embedding_key": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Storage key for embeddings (e.g., 'cohere_embed_v3'). Required if overriding server config. If not set, loads from server.",
"title": "Model Embedding Key"
},
"aws_region": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "AWS region for Bedrock. If not set, loads from server.",
"title": "Aws Region"
},
"api_key": {
"anyOf": [
{
"format": "password",
"type": "string",
"writeOnly": true
},
{
"type": "null"
}
],
"default": null,
"description": "API key for Cohere (not needed for Bedrock with IAM roles)",
"title": "Api Key"
},
"batch_size": {
"default": 25,
"description": "Batch size for embedding API calls",
"title": "Batch Size",
"type": "integer"
},
"input_type": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": "search_document",
"description": "Input type for Cohere embeddings",
"title": "Input Type"
},
"rate_limit": {
"default": true,
"description": "Enable rate limiting for embedding API calls.",
"title": "Rate Limit",
"type": "boolean"
},
"documents_per_minute": {
"default": 300,
"description": "Maximum number of documents to embed per minute when rate_limit is enabled.",
"exclusiveMinimum": 0,
"title": "Documents Per Minute",
"type": "integer"
},
"allow_local_embedding_config": {
"default": false,
"description": "BREAK-GLASS: Allow local config without server validation. NOT RECOMMENDED - may break semantic search.",
"title": "Allow Local Embedding Config",
"type": "boolean"
}
},
"title": "EmbeddingConfig",
"type": "object"
},
"EventModeConfig": {
"additionalProperties": false,
"description": "Event-driven mode configuration.",
"properties": {
"enabled": {
"default": false,
"description": "Enable event-driven mode (polls MCL events instead of GraphQL batch)",
"title": "Enabled",
"type": "boolean"
},
"consumer_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Consumer ID for offset tracking (defaults to 'datahub-documents-{pipeline_name}')",
"title": "Consumer Id"
},
"topics": {
"default": [
"MetadataChangeLog_Versioned_v1"
],
"description": "Topics to consume for document changes",
"items": {
"type": "string"
},
"title": "Topics",
"type": "array"
},
"lookback_days": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Number of days to look back for events on first run (None means start from latest)",
"title": "Lookback Days"
},
"reset_offsets": {
"default": false,
"description": "Reset consumer offsets to start from beginning",
"title": "Reset Offsets",
"type": "boolean"
},
"idle_timeout_seconds": {
"default": 30,
"description": "Exit after this many seconds with no new events (incremental batch mode)",
"title": "Idle Timeout Seconds",
"type": "integer"
},
"poll_timeout_seconds": {
"default": 2,
"description": "Timeout for each poll request",
"title": "Poll Timeout Seconds",
"type": "integer"
},
"poll_limit": {
"default": 100,
"description": "Maximum number of events to fetch per poll",
"title": "Poll Limit",
"type": "integer"
}
},
"title": "EventModeConfig",
"type": "object"
},
"IncrementalConfig": {
"additionalProperties": false,
"description": "Incremental processing configuration.",
"properties": {
"enabled": {
"default": true,
"description": "Only process documents whose text content has changed (tracks content hash). Uses stateful ingestion when enabled. The state_file_path option is deprecated and ignored when stateful ingestion is enabled.",
"title": "Enabled",
"type": "boolean"
},
"state_file_path": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "[DEPRECATED] Path to state file. This option is ignored when stateful ingestion is enabled. State is now managed through DataHub's stateful ingestion framework.",
"title": "State File Path"
},
"force_reprocess": {
"default": false,
"description": "Force reprocess all documents regardless of content hash",
"title": "Force Reprocess",
"type": "boolean"
}
},
"title": "IncrementalConfig",
"type": "object"
}
},
"description": "Configuration for DataHub Documents Source.",
"properties": {
"stateful_ingestion": {
"$ref": "#/$defs/DocumentChunkingStatefulIngestionConfig",
"description": "Stateful ingestion configuration. Enabled by default to support incremental mode (document hash tracking) and event mode (offset tracking)."
},
"datahub": {
"$ref": "#/$defs/DataHubConnectionConfig",
"description": "DataHub connection configuration. Only used when running standalone (e.g., CLI ingestion). In managed ingestion (deployed sources), the connection is automatically configured from the sink."
},
"platform_filter": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"default": null,
"description": "Filter documents by platforms. Default (None): Process all NATIVE documents (sourceType=NATIVE) regardless of platform. To include external documents from specific platforms, add them here (e.g., ['notion', 'confluence']). This will process NATIVE documents + EXTERNAL documents from the specified platforms. Use ['*'] or ['ALL'] to process all documents regardless of source type or platform.",
"title": "Platform Filter"
},
"document_urns": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"default": null,
"description": "Specific document URNs to process (if None, process all matching platforms)",
"title": "Document Urns"
},
"event_mode": {
"$ref": "#/$defs/EventModeConfig",
"description": "Event-driven mode configuration (polls Kafka MCL events)"
},
"incremental": {
"$ref": "#/$defs/IncrementalConfig",
"description": "Incremental processing configuration (skip unchanged documents)"
},
"chunking": {
"$ref": "#/$defs/ChunkingConfig",
"description": "Text chunking strategy configuration"
},
"embedding": {
"$ref": "#/$defs/EmbeddingConfig",
"description": "Embedding generation configuration (LiteLLM with Cohere/Bedrock)"
},
"max_documents": {
"default": 10000,
"description": "Maximum number of documents to process per ingestion run. The job will stop and fail with an error once this limit is reached. Set to 0 or -1 to disable the limit.",
"minimum": -1,
"title": "Max Documents",
"type": "integer"
},
"partition_strategy": {
"const": "markdown",
"default": "markdown",
"description": "Text partitioning strategy. Currently only 'markdown' is supported. This field is included in the document hash to trigger reprocessing if the strategy changes.",
"title": "Partition Strategy",
"type": "string"
},
"skip_empty_text": {
"default": true,
"description": "Skip documents with no text content",
"title": "Skip Empty Text",
"type": "boolean"
},
"min_text_length": {
"default": 50,
"description": "Minimum text length in characters to process (shorter documents are skipped)",
"title": "Min Text Length",
"type": "integer"
}
},
"title": "DataHubDocumentsSourceConfig",
"type": "object"
}
## Code Coordinates

- Class Name: `datahub.ingestion.source.datahub_documents.datahub_documents_source.DataHubDocumentsSource`
- Browse on GitHub
## Questions
If you've got any questions on configuring ingestion for DataHubDocuments, feel free to ping us on our Slack.