Comment utiliser les Window Functions sur Spark

Window functions spark

Comment utiliser les Window Functions sur Spark

✍ Omar HAYAK / Temps de lecture 12 minutes

Si votre travail consiste à analyser des données, vous avez certainement rencontré des questions dont la réponse est intuitive mais difficile à exprimer en pur SQL/Spark. Si vous avez déjà essayé de calculer une moyenne glissante ou le rang d’une ligne, sans doute avez-vous pensé à ça : si seulement je pouvais itérer sur toutes les lignes de ma requête. Cette intuition ouvre les portes sur un labyrinthe de workarounds : écrire des jointures complexes, utiliser Excel ou faire le calcul hors base de données. Mais toutes les alternatives ne sont pas acceptables.

En outre, il existe les Window functions ou fonctions de fenêtrage qui améliorent considérablement l’expressivité de SQL et Spark. Cet article vous fera découvrir ces fonctions à travers des exemples pratiques.

Définition

Avant Spark 1.4, on pouvait distinguer deux types de fonctions :

  • Les fonctions built-in ou UDFs telles que substr ou round, qui prennent des valeurs d’une seule ligne en entrée et génèrent une valeur de retour unique pour chaque ligne en entrée.
  • Les fonctions d’agrégation, telles que sum ou max, fonctionnent sur un groupe de lignes et calculent une valeur de retour unique pour chaque groupe.

Bien que celles-ci soient très utiles dans la pratique, il existe des opérations qui ne peuvent pas être exprimées en utilisant uniquement ces types de fonctions. Plus précisément, il n’y avait aucun moyen pour opérer sur un groupe de lignes tout en renvoyant une seule valeur pour chaque ligne en entrée.
Cette limitation complexifie l’exécution de diverses tâches de traitement des données, tel que le calcul d’une moyenne glissante, le rang d’une ligne ou l’accès aux valeurs d’une ligne avant ou après la ligne actuelle. Heureusement pour les utilisateurs de Spark SQL, les window functions introduites par Spark 1.4 comblent cette lacune.

Une window function (fonction de fenêtrage) calcule une valeur de retour pour chaque ligne d’une table à partir d’un groupe de lignes appelé Frame. Chaque ligne d’entrée peut être associée à un Frame unique. Cette caractéristique fondamentale rend les fonctions de fenêtrage plus puissantes. Cela permet aux utilisateurs d’exprimer diverses tâches de traitement de données difficiles (voir impossibles) à exprimer de manière concise.

Prenons cet exemple tiré du chapitre 3.5 Window Functions de la documentation PostgreSQL:

import sys
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import pyspark.sql.types as t

spark = SparkSession.builder \
     .master("local") \
     .appName("WindowsAreGood") \
     .getOrCreate()

schema = t.StructType([
 t.StructField('depName', t.StringType(), False),
 t.StructField('empNo', t.IntegerType(), False),
 t.StructField('salary', t.IntegerType(), False),
])

data = [
 ("sales", 1, 5000),
 ("personnel", 2, 3900),
 ("sales", 3, 4800),
 ("sales", 4, 4800),
 ("personnel", 5, 3500),
 ("develop", 7, 4200),
 ("develop", 8, 6000),
 ("develop", 9, 4500),
 ("develop", 10, 5200),
 ("develop", 11, 5200)
]
sdf = spark.createDataFrame(data, schema=schema)

Ce code fourni le DataFrame suivant:

+---------+-----+------+
|  depName|empNo|salary|
+---------+-----+------+
|    sales|    1|  5000|
|personnel|    2|  3900|
|    sales|    3|  4800|
|    sales|    4|  4800|
|personnel|    5|  3500|
|  develop|    7|  4200|
|  develop|    8|  6000|
|  develop|    9|  4500|
|  develop|   10|  5200|
|  develop|   11|  5200|
+---------+-----+------+

On souhaite calculer le salaire moyen par département et sans utiliser les fonctions de fenêtrage :

sdf.groupBy('depName').agg(f.avg('salary').alias('avg')).show()
+---------+-----------------+
|  depName|              avg|
+---------+-----------------+
|  develop|           5020.0|
|    sales|4866.666666666667|
|personnel|           3700.0|
+---------+-----------------+

Pour obtenir le même résultat avec les window functions, on utilisera le module spark Window. Ce dernier contient les outils nécessaires pour manipuler les window functions, notamment l’objet WindowSpec qu’on va utiliser pour définir le Frame. Dans cet exemple, un frame est l’ensemble des lignes du même département. Plus de détails dans le chapitre suivant.

byDepName = Window.partitionBy('depName')
sdf.withColumn("avg", f.avg('salary').over(byDepName)).show()
+---------+-----+------+-----------------+
|  depName|empNo|salary|              avg|
+---------+-----+------+-----------------+
|  develop|    7|  4200|           5020.0|
|  develop|    8|  6000|           5020.0|
|  develop|    9|  4500|           5020.0|
|  develop|   10|  5200|           5020.0|
|  develop|   11|  5200|           5020.0|
|    sales|    1|  5000|4866.666666666667|
|    sales|    3|  4800|4866.666666666667|
|    sales|    4|  4800|4866.666666666667|
|personnel|    2|  3900|           3700.0|
|personnel|    5|  3500|           3700.0|
+---------+-----+------+-----------------+

Cet exemple montre exactement la différence entre le fonctionnement d’une fonction d’agrégation et une fonction de fenêtrage. Certes, le fond des deux résultats est identique, mais le format est différent. Une fonction de fenêtrage ne regroupe pas les lignes et conserve leurs identités distinctes.
Pour obtenir le même format à l’aide d’une agrégation, il faut ajouter une jointure avec le DataFrame initial pour chaque agrégation.

WindowSpec

WindowSpec est une spécification qui définit quelles lignes sont incluses dans le frame, c’est-à-dire l’ensemble des lignes associées à la ligne actuelle. WindowSpec prend les éléments suivants lors de sa création :

  • Partition : définit les enregistrements dans la même partition. Sans partition définie, tous les enregistrements appartiennent à une seule partition.
  • Ordre : définit la façon dont les enregistrements dans une partition sont ordonnés, ce qui définit à son tour la position d’un enregistrement dans une partition.
  • Cadre : définit les lignes à inclure dans la fenêtre de la ligne actuelle, en fonction de la position relative par rapport à la ligne actuelle. Par exemple : « Les trois lignes précédant la ligne actuelle vers la ligne actuelle » décrit un cadre comprenant la ligne d’entrée actuelle et trois lignes apparaissant avant.

En pratique, on utilise les fonctions suivantes pour définir les spécifications d’une fenêtre :

orderBy :

Crée un WindowSpec avec l’ordre défini.

partitionBy :

Crée un WindowSpec avec le partitionnement défini.

rowsBetween :

Crée un WindowSpec avec les limites du cadre définies, de start(inclus) à end(inclus). Les deux start et end sont des positions par rapport à la ligne actuelle, en fonction de sa position dans la partition.

windowSpec = Window.rowsBetween(-2, 1)
sdf.withColumn("first_empNo", f.first("empNo").over(windowSpec))\
    .withColumn("last_empNo", f.last("empNo").over(windowSpec))\
    .withColumn("frame_size", f.count("empNo").over(windowSpec))\
    .show()
+---------+-----+------+-----------+----------+----------+
|  depName|empNo|salary|first_empNo|last_empNo|frame_size|
+---------+-----+------+-----------+----------+----------+
|    sales|    1|  5000|          1|         2|         2|
|personnel|    2|  3900|          1|         3|         3|
|    sales|    3|  4800|          1|         4|         4|
|    sales|    4|  4800|          2|         5|         4|<= currentRow - 2
|personnel|    5|  3500|          3|         7|         4|<= currentRow - 1
|  develop|    7|  4200|          4|         8|         4|<= currentRow
|  develop|    8|  6000|          5|         9|         4|<= currentRow + 1
|  develop|    9|  4500|          7|        10|         4|
|  develop|   10|  5200|          8|        11|         4|
|  develop|   11|  5200|          9|        11|         3|
+---------+-----+------+-----------+----------+----------+

Dans l’exemple ci-dessus, la fenêtre définit un frame de 4 lignes:

  • la ligne courante
  • les deux lignes précédentes
  • la ligne suivante

À l’exception des lignes aux extrémités, le frame est plus petit.

+---------+-----+------+-----------+----------+----------+
|  depName|empNo|salary|first_empNo|last_empNo|frame_size|
+---------+-----+------+-----------+----------+----------+
|    sales|    1|  5000|          1|         2|         2|<= currentRow
|personnel|    2|  3900|          1|         3|         3|<= currentRow + 1
|    sales|    3|  4800|          1|         4|         4|
|    sales|    4|  4800|          2|         5|         4|
|personnel|    5|  3500|          3|         7|         4|
|  develop|    7|  4200|          4|         8|         4|
|  develop|    8|  6000|          5|         9|         4|
|  develop|    9|  4500|          7|        10|         4|
|  develop|   10|  5200|          8|        11|         4|
|  develop|   11|  5200|          9|        11|         3|
+---------+-----+------+-----------+----------+----------+

rangeBetween :

Crée un WindowSpec avec les limites du frame définies, de start(inclus) à end(inclus). Les deux start et end sont relatifs à la ligne actuelle, en fonction de la valeur de la colonne dans ORDER BY. Par conséquent, rangeBetween ne peut être utilisée que dans la définition d’une WindowSpec ordonnée.

windowSpec = Window.orderBy("salary").rangeBetween(-1000, 500)
sdf.withColumn("range", f.concat(f.lit("["), f.col("salary")-1000,f.lit(","),f.col("salary")+500, f.lit("]"))) \
    .withColumn("frame_first", f.first("salary").over(windowSpec))\
    .withColumn("frame_last", f.last("salary").over(windowSpec))\
    .withColumn("frame_count", f.count("empNo").over(windowSpec))\
    .show()
+---------+-----+------+-----------+-----------+----------+-----------+
|  depName|empNo|salary|      range|frame_first|frame_last|frame_count|
+---------+-----+------+-----------+-----------+----------+-----------+
|personnel|    5|  3500|[2500,4000]|       3500|      3900|          2|
|personnel|    2|  3900|[2900,4400]|       3500|      4200|          3|<= inRange
|  develop|    7|  4200|[3200,4700]|       3500|      4500|          4|<= inRange
|  develop|    9|  4500|[3500,5000]|       3500|      5000|          7|<= inRange
|    sales|    3|  4800|[3800,5300]|       3900|      5200|          8|<= inRange
|    sales|    4|  4800|[3800,5300]|       3900|      5200|          8|<= currentRow
|    sales|    1|  5000|[4000,5500]|       4200|      5200|          7|<= inRange
|  develop|   10|  5200|[4200,5700]|       4200|      5200|          7|<= inRange
|  develop|   11|  5200|[4200,5700]|       4200|      5200|          7|<= inRange
|  develop|    8|  6000|[5000,6500]|       5000|      6000|          4|
+---------+-----+------+-----------+-----------+----------+-----------+

Dans l’exemple ci-dessus, la fenêtre définit un frame dont les éléments sont ordonnés par revenu. La taille du frame dépend de la valeur du salaire de la ligne courante.
Si la valeur du salaire est 4800, alors toutes les lignes dont la valeur du salaire sont comprises entre 4800 - 1000 et 4800 + 500 (Càd 3800 et 5300) seront incluses dans le frame.

Pour définir un frame dont les bornes sont “infinies”, on utilise les valeurs suivantes:

  • currentRow : valeur représentant la ligne actuelle utilisée pour définir les limites du frame
  • unboundedFollowing : valeur représentant la dernière ligne d’une partition (équivalente à “UNBOUNDED FOLLOWING” dans SQL)
  • unboundedPreceding : valeur représentant la première ligne d’une partition (équivalente à “UNBOUNDED PRECEDING” dans SQL)

Après avoir défini une WindowSpec, il ne reste que choisir/créer une window function.

Window Functions

Spark SQL prend en charge les window functions suivantes :

  • les fonctions de classement : rank, dense_rank, percent_rank, ntile, row_number
  • les fonctions analytiques : cume_dist, first, last, lag, lead
  • les fonctions d’agrégation : les utilisateurs peuvent utiliser n’importe quelle fonction d’agrégation existante en tant que fonction de fenêtrage. (sum, avg, min, max et count )
  • User-defined window functions avec pandas_udf:

Pour plus de détails sur ces fonctions, veuillez consulter la documentation de Spark.

Pour utiliser les window functions, l’utilisateur doit associer la fonction à une fenêtre :

  • Ajout d’une clause OVER après une fonction prise en charge dans SQL, par exemple avg (salary) OVER (...);
    ou
  • Appel de la méthode over() sur une fonction prise en charge dans l’API DataFrame, par exemple rank().over(windowSpec)

Exemple

On veut répondre aux questions suivantes :

  • Quel est le top 2 des salaires dans chaque département ?
  • Pour chaque salarié dans un même département, quelle est la différence de revenu par rapport au salarié le mieux payé ?

Pour répondre à la première question, nous devons classer les salariés dans un département en fonction de leurs revenus et sélectionner les deux mieux payés. Vous trouverez ci-dessous le code utilisé pour résoudre cette question en utilisant la fonction de fenêtrage dense_rank:

windowSpec = Window.partitionBy('depName').orderBy(f.col('salary').desc())
sdf.withColumn("rank", f.dense_rank().over(windowSpec)).where(f.col('rank') <= 2).show()
+---------+-----+------+----+
|  depName|empNo|salary|rank|
+---------+-----+------+----+
|  develop|    8|  6000|   1|
|  develop|   10|  5200|   2|
|  develop|   11|  5200|   2|
|    sales|    1|  5000|   1|
|    sales|    3|  4800|   2|
|    sales|    4|  4800|   2|
|personnel|    2|  3900|   1|
|personnel|    5|  3500|   2|
+---------+-----+------+----+

En Spark SQL ça donne :

SELECT *
FROM (
 SELECT
 depName,
 empNo,
 salary,
 dense_rank() OVER (PARTITION BY depName ORDER BY salary DESC) as rank
 FROM depSalaries) tmp
WHERE
 rank <= 2

Essayez d’écrire une requête SQL équivalente sans fenêtrage pour se rendre compte à quel point c’est pénible.

Pour la deuxième question, nous devons trouver le revenu le plus élevé dans un même département et pour chaque ligne. Voici un programme pour répondre à cette question.

windowSpec = Window.partitionBy('depName').orderBy(f.col('salary').desc())
sdf.withColumn("diff", f.max(f.col('salary')).over(windowSpec) - f.col('salary')).show()
+---------+-----+------+----+
|  depName|empNo|salary|diff|
+---------+-----+------+----+
|  develop|    8|  6000|   0|
|  develop|   10|  5200| 800|
|  develop|   11|  5200| 800|
|  develop|    9|  4500|1500|
|  develop|    7|  4200|1800|
|    sales|    1|  5000|   0|
|    sales|    3|  4800| 200|
|    sales|    4|  4800| 200|
|personnel|    2|  3900|   0|
|personnel|    5|  3500| 400|
+---------+-----+------+----+

Sans utiliser les fenêtres, il faut trouver le revenu le plus élevé dans chaque département. Puis joindre cet ensemble de données dérivées à la table d’origine pour calculer les différences de revenus.

max_salary = sdf.groupBy('depName').agg(f.max(f.col('salary')).alias('max'))
sdf.join(max_salary, 'depName').withColumn("diff", f.col('max') - f.col('salary')).show()
+---------+-----+------+----+----+
|  depName|empNo|salary| max|diff|
+---------+-----+------+----+----+
|  develop|    7|  4200|6000|1800|
|  develop|    8|  6000|6000|   0|
|  develop|    9|  4500|6000|1500|
|  develop|   10|  5200|6000| 800|
|  develop|   11|  5200|6000| 800|
|    sales|    1|  5000|5000|   0|
|    sales|    3|  4800|5000| 200|
|    sales|    4|  4800|5000| 200|
|personnel|    2|  3900|3900|   0|
|personnel|    5|  3500|3900| 400|
+---------+-----+------+----+----+

Un autre exemple à démontrer la puissance des fonctions de fenêtrage est le calcul d’une somme cumulée.

windowSpec = Window.partitionBy('depName').orderBy(f.col('salary').asc())\
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
sdf.withColumn("sum_cum", f.sum(f.col('salary')).over(windowSpec)).show()
+---------+-----+------+-----+
|  depName|empNo|salary|  cum|
+---------+-----+------+-----+
|  develop|    7|  4200| 4200|
|  develop|    9|  4500| 8700|
|  develop|   10|  5200|13900|
|  develop|   11|  5200|19100|
|  develop|    8|  6000|25100|
|    sales|    3|  4800| 4800|
|    sales|    4|  4800| 9600|
|    sales|    1|  5000|14600|
|personnel|    5|  3500| 3500|
|personnel|    2|  3900| 7400|
+---------+-----+------+-----+

Cette fois, on a utilisé la fonction rowsBetween pour créer un frame délimité par la ligne actuelle currentRow et toutes les lignes précédentes Window.unboundedPreceding ().
Spark ignore automatiquement les lignes précédentes et suivantes, si la ligne actuelle est la première ou la dernière ligne.

LEAD & LAG

lag et lead peuvent être utilisés lorsque nous voulons obtenir un résultat relatif entre les lignes: lag signifie obtenir la valeur de la ligne précédente; lead signifie obtenir la valeur de la ligne suivante.

overCategory = Window.partitionBy('depName').orderBy(f.col('salary').desc())
df = sdf.withColumn("lead", f.lead('salary', 1).over(overCategory))\
    .withColumn("lag", f.lag('salary', 1).over(overCategory))
df.show()
+---------+-----+------+----+----+
|  depName|empNo|salary|lead| lag|
+---------+-----+------+----+----+
|  develop|    8|  6000|5200|null|
|  develop|   10|  5200|5200|6000|
|  develop|   11|  5200|4500|5200|
|  develop|    9|  4500|4200|5200|
|  develop|    7|  4200|null|4500|
|    sales|    1|  5000|4800|null|
|    sales|    3|  4800|4800|5000|
|    sales|    4|  4800|null|4800|
|personnel|    2|  3900|3500|null|
|personnel|    5|  3500|null|3900|
+---------+-----+------+----+----+

Remarquez la première ligne de la colonne lag de chaque groupe a la valeur nulle, et la dernière ligne de la colonne lead de chaque groupe a la valeur nulle.
Après avoir calculé la différence, nous pouvons trouver des valeurs aberrantes qui présentent un énorme écart salarial.

df.fillna(0).withColumn("higher_than_next", f.col('salary') - f.col('lead'))\
.withColumn("lower_than_previous", f.col('lag') - f.col('salary'))\
.show()
+---------+-----+------+----+----+----------------+-------------------+
|  depName|empNo|salary|lead| lag|higher_than_next|lower_than_previous|
+---------+-----+------+----+----+----------------+-------------------+
|  develop|    8|  6000|5200|   0|             800|              -6000|
|  develop|   10|  5200|5200|6000|               0|                800|
|  develop|   11|  5200|4500|5200|             700|                  0|
|  develop|    9|  4500|4200|5200|             300|                700|
|  develop|    7|  4200|   0|4500|            4200|                300|
|    sales|    1|  5000|4800|   0|             200|              -5000|
|    sales|    3|  4800|4800|5000|               0|                200|
|    sales|    4|  4800|   0|4800|            4800|                  0|
|personnel|    2|  3900|3500|   0|             400|              -3900|
|personnel|    5|  3500|   0|3900|            3500|                400|
+---------+-----+------+----+----+----------------+-------------------+

Références

Omar HAYAK