Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,8 @@ airflow-*.err
airflow-*.out
airflow-*.log
airflow-*.pid

# mypy
.mypy_cache/
.dmypy.json
dmypy.json
4 changes: 4 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ jobs:
stage: pre-test
install: pip install flake8
script: flake8
- name: mypy
stage: pre-test
install: pip install mypy
script: mypy airflow
- name: Check license header
stage: pre-test
install: skip
Expand Down
2 changes: 1 addition & 1 deletion airflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from airflow.exceptions import AirflowException

if settings.DAGS_FOLDER not in sys.path:
sys.path.append(settings.DAGS_FOLDER)
sys.path.append(settings.DAGS_FOLDER) # type: ignore

login = None

Expand Down
4 changes: 3 additions & 1 deletion airflow/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@

from __future__ import print_function

from typing import Any

from airflow.exceptions import AirflowException
from airflow import configuration as conf
from importlib import import_module

from airflow.utils.log.logging_mixin import LoggingMixin

api_auth = None
api_auth = None # type: Any

log = LoggingMixin().log

Expand Down
2 changes: 1 addition & 1 deletion airflow/api/auth/backend/kerberos_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from airflow import configuration as conf

from flask import Response
from flask import _request_ctx_stack as stack
from flask import _request_ctx_stack as stack # type: ignore
from flask import make_response
from flask import request
from flask import g
Expand Down
22 changes: 15 additions & 7 deletions airflow/bin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import argparse
from argparse import RawTextHelpFormatter
from builtins import input
from collections import namedtuple

from airflow.utils.timezone import parse as parsedate
import json
Expand All @@ -49,6 +48,7 @@
import psutil
import re
from urllib.parse import urlunparse
from typing import Any

import airflow
from airflow import api
Expand All @@ -69,7 +69,7 @@
from sqlalchemy.orm import exc

api.load_auth()
api_module = import_module(conf.get('cli', 'api_client'))
api_module = import_module(conf.get('cli', 'api_client')) # type: Any
api_client = api_module.Client(api_base_url=conf.get('cli', 'endpoint_url'),
auth=api.api_auth.client_auth)

Expand Down Expand Up @@ -1664,9 +1664,17 @@ def sync_perm(args):
dag.access_control)


Arg = namedtuple(
'Arg', ['flags', 'help', 'action', 'default', 'nargs', 'type', 'choices', 'metavar'])
Arg.__new__.__defaults__ = (None, None, None, None, None, None, None)
class Arg(object):
Comment thread
ashb marked this conversation as resolved.
Outdated
def __init__(self, flags=None, help=None, action=None, default=None, nargs=None,
type=None, choices=None, metavar=None):
self.flags = flags
self.help = help
self.action = action
self.default = default
self.nargs = nargs
self.type = type
self.choices = choices
self.metavar = metavar


class CLIFactory(object):
Expand Down Expand Up @@ -2380,8 +2388,8 @@ def get_parser(cls, dag_parser=False):
continue
arg = cls.args[arg]
kwargs = {
f: getattr(arg, f)
for f in arg._fields if f != 'flags' and getattr(arg, f)}
f: v
for f, v in vars(arg).items() if f != 'flags' and v}
sp.add_argument(*arg.flags, **kwargs)
sp.set_defaults(func=sub['func'])
return parser
Expand Down
3 changes: 2 additions & 1 deletion airflow/config_templates/airflow_local_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
# under the License.

import os
from typing import Dict, Any
Comment thread
ashb marked this conversation as resolved.
Outdated

from airflow import configuration as conf
from airflow.utils.file import mkdirs
Expand Down Expand Up @@ -107,7 +108,7 @@
'handlers': ['console'],
'level': LOG_LEVEL,
}
}
} # type: Dict[str, Any]

DEFAULT_DAG_PARSING_LOGGING_CONFIG = {
'handlers': {
Expand Down
10 changes: 7 additions & 3 deletions airflow/contrib/example_dags/example_gcs_to_bq_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@
# specific language governing permissions and limitations
# under the License.

from typing import Any

import airflow
from airflow import models
from airflow.operators import bash_operator

gcs_to_bq = None # type: Any
try:
from airflow.contrib.operators import gcs_to_bq
except ImportError:
gcs_to_bq = None
from airflow import models
from airflow.operators import bash_operator
pass


if gcs_to_bq is not None:
Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/example_dags/example_winrm_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from airflow.models import DAG
from datetime import timedelta

from airflow.contrib.hooks import WinRMHook
from airflow.contrib.hooks.winrm_hook import WinRMHook
from airflow.contrib.operators.winrm_operator import WinRMOperator


Expand Down
6 changes: 1 addition & 5 deletions airflow/contrib/hooks/databricks_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,7 @@
from requests import exceptions as requests_exceptions
from requests.auth import AuthBase
from time import sleep

try:
from urllib import parse as urlparse
except ImportError:
import urlparse
from six.moves.urllib import parse as urlparse

RESTART_CLUSTER_ENDPOINT = ("POST", "api/2.0/clusters/restart")
START_CLUSTER_ENDPOINT = ("POST", "api/2.0/clusters/start")
Expand Down
3 changes: 1 addition & 2 deletions airflow/contrib/hooks/gcp_api_base_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ def _get_field(self, f, default=None):
def project_id(self):
return self._get_field('project')

@staticmethod
def fallback_to_default_project_id(func):
"""
Decorator that provides fallback for Google Cloud Platform project id. If
Expand Down Expand Up @@ -186,8 +187,6 @@ def inner_wrapper(self, *args, **kwargs):
return func(self, *args, **kwargs)
return inner_wrapper

fallback_to_default_project_id = staticmethod(fallback_to_default_project_id)

def _get_project_id(self, project_id):
"""
In case project_id is None, overrides it with default project_id from
Expand Down
5 changes: 3 additions & 2 deletions airflow/contrib/kubernetes/pod_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import json
import time
from typing import Tuple, Optional
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State
from datetime import datetime as dt
Expand Down Expand Up @@ -70,7 +71,7 @@ def delete_pod(self, pod):
raise

def run_pod(self, pod, startup_timeout=120, get_logs=True):
# type: (Pod, int, bool) -> (State, str)
# type: (Pod, int, bool) -> Tuple[State, Optional[str]]
"""
Launches the pod synchronously and waits for completion.
Args:
Expand All @@ -91,7 +92,7 @@ def run_pod(self, pod, startup_timeout=120, get_logs=True):
return self._monitor_pod(pod, get_logs)

def _monitor_pod(self, pod, get_logs):
# type: (Pod, bool) -> (State, str)
# type: (Pod, bool) -> Tuple[State, Optional[str]]

if get_logs:
logs = self._client.read_namespaced_pod_log(
Expand Down
4 changes: 3 additions & 1 deletion airflow/contrib/operators/adls_list_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
# specific language governing permissions and limitations
# under the License.

from typing import Iterable

from airflow.contrib.hooks.azure_data_lake_hook import AzureDataLakeHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
Expand Down Expand Up @@ -46,7 +48,7 @@ class AzureDataLakeStorageListOperator(BaseOperator):
azure_data_lake_conn_id='azure_data_lake_default'
)
"""
template_fields = ('path',)
template_fields = ('path',) # type: Iterable[str]
ui_color = '#901dd2'

@apply_defaults
Expand Down
12 changes: 9 additions & 3 deletions airflow/contrib/operators/azure_container_instances_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
# specific language governing permissions and limitations
# under the License.

from collections import namedtuple
from time import sleep
from typing import Dict, Sequence
Comment thread
ashb marked this conversation as resolved.
Outdated

from airflow.contrib.hooks.azure_container_instance_hook import AzureContainerInstanceHook
from airflow.contrib.hooks.azure_container_registry_hook import AzureContainerRegistryHook
Expand All @@ -36,8 +38,13 @@
from msrestazure.azure_exceptions import CloudError


DEFAULT_ENVIRONMENT_VARIABLES = {}
DEFAULT_VOLUMES = []
Volume = namedtuple(
'Volume',
['conn_id', 'account_name', 'share_name', 'mount_path', 'read_only'],
)

DEFAULT_ENVIRONMENT_VARIABLES = {} # type: Dict[str, str]
DEFAULT_VOLUMES = [] # type: Sequence[Volume]
DEFAULT_MEMORY_IN_GB = 2.0
DEFAULT_CPU = 1.0

Expand Down Expand Up @@ -98,7 +105,6 @@ class AzureContainerInstancesOperator(BaseOperator):
"""

template_fields = ('name', 'environment_variables')
Comment thread
ashb marked this conversation as resolved.
Outdated
template_ext = tuple()

@apply_defaults
def __init__(self, ci_conn_id, registry_conn_id, resource_group, name, image, region,
Expand Down
3 changes: 2 additions & 1 deletion airflow/contrib/operators/gcp_bigtable_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# specific language governing permissions and limitations
# under the License.

from typing import Iterable
import google.api_core.exceptions

from airflow import AirflowException
Expand All @@ -33,7 +34,7 @@ class BigtableValidationMixin(object):
Common class for Cloud Bigtable operators for validating required fields.
"""

REQUIRED_ATTRIBUTES = []
REQUIRED_ATTRIBUTES = [] # type: Iterable[str]

def _validate_inputs(self):
for attr_name in self.REQUIRED_ATTRIBUTES:
Expand Down
4 changes: 3 additions & 1 deletion airflow/contrib/operators/gcs_list_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
# specific language governing permissions and limitations
# under the License.

from typing import Iterable

from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
Expand Down Expand Up @@ -58,7 +60,7 @@ class GoogleCloudStorageListOperator(BaseOperator):
google_cloud_storage_conn_id=google_cloud_conn_id
)
"""
template_fields = ('bucket', 'prefix', 'delimiter')
template_fields = ('bucket', 'prefix', 'delimiter') # type: Iterable[str]
ui_color = '#f0eee4'

@apply_defaults
Expand Down
8 changes: 2 additions & 6 deletions airflow/contrib/operators/jenkins_job_trigger_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,9 @@
import jenkins
from jenkins import JenkinsException
from requests import Request
import six
from six.moves.urllib.error import HTTPError, URLError

try:
basestring
except NameError:
basestring = str # For python3 compatibility


def jenkins_request_with_headers(jenkins_server, req):
"""
Expand Down Expand Up @@ -138,7 +134,7 @@ def build_job(self, jenkins_server):
"""
# Warning if the parameter is too long, the URL can be longer than
# the maximum allowed size
if self.parameters and isinstance(self.parameters, basestring):
if self.parameters and isinstance(self.parameters, six.string_types):
import ast
self.parameters = ast.literal_eval(self.parameters)

Expand Down
4 changes: 0 additions & 4 deletions airflow/contrib/operators/kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@
from airflow.contrib.kubernetes.volume import Volume # noqa
from airflow.contrib.kubernetes.secret import Secret # noqa

template_fields = ('templates_dict',)
template_ext = tuple()
ui_color = '#ffefeb'


class KubernetesPodOperator(BaseOperator):
"""
Expand Down
6 changes: 4 additions & 2 deletions airflow/contrib/operators/qubole_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
# specific language governing permissions and limitations
# under the License.

from typing import Iterable

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.contrib.hooks.qubole_hook import QuboleHook, COMMAND_ARGS, HYPHEN_ARGS, \
Expand Down Expand Up @@ -141,9 +143,9 @@ class QuboleOperator(BaseOperator):
'extract_query', 'boundary_query', 'macros', 'name', 'parameters',
'dbtap_id', 'hive_table', 'db_table', 'split_column', 'note_id',
'db_update_keys', 'export_dir', 'partition_spec', 'qubole_conn_id',
'arguments', 'user_program_arguments', 'cluster_label')
'arguments', 'user_program_arguments', 'cluster_label') # type: Iterable[str]

template_ext = ('.txt',)
template_ext = ('.txt',) # type: Iterable[str]
ui_color = '#3064A1'
ui_fgcolor = '#fff'
qubole_hook_allowed_args_list = ['command_type', 'qubole_conn_id', 'fetch_logs']
Expand Down
4 changes: 3 additions & 1 deletion airflow/contrib/operators/s3_list_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
# specific language governing permissions and limitations
# under the License.

from typing import Iterable

from airflow.hooks.S3_hook import S3Hook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
Expand Down Expand Up @@ -64,7 +66,7 @@ class S3ListOperator(BaseOperator):
aws_conn_id='aws_customers_conn'
)
"""
template_fields = ('bucket', 'prefix', 'delimiter')
template_fields = ('bucket', 'prefix', 'delimiter') # type: Iterable[str]
ui_color = '#ffd700'

@apply_defaults
Expand Down
4 changes: 3 additions & 1 deletion airflow/contrib/operators/sagemaker_base_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import json

from typing import Iterable

from airflow.contrib.hooks.sagemaker_hook import SageMakerHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
Expand All @@ -38,7 +40,7 @@ class SageMakerBaseOperator(BaseOperator):
template_ext = ()
ui_color = '#ededed'

integer_fields = []
integer_fields = [] # type: Iterable[Iterable[str]]

@apply_defaults
def __init__(self,
Expand Down
Loading