مشروع مصغر: تنظيف وتحليل ملف يحتوي على 10,000 سجل مبيعات وتلخيص الأرباح
مشروع مصغر: تنظيف وتحليل ملف يحتوي على 10,000 سجل مبيعات وتلخيص الأرباح
عند التعامل مع ملف مبيعات يضم 10,000 سجل، فإن القيمة الحقيقية لا تكمن في مجرد قراءة الملف، بل في تحويله إلى مصدر موثوق لاتخاذ القرار. هذه العملية تبدأ من فحص جودة البيانات، ثم تنظيفها، ثم توحيد أنواعها، وأخيراً استخراج مؤشرات ربحية دقيقة يمكن الاعتماد عليها في التقارير التشغيلية والإدارية.
هذا المشروع المصغر يجمع بين مفاهيم التحليل العملي وهندسة البيانات، لأن ملف المبيعات غالباً لا يكون مثالياً عند الاستلام. قد نجد قيماً مفقودة، سجلات مكررة، تواريخ مكتوبة بصيغ مختلفة، أو أعمدة رقمية مخزنة كنصوص. لذلك فإن بناء مسار عمل واضح يشبه ETL Pipeline مصغر يعد خطوة أساسية قبل أي تلخيص للأرباح.
إذا كنت قد اطلعت سابقاً على مدخل إلى علوم البيانات: كيف تحول الأرقام العشوائية إلى قرارات استراتيجية؟ فستلاحظ هنا كيف ننتقل من المفهوم النظري إلى تطبيق عملي مباشر على بيانات مبيعات واقعية. كما أن هذا المثال يبني بشكل طبيعي على مكتبة Pandas (1): قراءة واستدعاء البيانات من ملفات CSV و Excel برمجياً وتنظيف البيانات (Data Cleaning): اكتشاف ومعالجة القيم المفقودة (Missing Values).
فهم بنية ملف المبيعات قبل بدء المعالجة
نفترض أن الملف يحتوي على أعمدة مثل: رقم الطلب، التاريخ، اسم المنتج، الفئة، المنطقة، الكمية، سعر البيع، التكلفة، والخصم. في هذه المرحلة لا يجب البدء فوراً بالحسابات، بل أولاً التحقق من شكل الجدول، أسماء الأعمدة، وأنواع البيانات داخل كل حقل في DataFrame.
أي خطأ في النوع قد يفسد النتائج النهائية. على سبيل المثال، إذا كان عمود السعر مخزناً كسلسلة نصية بدلاً من قيمة عددية، فإن عمليات الجمع والمتوسط لن تعطي نتائج صحيحة. كذلك فإن التاريخ إذا لم يتحول إلى نوع زمني مناسب فلن نستطيع استخراج الربح الشهري أو اليومي بكفاءة.
الخطوات الأولية لفحص الجودة
- قراءة الملف والتأكد من الترميز الصحيح.
- فحص أول الصفوف وآخرها لاكتشاف تشوهات واضحة.
- استعراض أنواع الأعمدة باستخدام أدوات الفحص الهيكلي.
- حساب عدد القيم المفقودة في كل عمود.
- فحص السجلات المكررة بناءً على رقم الطلب أو مزيج من الحقول.
import pandas as pd
import numpy as np
df = pd.read_csv("sales_10000.csv")
print(df.head())
print(df.info())
print(df.isnull().sum())
print("Duplicate rows:", df.duplicated().sum())
تنظيف البيانات وبناء نسخة صالحة للتحليل
مرحلة التنظيف ليست خطوة تجميلية، بل هي صميم التحليل الموثوق. عند وجود قيم مفقودة في أعمدة حرجة مثل الكمية أو السعر أو التكلفة، يجب اتخاذ قرار واضح: هل سيتم حذف السجل، أم تعويض القيمة، أم إرساله إلى قائمة مراجعة مستقلة؟ هنا تظهر أهمية التفكير التحليلي لا مجرد تنفيذ أوامر برمجية.
وإذا كنت مهتماً بالتوسع في هذا الجانب، فالمقالان تنظيف البيانات (Data Cleaning): اكتشاف ومعالجة القيم المفقودة (Missing Values) ومعالجة البيانات المكررة والمشوهة (Duplicates & Outliers) باستخدام بايثون يقدمان خلفية ممتازة تدعم هذا المشروع.
قرارات التنظيف المقترحة
- إزالة السجلات المكررة بالكامل.
- تحويل الأعمدة الرقمية إلى نوع رقمي مع تجاهل القيم الفاسدة.
- تحويل عمود التاريخ إلى صيغة زمنية موحدة.
- استبعاد السجلات ذات الكمية أو السعر غير المنطقي.
- ملء الخصم المفقود بالقيمة صفر إذا كان منطق العمل يسمح بذلك.
df = df.drop_duplicates()
numeric_cols = ["quantity", "unit_price", "cost_price", "discount"]
for col in numeric_cols:
df[col] = pd.to_numeric(df[col], errors="coerce")
df["order_date"] = pd.to_datetime(df["order_date"], errors="coerce")
df["discount"] = df["discount"].fillna(0)
df = df.dropna(subset=["order_date", "quantity", "unit_price", "cost_price"])
df = df[(df["quantity"] > 0) & (df["unit_price"] >= 0) & (df["cost_price"] >= 0)]
في معمارية البيانات الاحترافية، من الأفضل عدم الكتابة فوق البيانات الخام مباشرة. احتفظ دائماً بنسخة
raw، ثم أنشئ طبقةcleaned، ثم طبقةanalytics. هذا الأسلوب يجعل التتبع والمراجعة وإعادة التشغيل أكثر أماناً وكفاءة.
اشتقاق مؤشرات المبيعات والأرباح
بعد تنظيف البيانات، تبدأ مرحلة الاشتقاق. هنا لا نكتفي بإجمالي المبيعات، بل نحسب الإيراد الفعلي، تكلفة البضائع، قيمة الخصم، ثم الربح الصافي لكل سجل. هذه الأعمدة المشتقة تجعل الجدول مناسباً لعمليات Aggregation اللاحقة.
من المهم أيضاً إضافة أعمدة زمنية مثل الشهر والسنة، لأن الإدارة نادراً ما تكتفي برقم إجمالي واحد. المطلوب غالباً هو معرفة أين يتحسن الربح، ومتى ينخفض، وأي منطقة أو فئة تحقق أفضل هامش.
df["gross_sales"] = df["quantity"] * df["unit_price"]
df["total_cost"] = df["quantity"] * df["cost_price"]
df["discount_value"] = df["gross_sales"] * df["discount"]
df["net_sales"] = df["gross_sales"] - df["discount_value"]
df["profit"] = df["net_sales"] - df["total_cost"]
df["year"] = df["order_date"].dt.year
df["month"] = df["order_date"].dt.month
df["month_name"] = df["order_date"].dt.strftime("%B")
summary = df[["gross_sales", "discount_value", "net_sales", "total_cost", "profit"]].sum()
print(summary)
تلخيص الأرباح حسب المنتج والمنطقة والفترة الزمنية
القيمة التحليلية الحقيقية تظهر عند الانتقال من مستوى السجل الفردي إلى مستوى التلخيص. باستخدام groupby يمكن بناء تقارير متعددة الأبعاد بسرعة عالية. وهذا يرتبط مباشرة بما تم شرحه في التجميع والتلخيص (Groupby & Aggregation): إنشاء تقارير إحصائية برمجية.
أمثلة على التقارير المهمة
- إجمالي الربح حسب الفئة.
- أفضل 10 منتجات من حيث صافي الربح.
- الأداء الشهري للمبيعات بعد الخصومات.
- مقارنة المناطق الأعلى مبيعاً مع المناطق الأعلى ربحية.
profit_by_category = df.groupby("category", as_index=False)["profit"].sum() \
.sort_values("profit", ascending=False)
top_products = df.groupby("product_name", as_index=False)["profit"].sum() \
.sort_values("profit", ascending=False).head(10)
monthly_profit = df.groupby(["year", "month"], as_index=False)[["net_sales", "profit"]].sum() \
.sort_values(["year", "month"])
profit_by_region = df.groupby("region", as_index=False)[["net_sales", "profit"]].sum() \
.sort_values("profit", ascending=False)
print(profit_by_category)
print(top_products)
print(monthly_profit)
print(profit_by_region)
استخدام SQL للتحقق وإنتاج تقارير موازية
حتى في المشاريع الصغيرة، من المفيد التفكير بعقلية قواعد البيانات. يمكن تحميل النتائج إلى محرك تحليلي واستخدام SQL لمراجعة الأرقام أو تشغيل تقارير دورية. هذا مفيد عندما يعمل فريق التحليل بالتعاون مع فرق التقارير أو BI.
query = """
SELECT
region,
category,
ROUND(SUM(net_sales), 2) AS total_net_sales,
ROUND(SUM(profit), 2) AS total_profit
FROM sales_cleaned
GROUP BY region, category
ORDER BY total_profit DESC;
"""
print(query)
عند تضخم البيانات من 10,000 سجل إلى ملايين السجلات، تصبح الفهارس، التقسيم الزمني، وتقليل عمليات
full scanعوامل حاسمة في تحسين الأداء. كما أن تلخيص البيانات في جداول مرحلية يقلل كلفة الاستعلامات المتكررة بشكل كبير.
متى ننتقل من Pandas إلى Apache Spark؟
في ملف بحجم 10,000 سجل، تكفي Pandas عادة بشكل ممتاز. لكن من منظور هندسة البيانات الضخمة، من المفيد تصميم المنطق بطريقة قابلة للنقل لاحقاً إلى Apache Spark إذا أصبح الملف جزءاً من تدفق يومي ضخم أو متعدد المصادر.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum
spark = SparkSession.builder.appName("SalesProfitSummary").getOrCreate()
sales_df = spark.read.option("header", True).csv("sales_10000.csv")
clean_df = (
sales_df.dropDuplicates()
.withColumn("quantity", col("quantity").cast("double"))
.withColumn("unit_price", col("unit_price").cast("double"))
.withColumn("cost_price", col("cost_price").cast("double"))
.withColumn("discount", col("discount").cast("double"))
)
result_df = (
clean_df
.withColumn("gross_sales", col("quantity") * col("unit_price"))
.withColumn("total_cost", col("quantity") * col("cost_price"))
.withColumn("discount_value", col("gross_sales") * col("discount"))
.withColumn("net_sales", col("gross_sales") - col("discount_value"))
.withColumn("profit", col("net_sales") - col("total_cost"))
)
profit_by_region = result_df.groupBy("region").agg(
spark_sum("net_sales").alias("total_net_sales"),
spark_sum("profit").alias("total_profit")
)
profit_by_region.show()
خاتمة تنفيذية
هذا المشروع المصغر يوضح أن تحليل ملف مبيعات من 10,000 سجل ليس مجرد تمرين على الجمع والطرح، بل نموذج عملي متكامل يبدأ بفحص الهيكل، ثم التنظيف، ثم الاشتقاق، ثم التلخيص، ثم التفكير في القابلية للتوسع. بهذه الطريقة تتحول البيانات الخام إلى تقرير أرباح موثوق وقابل للبناء عليه.
وعندما يتم تنفيذ هذه الخطوات بمنهجية صحيحة، يصبح من السهل اكتشاف المنتجات الأعلى ربحية، المناطق الأضعف أداءً، وتأثير الخصومات على الهامش. وهذا هو جوهر العمل الحقيقي في Data Science وBig Data Engineering: بناء نتائج دقيقة، قابلة للتفسير، ومفيدة لاتخاذ القرار التجاري.