Source code for dagster_dbt.cli.resources

from typing import Any, Iterator, Mapping, Optional, Sequence, Set

import dagster._check as check
from dagster import Permissive, resource
from dagster._annotations import public
from dagster._utils.merger import merge_dicts

from ..dbt_resource import DbtResource
from .constants import CLI_COMMON_FLAGS_CONFIG_SCHEMA, CLI_COMMON_OPTIONS_CONFIG_SCHEMA
from .types import DbtCliOutput
from .utils import (
    execute_cli,
    execute_cli_stream,
    parse_manifest,
    parse_run_results,
    remove_run_results,
)


class DbtCliResource(DbtResource):
    """A resource that allows you to execute dbt cli commands.

    For the most up-to-date documentation on the specific parameters available to you for each
    command, check out the dbt docs:

    https://docs.getdbt.com/reference/commands/run

    To use this as a dagster resource, we recommend using
    :func:`dbt_cli_resource <dagster_dbt.dbt_cli_resource>`.
    """

    def __init__(
        self,
        executable: str,
        default_flags: Mapping[str, Any],
        warn_error: bool,
        ignore_handled_error: bool,
        target_path: str,
        logger: Optional[Any] = None,
        docs_url: Optional[str] = None,
        json_log_format: bool = True,
        capture_logs: bool = True,
        debug: bool = False,
    ):
        """Store the settings that are forwarded to every dbt CLI invocation.

        Args:
            executable (str): forwarded as ``executable`` to the CLI helpers.
            default_flags (Mapping[str, Any]): flags merged into every command (see
                :py:meth:`_get_flags_dict`).
            warn_error (bool): forwarded as ``warn_error`` to the CLI helpers.
            ignore_handled_error (bool): forwarded as ``ignore_handled_error`` to the CLI helpers.
            target_path (str): forwarded as ``target_path``; also the default location used when
                reading or removing ``run_results.json`` / ``manifest.json``.
            logger (Any, optional): handed to the ``DbtResource`` base class.
            docs_url (str, optional): forwarded as ``docs_url`` to ``execute_cli``.
            json_log_format (bool): forwarded as ``json_log_format``; must be ``True`` in order to
                use :py:meth:`cli_stream_json`.
            capture_logs (bool): forwarded as ``capture_logs`` to the CLI helpers.
            debug (bool): forwarded as ``debug`` to the CLI helpers.
        """
        self._default_flags = default_flags
        self._executable = executable
        self._warn_error = warn_error
        self._ignore_handled_error = ignore_handled_error
        self._target_path = target_path
        self._docs_url = docs_url
        self._json_log_format = json_log_format
        self._capture_logs = capture_logs
        self._debug = debug
        super().__init__(logger)

    @property
    def default_flags(self) -> Mapping[str, Any]:
        """A set of params populated from resource config that are passed as flags to each dbt CLI
        command.
        """
        # NOTE(review): _format_params lives on DbtResource; with replace_underscores=True it
        # presumably rewrites "project_dir" -> "project-dir" (the JSON helpers below rely on the
        # dashed key) -- confirm against the base class.
        return self._format_params(self._default_flags, replace_underscores=True)

    @property
    def strict_flags(self) -> Set[str]:
        """A set of flags that should not be auto-populated from the default flags unless they are
        arguments to the associated function.
        """
        return {"models", "exclude", "select"}

    def _get_flags_dict(self, kwargs) -> Mapping[str, Any]:
        """Merge the configured default flags with the flags supplied for a single command.

        Args:
            kwargs (Optional[Mapping[str, Any]]): the per-call flags; ``None`` is treated as empty.

        Returns:
            Mapping[str, Any]: the default flags overlaid with the formatted per-call flags.
        """
        extra_flags = {} if kwargs is None else kwargs

        # Evaluate the property once instead of rebuilding the set for every default flag.
        strict_flags = self.strict_flags

        # remove default flags that are declared as "strict" and not explicitly passed in
        default_flags = {
            k: v
            for k, v in self.default_flags.items()
            if k not in strict_flags or k in extra_flags
        }

        return merge_dicts(
            default_flags, self._format_params(extra_flags, replace_underscores=True)
        )

    def _get_project_dir(self, kwargs: Mapping[str, Any]) -> Any:
        """Return the project directory for a JSON-artifact helper call.

        An explicitly supplied ``project_dir`` keyword always wins. The configured default flag is
        only looked up when the keyword is absent, so a resource without a ``project-dir`` default
        can still be used by passing ``project_dir`` directly.

        Raises:
            KeyError: if ``project_dir`` was not supplied and there is no ``project-dir`` entry in
                the default flags.
        """
        if "project_dir" in kwargs:
            return kwargs["project_dir"]
        return self.default_flags["project-dir"]

    @public
    def cli(self, command: str, **kwargs) -> DbtCliOutput:
        """Executes a dbt CLI command. Params passed in as keyword arguments will be merged with the
        default flags that were configured on resource initialization (if any) overriding the
        default values if necessary.

        Args:
            command (str): The command you wish to run (e.g. 'run', 'test', 'docs generate', etc.)

        Returns:
            DbtCliOutput: An instance of :class:`DbtCliOutput<dagster_dbt.DbtCliOutput>` containing
            parsed log output as well as the contents of run_results.json (if applicable).
        """
        command = check.str_param(command, "command")
        return execute_cli(
            executable=self._executable,
            command=command,
            flags_dict=self._get_flags_dict(kwargs),
            log=self.logger,
            warn_error=self._warn_error,
            ignore_handled_error=self._ignore_handled_error,
            target_path=self._target_path,
            docs_url=self._docs_url,
            json_log_format=self._json_log_format,
            capture_logs=self._capture_logs,
            debug=self._debug,
        )

    def cli_stream_json(self, command: str, **kwargs) -> Iterator[Mapping[str, Any]]:
        """Executes a dbt CLI command and yields the parsed JSON log lines as they are produced.
        Params passed in as keyword arguments will be merged with the default flags that were
        configured on resource initialization (if any) overriding the default values if necessary.

        Because this is a generator, nothing (including the ``json_log_format`` check) runs until
        the returned iterator is first advanced.

        Args:
            command (str): The command you wish to run (e.g. 'run', 'test', 'docs generate', etc.)

        Yields:
            Mapping[str, Any]: the ``parsed_json_line`` of every streamed event that has one.
        """
        check.invariant(self._json_log_format, "Cannot stream JSON if json_log_format is False.")
        for event in execute_cli_stream(
            executable=self._executable,
            command=command,
            flags_dict=self._get_flags_dict(kwargs),
            log=self.logger,
            warn_error=self._warn_error,
            ignore_handled_error=self._ignore_handled_error,
            json_log_format=self._json_log_format,
            capture_logs=self._capture_logs,
            debug=self._debug,
        ):
            # Events without a parsed JSON payload are skipped.
            if event.parsed_json_line is not None:
                yield event.parsed_json_line

    @public
    def compile(
        self,
        models: Optional[Sequence[str]] = None,
        exclude: Optional[Sequence[str]] = None,
        select: Optional[Sequence[str]] = None,
        **kwargs,
    ) -> DbtCliOutput:
        """Run the ``compile`` command on a dbt project. kwargs are passed in as additional
        parameters.

        Args:
            models (List[str], optional): the models to include in compilation.
            exclude (List[str], optional): the models to exclude from compilation.
            select (List[str], optional): the models to include in compilation.

        Returns:
            DbtCliOutput: An instance of :class:`DbtCliOutput<dagster_dbt.DbtCliOutput>` containing
            parsed log output as well as the contents of run_results.json (if applicable).
        """
        return self.cli("compile", models=models, exclude=exclude, select=select, **kwargs)

    @public
    def run(
        self,
        models: Optional[Sequence[str]] = None,
        exclude: Optional[Sequence[str]] = None,
        select: Optional[Sequence[str]] = None,
        **kwargs,
    ) -> DbtCliOutput:
        """Run the ``run`` command on a dbt project. kwargs are passed in as additional parameters.

        Args:
            models (List[str], optional): the models to include in the run.
            exclude (List[str], optional): the models to exclude from the run.
            select (List[str], optional): the models to include in the run.

        Returns:
            DbtCliOutput: An instance of :class:`DbtCliOutput<dagster_dbt.DbtCliOutput>` containing
            parsed log output as well as the contents of run_results.json (if applicable).
        """
        return self.cli("run", models=models, exclude=exclude, select=select, **kwargs)

    @public
    def snapshot(
        self,
        select: Optional[Sequence[str]] = None,
        exclude: Optional[Sequence[str]] = None,
        **kwargs,
    ) -> DbtCliOutput:
        """Run the ``snapshot`` command on a dbt project. kwargs are passed in as additional
        parameters.

        Args:
            select (List[str], optional): the snapshots to include in the run.
            exclude (List[str], optional): the snapshots to exclude from the run.

        Returns:
            DbtCliOutput: An instance of :class:`DbtCliOutput<dagster_dbt.DbtCliOutput>` containing
            parsed log output as well as the contents of run_results.json (if applicable).
        """
        return self.cli("snapshot", select=select, exclude=exclude, **kwargs)

    @public
    def test(
        self,
        models: Optional[Sequence[str]] = None,
        exclude: Optional[Sequence[str]] = None,
        data: bool = True,
        schema: bool = True,
        select: Optional[Sequence[str]] = None,
        **kwargs,
    ) -> DbtCliOutput:
        """Run the ``test`` command on a dbt project. kwargs are passed in as additional parameters.

        Args:
            models (List[str], optional): the models to include in testing.
            exclude (List[str], optional): the models to exclude from testing.
            data (bool, optional): If ``True`` (default), then run data tests.
            schema (bool, optional): If ``True`` (default), then run schema tests.
            select (List[str], optional): the models to include in testing.

        Returns:
            DbtCliOutput: An instance of :class:`DbtCliOutput<dagster_dbt.DbtCliOutput>` containing
            parsed log output as well as the contents of run_results.json (if applicable).
        """
        if data and schema:
            # do not include these arguments if both are True, as these are deprecated in later
            # versions of dbt, and for older versions the functionality is the same regardless of
            # if both are set or neither are set.
            return self.cli("test", models=models, exclude=exclude, select=select, **kwargs)
        return self.cli(
            "test",
            models=models,
            exclude=exclude,
            data=data,
            schema=schema,
            select=select,
            **kwargs,
        )

    @public
    def seed(
        self,
        show: bool = False,
        select: Optional[Sequence[str]] = None,
        exclude: Optional[Sequence[str]] = None,
        **kwargs,
    ) -> DbtCliOutput:
        """Run the ``seed`` command on a dbt project. kwargs are passed in as additional parameters.

        Args:
            show (bool, optional): If ``True``, then show a sample of the seeded data in the
                response. Defaults to ``False``.
            select (List[str], optional): the seeds to include in the run.
            exclude (List[str], optional): the seeds to exclude from the run.

        Returns:
            DbtCliOutput: An instance of :class:`DbtCliOutput<dagster_dbt.DbtCliOutput>` containing
            parsed log output as well as the contents of run_results.json (if applicable).
        """
        return self.cli("seed", show=show, select=select, exclude=exclude, **kwargs)

    @public
    def ls(
        self,
        select: Optional[Sequence[str]] = None,
        models: Optional[Sequence[str]] = None,
        exclude: Optional[Sequence[str]] = None,
        **kwargs,
    ) -> DbtCliOutput:
        """Run the ``ls`` command on a dbt project. kwargs are passed in as additional parameters.

        Args:
            select (List[str], optional): the resources to include in the output.
            models (List[str], optional): the models to include in the output.
            exclude (List[str], optional): the resources to exclude from the output.

        Returns:
            DbtCliOutput: An instance of :class:`DbtCliOutput<dagster_dbt.DbtCliOutput>` containing
            parsed log output as well as the contents of run_results.json (if applicable).
        """
        return self.cli("ls", select=select, models=models, exclude=exclude, **kwargs)

    @public
    def build(self, select: Optional[Sequence[str]] = None, **kwargs) -> DbtCliOutput:
        """Run the ``build`` command on a dbt project. kwargs are passed in as additional
        parameters.

        Args:
            select (List[str], optional): the models/resources to include in the run.

        Returns:
            DbtCliOutput: An instance of :class:`DbtCliOutput<dagster_dbt.DbtCliOutput>` containing
            parsed log output as well as the contents of run_results.json (if applicable).
        """
        return self.cli("build", select=select, **kwargs)

    @public
    def freshness(self, select: Optional[Sequence[str]] = None, **kwargs) -> DbtCliOutput:
        """Run the ``source snapshot-freshness`` command on a dbt project. kwargs are passed in as
        additional parameters.

        Args:
            select (List[str], optional): the sources to include in the run.

        Returns:
            DbtCliOutput: An instance of :class:`DbtCliOutput<dagster_dbt.DbtCliOutput>` containing
            parsed log output as well as the contents of run_results.json (if applicable).
        """
        return self.cli("source snapshot-freshness", select=select, **kwargs)

    @public
    def generate_docs(self, compile_project: bool = False, **kwargs) -> DbtCliOutput:
        """Run the ``docs generate`` command on a dbt project. kwargs are passed in as additional
        parameters.

        Args:
            compile_project (bool, optional): If true, compile the project before generating a
                catalog.

        Returns:
            DbtCliOutput: An instance of :class:`DbtCliOutput<dagster_dbt.DbtCliOutput>` containing
            parsed log output as well as the contents of run_results.json (if applicable).
        """
        # The public parameter is named compile_project; it is sent to the CLI as the
        # ``compile`` flag.
        return self.cli("docs generate", compile=compile_project, **kwargs)

    @public
    def run_operation(
        self, macro: str, args: Optional[Mapping[str, Any]] = None, **kwargs
    ) -> DbtCliOutput:
        """Run the ``run-operation`` command on a dbt project. kwargs are passed in as additional
        parameters.

        Args:
            macro (str): the dbt macro to invoke.
            args (Dict[str, Any], optional): the keyword arguments to be supplied to the macro.

        Returns:
            DbtCliOutput: An instance of :class:`DbtCliOutput<dagster_dbt.DbtCliOutput>` containing
            parsed log output as well as the contents of run_results.json (if applicable).
        """
        return self.cli(f"run-operation {macro}", args=args, **kwargs)

    @public
    def get_run_results_json(self, **kwargs) -> Optional[Mapping[str, Any]]:
        """Get a parsed version of the run_results.json file for the relevant dbt project.

        Args:
            project_dir (str, optional): overrides the ``project-dir`` default flag.
            target_path (str, optional): overrides the resource's configured target path.

        Returns:
            Dict[str, Any]: dictionary containing the parsed contents of the run_results json file
            for this dbt project.

        Raises:
            KeyError: if ``project_dir`` is not supplied and no ``project-dir`` default flag is
                configured.
        """
        project_dir = self._get_project_dir(kwargs)
        target_path = kwargs.get("target_path", self._target_path)
        return parse_run_results(project_dir, target_path)

    @public
    def remove_run_results_json(self, **kwargs):
        """Remove the run_results.json file from previous runs (if it exists).

        Args:
            project_dir (str, optional): overrides the ``project-dir`` default flag.
            target_path (str, optional): overrides the resource's configured target path.

        Raises:
            KeyError: if ``project_dir`` is not supplied and no ``project-dir`` default flag is
                configured.
        """
        project_dir = self._get_project_dir(kwargs)
        target_path = kwargs.get("target_path", self._target_path)
        remove_run_results(project_dir, target_path)

    @public
    def get_manifest_json(self, **kwargs) -> Optional[Mapping[str, Any]]:
        """Get a parsed version of the manifest.json file for the relevant dbt project.

        Args:
            project_dir (str, optional): overrides the ``project-dir`` default flag.
            target_path (str, optional): overrides the resource's configured target path.

        Returns:
            Dict[str, Any]: dictionary containing the parsed contents of the manifest json file
            for this dbt project.

        Raises:
            KeyError: if ``project_dir`` is not supplied and no ``project-dir`` default flag is
                configured.
        """
        project_dir = self._get_project_dir(kwargs)
        target_path = kwargs.get("target_path", self._target_path)
        return parse_manifest(project_dir, target_path)
@resource(
    config_schema=Permissive(
        {
            # Config keys are exposed with underscores instead of the dashes used in the schemas.
            name.replace("-", "_"): field
            for name, field in dict(
                **CLI_COMMON_FLAGS_CONFIG_SCHEMA, **CLI_COMMON_OPTIONS_CONFIG_SCHEMA
            ).items()
        }
    )
)
def dbt_cli_resource(context) -> DbtCliResource:
    """This resource issues dbt CLI commands against a configured dbt project."""
    config = context.resource_config

    # Names (underscore form) of the schema entries that configure the resource itself rather
    # than being forwarded to dbt as command flags.
    option_names = {name.replace("-", "_") for name in CLI_COMMON_OPTIONS_CONFIG_SCHEMA}

    # Everything else in the resource config is treated as a default flag for dbt commands.
    flag_defaults = {}
    for key, value in config.items():
        if key in option_names:
            continue
        flag_defaults[key] = value

    return DbtCliResource(
        executable=config["dbt_executable"],
        default_flags=flag_defaults,
        warn_error=config["warn_error"],
        ignore_handled_error=config["ignore_handled_error"],
        target_path=config["target_path"],
        logger=context.log,
        # docs_url is the only option read with .get(), so it may be absent from the config.
        docs_url=config.get("docs_url"),
        capture_logs=config["capture_logs"],
        json_log_format=config["json_log_format"],
        debug=config["debug"],
    )