Accueil » Notre premier notebook Spark dans Synapse

Notre premier notebook Spark dans Synapse

Notre premier notebook Spark dans Synapse

Synapse nous permet d’utiliser Apache Spark en tant que Runtime de processing afin de travailler nos différents datasets. Dans cet article, nous allons créer notre premier notebook Spark (et le pool Spark nécessaire) pour transformer nos premières données de la zone bronze pour commencer à alimenter notre zone silver.

Introduction et pool Spark

Un Notebook Spark a besoin d’un cluster Spark pour faire tourner le code écrit dans celui-ci. Dans Synapse, cela se matérialise par un « Spark pool » qui contiendra la définition de notre cluster Spark et qui porte la facturation du processing Spark. Nous n’avons pour l’instant jamais utilisé Spark et nous allons dans un premier temps devoir créer ce « Pool ».

Nous allons ici configurer un cluster Spark de petite taille, mais libre à vous de créer un cluster plus « gros ». Cependant, attention à la facture !

Tout se passe dans la partie management de Synapse ou nous allons faire un « + New » Apache Spark pool.

Création d’un pool Spark

La première section de configuration de notre pool nous propose la configuration basique de celui-ci. Nous allons lui fournir :

  • Un nom
  • Une taille de nœud (un cluster est composé de plusieurs unités appelées nœud) : commençons par un « petit / small », il sera toujours tant de modifier ce paramètre plus tard si besoin
  • La possibilité d’ « autoscale » : Si on autorise (ou non), notre cluster à augmenter ou réduire le nombre de nœuds de notre Cluster de façon automatique.
  • Nombre de nœuds : Si on a autorisé notre cluster à changer son nombre de nœuds automatiquement, nous allons ici lui poser des limites (hautes et basses)
  • Allocation dynamique d’exécuteur (l’exécuteur est un nœud particulier qui « exécute » du code) : Lors de l’exécution d’une application Spark, le cluster pourra choisir automatiquement le nombre d’exécuteurs nécessaire et les ajouter à la volée si l’on active ce paramètre.
Paramétrage de notre pool Spark

Il est possible de choisir une « taille maximale » des nœuds du cluster. On parle ici des vCores et de la RAM disponible pour chaque nœud du cluster. Ceci est très linéaire, et s’il impacte les performances de nos processus, il impacte aussi grandement le cout à l’heure d’utilisation ! attention donc à ne pas être trop gourmand !

Les différentes tailles de nœud Spark

Pour les paramètres additionnels, nous allons nous concentrer sur deux principaux éléments que sont la mise en pause automatique et la version de Spark.

Concernant la mise en pause automatique, cela permet à notre cluster Spark de s’éteindre lorsqu’il n’est pas utilisé pendant un certain laps de temps. Ce paramètre est particulièrement intéressant pour ne pas « payer » lorsqu’il n’y a pas de traitement en cours. Il faut cependant garder en tête qu’un cluster Spark ne démarre pas de façon instantané. Si nous autorisons celui-ci à s’éteindre, il faudra prendre en considération que pour les prochaines exécutions, il faudra qu’il se redémarre. Nous avons donc là un choix à faire entre réactivité et coût. Nous allons dans notre configuration autoriser notre cluster à s’éteindre après 15 minutes d’inactivité (Number of minutes idle).

Nous avons ensuite le choix de la version de Spark. Uniquement les versions supportées par Microsoft dans Synapse sont disponibles et il faut bien comprendre qu’il ne sera pas possible de changer la version de notre cluster plus tard. Nous pourrons simplement en créer un nouveau avec une autre version si besoin. Là encore, le choix peut s’avérer critique selon nos besoins mais dans mon cas, je vais utiliser la dernière version disponible à date.

Il y a d’autres paramètres additionnels que l’on peut configurer pour notre pool Spark. Nous ne nous y intéresserons pas dans cet article qui n’a pas la volonté de rentrer dans le détail de Spark.

Configuration avancée du pool Spark

Une fois créé, notre pool sera visible dans l’interface de management (et de monitoring) de nos pools Spark. Suivant les besoins de notre plateforme, nous pourrions avoir plusieurs clusters Spark.

Pool Spark disponible

Création d’un notebook

Le moyen le plus « basique » de créer un notebook Spark est d’utiliser la capacité de Synapse de générer automatiquement un notebook à partir de notre onglet de donnée.

Pour ce faire il y a un certain nombre de clic à faire, mais qui sont relativement évidents. Il nous suffit de naviguer jusqu’au fichier de données que nous cherchons à lire depuis nos données liées. De faire un clic droit sur celui-ci et de choisir de créer un nouveau Notebook qui chargera celui-ci dans un DataFrame.

Génération automatique d’un Notebook Spark
Notebook auto-généré par Synapse

La première chose qui nous saute aux yeux est le petit message d’information qui nous demande de sélectionner un pool Spark à attacher à notre Notebook !

En effet, comme évoqué, afin d’exécuter du code, nous avons besoin de savoir « où » l’exécuter. Et cela tombe bien, nous venons de créer un pool Spark !

Sélection du pool Spark à attacher à notre Notebook

Cette action nous démontre qu’il est possible de créer plusieurs pool Spark et que Synapse ne peut décider pour nous lequel utiliser. Les raisons peuvent être nombreuses allant de besoins de versions spécifiques, de tailles de nœud spécifique ou encore la séparation des ressources de certains processus voir la facturation.

Une foi sélectionné, nous pouvons déjà exécuter le code généré par Synapse pour avoir un premier aperçu de ce qui a été fait et si tout fonctionne bien ! Pour ce faire il suffit d’exécuter la cellule de code qui a été généré pour nous en cliquant sur le bouton « play » (ou encore « Run all »

Démarrage d’une session Spark

Après avoir demandé l’exécution de notre script, nous allons rentrer dans le monde merveilleux de Spark et de ses sessions. Nous remarquons que nous allons attendre un peu le temps que notre session (et cluster) démarrent.

Session Spark en train de démarrer

Après quelques minutes d’attentes, nous avons notre résultat et notre notebook nous indique les temps de démarrage de session et d’exécution de notre code.

Résultat de notre exécution

Ce qu’il est important de noter c’est que si notre 1ère exécution a été relativement longue due au démarrage de la session, si l’on exécute une deuxième fois notre cellule, celle-ci s’exécutera nettement plus vite, principalement par le fait que notre session restera en vie pendant un certain temps. Tant que notre notebook nous écrit « Ready » en haut à gauche, cela indique que notre session est prête et qu’il n’est donc pas nécessaire de recréer celle-ci ! Dans le cas présent, nous sommes passés de 3 minutes 27secondes à seulement 3 secondes. (il y a surement un peu de cache derrière, mais globalement, nous n’avons plus les 3 minutes et 6 secondes de démarrage de session !!

Transformation de donnée

Une fois la bonne exécution et notre session Spark démarrées, nous pouvons remarquer que notre notebook nous affiche une table donnée correspondant aux données lues dans notre fichier d’origine. Nous allons donc pouvoir commencer à écrire notre code Spark pour transformer nos données !

Pour cette partie de « transformation », nous allons rester très soft et nous allons simplement construire une vision globale d’une ligne de commande en faisant une jointure entre les lignes et les entêtes. Evidemment cette étape dépendra du besoin business derrière vos travaux et peut-être que des traitements plus complexes seront nécessaire.

Utilisation d’un Notebook

Si vous n’êtes pas familier des notebooks, il peut être un peu déroutant d’arriver sur ce type d’interface qui sera très différent d’un simple code source dans un fichier « .py ». En effet, un notebook peut être considéré comme une sorte de cahier de TP ou nous allons écrire du code, mais aussi des annotations ainsi que des résultats.

Nos différents blocs de code ou d’annotation sont appelle des « cellules » que nous pourrons définir comme du code ou du Markdown (pour nos annotations). Pour ajouter une cellule, il faut placer la sourie juste en dessous d’une cellule existante pour faire apparaitre deux petits + qui nous donne le choix d’ajouter soit une cellule de Code soit de Markdown.

Apparition des options de création de cellules dans un notebook

Nous n’allons pas nous attarder sur les cellules Markdown qui sont certes utiles, mais pas cruciales (d’un point de vue purement technique). Si ce format ne vous est pas familier, direction la Cheat Sheet : Markdown Cheat Sheet | Markdown Guide.

A savoir qu’il est toujours possible de modifier le type de cellule grâce au menu en haut à droite lorsque l’on est sur une cellule.

Menu d’actions sur une cellule

Imports et déclarations

Nous allons partir d’un notebook vierge (et donc simplement supprimer le code de la 1ère cellule générée, ou supprimer la cellule) pour démarrer « proprement ».

Comme dans tout code source, nous allons dans un premier temps écrire les différents imports nécessaires à notre script. Pour ce faire nous allons donc créer (ou réutiliser) une cellule de Code.

Pour notre transformation, nous aurons seulement besoin d’être capables de retourner une colonne spécifique de notre dataframe. Nous utiliserons la fonction « col » (pyspark.sql.functions.col — PySpark 3.5.3 documentation (apache.org)) que l’on trouvera grâce à l’import suivant :

from pyspark.sql.functions import col

Ensuite, nous allons définir nos différents noms de fichiers et chemins de fichiers sur notre lake.

# zone  urls
bronzeUrl = 'abfss://<name of your container>@<name of your storage account>.dfs.core.windows.net'
silverUrl = 'abfss://<name of your container>@<name of your storage account>.dfs.core.windows.net'

# source files
bronzeInvoiceLinesFile = '/staging/InvoiceLines/InvoiceLines.parquet'
bronzeInvoicesFile = '/staging/Invoices/Invoices.parquet'

# destination file
silverInvoicesFile = '/Invoices'

La transformation !

Pour commencer, nous allons avoir besoin de lire nos différents fichiers sources et de les stocker dans un dataframe. En nous servant du code que synapse nous avait généré plus tôt, nous allons remplacer le chemin de nos fichiers par nos variables.

dfInvoiceLines = spark.read.load(f'{bronzeUrl}{bronzeInvoiceLinesFile}', format='parquet')
dfInvoices = spark.read.load(f'{bronzeUrl}{bronzeInvoicesFile}', format='parquet')

Nous arrivons maintenant à notre premier sujet sensible. Nous voulons joindre deux dataframe qui si on les analyse un minimum comportent certaines colonnes qui ont le même nom. Si dans certains cas cela ne pose pas de problème, Spark ne nous laissera pas écrire un fichier parquet avec deux colonnes qui portent le même nom. Nous allons avoir besoin de les renommer à un moment donné.

D’autre part, même si dans le cas présent nous allons faire une jointure entre deux tables de la même base de donnée source. Et, que de ce fait le « lineage » est relativement simple, j’ai pour habitude de préfixer les noms de colonnes pour identifier plus facilement la provenance des informations et ainsi ne plus avoir de colonnes avec le même nom.

Nous n’allons évidemment pas faire ceci à la main en énumérant l’intégralité des colonnes pour plusieurs raisons :

  • Nous sommes fainéants
  • Les schémas pourraient changer et nous aurions besoin d’aller modifier notre code
  • Nous sommes fainéants !

La solution sera grâce à python d’itérer sur l’ensemble de nos colonnes (d’où le besoin de la fonction col importé plus haut), et d’utiliser la fonction alias pour changer le nom d’une colonne (en ajoutant « line_ » à toutes les colonnes de la table des lignes de factures) et d’assigner le dataframe ainsi généré à notre dataframe de départ.

dfInvoiceLines = dfInvoiceLines.select(*(col(x).alias('line_' + x) for x in dfInvoiceLines.columns))

C’est maintenant le moment tant attendu de la jointure. Nous allons simplement écrire un left outer join ou « jointure externe gauche » (un inner join pourrait fonctionner si l’on est certains de l’intégrité entre lignes et en-tête).

JoinCondition = [dfInvoiceLines.line_InvoiceID==dfInvoices.InvoiceID]
dfSilverInvoices = dfInvoiceLines.join(dfInvoices,JoinCondition,"left_outer")

Nos données en zone Silver

Et tout ceci ne servirait à rien sans l’étape finale qui consiste à écrire le fichier résultant dans notre zone de datalake Silver.

dfSilverInvoices.write.mode("overwrite").parquet(silverUrl + silverInvoicesFile)

Le Notebook final

Le format d’un notebook et ses différentes cellules n’étant pas le moyen le plus simple à partager en « code pur », voici une capture d’écran de ce à quoi doit ressembler votre notebook si vous avez copier l’ensemble des bouts de codes précédents dans l’ordre. La séparation des cellules et l’ajout des annotations en Markdown n’ont aucune importance dans notre exemple. La seule chose qui à de l’importance est l’ordre des cellules car notre Notebook sera exécuté cellule après cellule de haut en bas.

Notebook de transformation complet

Conclusion

Ce premier Notebook est très simple et le code Spark écrit n’est pas forcément robuste aux erreurs ni techniquement optimisé. Il a cependant le mérite de fonctionner pour nous permettre d’avancer dans la création de notre datalakehouse.

A partir de cet exemple, il est possible de dupliquer le code en changeant les noms de fichiers (ainsi que les règles de calculs si besoin) pour gérer les autres tables de faits de WWI. Il est possible de traiter les Orders et OrderLines sur le même schéma exactement.


Une réponse à “Notre premier notebook Spark dans Synapse”

Laisser un commentaire

Votre adresse e-mail ne sera pas publiée. Les champs obligatoires sont indiqués avec *