مشروع عملي (2): خط أنابيب بيانات (ETL) متكامل يجلب أسعار الأسهم اللحظية ويحللها ويتنبأ بحركتها

دقائق القراءة: 6

مشروع عملي (2): خط أنابيب بيانات ETL متكامل يجلب أسعار الأسهم اللحظية ويحللها ويتنبأ بحركتها

في هذا المشروع العملي نبني مساراً متكاملاً يبدأ من جلب أسعار الأسهم من واجهة API لحظية، ثم يمر بمرحلة تنظيف وتحويل وتخزين وتحليل، وينتهي بتوليد توقع مبسط لاتجاه الحركة القادمة. هذا النوع من المشاريع يجمع بين مفاهيم بناء خطوط أنابيب البيانات (ETL – Extract, Transform, Load) باستخدام بايثون، وتحليل السلاسل الزمنية، والتكامل بين Python وSQL وPySpark.

الهدف ليس تقديم نموذج تداول حقيقي، بل شرح كيف تُبنى بنية بيانات موثوقة وقابلة للتوسع لمعالجة بيانات مالية سريعة التغير. لذلك سنركز على هندسة البيانات بقدر تركيزنا على التحليل، لأن جودة التنبؤ تبدأ من جودة خط الإدخال والتحويل، لا من الخوارزمية فقط.

فكرة المشروع والمعمارية العامة

المسار المقترح يعمل على مراحل متتالية: استخراج الأسعار اللحظية، توحيد البنية، تنظيف القيم، اشتقاق المؤشرات الفنية البسيطة، تحميل النتائج إلى مخزن تحليلي، ثم تدريب نموذج تصنيفي يتنبأ هل سيغلق السعر التالي أعلى أم أقل من السعر الحالي.

في البيانات المالية اللحظية، الخطأ الشائع هو بناء نموذج قبل تثبيت تعريف موحد للزمن والسوق والمصدر. أي اختلاف في المنطقة الزمنية أو تكرار السجلات أو فقدان ترتيب الأحداث سيؤدي إلى تحيزات خطيرة في التحليل والتوقع.

مرحلة الاستخراج Extract

المرحلة الأولى تعتمد على سحب الأسعار من واجهة خارجية وإرجاعها على شكل JSON. من المهم حفظ الاستجابة الخام أولاً حتى نمتلك نسخة مرجعية للمراجعة أو إعادة المعالجة، وهي ممارسة مهمة في المشاريع التي تتطلب تتبع المصدر.

إذا كنت قد قرأت استخراج البيانات (Extract): سحب ملايين السجلات من واجهات API وقواعد بيانات SQL فستلاحظ أن استخراج البيانات الإنتاجي يجب أن يراعي حدود الطلبات، وإعادة المحاولة عند الفشل، وتسجيل الأخطاء بدقة.

import requests
import pandas as pd
from datetime import datetime

API_KEY = "YOUR_API_KEY"
symbols = ["AAPL", "MSFT", "GOOGL"]

def fetch_stock_quotes(symbol):
    url = f"https://example-finance-api.com/quote?symbol={symbol}&apikey={API_KEY}"
    response = requests.get(url, timeout=20)
    response.raise_for_status()
    data = response.json()
    return {
        "symbol": symbol,
        "price": float(data["price"]),
        "volume": int(data.get("volume", 0)),
        "event_time": data["timestamp"],
        "ingestion_time": datetime.utcnow().isoformat()
    }

rows = [fetch_stock_quotes(symbol) for symbol in symbols]
raw_df = pd.DataFrame(rows)
print(raw_df.head())

مرحلة التحويل والتنظيف Transform

بعد الاستخراج نبدأ بتنظيف البنية وتحويل أنواع البيانات. هنا تظهر أهمية فهم مكتبة Pandas (2): استكشاف هيكل البيانات وفهم DataFrame و Series لأن التعامل مع الأعمدة الزمنية والرقمية يجب أن يكون صارماً، خصوصاً في البيانات الحساسة مثل الأسعار.

نقوم بحذف السجلات الفاسدة، وترتيب القيم زمنياً، وإنشاء مؤشرات مشتقة مثل العائد النسبي ومتوسط الحركة القصير والطويل. هذه الخطوة مرتبطة مباشرة بمقال هندسة الميزات (Feature Engineering): كيف تستخرج بيانات جديدة من البيانات الحالية؟.

import numpy as np

df = raw_df.copy()
df["event_time"] = pd.to_datetime(df["event_time"], utc=True)
df = df.dropna(subset=["price", "event_time"])
df = df[df["price"] > 0]
df = df.sort_values(["symbol", "event_time"])

df["return_1"] = df.groupby("symbol")["price"].pct_change()
df["ma_3"] = df.groupby("symbol")["price"].transform(lambda s: s.rolling(3).mean())
df["ma_5"] = df.groupby("symbol")["price"].transform(lambda s: s.rolling(5).mean())
df["volatility_5"] = df.groupby("symbol")["return_1"].transform(lambda s: s.rolling(5).std())

df["target_up"] = (
    df.groupby("symbol")["price"].shift(-1) > df["price"]
).astype(int)

df = df.dropna()
print(df.tail())

لماذا هذه الميزات مفيدة؟

  • العائد return_1 يقيس التغير النسبي بين نقطتين زمنيتين.
  • المتوسطات المتحركة تكشف الزخم والاتجاه القصير.
  • التذبذب volatility يساعد على رصد تغير المخاطر.
  • المتغير الهدف target_up يحول المشكلة إلى تصنيف ثنائي.

التخزين والتحليل باستخدام SQL وSpark

عندما تنتقل البيانات من عشرات السجلات إلى ملايين النقاط اللحظية يومياً، تصبح المعالجة المحلية محدودة. هنا نستفيد من ما هو Apache Spark؟ ولماذا تتوقف مكتبة Pandas عن العمل مع البيانات الضخمة (Big Data)؟ ومن قراءة وتحليل ملفات ضخمة (بحجم جيجابايت) في ثوانٍ باستخدام PySpark DataFrames.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, stddev

spark = SparkSession.builder.appName("stock-etl").getOrCreate()

spark_df = spark.createDataFrame(df)

spark_df.write.mode("overwrite").saveAsTable("stock_features")

summary_df = (
    spark_df.groupBy("symbol")
    .agg(
        avg(col("price")).alias("avg_price"),
        avg(col("return_1")).alias("avg_return"),
        stddev(col("return_1")).alias("return_std")
    )
)

summary_df.show()

بعد تحميل البيانات إلى جدول تحليلي يمكن تشغيل استعلامات مجمعة لاستخراج ملخصات الأداء، أو ربطها بمصادر أخرى مثل الأخبار أو أحجام التداول التاريخية. هذه المرونة هي جوهر بناء طبقة تحليلية قابلة للنمو.

إذا كان مصدر البيانات لحظياً، فمن الأفضل فصل التخزين إلى طبقتين: طبقة خام Raw غير معدلة، وطبقة نظيفة Curated. هذا التصميم يسهل التدقيق، ويخفض تكلفة إعادة الحساب، ويحسن الثقة في نتائج النماذج.

بناء نموذج التنبؤ بالاتجاه القادم

الآن ننتقل إلى جزء Machine Learning. سنستخدم نموذجاً بسيطاً مثل RandomForestClassifier للتنبؤ بارتفاع أو انخفاض السعر التالي. هذا الأسلوب مفيد تعليمياً لأنه يربط بين هندسة البيانات وبناء النماذج بصورة عملية.

وحتى تكون التجربة منهجية، من المهم فهم تقسيم البيانات (Train/Test Split): لماذا يجب أن نختبر النموذج على بيانات لم يرها من قبل؟، إضافة إلى أساليب تقييم نماذج الانحدار (MSE, RMSE, R2): كيف تعرف أن توقعات الذكاء الاصطناعي دقيقة؟ أو تقييم التصنيف في المسائل الثنائية.

from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report

feature_cols = ["price", "volume", "return_1", "ma_3", "ma_5", "volatility_5"]
X = df[feature_cols]
y = df["target_up"]

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, shuffle=False
)

model = RandomForestClassifier(
    n_estimators=200,
    max_depth=6,
    random_state=42
)

model.fit(X_train, y_train)
predictions = model.predict(X_test)

print("Accuracy:", accuracy_score(y_test, predictions))
print(classification_report(y_test, predictions))

قراءة النتائج بشكل صحيح

إذا حصلت على دقة مرتفعة ظاهرياً فلا تتسرع. بيانات الأسواق تتغير هيكلياً، وقد ينجح النموذج على فترة قصيرة ثم يفشل لاحقاً. لذلك يجب اختبار الاستقرار الزمني، وعدم الاعتماد على مؤشر واحد فقط، ومقارنة الأداء بخط أساس بسيط مثل توقع استمرار الاتجاه الحالي.

التشغيل الآلي وجدولة المسار

المشروع يزداد قيمة عندما يتحول من سكربت تجريبي إلى مسار آلي. يمكنك جدولة التنفيذ كل دقيقة أو كل خمس دقائق باستخدام أتمتة خطوط الـ ETL: الجدولة باستخدام مكتبة Schedule وتشغيلها في الخلفية، أو الانتقال إلى إدارة احترافية عبر مقدمة في Apache Airflow: الأداة الأقوى عالمياً لجدولة وإدارة سير عمل البيانات.

  • مهمة لجلب الأسعار وتخزين النسخة الخام.
  • مهمة لتنظيف البيانات واشتقاق الميزات.
  • مهمة لتحميل النتائج إلى قاعدة تحليلية.
  • مهمة لتحديث النموذج أو تشغيل التنبؤات.
  • مهمة للتنبيه عند فشل أي مرحلة.

في بيئات الإنتاج، لا تجعل النموذج جزءاً مباشراً من خطوة الاستخراج. الأفضل فصل المسار إلى خدمات مستقلة: خدمة إدخال، وخدمة تحويل، وخدمة تدريب، وخدمة تنبؤ. هذا يقلل نقاط الفشل ويرفع قابلية التوسع والمراقبة.

توسعات احترافية للمشروع

بعد اكتمال النسخة الأساسية، يمكن توسيع المشروع بطرق أكثر تقدماً. مثلاً يمكن ربط بيانات الأسعار مع الأخبار المالية وتحليل المشاعر باستخدام تقنيات NLP، أو تخزين البيانات الخام في كائنات سحابية مثل AWS S3، أو بناء طبقة استعلام ضخمة عبر BigQuery.

كما يمكن الانتقال من نموذج تصنيف بسيط إلى نماذج أكثر ملاءمة للبيانات الزمنية، مع الاستفادة من التعامل مع التواريخ والوقت (Datetime): تحليل التوجهات الزمنية (Time Series) لتحسين التقطيع الزمني للبيانات وتجنب تسرب المعلومات من المستقبل إلى الماضي.

الخلاصة

هذا المشروع يوضح أن القيمة الحقيقية في عالم البيانات المالية لا تأتي من النموذج وحده، بل من خط أنابيب ETL المتين الذي يضمن جودة المصدر، وصحة التحويل، وكفاءة التخزين، وقابلية التحليل لاحقاً. ومن دون هذه الأسس ستصبح أي توقعات مجرد أرقام جذابة بلا موثوقية تشغيلية.

إذا أتقنت هذا النوع من المشاريع، فأنت لا تبني مجرد تجربة تعليمية، بل تضع اللبنة الأولى لنظام تحليلي مالي قابل للتوسع، سواء للاستخدام البحثي أو لتغذية لوحات معلومات، أو لدعم نماذج تنبؤية أكثر ذكاءً في المستقبل.

اترك تعليقاً

لن يتم نشر عنوان بريدك الإلكتروني. الحقول الإلزامية مشار إليها بـ *