DynamoDB
Important Capabilities
Capability | Status | Notes |
---|---|---|
Detect Deleted Entities | ✅ | Optionally enabled via stateful_ingestion.remove_stale_metadata |
Platform Instance | ✅ | By default, platform_instance will use the AWS account id |
This plugin extracts the following:
AWS DynamoDB table names with their regions, plus a schema of attribute names and types inferred by scanning each table
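As a rough illustration of what inferring a schema by scanning can mean, the sketch below collects attribute names and DynamoDB type codes from a handful of items in DynamoDB format. It is not the plugin's actual implementation; the function name `infer_schema` and the sample items are hypothetical.

```python
from collections import defaultdict


def infer_schema(items):
    """Collect every attribute name and the DynamoDB type codes seen for it."""
    seen = defaultdict(set)
    for item in items:
        for name, typed_value in item.items():
            # Each value is a one-entry mapping such as {"S": "text"} or {"N": "42"}.
            for type_code in typed_value:
                seen[name].add(type_code)
    # Sort names and type codes so the result is deterministic.
    return {name: sorted(codes) for name, codes in sorted(seen.items())}


# Hypothetical sample items, as a table scan might return them.
sample_items = [
    {"Id": {"S": "Thread 1"}, "Views": {"N": "10"}},
    {"Id": {"S": "Thread 2"}, "Views": {"S": "unknown"}, "Tags": {"SS": ["a", "b"]}},
]
schema = infer_schema(sample_items)
print(schema)
```

Because DynamoDB items in one table need not share attributes or types, an attribute can end up with more than one type code, as `Views` does here.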
Prerequisites
To run this source, attach the AmazonDynamoDBReadOnlyAccess policy to a user in your AWS account, then create an API access key and secret for that user.
To create an API access key, the user needs the following access key permissions. Your AWS account admin can create a policy with these permissions and attach it to the user. For more details, see Managing access keys for IAM users.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "VisualEditor0",
      "Effect": "Allow",
      "Action": [
        "iam:ListAccessKeys",
        "iam:CreateAccessKey",
        "iam:UpdateAccessKey",
        "iam:DeleteAccessKey"
      ],
      "Resource": "arn:aws:iam::${aws_account_id}:user/${aws:username}"
    }
  ]
}
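In the policy above, `${aws:username}` is an IAM policy variable that AWS resolves at evaluation time, while `${aws_account_id}` appears to be a placeholder you fill in yourself. A minimal sketch of rendering the policy for one account follows; the helper name `access_key_policy` and the account ID `123456789012` are illustrative only.

```python
import json


def access_key_policy(aws_account_id):
    """Build the access-key policy shown above for a single AWS account."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": [
                    "iam:ListAccessKeys",
                    "iam:CreateAccessKey",
                    "iam:UpdateAccessKey",
                    "iam:DeleteAccessKey",
                ],
                # Keep ${aws:username} literal: IAM substitutes the caller's
                # user name when it evaluates the policy.
                "Resource": "arn:aws:iam::" + aws_account_id + ":user/${aws:username}",
            }
        ],
    }


policy = access_key_policy("123456789012")  # placeholder account ID
print(json.dumps(policy, indent=2))
```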
CLI based Ingestion
Install the Plugin
pip install 'acryl-datahub[dynamodb]'
Starter Recipe
Check out the following recipe to get started with ingestion! See below for full configuration options.
For general pointers on writing and running a recipe, see our main recipe guide.
source:
  type: dynamodb
  config:
    platform_instance: "AWS_ACCOUNT_ID"
    aws_access_key_id: "${AWS_ACCESS_KEY_ID}"
    aws_secret_access_key: "${AWS_SECRET_ACCESS_KEY}"
    aws_session_token: "${AWS_SESSION_TOKEN}"
    aws_region: "${AWS_REGION}"
    #
    # If some items carry the table's most representative fields, use the
    # `include_table_item` option to list their primary keys in DynamoDB format.
    # Each `region.table` entry can list at most 100 primary keys.
    # These items are included in addition to the first 100 items scanned from the table.
    #
    # include_table_item:
    #   region.table_name:
    #     [
    #       {
    #         "partition_key_name": { "attribute_type": "attribute_value" },
    #         "sort_key_name": { "attribute_type": "attribute_value" },
    #       },
    #     ]
sink:
  # sink configs
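A recipe saved as a file is typically run with `datahub ingest -c recipe.yml`. The same recipe can also be expressed as a Python dictionary, which the sketch below builds from environment-style values. The helper names, the `datahub-rest` sink, and the server URL `http://localhost:8080` are assumptions for illustration; substitute your own sink configuration.

```python
def build_recipe(env, server="http://localhost:8080"):
    """Return a recipe dict that mirrors the starter recipe above."""
    config = {"aws_region": env["AWS_REGION"]}
    # Credentials are optional here because they can be auto-detected.
    for field in ("aws_access_key_id", "aws_secret_access_key", "aws_session_token"):
        value = env.get(field.upper())
        if value:
            config[field] = value
    return {
        "source": {"type": "dynamodb", "config": config},
        # Assumed sink: a DataHub REST endpoint. Replace with your own sink config.
        "sink": {"type": "datahub-rest", "config": {"server": server}},
    }


def run_recipe(recipe):
    """Run the recipe in-process; needs acryl-datahub[dynamodb] installed."""
    # Imported lazily so build_recipe works without the plugin installed.
    from datahub.ingestion.run.pipeline import Pipeline

    pipeline = Pipeline.create(recipe)
    pipeline.run()
    pipeline.raise_from_status()


recipe = build_recipe({"AWS_REGION": "us-west-2", "AWS_ACCESS_KEY_ID": "example-key-id"})
print(recipe["source"])
```

In practice you would pass `os.environ` to `build_recipe` and hand the result to `run_recipe`.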
Config Details
Note that a . is used to denote nested fields in the YAML recipe.
Field | Description |
---|---|
aws_access_key_id string | AWS access key ID. Can be auto-detected, see the AWS boto3 docs for details. |
aws_advanced_config object | Advanced AWS configuration options. These are passed directly to botocore.config.Config. |
aws_endpoint_url string | The AWS service endpoint. This is normally constructed automatically, but can be overridden here. |
aws_profile string | Named AWS profile to use. Only used if access key / secret are unset. If not set the default will be used |
aws_proxy map(str,string) | A set of proxy configs to use with AWS. See the botocore.config docs for details. |
aws_region string | AWS region code. |
aws_secret_access_key string | AWS secret access key. Can be auto-detected, see the AWS boto3 docs for details. |
aws_session_token string | AWS session token. Can be auto-detected, see the AWS boto3 docs for details. |
platform_instance string | The instance of the platform that all assets produced by this recipe belong to |
read_timeout number | The timeout for reading from the connection (in seconds). Default: 60 |
env string | The environment that all assets produced by this connector belong to Default: PROD |
aws_role One of string, array | AWS roles to assume. If using the string format, the role ARN can be specified directly. If using the object format, the role can be specified in the RoleArn field and additional available arguments are documented at https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sts.html?highlight=assume_role#STS.Client.assume_role |
aws_role.union One of string, AwsAssumeRoleConfig | |
aws_role.union.RoleArn ❓ string | ARN of the role to assume. |
aws_role.union.ExternalId string | External ID to use when assuming the role. |
classification ClassificationConfig | For details, refer to Classification. Default: {'enabled': False, 'sample_size': 100, 'max_worker... |
classification.enabled boolean | Whether classification should be used to auto-detect glossary terms Default: False |
classification.info_type_to_term map(str,string) | Optional mapping to provide glossary term identifier for info type |
classification.max_workers integer | Number of worker threads to use for classification. Set to 1 to disable. Default: 4 |
classification.sample_size integer | Number of sample values used for classification. Default: 100 |
classification.classifiers array | Classifiers to use to auto-detect glossary terms. If more than one classifier is given, infotype predictions from the classifier defined later in the sequence take precedence. Default: [{'type': 'datahub', 'config': None}] |
classification.classifiers.DynamicTypedClassifierConfig DynamicTypedClassifierConfig | |
classification.classifiers.DynamicTypedClassifierConfig.type ❓ string | The type of the classifier to use. For DataHub, use datahub |
classification.classifiers.DynamicTypedClassifierConfig.config object | The configuration required for initializing the classifier. If not specified, uses defaults for the classifier type. |
classification.column_pattern AllowDenyPattern | Regex patterns to filter columns for classification. This is used in combination with other patterns in parent config. Specify regex to match the column name in database.schema.table.column format. Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} |
classification.column_pattern.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True |
classification.column_pattern.allow array | List of regex patterns to include in ingestion Default: ['.*'] |
classification.column_pattern.allow.string string | |
classification.column_pattern.deny array | List of regex patterns to exclude from ingestion. Default: [] |
classification.column_pattern.deny.string string | |
classification.table_pattern AllowDenyPattern | Regex patterns to filter tables for classification. This is used in combination with other patterns in parent config. Specify regex to match the entire table name in database.schema.table format. e.g. to match all tables starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*' Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} |
classification.table_pattern.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True |
classification.table_pattern.allow array | List of regex patterns to include in ingestion Default: ['.*'] |
classification.table_pattern.allow.string string | |
classification.table_pattern.deny array | List of regex patterns to exclude from ingestion. Default: [] |
classification.table_pattern.deny.string string | |
database_pattern AllowDenyPattern | Regex patterns for databases to filter in ingestion. Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} |
database_pattern.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True |
database_pattern.allow array | List of regex patterns to include in ingestion Default: ['.*'] |
database_pattern.allow.string string | |
database_pattern.deny array | List of regex patterns to exclude from ingestion. Default: [] |
database_pattern.deny.string string | |
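domain map(str,AllowDenyPattern) | Regex patterns for tables to filter to assign domain_key. Each value is an AllowDenyPattern, a class to store allow and deny regexes. |
domain.`key`.allow array | List of regex patterns to include in ingestion Default: ['.*'] |
domain.`key`.allow.string string | |
domain.`key`.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True |
domain.`key`.deny array | List of regex patterns to exclude from ingestion. Default: [] |
domain.`key`.deny.string string | |
include_table_item map(str,array) | [Advanced] The primary keys, in DynamoDB format, of the table items that the user would like to include in the schema. Refer to the "Advanced Configurations" section for more details. |
include_table_item.`key`.object object | |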
table_pattern AllowDenyPattern | Regex patterns for tables to filter in ingestion. The table name format is 'region.table' Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} |
table_pattern.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True |
table_pattern.allow array | List of regex patterns to include in ingestion Default: ['.*'] |
table_pattern.allow.string string | |
table_pattern.deny array | List of regex patterns to exclude from ingestion. Default: [] |
table_pattern.deny.string string | |
stateful_ingestion StatefulStaleMetadataRemovalConfig | Base specialized config for Stateful Ingestion with stale metadata removal capability. |
stateful_ingestion.enabled boolean | Whether or not to enable stateful ingest. Default: True if a pipeline_name is set and either a datahub-rest sink or datahub_api is specified, otherwise False Default: False |
stateful_ingestion.remove_stale_metadata boolean | Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled. Default: True |
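The allow, deny, and ignoreCase fields in the table above can be read as follows: a name is dropped if any deny regex matches it, and otherwise kept if any allow regex matches it. The sketch below approximates that behavior with the standard `re` module, using the 'region.table' name format that table_pattern expects. The function `is_allowed` is hypothetical, and details such as where the match is anchored are assumptions that may differ from the real AllowDenyPattern class.

```python
import re


def is_allowed(name, allow=(".*",), deny=(), ignore_case=True):
    """Approximate allow/deny filtering with the documented defaults."""
    flags = re.IGNORECASE if ignore_case else 0
    # Deny wins: any matching deny pattern rejects the name.
    if any(re.match(pattern, name, flags) for pattern in deny):
        return False
    # Otherwise at least one allow pattern must match from the start of the name.
    return any(re.match(pattern, name, flags) for pattern in allow)


tables = ["us-west-2.Reply", "us-west-2.Thread", "eu-west-1.reply_archive"]
kept = [
    table
    for table in tables
    if is_allowed(table, allow=[r"us-west-2\..*"], deny=[r".*\.thread$"])
]
print(kept)
```

With ignore_case left at its default, the deny pattern `.*\.thread$` also rejects `us-west-2.Thread`, so only `us-west-2.Reply` is kept.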
The JSONSchema for this configuration is inlined below.
{
"title": "DynamoDBConfig",
"description": "Any source that is a primary producer of Dataset metadata should inherit this class",
"type": "object",
"properties": {
"aws_access_key_id": {
"title": "Aws Access Key Id",
"description": "AWS access key ID. Can be auto-detected, see [the AWS boto3 docs](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html) for details.",
"type": "string"
},
"aws_secret_access_key": {
"title": "Aws Secret Access Key",
"description": "AWS secret access key. Can be auto-detected, see [the AWS boto3 docs](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html) for details.",
"type": "string"
},
"aws_session_token": {
"title": "Aws Session Token",
"description": "AWS session token. Can be auto-detected, see [the AWS boto3 docs](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html) for details.",
"type": "string"
},
"aws_role": {
"title": "Aws Role",
"description": "AWS roles to assume. If using the string format, the role ARN can be specified directly. If using the object format, the role can be specified in the RoleArn field and additional available arguments are documented at https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sts.html?highlight=assume_role#STS.Client.assume_role",
"anyOf": [
{
"type": "string"
},
{
"type": "array",
"items": {
"anyOf": [
{
"type": "string"
},
{
"$ref": "#/definitions/AwsAssumeRoleConfig"
}
]
}
}
]
},
"aws_profile": {
"title": "Aws Profile",
"description": "Named AWS profile to use. Only used if access key / secret are unset. If not set the default will be used",
"type": "string"
},
"aws_region": {
"title": "Aws Region",
"description": "AWS region code.",
"type": "string"
},
"aws_endpoint_url": {
"title": "Aws Endpoint Url",
"description": "The AWS service endpoint. This is normally [constructed automatically](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html), but can be overridden here.",
"type": "string"
},
"aws_proxy": {
"title": "Aws Proxy",
"description": "A set of proxy configs to use with AWS. See the [botocore.config](https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html) docs for details.",
"type": "object",
"additionalProperties": {
"type": "string"
}
},
"read_timeout": {
"title": "Read Timeout",
"description": "The timeout for reading from the connection (in seconds).",
"default": 60,
"type": "number"
},
"aws_advanced_config": {
"title": "Aws Advanced Config",
"description": "Advanced AWS configuration options. These are passed directly to [botocore.config.Config](https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html).",
"type": "object"
},
"env": {
"title": "Env",
"description": "The environment that all assets produced by this connector belong to",
"default": "PROD",
"type": "string"
},
"database_pattern": {
"title": "Database Pattern",
"description": "regex patterns for databases to filter in ingestion.",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"table_pattern": {
"title": "Table Pattern",
"description": "Regex patterns for tables to filter in ingestion. The table name format is 'region.table'",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"classification": {
"title": "Classification",
"description": "For details, refer [Classification](../../../../metadata-ingestion/docs/dev_guides/classification.md).",
"default": {
"enabled": false,
"sample_size": 100,
"max_workers": 4,
"table_pattern": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"column_pattern": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"info_type_to_term": {},
"classifiers": [
{
"type": "datahub",
"config": null
}
]
},
"allOf": [
{
"$ref": "#/definitions/ClassificationConfig"
}
]
},
"stateful_ingestion": {
"$ref": "#/definitions/StatefulStaleMetadataRemovalConfig"
},
"platform_instance": {
"title": "Platform Instance",
"description": "The instance of the platform that all assets produced by this recipe belong to",
"type": "string"
},
"domain": {
"title": "Domain",
"description": "regex patterns for tables to filter to assign domain_key. ",
"default": {},
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/AllowDenyPattern"
}
},
"include_table_item": {
"title": "Include Table Item",
"description": "[Advanced] The primary keys of items of a table in dynamodb format the user would like to include in schema. Refer \"Advanced Configurations\" section for more details",
"type": "object",
"additionalProperties": {
"type": "array",
"items": {
"type": "object"
}
}
}
},
"additionalProperties": false,
"definitions": {
"AwsAssumeRoleConfig": {
"title": "AwsAssumeRoleConfig",
"type": "object",
"properties": {
"RoleArn": {
"title": "Rolearn",
"description": "ARN of the role to assume.",
"type": "string"
},
"ExternalId": {
"title": "Externalid",
"description": "External ID to use when assuming the role.",
"type": "string"
}
},
"required": [
"RoleArn"
]
},
"AllowDenyPattern": {
"title": "AllowDenyPattern",
"description": "A class to store allow deny regexes",
"type": "object",
"properties": {
"allow": {
"title": "Allow",
"description": "List of regex patterns to include in ingestion",
"default": [
".*"
],
"type": "array",
"items": {
"type": "string"
}
},
"deny": {
"title": "Deny",
"description": "List of regex patterns to exclude from ingestion.",
"default": [],
"type": "array",
"items": {
"type": "string"
}
},
"ignoreCase": {
"title": "Ignorecase",
"description": "Whether to ignore case sensitivity during pattern matching.",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
},
"DynamicTypedClassifierConfig": {
"title": "DynamicTypedClassifierConfig",
"type": "object",
"properties": {
"type": {
"title": "Type",
"description": "The type of the classifier to use. For DataHub, use `datahub`",
"type": "string"
},
"config": {
"title": "Config",
"description": "The configuration required for initializing the classifier. If not specified, uses defaults for classifer type."
}
},
"required": [
"type"
],
"additionalProperties": false
},
"ClassificationConfig": {
"title": "ClassificationConfig",
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "Whether classification should be used to auto-detect glossary terms",
"default": false,
"type": "boolean"
},
"sample_size": {
"title": "Sample Size",
"description": "Number of sample values used for classification.",
"default": 100,
"type": "integer"
},
"max_workers": {
"title": "Max Workers",
"description": "Number of worker threads to use for classification. Set to 1 to disable.",
"default": 4,
"type": "integer"
},
"table_pattern": {
"title": "Table Pattern",
"description": "Regex patterns to filter tables for classification. This is used in combination with other patterns in parent config. Specify regex to match the entire table name in `database.schema.table` format. e.g. to match all tables starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"column_pattern": {
"title": "Column Pattern",
"description": "Regex patterns to filter columns for classification. This is used in combination with other patterns in parent config. Specify regex to match the column name in `database.schema.table.column` format.",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"info_type_to_term": {
"title": "Info Type To Term",
"description": "Optional mapping to provide glossary term identifier for info type",
"default": {},
"type": "object",
"additionalProperties": {
"type": "string"
}
},
"classifiers": {
"title": "Classifiers",
"description": "Classifiers to use to auto-detect glossary terms. If more than one classifier, infotype predictions from the classifier defined later in sequence take precedance.",
"default": [
{
"type": "datahub",
"config": null
}
],
"type": "array",
"items": {
"$ref": "#/definitions/DynamicTypedClassifierConfig"
}
}
},
"additionalProperties": false
},
"DynamicTypedStateProviderConfig": {
"title": "DynamicTypedStateProviderConfig",
"type": "object",
"properties": {
"type": {
"title": "Type",
"description": "The type of the state provider to use. For DataHub use `datahub`",
"type": "string"
},
"config": {
"title": "Config",
"description": "The configuration required for initializing the state provider. Default: The datahub_api config if set at pipeline level. Otherwise, the default DatahubClientConfig. See the defaults (https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/graph/client.py#L19).",
"default": {},
"type": "object"
}
},
"required": [
"type"
],
"additionalProperties": false
},
"StatefulStaleMetadataRemovalConfig": {
"title": "StatefulStaleMetadataRemovalConfig",
"description": "Base specialized config for Stateful Ingestion with stale metadata removal capability.",
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "Whether or not to enable stateful ingest. Default: True if a pipeline_name is set and either a datahub-rest sink or `datahub_api` is specified, otherwise False",
"default": false,
"type": "boolean"
},
"remove_stale_metadata": {
"title": "Remove Stale Metadata",
"description": "Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
}
}
}
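The schema above sets additionalProperties to false at the top level and lets aws_role be either a string or an array of strings and objects with a required RoleArn. A lightweight pre-flight check based only on those two facts might look like the sketch below. The function `check_config` is hypothetical and is no substitute for the validation DataHub performs itself.

```python
# Top-level property names copied from the JSON schema above.
KNOWN_FIELDS = {
    "aws_access_key_id", "aws_secret_access_key", "aws_session_token",
    "aws_role", "aws_profile", "aws_region", "aws_endpoint_url", "aws_proxy",
    "read_timeout", "aws_advanced_config", "env", "database_pattern",
    "table_pattern", "classification", "stateful_ingestion",
    "platform_instance", "domain", "include_table_item",
}


def check_config(config):
    """Return a list of human-readable problems; an empty list means none found."""
    problems = []
    unknown = sorted(set(config) - KNOWN_FIELDS)
    if unknown:
        problems.append("unknown fields: " + ", ".join(unknown))
    role = config.get("aws_role")
    if role is not None and not isinstance(role, str):
        if not isinstance(role, list):
            problems.append("aws_role must be a string or an array")
        else:
            for index, entry in enumerate(role):
                # Array entries are role ARN strings or objects with RoleArn.
                valid = isinstance(entry, str) or (
                    isinstance(entry, dict) and "RoleArn" in entry
                )
                if not valid:
                    problems.append("aws_role[%d] needs a RoleArn" % index)
    return problems


good = {"aws_region": "us-west-2", "aws_role": [{"RoleArn": "arn:aws:iam::123456789012:role/example"}]}
bad = {"region": "us-west-2", "aws_role": [{"ExternalId": "abc"}]}
print(check_config(good))
print(check_config(bad))
```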
Advanced Configurations
Using include_table_item config
If some items carry the table's most representative fields, you can use the include_table_item option to provide a list of their primary keys in DynamoDB format. These items are included in addition to the first 100 items in the table when it is scanned.
Take the AWS DynamoDB Developer Guide's example tables and data as an example: if an account has a table Reply in the us-west-2 region with composite primary key Id and ReplyDateTime, you can use include_table_item to include 2 items as follows:
Example:
# The table name should be in the format of region.table_name
# The primary keys should be in the DynamoDB format
include_table_item:
  us-west-2.Reply:
    [
      {
        "ReplyDateTime": { "S": "2015-09-22T19:58:22.947Z" },
        "Id": { "S": "Amazon DynamoDB#DynamoDB Thread 1" },
      },
      {
        "ReplyDateTime": { "S": "2015-10-05T19:58:22.947Z" },
        "Id": { "S": "Amazon DynamoDB#DynamoDB Thread 2" },
      },
    ]
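If the primary keys are available as plain values, they can be converted to DynamoDB format before being written into the recipe. The sketch below handles only string (`S`) and integer (`N`) key attributes and enforces the limit of 100 keys per `region.table` entry mentioned in the starter recipe. Both helper names are hypothetical.

```python
def to_key_attribute(value):
    """Wrap a plain key value in DynamoDB attribute-value format."""
    if isinstance(value, bool):
        raise TypeError("booleans cannot be used as key attributes")
    if isinstance(value, str):
        return {"S": value}
    if isinstance(value, int):
        # DynamoDB transmits numbers as strings.
        return {"N": str(value)}
    raise TypeError("unsupported key type: " + type(value).__name__)


def build_include_table_item(region, table, keys, limit=100):
    """Return an include_table_item mapping for one region.table entry."""
    if len(keys) > limit:
        raise ValueError("at most %d primary keys per table" % limit)
    converted = [
        {name: to_key_attribute(value) for name, value in key.items()}
        for key in keys
    ]
    return {region + "." + table: converted}


result = build_include_table_item(
    "us-west-2",
    "Reply",
    [{"ReplyDateTime": "2015-09-22T19:58:22.947Z", "Id": "Amazon DynamoDB#DynamoDB Thread 1"}],
)
print(result)
```

The resulting dictionary has the same shape as the YAML example above and can be placed under include_table_item in a recipe.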
Code Coordinates
- Class Name: datahub.ingestion.source.dynamodb.dynamodb.DynamoDBSource
- Browse on GitHub
Questions
If you've got any questions on configuring ingestion for DynamoDB, feel free to ping us on our Slack.