كتابة أول ملف DAG (Directed Acyclic Graph) في Airflow لأتمتة مهمة يومية
كتابة أول ملف DAG في Airflow لأتمتة مهمة يومية
عندما تبدأ خطوط البيانات بالنمو، يصبح تشغيل السكربتات يدوياً عبئاً تشغيلياً ومصدراً للأخطاء. هنا تظهر قيمة Apache Airflow كمنصة متقدمة لإدارة وجدولة سير العمل، خصوصاً في البيئات التي تعتمد على ETL اليومي، وتجهيز البيانات التحليلية، وتحديث الجداول المرحلية والمستودعات.
إذا كنت قد قرأت سابقاً مقدمة في Apache Airflow: الأداة الأقوى عالمياً لجدولة وإدارة سير عمل البيانات فهذه الخطوة التالية الطبيعية: الانتقال من الفهم النظري إلى بناء أول ملف DAG حقيقي ينفذ مهمة يومية بشكل منظم وقابل للمراقبة.
في هذا المقال سنبني ملفاً عملياً يقوم بقراءة بيانات يومية، تنفيذ خطوة تحقق، ثم تشغيل معالجة بسيطة وإصدار نتيجة نهائية. الهدف ليس فقط تشغيل كود، بل فهم كيف يفكر Airflow في التبعية الزمنية، وإعادة التشغيل، والاعتمادية بين المهام.
ما هو ملف DAG ولماذا يمثل قلب الأتمتة؟
مصطلح Directed Acyclic Graph يعني تمثيلاً بيانياً للمهام وترتيب تنفيذها دون حلقات دائرية. كل مهمة تمثل عقدة، وكل علاقة اعتماد تمثل سهماً يحدد ما الذي يجب أن يحدث أولاً وما الذي ينتظر ما قبله.
في هندسة البيانات، هذه الفكرة حاسمة لأن خطوات مثل Extract وTransform وLoad لا يمكن تنفيذها عشوائياً. وهي نفس الفلسفة التي تناولناها في بناء خطوط أنابيب البيانات (ETL – Extract, Transform, Load) باستخدام بايثون، لكن Airflow يضيف طبقة تشغيل مؤسسية فوق هذا المنطق.
في البيئات الإنتاجية، قيمة
DAGلا تكمن في الجدولة فقط، بل في جعل تدفق البيانات قابلاً للتدقيق، وإعادة التشغيل الجزئي، ومراقبة الفشل، وربط التنفيذ بالتاريخ المنطقي للبيانات وليس بوقت ضغط زر التشغيل.
المكونات الأساسية التي يجب فهمها قبل كتابة أول ملف
1. الكائن DAG
هذا الكائن يحدد هوية سير العمل: الاسم، تاريخ البداية، الجدولة، عدد المحاولات، وسلوك التنفيذ. كل ملف Python داخل مجلد dags يعرّف واحداً أو أكثر من كائنات DAG.
2. المهام Tasks
المهام هي وحدات التنفيذ الفعلية. يمكن أن تكون أمراً عبر BashOperator أو دالة بايثون عبر PythonOperator أو حتى مهام مخصصة لتشغيل استعلامات SQL أو أعمال على Spark.
3. الاعتماديات Dependencies
يتم تحديد ترتيب التنفيذ باستخدام معاملات مثل >> و<<. بهذه الطريقة يعرف النظام أن مهمة الفحص يجب أن تسبق التحويل، وأن التحويل يسبق الحفظ أو الإشعار.
سيناريو عملي: أتمتة مهمة يومية لمعالجة ملف مبيعات
سنفترض وجود ملف يومي بصيغة CSV يتم استلامه كل صباح. المطلوب هو:
- التحقق من وجود الملف.
- قراءة البيانات ومعالجة السجلات الفارغة.
- حساب إجمالي المبيعات اليومي.
- حفظ تقرير مختصر في مجلد مخرجات.
هذا السيناريو يعكس ممارسات شائعة في الشركات، خصوصاً عند بناء خطوات قريبة من موضوعات مثل استخراج البيانات (Extract): سحب ملايين السجلات من واجهات API وقواعد بيانات SQL وتحويل البيانات (Transform): تنظيف وتشفير البيانات أثناء انتقالها آلياً.
الكود الكامل لأول ملف DAG
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import os
import pandas as pd
INPUT_FILE = "/opt/airflow/data/daily_sales.csv"
OUTPUT_FILE = "/opt/airflow/data/daily_sales_summary.csv"
default_args = {
"owner": "data-team",
"depends_on_past": False,
"retries": 2,
"retry_delay": timedelta(minutes=5)
}
def check_file_exists():
if not os.path.exists(INPUT_FILE):
raise FileNotFoundError(f"Input file not found: {INPUT_FILE}")
def process_sales_data():
df = pd.read_csv(INPUT_FILE)
df = df.dropna(subset=["product", "quantity", "price"])
df["quantity"] = pd.to_numeric(df["quantity"], errors="coerce")
df["price"] = pd.to_numeric(df["price"], errors="coerce")
df = df.dropna(subset=["quantity", "price"])
df["total"] = df["quantity"] * df["price"]
summary = pd.DataFrame({
"run_date": [datetime.now().strftime("%Y-%m-%d")],
"records_count": [len(df)],
"total_sales": [df["total"].sum()]
})
summary.to_csv(OUTPUT_FILE, index=False)
def notify_completion():
print(f"Daily pipeline completed successfully. Output saved to: {OUTPUT_FILE}")
with DAG(
dag_id="daily_sales_pipeline",
default_args=default_args,
description="A simple daily ETL pipeline using Airflow",
schedule_interval="@daily",
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["etl", "sales", "daily"]
) as dag:
task_check_file = PythonOperator(
task_id="check_file_exists",
python_callable=check_file_exists
)
task_process_data = PythonOperator(
task_id="process_sales_data",
python_callable=process_sales_data
)
task_notify = PythonOperator(
task_id="notify_completion",
python_callable=notify_completion
)
task_check_file >> task_process_data >> task_notify
شرح الكود خطوة بخطوة
تعريف الإعدادات الافتراضية
القاموس default_args يحدد سلوكاً عاماً للمهام مثل عدد مرات إعادة المحاولة. هذا مهم عندما يفشل التنفيذ بسبب ملف متأخر أو انقطاع مؤقت في المصدر.
مهمة التحقق من الملف
الدالة check_file_exists تمثل طبقة تحقق مبكرة. بدلاً من اكتشاف المشكلة داخل مرحلة المعالجة بعد استهلاك وقت وموارد، يتم إيقاف التدفق مباشرة مع رسالة واضحة.
مهمة المعالجة
في process_sales_data نستخدم Pandas لقراءة الملف، حذف القيم الحرجة المفقودة، تحويل الحقول الرقمية، ثم حساب قيمة المبيعات. هذه الممارسات ترتبط مباشرة بمفاهيم تنظيف البيانات (Data Cleaning): اكتشاف ومعالجة القيم المفقودة (Missing Values) ومكتبة Pandas (1): قراءة واستدعاء البيانات من ملفات CSV و Excel برمجياً.
مهمة الإشعار
رغم أن المثال يكتفي بطباعة رسالة، إلا أن هذه المرحلة في البيئات الحقيقية قد ترسل بريداً إلكترونياً أو تنبيهاً إلى Slack أو تسجل مؤشرات تشغيل في نظام مراقبة مركزي.
كيف يقرأ Airflow هذا الملف تشغيلياً؟
عند وضع الملف داخل مجلد dags، يقوم Scheduler بتحليله دورياً واستخراج تعريف الـ DAG. بعد ذلك يقرر متى يجب إنشاء تشغيل جديد بناءً على schedule_interval وتاريخ البداية.
المهم هنا هو فهم أن التنفيذ مرتبط بما يسمى التاريخ المنطقي logical date وليس فقط وقت التشغيل الفعلي. هذه النقطة أساسية عند التعامل مع بيانات يومية متأخرة أو عند إعادة معالجة أيام سابقة.
إذا كنت تدير خطوط بيانات ضخمة، فمن الأفضل جعل كل مهمة صغيرة وواضحة المسؤولية بدلاً من وضع كل المنطق داخل دالة واحدة. هذا التصميم يحسن القابلية للصيانة، ويسهل عزل الفشل، ويمنحك مرونة أعلى عند التوسع إلى
Sparkأو قواعد بياناتNoSQL.
أفضل الممارسات عند بناء أول DAG
- استخدم أسماء واضحة للمهام مثل
extract_ordersوvalidate_schema. - لا تخلط بين منطق الجدولة ومنطق الأعمال داخل نفس الطبقة بشكل فوضوي.
- فعّل إعادة المحاولة فقط في الأخطاء المؤقتة، وليس في أخطاء التصميم أو تنسيق البيانات.
- اجعل المخرجات قابلة للتتبع عبر تواريخ تشغيل ثابتة بدلاً من الكتابة العشوائية فوق الملفات.
- ابدأ بمثال بسيط ثم وسّعه لاحقاً إلى تكاملات مثل
SQL Operatorsأو مهامPySpark.
متى تنتقل من Python البسيط إلى أتمتة مؤسسية كاملة؟
إذا كان لديك سكربت مجدول محلياً ويعمل مرة يومياً، فقد تكفيك أدوات بسيطة كما في أتمتة خطوط الـ ETL: الجدولة باستخدام مكتبة Schedule وتشغيلها في الخلفية. لكن عندما تظهر الحاجة إلى التبعيات، وسجلات التنفيذ، وإعادة المحاولة، والمراقبة المركزية، يصبح الانتقال إلى Airflow قراراً هندسياً منطقياً.
كما أن هذا الانتقال يصبح أكثر أهمية عند توسع المشروع ليشمل تحميل البيانات إلى مستودع تحليلي، وهي خطوة ترتبط عضوياً بمقال تحميل البيانات (Load): إدراج البيانات المعالجة في مستودعات البيانات (Data Warehouses).
الخلاصة
كتابة أول ملف DAG في Airflow ليست مجرد خطوة تعليمية، بل تأسيس حقيقي لعقلية الأتمتة في هندسة البيانات. أنت لا تكتب سكربتاً فقط، بل تبني تدفقاً قابلاً للجدولة، والمراقبة، والتوسع، والتعافي من الفشل.
ابدأ بملف بسيط واضح البنية، ثم حسّن التصميم تدريجياً بإضافة التحقق من الجودة، وفصل مراحل ETL، وربط المهام بمصادر ووجهات حقيقية. بهذه الطريقة يتحول Airflow من أداة جدولة إلى منصة تشغيل مركزية لعمليات البيانات اليومية.
1 comment