Cet article était loin dans les cartons, mais l’article Alternative pipeline parametrization for Azure Synapse Analytics a motivé la priorisation de cet article. Merci @paul_eng pour le partage et la motivation d’écrire celui-ci !
Introduction
Que se soit lors de l’écriture du code d’un site web, d’un script ou de projet data, il est toujours important de rendre paramétrable l’exécution du code que l’on écrit. La majeur parti des langages, framework, IDE ou solution packagée utilisée en informatique proposent tous leur propre solution pour traiter ce sujet avec plus ou moins d’efficacité et d’élégance.
L’utilisation de paramètres permet de gérer certaines particularités dans le traitement général, mais reviens maintenant le problème de comment définir et initialiser ces paramètres sans avoir à modifier quoi que ce soit dans le code ou les configurations internes de chaque composant.
Je n’ai pas trouvé dans Synapse un moyen simple et intégré pour gérer les paramètres à l’extérieur de Synapse.
Dans notre équipe nous avons commencé à travailler avec des fichiers de configurations déposés dans le Lake, mais très vite leur manipulation s’est avérée délicate et leur multiplication n’a pas aidé à rendre le tout plus facile d’usage.
Il nous fallait donc trouver quelque chose de plus « User Friendly », que nous maitrisions, qui soit centralisé et extérieur à nos différents codes. Dans le monde de la « Data », nous parlons tous couramment le SQL (à minima SELECT/ INSERT / UPDATE) et donc la Base de données de paramètre nous est apparut comme une évidence. De plus grâce au cloud il est possible de monter une petite base SQL en quelques cliques pour quelques euros par mois !
Le sujet est maintenant amené, nous devons maintenant définir ce que nous voulons paramétrer et pourquoi !
Cas d’usage – Ingérer un sous-ensemble de données
Lors de l’alimentation d’un Datalake, la première étape est d’ingérer les données depuis des données sources. Ces actions répétées sur l’ensemble des sources de données sont répétitives et il existe donc de nombreuses façons d’automatiser ce processus. Cependant, dans de nombreux cas, cette automatisation rend homogène la façon d’ingérer les données :
- Ingestion de TOUTES les tables d’une base de données
- Ingestion de TOUS les fichiers d’un répertoire
- …
Imaginons maintenant que dans notre base de donnée source il y ait de nombreuses tables temporaires, de travail ou simplement non utilisé dans les projets actuels. Si l’on peut se dire qu’il est plus « simple » d’ingérer l’intégralité et de trier ensuite, il existe certains cas où ce n’est pas forcément pertinent.
Imaginons que nous travaillions pour WideWorldImporters et que j’ai pour mission de présenter un tableau de bord avec uniquement un suivi des articles facturés.
Je pourrais effectivement ingérer l’intégralité de ma base WWI mais honnêtement seulement 3 tables m’intéressent alors pourquoi ingérer l’intégralité de ma base qui comporte de nombreuses tables non utilisées pour notre usage ?
Le débat n’est pas pourquoi on voudrait faire cela, mais comment mettre en place un pipeline d’ingestion qui nous permette de paramétrer les données/tables que nous souhaitons à l’extérieur de nos pipelines Synapse.
Solution
Une des solutions (en tout cas celle retenue ici) est d’utiliser une table de paramétrage dans une base de données dédiée. Le pipeline aura donc pour processus d’aller dans un premier temps chercher et initialiser les paramètres dans la table dédiée et ensuite d’exécuter ces activités ou sous pipeline.
Les mains dedans !
Les ressources utilisées dans cette expérience sont:
- Un workspace Synapse (la base)
- Une base de donnée accessible depuis le workspace (l’objet de l’expérience)
- Un Azure data lake gen 2 (ou tout autre « répertoire de dépôt »)
- La base de données SQL WideWorldImporters accessible depuis le workspace synapse disponible sur le github des samples Microsoft : WideWorldImporters-Full.bak (github.com)
Vue d’ensemble
La solution consiste en seulement trois activités
- Lookup – pour la lecture du paramétrage
- ForEach – pour l’exécution « pour chaque ligne de paramétrage »
- CopyData – pour l’exécution de l’ingestion
Ce pipeline est volontairement « simple » pour présenter uniquement l’utilisation de la base de données de paramétrage.
La base de données (surtout la table !)
La base de données est très simple et comprend dans notre cas uniquement trois colonnes:
- StagingDirectoryName (le répertoire de destination de notre extraction dans le lake)
- FileName (le nom du fichier qui sera écrit dans le répertoire précédent)
- SourceQuery (la requête d’extraction de données)
Voici le DDL utilisé pour créer la table et les valeurs utilisées dans l’expérience :
USE [SynapseTraining]
GO
-- Assure we have a proper schema
IF NOT EXISTS ( SELECT 1 FROM sys.schemas WHERE name = N'param' )
EXEC('CREATE SCHEMA [param]');
GO
-- Droppring and creating the IngestionParam table
IF EXISTS ( select 1 from INFORMATION_SCHEMA.TABLES
where TABLE_SCHEMA = 'param' and TABLE_NAME = 'IngestionParam' )
DROP TABLE [param].[IngestionParam]
GO
CREATE TABLE [param].[IngestionParam](
[StagingDirectoryName] varchar(200),
[FileName] varchar(100),
[SourceQuery] varchar(500)
)
GO
-- Inserting sample values
INSERT INTO [param].[IngestionParam]([StagingDirectoryName], [FileName], [SourceQuery])
VALUES ('staging/Invoices/','Invoices.parquet','SELECT * FROM Sales.Invoices')
/*
Inserting more sample value to demonstrate the ease of usability
INSERT INTO [param].[IngestionParam]([StagingDirectoryName], [FileName], [SourceQuery])
VALUES ('staging/InvoiceLines/','InvoiceLines.parquet','SELECT * FROM Sales.InvoiceLines')
INSERT INTO [param].[IngestionParam]([StagingDirectoryName], [FileName], [SourceQuery])
VALUES ('staging/StockItems/','StockItems.parquet','SELECT * FROM Warehouse.StockItems')
*/
GO
Une fois exécutée, notre base de paramétrage est prête :
Le pipeline (et autres objets nécessaires)
Lookup
La première étape est donc de lire notre table de paramétrage et pour ce faire, on utilise une activité Lookup. Celle-ci utilise en dataset source la connexion à notre base de données de paramétrage et une requête très simple de lecture de notre table de paramétrage.
La prévisualisation des données extraites du Lookup doit nous remonter la ligne de donnée provenant de notre base de paramétrage.
ForEach
Maintenant, ce que nous souhaitons faire c’est « exécuter la suite » pour chaque ligne de paramétrage inséré dans notre base et donc renvoyé par le Lookup. C’est l’activité ForEach qui nous le permet en prenant en entrée chaque ligne renvoyée par le Lookup précédent. C’est dans l’onglet « Settings » que cela se passe en précisant les « Items ». Pour se faire, il faut ajouter du contenu dynamique et ajouter le « value array » de l’activité précédente.
Suivant le besoin il est possible d’utiliser les autres paramètres du ForEach pour faire un traitement séquentiel ou en batch, mais ce n’est pas nécessaire pour notre expérience.
Copy data
Maintenant que notre pipeline est prêt pour exécuter une activité pour chaque ligne de paramétrage, nous sommes prêts pour enfin exécuter notre copie ! Pour se faire, on ajoute une activité à l’intérieur du ForEach.
Pour notre usage, nous utilisons l’activité Copy data et nous avons deux choses à configurer :
- Source – pour configurer « Ce qui va être extrait »
- Sink – pour configurer ou et comment on stocke les données précédemment extraites.
A ce stade de notre développement, il faut se dire que grâce à notre Lookup et au ForEach, nous avons dans les paramètres de notre activité l’ensemble des valeurs (colones) d’une ligne de notre table de paramétrage dans la « variable » @item() ». Par exemple :
Au niveau de la source, une fois le dataset source connectée à notre base WWI configurée, nous avons uniquement à configurer le fait d’utiliser une requête que l’on récupère de notre variable @item() auquel on demande la valeur « SourceQuery » de notre table de paramétrage. C’est ce qui sera envoyé sur notre serveur pour extraction de données.
Du côté de la destination, nous devons maintenant préciser de manière dynamique à quel endroit déposer notre fichier. Pour ce faire, nous avons les deux valeurs de notre table « StagingDirectoryName » pour le répertoire et « FileName » pour le nom de fichier.
La subtilité à ce moment est d’avoir un dataset de destination paramétrable. Pour ce faire, dans un dataset de fichier, il suffit de configurer le « File path » avec des paramètres et ce dataset deviendra « générique » pour l’ensemble des fichiers que l’on veut écrire dans notre lake.
(Mes expériences utilisent principalement des fichiers parquets et si vous vous demandez pourquoi je vous invite à regarder cette petite vidéo : Synapse Espresso: CSV vs. Parquet? – YouTube)
Dans la destination du Copy data, il nous suffit maintenant d’attribuer les valeurs de notre « @item() » (provenant du ForEach) aux paramètres de notre dataset générique :
Première exécution
Notre pipeline est maintenant prêt. Une validation ou un publish est toujours de bon augure pour vérifier que nous n’avons rien oublié de critique.
A ce stade, nous avons donc une seule ligne dans notre table de paramétrage qui nous propose l’extraction de la table « Sales.Invoices » à aller déposer dans le répertoire « staging/Invoices/ » dans le fichier « Invoices.parquet ». Pour l’exemple, je démarrerais avec un datalake vide :
L’execution du pipeline nous permet d’observer l’execution de l’ensemble de nos activités :
Et nous pouvons vérifier que notre datalake contient maintenant nos données dans un fichier parquet :
Tout ça … pour ça ?
Il est vrai que pour l’instant, nous nous sommes un peu compliqué la vie et que pour ingérer une table dans notre datalake on aurait pu utiliser directement un Copy data.
Mais rappelons-nous du cas d’usage ! nous avons une base de données avec « beaucoup » de table, et nous voulons pouvoir en ingérer que quelques-unes sans avoir rien à toucher dans Synapse.
C’est maintenant que notre base de données de paramétrage va rentrer en action pour notre plus grand plaisir. Il me suffit d’exécuter une requête SQL (ou un script) pour changer le paramétrage de mon pipeline et finalement ingérer deux tables supplémentaires. Je vais donc exécuter les 2 requêtes précédemment en commentaire dans le script de démarrage pour obtenir le résultat suivant :
Sans aucune modification dans Synape, une nouvelle exécution du pipeline nous permet d’ingérer maintenant les trois tables présentes dans la table de paramétrage. On remarque que le Copy data est cette fois exécutée trois fois :
Nous avons maintenant dans notre datalake les trois fichiers répartis dans trois répertoires différents :
Et quoi d’autre ?
Cette construction nous a permis de paramétrer notre pipeline et d’ajouter facilement de nouvelles tables à ingérer tous en étant capable de garder notre lake organisé. Nous n’avons à l’instant que 3 paramètres dans notre table, mais en jouant un peu avec, on est capable de travailler sur d’autres cas d’usages.
Envie d’investiguer notre extraction sans « casser » nos fichiers existants ni notre arborescence ? On pourrait lancer une extraction dans un répertoire spécifique ? Pour ce faire, un simple update de notre table de paramétrage est nécessaire :
UPDATE [param].[IngestionParam]
SET [StagingDirectoryName] = ‘staging/Investigation/’
Oui, un UPDATE sans WHERE c’est moche et ne doit probablement pas être fait en production mais on est là pour l’expérience !
Nous sommes maintenant prêts à investiguer :
sans avoir cassé notre arborescence :
Conclusion
Dans cet article nous avons vu comment paramétrer et agir sur nos pipelines depuis l’extérieur de Synapse. Cette méthode n’est pas standard et de ce fait, il est nécessaire de s’interroger consciencieusement pour savoir si elle répond à votre besoin sans rajouter de complexité et de coûts superflus.
Pour aller plus loin
L’expérience présentée ici a été définie pour présenter « comment » paramétrer un pipeline depuis une base de données. Cependant il est évidemment possible d’aller encore plus loin dans la paramétrisation de notre pipeline. Quelques exemples :
- Pouvoir extraire des données depuis plusieurs bases différentes hébergées sur plusieurs serveurs différents
- Paramétrer une extraction en « Delta » (et loguer en base les différentes extractions et leurs paramètres)
- Définir quelle table doit être extraite à quelle fréquence
- Activer/Désactiver l’extraction d’une table par un Flag
- …
2 réponses à “Paramétrer les Pipeline Azure Synapse Analytics sans ouvrir Synapse !”
[…] Cet article peut se suffire à lui-même en utilisant un fichier spécifique et adaptant en conséquence nos développements, mais il utilise la sortie générée dans l’article : Paramétrer les Pipeline Azure Synapse Analytics sans ouvrir Synapse ! […]
[…] avoir alimenté notre zone bronze : Paramétrer les Pipeline Azure Synapse Analytics sans ouvrir Synapse ! […]