dbt (dagster-dbt)

Dagster orchestrates dbt alongside other technologies, so you can combine dbt with Spark, Python, etc. in a single workflow. Dagster’s software-defined asset abstractions make it simple to define data assets that depend on specific dbt models, or to define the computation required to compute the sources that your dbt models depend on.

Related guides: Visualize and orchestrate assets in dbt Core and dbt Cloud.

dbt Core

Here, we provide interfaces to manage dbt projects invoked by the local dbt command line interface (dbt CLI).

Assets (dbt Core)

dagster_dbt.load_assets_from_dbt_project(project_dir, profiles_dir=None, target_dir=None, select=None, exclude=None, key_prefix=None, source_key_prefix=None, runtime_metadata_fn=None, io_manager_key=None, node_info_to_asset_key=<function _get_node_asset_key>, use_build_command=False, partitions_def=None, partition_key_to_vars_fn=None, node_info_to_group_fn=<function _get_node_group_name>, node_info_to_freshness_policy_fn=<function _get_node_freshness_policy>, node_info_to_definition_metadata_fn=<function _get_node_metadata>, display_raw_sql=None, dbt_resource_key='dbt')[source]

Loads a set of dbt models from a dbt project into Dagster assets.

Creates one Dagster asset for each dbt model. All assets will be re-materialized using a single dbt run or dbt build command.

Parameters:
  • project_dir (Optional[str]) – The directory containing the dbt project to load.

  • profiles_dir (Optional[str]) – The profiles directory to use for loading the DBT project. Defaults to a directory called “config” inside the project_dir.

  • target_dir (Optional[str]) – The target directory where dbt will place compiled artifacts. Defaults to “target” underneath the project_dir.

  • select (Optional[str]) – A dbt selection string for the models in a project that you want to include. Defaults to “*”.

  • exclude (Optional[str]) – A dbt selection string for the models in a project that you want to exclude. Defaults to “”.

  • key_prefix (Optional[Union[str, List[str]]]) – A prefix to apply to all models in the dbt project. Does not apply to sources.

  • dbt_resource_key (Optional[str]) – The resource key that the dbt resource will be specified at. Defaults to “dbt”.

  • source_key_prefix (Optional[Union[str, List[str]]]) – A prefix to apply to all sources in the dbt project. Does not apply to models.

  • runtime_metadata_fn – (Optional[Callable[[SolidExecutionContext, Mapping[str, Any]], Mapping[str, Any]]]): A function that will be run after any of the assets are materialized and returns metadata entries for the asset, to be displayed in the asset catalog for that run.

  • io_manager_key (Optional[str]) – The IO manager key that will be set on each of the returned assets. When other ops are downstream of the loaded assets, the IOManager specified here determines how the inputs to those ops are loaded. Defaults to “io_manager”.

  • node_info_to_asset_key – (Mapping[str, Any] -> AssetKey): A function that takes a dictionary of dbt metadata and returns the AssetKey that you want to represent a given model or source. By default: dbt model -> AssetKey([model_name]) and dbt source -> AssetKey([source_name, table_name])

  • use_build_command (bool) – Flag indicating if you want to use dbt build as the core computation for this asset, rather than dbt run.

  • partitions_def (Optional[PartitionsDefinition]) – Defines the set of partition keys that compose the dbt assets.

  • partition_key_to_vars_fn (Optional[str -> Dict[str, Any]]) – A function to translate a given partition key (e.g. ‘2022-01-01’) to a dictionary of vars to be passed into the dbt invocation (e.g. {“run_date”: “2022-01-01”})

  • node_info_to_group_fn (Dict[str, Any] -> Optional[str]) – A function that takes a dictionary of dbt node info and returns the group that this node should be assigned to.

  • node_info_to_freshness_policy_fn (Dict[str, Any] -> Optional[FreshnessPolicy]) – A function that takes a dictionary of dbt node info and optionally returns a FreshnessPolicy that should be applied to this node. By default, freshness policies will be created from config applied to dbt models, i.e.: dagster_freshness_policy={“maximum_lag_minutes”: 60, “cron_schedule”: “0 9 * * *”} will result in that model being assigned FreshnessPolicy(maximum_lag_minutes=60, cron_schedule=”0 9 * * *”)

  • node_info_to_definition_metadata_fn (Dict[str, Any] -> Optional[Dict[str, MetadataUserInput]]) – A function that takes a dictionary of dbt node info and optionally returns a dictionary of metadata to be attached to the corresponding definition. This is added to the default metadata assigned to the node, which consists of the node’s schema (if present).

  • display_raw_sql (Optional[bool]) – [Experimental] A flag to indicate if the raw sql associated with each model should be included in the asset description. For large projects, setting this flag to False is advised to reduce the size of the resulting snapshot.

dagster_dbt.load_assets_from_dbt_manifest(manifest_json, select=None, exclude=None, key_prefix=None, source_key_prefix=None, runtime_metadata_fn=None, io_manager_key=None, selected_unique_ids=None, node_info_to_asset_key=<function _get_node_asset_key>, use_build_command=False, partitions_def=None, partition_key_to_vars_fn=None, node_info_to_group_fn=<function _get_node_group_name>, node_info_to_freshness_policy_fn=<function _get_node_freshness_policy>, node_info_to_definition_metadata_fn=<function _get_node_metadata>, display_raw_sql=None, dbt_resource_key='dbt')[source]

Loads a set of dbt models, described in a manifest.json, into Dagster assets.

Creates one Dagster asset for each dbt model. All assets will be re-materialized using a single dbt run command.

Parameters:
  • manifest_json (Optional[Mapping[str, Any]]) – The contents of a DBT manifest.json, which contains a set of models to load into assets.

  • select (Optional[str]) – A dbt selection string for the models in a project that you want to include. Defaults to “*”.

  • exclude (Optional[str]) – A dbt selection string for the models in a project that you want to exclude. Defaults to “”.

  • key_prefix (Optional[Union[str, List[str]]]) – A prefix to apply to all models in the dbt project. Does not apply to sources.

  • source_key_prefix (Optional[Union[str, List[str]]]) – A prefix to apply to all sources in the dbt project. Does not apply to models.

  • dbt_resource_key (Optional[str]) – The resource key that the dbt resource will be specified at. Defaults to “dbt”.

  • runtime_metadata_fn – (Optional[Callable[[SolidExecutionContext, Mapping[str, Any]], Mapping[str, Any]]]): A function that will be run after any of the assets are materialized and returns metadata entries for the asset, to be displayed in the asset catalog for that run.

  • io_manager_key (Optional[str]) – The IO manager key that will be set on each of the returned assets. When other ops are downstream of the loaded assets, the IOManager specified here determines how the inputs to those ops are loaded. Defaults to “io_manager”.

  • selected_unique_ids (Optional[Set[str]]) – The set of dbt unique_ids that you want to load as assets.

  • node_info_to_asset_key – (Mapping[str, Any] -> AssetKey): A function that takes a dictionary of dbt node info and returns the AssetKey that you want to represent that node. By default, the asset key will simply be the name of the dbt model.

  • use_build_command (bool) – Flag indicating if you want to use dbt build as the core computation for this asset, rather than dbt run.

  • partitions_def (Optional[PartitionsDefinition]) – Defines the set of partition keys that compose the dbt assets.

  • partition_key_to_vars_fn (Optional[str -> Dict[str, Any]]) – A function to translate a given partition key (e.g. ‘2022-01-01’) to a dictionary of vars to be passed into the dbt invocation (e.g. {“run_date”: “2022-01-01”})

  • node_info_to_group_fn (Dict[str, Any] -> Optional[str]) – A function that takes a dictionary of dbt node info and returns the group that this node should be assigned to.

  • node_info_to_freshness_policy_fn (Dict[str, Any] -> Optional[FreshnessPolicy]) – A function that takes a dictionary of dbt node info and optionally returns a FreshnessPolicy that should be applied to this node. By default, freshness policies will be created from config applied to dbt models, i.e.: dagster_freshness_policy={“maximum_lag_minutes”: 60, “cron_schedule”: “0 9 * * *”} will result in that model being assigned FreshnessPolicy(maximum_lag_minutes=60, cron_schedule=”0 9 * * *”)

  • node_info_to_definition_metadata_fn (Dict[str, Any] -> Optional[Dict[str, MetadataUserInput]]) – A function that takes a dictionary of dbt node info and optionally returns a dictionary of metadata to be attached to the corresponding definition. This is added to the default metadata assigned to the node, which consists of the node’s schema (if present).

  • display_raw_sql (Optional[bool]) – [Experimental] A flag to indicate if the raw sql associated with each model should be included in the asset description. For large projects, setting this flag to False is advised to reduce the size of the resulting snapshot.

class dagster_dbt.DbtManifestAssetSelection(*args, **kwargs)[source]

Defines a selection of assets from a parsed dbt manifest.json file and a dbt-syntax selection string.

Parameters:
  • manifest_json (Mapping[str, Any]) – The parsed manifest.json file from your dbt project. Must provide either this argument or manifest_json_path.

  • manifest_json_path – (Optional[str]): The path to a manifest.json file representing the current state of your dbt project. Must provide either this argument or manifest_json.

  • select (str) – A dbt-syntax selection string, e.g. tag:foo or config.materialized:table.

  • exclude (str) – A dbt-syntax exclude string. Defaults to “”.

  • resource_types (Sequence[str]) – The resource types to select. Defaults to [“model”].

  • node_info_to_asset_key (Callable[[Mapping[str, Any]], AssetKey]) – A function that takes a dictionary of dbt metadata and returns the AssetKey that you want to represent a given model or source. If you pass in a custom function to load_assets_from_dbt_manifest, you must also pass in the same function here.

  • state_path – (Optional[str]): The path to a folder containing the manifest.json file representing the previous state of your dbt project. Providing this path will allow you to select dbt assets using the state: selector. To learn more, see the [dbt docs](https://docs.getdbt.com/reference/node-selection/methods#the-state-method).

Example

my_dbt_assets = load_assets_from_dbt_manifest(
    manifest_json,
    node_info_to_asset_key=my_node_info_to_asset_key_fn,
)

# This will select all assets that have the tag "foo" and are in the path "marts/finance"
my_selection = DbtManifestAssetSelection(
    manifest_json,
    select="tag:foo,path:marts/finance",
    node_info_to_asset_key=my_node_info_to_asset_key_fn,
)

# This will retrieve the asset keys according to the selection
selected_asset_keys = my_selection.resolve(my_dbt_assets)

Ops (dbt Core)

If you’re using asset-based dbt APIs like load_assets_from_dbt_project, you usually will not also use the below op-based APIs.

dagster_dbt provides a set of pre-built ops that work with either the CLI or RPC interfaces. For more advanced use cases, we suggest building your own ops which directly interact with these resources.

dagster_dbt.dbt_run_op = <dagster._core.definitions.op_definition.OpDefinition object>[source]

Config Schema:
yield_materializations (Bool, optional):

If True, materializations corresponding to the results of the dbt operation will be yielded when the op executes. Default: True

Default Value: True

asset_key_prefix (List[String], optional):

If provided and yield_materializations is True, these components will be used to prefix the generated asset keys.

Default Value: [‘dbt’]

This op executes a dbt run command. It requires the use of a dbt resource, which can be set to execute this command through the CLI (using the dbt_cli_resource) or over RPC (using the dbt_rpc_sync_resource).

Examples:

from dagster import job
from dagster_dbt import dbt_run_op, dbt_cli_resource, dbt_rpc_sync_resource

@job(resource_defs={"dbt":dbt_cli_resource})
def my_dbt_cli_job():
    dbt_run_op()

@job(resource_defs={"dbt":dbt_rpc_sync_resource})
def my_dbt_rpc_job():
    dbt_run_op()
dagster_dbt.dbt_compile_op(context)[source]

This op executes a dbt compile command. It requires the use of a dbt resource, which can be set to execute this command through the CLI (using the dbt_cli_resource) or over RPC (using the dbt_rpc_sync_resource).

Examples:

from dagster import job
from dagster_dbt import dbt_compile_op, dbt_cli_resource, dbt_rpc_sync_resource

@job(resource_defs={"dbt":dbt_cli_resource})
def my_dbt_cli_job():
    dbt_compile_op()

@job(resource_defs={"dbt":dbt_rpc_sync_resource})
def my_dbt_rpc_job():
    dbt_compile_op()
dagster_dbt.dbt_ls_op(context)[source]

This op executes a dbt ls command. It requires the use of a dbt resource, which can be set to execute this command through the CLI (using the dbt_cli_resource) or over RPC (using the dbt_rpc_sync_resource).

Examples:

from dagster import job
from dagster_dbt import dbt_ls_op, dbt_cli_resource, dbt_rpc_sync_resource

@job(resource_defs={"dbt":dbt_cli_resource})
def my_dbt_cli_job():
    dbt_ls_op()

@job(resource_defs={"dbt":dbt_rpc_sync_resource})
def my_dbt_rpc_job():
    dbt_ls_op()
dagster_dbt.dbt_test_op(context)[source]

This op executes a dbt test command. It requires the use of a dbt resource, which can be set to execute this command through the CLI (using the dbt_cli_resource) or over RPC (using the dbt_rpc_sync_resource).

Examples:

from dagster import job
from dagster_dbt import dbt_test_op, dbt_cli_resource, dbt_rpc_sync_resource

@job(resource_defs={"dbt":dbt_cli_resource})
def my_dbt_cli_job():
    dbt_test_op()

@job(resource_defs={"dbt":dbt_rpc_sync_resource})
def my_dbt_rpc_job():
    dbt_test_op()
dagster_dbt.dbt_snapshot_op(context)[source]

This op executes a dbt snapshot command. It requires the use of a dbt resource, which can be set to execute this command through the CLI (using the dbt_cli_resource) or over RPC (using the dbt_rpc_sync_resource).

Examples:

from dagster import job
from dagster_dbt import dbt_snapshot_op, dbt_cli_resource, dbt_rpc_sync_resource

@job(resource_defs={"dbt":dbt_cli_resource})
def my_dbt_cli_job():
    dbt_snapshot_op()

@job(resource_defs={"dbt":dbt_rpc_sync_resource})
def my_dbt_rpc_job():
    dbt_snapshot_op()
dagster_dbt.dbt_seed_op(context)[source]

This op executes a dbt seed command. It requires the use of a dbt resource, which can be set to execute this command through the CLI (using the dbt_cli_resource) or over RPC (using the dbt_rpc_sync_resource).

Examples:

from dagster import job
from dagster_dbt import dbt_seed_op, dbt_cli_resource, dbt_rpc_sync_resource

@job(resource_defs={"dbt":dbt_cli_resource})
def my_dbt_cli_job():
    dbt_seed_op()

@job(resource_defs={"dbt":dbt_rpc_sync_resource})
def my_dbt_rpc_job():
    dbt_seed_op()
dagster_dbt.dbt_docs_generate_op(context)[source]

This op executes a dbt docs generate command. It requires the use of a dbt resource, which can be set to execute this command through the CLI (using the dbt_cli_resource) or over RPC (using the dbt_rpc_sync_resource).

Examples:

from dagster import job
from dagster_dbt import dbt_docs_generate_op, dbt_cli_resource, dbt_rpc_sync_resource

@job(resource_defs={"dbt":dbt_cli_resource})
def my_dbt_cli_job():
    dbt_docs_generate_op()

@job(resource_defs={"dbt":dbt_rpc_sync_resource})
def my_dbt_rpc_job():
    dbt_docs_generate_op()

Resources (dbt Core)

CLI Resources

class dagster_dbt.DbtCliResource(executable, default_flags, warn_error, ignore_handled_error, target_path, logger=None, docs_url=None, json_log_format=True, capture_logs=True, debug=False)[source]

A resource that allows you to execute dbt cli commands.

For the most up-to-date documentation on the specific parameters available to you for each command, check out the dbt docs:

https://docs.getdbt.com/reference/commands/run

To use this as a dagster resource, we recommend using dbt_cli_resource.

build(select=None, **kwargs)[source]

Run the build command on a dbt project. kwargs are passed in as additional parameters.

Parameters:

select (List[str], optional) – the models/resources to include in the run.

Returns:

An instance of DbtCliOutput containing

parsed log output as well as the contents of run_results.json (if applicable).

Return type:

DbtCliOutput

cli(command, **kwargs)[source]
Executes a dbt CLI command. Params passed in as keyword arguments will be merged with the

default flags that were configured on resource initialization (if any) overriding the default values if necessary.

Parameters:

command (str) – The command you wish to run (e.g. ‘run’, ‘test’, ‘docs generate’, etc.)

Returns:

An instance of DbtCliOutput containing

parsed log output as well as the contents of run_results.json (if applicable).

Return type:

DbtCliOutput

compile(models=None, exclude=None, select=None, **kwargs)[source]

Run the compile command on a dbt project. kwargs are passed in as additional parameters.

Parameters:
  • models (List[str], optional) – the models to include in compilation.

  • exclude (List[str]), optional) – the models to exclude from compilation.

  • select (List[str], optional) – the models to include in compilation.

Returns:

An instance of DbtCliOutput containing

parsed log output as well as the contents of run_results.json (if applicable).

Return type:

DbtCliOutput

freshness(select=None, **kwargs)[source]

Run the source snapshot-freshness command on a dbt project. kwargs are passed in as additional parameters.

Parameters:

select (List[str], optional) – the sources to include in the run.

Returns:

An instance of DbtCliOutput containing

parsed log output as well as the contents of run_results.json (if applicable).

Return type:

DbtCliOutput

generate_docs(compile_project=False, **kwargs)[source]

Run the docs generate command on a dbt project. kwargs are passed in as additional parameters.

Parameters:

compile_project (bool, optional) – If true, compile the project before generating a catalog.

Returns:

An instance of DbtCliOutput containing

parsed log output as well as the contents of run_results.json (if applicable).

Return type:

DbtCliOutput

get_manifest_json(**kwargs)[source]

Get a parsed version of the manifest.json file for the relevant dbt project.

Returns:

dictionary containing the parsed contents of the manifest json file

for this dbt project.

Return type:

Dict[str, Any]

get_run_results_json(**kwargs)[source]

Get a parsed version of the run_results.json file for the relevant dbt project.

Returns:

dictionary containing the parsed contents of the manifest json file

for this dbt project.

Return type:

Dict[str, Any]

ls(select=None, models=None, exclude=None, **kwargs)[source]

Run the ls command on a dbt project. kwargs are passed in as additional parameters.

Parameters:
  • select (List[str], optional) – the resources to include in the output.

  • models (List[str], optional) – the models to include in the output.

  • exclude (List[str], optional) – the resources to exclude from the output.

Returns:

An instance of DbtCliOutput containing

parsed log output as well as the contents of run_results.json (if applicable).

Return type:

DbtCliOutput

remove_run_results_json(**kwargs)[source]

Remove the run_results.json file from previous runs (if it exists).

run(models=None, exclude=None, select=None, **kwargs)[source]

Run the run command on a dbt project. kwargs are passed in as additional parameters.

Parameters:
  • models (List[str], optional) – the models to include in the run.

  • exclude (List[str]), optional) – the models to exclude from the run.

  • select (List[str], optional) – the models to include in the run.

Returns:

An instance of DbtCliOutput containing

parsed log output as well as the contents of run_results.json (if applicable).

Return type:

DbtCliOutput

run_operation(macro, args=None, **kwargs)[source]

Run the run-operation command on a dbt project. kwargs are passed in as additional parameters.

Parameters:
  • macro (str) – the dbt macro to invoke.

  • args (Dict[str, Any], optional) – the keyword arguments to be supplied to the macro.

Returns:

An instance of DbtCliOutput containing

parsed log output as well as the contents of run_results.json (if applicable).

Return type:

DbtCliOutput

seed(show=False, select=None, exclude=None, **kwargs)[source]

Run the seed command on a dbt project. kwargs are passed in as additional parameters.

Parameters:
  • show (bool, optional) – If True, then show a sample of the seeded data in the response. Defaults to False.

  • select (List[str], optional) – the snapshots to include in the run.

  • exclude (List[str], optional) – the snapshots to exclude from the run.

Returns:

An instance of DbtCliOutput containing

parsed log output as well as the contents of run_results.json (if applicable).

Return type:

DbtCliOutput

snapshot(select=None, exclude=None, **kwargs)[source]

Run the snapshot command on a dbt project. kwargs are passed in as additional parameters.

Parameters:
  • select (List[str], optional) – the snapshots to include in the run.

  • exclude (List[str], optional) – the snapshots to exclude from the run.

Returns:

An instance of DbtCliOutput containing

parsed log output as well as the contents of run_results.json (if applicable).

Return type:

DbtCliOutput

test(models=None, exclude=None, data=True, schema=True, select=None, **kwargs)[source]

Run the test command on a dbt project. kwargs are passed in as additional parameters.

Parameters:
  • models (List[str], optional) – the models to include in testing.

  • exclude (List[str], optional) – the models to exclude from testing.

  • data (bool, optional) – If True (default), then run data tests.

  • schema (bool, optional) – If True (default), then run schema tests.

  • select (List[str], optional) – the models to include in testing.

Returns:

An instance of DbtCliOutput containing

parsed log output as well as the contents of run_results.json (if applicable).

Return type:

DbtCliOutput

class dagster_dbt.DbtCliOutput(command, return_code, raw_output, logs, result, docs_url=None)[source]

The results of executing a dbt command, along with additional metadata about the dbt CLI process that was run.

Note that users should not construct instances of this class directly. This class is intended to be constructed from the JSON output of dbt commands.

command

The full shell command that was executed.

Type:

str

return_code

The return code of the dbt CLI process.

Type:

int

raw_output

The raw output (stdout) of the dbt CLI process.

Type:

str

logs

List of parsed JSON logs produced by the dbt command.

Type:

List[Dict[str, Any]]

result

Dictionary containing dbt-reported result information contained in run_results.json. Some dbt commands do not produce results, and will therefore have result = None.

Type:

Optional[Dict[str, Any]]

docs_url

Hostname where dbt docs are being served for this project.

Type:

Optional[str]

dagster_dbt.dbt_cli_resource ResourceDefinition[source]

Config Schema:
project_dir (dagster.StringSource, optional):

Which directory to look in for the dbt_project.yml file. Default is the current working directory and its parents.

Default Value: ‘.’

profiles_dir (dagster.StringSource, optional):

Which directory to look in for the profiles.yml file. Default = $DBT_PROFILES_DIR or $HOME/.dbt

profile (dagster.StringSource, optional):

Which profile to load. Overrides setting in dbt_project.yml.

target (dagster.StringSource, optional):

Which target to load for the given profile.

vars (permissive dict, optional):

Supply variables to the project. This argument overrides variables defined in your dbt_project.yml file. This argument should be a dictionary, eg. {‘my_variable’: ‘my_value’}

bypass_cache (Bool, optional):

If set, bypass the adapter-level cache of database state

Default Value: False

warn_error (Bool, optional):

If dbt would normally warn, instead raise an exception. Examples include –models that selects nothing, deprecations, configurations with no associated models, invalid test configurations, and missing sources/refs in tests.

Default Value: False

dbt_executable (dagster.StringSource, optional):

Path to the dbt executable. Default is dbt

Default Value: ‘dbt’

ignore_handled_error (Bool, optional):

When True, will not raise an exception when the dbt CLI returns error code 1. Default is False.

Default Value: False

target_path (dagster.StringSource, optional):

The directory path for target if different from the default target-path in your dbt project configuration file.

Default Value: ‘target’

docs_url (dagster.StringSource, optional):

The url for where dbt docs are being served for this project.

json_log_format (Bool, optional):

When True, dbt will invoked with the –log-format json flag, allowing Dagster to parse the log messages and emit simpler log messages to the event log.

Default Value: True

capture_logs (Bool, optional):

When True, logs emitted from dbt will be logged to the Dagster event log.

Default Value: True

debug (Bool, optional):

When True, dbt will be invoked with the –debug flag.

Default Value: False

This resource issues dbt CLI commands against a configured dbt project.

RPC Resources

class dagster_dbt.DbtRpcResource(host='0.0.0.0', port=8580, jsonrpc_version='2.0', logger=None, **_)[source]

A client for a dbt RPC server.

To use this as a dagster resource, we recommend using dbt_rpc_resource.

class dagster_dbt.DbtRpcSyncResource(host='0.0.0.0', port=8580, jsonrpc_version='2.0', logger=None, poll_interval=1, **_)[source]
class dagster_dbt.DbtRpcOutput(response)[source]

The output from executing a dbt command via the dbt RPC server.

result

The parsed contents of the “result” field of the JSON response from the rpc server (if any).

Type:

Dict[str, Any]

response_dict

The entire contents of the JSON response from the rpc server.

Type:

Dict[str, Any]

response

The original Response from which this output was generated.

Type:

requests.Response

dagster_dbt.local_dbt_rpc_resource ResourceDefinition

This resource defines a dbt RPC client for an RPC server running on 0.0.0.0:8580.

dagster_dbt.dbt_rpc_resource ResourceDefinition[source]

Config Schema:
host (dagster.StringSource):

port (dagster.IntSource, optional):

Default Value: 8580

This resource defines a dbt RPC client.

To configure this resource, we recommend using the configured method.

Examples

from dagster_dbt import dbt_rpc_resource

custom_dbt_rpc_resource = dbt_rpc_resource.configured({"host": "80.80.80.80","port": 8080,})

@job(resource_defs={"dbt_rpc": custom_dbt_rpc_sync_resource})
def dbt_rpc_job():
    # Run ops with `required_resource_keys={"dbt_rpc", ...}`.
dagster_dbt.dbt_rpc_sync_resource ResourceDefinition[source]

Config Schema:
host (dagster.StringSource):

port (dagster.IntSource, optional):

Default Value: 8580

poll_interval (dagster.IntSource, optional):

Default Value: 1

This resource defines a synchronous dbt RPC client, which sends requests to a dbt RPC server, and waits for the request to complete before returning.

To configure this resource, we recommend using the configured method.

Examples

from dagster_dbt import dbt_rpc_sync_resource

custom_sync_dbt_rpc_resource = dbt_rpc_sync_resource.configured({"host": "80.80.80.80","port": 8080,})

@job(resource_defs={"dbt_rpc": custom_dbt_rpc_sync_resource})
def dbt_rpc_sync_job():
    # Run ops with `required_resource_keys={"dbt_rpc", ...}`.

dbt Cloud

Here, we provide interfaces to manage dbt projects invoked by the hosted dbt Cloud service.

Assets (dbt Cloud)

dagster_dbt.load_assets_from_dbt_cloud_job(dbt_cloud, job_id, node_info_to_asset_key=<function _get_node_asset_key>, node_info_to_group_fn=<function _get_node_group_name>, node_info_to_freshness_policy_fn=<function _get_node_freshness_policy>, partitions_def=None, partition_key_to_vars_fn=None)[source]

Loads a set of dbt models, managed by a dbt Cloud job, into Dagster assets. In order to determine the set of dbt models, the project is compiled to generate the necessary artifacts that define the dbt models and their dependencies.

One Dagster asset is created for each dbt model.

Parameters:
  • dbt_cloud (ResourceDefinition) – The dbt Cloud resource to use to connect to the dbt Cloud API.

  • job_id (int) – The ID of the dbt Cloud job to load assets from.

  • node_info_to_asset_key – (Mapping[str, Any] -> AssetKey): A function that takes a dictionary of dbt metadata and returns the AssetKey that you want to represent a given model or source. By default: dbt model -> AssetKey([model_name]) and dbt source -> AssetKey([source_name, table_name])

  • node_info_to_group_fn (Dict[str, Any] -> Optional[str]) – A function that takes a dictionary of dbt node info and returns the group that this node should be assigned to.

  • node_info_to_freshness_policy_fn (Dict[str, Any] -> Optional[FreshnessPolicy]) – A function that takes a dictionary of dbt node info and optionally returns a FreshnessPolicy that should be applied to this node. By default, freshness policies will be created from config applied to dbt models, i.e.: dagster_freshness_policy={“maximum_lag_minutes”: 60, “cron_schedule”: “0 9 * * *”} will result in that model being assigned FreshnessPolicy(maximum_lag_minutes=60, cron_schedule=”0 9 * * *”)

  • node_info_to_definition_metadata_fn (Dict[str, Any] -> Optional[Dict[str, MetadataUserInput]]) – A function that takes a dictionary of dbt node info and optionally returns a dictionary of metadata to be attached to the corresponding definition. This is added to the default metadata assigned to the node, which consists of the node’s schema (if present).

  • partitions_def (Optional[PartitionsDefinition]) – Defines the set of partition keys that compose the dbt assets.

  • partition_key_to_vars_fn (Optional[str -> Dict[str, Any]]) – A function to translate a given partition key (e.g. ‘2022-01-01’) to a dictionary of vars to be passed into the dbt invocation (e.g. {“run_date”: “2022-01-01”})

Returns:

A definition for the loaded assets.

Return type:

CacheableAssetsDefinition

Examples

from dagster import repository
from dagster_dbt import dbt_cloud_resource, load_assets_from_dbt_cloud_job

DBT_CLOUD_JOB_ID = 1234

dbt_cloud = dbt_cloud_resource.configured(
    {
        "auth_token": {"env": "DBT_CLOUD_API_TOKEN"},
        "account_id": {"env": "DBT_CLOUD_ACCOUNT_ID"},
    }
)

dbt_cloud_assets = load_assets_from_dbt_cloud_job(
    dbt_cloud=dbt_cloud, job_id=DBT_CLOUD_JOB_ID
)


@repository
def dbt_cloud_sandbox():
    return [dbt_cloud_assets]

Ops (dbt Cloud)

dagster_dbt.dbt_cloud_run_op = <dagster._core.definitions.op_definition.OpDefinition object>[source]

Config Schema:
job_id (Int):

The integer ID of the relevant dbt Cloud job. You can find this value by going to the details page of your job in the dbt Cloud UI. It will be the final number in the url, e.g.: https://cloud.getdbt.com/#/accounts/{account_id}/projects/{project_id}/jobs/{job_id}/

poll_interval (Float, optional):

The time (in seconds) that will be waited between successive polls.

Default Value: 10

poll_timeout (Union[Float, None], optional):

The maximum time that will waited before this operation is timed out. By default, this will never time out.

Default Value: None

yield_materializations (Bool, optional):

If True, materializations corresponding to the results of the dbt operation will be yielded when the op executes.

Default Value: True

asset_key_prefix (List[String], optional):

If provided and yield_materializations is True, these components will be used to prefix the generated asset keys.

Default Value: [‘dbt’]

Initiates a run for a dbt Cloud job, then polls until the run completes. If the job fails or is otherwised stopped before succeeding, a dagster.Failure exception will be raised, and this op will fail.

It requires the use of a ‘dbt_cloud’ resource, which is used to connect to the dbt Cloud API.

Config Options:

job_id (int)

The integer ID of the relevant dbt Cloud job. You can find this value by going to the details page of your job in the dbt Cloud UI. It will be the final number in the url, e.g.: https://cloud.getdbt.com/#/accounts/{account_id}/projects/{project_id}/jobs/{job_id}/

poll_interval (float)

The time (in seconds) that will be waited between successive polls. Defaults to 10.

poll_timeout (float)

The maximum time (in seconds) that will waited before this operation is timed out. By default, this will never time out.

yield_materializations (bool)

If True, materializations corresponding to the results of the dbt operation will be yielded when the solid executes. Defaults to True.

rasset_key_prefix (float)

If provided and yield_materializations is True, these components will be used to ” prefix the generated asset keys. Defaults to [“dbt”].

Examples:

from dagster import job
from dagster_dbt import dbt_cloud_resource, dbt_cloud_run_op

my_dbt_cloud_resource = dbt_cloud_resource.configured(
    {"auth_token": {"env": "DBT_CLOUD_AUTH_TOKEN"}, "account_id": 77777}
)
run_dbt_nightly_sync = dbt_cloud_run_op.configured(
    {"job_id": 54321}, name="run_dbt_nightly_sync"
)

@job(resource_defs={"dbt_cloud": my_dbt_cloud_resource})
def dbt_cloud():
    run_dbt_nightly_sync()

Resources (dbt Cloud)

dagster_dbt.DbtCloudResourceV2

alias of DbtCloudResource

dagster_dbt.dbt_cloud_resource ResourceDefinition[source]

Config Schema:
auth_token (dagster.StringSource):

dbt Cloud API Token. User tokens can be found in the [dbt Cloud UI](https://cloud.getdbt.com/#/profile/api/), or see the [dbt Cloud Docs](https://docs.getdbt.com/docs/dbt-cloud/dbt-cloud-api/service-tokens) for instructions on creating a Service Account token.

account_id (dagster.IntSource):

dbt Cloud Account ID. This value can be found in the url of a variety of views in the dbt Cloud UI, e.g. https://cloud.getdbt.com/#/accounts/{account_id}/settings/.

disable_schedule_on_trigger (Bool, optional):

Specifies if you would like any job that is triggered using this resource to automatically disable its schedule.

Default Value: True

request_max_retries (Int, optional):

The maximum number of times requests to the dbt Cloud API should be retried before failing.

Default Value: 3

request_retry_delay (Float, optional):

Time (in seconds) to wait between each request retry.

Default Value: 0.25

dbt_cloud_host (dagster.StringSource, optional):

The hostname where dbt cloud is being hosted (e.g. https://my_org.cloud.getdbt.com/).

Default Value:https://cloud.getdbt.com/

This resource allows users to programatically interface with the dbt Cloud Administrative REST API (v2) to launch jobs and monitor their progress. This currently implements only a subset of the functionality exposed by the API.

For a complete set of documentation on the dbt Cloud Administrative REST API, including expected response JSON schemae, see the dbt Cloud API Docs.

To configure this resource, we recommend using the configured method.

Examples:

from dagster import job
from dagster_dbt import dbt_cloud_resource

my_dbt_cloud_resource = dbt_cloud_resource.configured(
    {
        "auth_token": {"env": "DBT_CLOUD_AUTH_TOKEN"},
        "account_id": {"env": "DBT_CLOUD_ACCOUNT_ID"},
    }
)

@job(resource_defs={"dbt_cloud": my_dbt_cloud_resource})
def my_dbt_cloud_job():
    ...

Types

class dagster_dbt.DbtOutput(result)[source]

Base class for both DbtCliOutput and DbtRPCOutput. Contains a single field, result, which represents the dbt-formatted result of the command that was run (if any).

Used internally, should not be instantiated directly by the user.

class dagster_dbt.DbtResource(logger=None)[source]

Base class for a resource allowing users to interface with dbt.

Errors

exception dagster_dbt.DagsterDbtError(description=None, metadata_entries=None, metadata=None, allow_retries=None)[source]

The base exception of the dagster-dbt library.

exception dagster_dbt.DagsterDbtCliRuntimeError(description, logs=None, raw_output=None, messages=None)[source]

Represents an error while executing a dbt CLI command.

exception dagster_dbt.DagsterDbtCliFatalRuntimeError(logs=None, raw_output=None, messages=None)[source]

Represents a fatal error in the dbt CLI (return code 2).

exception dagster_dbt.DagsterDbtCliHandledRuntimeError(logs=None, raw_output=None, messages=None)[source]

Represents a model error reported by the dbt CLI at runtime (return code 1).

exception dagster_dbt.DagsterDbtCliOutputsNotFoundError(path)[source]

Represents a problem in finding the target/run_results.json artifact when executing a dbt CLI command.

For more details on target/run_results.json, see https://docs.getdbt.com/reference/dbt-artifacts#run_resultsjson.

exception dagster_dbt.DagsterDbtCliUnexpectedOutputError(invalid_line_nos)[source]

Represents an error when parsing the output of a dbt CLI command.

exception dagster_dbt.DagsterDbtRpcUnexpectedPollOutputError(description=None, metadata_entries=None, metadata=None, allow_retries=None)[source]

Represents an unexpected response when polling the dbt RPC server.

Utils

dagster_dbt.utils.generate_materializations(dbt_output, asset_key_prefix=None)[source]

This function yields dagster.AssetMaterialization events for each model updated by a dbt command.

Information parsed from a DbtOutput object.

Note that this will not work with output from the dbt_rpc_resource, because this resource does not wait for a response from the RPC server before returning. Instead, use the dbt_rpc_sync_resource, which will wait for execution to complete.

Examples

from dagster import op, Output
from dagster_dbt.utils import generate_materializations
from dagster_dbt import dbt_cli_resource, dbt_rpc_sync_resource

@op(required_resource_keys={"dbt"})
def my_custom_dbt_run(context):
    dbt_output = context.resources.dbt.run()
    for materialization in generate_materializations(dbt_output):
        # you can modify the materialization object to add extra metadata, if desired
        yield materialization
    yield Output(my_dbt_output)

@job(resource_defs={{"dbt":dbt_cli_resource}})
def my_dbt_cli_job():
    my_custom_dbt_run()

@job(resource_defs={{"dbt":dbt_rpc_sync_resource}})
def my_dbt_rpc_job():
    my_custom_dbt_run()