Accueil » Zone Gold et Pipeline d’intégration synapse

Zone Gold et Pipeline d’intégration synapse

Zone Gold et Pipeline d’intégration synapse

Après avoir alimenté notre zone bronze : Paramétrer les Pipeline Azure Synapse Analytics sans ouvrir Synapse ! ;

Après avoir alimenté notre zone sliver : Notre premier notebook Spark dans Synapse ;

Nous allons maintenant alimenter notre zone gold et créer notre pipeline d’intégration complet de l’ingestion de nos données On-Prem jusqu’à l’écriture de notre zone de donnée « Gold ».

Alimentation de la zone Gold

A la manière de ce que nous avons fait pour notre premier notebook (Notre premier notebook Spark dans Synapse), nous allons créer un nouveau notebook très simple qui va sélectionner et renommer proprement nos données de la zone silver pour les écrire dans la zone Gold.

Dans un nouveau Notebook, n’ayant pas d’import à faire, nous commençons par déclarer nos différents chemins :

silverUrl = 'abfss://<name of your container>@<name of your storage account>.dfs.core.windows.net'
goldUrl = 'abfss://<name of your container>@<name of your storage account>.dfs.core.windows.net'

silverInvoicesFile = '/Invoices'
goldSaleFile = '/Sale'

Nous allons cette fois-ci lire la zone Silver

dfSilverInvoices = spark.read.load(f'{silverUrl}{silverInvoicesFile}', format='parquet')

Nous arrivons au moment où nous allons utiliser la fonction selectExpr (pyspark.sql.DataFrame.selectExpr — PySpark 3.5.3 documentation), pour sélectionner et renommer nos colonnes de fichiers parquet comme nous le ferions dans un SELECT SQL.

dfGoldSale = dfSilverInvoices.selectExpr(
    'InvoiceID',
    'line_InvoiceLineID as InvoiceLineID',
    'line_StockItemID as StockItemID',
    'CustomerID',
    'BillToCustomerID',
    'OrderID',
    'InvoiceDate',
    'line_Description as Description',
    'line_Quantity as Quantity',
    'line_UnitPrice as UnitPrice',
    'line_TaxRate as TaxRate',
    'line_TaxAmount as TaxAmount',
    'line_LineProfit as Profit'
)

selectExpr prend en argument une expression SQL. C’est ainsi que nous pouvons utiliser la syntaxe SQL pour renommer nos colonnes en utilisant le mot clef « as ».

Nous pourrions faire ici certaines transformations business mais pour notre objectif nous nous arrêterons ici et allons écrire notre dataset dans notre zone gold.

dfGoldSale.write.mode("overwrite").parquet(goldUrl+goldSaleFile)

Voici l’aperçu du notebook final :

Notebook de transformation complet

Création du pipeline

Une fois tous nos traitements développés, nous avons maintenant pour mission de les exécuter à la chaine. Pour ce faire, nous allons créer un pipeline d’intégration qui exécutera nos différents traitements dans l’ordre et suivant les conditions définies.

Nous avons rendez-vous dans la section « intégration » de Synapse ou nous allons designer notre processus d’intégration.

Afin de rester le plus futur proof possible, nous n’allons pas directement exécuter nos traitements les uns à la suite des autres mais plutôt des sous-ensemble de traitements. Cela implique que pour notre pipeline d’intégration, nous n’allons pas exécuter des notebook les uns après les autres mais plutôt d’autres pipelines. Nous allons faire une sorte de pipelineception, qui nous permettra de modifier et tester uniquement un sous-ensemble lors de nos travaux futurs.

Création des « sous-pipelines »

Dans notre projet toujours très simpliste, nous allons donc créer un pipeline qui enchainera trois sous-pipelines qui alimenteront chacun une des zones de notre datalake.

Pipeline complet

Alimentation Bronze

Pour cette partie, nous allons encapsuler les travaux d’un précédent article qui traite justement de notre intégration de donnée en zone bronze : Paramétrer les Pipeline Azure Synapse Analytics sans ouvrir Synapse !

Création d’un nouveau pipeline

Après avoir donné un nom et une description à notre pipeline :

Définition du nom et de la description d’un pipeline

Nous allons ajouter les traitements à exécuter. Dans le cas présent, nous avons déjà un pipeline qui alimente notre zone bronze. Nous allons donc ajouter celui-ci dans la zone d’édition de notre pipeline. Nous avons deux possibilités pour ce faire.

La 1ère consiste à cliquer / déplacer notre pipeline existant dans la zone de notre pipeline :

Cliquer / déposer d’un pipeline
Pipeline ajouté à notre pipeline

La 2ème consiste à cliquer / déplacer l’activité « Execute pipeline » depuis le panneau d’activité :

Cliquer / déplacer d’une activité « Execute pipeline »

Puis d’aller paramétrer cette activité pour invoquer notre pipeline existant :

Définition du pipeline à invoquer d’une activité « Execute pipeline »

La seule différence dans cette deuxième approche est que notre activité à un nom générique, que l’on peu malgré tout modifier manuellement (comme pour toutes les activités) dans l’onglet général de notre activité :

Nommage d’une activité de pipeline

Notre pipeline pour la zone bronze ne contient pour l’instant rien de plus, nous pouvons donc passer à la suite.

Alimentation Sliver

Pour nos chargements de zone silver, nous allons exécuter les notebook Spark tels que créés dans un article précédant : Notre premier notebook Spark dans Synapse

De la même manière que nous avons fait pour ajouter l’activité d’exécution de pipeline, nous allons ajouter nos différents notebook :

Ajout des notebook à un pipeline

Sans autre action de notre part, les notebook s’exécuteront en parallèle. C’est-à-dire qu’ils démarreront en même temps… à condition d’avoir assez de ressource sur notre cluster Spark !

En effet, chaque notebook va correspondre à une application Spark, qui va s’exécuter sur notre pool et va avoir besoin d’un driver et d’un exécuteur au minimum. Par défaut un notebook est configuré pour utiliser 2 exécuteurs, ce qui demande au total 3 nœuds de cluster.

En ayant un cluster Spark avec seulement 3 nœuds au maximum, nous allons remarquer que nos notebook ne seront pas vraiment exécutés en parallèle.

Pool Spark avec un maximum 3 nœuds

Notebooks presque en parallèles

Afin de lancer l’exécution de notre pipeline et de le tester en condition réelle, nous allons devoir le publier dans le mode « Live » de synapse. Un trigger exécute toujours le code publié. Autrement, il faut « débuguer » le pipeline.

Puis il suffit de lancer un « trigger maintenant »

Démarrage d’un pipeline en condition normale

A l’exécution de notre pipeline, nous pouvons remarquer que nos notebook semblent s’exécuter en même temps. Les deux sont marqués « In Progress » avec les mêmes dates/heures de démarrage.

Activités de pipeline démarrés en même temps

Cependant, la vérité est que nos activités n’ont fait que demander à notre cluster l’exécution de nos notebook (donc des applications Spark). Si l’on regarde cette fois dans le monitoring des applications Spark, nous allons voir que l’une de nos applications est « Queued ». Elle est en fait mise dans la file d’exécution et ne sera traitée que lorsque notre cluster aura la capacité de la traiter.

Une fois la première application finie, la deuxième prendra automatiquement le relais.

La deuxième application démarre après la fin de la précédente.

Véritable exécution en parallèle

Afin de voir une réelle exécution en parallèle de nos notebook, nous allons dans un premier temps augmenter le nombre de nœuds maximum de notre pool Spark à 6 afin de combler le besoin de deux notebook en parallèle.

Pour autoriser plus de nœuds à notre cluster, nous allons ouvrir le panneau de configuration dédié à partir de la partie management de notre cluster Spark.

Spark pool à maximum 6 nœuds

Il nous suffit d’autoriser jusqu’à 6 nœuds à notre pool dans ce menu :

Configuration du nombre de nœuds maximum de notre pool

Après un petit temps de propagation, nous avons maintenant notre pool Spark autorisant jusqu’à 6 nœuds :

Pool Spark avec un maximum 3 nœuds

Maintenant nous allons relancer notre pipeline et dans la vue de monitoring de nos applications Spark nous allons voir que nos deux Notebook sont maintenant lancées en parallèle.

Exécution en parallèle de deux notebook Spark

Exécution en séquentielle

Imaginons maintenant que nous souhaitions forcer une exécution séquentielle de nos pipelines. Cela peut se faire pour nous assurer d’un ordre d’exécution. Nous allons dans ce cas nous servir des « sorties » d’activités. Si l’on passe notre souris sur une activité, on remarque que trois nouvelles icônes apparaissent à côté de la coche verte qui est toujours présente.

Visualisation des sorties d’activités

Ces sorties vont nous permettre de lier les activités les unes avec les autres. Par exemple, si l’on fait un lien entre la sortie « On success » d’un notebook pour aller à un autre notebook, cela se traduira par : « Si ce notebook fini avec un état OK, alors, exécute le notebook suivant ».

Ce lien se créer en faisant un cliquer / déposer de la sortie du premier notebook, jusqu’au notebook suivant :

Création d’un lien séquentiel entre deux notebooks

Une fois créé, ce lien séquentiel se matérialisera par une flèche (verte pour une sortie « On success ») entre les deux activités :

Notebooks liés par un lien séquentiel de succès

Nous venons donc de créer un lien fort nécessitant le succès du premier notebook pour exécuter le second. Autrement dit, le deuxième notebook ne se lancera que si le premier s’est complètement exécuté ET n’est pas en erreur.

Lors du monitoring de l’exécution, nous voyons cette fois que seule notre première activité s’est lancée. La deuxième n’est même pas en attente :

Exécution du premier notebook

Ensuite, lors de la fin de notre premier notebook (sans erreurs), le deuxième se lance automatiquement :

Exécution du deuxième notebook

Alimentation Gold

Pour notre zone gold, nous allons simplement ajouter le notebook créé au début de cet article (ici).

Pipeline général

Maintenant que nos sous-pipelines sont créés, nous pouvons créer notre pipeline maitre, qui appellera tous les autres.

En suivant exactement le même schéma que précédemment, nous allons ajouter toutes nos activités :

Pipeline d’alimentation du Datalake complet

Dans le monitoring de nos pipelines, nous pouvons maintenant suivre les exécutions de notre pipeline et ses sous-pipelines.

Exécution de l’intégralité de notre pipeline et ses sous pipelines

Conclusion

Nous avons créé un pipeline simple d’alimentation complet de notre datalake en utilisant des sous-pipelines. Nous pouvons maintenant compléter l’alimentation de nos différentes zones et ajouter petit à petit chaque notebook dans le pipeline dédié.

Ici, nous n’avons utilisé uniquement des sorties « On success » et des activités d’exécution de pipeline et de notebook, mais il est évident que pour un datalake de production nous devrons gérer les cas d’erreurs avec les sorties « On Fail » par exemple. Dans tous les cas nous pouvons continuer notre construction pour avoir une alimentation complète grâce à l’exécution d’un seul pipeline !


Laisser un commentaire

Votre adresse e-mail ne sera pas publiée. Les champs obligatoires sont indiqués avec *