Après avoir alimenté notre zone bronze : Paramétrer les Pipeline Azure Synapse Analytics sans ouvrir Synapse ! ;
Après avoir alimenté notre zone sliver : Notre premier notebook Spark dans Synapse ;
Nous allons maintenant alimenter notre zone gold et créer notre pipeline d’intégration complet de l’ingestion de nos données On-Prem jusqu’à l’écriture de notre zone de donnée « Gold ».
Alimentation de la zone Gold
A la manière de ce que nous avons fait pour notre premier notebook (Notre premier notebook Spark dans Synapse), nous allons créer un nouveau notebook très simple qui va sélectionner et renommer proprement nos données de la zone silver pour les écrire dans la zone Gold.
Dans un nouveau Notebook, n’ayant pas d’import à faire, nous commençons par déclarer nos différents chemins :
silverUrl = 'abfss://<name of your container>@<name of your storage account>.dfs.core.windows.net'
goldUrl = 'abfss://<name of your container>@<name of your storage account>.dfs.core.windows.net'
silverInvoicesFile = '/Invoices'
goldSaleFile = '/Sale'
Nous allons cette fois-ci lire la zone Silver
dfSilverInvoices = spark.read.load(f'{silverUrl}{silverInvoicesFile}', format='parquet')
Nous arrivons au moment où nous allons utiliser la fonction selectExpr (pyspark.sql.DataFrame.selectExpr — PySpark 3.5.3 documentation), pour sélectionner et renommer nos colonnes de fichiers parquet comme nous le ferions dans un SELECT SQL.
dfGoldSale = dfSilverInvoices.selectExpr(
'InvoiceID',
'line_InvoiceLineID as InvoiceLineID',
'line_StockItemID as StockItemID',
'CustomerID',
'BillToCustomerID',
'OrderID',
'InvoiceDate',
'line_Description as Description',
'line_Quantity as Quantity',
'line_UnitPrice as UnitPrice',
'line_TaxRate as TaxRate',
'line_TaxAmount as TaxAmount',
'line_LineProfit as Profit'
)
selectExpr prend en argument une expression SQL. C’est ainsi que nous pouvons utiliser la syntaxe SQL pour renommer nos colonnes en utilisant le mot clef « as ».
Nous pourrions faire ici certaines transformations business mais pour notre objectif nous nous arrêterons ici et allons écrire notre dataset dans notre zone gold.
dfGoldSale.write.mode("overwrite").parquet(goldUrl+goldSaleFile)
Voici l’aperçu du notebook final :
Création du pipeline
Une fois tous nos traitements développés, nous avons maintenant pour mission de les exécuter à la chaine. Pour ce faire, nous allons créer un pipeline d’intégration qui exécutera nos différents traitements dans l’ordre et suivant les conditions définies.
Nous avons rendez-vous dans la section « intégration » de Synapse ou nous allons designer notre processus d’intégration.
Afin de rester le plus futur proof possible, nous n’allons pas directement exécuter nos traitements les uns à la suite des autres mais plutôt des sous-ensemble de traitements. Cela implique que pour notre pipeline d’intégration, nous n’allons pas exécuter des notebook les uns après les autres mais plutôt d’autres pipelines. Nous allons faire une sorte de pipelineception, qui nous permettra de modifier et tester uniquement un sous-ensemble lors de nos travaux futurs.
Création des « sous-pipelines »
Dans notre projet toujours très simpliste, nous allons donc créer un pipeline qui enchainera trois sous-pipelines qui alimenteront chacun une des zones de notre datalake.
Alimentation Bronze
Pour cette partie, nous allons encapsuler les travaux d’un précédent article qui traite justement de notre intégration de donnée en zone bronze : Paramétrer les Pipeline Azure Synapse Analytics sans ouvrir Synapse !
Après avoir donné un nom et une description à notre pipeline :
Nous allons ajouter les traitements à exécuter. Dans le cas présent, nous avons déjà un pipeline qui alimente notre zone bronze. Nous allons donc ajouter celui-ci dans la zone d’édition de notre pipeline. Nous avons deux possibilités pour ce faire.
La 1ère consiste à cliquer / déplacer notre pipeline existant dans la zone de notre pipeline :
La 2ème consiste à cliquer / déplacer l’activité « Execute pipeline » depuis le panneau d’activité :
Puis d’aller paramétrer cette activité pour invoquer notre pipeline existant :
La seule différence dans cette deuxième approche est que notre activité à un nom générique, que l’on peu malgré tout modifier manuellement (comme pour toutes les activités) dans l’onglet général de notre activité :
Notre pipeline pour la zone bronze ne contient pour l’instant rien de plus, nous pouvons donc passer à la suite.
Alimentation Sliver
Pour nos chargements de zone silver, nous allons exécuter les notebook Spark tels que créés dans un article précédant : Notre premier notebook Spark dans Synapse
De la même manière que nous avons fait pour ajouter l’activité d’exécution de pipeline, nous allons ajouter nos différents notebook :
Sans autre action de notre part, les notebook s’exécuteront en parallèle. C’est-à-dire qu’ils démarreront en même temps… à condition d’avoir assez de ressource sur notre cluster Spark !
En effet, chaque notebook va correspondre à une application Spark, qui va s’exécuter sur notre pool et va avoir besoin d’un driver et d’un exécuteur au minimum. Par défaut un notebook est configuré pour utiliser 2 exécuteurs, ce qui demande au total 3 nœuds de cluster.
En ayant un cluster Spark avec seulement 3 nœuds au maximum, nous allons remarquer que nos notebook ne seront pas vraiment exécutés en parallèle.
Notebooks presque en parallèles
Afin de lancer l’exécution de notre pipeline et de le tester en condition réelle, nous allons devoir le publier dans le mode « Live » de synapse. Un trigger exécute toujours le code publié. Autrement, il faut « débuguer » le pipeline.
Puis il suffit de lancer un « trigger maintenant »
A l’exécution de notre pipeline, nous pouvons remarquer que nos notebook semblent s’exécuter en même temps. Les deux sont marqués « In Progress » avec les mêmes dates/heures de démarrage.
Cependant, la vérité est que nos activités n’ont fait que demander à notre cluster l’exécution de nos notebook (donc des applications Spark). Si l’on regarde cette fois dans le monitoring des applications Spark, nous allons voir que l’une de nos applications est « Queued ». Elle est en fait mise dans la file d’exécution et ne sera traitée que lorsque notre cluster aura la capacité de la traiter.
Une fois la première application finie, la deuxième prendra automatiquement le relais.
Véritable exécution en parallèle
Afin de voir une réelle exécution en parallèle de nos notebook, nous allons dans un premier temps augmenter le nombre de nœuds maximum de notre pool Spark à 6 afin de combler le besoin de deux notebook en parallèle.
Pour autoriser plus de nœuds à notre cluster, nous allons ouvrir le panneau de configuration dédié à partir de la partie management de notre cluster Spark.
Il nous suffit d’autoriser jusqu’à 6 nœuds à notre pool dans ce menu :
Après un petit temps de propagation, nous avons maintenant notre pool Spark autorisant jusqu’à 6 nœuds :
Maintenant nous allons relancer notre pipeline et dans la vue de monitoring de nos applications Spark nous allons voir que nos deux Notebook sont maintenant lancées en parallèle.
Exécution en séquentielle
Imaginons maintenant que nous souhaitions forcer une exécution séquentielle de nos pipelines. Cela peut se faire pour nous assurer d’un ordre d’exécution. Nous allons dans ce cas nous servir des « sorties » d’activités. Si l’on passe notre souris sur une activité, on remarque que trois nouvelles icônes apparaissent à côté de la coche verte qui est toujours présente.
Ces sorties vont nous permettre de lier les activités les unes avec les autres. Par exemple, si l’on fait un lien entre la sortie « On success » d’un notebook pour aller à un autre notebook, cela se traduira par : « Si ce notebook fini avec un état OK, alors, exécute le notebook suivant ».
Ce lien se créer en faisant un cliquer / déposer de la sortie du premier notebook, jusqu’au notebook suivant :
Une fois créé, ce lien séquentiel se matérialisera par une flèche (verte pour une sortie « On success ») entre les deux activités :
Nous venons donc de créer un lien fort nécessitant le succès du premier notebook pour exécuter le second. Autrement dit, le deuxième notebook ne se lancera que si le premier s’est complètement exécuté ET n’est pas en erreur.
Lors du monitoring de l’exécution, nous voyons cette fois que seule notre première activité s’est lancée. La deuxième n’est même pas en attente :
Ensuite, lors de la fin de notre premier notebook (sans erreurs), le deuxième se lance automatiquement :
Alimentation Gold
Pour notre zone gold, nous allons simplement ajouter le notebook créé au début de cet article (ici).
Pipeline général
Maintenant que nos sous-pipelines sont créés, nous pouvons créer notre pipeline maitre, qui appellera tous les autres.
En suivant exactement le même schéma que précédemment, nous allons ajouter toutes nos activités :
Dans le monitoring de nos pipelines, nous pouvons maintenant suivre les exécutions de notre pipeline et ses sous-pipelines.
Conclusion
Nous avons créé un pipeline simple d’alimentation complet de notre datalake en utilisant des sous-pipelines. Nous pouvons maintenant compléter l’alimentation de nos différentes zones et ajouter petit à petit chaque notebook dans le pipeline dédié.
Ici, nous n’avons utilisé uniquement des sorties « On success » et des activités d’exécution de pipeline et de notebook, mais il est évident que pour un datalake de production nous devrons gérer les cas d’erreurs avec les sorties « On Fail » par exemple. Dans tous les cas nous pouvons continuer notre construction pour avoir une alimentation complète grâce à l’exécution d’un seul pipeline !