كيفية استخدام PySpark في معالجة البيانات وتعلّم الآلة

دقائق القراءة: 8

مقدمة إلى PySpark ولماذا يحظى بهذه الأهمية؟

يُعد PySpark الواجهة البرمجية التي تتيح استخدام Apache Spark عبر لغة Python، وهو خيار شائع لدى فرق هندسة البيانات وعلوم البيانات عند التعامل مع مجموعات بيانات كبيرة أو تنفيذ مهام تحليلية وتعلّم آلي على نطاق واسع. تكمن أهميته في أنه يجمع بين سهولة كتابة الشيفرات بلغة مألوفة مثل Python، وبين قوة المعالجة الموزعة التي يوفرها Spark.

في الأصل، كُتب Apache Spark بلغة Scala، لكن مجتمع المشروع وفّر طبقة تكامل قوية تُمكّن المطورين من استخدامه عبر PySpark. وبهذا أصبح بالإمكان إدارة البيانات، وبناء DataFrame، والعمل مع RDD، واستخدام مكتبات تعلّم الآلة دون مغادرة بيئة Python.

واجهة توضيحية لمكتبة بايسبارك واستخدامها في معالجة البيانات الضخمة وتعلم الآلة

متى تحتاج إلى PySpark بدلاً من الأدوات التقليدية؟

إذا كنت تتعامل مع بيانات صغيرة أو متوسطة، فقد تكفيك مكتبات مثل pandas. لكن عندما تبدأ البيانات بالنمو إلى عشرات أو مئات الجيجابايت، تصبح المعالجة على جهاز واحد محدودة بسبب الذاكرة والقدرة الحاسوبية. هنا يظهر دور PySpark بوضوح، لأنه يسمح بتوزيع العمل على أكثر من عقدة داخل بيئة عنقودية Cluster.

فعلى سبيل المثال، قد يتمكن جهازك المحلي من تحميل جزء من البيانات فقط، لكن عند التعامل مع بيانات بحجم كبير جداً، تصبح المعالجة الموزعة ضرورة وليست رفاهية. ومن أبرز ما يقدمه Apache Spark في هذا السياق:

  • تنفيذ أسرع للعمليات التحليلية مقارنةً بالأساليب التقليدية.
  • القدرة على معالجة البيانات الضخمة بشكل موزع.
  • دعم لغات متعددة مثل Python وJava وScala وR.
  • دمج إمكانات SQL وStreaming والتحليلات المتقدمة ضمن منصة واحدة.
  • إمكانية التشغيل على البيئات السحابية مثل AWS وDatabricks.

الموضوعات الأساسية التي يغطيها تعلم PySpark

عند البدء مع PySpark، من المفيد ترتيب التعلّم على مراحل واضحة، لأن المنظومة واسعة وتشمل أكثر من مجرد قراءة الملفات أو تصفية الصفوف. عادةً ما يمر المسار العملي بالمحاور التالية:

  • مقدمة حول PySpark وطريقة التثبيت.
  • التعامل مع PySpark DataFrame.
  • معالجة القيم المفقودة Missing Values.
  • التجميع باستخدام GroupBy والدوال التجميعية.
  • استخدام مكتبة MLlib لتعلّم الآلة.
  • التطبيق العملي على منصات سحابية مثل Databricks.
  • تنفيذ نماذج مثل الانحدار الخطي Linear Regression.

تثبيت PySpark وتهيئة بيئة العمل

إنشاء بيئة مستقلة

من الأفضل إنشاء بيئة افتراضية جديدة قبل تثبيت PySpark لتجنّب التعارض مع الحزم الأخرى. بعد ذلك يمكن تثبيت المكتبة عبر الأمر التالي:

pip install pyspark

وللتأكد من نجاح التثبيت يمكنك تجربة الاستيراد داخل مفسر Python أو داخل دفتر Jupyter Notebook:

import pyspark

إذا ظهرت أخطاء، فغالباً يكون السبب مرتبطاً بالبيئة أو توافق الإصدارات، لذلك يُنصح بالعمل ضمن بيئة جديدة ونظيفة.

بدء جلسة Spark

قبل تنفيذ أي عملية فعلية في PySpark، يجب إنشاء جلسة عمل من خلال الكائن SparkSession، لأنه البوابة الأساسية للتعامل مع القراءة والتحويل والمعالجة.

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("practice") \
    .getOrCreate()

في التشغيل المحلي، ستجد عادةً عقدة رئيسية واحدة فقط. أما في البيئات السحابية أو العنقودية، فيمكنك توزيع المهام على عدة عقد بحسب البنية المتاحة.

قراءة ملفات CSV باستخدام PySpark

قراءة البيانات في PySpark تشبه الفكرة العامة في مكتبات تحليل البيانات الأخرى، لكن الصياغة تختلف قليلاً. فيما يلي مثال أساسي لقراءة ملف CSV:

df = spark.read.csv("test1.csv")

لكن هذا الأسلوب وحده قد يجعل PySpark يتعامل مع الصف الأول كبيانات عادية، ويُنشئ أسماء أعمدة افتراضية مثل _c0 و_c1. لذلك من الأفضل تحديد أن الملف يحتوي على ترويسة Header:

df = spark.read.option("header", True).csv("test1.csv")

ولتحسين اكتشاف أنواع البيانات تلقائياً، يمكنك تفعيل inferSchema:

df = spark.read.csv("test1.csv", header=True, inferSchema=True)

بعدها يمكنك عرض البيانات باستخدام:

df.show()

فحص بنية البيانات

من المهم بعد القراءة التأكد من أسماء الأعمدة وأنواعها. يوفر PySpark الأمر printSchema() لهذا الغرض:

df.printSchema()

كما يمكنك معرفة أسماء الأعمدة عبر:

df.columns

ومعرفة أنواع البيانات بصورة مختصرة باستخدام:

df.dtypes

التعامل مع DataFrame في PySpark

يُعد DataFrame أحد أهم هياكل البيانات في PySpark، وهو قريب من مفهوم DataFrame في pandas، لكن مع قدرات مهيأة للعمل الموزع.

استعراض الصفوف الأولى

df.head()
df.show()

اختيار عمود واحد أو عدة أعمدة

df.select("name").show()
df.select("name", "experience").show()

إرجاع select() يكون أيضاً من نوع DataFrame، وهذا مفيد لأنه يسمح بإكمال سلسلة من العمليات عليه لاحقاً.

الحصول على وصف إحصائي سريع

إذا أردت معاينة المؤشرات الوصفية الأساسية للأعمدة الرقمية، يمكنك استخدام:

df.describe().show()

هذا الأمر يعرض قيماً مثل المتوسط والانحراف المعياري والحدين الأدنى والأعلى، وهو مفيد جداً في المراجعة الأولية للبيانات.

إضافة الأعمدة وحذفها وإعادة تسميتها

إضافة عمود جديد

يسمح PySpark بإنشاء أعمدة مشتقة باستخدام الدالة withColumn(). مثلاً إذا أردت إنشاء عمود جديد يمثل الخبرة بعد سنتين:

df = df.withColumn("experience_after_2_years", df["experience"] + 2)

ثم عرضه:

df.show()

حذف عمود

df = df.drop("experience_after_2_years")

إعادة تسمية عمود

df = df.withColumnRenamed("name", "new_name")

هذه العمليات أساسية جداً في تهيئة البيانات قبل إدخالها إلى نماذج التعلّم الآلي.

معالجة القيم المفقودة في PySpark

التعامل مع القيم المفقودة خطوة محورية في أي مشروع بيانات. ويوفر PySpark أكثر من أسلوب لحذف القيم الناقصة أو تعويضها.

حذف الصفوف أو الأعمدة

لحذف عمود كامل:

df.drop("name").show()

أما لحذف الصفوف التي تحتوي على قيم مفقودة:

df.na.drop().show()

يمكن كذلك التحكم في طريقة الحذف عبر معاملات مثل how وthresh وsubset.

استخدام how لتحديد منطق الحذف

df.na.drop(how="any").show()
df.na.drop(how="all").show()
  • any: حذف الصف إذا احتوى على أي قيمة مفقودة.
  • all: حذف الصف فقط إذا كانت جميع قيمه مفقودة.

استخدام thresh

يسمح thresh بتحديد الحد الأدنى لعدد القيم غير المفقودة التي يجب أن تبقى في الصف:

df.na.drop(thresh=2).show()

استخدام subset

إذا أردت تطبيق الحذف على أعمدة محددة فقط:

df.na.drop(subset=["experience"]).show()

تعويض القيم المفقودة

يمكن استبدال القيم الفارغة مباشرة بقيمة نصية أو رقمية:

df.na.fill("Missing Values").show()

أو على أعمدة محددة:

df.na.fill("Missing Values", ["experience", "age"]).show()

التعويض بالمتوسط أو الوسيط

في الأعمدة الرقمية، من الأفضل غالباً التعويض بقيمة إحصائية مناسبة مثل المتوسط mean أو الوسيط median. ويمكن تنفيذ ذلك عبر Imputer:

from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=["age", "experience", "salary"],
    outputCols=["{}_imputed".format(c) for c in ["age", "experience", "salary"]]
).setStrategy("mean")

df = imputer.fit(df).transform(df)

هذا الأسلوب مناسب جداً عند تجهيز البيانات للنماذج، لأنه يقلل تأثير القيم المفقودة دون خسارة عدد كبير من الصفوف.

تصفية البيانات باستخدام filter()

التصفية من العمليات اليومية في تحليل البيانات، سواء كنت تريد استخراج فئة محددة من المستخدمين أو تطبيق شروط على الرواتب أو الفواتير أو السلوكيات.

مثال على التصفية بشرط واحد

df.filter("salary <= 20000").show()

كما يمكنك بعد التصفية اختيار أعمدة معينة فقط:

df.filter("salary <= 20000").select("name", "age").show()

استخدام شروط منطقية متعددة

df.filter((df["salary"] <= 20000) & (df["salary"] >= 15000)).show()

ويمكن استبدال العامل & بالعامل | عند الحاجة إلى شرط OR.

التصفية العكسية

لتنفيذ شرط عكسي باستخدام النفي NOT:

df.filter(~(df["salary"] <= 20000)).show()

هذه المرونة تجعل PySpark أداة قوية جداً في تنظيف البيانات واستخراج الشرائح التحليلية المطلوبة.

استخدام GroupBy والدوال التجميعية

عند الرغبة في فهم البيانات على مستوى الفئات أو الأقسام أو المستخدمين، تصبح عمليات groupBy() ضرورية. وهي تُستخدم عادةً مع دوال مثل sum() وmean() وcount() وmax().

تجميع حسب الاسم

df.groupBy("name").sum().show()

يفيد هذا النوع من التحليل عندما تريد حساب مجموع الرواتب أو إجمالي السجلات المرتبطة بكل شخص.

تجميع حسب القسم

df.groupBy("department").sum().show()
df.groupBy("department").mean().show()
df.groupBy("department").count().show()

بهذا يمكنك الإجابة عن أسئلة مهمة مثل:

  • أي قسم يحصل على أعلى إجمالي رواتب؟
  • كم عدد الموظفين في كل قسم؟
  • ما متوسط الأجور في كل إدارة؟

استخدام الدوال التجميعية مباشرة

df.agg({"salary": "sum"}).show()

هذا النمط مفيد عندما تحتاج إلى قيمة إجمالية واحدة سريعة دون تجميع حسب فئة معينة.

تعلّم الآلة في PySpark عبر MLlib

لا يقتصر دور PySpark على تنظيف البيانات وتحويلها، بل يمتد إلى بناء نماذج تعلّم آلي باستخدام مكتبة MLlib. وهي مكتبة قوية توفر خوارزميات للانحدار والتصنيف والتجميع ومعالجة الخصائص.

لماذا تحتاج إلى تجهيز الخصائص أولاً؟

في PySpark ML، لا يتم تمرير الأعمدة المستقلة إلى النموذج كلٌّ على حدة كما هو شائع في بعض الأدوات التقليدية، بل تُدمج غالباً في عمود واحد متجهي باستخدام VectorAssembler.

from pyspark.ml.feature import VectorAssembler

featureassembler = VectorAssembler(
    inputCols=["age", "experience"],
    outputCol="independent_features"
)

output = featureassembler.transform(training)

بعد ذلك يتم اختيار العمود المتجهي مع العمود الهدف:

finalized_data = output.select("independent_features", "salary")

تقسيم البيانات إلى تدريب واختبار

train_data, test_data = finalized_data.randomSplit([0.75, 0.25])

تطبيق الانحدار الخطي

from pyspark.ml.regression import LinearRegression

regressor = LinearRegression(
    featuresCol="independent_features",
    labelCol="salary"
)

regressor = regressor.fit(train_data)

بعد التدريب يمكنك عرض المعاملات coefficients والحد الثابت intercept، ثم تقييم أداء النموذج على بيانات الاختبار.

توليد التنبؤات وقياس الأداء

pred_results = regressor.evaluate(test_data)
pred_results.predictions.show()

كما يمكنك الحصول على مقاييس مثل:

  • r2
  • meanAbsoluteError
  • meanSquaredError

هذه المقاييس تساعدك على فهم جودة النموذج قبل اعتماده أو تطويره.

التعامل مع المتغيرات الفئوية في نماذج PySpark

عند بناء نموذج أكثر واقعية، ستجد أعمدة نصية مثل الجنس أو اليوم أو الحالة أو نوع العميل. هذه القيم لا يمكن إدخالها مباشرة إلى النموذج، لذا يجب تحويلها إلى تمثيل رقمي.

استخدام StringIndexer

from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(
    inputCols=["smoker", "day", "time"],
    outputCols=["smoker_indexed", "day_indexed", "time_indexed"]
)

df = indexer.fit(df).transform(df)

بعد تحويل الأعمدة الفئوية إلى تمثيل رقمي، يمكنك ضمها مع بقية الخصائص في VectorAssembler ثم تمريرها إلى نموذج الانحدار أو أي خوارزمية أخرى.

العمل على Databricks لتشغيل PySpark على السحابة

إذا كنت تريد بيئة أكثر احترافية من التشغيل المحلي، فغالباً ستجد Databricks خياراً ممتازاً. المنصة مبنية لتسهيل تحليل البيانات وبناء النماذج وتشغيل أحمال Spark على بنية سحابية مرنة.

ما الذي يميز Databricks؟

  • إنشاء عناقيد Clusters بسهولة.
  • تشغيل دفاتر تفاعلية شبيهة بـ Jupyter.
  • ربط مباشر بخدمات سحابية مثل AWS وAzure وGoogle Cloud.
  • إدارة ملفات البيانات ومصادرها من مكان واحد.
  • دعم مكتبات إضافية وتجارب MLflow.

ماذا يمكنك أن تفعل في النسخة المجانية؟

النسخة المجتمعية Community Edition مناسبة للتعلّم والتجارب الأساسية، إذ تتيح لك:

  • إنشاء دفتر جديد.
  • رفع ملفات البيانات.
  • إنشاء عنقود واحد للتجارب.
  • تشغيل أوامر PySpark مباشرة.

وعند رفع ملف بيانات، يمكنك قراءته داخل الدفتر بالطريقة المعتادة:

df = spark.read.csv(file_location, header=True, inferSchema=True)
df.show()

هذه الخطوة مفيدة جداً لتجربة نفس أوامر PySpark التي تستخدمها محلياً، لكن داخل بيئة أقرب إلى الاستخدام العملي في الشركات.

أفضل ممارسات عند تعلم واستخدام PySpark

  1. ابدأ ببيانات صغيرة لفهم البنية والأوامر قبل الانتقال إلى البيانات الضخمة.
  2. احرص على فهم DataFrame جيداً قبل الدخول في MLlib.
  3. تعامل مع القيم المفقودة والأنواع النصية مبكراً في خط المعالجة.
  4. استخدم printSchema() باستمرار لتفادي الأخطاء الناتجة عن الأنواع غير المتوقعة.
  5. جرّب أوامر التصفية والتجميع بكثرة لأنها أساس المعالجة الفعلية.
  6. عند العمل السحابي، تعلّم ربط Spark بمخازن مثل S3 لتوسيع قدراتك العملية.

هل PySpark مناسب للمبتدئين؟

نعم، لكنه يصبح أسهل كثيراً إذا كنت تملك خلفية جيدة في Python وشيئاً من الخبرة في التعامل مع البيانات باستخدام pandas. فالكثير من المفاهيم متشابهة، لكن يجب الانتباه إلى أن PySpark مصمم أساساً للمعالجة الموزعة، لذلك قد تختلف بعض السلوكيات والأداء عن الأدوات المحلية المعتادة.

كما أن تعلّم PySpark لا يعني حفظ الأوامر فقط، بل فهم متى تستخدمه، ولماذا هو أنسب في بعض المشاريع من الحلول الأخرى.

الخلاصة التقنية

PySpark ليس مجرد مكتبة لمعالجة البيانات، بل هو بوابة عملية لدخول عالم البيانات الضخمة وتعلّم الآلة على نطاق واسع. قوته الحقيقية تظهر عند الجمع بين سهولة Python وكفاءة Apache Spark في التنفيذ الموزع. إذا كنت تعمل على مشاريع تحليل بيانات كبيرة، أو تخطط لبناء مسار مهني في هندسة البيانات وعلومها، فإن إتقان PySpark يمنحك أساساً تقنياً قوياً وقابلاً للتوسع في البيئات المحلية والسحابية معاً.

اترك تعليقاً

لن يتم نشر عنوان بريدك الإلكتروني. الحقول الإلزامية مشار إليها بـ *