Saltar al contenido principal
Buenos Aires: Good Airflows

Buenos Aires: Good Airflows

Leandro GutierrezAlrededor de 4 minBigDataIngenieria de datosAirflow

Algunas de las funciones principales de un ingeniero de datos son el obtener, transformar y mover información de un punto a otro, sumado esto a la impactante cantidad de datos que son generadas en la actualidad, se torna imprecindible la estandarización en la definición, implementacion y administración de estos flujos de trabajo, o como solemos llamarles pipelines, compreden un set de tareas a ejecutarse en un cierto orden y con una determinada frecuencia.

Cuando se trata de agendar tareas programadas quizás lo primero que llega a nuestra mente es el famoso servicio Cron de los sistemas Unix, el cual periodicamente y de manera desatendida despierta para ejecutar las tareas que le hayan sido asignadas. Los tiempos modernos arribaron y las necesidades de facilitar el desarrollo, despliegue y monitoreo de nuestras flujos de trabajo arribaron con ellos. Hoy daremos un vistazo a Apache Airflow y sus conceptos clave.

Airflow

Airflow, es una plataforma open-source que permite desarrollar, agendar y monitorear tareas programadas, o como ellos mismos se denominan un orquestador de flujos de trabajo orientado a procesos batch. Desarrollado en Python, Airflow provee un framework para crear nuestras propios flujos de trabajo (Workflows), cuenta además con una amplia gama de integraciones con diversos sistemas, algunos oficiales y otros desarrollados por terceros, los cuales resuelven la mayoría de los casos de uso común.

Al ser un framework de programación cuenta con las siguientes ventajas:

  • Los Workflows pueden ser versionados y controlados con herramientas como Git, permitiendo trabajar de manera colaborativa y en simultaneo.
  • Se pueden escribir Tests para validaciones.
  • Los componentes son extensibles y podemos desarrollar nuestra propias implementaciones.
airflow-view.png
airflow-view.png

Arquitectura

Su arquitecutra distribuida comprende multiples componentes:

  • Scheduler: es el encargado de lanzar los workflows programados y de coordinar cada una de las tareas que se deben ejecutar.
  • WebUI: interfaz de monitoreo, lanzamineto y debugueo de nuestros DAGs.
  • Worker: es quien efectivamente realiza las tareas provistas por el Scheduler. En instalaciones básicas es parte del Scheduler.
  • DAGs folder: carpeta leida por el Scheduler para levantar y agendar las tareas.
  • Metadata database: en ella Airflow almacena los estados de las ejecuciones y otros metadatos.
airflow-arch.png
airflow-arch.png

Existen otros componentes opcionales que pueden ser habilitados para mejorar escalabilidad y performance del stack:

  • Triggerer
  • Dag processor
  • Plugins folder

componentes opcionalesopen in new window

Workflow

El concepto Workflow es modelado como un DAG (Grafo Asiclico Direccionado), es decir no forma bucles y su final de ejecución está garantizado. Cada nodo del grafo se representa una tarea a realizar. Estos Tasks pueden ser componentes estandares, llamados Operators, como un HttpApiCliente o BashExecutor; o bien pueden ser funciones Python definidas por el usuario, con la flexibilidad casi infinita que eso implica.

Un DAG define las dependencias entre nuestros Tasks y la secuencia de ejecución de los mismos. Mientras que un Task define que se está haciendo.

airflow-dag.png
airflow-dag.png

Declarando nuestro DAG

Existen múltiples maneras de declarar nuestro DAG:

  • Utilizando el Context Manager, el cual inyectará el DAG a cualquiera de las tareas definidas dentro del contexto:
import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="dag_id",
    start_date=datetime.datetime(2021, 1, 1),
    schedule="@daily",
):
    EmptyOperator(task_id="task")
  • Inyectando el DAG en cada instanciación de una tarea mediante el Constructor standard :
import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

my_dag = DAG(
    dag_id="dag_id",
    start_date=datetime.datetime(2021, 1, 1),
    schedule="@daily",
)
EmptyOperator(task_id="task", dag=my_dag)
  • Utilizando el @dag decorator, el cual convierte una funcion en un generador de DAGs:
import datetime

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator

@dag(start_date=datetime.datetime(2021, 1, 1), schedule="@daily")
def generate_dag():
    EmptyOperator(task_id="task")


generate_dag()

Tasks

Asimismo existen tres formas de definir un Task:

  • Operators: casos de uso común que se standarizaron para facilitar nuestro trabajo. Muchos de ellos vienen integrados, otros son desarrollados por terceras partes y deben ser instalados como dependencias. Ejemplos:
    • HttpOperator
    • MySqlOperator
    • PostgresOperator
    • SlackAPIOperator
  • Sensors: son una subclase especial de Operators que funcionan como hooks asícronos.
  • Taskflow API: funciones Python decoradas con @Task decorator

Tras bambalinas todas son subclases de BaseOperator, por lo tanto los conceptos de Task y Operator son intercambiables.

Por ejemplo si quisieramos checkear el estado de una API podriamos utilizar un SimpleHttpOperator, componente base distribuido por la plataforma, o podemos desarrollar nuestra propia función en Python en la cual hacemos uso del paquete requests, ambos caminos son validos.

with DAG(
    dag_id='check_api',
    start_date=datetime.datetime(2021, 1, 1),
    schedule_interval='@daily',
    max_active_runs=1,
    catchup=False
) as dag:
    task_http_sensor_check = SimpleHttpOperator(
        task_id="http_sensor_check",
        http_conn_id="api-conn",
        endpoint="/ping",
        method="GET",
        dag=dag,
    )

    task_http_sensor_check
@dag(
    dag_id='check_api',
    start_date=datetime.datetime(2021, 1, 1),
    schedule_interval='@daily',
    max_active_runs=1,
    catchup=False
)
def check_api():
    @task(retries=3, retry_delay=timedelta(seconds=10), retry_exponential_backoff=True)
    def check_api_task():
        conn = Connection.get_connection_from_secrets("api-conn")
        endpoint = '/ping'

        url = conn.get_uri() + endpoint

        r = requests.get(url)
        if r.ok:
            msg = 'Success'
            logging.info(msg)
            return r.text
        else:
            msg = 'Error'
            logging.warning(msg)
            raise AirflowException(msg)

    check_api_task()


check_api()

Control de flujo

El flujo de ejecución de nuestro DAG se definen mediante el seteo de dependencias upstreams y downstrems. Conviven hoy dos maneras de definir estas dependencias, la primera mediante el uso de los operadores >> y <<, y la segunda con los metodos set_upstream y set_downstream.

first_task >> [second_task, third_task]
third_task << fourth_task

O bien:

first_task.set_downstream([second_task, third_task])
third_task.set_upstream(fourth_task)

Estas dependencias representan las aristas en nuestros grafos y definen el orden en que Airflow ejecutará las tareas. Por defecto un Task esperará que todos sus upstreams se ejecuten correctamente antes de correr (este comportamiento puede modificarse según necesitemos).

Un punto importante en Airflow es que no es una plataforma de ETL, sino como mencionamos anteriormente es un gestor de cargas de trabajo, por lo que no está diseñado para intercambiar grandes volumenes de datos entre tareas. Esto debe ser tenido en cuenta al momento de programar nuestros DAGs.

Para intercambiar información entre nuestras Tasks se emplean tres metodos:

  • XComs (Cross-communications): mecanismo por el cual nuestras tareas envian y reciben pequeños bloques de información.
  • Servicios de almacenamiento externo: es la mejor manera de intercambiar grandes volumenes de información, cada tarea debe encargarse de pullear y pushear la información que desea procesar o transferir.
  • TaskFlow: la API automaticamente inyecta la salida de nuestro Task a la siguiente etapa, haciendo uso de XComs implícitos. De nuevo no es recomendado para grandes volumenes de datos.

Casos de uso

Airflow está pensado para tareas batch asíncronas Los casos de uso mas común son:

  • Checkeos de tipo keep-alive
  • Controles de calidad de datos
  • Pre agregaciones
  • Ejecutar acciones remotas contra un servicio

Conclusiones

Como hemos visto Airflow nos provee de un framework completo para diseñar e implementar nuestros pipelines de trabajo. Su amplia variedad de Operadores resuelven una gran cantidad de los casos de uso comun y su Taskflow API nos brinda la mayor de las libertades al momento de programar nuestras tareas. Admite un abanico considerable de conectores a diferentes tecnologías, algunos propios y otros desarrollados por terceros, pero vastamente avalados por la comunidad. Su interfaz gráfica es sencilla de interpretar y permitiendonos gestionar rapidamente nuestros trabajos programados. Y su arquitectura distribuida es facilmente escalable según nuestras necesidades.

Denle una mirada y jueguen con la tecnología que seguramente les resultará por demás útil:

install Airflowopen in new window