使用Airflow开发大数据生产任务

Use airflow to develop big data task

在大数据领域,任务调度和工作流管理是核心需求。无论是数据的抽取、转化、加载(ETL),还是数据分析任务,管理复杂的任务依赖性、监控任务执行情况并确保任务按时完成都十分重要。Apache Airflow 是一个开源的工作流管理平台,能够帮助你解决这些问题。

问题和解决方案

在处理大数据任务时,我们常常需要一个灵活且强大的工具来管理复杂的工作流。理想的解决方案应该具备以下特点:

  • 开源和通用:可以适应各种不同的数据任务场景。
  • 水平扩展:能够随着数据量的增加轻松扩展。
  • 强大的调度和监控功能:能够在任务失败时自动重试,并提供清晰的任务执行日志。

Apache Airflow 满足了这些需求,并且由于其广泛的社区支持和丰富的插件生态系统,已经成为了大数据任务调度的标准选择之一。

Apache Airflow 简介

Apache Airflow 是一个开源的工作流管理平台,使用 Python 代码来定义任务和依赖关系。Airflow 可以通过调度和监控任务,帮助开发者管理和自动化复杂的工作流。

Airflow 的主要特点包括:

  • 动态的任务调度:可以根据任务的执行结果和状态决定后续任务的执行。
  • 丰富的可视化工具:提供了强大的 web 界面,可以清晰地展示任务的依赖关系和执行状态。
  • 可扩展的架构:支持多种执行器(Executor),可以在单机或分布式环境中运行。

本地安装

首先,我们需要在本地环境中安装 Apache Airflow 以进行开发。以下是安装步骤:

conda create -n airflow
conda activate airflow
conda install pip
pip install google-re2==1.1 #解决安装包依赖问题
pip install apache-airflow
pip install pandas
pip install apache-airflow-providers-dingding

这些步骤将创建一个新的 Conda 环境并安装 Airflow 及其依赖项。确保你使用的是兼容的 Python 版本,以避免兼容性问题。

启动本地实例

安装完成后,可以通过以下命令启动本地 Airflow 实例:

环境变量定义,注意在苹果M1上需要设置NO_PROXY="*" https://github.com/apache/airflow/discussions/24463

cat .env

PYTHONWARNINGS=ignore::DeprecationWarning:cgitb
NO_PROXY="*"
AIRFLOW__CORE__DAGS_FOLDER=./
AIRFLOW__CORE__LOAD_EXAMPLES=False
AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE=Asia/Shanghai
#AIRFLOW__CORE__HOSTNAME_CALLABLE=airflow.utils.net.get_host_ip_address
AIRFLOW__CORE__HOSTNAME_CALLABLE=airflow.utils.net.getfqdn

启动

conda activate airflow
export $(grep -v '^#' .env | xargs)
airflow standalone

这将启动一个本地的 Airflow 实例,并自动配置数据库、用户、和默认任务队列。你可以通过日志文件或终端中的输出信息来确认 Airflow 实例是否成功启动。

创建管理员账户

启动 Airflow 实例后,接下来需要创建一个管理员账户来访问 Airflow 的 Web 界面:

airflow users create --role Admin --username airflow --email hugozhu@gmail.com --firstname Hugo --lastname Zhu --password airflow

创建完管理员账户后,可以使用该账户登录 Airflow 的 Web 界面。

登录 Airflow Web 界面

打开浏览器,访问以下地址:

http://localhost:8080

使用刚才创建的账户信息(默认用户名和密码为 airflow:airflow)登录。

登录后,你将看到 Airflow 的 Web 界面。在这个界面中,你可以管理 DAG(有向无环图)、查看任务执行状态、调试失败的任务以及设置告警和通知。

编写第一个 DAG

在 Airflow 中,工作流是通过 DAG(Directed Acyclic Graph,有向无环图)定义的。每个 DAG 是由 Python 脚本定义的,它描述了任务及其依赖关系。

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

# 定义 DAG
with DAG(
    dag_id='example_dag',
    start_date=datetime(2024, 8, 24),
    schedule_interval='@daily',
    catchup=False
) as dag:

    # 定义任务
    task1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    task2 = BashOperator(
        task_id='sleep',
        bash_command='sleep 5',
    )

    task3 = BashOperator(
        task_id='print_hello',
        bash_command='echo "Hello World"',
    )

    # 设置任务依赖
    task1 >> task2 >> task3

这个简单的 DAG 示例创建了三个任务:打印日期、等待 5 秒、打印 “Hello World”。这些任务按顺序执行。

结论

Apache Airflow 是一个功能强大且灵活的工作流管理平台,非常适合用于大数据任务的调度和管理。通过本地安装、创建管理员账户和编写第一个 DAG,您已经掌握了 Airflow 的基础使用方法。

接下来,您可以探索 Airflow 的更多高级特性,例如使用不同的执行器(Executor)在分布式环境中运行任务、集成各种数据源和目标以及实现复杂的任务调度逻辑。Airflow 的强大功能将为您的大数据项目提供强有力的支持。