مشروع مصغر: بناء نموذج ذكاء اصطناعي يتنبأ باحتمالية إلغاء العملاء لاشتراكاتهم (Churn)

دقائق القراءة: 6

مشروع مصغر: بناء نموذج ذكاء اصطناعي يتنبأ باحتمالية إلغاء العملاء لاشتراكاتهم (Churn)

يُعد التنبؤ بإلغاء الاشتراكات من أكثر تطبيقات Machine Learning فائدةً في شركات الاتصالات، المنصات الرقمية، وخدمات SaaS. الفكرة الأساسية ليست مجرد معرفة من سيغادر، بل اكتشاف الإشارات المبكرة التي تسبق ذلك حتى تتدخل الشركة بعروض مناسبة أو دعم استباقي.

هذا المشروع يجمع بين مفاهيم مدخل إلى علوم البيانات: كيف تحول الأرقام العشوائية إلى قرارات استراتيجية؟ وبين الممارسة العملية في بناء نموذج تصنيف ثنائي. سنمر على مراحل جمع البيانات، تنظيفها، هندسة الميزات، التدريب، ثم التقييم التشغيلي للنموذج ضمن سياق تحليلي يمكن نقله لاحقاً إلى بيئات Big Data.

فهم مشكلة الأعمال وبنية البيانات

في سيناريو شائع، نمتلك جدولاً يحوي بيانات العملاء مثل مدة الاشتراك، عدد الشكاوى، قيمة الفاتورة، تكرار تسجيل الدخول، نوع الباقة، وطريقة الدفع. المتغير الهدف يكون عادةً عموداً مثل churn بقيمتين: 1 للعميل الذي غادر و0 لمن استمر.

قبل أي تدريب، نحتاج إلى التحقق من جودة البيانات. هذه المرحلة ترتبط مباشرة بمقالات مثل تنظيف البيانات (Data Cleaning): اكتشاف ومعالجة القيم المفقودة (Missing Values) ومعالجة البيانات المكررة والمشوهة (Duplicates & Outliers) باستخدام بايثون لأن النموذج يتأثر بشدة بالقيم الشاذة، التشفير غير المتسق، والسجلات الناقصة.

في أنظمة الاشتراكات الحقيقية، غالباً ما تكون بيانات الإلغاء غير متوازنة؛ فقد لا تتجاوز نسبة العملاء المغادرين 10% أو 15%. تجاهل هذه النقطة قد ينتج نموذجاً يبدو دقيقاً رقمياً، لكنه يفشل عملياً في اكتشاف العملاء المعرضين للمغادرة.

خط أنابيب المعالجة من الاستخراج إلى التدريب

1) قراءة البيانات وتوحيد الأنواع

يمكن البدء باستخدام Pandas إذا كان حجم البيانات مناسباً، خصوصاً بعد مراجعة مكتبة Pandas (1): قراءة واستدعاء البيانات من ملفات CSV و Excel برمجياً ومكتبة Pandas (2): استكشاف هيكل البيانات وفهم DataFrame و Series. الهدف هنا هو ضبط الأنواع الرقمية والنصية وتحويل الحقول المنطقية إلى صيغ قابلة للنمذجة.

import pandas as pd
import numpy as np

df = pd.read_csv("customer_churn.csv")

df["TotalCharges"] = pd.to_numeric(df["TotalCharges"], errors="coerce")
df["SeniorCitizen"] = df["SeniorCitizen"].astype(int)
df["Churn"] = df["Churn"].map({"Yes": 1, "No": 0})

df = df.drop_duplicates()
df = df.dropna(subset=["TotalCharges", "MonthlyCharges", "tenure", "Churn"])

print(df.info())
print(df["Churn"].value_counts(normalize=True))

2) الاستكشاف السريع واستخراج المؤشرات

الاستكشاف الإحصائي مهم لفهم العلاقات الأولية بين المتغيرات. هنا تظهر فائدة الإحصاء الوصفي والاستدلالي: مفاهيم لا غنى عنها لكل عالم بيانات والارتباط (Correlation): كيف تكتشف العلاقة الخفية بين المتغيرات (مثل السعر والطلب)؟. مثلاً، انخفاض مدة الاشتراك مع ارتفاع الشكاوى قد يكون مؤشراً قوياً على الإلغاء.

في كثير من الحالات، لا تكفي الأعمدة الأصلية وحدها. لذلك نحتاج إلى هندسة الميزات (Feature Engineering): كيف تستخرج بيانات جديدة من البيانات الحالية؟ لإنشاء متغيرات تمثل السلوك بصورة أدق.

df["AvgChargePerMonth"] = df["TotalCharges"] / (df["tenure"] + 1)
df["HasHighSupportCalls"] = (df["SupportCalls"] >= 3).astype(int)
df["IsNewCustomer"] = (df["tenure"] <= 6).astype(int)
df["UsageDropRatio"] = (df["UsageLastMonth"] / (df["UsageAvg3Months"] + 1)).round(3)

print(df[["AvgChargePerMonth", "HasHighSupportCalls", "IsNewCustomer", "UsageDropRatio"]].head())

تجهيز البيانات للنموذج

بما أن المسألة تصنيف ثنائي، يمكن الاعتماد على الانحدار اللوجستي (Logistic Regression): التنبؤ بالنتائج الثنائية كنقطة بداية ممتازة. لكن قبل ذلك يجب تنفيذ ترميز للمتغيرات الفئوية، ثم تطبيق إعداد البيانات للتدريب (Data Preprocessing): تحجيم البيانات (Scaling & Normalization) عندما يكون ذلك مناسباً للخوارزمية.

كما يجب إجراء تقسيم البيانات (Train/Test Split): لماذا يجب أن نختبر النموذج على بيانات لم يرها من قبل؟ حتى لا نقيس الأداء على نفس البيانات التي تعلّم منها النموذج.

from sklearn.model_selection import train_test_split
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, roc_auc_score

target = "Churn"
X = df.drop(columns=[target, "customerID"])
y = df[target]

numeric_features = ["tenure", "MonthlyCharges", "TotalCharges", "AvgChargePerMonth", "UsageDropRatio"]
categorical_features = ["Contract", "InternetService", "PaymentMethod"]

numeric_transformer = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="median")),
    ("scaler", StandardScaler())
])

categorical_transformer = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("onehot", OneHotEncoder(handle_unknown="ignore"))
])

preprocessor = ColumnTransformer(transformers=[
    ("num", numeric_transformer, numeric_features),
    ("cat", categorical_transformer, categorical_features)
])

model = Pipeline(steps=[
    ("preprocessor", preprocessor),
    ("classifier", LogisticRegression(max_iter=1000, class_weight="balanced"))
])

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

model.fit(X_train, y_train)
y_pred = model.predict(X_test)
y_prob = model.predict_proba(X_test)[:, 1]

print(classification_report(y_test, y_pred))
print("ROC-AUC:", roc_auc_score(y_test, y_prob))

مقارنة النماذج وتحسين الدقة

رغم بساطة LogisticRegression، إلا أنه يوفر قابلية تفسير ممتازة. بعده يمكن تجربة شجرة القرارات (Decision Trees) أو الغابات العشوائية (Random Forest) لالتقاط العلاقات غير الخطية بين المتغيرات.

from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import f1_score

rf_model = Pipeline(steps=[
    ("preprocessor", preprocessor),
    ("classifier", RandomForestClassifier(
        n_estimators=300,
        max_depth=10,
        min_samples_split=10,
        class_weight="balanced",
        random_state=42
    ))
])

rf_model.fit(X_train, y_train)
rf_pred = rf_model.predict(X_test)
rf_prob = rf_model.predict_proba(X_test)[:, 1]

print("F1 Score:", f1_score(y_test, rf_pred))
print("ROC-AUC:", roc_auc_score(y_test, rf_prob))

لا ينبغي تقييم النموذج بالاعتماد على Accuracy فقط. في مشاكل الإلغاء، تكون مقاييس مثل Recall وF1-Score وROC-AUC أكثر دلالة، لأنها تقيس قدرة النظام على التقاط العملاء المهددين فعلاً.

إذا كانت تكلفة خسارة العميل عالية، فمن الأفضل أحياناً خفض عتبة القرار threshold لزيادة اكتشاف العملاء المعرضين للإلغاء، حتى لو ارتفع عدد الإنذارات الكاذبة قليلاً.

التوسع إلى بيئات البيانات الضخمة

عندما تكبر البيانات إلى عشرات الملايين من السجلات، يصبح تنفيذ نفس المنطق عبر Apache Spark أكثر عملية من المعالجة المحلية. هنا ننتقل من نمط التحليل المكتبي إلى بنية ETL Pipeline موزعة، حيث تتم القراءة من مخزن بيانات، ثم التحويل، ثم التدريب على شكل مراحل قابلة للأتمتة.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

spark = SparkSession.builder.appName("churn_prediction").getOrCreate()

sdf = spark.read.csv("customer_churn.csv", header=True, inferSchema=True)

sdf = sdf.withColumn("ChurnLabel", when(col("Churn") == "Yes", 1).otherwise(0))

indexers = [
    StringIndexer(inputCol="Contract", outputCol="ContractIndex", handleInvalid="keep"),
    StringIndexer(inputCol="InternetService", outputCol="InternetServiceIndex", handleInvalid="keep"),
    StringIndexer(inputCol="PaymentMethod", outputCol="PaymentMethodIndex", handleInvalid="keep")
]

assembler = VectorAssembler(
    inputCols=["tenure", "MonthlyCharges", "TotalCharges", "ContractIndex", "InternetServiceIndex", "PaymentMethodIndex"],
    outputCol="features_raw"
)

scaler = StandardScaler(inputCol="features_raw", outputCol="features", withStd=True, withMean=False)
lr = LogisticRegression(featuresCol="features", labelCol="ChurnLabel", maxIter=50)

pipeline = Pipeline(stages=indexers + [assembler, scaler, lr])

train_df, test_df = sdf.randomSplit([0.8, 0.2], seed=42)
spark_model = pipeline.fit(train_df)
predictions = spark_model.transform(test_df)

predictions.select("ChurnLabel", "prediction", "probability").show(10, truncate=False)

دور قواعد البيانات والاستعلامات التحليلية

قبل الوصول إلى التدريب، كثيراً ما تُجمع البيانات من جداول متعددة: الفوترة، الدعم الفني، النشاط، والعقود. في هذه المرحلة تُستخدم مهارات دمج وتوحيد الجداول (Merge, Join, Concat) لبناء قاعدة بيانات تحليلية شاملة مع استعلامات SQL لاستخراج صورة موحدة لكل عميل.

query = """
SELECT
    c.customer_id,
    c.tenure,
    c.contract_type AS Contract,
    b.monthly_charges AS MonthlyCharges,
    b.total_charges AS TotalCharges,
    s.support_calls AS SupportCalls,
    u.usage_last_month AS UsageLastMonth,
    u.usage_avg_3m AS UsageAvg3Months,
    c.payment_method AS PaymentMethod,
    c.internet_service AS InternetService,
    c.churn_flag AS Churn
FROM customers c
LEFT JOIN billing b ON c.customer_id = b.customer_id
LEFT JOIN support_tickets s ON c.customer_id = s.customer_id
LEFT JOIN usage_metrics u ON c.customer_id = u.customer_id
"""
print(query)

أفضل تصميم تحليلي لمشكلة Churn هو فصل طبقة البيانات الخام عن الطبقة المنظفة، ثم إنشاء طبقة ميزات مستقرة تُغذّي النموذج بشكل يومي أو أسبوعي. هذا يقلل تكرار الأخطاء ويرفع قابلية المراقبة والإعادة.

خاتمة تنفيذية

هذا المشروع المصغر يوضح أن بناء نموذج للتنبؤ بإلغاء العملاء ليس مجرد تشغيل خوارزمية، بل سلسلة مترابطة تبدأ بفهم الأعمال، مروراً بتنظيف البيانات وتوليد الميزات، ثم اختيار النموذج المناسب وقياس أدائه بمقاييس صحيحة. وعندما تتوسع الشركة، يمكن نقل نفس المنهجية من Pandas إلى PySpark ضمن بنية هندسية أكثر نضجاً.

إذا طُبّق هذا النموذج داخل حملة احتفاظ ذكية، فسيمنح فرق التسويق وخدمة العملاء قدرة عملية على التدخل قبل وقوع الخسارة. وهذه هي القيمة الحقيقية لعلوم البيانات: تحويل السجلات الصامتة إلى قرارات قابلة للتنفيذ وعائد أعمال ملموس.

اترك تعليقاً

لن يتم نشر عنوان بريدك الإلكتروني. الحقول الإلزامية مشار إليها بـ *