مشروع مصغر: بناء مسار ETL يستخرج بيانات من API، يعالجها بـ PySpark، ويحفظها

دقائق القراءة: 6

مشروع مصغر: بناء مسار ETL يستخرج بيانات من API، يعالجها بـ PySpark، ويحفظها

يمثل هذا المشروع خطوة عملية متقدمة تجمع بين مفاهيم بناء خطوط أنابيب البيانات (ETL – Extract, Transform, Load) باستخدام بايثون وبين قدرات المعالجة الموزعة التي يقدمها Apache Spark. الفكرة الأساسية هي سحب بيانات خام من واجهة خارجية، ثم تحويلها إلى بنية قابلة للتحليل، وأخيراً تخزينها بصيغة مناسبة للاستهلاك التحليلي أو التشغيلي.

على الرغم من أن المثال هنا مصغر، إلا أن تصميمه يعكس نمطاً تستخدمه الشركات فعلياً عند بناء أنظمة تجمع بيانات العملاء أو الطلبات أو الأحداث الرقمية من خدمات خارجية. لذلك ستتعرف ليس فقط على كتابة الكود، بل على المنطق المعماري الذي يجعل خط البيانات مستقراً وقابلاً للتوسع.

فكرة المشروع والبنية العامة

سنفترض وجود API يعيد بيانات بصيغة JSON حول المستخدمين أو الطلبات. في مرحلة الاستخراج سنسحب البيانات عبر طلبات HTTP، ثم نحول الاستجابة إلى قائمة سجلات أولية.

بعد ذلك نمرر البيانات إلى بيئة PySpark لإنشاء DataFrame موزع. هنا تظهر قوة Spark في التنظيف، التصفية، توحيد الأنواع، وإجراء اشتقاقات جاهزة للتخزين.

أما مرحلة التحميل فسنحفظ النتائج في ملفات Parquet، لأنها فعالة جداً في ضغط البيانات وتسريع القراءة اللاحقة، خاصة إذا كانت الوجهة النهائية مستودع بيانات أو طبقة تحليلية.

اختيار صيغة Parquet بدلاً من CSV ليس قراراً شكلياً؛ فهو قرار معماري يؤثر مباشرة على سرعة القراءة، وتقليل استهلاك التخزين، وإمكانية تنفيذ تحليلات عمودية على أعمدة محددة دون تحميل الملف بالكامل.

المتطلبات التقنية قبل التنفيذ

قبل كتابة المشروع، يفضّل أن تكون البيئة مجهزة كما في إعداد بيئة PySpark: معالجة البيانات الموزعة على عدة أجهزة في نفس الوقت. كذلك إذا كنت ما تزال في بداياتك مع بيئات التحليل، فستفيدك مراجعة إعداد مختبر البيانات: تثبيت بيئة Jupyter Notebook ومكتبات التحليل الأساسية.

سنحتاج إلى المكونات التالية:

  • لغة Python.
  • مكتبة requests لاستخراج البيانات من الواجهة.
  • مكتبة PySpark للمعالجة الموزعة.
  • مسار تخزين محلي أو سحابي لحفظ المخرجات.

المرحلة الأولى: استخراج البيانات من API

مرحلة Extract لا تعني فقط جلب البيانات، بل أيضاً التعامل مع الاستجابة بطريقة موثوقة: التحقق من حالة الطلب، إدارة المهلات الزمنية، ومراعاة شكل الحقول. وهذا يتوافق مع ما تم شرحه في استخراج البيانات (Extract): سحب ملايين السجلات من واجهات API وقواعد بيانات SQL.

import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, trim, current_timestamp
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

API_URL = "https://jsonplaceholder.typicode.com/users"

response = requests.get(API_URL, timeout=30)
response.raise_for_status()

raw_data = response.json()

spark = SparkSession.builder \
    .appName("Mini_ETL_API_PySpark_Project") \
    .getOrCreate()

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("username", StringType(), True),
    StructField("email", StringType(), True),
    StructField("phone", StringType(), True),
    StructField("website", StringType(), True)
])

records = [
    {
        "id": item.get("id"),
        "name": item.get("name"),
        "username": item.get("username"),
        "email": item.get("email"),
        "phone": item.get("phone"),
        "website": item.get("website")
    }
    for item in raw_data
]

df = spark.createDataFrame(records, schema=schema)
df.show(truncate=False)

في هذا الجزء أنشأنا جلسة SparkSession ثم حددنا Schema صريحاً بدلاً من ترك الاستنتاج التلقائي. هذه الممارسة مهمة لأن التخمين الآلي للأنواع قد ينجح في المشاريع الصغيرة، لكنه يسبب مشكلات في البيئات الإنتاجية حين تتغير بنية البيانات فجأة.

المرحلة الثانية: تحويل البيانات وتنظيفها

هنا تبدأ القيمة الحقيقية لخط ETL. سنقوم بتوحيد النصوص، إزالة الفراغات، استبعاد السجلات غير المكتملة، وإضافة طابع زمني لعملية المعالجة. وإذا كنت مهتماً بالتفاصيل المنهجية للتنظيف، فراجع تنظيف البيانات (Data Cleaning): اكتشاف ومعالجة القيم المفقودة (Missing Values) ومعالجة البيانات المكررة والمشوهة (Duplicates & Outliers) باستخدام بايثون.

clean_df = df.select(
    col("id"),
    trim(col("name")).alias("full_name"),
    lower(trim(col("username"))).alias("username"),
    lower(trim(col("email"))).alias("email"),
    trim(col("phone")).alias("phone"),
    lower(trim(col("website"))).alias("website")
).dropDuplicates(["id"]) \
 .filter(col("email").isNotNull() & col("username").isNotNull()) \
 .withColumn("ingestion_timestamp", current_timestamp())

clean_df.show(truncate=False)
clean_df.printSchema()

هذا التحويل البسيط يحمل أكثر من فائدة: أولاً يجعل البيانات متجانسة، وثانياً يرفع جودة المطابقة عند الربط مع جداول أخرى، وثالثاً يضيف أثراً زمنياً ضرورياً للمراقبة والتدقيق. في الأنظمة الكبيرة، وجود حقل مثل ingestion_timestamp يساعد فرق الحوكمة على تتبع مصدر كل دفعة بيانات.

تنفيذ تحليل سريع باستخدام Spark SQL

من الممارسات القوية داخل Spark تحويل DataFrame إلى جدول مؤقت وإجراء استعلامات مباشرة، كما في تنفيذ استعلامات SQL مباشرة على البيانات الضخمة داخل بيئة Spark.

clean_df.createOrReplaceTempView("users_cleaned")

result_df = spark.sql("""
    SELECT
        website,
        COUNT(*) AS total_users
    FROM users_cleaned
    GROUP BY website
    ORDER BY total_users DESC
""")

result_df.show(truncate=False)

هذا النمط مفيد عندما يعمل مهندسو البيانات مع محللين معتادين على SQL. كما أنه يسهّل بناء طبقة وسيطة يمكن الاستعلام عنها دون إعادة كتابة منطق التحويل في كل مرة.

إذا كانت بيانات API كبيرة أو مجزأة عبر صفحات Pagination، فمن الأفضل تخزين النسخة الخام أولاً داخل طبقة Raw Zone ثم تشغيل التحويلات عليها. هذا يمنحك قابلية إعادة المعالجة دون استهلاك إضافي لواجهة المصدر.

المرحلة الثالثة: تحميل البيانات وحفظها

في مرحلة Load سنحفظ البيانات النظيفة بصيغة محسنة. يمكن لاحقاً استيرادها إلى مستودعات تحليلية أو ربطها مع أنظمة BI. ولمزيد من التوسع المفاهيمي، راجع تحميل البيانات (Load): إدراج البيانات المعالجة في مستودعات البيانات (Data Warehouses).

output_path = "output/users_parquet"

clean_df.write \
    .mode("overwrite") \
    .parquet(output_path)

print(f"Data saved successfully to: {output_path}")

استخدام النمط overwrite مناسب للتجارب أو الدفعات الكاملة، لكن في البيئات الحقيقية قد تفضل append أو التحميل المرحلي وفق تاريخ التشغيل لتجنب فقدان الإصدارات السابقة.

أفضل ممارسات هندسية لتطوير المشروع

تحسين الأداء في PySpark يبدأ من التصميم الصحيح للبيانات، لا من إضافة عتاد أكثر فقط. تقليل عدد الأعمدة، اختيار أنواع بيانات دقيقة، وتجنب العمليات الواسعة غير الضرورية مثل shuffle المكثف، ينعكس مباشرة على زمن التنفيذ والتكلفة.

متى يصبح هذا المشروع نموذجاً إنتاجياً حقيقياً؟

يتحول هذا المثال إلى مشروع جاهز للإنتاج عندما تضيف إليه عناصر الحوكمة والموثوقية: تخزين البيانات الخام، فحص جودة الحقول، إدارة الإصدارات، التنبيه عند الفشل، وجدولة دورية واضحة. كذلك يمكن توسيعه لقراءة بيانات من أكثر من مصدر ثم تنفيذ عمليات Join ودمج كما في دمج وتوحيد الجداول (Merge, Join, Concat) لبناء قاعدة بيانات تحليلية شاملة.

كما يمكن لاحقاً استخدام البيانات الناتجة في لوحات معلومات، أو تدريب نماذج Machine Learning بعد تهيئتها، وهو امتداد منطقي لما يبدأ عادة من مدخل إلى علوم البيانات: كيف تحول الأرقام العشوائية إلى قرارات استراتيجية؟.

خاتمة

هذا المشروع المصغر يوضح كيف يمكن بناء مسار ETL حديث يبدأ من API، يمر عبر معالجة موزعة باستخدام PySpark، ثم ينتهي بتخزين منظم وقابل للتوسع. ورغم بساطة المثال، فإن المبادئ المستخدمة فيه هي نفسها التي تعتمد عليها البنى الحديثة في هندسة البيانات.

إذا أتقنت هذا النمط، فستصبح قادراً على بناء خطوط بيانات أكثر تعقيداً، سواء لبيانات دفعية Batch أو تدفقية Streaming مثل معالجة تدفق البيانات اللحظية (Spark Structured Streaming)، مع جاهزية أفضل للأنظمة التحليلية والقرارات المعتمدة على البيانات.

2 comments

اترك تعليقاً

لن يتم نشر عنوان بريدك الإلكتروني. الحقول الإلزامية مشار إليها بـ *