Kafka Connect
Integration Details
This plugin extracts the following:
- Source and Sink Connectors in Kafka Connect as Data Pipelines
- For Source connectors - Data Jobs to represent lineage information from source dataset to Kafka topic, one per `{connector_name}:{source_dataset}` combination
- For Sink connectors - Data Jobs to represent lineage information from Kafka topic to destination dataset, one per `{connector_name}:{topic}` combination
Requirements
Java Runtime Dependency:
This source requires Java to be installed and available on the system for transform pipeline support (RegexRouter, etc.). The Java runtime is accessed via JPype to enable Java regex pattern matching that's compatible with Kafka Connect transforms.
- Python installations: Install Java separately (e.g., `apt-get install openjdk-11-jre-headless` on Debian/Ubuntu)
- Docker deployments: Ensure your DataHub ingestion Docker image includes a Java runtime. The official DataHub images include Java by default.
- Impact: Without Java, transform pipeline features will be disabled and lineage accuracy may be reduced for connectors using transforms
Note for Docker users: If you're building custom Docker images for DataHub ingestion, ensure a Java Runtime Environment (JRE) is included in your image to support full transform pipeline functionality.
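As a rough illustration of the mechanism described above, the snippet below shows how a Java regex could be evaluated from Python through JPype. The helper function and the hard-coded pattern are illustrative assumptions for this page, not DataHub's actual implementation.

```python
# Illustrative sketch only: evaluating a RegexRouter-style rewrite with Java's
# regex engine via JPype. Requires a Java runtime to be installed.
import jpype

if not jpype.isJVMStarted():
    jpype.startJVM()  # picks up the default JVM installed on the system

Pattern = jpype.JClass("java.util.regex.Pattern")

def apply_regex_router(topic: str, regex: str, replacement: str) -> str:
    """Hypothetical helper mimicking RegexRouter semantics: match, then replace."""
    matcher = Pattern.compile(regex).matcher(topic)
    if matcher.matches():
        return str(matcher.replaceFirst(replacement))
    return topic  # unmatched topics pass through unchanged

# Java-style group references ($1) behave as they would inside Kafka Connect.
print(apply_regex_router("server.public.users", "server\\.public\\.(.*)", "cdc.$1"))
# -> cdc.users
```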
Environment Support
DataHub's Kafka Connect source supports both self-hosted and Confluent Cloud environments with automatic detection and environment-specific topic retrieval strategies:
Self-hosted Kafka Connect
- Topic Discovery: Uses the runtime `/connectors/{name}/topics` API endpoint
- Accuracy: Returns the actual topics that connectors are currently reading from/writing to
- Benefits: Most accurate topic information as it reflects actual runtime state
- Requirements: Standard Kafka Connect REST API access
Confluent Cloud
- Topic Discovery: Uses comprehensive Kafka REST API v3 for optimal transform pipeline support with config-based fallback
- Method: Gets all topics from Kafka cluster via REST API, applies reverse transform pipeline for accurate mappings
- Transform Support: Full support for complex transform pipelines via reverse pipeline strategy using actual cluster topics
- Fallback: Falls back to config-based derivation if Kafka API is unavailable
Environment Detection: Automatically detects the environment based on `connect_uri` patterns containing `confluent.cloud`.
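As a rough sketch of that behavior (function names here are illustrative, not the source's internals):

```python
# Illustrative sketch of environment detection and topic-discovery selection,
# based on the behavior described above (not the actual source code).
def is_confluent_cloud(connect_uri: str) -> bool:
    # Confluent Cloud Connect URIs contain "confluent.cloud"
    return "confluent.cloud" in connect_uri.lower()

def topic_discovery_strategy(connect_uri: str, use_connect_topics_api: bool = True) -> str:
    if not use_connect_topics_api:
        return "disabled"           # no API-based topic discovery at all
    if is_confluent_cloud(connect_uri):
        return "kafka-rest-api-v3"  # all cluster topics, with config-based fallback
    return "runtime-topics-api"     # /connectors/{name}/topics

print(topic_discovery_strategy(
    "https://api.confluent.cloud/connect/v1/environments/env-123/clusters/lkc-abc456"))
# -> kafka-rest-api-v3
```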
Concept Mapping
This ingestion source maps the following Source System Concepts to DataHub Concepts:
| Source Concept | DataHub Concept | Notes |
|---|---|---|
| `"kafka-connect"` | Data Platform | |
| Connector | DataFlow | |
| Kafka Topic | Dataset | |
Supported Connectors and Lineage Extraction
DataHub supports different connector types with varying levels of lineage extraction capabilities depending on the environment (self-hosted vs Confluent Cloud):
Source Connectors
| Connector Type | Self-hosted Support | Confluent Cloud Support | Topic Discovery Method | Lineage Extraction |
|---|---|---|---|---|
| Platform JDBC Source (`io.confluent.connect.jdbc.JdbcSourceConnector`) | ✅ Full | ✅ Full | Runtime API / Config-based | Table → Topic mapping |
| Cloud PostgreSQL CDC (`PostgresCdcSource`) | ✅ Full | ✅ Full | Runtime API / Config-based | Table → Topic mapping |
| Cloud PostgreSQL CDC V2 (`PostgresCdcSourceV2`) | ✅ Full | ✅ Full | Runtime API / Config-based | Table → Topic mapping |
| Cloud MySQL Source (`MySqlSource`) | ✅ Full | ✅ Full | Runtime API / Config-based | Table → Topic mapping |
| Cloud MySQL CDC (`MySqlCdcSource`) | ✅ Full | ✅ Full | Runtime API / Config-based | Table → Topic mapping |
| Debezium MySQL (`io.debezium.connector.mysql.MySqlConnector`) | ✅ Full | ✅ Partial | Runtime API / Config-based | Database → Topic CDC mapping |
| Debezium PostgreSQL (`io.debezium.connector.postgresql.PostgresConnector`) | ✅ Full | ✅ Partial | Runtime API / Config-based | Database → Topic CDC mapping |
| Debezium SQL Server (`io.debezium.connector.sqlserver.SqlServerConnector`) | ✅ Full | ✅ Partial | Runtime API / Config-based | Database → Topic CDC mapping |
| Debezium Oracle (`io.debezium.connector.oracle.OracleConnector`) | ✅ Full | ✅ Partial | Runtime API / Config-based | Database → Topic CDC mapping |
| Debezium DB2 (`io.debezium.connector.db2.Db2Connector`) | ✅ Full | ✅ Partial | Runtime API / Config-based | Database → Topic CDC mapping |
| Debezium MongoDB (`io.debezium.connector.mongodb.MongoDbConnector`) | ✅ Full | ✅ Partial | Runtime API / Config-based | Collection → Topic CDC mapping |
| Debezium Vitess (`io.debezium.connector.vitess.VitessConnector`) | ✅ Full | ✅ Partial | Runtime API / Config-based | Table → Topic CDC mapping |
| MongoDB Source (`com.mongodb.kafka.connect.MongoSourceConnector`) | ✅ Full | 🔧 Config Required | Runtime API / Manual config | Collection → Topic mapping |
| Generic Connectors | 🔧 Config Required | 🔧 Config Required | User-defined mapping | Custom lineage mapping |
Sink Connectors
| Connector Type | Self-hosted Support | Confluent Cloud Support | Topic Discovery Method | Lineage Extraction |
|---|---|---|---|---|
| BigQuery Sink (`com.wepay.kafka.connect.bigquery.BigQuerySinkConnector`) | ✅ Full | ✅ Full | Runtime API / Config-based | Topic → Table mapping |
| S3 Sink (`io.confluent.connect.s3.S3SinkConnector`) | ✅ Full | ✅ Full | Runtime API / Config-based | Topic → S3 object mapping |
| Snowflake Sink (`com.snowflake.kafka.connector.SnowflakeSinkConnector`) | ✅ Full | ✅ Full | Runtime API / Config-based | Topic → Table mapping |
| Cloud PostgreSQL Sink (`PostgresSink`) | ✅ Full | ✅ Full | Runtime API / Config-based | Topic → Table mapping |
| Cloud MySQL Sink (`MySqlSink`) | ✅ Full | ✅ Full | Runtime API / Config-based | Topic → Table mapping |
| Cloud Snowflake Sink (`SnowflakeSink`) | ✅ Full | ✅ Full | Runtime API / Config-based | Topic → Table mapping |
Legend:
- ✅ Full: Complete lineage extraction with accurate topic discovery
- ✅ Partial: Lineage extraction supported but topic discovery may be limited (config-based only)
- 🔧 Config Required: Requires the `generic_connectors` configuration for lineage mapping
Supported Transforms
DataHub uses an advanced transform pipeline strategy that automatically handles complex transform chains by applying the complete pipeline to all topics and checking if results exist. This provides robust support for any combination of transforms.
Topic Routing Transforms
- RegexRouter: `org.apache.kafka.connect.transforms.RegexRouter`
- Cloud RegexRouter: `io.confluent.connect.cloud.transforms.TopicRegexRouter`
- Debezium EventRouter: `io.debezium.transforms.outbox.EventRouter` (Outbox pattern)
Non-Topic Routing Transforms
DataHub recognizes but passes through these transforms (they don't affect lineage):
- InsertField, ReplaceField, MaskField, ValueToKey, HoistField, ExtractField
- SetSchemaMetadata, Flatten, Cast, HeadersFrom, TimestampConverter
- Filter, InsertHeader, DropHeaders, Drop, TombstoneHandler
Transform Pipeline Strategy
DataHub uses an improved reverse transform pipeline approach that:
- Takes all actual topics from the connector manifest/Kafka cluster
- Applies the complete transform pipeline to each topic
- Checks if transformed results exist in the actual topic list
- Creates lineage mappings only for successful matches
Benefits:
- ✅ Works with any transform combination (single or chained transforms)
- ✅ Handles complex scenarios like EventRouter + RegexRouter chains
- ✅ Uses actual topics as source of truth (no prediction needed)
- ✅ Future-proof for new transform types
- ✅ Works identically for both self-hosted and Confluent Cloud environments
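A simplified sketch of this strategy, assuming the transforms are modeled as plain functions (the names below are illustrative, not DataHub's actual classes):

```python
# Simplified sketch of the transform pipeline strategy described above.
from typing import Callable, Iterable

def resolve_topic_mappings(
    candidate_topics: Iterable[str],         # topics taken from the manifest/config
    actual_topics: set[str],                 # topics that exist in the cluster
    transforms: list[Callable[[str], str]],  # the connector's transform chain
) -> dict[str, str]:
    mappings: dict[str, str] = {}
    for topic in candidate_topics:
        routed = topic
        for transform in transforms:         # apply the complete chain, in order
            routed = transform(routed)
        if routed in actual_topics:          # keep only results that really exist
            mappings[topic] = routed
    return mappings

# Example with a single RegexRouter-like rewrite
import re
router = lambda t: re.sub(r"^server\.public\.(.*)$", r"cdc.\1", t)
print(resolve_topic_mappings(["server.public.users"], {"cdc.users"}, [router]))
# -> {'server.public.users': 'cdc.users'}
```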
Capabilities and Limitations
Transform Pipeline Support
✅ Fully Supported:
- Any combination of transforms: RegexRouter, EventRouter, and non-routing transforms
- Complex transform chains: Multiple chained transforms automatically handled
- Both environments: Self-hosted and Confluent Cloud work identically
- Future-proof: New transform types automatically supported
⚠️ Considerations:
- For connectors not listed in the supported connector table above, use the `generic_connectors` configuration to provide explicit lineage mappings
- Some advanced connector-specific features may not be fully supported
Environment-Specific Behavior
Self-hosted Kafka Connect
- Topic Discovery: Uses the runtime `/connectors/{name}/topics` API endpoint for maximum accuracy
- Requirements: Standard Kafka Connect REST API access
- Fallback: If runtime API fails, falls back to config-based derivation
Confluent Cloud
- Topic Discovery: Uses comprehensive Kafka REST API v3 to get all topics, with automatic credential reuse
- Transform Support: Full support for all transform combinations via reverse pipeline strategy using actual cluster topics
- Auto-derivation: Automatically derives Kafka REST endpoint from connector configurations
Configuration Control
The `use_connect_topics_api` flag controls topic retrieval behavior:
- When `true` (default): Uses environment-specific topic discovery with full transform support
- When `false`: Disables all topic discovery for air-gapped environments or performance optimization
Advanced Scenarios
Complex Transform Chains: The new reverse transform pipeline strategy handles complex scenarios automatically:
# Example: EventRouter + RegexRouter chain
transforms: EventRouter,RegexRouter
transforms.EventRouter.type: io.debezium.transforms.outbox.EventRouter
transforms.RegexRouter.type: org.apache.kafka.connect.transforms.RegexRouter
transforms.RegexRouter.regex: "outbox\\.event\\.(.*)"
transforms.RegexRouter.replacement: "events.$1"
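For intuition, here is roughly what that chain does to a single outbox record. The aggregate type `orders` and the `outbox.event.<aggregate>` intermediate topic are assumptions about typical EventRouter defaults, used only to make the example concrete:

```python
# Rough illustration of the EventRouter + RegexRouter chain above (not DataHub code).
import re

# Step 1: EventRouter routes the outbox record to a topic derived from its
# aggregate type, assumed here to follow the default "outbox.event.<aggregate>" shape.
routed = "outbox.event." + "orders"

# Step 2: RegexRouter rewrites it with the configured pattern/replacement.
# Java's "$1" group reference corresponds to "\1" in Python's re module.
final_topic = re.sub(r"outbox\.event\.(.*)", r"events.\1", routed)

print(final_topic)  # -> events.orders
```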
Fallback Options:
- If transform pipeline cannot determine mappings, DataHub falls back to simple topic-based lineage
- For unsupported connector types or complex custom scenarios, use the `generic_connectors` configuration
Performance Optimization:
- Set `use_connect_topics_api: false` to disable topic discovery in air-gapped environments
- Transform pipeline processing adds minimal overhead and improves lineage accuracy
Important Capabilities
| Capability | Status | Notes |
|---|---|---|
| Detect Deleted Entities | ✅ | Enabled by default via stateful ingestion. |
| Platform Instance | ✅ | Enabled by default. |
| Schema Metadata | ✅ | Enabled by default. |
| Table-Level Lineage | ✅ | Enabled by default. |
CLI based Ingestion
Starter Recipe
Check out the following recipe to get started with ingestion! See below for full configuration options.
For general pointers on writing and running a recipe, see our main recipe guide.
source:
type: "kafka-connect"
config:
# Coordinates
connect_uri: "http://localhost:8083"
# Credentials
username: admin
password: password
# Optional
# Platform instance mapping to use when constructing URNs.
# Use if single instance of platform is referred across connectors.
platform_instance_map:
mysql: mysql_platform_instance
sink:
# sink configs
Config Details
- Options
- Schema
Note that a . is used to denote nested fields in the YAML recipe.
| Field | Description |
|---|---|
cluster_name One of string, null | Cluster to ingest from. Default: connect-cluster |
confluent_cloud_cluster_id One of string, null | Confluent Cloud Kafka Connect cluster ID (e.g., 'lkc-abc123'). When specified along with confluent_cloud_environment_id, the connect_uri will be automatically constructed. This is the recommended approach for Confluent Cloud instead of manually constructing the full URI. Default: None |
confluent_cloud_environment_id One of string, null | Confluent Cloud environment ID (e.g., 'env-xyz123'). When specified along with confluent_cloud_cluster_id, the connect_uri will be automatically constructed. This is the recommended approach for Confluent Cloud instead of manually constructing the full URI. Default: None |
connect_to_platform_map One of string, null | Platform instance mapping when multiple instances for a platform is available. Entry for a platform should be in either platform_instance_map or connect_to_platform_map. e.g.connect_to_platform_map: { "postgres-connector-finance-db": "postgres": "core_finance_instance" } Default: None |
connect_uri string | URI to connect to. Default: http://localhost:8083/ |
convert_lineage_urns_to_lowercase boolean | Whether to convert the urns of ingested lineage dataset to lowercase Default: False |
kafka_api_key One of string, null | Optional: Confluent Cloud Kafka API key for authenticating with Kafka REST API v3. If not specified, DataHub will reuse the Connect credentials (username/password) for Kafka API authentication. Only needed if you want to use separate credentials for the Kafka API. Default: None |
kafka_api_secret One of string, null | Optional: Confluent Cloud Kafka API secret for authenticating with Kafka REST API v3. If not specified, DataHub will reuse the Connect credentials (username/password) for Kafka API authentication. Only needed if you want to use separate credentials for the Kafka API. Default: None |
kafka_rest_endpoint One of string, null | Optional: Confluent Cloud Kafka REST API endpoint for comprehensive topic retrieval. Format: https://pkc-xxxxx.region.provider.confluent.cloud If not specified, DataHub automatically derives the endpoint from connector configurations (kafka.endpoint). When available, enables getting all topics from Kafka cluster for improved transform pipeline accuracy. Default: None |
password One of string, null | Kafka Connect password. Default: None |
platform_instance One of string, null | The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://docs.datahub.com/docs/platform-instances/ for more details. Default: None |
platform_instance_map One of string, null | Platform instance mapping to use when constructing URNs. e.g.platform_instance_map: { "hive": "warehouse" } Default: None |
schema_resolver_expand_patterns One of boolean, null | Enable table pattern expansion using DataHub schema metadata. When use_schema_resolver=True, this controls whether to expand patterns like 'database.*' to actual table names by querying DataHub. Only applies when use_schema_resolver is enabled. Defaults to True when use_schema_resolver is enabled. Default: None |
schema_resolver_finegrained_lineage One of boolean, null | Enable fine-grained (column-level) lineage extraction using DataHub schema metadata. When use_schema_resolver=True, this controls whether to generate column-level lineage by matching schemas between source tables and Kafka topics. Only applies when use_schema_resolver is enabled. Defaults to True when use_schema_resolver is enabled. Default: None |
use_connect_topics_api boolean | Whether to use Kafka Connect API for topic retrieval and validation. This flag controls the environment-specific topic retrieval strategy: When True (default): - Self-hosted environments: Uses runtime /connectors/{name}/topics API for accurate topic information - Confluent Cloud: Uses comprehensive Kafka REST API v3 to get all topics for transform pipeline, with config-based fallback When False: Disables all API-based topic retrieval for both environments. Returns empty topic lists. Useful for air-gapped environments or when topic validation isn't needed for performance optimization. Default: True |
use_schema_resolver boolean | Use DataHub's schema metadata to enhance Kafka Connect connector lineage. When enabled (requires DataHub graph connection): 1) Expands table patterns (e.g., 'database.*') to actual tables using DataHub metadata 2) Generates fine-grained column-level lineage for Kafka Connect sources/sinks. Auto-enabled for Confluent Cloud: This feature is automatically enabled for Confluent Cloud environments where DataHub graph connection is required. Set use_schema_resolver: false to disable. Prerequisite: Source database tables must be ingested into DataHub before Kafka Connect ingestion for this feature to work. Without prior database ingestion, schema resolver will not find table metadata. Default: False |
username One of string, null | Kafka Connect username. Default: None |
env string | The environment that all assets produced by this connector belong to Default: PROD |
connector_patterns AllowDenyPattern | A class to store allow deny regexes |
connector_patterns.ignoreCase One of boolean, null | Whether to ignore case sensitivity during pattern matching. Default: True |
connector_patterns.allow array | List of regex patterns to include in ingestion Default: ['.*'] |
connector_patterns.allow.string string | |
connector_patterns.deny array | List of regex patterns to exclude from ingestion. Default: [] |
connector_patterns.deny.string string | |
generic_connectors array | Provide lineage graph for sources connectors other than Confluent JDBC Source Connector, Debezium Source Connector, and Mongo Source Connector Default: [] |
generic_connectors.GenericConnectorConfig GenericConnectorConfig | |
generic_connectors.GenericConnectorConfig.connector_name ❓ string | |
generic_connectors.GenericConnectorConfig.source_dataset ❓ string | |
generic_connectors.GenericConnectorConfig.source_platform ❓ string | |
provided_configs One of array, null | Provided Configurations Default: None |
provided_configs.ProvidedConfig ProvidedConfig | |
provided_configs.ProvidedConfig.path_key ❓ string | |
provided_configs.ProvidedConfig.provider ❓ string | |
provided_configs.ProvidedConfig.value ❓ string | |
stateful_ingestion One of StatefulStaleMetadataRemovalConfig, null | Default: None |
stateful_ingestion.enabled boolean | Whether or not to enable stateful ingest. Default: True if a pipeline_name is set and either a datahub-rest sink or datahub_api is specified, otherwise False Default: False |
stateful_ingestion.fail_safe_threshold number | Prevents large amount of soft deletes & the state from committing from accidental changes to the source configuration if the relative change percent in entities compared to the previous state is above the 'fail_safe_threshold'. Default: 75.0 |
stateful_ingestion.remove_stale_metadata boolean | Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled. Default: True |
The JSONSchema for this configuration is inlined below.
{
"$defs": {
"AllowDenyPattern": {
"additionalProperties": false,
"description": "A class to store allow deny regexes",
"properties": {
"allow": {
"default": [
".*"
],
"description": "List of regex patterns to include in ingestion",
"items": {
"type": "string"
},
"title": "Allow",
"type": "array"
},
"deny": {
"default": [],
"description": "List of regex patterns to exclude from ingestion.",
"items": {
"type": "string"
},
"title": "Deny",
"type": "array"
},
"ignoreCase": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": true,
"description": "Whether to ignore case sensitivity during pattern matching.",
"title": "Ignorecase"
}
},
"title": "AllowDenyPattern",
"type": "object"
},
"GenericConnectorConfig": {
"additionalProperties": false,
"properties": {
"connector_name": {
"title": "Connector Name",
"type": "string"
},
"source_dataset": {
"title": "Source Dataset",
"type": "string"
},
"source_platform": {
"title": "Source Platform",
"type": "string"
}
},
"required": [
"connector_name",
"source_dataset",
"source_platform"
],
"title": "GenericConnectorConfig",
"type": "object"
},
"ProvidedConfig": {
"additionalProperties": false,
"properties": {
"provider": {
"title": "Provider",
"type": "string"
},
"path_key": {
"title": "Path Key",
"type": "string"
},
"value": {
"title": "Value",
"type": "string"
}
},
"required": [
"provider",
"path_key",
"value"
],
"title": "ProvidedConfig",
"type": "object"
},
"StatefulStaleMetadataRemovalConfig": {
"additionalProperties": false,
"description": "Base specialized config for Stateful Ingestion with stale metadata removal capability.",
"properties": {
"enabled": {
"default": false,
"description": "Whether or not to enable stateful ingest. Default: True if a pipeline_name is set and either a datahub-rest sink or `datahub_api` is specified, otherwise False",
"title": "Enabled",
"type": "boolean"
},
"remove_stale_metadata": {
"default": true,
"description": "Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.",
"title": "Remove Stale Metadata",
"type": "boolean"
},
"fail_safe_threshold": {
"default": 75.0,
"description": "Prevents large amount of soft deletes & the state from committing from accidental changes to the source configuration if the relative change percent in entities compared to the previous state is above the 'fail_safe_threshold'.",
"maximum": 100.0,
"minimum": 0.0,
"title": "Fail Safe Threshold",
"type": "number"
}
},
"title": "StatefulStaleMetadataRemovalConfig",
"type": "object"
}
},
"additionalProperties": false,
"properties": {
"stateful_ingestion": {
"anyOf": [
{
"$ref": "#/$defs/StatefulStaleMetadataRemovalConfig"
},
{
"type": "null"
}
],
"default": null
},
"env": {
"default": "PROD",
"description": "The environment that all assets produced by this connector belong to",
"title": "Env",
"type": "string"
},
"platform_instance_map": {
"anyOf": [
{
"additionalProperties": {
"type": "string"
},
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"description": "Platform instance mapping to use when constructing URNs. e.g.`platform_instance_map: { \"hive\": \"warehouse\" }`",
"title": "Platform Instance Map"
},
"platform_instance": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://docs.datahub.com/docs/platform-instances/ for more details.",
"title": "Platform Instance"
},
"connect_uri": {
"default": "http://localhost:8083/",
"description": "URI to connect to.",
"title": "Connect Uri",
"type": "string"
},
"username": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Kafka Connect username.",
"title": "Username"
},
"password": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Kafka Connect password.",
"title": "Password"
},
"cluster_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": "connect-cluster",
"description": "Cluster to ingest from.",
"title": "Cluster Name"
},
"convert_lineage_urns_to_lowercase": {
"default": false,
"description": "Whether to convert the urns of ingested lineage dataset to lowercase",
"title": "Convert Lineage Urns To Lowercase",
"type": "boolean"
},
"connector_patterns": {
"$ref": "#/$defs/AllowDenyPattern",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"description": "regex patterns for connectors to filter for ingestion."
},
"provided_configs": {
"anyOf": [
{
"items": {
"$ref": "#/$defs/ProvidedConfig"
},
"type": "array"
},
{
"type": "null"
}
],
"default": null,
"description": "Provided Configurations",
"title": "Provided Configs"
},
"connect_to_platform_map": {
"anyOf": [
{
"additionalProperties": {
"additionalProperties": {
"type": "string"
},
"type": "object"
},
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"description": "Platform instance mapping when multiple instances for a platform is available. Entry for a platform should be in either `platform_instance_map` or `connect_to_platform_map`. e.g.`connect_to_platform_map: { \"postgres-connector-finance-db\": \"postgres\": \"core_finance_instance\" }`",
"title": "Connect To Platform Map"
},
"generic_connectors": {
"default": [],
"description": "Provide lineage graph for sources connectors other than Confluent JDBC Source Connector, Debezium Source Connector, and Mongo Source Connector",
"items": {
"$ref": "#/$defs/GenericConnectorConfig"
},
"title": "Generic Connectors",
"type": "array"
},
"use_connect_topics_api": {
"default": true,
"description": "Whether to use Kafka Connect API for topic retrieval and validation. This flag controls the environment-specific topic retrieval strategy: \n**When True (default):** - **Self-hosted environments:** Uses runtime `/connectors/{name}/topics` API for accurate topic information - **Confluent Cloud:** Uses comprehensive Kafka REST API v3 to get all topics for transform pipeline, with config-based fallback \n**When False:** Disables all API-based topic retrieval for both environments. Returns empty topic lists. Useful for air-gapped environments or when topic validation isn't needed for performance optimization.",
"title": "Use Connect Topics Api",
"type": "boolean"
},
"kafka_rest_endpoint": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional: Confluent Cloud Kafka REST API endpoint for comprehensive topic retrieval. Format: https://pkc-xxxxx.region.provider.confluent.cloud If not specified, DataHub automatically derives the endpoint from connector configurations (kafka.endpoint). When available, enables getting all topics from Kafka cluster for improved transform pipeline accuracy.",
"title": "Kafka Rest Endpoint"
},
"kafka_api_key": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional: Confluent Cloud Kafka API key for authenticating with Kafka REST API v3. If not specified, DataHub will reuse the Connect credentials (username/password) for Kafka API authentication. Only needed if you want to use separate credentials for the Kafka API.",
"title": "Kafka Api Key"
},
"kafka_api_secret": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional: Confluent Cloud Kafka API secret for authenticating with Kafka REST API v3. If not specified, DataHub will reuse the Connect credentials (username/password) for Kafka API authentication. Only needed if you want to use separate credentials for the Kafka API.",
"title": "Kafka Api Secret"
},
"confluent_cloud_environment_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Confluent Cloud environment ID (e.g., 'env-xyz123'). When specified along with confluent_cloud_cluster_id, the connect_uri will be automatically constructed. This is the recommended approach for Confluent Cloud instead of manually constructing the full URI.",
"title": "Confluent Cloud Environment Id"
},
"confluent_cloud_cluster_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Confluent Cloud Kafka Connect cluster ID (e.g., 'lkc-abc123'). When specified along with confluent_cloud_environment_id, the connect_uri will be automatically constructed. This is the recommended approach for Confluent Cloud instead of manually constructing the full URI.",
"title": "Confluent Cloud Cluster Id"
},
"use_schema_resolver": {
"default": false,
"description": "Use DataHub's schema metadata to enhance Kafka Connect connector lineage. When enabled (requires DataHub graph connection): 1) Expands table patterns (e.g., 'database.*') to actual tables using DataHub metadata 2) Generates fine-grained column-level lineage for Kafka Connect sources/sinks. \n\n**Auto-enabled for Confluent Cloud:** This feature is automatically enabled for Confluent Cloud environments where DataHub graph connection is required. Set `use_schema_resolver: false` to disable. \n\n**Prerequisite:** Source database tables must be ingested into DataHub before Kafka Connect ingestion for this feature to work. Without prior database ingestion, schema resolver will not find table metadata.",
"title": "Use Schema Resolver",
"type": "boolean"
},
"schema_resolver_expand_patterns": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": null,
"description": "Enable table pattern expansion using DataHub schema metadata. When use_schema_resolver=True, this controls whether to expand patterns like 'database.*' to actual table names by querying DataHub. Only applies when use_schema_resolver is enabled. Defaults to True when use_schema_resolver is enabled.",
"title": "Schema Resolver Expand Patterns"
},
"schema_resolver_finegrained_lineage": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": null,
"description": "Enable fine-grained (column-level) lineage extraction using DataHub schema metadata. When use_schema_resolver=True, this controls whether to generate column-level lineage by matching schemas between source tables and Kafka topics. Only applies when use_schema_resolver is enabled. Defaults to True when use_schema_resolver is enabled.",
"title": "Schema Resolver Finegrained Lineage"
}
},
"title": "KafkaConnectSourceConfig",
"type": "object"
}
Advanced Configurations
Environment-Specific Topic Discovery
DataHub's Kafka Connect source automatically detects your environment (self-hosted vs Confluent Cloud) and uses the appropriate topic discovery strategy:
Self-hosted Kafka Connect
Uses the runtime /connectors/{name}/topics API endpoint for accurate, real-time topic information:
source:
type: kafka-connect
config:
# Self-hosted Kafka Connect cluster
connect_uri: "http://localhost:8083"
# use_connect_topics_api: true # Default - enables runtime topic discovery
Confluent Cloud
Uses comprehensive transform pipeline support with Kafka REST API v3 topic validation and config-based fallback:
Recommended approach using environment and cluster IDs:
source:
type: kafka-connect
config:
# Auto-construct URI from environment and cluster IDs (recommended)
confluent_cloud_environment_id: "env-xyz123" # Your Confluent Cloud environment ID
confluent_cloud_cluster_id: "lkc-abc456" # Your Kafka Connect cluster ID
# Standard credentials for Kafka Connect API
username: "your-connect-api-key" # API key for Kafka Connect access
password: "your-connect-api-secret" # API secret for Kafka Connect access
# Optional: Separate credentials for Kafka REST API (if different from Connect API)
kafka_api_key: "your-kafka-api-key" # API key for Kafka REST API access
kafka_api_secret: "your-kafka-api-secret" # API secret for Kafka REST API access
# Optional: Dedicated Kafka REST endpoint for comprehensive topic retrieval
kafka_rest_endpoint: "https://pkc-xxxxx.region.provider.confluent.cloud"
# use_connect_topics_api: true # Default - enables comprehensive topic retrieval
Alternative approach using full URI (legacy):
source:
type: kafka-connect
config:
# Confluent Cloud Connect URI - automatically detected
connect_uri: "https://api.confluent.cloud/connect/v1/environments/env-123/clusters/lkc-abc456"
username: "your-connect-api-key" # API key for Kafka Connect
password: "your-connect-api-secret" # API secret for Kafka Connect
kafka_api_key: "your-kafka-api-key" # API key for Kafka REST API (if different)
kafka_api_secret: "your-kafka-api-secret" # API secret for Kafka REST API (if different)
# Optional: Dedicated Kafka REST endpoint for comprehensive topic retrieval
kafka_rest_endpoint: "https://pkc-xxxxx.region.provider.confluent.cloud"
How Lineage Inference Works with Transform Pipelines:
Kafka Connect connectors can apply transforms (like RegexRouter) that modify topic names before data reaches Kafka. DataHub's lineage inference analyzes these transform configurations to determine how topics are produced:
- Configuration Analysis - Extracts source tables from the connector configuration (`table.include.list`, `database.include.list`)
- Transform Application - Applies configured transforms (RegexRouter, EventRouter, etc.) to predict final topic names
- Topic Validation - Validates predicted topics against actual cluster topics using Kafka REST API v3
- Lineage Construction - Maps source tables to validated topics, preserving schema information
This approach works for both self-hosted and Confluent Cloud environments:
- Self-hosted: Uses the runtime `/connectors/{name}/topics` API for the actual topics produced by each connector
- Confluent Cloud: Uses Kafka REST API v3 to get all cluster topics, then applies the transform pipeline to match them with the connector config (see the sketch below)
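The sketch below walks through those four steps for a Debezium-style source, assuming the `server.schema.table` topic naming mentioned later on this page; the helper name and structure are illustrative, not DataHub internals.

```python
# Illustrative sketch of the four lineage-inference steps for a CDC source.
import re

def infer_source_lineage(server_name, table_include_list, transforms, actual_topics):
    lineage = []
    for table in table_include_list:          # 1. configuration analysis
        topic = f"{server_name}.{table}"      #    assumed server.schema.table naming
        for transform in transforms:          # 2. transform application
            topic = transform(topic)
        if topic in actual_topics:            # 3. topic validation
            lineage.append((table, topic))    # 4. lineage construction
    return lineage

router = lambda t: re.sub(r"^dbserver1\.public\.(.*)$", r"cdc.\1", t)
print(infer_source_lineage("dbserver1", ["public.users"], [router], {"cdc.users"}))
# -> [('public.users', 'cdc.users')]
```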
Key Benefits:
- 90-95% accuracy for Cloud connectors with transforms (significant improvement over previous config-only approach)
- Full RegexRouter support with Java regex compatibility
- Complex transform chains handled correctly
- Schema preservation maintains full table names with schema information
Configuration Options:
- Environment/Cluster IDs (recommended): Use `confluent_cloud_environment_id` and `confluent_cloud_cluster_id` for automatic URI construction
- Auto-derivation: DataHub finds the Kafka REST endpoint automatically from connector configs
- Manual endpoint: Specify `kafka_rest_endpoint` if auto-derivation doesn't work
- Separate credentials: Use `kafka_api_key`/`kafka_api_secret` for the Kafka REST API when they differ from the Connect credentials
- Connect credentials: Use `username`/`password` for the Connect API (these are reused for the Kafka API if separate credentials are not provided)
Air-gapped or Performance-Optimized Environments
Disable topic discovery entirely for environments where API access is not available or not needed:
source:
type: kafka-connect
config:
connect_uri: "http://localhost:8083"
use_connect_topics_api: false # Disables all topic discovery API calls
Note: When use_connect_topics_api is false, topic information will not be extracted, which may impact lineage accuracy but improves performance and works in air-gapped environments.
Enhanced Topic Resolution for Source and Sink Connectors
DataHub now provides intelligent topic resolution that works reliably across all environments, including Confluent Cloud where the Kafka Connect topics API is unavailable.
How It Works
Source Connectors (Debezium, Snowflake CDC, JDBC):
- Always derive expected topics from the connector configuration (`table.include.list`, `database.include.list`)
- Apply configured transforms (RegexRouter, EventRouter, etc.) to predict final topic names
- When Kafka API is available: Filter to only topics that exist in Kafka
- When Kafka API is unavailable (Confluent Cloud): Create lineages for all configured tables without filtering
Sink Connectors (S3, Snowflake, BigQuery, JDBC):
- Support both explicit topic lists (`topics` field) and regex patterns (`topics.regex` field)
- When `topics.regex` is used (see the sketch after this list):
  - Priority 1: Match against `manifest.topic_names` from the Kafka API (if available)
  - Priority 2: Query DataHub for Kafka topics and match the pattern (if `use_schema_resolver` is enabled)
  - Priority 3: Warn the user that the pattern cannot be expanded
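A condensed sketch of that priority order, with illustrative function and parameter names (not the actual source code):

```python
# Sketch of the sink-connector topic resolution priorities described above.
import re
from typing import Optional

def resolve_sink_topics(
    topics: Optional[str],                 # explicit "topics" list, comma-separated
    topics_regex: Optional[str],           # "topics.regex" pattern
    manifest_topics: Optional[list[str]],  # topics from the Kafka API, if available
    datahub_topics: Optional[list[str]],   # topics known to DataHub, if schema resolver is on
) -> list[str]:
    if topics:                                           # explicit list needs no expansion
        return [t.strip() for t in topics.split(",")]
    if topics_regex:
        pattern = re.compile(topics_regex)
        if manifest_topics is not None:                  # priority 1: Kafka API
            return [t for t in manifest_topics if pattern.match(t)]
        if datahub_topics is not None:                   # priority 2: DataHub query
            return [t for t in datahub_topics if pattern.match(t)]
        print("warning: cannot expand topics.regex without Kafka API or schema resolver")
    return []                                            # priority 3: warn, nothing expanded

print(resolve_sink_topics(None, r"analytics\..*", None, ["analytics.users", "audit.log"]))
# -> ['analytics.users']
```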
Configuration Examples
Source Connector with Pattern Expansion:
# Debezium PostgreSQL source with wildcard tables
connector.config:
table.include.list: "public.analytics_.*"
# When Kafka API unavailable, DataHub will:
# 1. Query DataHub for all PostgreSQL tables matching pattern
# 2. Derive expected topic names (server.schema.table format)
# 3. Apply transforms if configured
# 4. Create lineages without Kafka validation
Sink Connector with topics.regex (Confluent Cloud):
# S3 sink connector consuming from pattern-matched topics
connector.config:
topics.regex: "analytics\\..*" # Match topics like analytics.users, analytics.orders
# When Kafka API unavailable, DataHub will:
# 1. Query DataHub for all Kafka topics (requires use_schema_resolver: true)
# 2. Match topics against the regex pattern
# 3. Create lineages for matched topics
Enable DataHub Topic Querying for Sink Connectors:
source:
type: kafka-connect
config:
connect_uri: "https://api.confluent.cloud/connect/v1/environments/env-123/clusters/lkc-abc456"
username: "your-connect-api-key"
password: "your-connect-api-secret"
# Enable DataHub schema resolver for topic pattern expansion
use_schema_resolver: true # Required for topics.regex fallback
# Configure graph connection for DataHub queries
datahub_api:
  server: "http://localhost:8080" # Your DataHub GMS endpoint
Key Benefits
- Confluent Cloud Support: Both source and sink connectors work correctly with pattern-based configurations
- Config as Source of Truth: Source connectors always derive topics from configuration, not from querying all tables in DataHub
- Smart Fallback: Sink connectors can query DataHub for Kafka topics when Kafka API is unavailable
- Pattern Expansion: Wildcards in `table.include.list` and `topics.regex` are properly expanded
- Transform Support: All transforms (RegexRouter, EventRouter, etc.) are applied correctly
When DataHub Topic Querying is Used
DataHub will query for topics in these scenarios:
Source Connectors:
- When expanding wildcard patterns in `table.include.list` (e.g., `ANALYTICS.PUBLIC.*`)
- Queries the source platform (PostgreSQL, MySQL, etc.) for tables matching the pattern
Sink Connectors:
- When `topics.regex` is used AND the Kafka API is unavailable (Confluent Cloud)
- Queries DataHub's Kafka platform for topics matching the regex pattern
- Requires `use_schema_resolver: true` in the configuration
Important Notes:
- DataHub never queries "all tables" to create lineages - config is always the source of truth
- Source connectors query source platforms (databases) to expand table patterns
- Sink connectors query Kafka platform to expand topic regex patterns
- Both require appropriate DataHub credentials and connectivity
Using DataHub Schema Resolver for Pattern Expansion and Column-Level Lineage
The Kafka Connect source can query DataHub for schema information to provide two capabilities:
- Pattern Expansion - Converts wildcard patterns like `database.*` into actual table names by querying DataHub
- Column-Level Lineage - Generates field-level lineage by matching schemas between source tables and Kafka topics
Both features require existing metadata in DataHub from your database and Kafka schema registry ingestion.
Auto-Enabled for Confluent Cloud
Starting with the latest version, use_schema_resolver is automatically enabled for Confluent Cloud environments to provide better defaults for enhanced lineage extraction. This gives you column-level lineage and pattern expansion out of the box!
Confluent Cloud (Auto-Enabled):
source:
type: kafka-connect
config:
# Confluent Cloud environment
confluent_cloud_environment_id: "env-xyz123"
confluent_cloud_cluster_id: "lkc-abc456"
username: "your-connect-api-key"
password: "your-connect-api-secret"
# Schema resolver automatically enabled! ✓
# use_schema_resolver: true (auto-enabled)
# schema_resolver_expand_patterns: true (auto-enabled)
# schema_resolver_finegrained_lineage: true (auto-enabled)
To disable (if you don't need these features):
source:
type: kafka-connect
config:
confluent_cloud_environment_id: "env-xyz123"
confluent_cloud_cluster_id: "lkc-abc456"
use_schema_resolver: false # Explicitly disable auto-enable
Self-hosted (Manual Enable Required):
source:
type: kafka-connect
config:
connect_uri: "http://localhost:8083"
# Must explicitly enable for self-hosted
use_schema_resolver: true
# DataHub connection
datahub_api:
server: "http://localhost:8080"
Important Prerequisites:
⚠️ Source database tables must be ingested into DataHub BEFORE running Kafka Connect ingestion
The schema resolver queries DataHub for existing table metadata. If your source databases haven't been ingested yet, the feature will have no effect. Run database ingestion first!
Recommended Ingestion Order:
- Ingest source databases (PostgreSQL, MySQL, Snowflake, etc.) → DataHub
- Ingest Kafka schema registry (optional, for topic schemas) → DataHub
- Run Kafka Connect ingestion → Enjoy enhanced lineage!
Configuration Overview
source:
type: kafka-connect
config:
connect_uri: "http://localhost:8083"
# Enable DataHub schema querying (auto-enabled for Confluent Cloud)
use_schema_resolver: true
# Control which features to use (both default to true when schema resolver enabled)
schema_resolver_expand_patterns: true # Expand wildcard patterns
schema_resolver_finegrained_lineage: true # Generate column-level lineage
# DataHub connection (required when use_schema_resolver=true)
datahub_api:
server: "http://localhost:8080"
token: "your-datahub-token" # Optional
Pattern Expansion
Converts wildcard patterns in connector configurations into actual table names by querying DataHub.
Example: MySQL Source with Wildcards
# Connector config contains pattern
connector.config:
table.include.list: "analytics.user_*" # Pattern: matches user_events, user_profiles, etc.
# DataHub config
source:
type: kafka-connect
config:
use_schema_resolver: true
schema_resolver_expand_patterns: true
# Result: DataHub queries for MySQL tables matching "analytics.user_*"
# Finds: user_events, user_profiles, user_sessions
# Creates lineage:
# mysql.analytics.user_events -> kafka.server.analytics.user_events
# mysql.analytics.user_profiles -> kafka.server.analytics.user_profiles
# mysql.analytics.user_sessions -> kafka.server.analytics.user_sessions
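The matching itself can be pictured as a simple wildcard filter over table names that are already in DataHub; the hard-coded list below stands in for the DataHub query, which is not shown:

```python
# Sketch of wildcard pattern expansion (illustrative; the table list would
# normally come from a DataHub query).
import fnmatch

def expand_table_pattern(pattern: str, known_tables: list[str]) -> list[str]:
    # "analytics.user_*" style patterns are matched against fully qualified names
    return [t for t in known_tables if fnmatch.fnmatchcase(t, pattern)]

known = ["analytics.user_events", "analytics.user_profiles", "analytics.orders"]
print(expand_table_pattern("analytics.user_*", known))
# -> ['analytics.user_events', 'analytics.user_profiles']
```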
When to use:
- Connector configs have wildcard patterns (`database.*`, `schema.table_*`)
- You want accurate lineage without manually listing every table
- Source metadata exists in DataHub from database ingestion
When to skip:
- Connector configs use explicit table lists (no patterns)
- Source metadata not yet in DataHub
- Want faster ingestion without DataHub API calls
Configuration:
source:
type: kafka-connect
config:
use_schema_resolver: true
schema_resolver_expand_patterns: true # Enable pattern expansion
# If you only want column-level lineage but NOT pattern expansion:
# schema_resolver_expand_patterns: false
Behavior without schema resolver: Patterns are treated as literal table names, resulting in potentially incorrect lineage.
Column-Level Lineage
Generates field-level lineage by matching column names between source tables and Kafka topics.
Example: PostgreSQL to Kafka CDC
source:
type: kafka-connect
config:
use_schema_resolver: true
schema_resolver_finegrained_lineage: true
# Source table schema in DataHub:
# postgres.public.users: [user_id, email, created_at, updated_at]
# Kafka topic schema in DataHub:
# kafka.server.public.users: [user_id, email, created_at, updated_at]
# Result: Column-level lineage created:
# postgres.public.users.user_id -> kafka.server.public.users.user_id
# postgres.public.users.email -> kafka.server.public.users.email
# postgres.public.users.created_at -> kafka.server.public.users.created_at
# postgres.public.users.updated_at -> kafka.server.public.users.updated_at
Requirements:
- Source table schema exists in DataHub (from database ingestion)
- Kafka topic schema exists in DataHub (from schema registry or Kafka ingestion)
- Column names match between source and target (case-insensitive matching)
Benefits:
- Impact Analysis: See which fields are affected by schema changes
- Data Tracing: Track specific data elements through pipelines
- Schema Understanding: Visualize how data flows at the field level
ReplaceField Transform Support:
Column-level lineage respects ReplaceField transforms that filter or rename columns:
# Connector excludes specific fields
connector.config:
transforms: "removeFields"
transforms.removeFields.type: "org.apache.kafka.connect.transforms.ReplaceField$Value"
transforms.removeFields.exclude: "internal_id,temp_column"
# DataHub behavior:
# Source schema: [user_id, email, internal_id, temp_column]
# After transform: [user_id, email]
# Column lineage created only for: user_id, email
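Conceptually, the matching reduces to a case-insensitive intersection of column names, with any ReplaceField exclude list applied first; a minimal sketch (names are illustrative, not DataHub internals):

```python
# Minimal sketch of case-insensitive column matching with an exclude list.
def column_lineage(source_cols, topic_cols, exclude=()):
    excluded = {c.lower() for c in exclude}
    source = {c.lower(): c for c in source_cols if c.lower() not in excluded}
    topic = {c.lower(): c for c in topic_cols}
    # pair up columns whose names match, ignoring case
    return [(source[k], topic[k]) for k in source if k in topic]

print(column_lineage(
    ["user_id", "email", "internal_id", "temp_column"],
    ["USER_ID", "EMAIL"],
    exclude=["internal_id", "temp_column"],
))
# -> [('user_id', 'USER_ID'), ('email', 'EMAIL')]
```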
Configuration:
source:
type: kafka-connect
config:
use_schema_resolver: true
schema_resolver_finegrained_lineage: true # Enable column-level lineage
# If you only want pattern expansion but NOT column-level lineage:
# schema_resolver_finegrained_lineage: false
Behavior without schema resolver:
Only dataset-level lineage is created (e.g., postgres.users -> kafka.users), without field-level detail.
Complete Configuration Example
source:
type: kafka-connect
config:
# Kafka Connect cluster
connect_uri: "http://localhost:8083"
cluster_name: "production-connect"
# Enable schema resolver features
use_schema_resolver: true
schema_resolver_expand_patterns: true # Expand wildcard patterns
schema_resolver_finegrained_lineage: true # Generate column-level lineage
# DataHub connection
datahub_api:
server: "http://datahub.company.com"
token: "${DATAHUB_TOKEN}"
# Platform instances (if using multiple)
platform_instance_map:
postgres: "prod-postgres"
kafka: "prod-kafka"
Performance Impact
API Calls per Connector:
- Pattern expansion: 1 GraphQL query per unique wildcard pattern
- Column-level lineage: 2 GraphQL queries (source schema + target schema)
- Results cached for ingestion run duration
Optimization:
# Minimal configuration - no schema resolver
source:
type: kafka-connect
config:
connect_uri: "http://localhost:8083"
# use_schema_resolver: false # Default - no DataHub queries
# Pattern expansion only
source:
type: kafka-connect
config:
use_schema_resolver: true
schema_resolver_expand_patterns: true
schema_resolver_finegrained_lineage: false # Skip column lineage for faster ingestion
# Column lineage only
source:
type: kafka-connect
config:
use_schema_resolver: true
schema_resolver_expand_patterns: false # Skip pattern expansion
schema_resolver_finegrained_lineage: true
Best Practice: Run database and Kafka schema ingestion before Kafka Connect ingestion to pre-populate DataHub with schema metadata.
Troubleshooting
"Pattern expansion found no matches for: analytics.*"
Causes:
- Source database metadata not in DataHub
- Pattern syntax doesn't match DataHub dataset names
- Platform instance mismatch
Solutions:
- Run database ingestion first to populate DataHub
- Verify pattern matches table naming in source system
- Check that `platform_instance_map` matches the database ingestion config
- Use an explicit table list to bypass pattern expansion temporarily
"SchemaResolver not available: DataHub graph connection is not available"
Causes:
- Missing `datahub_api` configuration
- DataHub GMS not accessible
Solutions:
source:
type: kafka-connect
config:
use_schema_resolver: true
datahub_api:
server: "http://localhost:8080" # Add DataHub GMS URL
token: "your-token" # Add if authentication enabled
Column-level lineage not appearing
Check:
- Source table schema exists: Search for table in DataHub UI
- Kafka topic schema exists: Search for topic in DataHub UI
- Column names match (case differences are handled automatically)
- Check ingestion logs for warnings about missing schemas
Slow ingestion with schema resolver enabled
Profile:
- Check logs for "Schema resolver cache hits: X, misses: Y"
- High misses indicate missing metadata in DataHub
Temporarily disable to compare:
use_schema_resolver: false
Working with Platform Instances
If multiple instances of Kafka or of the source/sink systems are referenced in your Kafka Connect setup, you need to configure platform instances for these systems in the kafka-connect recipe to generate correct lineage edges. You must have already set `platform_instance` in the recipes of the original source/sink systems. Refer to the Working with Platform Instances documentation for more details.
There are two ways to declare a source/sink system's `platform_instance` in the kafka-connect recipe. If a single instance of a platform is used across all Kafka Connect connectors, use `platform_instance_map` to specify which `platform_instance` to use for that platform when constructing URNs for lineage.
Example:
# Map of platform name to platform instance
platform_instance_map:
snowflake: snowflake_platform_instance
mysql: mysql_platform_instance
If multiple instances of a platform are used across Kafka Connect connectors, you need to specify which `platform_instance` to use for that platform for every connector.
Example - Multiple MySQL source connectors, each reading from a different MySQL instance
# Map of platform name to platform instance per connector
connect_to_platform_map:
mysql_connector1:
mysql: mysql_instance1
mysql_connector2:
mysql: mysql_instance2
Here, mysql_connector1 and mysql_connector2 are the names of the MySQL source connectors as defined in the Kafka Connect connector configs.
Example - Multiple MySQL source connectors, each reading from a different MySQL instance and writing to a different Kafka cluster
connect_to_platform_map:
mysql_connector1:
mysql: mysql_instance1
kafka: kafka_instance1
mysql_connector2:
mysql: mysql_instance2
kafka: kafka_instance2
You can also use a combination of `platform_instance_map` and `connect_to_platform_map` in your recipe. Note that the `platform_instance` specified for a connector in `connect_to_platform_map` always takes precedence over a `platform_instance` set for the same platform in `platform_instance_map`.
If you do not use `platform_instance` in the original source/sink recipes, you do not need to specify it in the configurations above.
Note that you do not need to specify `platform_instance` for BigQuery.
Example - Multiple BigQuery Sink Connectors each writing to different kafka cluster
connect_to_platform_map:
bigquery_connector1:
kafka: kafka_instance1
bigquery_connector2:
kafka: kafka_instance2
Provided Configurations from External Sources
Kafka Connect supports pluggable configuration providers, which can load configuration data from external sources at runtime. These values are not available to the DataHub ingestion source through the Kafka Connect APIs. If you use such provided configurations to specify connection URLs (database, etc.) in a Kafka Connect connector configuration, you must also add them to the `provided_configs` section of the recipe for DataHub to generate correct lineage.
# Optional mapping of provider configurations if using
provided_configs:
- provider: env
path_key: MYSQL_CONNECTION_URL
value: jdbc:mysql://test_mysql:3306/librarydb
Troubleshooting
Topic Discovery Issues
Problem: Missing or incomplete topic information in lineage
Solutions:
Verify Environment Detection:
# Check logs for environment detection messages
# Self-hosted: "Detected self-hosted Kafka Connect - using runtime topics API"
# Confluent Cloud: "Detected Confluent Cloud - using comprehensive Kafka REST API topic retrieval"
Test API Connectivity:
# For self-hosted - test topics API
curl -X GET "http://localhost:8083/connectors/{connector-name}/topics"
# For Confluent Cloud - test Kafka REST API v3
curl -X GET "https://pkc-xxxxx.region.provider.confluent.cloud/kafka/v3/clusters/{cluster-id}/topics"
Configuration Troubleshooting:
# Enable debug logging
source:
type: kafka-connect
config:
# ... other config ...
use_connect_topics_api: true # Ensure this is enabled (default)
Environment-Specific Issues
Self-hosted Issues:
- 403/401 errors: Check authentication credentials (`username`, `password`)
- 404 errors: Verify the Kafka Connect cluster is running and the REST API is accessible
- Empty topic lists: Check if connectors are actually running and processing data
Confluent Cloud Issues:
- Missing topics: Verify the connector configuration has proper source table fields (`table.include.list`, `query`)
- Transform accuracy: Check that RegexRouter patterns in the connector config are valid Java regex
- Complex transforms: Now fully supported via forward transform pipeline with topic validation
- Schema preservation: Full schema information (e.g., `public.users`) is maintained through the transform pipeline
Performance Optimization
If topic discovery is impacting performance:
source:
type: kafka-connect
config:
connect_uri: "http://localhost:8083"
use_connect_topics_api: false # Disable for better performance (no topic info)
Code Coordinates
- Class Name: `datahub.ingestion.source.kafka_connect.kafka_connect.KafkaConnectSource`
- Browse on GitHub
Questions
If you've got any questions on configuring ingestion for Kafka Connect, feel free to ping us on our Slack.