Pour une utilisation de Spark plus verte

Nous voici arrivés au dernier article de cette série sur le Green IT. Cette fois-ci, nous allons voir comment optimiser nos calculs via Spark.

Pour rappel, la principale source de pollution dans l’IT étant la consommation énergétique et le matériel, nous nous concentrerons sur l’optimisation de l’utilisation de Spark afin de limiter sa consommation en mémoire, qui peut être gourmande.

Pour ma part, je code en Python (donc PySpark). C’est donc sur PySpark que seront basés mes exemples. On sera également dans un environnement Delta Lake. L’important reste la logique ; tout le reste sera à adapter à vos usages.

Limiter l’utilisation des RDD

Il y a quelques années, cette problématique se posait encore, mais aujourd’hui, elle est généralement résolue, sauf pour certains développeurs aguerris (certains iraient même jusqu’à coder sur Vim par plaisir). L’utilisation des DataFrames est recommandée, car ils sont automatiquement optimisés, plus simples à manipuler et réduisent le risque d’erreur. À l’inverse, les RDD (Resilient Distributed Datasets), qui constituent l’ancien mode de fonctionnement de Spark, offrent une grande flexibilité, mais nécessitent une gestion entièrement manuelle. Cela empêche toute optimisation automatique et peut entraîner une surconsommation de ressources. Ainsi, les RDD ne devraient être utilisés qu’en dernier recours, les DataFrames étant généralement plus performants et efficients.

Filtrer dès le début du DataFrame et en une étape

Tout comme pour les requêtes SQL, il est primordial de filtrer dès le début dans Spark. Cela vous permettra de ne travailler qu’avec un sous-ensemble de données, ce qui est plus efficace et économe en ressources. Ne laissez pas tout le DataFrame défiler si vous n’en avez besoin que d’une portion. Mais à ce stade, si vous avez bien suivi les articles précédents, vous le savez déjà.

Aussi, plus vous effectuez de transformations et d’agrégations en une seule étape, moins vous solliciterez les ressources. Regrouper les étapes de transformation permet d’optimiser les calculs.

Si l’on filtre dès l’arrivée du DataFrame uniquement les données nécessaires et que l’on effectue les calculs dans la foulée, c’est gagné !

df = spark.createDataFrame(data, columns)
df_filtre = df.filter(col("Date") > "2025-01-01")
df_calcul = df_filtre.groupBy("Produit").agg(sum("Ventes").alias("VentesTotales"))
df_final = df_calcul.filter(col("VentesTotales") > 1000)

On fera plutôt :

df_final = (
df.filter(col("Date") > "2025-01-01")
.groupBy("Produit").agg(
sum(when(col("Ventes") > 1000, col("Ventes")).otherwise(0)).alias("VentesTotales")
)
.filter(col("VentesTotales") > 0) # Filtrer les produits dont les ventes totales > 0
)

Attention toutefois à garder de la lisibilité et à rester dans le clean code.

Éviter les collect()

Le collect() est à éviter dans notre quête d’optimiser l’utilisation de la mémoire.

Le collect() ramène toutes les données du cluster vers le driver, c’est-à-dire sur le nœud principal qui exécute le programme. On fait donc une croix sur le parallélisme et le calcul distribué, qui sont l’un des points forts de Spark, avec un risque de out of memory dans le cas où le DataFrame est volumineux, et la perte de nombreux autres avantages offerts par l’utilisation de Spark. De plus, comme toutes les données des clusters vont vers le driver, cela crée une consommation excessive de bande passante.

Il existe plusieurs alternatives telles que show(), take(n), foreach() ou même écrire directement dans un fichier à part qui conservera le parallélisme. Cela dépendra donc de votre besoin. Dans tous les cas, le collect() est à utiliser avec modération.

Spark UI ou df.explain

Bien sûr, comme on parle d’optimisation des développements, il faut un moyen pour analyser le déroulé de nos requêtes/devs.

Pour cela, je vais vous proposer deux solutions :

  • Spark UI (ou interface utilisateur de Spark) est une interface web interactive qui permet de surveiller l’exécution des applications Spark en temps réel.
    Cela permet de voir les points de blocage, les endroits du calcul qui prennent le plus de temps et de trouver des pistes d’optimisation.
  • df.explain, qui affiche le plan d’exécution de la requête sous-jacente effectuée sur le DataFrame.
    Pour le calcul d’un DataFrame, on pourra voir les différentes étapes de calcul via le logical plan (qui montre globalement comment cela prévoit d’être traité) et le physical plan (qui montre comment cela est traité de manière plus précise et concrète).

En bref :

  • Pour optimiser votre notebook/script, utilisez Spark UI qui vous montrera un déroulé plus global.
  • Pour un tuning plus fin au niveau du DataFrame en lui-même, utilisez df.explain.

Bien gérer la répartition de la mémoire

La force de Spark, c’est sa capacité à gérer les calculs in memory.

Afin d’éviter que Spark recalcule à chaque fois un DataFrame, on peut directement le mettre en cache avec la fonction cache().

Par exemple :

df_filtered = df.filter(df["year"] > 2020).cache()
df_filtered.count()

Va mettre df_filtered en cache dans la mémoire vive pour le récupérer directement.

Une autre fonction proche est persist(), qui va permettre de choisir le niveau de persistance avec le niveau MEMORY_ONLY, MEMORY_AND_DISK et DISK_ONLY.

Ces techniques vous permettront de diminuer la consommation mémoire et de gagner du temps dans vos jobs. Cependant, attention :

  • Le cache/persist ne prend effet qu’après une action (ex : count(), show()).
  • Une fois la mise en cache non nécessaire, libérez la ressource du cache avec unpersist().

Avoir la méthode adaptée pour écrire dans les tables

Le but de nos flux est souvent l’écriture d’une table ou la génération d’un fichier. Comme vu précédemment, notamment dans l’article sur le SQL, cette partie ne doit pas être négligée, considérant son impact à long terme (les données vont y rester un moment) et son impact sur tous les flux suivants qui s’appuieront sur cette donnée.

Il faut donc écrire vos tables en gardant en tête ces possibilités :

  • Partitionner (partitionBy) pour organiser les données par colonnes spécifiques et réduire les coûts de lecture.
df.write.partitionBy("year", "month").format("parquet").mode("overwrite").save("/path/to/output")
  • Utiliser un format de fichiers optimisé comme Parquet, ORC ou Delta Lake, et les compresser.
    Comme vu dans les articles précédents, choisir le bon format est primordial. Dans le cas de grosses volumétries de données, pensez à la compression, qui peut faire gagner de l’espace disque et du temps de chargement. À voir au cas par cas.
  • Repartitionner (repartition ou coalesce) pour ajuster le nombre de partitions avant l’écriture.
    • repartition() : pour créer un nombre plus grand de partitions de manière uniforme.
    • coalesce() : pour réduire le nombre de partitions, utile lorsque vous souhaitez les combiner avant l’écriture.
  • Choisir le mode d’écriture adapté :
    • overwrite : écrase les fichiers existants.
    • append : ajoute les données aux fichiers existants.
    • ignore : ne fait rien si les fichiers existent.
    • errorIfExists : lève une erreur si les fichiers existent (par défaut).
      Si vous ne voulez pas tout recréer, vous pouvez aussi utiliser la méthode du merge :
delta_table.alias("oldData") \
.merge(
df_new_data.alias("newData"),
"oldData.id = newData.id"
) \
.whenMatchedUpdate(set={
"name": "newData.name",
"age": "newData.age"
}) \
.whenNotMatchedInsert(values={
"id": "newData.id",
"name": "newData.name",
"age": "newData.age"
}) \
.execute()

Dans ce cas, on met à jour la table via l’ID, ce qui évite de tout supprimer et recréer.

L’optimisation des fichiers avec VACUUM et OPTIMIZE

Après avoir travaillé sur l’alimentation des tables, voyons comment optimiser leur stockage et lecture dans un Delta Lake.

Pour cela, on dispose de deux commandes :

  • OPTIMIZE : améliore la performance de lecture en fusionnant plusieurs petits fichiers en fichiers plus grands, ce qui réduit la surcharge liée à la gestion des métadonnées et aux lectures multiples. À utiliser sur les tables ayant beaucoup de petites écritures.
delta_table.optimize().executeCompaction()
  • VACUUM : supprime les fichiers obsolètes laissés après des opérations de suppression ou de mise à jour. Un fichier Delta contenant l’historique de ses transactions dans ses métadonnées, VACUUM permet de libérer de l’espace disque (en conservant les 7 derniers jours par défaut).
delta_table.vacuum()

Conclusion

C’est ainsi que nous finissons notre tour d’horizon du Green IT dans la data. Nous avons pu voir les bonnes pratiques à adopter côté DevOps, architecture, workflows, SQL et enfin Spark.

Comme dit dans l’introduction, ces articles n’ont pas pour vocation de donner une liste exhaustive de bonnes pratiques. Ils ont été faits pour vous donner des clés et des pistes de réflexion, qui doivent être adaptées à vos cas spécifiques.

S’il y a une chose à retenir :
Si vous voulez aller vers une data orientée Green IT, il faut bannir une approche “qui peut le plus, peut le moins”. Veillez à toujours adapter la puissance à vos besoins réels et à évaluer l’impact de chaque action pour une consommation plus responsable.

Ces autres articles pourraient aussi vous intéresser…

Vous souhaitez être averti·e des prochaines publications ?

Inscrivez-vous à notre newsletter et soyez les premiers informés de nos actualités, conseils d’experts, projets innovants et événements exclusifs.

Retour en haut