from typing import Any, Iterator, Mapping, Optional, Sequence, Set
import dagster._check as check
from dagster import Permissive, resource
from dagster._annotations import public
from dagster._utils.merger import merge_dicts
from ..dbt_resource import DbtResource
from .constants import CLI_COMMON_FLAGS_CONFIG_SCHEMA, CLI_COMMON_OPTIONS_CONFIG_SCHEMA
from .types import DbtCliOutput
from .utils import (
execute_cli,
execute_cli_stream,
parse_manifest,
parse_run_results,
remove_run_results,
)
class DbtCliResource(DbtResource):
    """A resource that allows you to execute dbt cli commands.

    For the most up-to-date documentation on the specific parameters available to you for each
    command, check out the dbt docs:

    https://docs.getdbt.com/reference/commands/run

    To use this as a dagster resource, we recommend using
    :func:`dbt_cli_resource <dagster_dbt.dbt_cli_resource>`.
    """

    def __init__(
        self,
        executable: str,
        default_flags: Mapping[str, Any],
        warn_error: bool,
        ignore_handled_error: bool,
        target_path: str,
        logger: Optional[Any] = None,
        docs_url: Optional[str] = None,
        json_log_format: bool = True,
        capture_logs: bool = True,
        debug: bool = False,
    ):
        """Store the CLI settings that are forwarded to every dbt invocation.

        Args:
            executable (str): value forwarded as ``executable`` to the CLI helpers.
            default_flags (Mapping[str, Any]): flags merged into every command's flags.
            warn_error (bool): forwarded as ``warn_error`` to the CLI helpers.
            ignore_handled_error (bool): forwarded as ``ignore_handled_error`` to the
                CLI helpers.
            target_path (str): default target path used when running commands and when
                locating run_results.json / manifest.json.
            logger (Any, optional): passed to the ``DbtResource`` initializer.
            docs_url (str, optional): forwarded as ``docs_url`` to ``execute_cli``.
            json_log_format (bool): forwarded as ``json_log_format``; must be ``True`` in
                order to use :meth:`cli_stream_json`.
            capture_logs (bool): forwarded as ``capture_logs`` to the CLI helpers.
            debug (bool): forwarded as ``debug`` to the CLI helpers.
        """
        self._default_flags = default_flags
        self._executable = executable
        self._warn_error = warn_error
        self._ignore_handled_error = ignore_handled_error
        self._target_path = target_path
        self._docs_url = docs_url
        self._json_log_format = json_log_format
        self._capture_logs = capture_logs
        self._debug = debug
        super().__init__(logger)

    @property
    def default_flags(self) -> Mapping[str, Any]:
        """A set of params populated from resource config that are passed as flags to each dbt CLI command."""
        # NOTE(review): _format_params is inherited from DbtResource (not visible here);
        # presumably it converts underscores in keys to hyphens -- confirm.
        return self._format_params(self._default_flags, replace_underscores=True)

    @property
    def strict_flags(self) -> Set[str]:
        """A set of flags that should not be auto-populated from the default flags unless they are
        arguments to the associated function.
        """
        return {"models", "exclude", "select"}

    def _get_flags_dict(self, kwargs: Optional[Mapping[str, Any]]) -> Mapping[str, Any]:
        """Merge the configured default flags with the flags passed to a single call.

        Args:
            kwargs (Mapping[str, Any], optional): per-call flags; ``None`` is treated as empty.

        Returns:
            Mapping[str, Any]: default flags overridden by the formatted per-call flags.
        """
        extra_flags = {} if kwargs is None else kwargs
        # Evaluate the property once instead of once per default flag.
        strict = self.strict_flags
        # remove default flags that are declared as "strict" and not explicitly passed in
        default_flags = {
            name: value
            for name, value in self.default_flags.items()
            if name not in strict or name in extra_flags
        }
        return merge_dicts(
            default_flags, self._format_params(extra_flags, replace_underscores=True)
        )

    def _resolve_project_dir(self, kwargs: Mapping[str, Any]) -> Any:
        """Return the explicitly passed ``project_dir``, else the configured ``project-dir`` flag."""
        # Only consult the default flags when no override was supplied, so that an explicit
        # project_dir works even if the defaults carry no "project-dir" entry.
        if "project_dir" in kwargs:
            return kwargs["project_dir"]
        return self.default_flags["project-dir"]

    @public
    def cli(self, command: str, **kwargs) -> DbtCliOutput:
        """Executes a dbt CLI command. Params passed in as keyword arguments will be merged with the
        default flags that were configured on resource initialization (if any) overriding the
        default values if necessary.

        Args:
            command (str): The command you wish to run (e.g. 'run', 'test', 'docs generate', etc.)

        Returns:
            DbtCliOutput: An instance of :class:`DbtCliOutput<dagster_dbt.DbtCliOutput>` containing
                parsed log output as well as the contents of run_results.json (if applicable).
        """
        command = check.str_param(command, "command")
        return execute_cli(
            executable=self._executable,
            command=command,
            flags_dict=self._get_flags_dict(kwargs),
            log=self.logger,
            warn_error=self._warn_error,
            ignore_handled_error=self._ignore_handled_error,
            target_path=self._target_path,
            docs_url=self._docs_url,
            json_log_format=self._json_log_format,
            capture_logs=self._capture_logs,
            debug=self._debug,
        )

    def cli_stream_json(self, command: str, **kwargs) -> Iterator[Mapping[str, Any]]:
        """Executes a dbt CLI command and streams its JSON log events. Params passed in as keyword
        arguments will be merged with the default flags that were configured on resource
        initialization (if any) overriding the default values if necessary.

        Because this is a generator, validation and execution only start once iteration begins.

        Args:
            command (str): The command you wish to run (e.g. 'run', 'test', 'docs generate', etc.)

        Yields:
            Mapping[str, Any]: the parsed JSON payload of each event that has one; events whose
                ``parsed_json_line`` is ``None`` are skipped.
        """
        check.invariant(self._json_log_format, "Cannot stream JSON if json_log_format is False.")
        # Validate the command the same way cli() does.
        command = check.str_param(command, "command")
        for event in execute_cli_stream(
            executable=self._executable,
            command=command,
            flags_dict=self._get_flags_dict(kwargs),
            log=self.logger,
            warn_error=self._warn_error,
            ignore_handled_error=self._ignore_handled_error,
            json_log_format=self._json_log_format,
            capture_logs=self._capture_logs,
            debug=self._debug,
        ):
            if event.parsed_json_line is not None:
                yield event.parsed_json_line

    @public
    def compile(
        self,
        models: Optional[Sequence[str]] = None,
        exclude: Optional[Sequence[str]] = None,
        select: Optional[Sequence[str]] = None,
        **kwargs,
    ) -> DbtCliOutput:
        """Run the ``compile`` command on a dbt project. kwargs are passed in as additional parameters.

        Args:
            models (List[str], optional): the models to include in compilation.
            exclude (List[str], optional): the models to exclude from compilation.
            select (List[str], optional): the models to include in compilation.

        Returns:
            DbtCliOutput: An instance of :class:`DbtCliOutput<dagster_dbt.DbtCliOutput>` containing
                parsed log output as well as the contents of run_results.json (if applicable).
        """
        return self.cli("compile", models=models, exclude=exclude, select=select, **kwargs)

    @public
    def run(
        self,
        models: Optional[Sequence[str]] = None,
        exclude: Optional[Sequence[str]] = None,
        select: Optional[Sequence[str]] = None,
        **kwargs,
    ) -> DbtCliOutput:
        """Run the ``run`` command on a dbt project. kwargs are passed in as additional parameters.

        Args:
            models (List[str], optional): the models to include in the run.
            exclude (List[str], optional): the models to exclude from the run.
            select (List[str], optional): the models to include in the run.

        Returns:
            DbtCliOutput: An instance of :class:`DbtCliOutput<dagster_dbt.DbtCliOutput>` containing
                parsed log output as well as the contents of run_results.json (if applicable).
        """
        return self.cli("run", models=models, exclude=exclude, select=select, **kwargs)

    @public
    def snapshot(
        self,
        select: Optional[Sequence[str]] = None,
        exclude: Optional[Sequence[str]] = None,
        **kwargs,
    ) -> DbtCliOutput:
        """Run the ``snapshot`` command on a dbt project. kwargs are passed in as additional parameters.

        Args:
            select (List[str], optional): the snapshots to include in the run.
            exclude (List[str], optional): the snapshots to exclude from the run.

        Returns:
            DbtCliOutput: An instance of :class:`DbtCliOutput<dagster_dbt.DbtCliOutput>` containing
                parsed log output as well as the contents of run_results.json (if applicable).
        """
        return self.cli("snapshot", select=select, exclude=exclude, **kwargs)

    @public
    def test(
        self,
        models: Optional[Sequence[str]] = None,
        exclude: Optional[Sequence[str]] = None,
        data: bool = True,
        schema: bool = True,
        select: Optional[Sequence[str]] = None,
        **kwargs,
    ) -> DbtCliOutput:
        """Run the ``test`` command on a dbt project. kwargs are passed in as additional parameters.

        Args:
            models (List[str], optional): the models to include in testing.
            exclude (List[str], optional): the models to exclude from testing.
            data (bool, optional): If ``True`` (default), then run data tests.
            schema (bool, optional): If ``True`` (default), then run schema tests.
            select (List[str], optional): the models to include in testing.

        Returns:
            DbtCliOutput: An instance of :class:`DbtCliOutput<dagster_dbt.DbtCliOutput>` containing
                parsed log output as well as the contents of run_results.json (if applicable).
        """
        if data and schema:
            # do not include these arguments if both are True, as these are deprecated in later
            # versions of dbt, and for older versions the functionality is the same regardless of
            # if both are set or neither are set.
            return self.cli("test", models=models, exclude=exclude, select=select, **kwargs)
        return self.cli(
            "test",
            models=models,
            exclude=exclude,
            data=data,
            schema=schema,
            select=select,
            **kwargs,
        )

    @public
    def seed(
        self,
        show: bool = False,
        select: Optional[Sequence[str]] = None,
        exclude: Optional[Sequence[str]] = None,
        **kwargs,
    ) -> DbtCliOutput:
        """Run the ``seed`` command on a dbt project. kwargs are passed in as additional parameters.

        Args:
            show (bool, optional): If ``True``, then show a sample of the seeded data in the
                response. Defaults to ``False``.
            select (List[str], optional): the seeds to include in the run.
            exclude (List[str], optional): the seeds to exclude from the run.

        Returns:
            DbtCliOutput: An instance of :class:`DbtCliOutput<dagster_dbt.DbtCliOutput>` containing
                parsed log output as well as the contents of run_results.json (if applicable).
        """
        return self.cli("seed", show=show, select=select, exclude=exclude, **kwargs)

    @public
    def ls(
        self,
        select: Optional[Sequence[str]] = None,
        models: Optional[Sequence[str]] = None,
        exclude: Optional[Sequence[str]] = None,
        **kwargs,
    ) -> DbtCliOutput:
        """Run the ``ls`` command on a dbt project. kwargs are passed in as additional parameters.

        Args:
            select (List[str], optional): the resources to include in the output.
            models (List[str], optional): the models to include in the output.
            exclude (List[str], optional): the resources to exclude from the output.

        Returns:
            DbtCliOutput: An instance of :class:`DbtCliOutput<dagster_dbt.DbtCliOutput>` containing
                parsed log output as well as the contents of run_results.json (if applicable).
        """
        return self.cli("ls", select=select, models=models, exclude=exclude, **kwargs)

    @public
    def build(self, select: Optional[Sequence[str]] = None, **kwargs) -> DbtCliOutput:
        """Run the ``build`` command on a dbt project. kwargs are passed in as additional parameters.

        Args:
            select (List[str], optional): the models/resources to include in the run.

        Returns:
            DbtCliOutput: An instance of :class:`DbtCliOutput<dagster_dbt.DbtCliOutput>` containing
                parsed log output as well as the contents of run_results.json (if applicable).
        """
        return self.cli("build", select=select, **kwargs)

    @public
    def freshness(self, select: Optional[Sequence[str]] = None, **kwargs) -> DbtCliOutput:
        """Run the ``source snapshot-freshness`` command on a dbt project. kwargs are passed in as additional parameters.

        Args:
            select (List[str], optional): the sources to include in the run.

        Returns:
            DbtCliOutput: An instance of :class:`DbtCliOutput<dagster_dbt.DbtCliOutput>` containing
                parsed log output as well as the contents of run_results.json (if applicable).
        """
        return self.cli("source snapshot-freshness", select=select, **kwargs)

    @public
    def generate_docs(self, compile_project: bool = False, **kwargs) -> DbtCliOutput:
        """Run the ``docs generate`` command on a dbt project. kwargs are passed in as additional parameters.

        Args:
            compile_project (bool, optional): If true, compile the project before generating a catalog.

        Returns:
            DbtCliOutput: An instance of :class:`DbtCliOutput<dagster_dbt.DbtCliOutput>` containing
                parsed log output as well as the contents of run_results.json (if applicable).
        """
        # The public parameter is named compile_project; it is forwarded as the "compile" flag.
        return self.cli("docs generate", compile=compile_project, **kwargs)

    @public
    def run_operation(
        self, macro: str, args: Optional[Mapping[str, Any]] = None, **kwargs
    ) -> DbtCliOutput:
        """Run the ``run-operation`` command on a dbt project. kwargs are passed in as additional parameters.

        Args:
            macro (str): the dbt macro to invoke.
            args (Dict[str, Any], optional): the keyword arguments to be supplied to the macro.

        Returns:
            DbtCliOutput: An instance of :class:`DbtCliOutput<dagster_dbt.DbtCliOutput>` containing
                parsed log output as well as the contents of run_results.json (if applicable).
        """
        return self.cli(f"run-operation {macro}", args=args, **kwargs)

    @public
    def get_run_results_json(self, **kwargs) -> Optional[Mapping[str, Any]]:
        """Get a parsed version of the run_results.json file for the relevant dbt project.

        Args:
            project_dir (optional keyword): overrides the configured ``project-dir`` flag.
            target_path (optional keyword): overrides the configured target path.

        Returns:
            Optional[Dict[str, Any]]: the value returned by ``parse_run_results`` for this
                project -- presumably the parsed contents of run_results.json, or ``None``
                when it is unavailable (confirm against ``parse_run_results``).
        """
        project_dir = self._resolve_project_dir(kwargs)
        target_path = kwargs.get("target_path", self._target_path)
        return parse_run_results(project_dir, target_path)

    @public
    def remove_run_results_json(self, **kwargs) -> None:
        """Remove the run_results.json file from previous runs (if it exists).

        Args:
            project_dir (optional keyword): overrides the configured ``project-dir`` flag.
            target_path (optional keyword): overrides the configured target path.
        """
        project_dir = self._resolve_project_dir(kwargs)
        target_path = kwargs.get("target_path", self._target_path)
        remove_run_results(project_dir, target_path)

    @public
    def get_manifest_json(self, **kwargs) -> Optional[Mapping[str, Any]]:
        """Get a parsed version of the manifest.json file for the relevant dbt project.

        Args:
            project_dir (optional keyword): overrides the configured ``project-dir`` flag.
            target_path (optional keyword): overrides the configured target path.

        Returns:
            Optional[Dict[str, Any]]: the value returned by ``parse_manifest`` for this
                project -- presumably the parsed contents of manifest.json, or ``None``
                when it is unavailable (confirm against ``parse_manifest``).
        """
        project_dir = self._resolve_project_dir(kwargs)
        target_path = kwargs.get("target_path", self._target_path)
        return parse_manifest(project_dir, target_path)
@resource(
    config_schema=Permissive(
        {
            # Config keys use underscores where the shared schemas use hyphens.
            option.replace("-", "_"): option_schema
            for option, option_schema in dict(
                **CLI_COMMON_FLAGS_CONFIG_SCHEMA, **CLI_COMMON_OPTIONS_CONFIG_SCHEMA
            ).items()
        }
    )
)
def dbt_cli_resource(context) -> DbtCliResource:
    """This resource issues dbt CLI commands against a configured dbt project."""
    # NOTE(review): the docstring above is left untouched in case dagster surfaces it as the
    # resource description -- confirm before rewording.
    config = context.resource_config

    # Names (underscore form) of the config entries that are options for the resource itself
    # rather than flags to hand to dbt.
    option_names = {name.replace("-", "_") for name in CLI_COMMON_OPTIONS_CONFIG_SCHEMA}

    # Everything else in the config is forwarded to dbt commands as a default flag.
    flag_defaults = {}
    for name, value in config.items():
        if name in option_names:
            continue
        flag_defaults[name] = value

    return DbtCliResource(
        executable=config["dbt_executable"],
        default_flags=flag_defaults,
        warn_error=config["warn_error"],
        ignore_handled_error=config["ignore_handled_error"],
        target_path=config["target_path"],
        logger=context.log,
        # docs_url is read with .get(), so a missing entry yields None instead of raising.
        docs_url=config.get("docs_url"),
        capture_logs=config["capture_logs"],
        json_log_format=config["json_log_format"],
        debug=config["debug"],
    )