DataHub Documents Source

Processes Document entities from DataHub and generates embeddings for semantic search. (Incubating)

Important Capabilities

| Capability | Notes |
| --- | --- |
| Detect Deleted Entities | Enabled by default via stateful ingestion. |

This source extracts Document entities from DataHub and generates embeddings for semantic search. It supports batch mode (GraphQL) and event-driven mode (Kafka MCL) with incremental processing, and automatically fetches the embedding configuration from the server so that client and server stay aligned.

Quick Start: Auto-Deploy for Semantic Search

To enable automatic semantic search indexing for your documents, deploy this source to DataHub with a simple command:

# Create minimal recipe and deploy with hourly schedule
cat > /tmp/datahub-docs.yml << 'EOF'
source:
  type: datahub-documents
  config: {}
EOF

datahub ingest deploy -c /tmp/datahub-docs.yml --name "document-embeddings" --schedule "0 * * * *"

This creates a managed ingestion source in DataHub that automatically processes documents every hour and generates embeddings for semantic search.

What this does:

  • ✅ Deploys ingestion recipe to DataHub
  • ✅ Runs hourly (cron: 0 * * * *) to keep embeddings up-to-date
  • ✅ Uses event-driven mode (only processes changed documents)
  • ✅ Auto-configures from server (no manual embedding setup needed)

Alternative schedules:

# Every 15 minutes: "*/15 * * * *"
# Every 6 hours: "0 */6 * * *"
# Daily at 2 AM: "0 2 * * *"

Note: In future DataHub versions, GMS will run this automatically. For now, manual deployment is required.

Overview

The DataHub Documents source processes Document entities already stored in DataHub and enriches them with embeddings for semantic search. It is designed to work with DataHub's native Document entities created via GraphQL, the Python SDK, or other ingestion sources (such as Notion or Confluence).

Key Features

1. Smart Defaults with Server Configuration Sync

The source automatically fetches embedding configuration from your DataHub server, so the client and server stay aligned:

  • Connects using environment variables (DATAHUB_GMS_URL, DATAHUB_GMS_TOKEN)
  • Fetches embedding provider config (provider, model, region, dimensions)
  • Validates local config against server (if provided)
  • Works with minimal config: {} in your recipe

2. Dual Operating Modes

Event-Driven Mode (Recommended):

  • Processes documents in real-time from Kafka MCL events
  • Only reprocesses when document content changes
  • Efficient for continuous updates
  • Automatically falls back to batch mode on first run

Batch Mode:

  • Fetches all documents via GraphQL
  • Good for initial setup or periodic full refreshes
  • Processes all matching documents

3. Incremental Processing

  • Tracks document content hashes to skip unchanged documents
  • Uses DataHub's stateful ingestion framework
  • Reduces processing time and API costs
  • Force reprocess option available

4. Flexible Platform Filtering

  • Process all NATIVE documents (default)
  • Filter by specific platforms (e.g., ["notion", "confluence"])
  • Process all documents regardless of source type

Prerequisites

1. DataHub Server Configuration

You MUST configure semantic search on your DataHub server before using this source.

See the Semantic Search Configuration Guide for complete setup instructions.

Required server configuration:

  • OpenSearch 2.17+ with k-NN plugin enabled
  • AWS Bedrock or Cohere embedding provider configured
  • Semantic search enabled in application.yml

2. Document Entities in DataHub

This source processes existing Document entities. Documents can be created through:

  • GraphQL API: Create documents programmatically
  • Python SDK: Use datahub.sdk.document.Document
  • External Sources: Notion, Confluence, SharePoint sources that emit Document entities

3. AWS Credentials (for Bedrock)

If using AWS Bedrock for embeddings:

  • IAM permissions for bedrock:InvokeModel
  • AWS credentials configured (env vars, instance role, or ECS task role)
  • Bedrock model access enabled in AWS Console

4. Environment Variables

# Required for DataHub connection
export DATAHUB_GMS_URL="http://localhost:8080"
export DATAHUB_GMS_TOKEN="your-token-here"

# Optional: AWS credentials (if not using instance/task roles)
export AWS_PROFILE="datahub-dev"
# OR
export AWS_ACCESS_KEY_ID="your-key"
export AWS_SECRET_ACCESS_KEY="your-secret"
export AWS_REGION="us-west-2"

Common Use Cases

1. Event-Driven Processing (Production)

Process documents in real-time as they're created or updated:

source:
  type: datahub-documents
  config:
    # Event mode enabled by default in recent versions
    event_mode:
      enabled: true
      idle_timeout_seconds: 60

sink:
  type: datahub-rest
  config: {}

When to use:

  • Production deployments
  • Continuous document updates
  • Real-time semantic search needs

2. Batch Processing (Initial Load)

Process all documents in a single run:

source:
  type: datahub-documents
  config:
    event_mode:
      enabled: false

    # Optional: Process specific platforms
    platform_filter: ["notion", "confluence"]

sink:
  type: datahub-rest
  config: {}

When to use:

  • Initial setup
  • Periodic full refreshes
  • Backfilling embeddings

3. Platform-Specific Processing

Process documents from specific platforms only:

source:
  type: datahub-documents
  config:
    # Process NATIVE documents + EXTERNAL from these platforms
    platform_filter: ["notion", "confluence"]

    incremental:
      enabled: true

sink:
  type: datahub-rest
  config: {}

4. Force Reprocessing

Reprocess all documents regardless of content changes:

source:
  type: datahub-documents
  config:
    incremental:
      enabled: true
      force_reprocess: true  # Reprocess everything

    # Useful when:
    # - Changing chunking strategy
    # - Updating embedding model
    # - Fixing processing issues

sink:
  type: datahub-rest
  config: {}

5. Custom Chunking and Embedding

Override server configuration with local settings:

source:
  type: datahub-documents
  config:
    # Custom chunking
    chunking:
      strategy: by_title  # or 'basic'
      max_characters: 1000  # Larger chunks
      combine_text_under_n_chars: 200

    # Override embedding config (validated against server)
    embedding:
      provider: bedrock
      model: cohere.embed-english-v3
      model_embedding_key: cohere_embed_v3
      aws_region: us-west-2
      batch_size: 50

sink:
  type: datahub-rest
  config: {}

⚠️ Warning: Custom embedding configs are validated against the server. Mismatches will cause errors.

How It Works

Processing Pipeline

1. Fetch Mode Selection
├─ Event Mode: Subscribe to Kafka MCL events
└─ Batch Mode: GraphQL query for all documents

2. For Each Document:
├─ Check incremental state (skip if unchanged)
├─ Partition markdown → structured elements
├─ Chunk elements → semantic chunks
│ ├─ by_title: Preserves document structure
│ └─ basic: Fixed-size chunks with overlap
├─ Generate embeddings via LiteLLM
│ └─ Batches of 25 (configurable)
└─ Emit SemanticContent aspect → DataHub

3. State Management
├─ Batch Mode: Track document content hashes
└─ Event Mode: Track Kafka offsets

Event Mode Flow

First Run (No State):

  1. Falls back to batch mode
  2. Captures current Kafka offset BEFORE processing
  3. Processes all documents
  4. Saves offset to state
  5. Next run continues from captured offset

Subsequent Runs:

  1. Loads last committed offset from state
  2. Consumes events from last position
  3. Processes only changed documents
  4. Updates offset after each batch
  5. Exits after idle timeout (no new events)
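The idle-timeout exit described above can be sketched as a small poll loop. This is a simplified stand-in for the real Kafka consumer: `consume_until_idle` and its callable `poll` are illustrative names, not the connector's API.

```python
import time

def consume_until_idle(poll, idle_timeout_seconds=30, poll_timeout_seconds=2):
    """Drain events until none arrive for idle_timeout_seconds, then exit."""
    processed = []
    idle_since = time.monotonic()
    while True:
        events = poll()
        if events:
            processed.extend(events)
            idle_since = time.monotonic()  # reset the idle clock on activity
        else:
            if time.monotonic() - idle_since >= idle_timeout_seconds:
                break  # idle window elapsed with no new events
            time.sleep(poll_timeout_seconds)  # wait before polling again
    return processed

# Simulated event source: two non-empty polls, then silence.
batches = iter([[1, 2], [3]])
events = consume_until_idle(lambda: next(batches, []),
                            idle_timeout_seconds=0.05,
                            poll_timeout_seconds=0.01)
# events == [1, 2, 3]
```

The same shape explains why `idle_timeout_seconds: 30` makes a scheduled run exit cleanly once the backlog is drained.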

Incremental Processing

Content Hash Calculation:

hash_input = {
    "text": document.text,
    "partition_strategy": config.partition_strategy,
    "chunking_strategy": config.chunking.strategy,
    "max_characters": config.chunking.max_characters,
    # ... other chunking params
}
content_hash = sha256(json.dumps(hash_input))

When Documents Are Reprocessed:

  • Text content changes
  • Chunking configuration changes
  • Partition strategy changes
  • force_reprocess: true is set
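The skip decision can be sketched in a few lines of Python. The in-memory `state` dict and `should_process` helper are illustrative only; the real source persists hashes through DataHub's stateful ingestion framework.

```python
import hashlib
import json

def content_hash(text, chunking):
    """Hash the text together with the chunking params so that a config
    change also triggers reprocessing, as described above."""
    payload = json.dumps({"text": text, **chunking}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()

state = {}  # urn -> last seen content hash (stand-in for the state backend)

def should_process(urn, text, chunking):
    h = content_hash(text, chunking)
    if state.get(urn) == h:
        return False  # unchanged content and config: skip
    state[urn] = h
    return True

cfg = {"strategy": "by_title", "max_characters": 500}
assert should_process("urn:li:document:a", "hello", cfg) is True
assert should_process("urn:li:document:a", "hello", cfg) is False  # unchanged
assert should_process("urn:li:document:a", "hello",
                      {"strategy": "basic", "max_characters": 500}) is True  # config changed
```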

Configuration Deep Dive

Platform Filtering

The platform_filter setting controls which documents are processed:

None (default):

platform_filter: null  # or omit the field

  • Processes all NATIVE documents (sourceType=NATIVE)
  • Ignores EXTERNAL documents from other platforms

Specific Platforms:

platform_filter: ["notion", "confluence"]

  • Processes NATIVE documents
  • PLUS EXTERNAL documents from the specified platforms

All Documents:

platform_filter: ["*"]  # or ["ALL"]

  • Processes ALL documents regardless of source type or platform
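The three filter modes above can be captured in one small decision function. This is a sketch of the documented rules: the `doc` dict keys and the helper name are assumptions, not the connector's internals.

```python
def matches_platform_filter(doc, platform_filter):
    """Apply the documented platform_filter semantics to one document."""
    if platform_filter is None:
        # Default: NATIVE documents only, regardless of platform.
        return doc["source_type"] == "NATIVE"
    if platform_filter in (["*"], ["ALL"]):
        return True  # everything, regardless of source type or platform
    # Specific platforms: NATIVE documents plus EXTERNAL from those platforms.
    return doc["source_type"] == "NATIVE" or doc["platform"] in platform_filter

native = {"source_type": "NATIVE", "platform": "datahub"}
notion = {"source_type": "EXTERNAL", "platform": "notion"}
sharepoint = {"source_type": "EXTERNAL", "platform": "sharepoint"}

assert matches_platform_filter(native, None)
assert not matches_platform_filter(notion, None)
assert matches_platform_filter(notion, ["notion", "confluence"])
assert not matches_platform_filter(sharepoint, ["notion"])
assert matches_platform_filter(sharepoint, ["*"])
```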

Event Mode Configuration

event_mode:
  enabled: true

  # Consumer ID for offset tracking
  consumer_id: "datahub-documents-{pipeline_name}"  # Default

  # Kafka topics to consume
  topics:
    - "MetadataChangeLog_Versioned_v1"

  # Lookback window for first run
  lookback_days: null  # null = start from latest, or specify days

  # Reset offsets to beginning (DANGEROUS - reprocesses everything)
  reset_offsets: false

  # Exit after N seconds with no new events
  idle_timeout_seconds: 30

  # Kafka poll settings
  poll_timeout_seconds: 2
  poll_limit: 100

Chunking Strategies

by_title (Recommended):

chunking:
  strategy: by_title
  max_characters: 500
  combine_text_under_n_chars: 100

  • Preserves document structure
  • Groups text under section headers
  • Combines small chunks intelligently
  • Better semantic coherence

basic:

chunking:
  strategy: basic
  max_characters: 500
  overlap: 50  # Character overlap between chunks

  • Simple fixed-size chunks
  • Configurable overlap
  • No structure awareness
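A minimal sketch of the `basic` strategy, assuming plain character slicing. The connector delegates partitioning and chunking to the unstructured library, so this is only illustrative of the overlap behavior.

```python
def basic_chunks(text, max_characters=500, overlap=50):
    """Fixed-size chunking with character overlap between adjacent chunks."""
    if overlap >= max_characters:
        raise ValueError("overlap must be smaller than max_characters")
    if not text:
        return []
    step = max_characters - overlap  # each chunk starts `step` chars after the last
    return [text[i:i + max_characters]
            for i in range(0, max(len(text) - overlap, 1), step)]

# "abcdefghij" with 4-char chunks and 1 char of overlap:
chunks = basic_chunks("abcdefghij", max_characters=4, overlap=1)
# chunks == ["abcd", "defg", "ghij"]
```

Note how each chunk repeats the last character of the previous one; larger overlaps preserve more context across chunk boundaries at the cost of more embeddings.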

Embedding Configuration

Default (Fetch from Server):

embedding: {}  # or omit entirely

  • Automatically fetches config from server
  • Ensures alignment with server's semantic search
  • Recommended for production

Override (Validated Against Server):

embedding:
  provider: bedrock  # bedrock, cohere, openai
  model: cohere.embed-english-v3
  model_embedding_key: cohere_embed_v3  # Must match server!
  aws_region: us-west-2
  batch_size: 25
  input_type: search_document  # Cohere-specific

  • Validates that config matches server
  • Fails if mismatch detected
  • Prevents broken semantic search

Break-Glass Override (NOT RECOMMENDED):

embedding:
  allow_local_embedding_config: true
  provider: bedrock
  model: cohere.embed-english-v3
  # ... other settings

  • Bypasses server validation
  • May break semantic search
  • Only use for debugging or special cases

Stateful Ingestion

stateful_ingestion:
  enabled: true  # Enabled by default

  # State backend configuration
  state_provider:
    type: datahub  # Store state in DataHub
    config:
      datahub_api:
        server: "http://localhost:8080"
        token: "${DATAHUB_TOKEN}"

  # Ignore previous state (fresh start)
  ignore_old_state: false

  # Don't commit new state (dry run)
  ignore_new_state: false

Performance Tuning

Batch Size

embedding:
  batch_size: 25  # Default
  # Increase for faster processing (if the provider supports it):
  # - Cohere: up to 96
  # - Bedrock: up to 100 (but rate-limited)
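Batching simply groups chunks before each embedding API call, trading request count against per-request size. A sketch of the idea (the helper name is illustrative):

```python
def batched(items, batch_size=25):
    """Yield successive fixed-size batches; 25 mirrors the default
    embedding.batch_size above."""
    for i in range(0, len(items), batch_size):
        yield items[i:i + batch_size]

# 60 chunks at the default batch size mean 3 API calls instead of 60.
chunks = [f"chunk-{n}" for n in range(60)]
sizes = [len(b) for b in batched(chunks, batch_size=25)]
# sizes == [25, 25, 10]
```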

Event Mode Settings

event_mode:
  poll_limit: 100  # Fetch up to 100 events per poll
  # Increase for high-volume scenarios, e.g.:
  # poll_limit: 500  # Process more events per batch

Filtering

# Skip short or empty documents
skip_empty_text: true
min_text_length: 50  # Characters

# Process fewer documents
platform_filter: ["notion"]  # Only one platform
document_urns:  # Specific documents only
  - "urn:li:document:abc123"

Monitoring and Observability

Report Metrics

The source reports the following metrics:

report = {
    "num_documents_fetched": 100,           # Total documents fetched
    "num_documents_processed": 85,          # Successfully processed
    "num_documents_skipped": 15,            # Skipped (various reasons)
    "num_documents_skipped_unchanged": 10,  # Unchanged content
    "num_documents_skipped_empty": 5,       # Empty or too short
    "num_chunks_created": 425,              # Total chunks generated
    "num_embeddings_generated": 425,        # Total embeddings
    "processing_errors": [],                # List of errors
}

Logging

Enable debug logging for detailed insights:

# In your ingestion recipe
source:
  type: datahub-documents
  config:
    # ... your config

# Set the log level via environment variable:
# export DATAHUB_DEBUG=true

Look for these log messages:

  • "Loading embedding configuration from DataHub server..."
  • "✓ Loaded embedding configuration from server"
  • "Incremental mode enabled, state file: ..."
  • "Skipping document {urn} (unchanged content hash)"

Troubleshooting

Issue: "Semantic search is not enabled on the DataHub server"

Cause: Server does not have semantic search configured.

Solution:

  1. Configure semantic search on your DataHub server first
  2. See Semantic Search Configuration Guide
  3. Verify ELASTICSEARCH_SEMANTIC_SEARCH_ENABLED=true in server config

Issue: "Server does not support semantic search configuration API"

Cause: Old DataHub server version (pre-v0.14.0).

Solutions:

Option 1 (Recommended): Upgrade DataHub server to v0.14.0+

Option 2: Provide local embedding config:

embedding:
  provider: bedrock
  model: cohere.embed-english-v3
  model_embedding_key: cohere_embed_v3
  aws_region: us-west-2

Issue: Embedding Configuration Validation Fails

Error:

Embedding configuration mismatch with server:
- Model: local='cohere.embed-english-v3', server='amazon.titan-embed-text-v1'

Cause: Local config doesn't match server configuration.

Solution (choose one):

  1. Remove the local embedding config and use the server configuration
  2. Update the server configuration to match your local settings
  3. Update the local config to match the server

Issue: No Documents Being Processed

Possible Causes:

  1. Platform Filter Too Restrictive:

    # platform_filter: ["notion"] processes NATIVE documents plus EXTERNAL
    # documents from notion only; EXTERNAL documents from other platforms
    # are skipped:
    platform_filter: ["notion"]

    # Solution: add the missing platforms, use null (NATIVE only),
    # or ["*"] to process everything
    platform_filter: null
  2. All Documents Unchanged:

    • Check incremental mode is working correctly
    • Force reprocess if needed: incremental.force_reprocess: true
  3. Documents Have No Text:

    • Verify documents have content in Document.text field
    • Check min_text_length threshold

Issue: Event Mode Not Working

Symptoms: Falls back to batch mode every run.

Possible Causes:

  1. Stateful Ingestion Disabled:

    stateful_ingestion:
      enabled: true  # Must be enabled for event mode
  2. Kafka Connection Issues:

    • Check DataHub Kafka is accessible
    • Verify network connectivity
    • Check Kafka broker configuration
  3. State Provider Misconfigured:

    stateful_ingestion:
      state_provider:
        type: datahub
        config:
          datahub_api:
            server: "http://correct-host:8080"  # Correct URL

Issue: AWS Credentials Error

Error:

Unable to load credentials from any provider in the chain

Solutions:

  1. Verify AWS_PROFILE:

    export AWS_PROFILE=datahub-dev
    cat ~/.aws/credentials # Check profile exists
  2. For EC2 Instance Role:

    # Check instance role is attached
    curl http://169.254.169.254/latest/meta-data/iam/security-credentials/
  3. For ECS Task Role:

    • Verify task definition has correct IAM role
    • Check ECS task logs for IAM-related errors

Issue: Slow Processing

Optimization Strategies:

  1. Increase Batch Size:

    embedding:
      batch_size: 50  # Up from the default 25
  2. Use Event Mode:

    • Only processes changed documents
    • Much faster than batch mode for updates
  3. Filter Documents:

    platform_filter: ["notion"] # Process fewer platforms
    min_text_length: 100 # Skip short documents
  4. Optimize Chunking:

    chunking:
      max_characters: 1000  # Larger chunks = fewer embeddings

Cost Estimation

AWS Bedrock Pricing (Cohere Embed v3)

As of December 2024 in us-west-2:

  • $0.0001 per 1,000 input tokens (~750 words)

Example Costs:

One-time Processing:

  • 1,000 documents × 500 tokens each = 500,000 tokens = $0.05
  • 10,000 documents × 500 tokens each = 5M tokens = $0.50
  • 100,000 documents × 500 tokens each = 50M tokens = $5.00

Incremental Updates (Event Mode):

  • 100 changed documents/day × 500 tokens = 50,000 tokens/day
  • Monthly: 1.5M tokens = $0.15/month

Query Embeddings (GMS):

  • Separate from this source (handled by GMS at search time)
  • ~50 tokens per search query
  • 10,000 queries = $0.05
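The arithmetic above can be wrapped in a tiny helper to estimate costs for your own corpus. The rate is the December 2024 figure quoted above and may change; check current Bedrock pricing before relying on it.

```python
PRICE_PER_1K_TOKENS = 0.0001  # Cohere Embed v3 on Bedrock, us-west-2 (Dec 2024)

def embedding_cost_usd(num_documents, tokens_per_document=500):
    """One-time embedding cost for a corpus at the rate quoted above."""
    return num_documents * tokens_per_document * PRICE_PER_1K_TOKENS / 1000

# 1,000 documents at ~500 tokens each:
cost = embedding_cost_usd(1_000)
# cost == 0.05 (i.e. $0.05)
```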

Limitations

Processing Limitations

  • Text Only: Only processes Document.text field (markdown format expected)
  • No Binary Content: Images, PDFs, etc. must be converted to text first
  • Markdown Partitioning: Uses unstructured.partition.md which may not handle all markdown variants

Platform Filtering

  • Source Type Required: Documents must have sourceType field (defaults to NATIVE if missing)
  • Platform Identification: Relies on dataPlatformInstance or URL-based platform extraction

State Management

  • State Size: State file grows with number of documents (includes hash for each)
  • State Backend: Requires DataHub or file-based state provider

CLI-based Ingestion

Starter Recipe

Check out the following recipe to get started with ingestion! See below for full configuration options.

For general pointers on writing and running a recipe, see our main recipe guide.

source:
  type: datahub-documents
  config: {}
  # Minimal configuration that relies on smart defaults:
  # - Connects to DataHub via the DATAHUB_GMS_URL and DATAHUB_GMS_TOKEN environment variables
  # - Fetches embedding configuration from the server
  # - Enables event-driven mode and incremental processing
  # - No sink needed when deploying to DataHub (writes back to itself)

Config Details

Note that a . is used to denote nested fields in the YAML recipe.

FieldDescription
max_documents
integer
Maximum number of documents to process per ingestion run. The job will stop and fail with an error once this limit is reached. Set to 0 or -1 to disable the limit.
Default: 10000
min_text_length
integer
Minimum text length in characters to process (shorter documents are skipped)
Default: 50
partition_strategy
string
Text partitioning strategy. Currently only 'markdown' is supported. This field is included in the document hash to trigger reprocessing if the strategy changes.
Default: markdown
skip_empty_text
boolean
Skip documents with no text content
Default: True
chunking
ChunkingConfig
Chunking strategy configuration.
chunking.combine_text_under_n_chars
integer
Combine chunks smaller than this size
Default: 100
chunking.max_characters
integer
Maximum characters per chunk
Default: 500
chunking.overlap
integer
Character overlap between chunks
Default: 0
chunking.strategy
Enum
One of: "basic", "by_title"
Default: by_title
datahub
DataHubConnectionConfig
DataHub connection configuration.

Defaults are loaded in this priority:
1. Explicitly configured values in the recipe
2. Environment variables (DATAHUB_GMS_URL, DATAHUB_GMS_TOKEN)
3. ~/.datahubenv file (created by datahub init)
4. Hardcoded defaults (http://localhost:8080, no token)
datahub.server
string
DataHub GMS server URL (defaults to DATAHUB_GMS_URL env var, ~/.datahubenv, or localhost:8080)
datahub.token
One of string(password), null
DataHub API token for authentication (defaults to DATAHUB_GMS_TOKEN env var or ~/.datahubenv)
document_urns
One of array, null
Specific document URNs to process (if None, process all matching platforms)
Default: None
document_urns.string
string
embedding
EmbeddingConfig
Embedding generation configuration.

Default behavior: Fetches configuration from DataHub server automatically.
Override behavior: Validates local config against server when explicitly set.
embedding.allow_local_embedding_config
boolean
BREAK-GLASS: Allow local config without server validation. NOT RECOMMENDED - may break semantic search.
Default: False
embedding.api_key
One of string(password), null
API key for Cohere (not needed for Bedrock with IAM roles)
Default: None
embedding.aws_region
One of string, null
AWS region for Bedrock. If not set, loads from server.
Default: None
embedding.batch_size
integer
Batch size for embedding API calls
Default: 25
embedding.documents_per_minute
integer
Maximum number of documents to embed per minute when rate_limit is enabled.
Default: 300
embedding.input_type
One of string, null
Input type for Cohere embeddings
Default: search_document
embedding.model
One of string, null
Model name. If not set, loads from server.
Default: None
embedding.model_embedding_key
One of string, null
Storage key for embeddings (e.g., 'cohere_embed_v3'). Required if overriding server config. If not set, loads from server.
Default: None
embedding.provider
One of Enum, null
Embedding provider (bedrock uses AWS, cohere/openai use API key). If not set, loads from server.
Default: None
embedding.rate_limit
boolean
Enable rate limiting for embedding API calls.
Default: True
event_mode
EventModeConfig
Event-driven mode configuration.
event_mode.consumer_id
One of string, null
Consumer ID for offset tracking (defaults to 'datahub-documents-{pipeline_name}')
Default: None
event_mode.enabled
boolean
Enable event-driven mode (polls MCL events instead of GraphQL batch)
Default: False
event_mode.idle_timeout_seconds
integer
Exit after this many seconds with no new events (incremental batch mode)
Default: 30
event_mode.lookback_days
One of integer, null
Number of days to look back for events on first run (None means start from latest)
Default: None
event_mode.poll_limit
integer
Maximum number of events to fetch per poll
Default: 100
event_mode.poll_timeout_seconds
integer
Timeout for each poll request
Default: 2
event_mode.reset_offsets
boolean
Reset consumer offsets to start from beginning
Default: False
event_mode.topics
array
Topics to consume for document changes
Default: ['MetadataChangeLog_Versioned_v1']
event_mode.topics.string
string
incremental
IncrementalConfig
Incremental processing configuration.
incremental.enabled
boolean
Only process documents whose text content has changed (tracks content hash). Uses stateful ingestion when enabled. The state_file_path option is deprecated and ignored when stateful ingestion is enabled.
Default: True
incremental.force_reprocess
boolean
Force reprocess all documents regardless of content hash
Default: False
incremental.state_file_path
One of string, null
[DEPRECATED] Path to state file. This option is ignored when stateful ingestion is enabled. State is now managed through DataHub's stateful ingestion framework.
Default: None
platform_filter
One of array, null
Filter documents by platforms. Default (None): Process all NATIVE documents (sourceType=NATIVE) regardless of platform. To include external documents from specific platforms, add them here (e.g., ['notion', 'confluence']). This will process NATIVE documents + EXTERNAL documents from the specified platforms. Use ['*'] or ['ALL'] to process all documents regardless of source type or platform.
Default: None
platform_filter.string
string
stateful_ingestion
DocumentChunkingStatefulIngestionConfig
Configuration for document chunking stateful ingestion.
stateful_ingestion.enabled
boolean
Whether or not to enable stateful ingest. Default: True if a pipeline_name is set and either a datahub-rest sink or datahub_api is specified, otherwise False
Default: False

Code Coordinates

  • Class Name: datahub.ingestion.source.datahub_documents.datahub_documents_source.DataHubDocumentsSource
  • Browse on GitHub

Questions

If you've got any questions on configuring ingestion for DataHubDocuments, feel free to ping us on our Slack.