Source code for dagster._utils.log

import copy
import logging
import logging.config
import sys
import traceback
from contextlib import contextmanager
from typing import Mapping, NamedTuple, Optional

import coloredlogs
import pendulum

import dagster._check as check
import dagster._seven as seven
from dagster._config import Enum, EnumValue
from dagster._core.definitions.logger_definition import logger
from dagster._core.utils import PYTHON_LOGGING_LEVELS_MAPPING, coerce_valid_log_level

LogLevelEnum = Enum("log_level", list(map(EnumValue, PYTHON_LOGGING_LEVELS_MAPPING.keys())))
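

# Illustrative sketch (not part of the original module): LogLevelEnum exposes the
# standard Python level names as a Dagster config enum, and coerce_valid_log_level
# maps either a level name or a numeric level to the stdlib constant. A minimal
# round trip, assuming the helper accepts both forms:
def _example_log_level_coercion():
    assert coerce_valid_log_level("DEBUG") == logging.DEBUG
    assert coerce_valid_log_level(logging.DEBUG) == logging.DEBUG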


class JsonFileHandler(logging.Handler):
    def __init__(self, json_path: str):
        super(JsonFileHandler, self).__init__()
        self.json_path = check.str_param(json_path, "json_path")

    def emit(self, record: logging.LogRecord) -> None:
        try:
            log_dict = copy.copy(record.__dict__)

            # This horrific monstrosity is to maintain backwards compatibility
            # with the old behavior of the JsonFileHandler, which the clarify
            # project has a dependency on. It relied on the dagster-defined
            # properties smashing all the properties of the LogRecord object,
            # and it uploads all of those properties to a redshift table in
            # order to do analytics on the logs.

            if "dagster_meta" in log_dict:
                dagster_meta_dict = log_dict["dagster_meta"]
                del log_dict["dagster_meta"]
            else:
                dagster_meta_dict = {}

            log_dict.update(dagster_meta_dict)

            with open(self.json_path, "a", encoding="utf8") as ff:
                text_line = seven.json.dumps(log_dict)
                ff.write(text_line + "\n")
        # Need to catch Exception here, so disabling lint
        except Exception as e:
            logging.critical("[%s] Error during logging!", self.__class__.__name__)
            logging.exception(str(e))
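

# A minimal usage sketch (not part of the original module): attach the handler to a
# stdlib logger and each record it handles is appended to json_path as one JSON
# object per line, with any "dagster_meta" contents flattened into the record dict.
# The logger name, file path, and meta values here are illustrative.
def _example_json_file_handler_usage():
    log = logging.getLogger("json-file-example")
    log.addHandler(JsonFileHandler("/tmp/example.jsonl"))
    log.warning("something happened", extra={"dagster_meta": {"run_id": "abc"}})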


class StructuredLoggerMessage(
    NamedTuple(
        "_StructuredLoggerMessage",
        [
            ("name", str),
            ("message", str),
            ("level", int),
            ("meta", Mapping[object, object]),
            ("record", logging.LogRecord),
        ],
    )
):
    def __new__(
        cls,
        name: str,
        message: str,
        level: int,
        meta: Mapping[object, object],
        record: logging.LogRecord,
    ):
        return super(StructuredLoggerMessage, cls).__new__(
            cls,
            check.str_param(name, "name"),
            check.str_param(message, "message"),
            coerce_valid_log_level(level),
            check.mapping_param(meta, "meta"),
            check.inst_param(record, "record", logging.LogRecord),
        )
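

# Illustrative sketch (not part of the original module): a StructuredLoggerMessage can
# be built from any LogRecord; logging.makeLogRecord produces a synthetic one here.
def _example_structured_message():
    record = logging.makeLogRecord({"name": "example", "msg": "hello", "levelno": logging.INFO})
    return StructuredLoggerMessage(
        name=record.name, message=record.msg, level=record.levelno, meta={}, record=record
    )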


class JsonEventLoggerHandler(logging.Handler):
    def __init__(self, json_path: str, construct_event_record):
        super(JsonEventLoggerHandler, self).__init__()
        self.json_path = check.str_param(json_path, "json_path")
        self.construct_event_record = construct_event_record

    def emit(self, record: logging.LogRecord) -> None:
        try:
            event_record = self.construct_event_record(record)
            with open(self.json_path, "a", encoding="utf8") as ff:
                text_line = seven.json.dumps(event_record.to_dict())
                ff.write(text_line + "\n")

        # Need to catch Exception here, so disabling lint
        except Exception as e:
            logging.critical("[%s] Error during logging!", self.__class__.__name__)
            logging.exception(str(e))
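

# Illustrative sketch (not part of the original module): construct_event_record must
# accept a LogRecord and return an object exposing to_dict(). The adapter class below
# is hypothetical, as is the file path in the trailing comment.
class _ExampleEventRecord:
    def __init__(self, record: logging.LogRecord):
        self.record = record

    def to_dict(self):
        return {"name": self.record.name, "message": self.record.getMessage()}


# e.g. JsonEventLoggerHandler("/tmp/events.jsonl", _ExampleEventRecord)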


class StructuredLoggerHandler(logging.Handler):
    def __init__(self, callback):
        super(StructuredLoggerHandler, self).__init__()
        self.callback = check.is_callable(callback, "callback")

    def emit(self, record: logging.LogRecord) -> None:
        try:
            self.callback(
                StructuredLoggerMessage(
                    name=record.name,
                    message=record.msg,
                    level=record.levelno,
                    meta=record.dagster_meta,  # type: ignore
                    record=record,
                )
            )
        # Need to catch Exception here, so disabling lint
        except Exception as e:
            logging.critical("[%s] Error during logging!", self.__class__.__name__)
            logging.exception(str(e))
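

# A minimal usage sketch (not part of the original module): the callback receives each
# record as a StructuredLoggerMessage. Note that emit reads record.dagster_meta, so
# records must carry that attribute, supplied here via the stdlib `extra` mechanism;
# the logger name and meta contents are illustrative.
def _example_structured_handler_usage():
    messages = []
    log = logging.getLogger("structured-example")
    log.setLevel(logging.INFO)
    log.addHandler(StructuredLoggerHandler(messages.append))
    log.info("hello", extra={"dagster_meta": {"run_id": "abc"}})
    return messages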


def construct_single_handler_logger(name, level, handler):
    check.str_param(name, "name")
    check.inst_param(handler, "handler", logging.Handler)

    level = coerce_valid_log_level(level)

    @logger
    def single_handler_logger(_init_context):
        klass = logging.getLoggerClass()
        logger_ = klass(name, level=level)
        logger_.addHandler(handler)
        handler.setLevel(level)
        return logger_

    return single_handler_logger
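

# A minimal usage sketch (not part of the original module): wire a handler into a
# one-off @logger definition; the name and level below are illustrative.
def _example_single_handler_logger():
    events = []
    return construct_single_handler_logger(
        "example_structured", "INFO", StructuredLoggerHandler(events.append)
    )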


# Base python logger whose messages will be captured as structured Dagster log messages.
BASE_DAGSTER_LOGGER = logging.getLogger(name="dagster")


def get_dagster_logger(name: Optional[str] = None) -> logging.Logger:
    """Creates a python logger whose output messages will be captured and converted into Dagster
    log messages. This means they will have structured information such as the step_key, run_id,
    etc. embedded into them, and will show up in the Dagster event log.

    This can be used as a more convenient alternative to `context.log` in most cases. If log
    level is not set explicitly, defaults to DEBUG.

    Args:
        name (Optional[str]): If supplied, will create a logger with the name
            "dagster.builtin.{name}", with properties inherited from the base Dagster logger.
            If omitted, the returned logger will be named "dagster.builtin".

    Returns:
        :class:`logging.Logger`: A logger whose output will be captured by Dagster.

    Example:
        .. code-block:: python

            from dagster import get_dagster_logger, op

            @op
            def hello_op():
                log = get_dagster_logger()
                for i in range(5):
                    # do something
                    log.info(f"Did {i+1} things!")
    """
    # enforce that the parent logger will always have a DEBUG log level
    BASE_DAGSTER_LOGGER.setLevel(logging.DEBUG)
    base_builtin = BASE_DAGSTER_LOGGER.getChild("builtin")
    if name:
        return base_builtin.getChild(name)
    return base_builtin


def define_structured_logger(name, callback, level):
    check.str_param(name, "name")
    check.callable_param(callback, "callback")
    level = coerce_valid_log_level(level)

    return construct_single_handler_logger(name, level, StructuredLoggerHandler(callback))


def define_json_file_logger(name, json_path, level):
    check.str_param(name, "name")
    check.str_param(json_path, "json_path")
    level = coerce_valid_log_level(level)

    stream_handler = JsonFileHandler(json_path)
    stream_handler.setFormatter(define_default_formatter())
    return construct_single_handler_logger(name, level, stream_handler)


def get_stack_trace_array(exception):
    check.inst_param(exception, "exception", Exception)
    if hasattr(exception, "__traceback__"):
        tb = exception.__traceback__
    else:
        _exc_type, _exc_value, tb = sys.exc_info()
    return traceback.format_tb(tb)


def _mockable_formatTime(record, datefmt=None):
    """Uses pendulum.now to determine the logging time, causing pendulum mocking to affect the
    logger timestamp in tests.
    """
    return pendulum.now().strftime(datefmt if datefmt else default_date_format_string())


def default_format_string():
    return "%(asctime)s - %(name)s - %(levelname)s - %(message)s"


def default_date_format_string():
    return "%Y-%m-%d %H:%M:%S %z"


def define_default_formatter():
    return logging.Formatter(default_format_string(), default_date_format_string())


@contextmanager
def quieten(quiet=True, level=logging.WARNING):
    if quiet:
        logging.disable(level)
    try:
        yield
    finally:
        if quiet:
            logging.disable(logging.NOTSET)


def configure_loggers(handler="default", log_level="INFO"):
    LOGGING_CONFIG = {
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {
            "colored": {
                "()": coloredlogs.ColoredFormatter,
                "fmt": default_format_string(),
                "datefmt": default_date_format_string(),
                "field_styles": {"levelname": {"color": "blue"}, "asctime": {"color": "green"}},
                "level_styles": {"debug": {}, "error": {"color": "red"}},
            },
        },
        "handlers": {
            "default": {
                "formatter": "colored",
                "class": "logging.StreamHandler",
                "stream": sys.stdout,
                "level": log_level,
            },
            "null": {
                "class": "logging.NullHandler",
            },
        },
        "loggers": {
            "dagster": {
                "handlers": [handler],
                "level": "INFO",
            },
            "dagit": {
                "handlers": [handler],
                "level": "INFO",
            },
        },
    }

    logging.config.dictConfig(LOGGING_CONFIG)

    if handler == "default":
        for name in ["dagster", "dagit"]:
            logging.getLogger(name).handlers[0].formatter.formatTime = _mockable_formatTime


def create_console_logger(name, level):
    # note: getLoggerClass() returns a Logger subclass, so the return value is a
    # logger with colored console output installed, not a handler
    klass = logging.getLoggerClass()
    logger_ = klass(name, level=level)
    coloredlogs.install(
        logger=logger_,
        level=level,
        fmt=default_format_string(),
        datefmt=default_date_format_string(),
        field_styles={"levelname": {"color": "blue"}, "asctime": {"color": "green"}},
        level_styles={"debug": {}, "error": {"color": "red"}},
    )
    return logger_
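

# A minimal usage sketch (not part of the original module): quieten wraps
# logging.disable, which suppresses records at `level` and below, so ERROR still
# passes the default WARNING threshold.
def _example_quieten_usage():
    with quieten():
        logging.getLogger("noisy").warning("suppressed")
        logging.getLogger("noisy").error("still emitted")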