
Can't read S3 remote logs when using gevent/eventlet webserver workers. #8212

@webster-chainalysis

Description


Hey everyone. I’ve upgraded to 1.10.9. It appears that the logging changes broke reading S3 remote logs in the Web UI (writing still works). The changelog mentions that Airflow's logging mechanism has been refactored to use Python’s built-in logging module:

[AIRFLOW-1611] Customize logging

I followed the directions in the changelog and created the following log config:

import os
import six

from airflow import AirflowException
from airflow.configuration import conf
from airflow.utils.file import mkdirs
from typing import Dict, Any

LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()

FAB_LOG_LEVEL = conf.get('core', 'FAB_LOGGING_LEVEL').upper()

LOG_FORMAT = conf.get('core', 'LOG_FORMAT')

COLORED_LOG_FORMAT = conf.get('core', 'COLORED_LOG_FORMAT')

COLORED_LOG = conf.getboolean('core', 'COLORED_CONSOLE_LOG')

COLORED_FORMATTER_CLASS = conf.get('core', 'COLORED_FORMATTER_CLASS')

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')

PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'CHILD_PROCESS_LOG_DIRECTORY')

DAG_PROCESSOR_MANAGER_LOG_LOCATION = \
    conf.get('core', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION')

FILENAME_TEMPLATE = conf.get('core', 'LOG_FILENAME_TEMPLATE')

PROCESSOR_FILENAME_TEMPLATE = conf.get('core', 'LOG_PROCESSOR_FILENAME_TEMPLATE')

FORMATTER_CLASS_KEY = '()' if six.PY2 else 'class'


#
# Getting this from environment because the changelog for 1.10.9 says to set
# the path of `REMOTE_BASE_LOG_FOLDER` explicitly in the config. The
# `REMOTE_BASE_LOG_FOLDER` key is not used anymore.
#
REMOTE_BASE_LOG_FOLDER = os.environ.get('AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER')

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow.task': {
            'format': LOG_FORMAT,
        },
        'airflow.processor': {
            'format': LOG_FORMAT,
        },
        'airflow_coloured': {
            'format': COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
            FORMATTER_CLASS_KEY: COLORED_FORMATTER_CLASS if COLORED_LOG else 'logging.Formatter'
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'airflow.task',
            'stream': 'ext://sys.stdout'
        },
        'file.task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        'file.processor': {
            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
            'formatter': 'airflow.processor',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
        's3.task': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            's3_log_folder': REMOTE_BASE_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
    },
    'loggers': {
        '': {
            'handlers': ['console'],
            'level': LOG_LEVEL
        },
        'airflow': {
            'handlers': ['console'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.processor': {
            'handlers': ['file.processor'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
        'airflow.task': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task_runner': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
    }
}  # type: Dict[str, Any]
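
As a sanity check on the dict config itself (assuming the file above is saved as log_config.py; the module name is just an example), dictConfig names each handler after its dict key, so the handler attached to the airflow.task logger should end up literally named s3.task:

import logging
from logging.config import dictConfig

from log_config import LOGGING_CONFIG  # example module name for the config above

dictConfig(LOGGING_CONFIG)

# dictConfig assigns each handler's dict key as its .name, so this should
# print ['s3.task'] if the config wires up the way I intend.
print([h.name for h in logging.getLogger('airflow.task').handlers])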

However, the task log reader always defaults to the FileTaskHandler. This should not happen, because I have the following settings in airflow.cfg:

remote_logging = True
remote_base_log_folder = s3://my-bucket-name
remote_log_conn_id = aws_default
task_log_reader = s3.task

The s3.task handler named in the task_log_reader setting should result in an instance of the S3TaskHandler class being used to read the task logs from S3. This happens when rendering the get_logs_with_metadata view in www/views.py.
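
From what I can tell, that selection happens by handler name on the airflow.task logger. A rough paraphrase from memory (not the exact source) of the lookup the view performs:

import logging

from airflow.configuration import conf

# Paraphrased sketch: the view fetches task_log_reader ('s3.task' in my setup)
# and looks for a handler with that name on the 'airflow.task' logger.
task_log_reader = conf.get('core', 'task_log_reader')
logger = logging.getLogger('airflow.task')
handler = next((h for h in logger.handlers if h.name == task_log_reader), None)

# If no handler matches here in the webserver process, the S3TaskHandler is
# never used for reading, which would explain the FileTaskHandler behaviour below.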


Apache Airflow version: 1.10.9
Kubernetes version: 1.15

Environment:

  • Cloud provider or hardware configuration: AWS
  • OS (e.g. from /etc/os-release): python:3.7.6-buster docker image

What happened: Logs did not appear in the Airflow Web UI. The FileTaskHandler tries to fetch the log file locally or from the worker on port 8793. However, the logs do not exist in either location since we are using the Kubernetes Executor. This produces the following error messages:

*** Log file does not exist: /usr/local/airflow/logs/MY_DAG_NAME/MY_TASK_NAME/2020-04-07T20:59:19.312402+00:00/6.log
*** Fetching from: http://MY_DAG_NAME-0dde5ff5a786437cb14234:8793/log/MY_DAG_NAME/MY_TASK_NAME/2020-04-07T20:59:19.312402+00:00/6.log
*** Failed to fetch log file from worker. HTTPConnectionPool(host='MY_DAG_NAME-0dde5ff5a786437cb14234', port=8793): Max retries exceeded with url: /log/MY_DAG_NAME/MY_TASK_NAME/2020-04-07T20:59:19.312402+00:00/6.log (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f708332fc90>: Failed to establish a new connection: [Errno -2] Name or service not known'))
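
The log objects themselves are in S3, so writing works; only the read path in the UI fails. A minimal boto3 check (bucket and key are hypothetical, mirroring the path from the error above):

import boto3

s3 = boto3.client('s3')

# Hypothetical bucket/key following the same layout as the failing path above.
# head_object raises botocore.exceptions.ClientError only if the object is missing.
s3.head_object(
    Bucket='my-bucket-name',
    Key='MY_DAG_NAME/MY_TASK_NAME/2020-04-07T20:59:19.312402+00:00/6.log',
)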

What you expected to happen:

The logs should be rendered in the Web UI using the S3TaskHandler class.
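
For reference, a minimal sketch of what I would expect that read to look like, using the handler class directly with the same arguments as the s3.task entry in LOGGING_CONFIG (the remote location below is a made-up example following FILENAME_TEMPLATE):

from airflow.utils.log.s3_task_handler import S3TaskHandler

# Same constructor kwargs that dictConfig passes for the 's3.task' handler.
handler = S3TaskHandler(
    base_log_folder='/usr/local/airflow/logs',
    s3_log_folder='s3://my-bucket-name',
    filename_template='{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log',
)

# Made-up remote location following the filename template for one task try.
remote_loc = 's3://my-bucket-name/MY_DAG_NAME/MY_TASK_NAME/2020-04-07T20:59:19.312402+00:00/6.log'
if handler.s3_log_exists(remote_loc):
    print(handler.s3_read(remote_loc))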

Labels

kind:bug, provider:amazon
