Synchroniser ses données avec Service Broker
Par Arian Papillon le dimanche 20 septembre 2015, 14:44 - Lien permanent
Encore une autre suite de la conférence du SQL Saturday : "Synchroniser ses données, c'est plus pas facile que c'est compliqué". Nous parlons ici de l'utilisation du Service Broker pour effectuer une synchronisation de données.
Nous avons déjà vu comment nous pouvions utiliser le Change Tracking ou le Change Data Capture pour synchroniser des données.
Ici, l'approche est un peu différente : la synchronisation de données se fera au fil de l'eau grâce à l'émission de messages asynchrones entre l'instance source et l'instance destination, tout cela orchestré par le Service Broker.
Pour définir le Service Broker : c'est un middleware orienté messages (MOM), entièrement intégré dans SQL Server : un outil de messagerie de base de données permettant de gérer des flux de messages de manière asynchrone entre bases ou instances SQL Server. Basé sur des files d'attentes, la remise des message est garantie (si le destinataire est hors ligne, le message va être conservé jusqu'à ce qu'on puisse le transmettre).
Le Service Broker est déjà utilisé en interne dans SQL Server pour certaines de ses fonctionnalités : citons la messagerie de bases de données (database mail) ou les notifications d'évènements.
Le Service Broker va donc pouvoir s'adresser à des scénarios de synchronisation différents : bases de données à structures différentes, la fonctionnalité d'envoi de données et d'instructions par des messages au fil de l'eau offrant toutes les possibilités de traitement personnalisé...
Autre avantage du Service Broker, il est disponible dès l'édition Standard. Sa mise en oeuvre se fait intégralement en code TSQL.
Voyons quels sont les composantes et métadonnées du service Broker :
Le Service Broker va nous permettre d'envoyer des messages d'un service émetteur (notre base source) vers un service destinataire (notre base destination, qui peut être sur une autre instance ou un autre serveur.
Pour faire le faire fonctionner et permettre les conversations entre nos deux services, nous avons un peu de configuration ainsi que différents objets à créer dans les bases de données source et destination :
Activation du Service Broker (sur les deux bases, source et destination)
-- Instance et Base source ALTER DATABASE SBSource SET ENABLE_BROKER; -- Instance et Base destination ALTER DATABASE SBDest SET ENABLE_BROKER;
Message Types
Les types de messages (message types) vont nous permettre de définir quels types de messages nous voulons transmettre : EMPTY, NONE, WELL_FORMED_XML, VALID_XML WHITH SCHEMA COLLECTION. Nous devons créer les types de message sur la source et la destination, ici nous choisirons une validation du contenu (XML) avec la clause WELL_FORMED_XML.
-- Côté source et destination à l'identique CREATE MESSAGE TYPE [DataModification_MessageType] VALIDATION = WELL_FORMED_XML ;
Contrats
Les contrats déterminent quel partenaire (initiateur ou destinataire) peut envoyer quel type(s) de message(s).
-- Côté source et destination à l'identique CREATE CONTRACT [DataModification_Contract] ( [DataModification_MessageType] SENT BY INITIATOR )
Files d'attentes
Les files d'attentes sont le stockage des messages reçus (ce sont des tables créées dans la base de données). Pour rendre plus lisible notre configuration, nous allons les nommer différemment côté émetteur et destinataire.
-- Côté source CREATE QUEUE DataModification_Queue WITH STATUS = ON -- Côté destination CREATE QUEUE DataSynchro_Queue WITH STATUS = ON
Services
Les services sont les points de connexion logiques entre lequels les partenaires vont établir une conversation pour se transmettre des messages. Dans la définition du service, on précise quelle est la file d'attente et le contrat qui s'y rapportent. Nous allons créer un service pour chaque partenaire (les noms sont nécessairement différents...). Nous octroyons au rôle public la permission d'envoi par le service.
-- Côté source : ce service initiera la communication avec le service destinataire.
CREATE SERVICE [DataModification_Service] ON QUEUE DataModification_Queue (DataModification_Contract) GRANT SEND ON SERVICE::DataModification_Service TO PUBLIC -- Côté destination : ce service recevra les messages concernant les données à synchroniser CREATE SERVICE [DataSynchro_Service] ON QUEUE DataSynchro_Queue (DataModification_Contract) grant send on service::DataSynchro_Service to PUBLIC
Configuration de la couche réseau : EndPoints et routes
Pour faire converser nos deux services, nous devons créer des points de connexion TCP.
Pour que le service local puisse localiser l'adresse et le port du point de connexion du service distant, nous allons créer des routes.
-- Côté source : endpoint et route vers le service distant CREATE ENDPOINT BrokerEndpoint STATE = STARTED AS TCP ( LISTENER_PORT = 4037 ) FOR SERVICE_BROKER ( AUTHENTICATION = WINDOWS, ENCRYPTION = DISABLED ); CREATE ROUTE DataModification_Route -- route to destination WITH SERVICE_NAME = 'DataSynchro_Service', ADDRESS = 'TCP://192.168.56.2:4038'; -- adresse et port du endpoint distant
-- Côté destination : endpoint et route vers le service distant CREATE ENDPOINT BrokerEndpoint STATE = STARTED AS TCP ( LISTENER_PORT = 4038 ) FOR SERVICE_BROKER ( AUTHENTICATION = WINDOWS, ENCRYPTION = DISABLED ) ; CREATE ROUTE DataModification_Route -- route to source WITH SERVICE_NAME = 'DataModification_Service', ADDRESS = 'TCP://192.168.56.1:4037' ;
Gestion des messages de synchronisation
Pour assurer la synchronisation de données dans notre exemple, nous avons besoin de différents éléments dans le message que nous allons transmettre à la base destinataire :
- le schéma et le nom de la table modifiée,
- l'action effectuée (insert, delete, update),
- la clé primaire de la table,
- la structure de la table,
- les lignes de données modifiées.
L'objectif pour le destinataire sera de reconstruire la commande TSQL à jouer pour assurer la synchronisation.
Constituons un message de test, avec pour exemple les 3 premières lignes de la table dbo.Person modifiées par un UPDATE :
DECLARE @Msg xml; SET @Msg = (SELECT 'dbo' AS '@TableSchema' ,'person' AS '@TableName' ,'update' AS '@Action' ,'BusinessEntityID' AS '@KeyCol' , ( SELECT column_name ,data_type ,CASE is_nullable WHEN 'YES' THEN 'true' ELSE 'false' END AS is_nullable ,CHARACTER_MAXIMUM_LENGTH AS Charlen FROM information_schema.columns [column] WHERE table_name = 'person' FOR XML AUTO,ROOT ('TableStructure') ,TYPE) , ( SELECT TOP (3) * FROM Person Row FOR XML AUTO,BINARY BASE64,ROOT ('TableData') ,TYPE) FOR XML PATH ('DataModification')) ; SELECT @Msg;
Ceci va nous produire un joli message, entièrement en XML, qui contient tous ces éléments :
<DataModification TableSchema="dbo" TableName="person" Action="update" KeyCol="BusinessEntityID"> <TableStructure> <column column_name="BusinessEntityID" data_type="int" is_nullable="false" /> <column column_name="PersonType" data_type="nchar" is_nullable="false" Charlen="2" /> <column column_name="FirstName" data_type="nvarchar" is_nullable="false" Charlen="50" /> <column column_name="LastName" data_type="nvarchar" is_nullable="false" Charlen="50" /> </TableStructure> <TableData> <Row BusinessEntityID="4" PersonType="EM" FirstName="Rob" LastName="Walters" /> <Row BusinessEntityID="5" PersonType="EM" FirstName="Gail" LastName="Erickson" /> <Row BusinessEntityID="6" PersonType="EM" FirstName="Jossef" LastName="Goldberg" /> </TableData> </DataModification>
Côté destinataire, c'est un peu plus compliqué : nous devons décompiler le XML et produire la commande TSQL qui va faire la synchronisation. Croyez-moi sur parole (et faites donc le test), le code ci-dessous produit la commande nécessaire pour faire l'UPDATE des 3 lignes concernées dans la table destination !...
-- Extraction et interprétation du message XML (contenu dans la variable @msg) DECLARE @XML_NATURE nvarchar (30) , @TABLE_SCHEMA sysname, @TABLE_NAME sysname, @SQL_ACTION nvarchar (30) , @KEY_COLUMN sysname, @ColumnList nvarchar (max) = '', @SQLCmd nvarchar (max) ; SELECT @XML_NATURE = Txml1.XdataHead.value ('fn:local-name(.)','NVARCHAR(128)') ,@TABLE_SCHEMA = Txml1.XdataHead.value ('@TableSchema','NVARCHAR(128)') ,@TABLE_NAME = Txml1.XdataHead.value ('@TableName','NVARCHAR(128)') ,@SQL_ACTION = Txml1.XdataHead.value ('@Action','NVARCHAR(128)') ,@KEY_COLUMN = Txml1.XdataHead.value ('@KeyCol','NVARCHAR(128)') FROM @Msg.nodes ('./*') AS Txml1 (XdataHead) ; -- structure de la table IF OBJECT_ID ('tempdb..#XMLColumns') IS NOT NULL BEGIN DROP TABLE #XMLColumns; END; SELECT x.value ('@column_name','sysname') AS column_name ,x.value ('@data_type','sysname') AS data_type ,x.value ('@is_nullable','VARCHAR(20)') AS is_nullable ,x.value ('@Charlen','VARCHAR(20)') AS Charlen INTO #XMLColumns FROM @Msg.nodes ('/DataModification/TableStructure/column') TempXML (x) ; -- extraction des lignes DECLARE @SQLExtract nvarchar (max) = 'SELECT '; SELECT @SQLExtract = @SQLExtract + ' x.value(''@' + column_name + ''', ''' + data_type + CASE WHEN Charlen IS NULL THEN '' ELSE '(' + Charlen + ')' END + '''' + ') AS [' + column_name + '],' FROM #XMLColumns; SET @SQLExtract = LEFT (@SQLExtract,LEN (@SQLExtract) - 1) ; SELECT @SQLExtract = @SQLExtract + ' FROM @Msg.nodes(''/DataModification/TableData/Row'') TempXML (x)'; -- Génération des commandes INSERT, UPDATE, DELETE IF @SQL_ACTION = 'insert' BEGIN SET @SQLCmd = 'INSERT ' + @TABLE_SCHEMA + '.' + @TABLE_NAME + ' ' + @SQLExtract; END; IF @SQL_ACTION = 'update' BEGIN SELECT @ColumnList = @ColumnList + column_name + '=E.' + column_name + ',' FROM #XMLColumns WHERE column_name <> @KEY_COLUMN; SET @ColumnList = LEFT (@ColumnList,LEN (@ColumnList) - 1) ; SET @SQLCmd = 'UPDATE ' + @TABLE_SCHEMA + '.' + @TABLE_NAME + ' SET ' + @ColumnList + ' FROM ' + @TABLE_SCHEMA + '.' + @TABLE_NAME + ' AS T JOIN (' + @SQLExtract + ') AS E ON T.' + @KEY_COLUMN + '=E.' + @KEY_COLUMN; END; IF @SQL_ACTION = 'delete' BEGIN SET @SQLCmd = 'DELETE ' + @TABLE_SCHEMA + '.' + @TABLE_NAME + ' FROM ' + @TABLE_SCHEMA + '.' + @TABLE_NAME + ' AS T JOIN (' + @SQLExtract + ') AS E ON T.' + @KEY_COLUMN + '=E.' + @KEY_COLUMN; END; PRINT @SQLCmd -- soyons curieux -- Pour exécuter la commande -- EXEC sp_executeSQL @SQLCmd,N'@Msg xml',@Msg = @Msg;
Automatisation
Il nous faut bien entendu rendre cela automatique. Pour cela, 3 étapes :
- Créer des trigger sur INSERT, UPDATE et DELETE qui déclenchent l'envoi du message sur les modifications
- Créer sur le destinataire une procédure stockée de réception et traitement du message (qui intègrera le code vu plus haut)
- Configurer sur le destinataire l'activation automatique à la reception de messages pour notre file d'attente.
De plus, côté émetteur du message, il y a également réception de messages de contrôle. Une activation automatique avec une procédure stockée sera également nécessaire pour traiter les éventuelles erreurs et sortir ces messages de la file d'attente et éviter ainsi qu'ils ne s'y accumulent.
Je ne vais pas publier ici l'ensemble de ce code (triggers, activation), mais vous trouverez les scripts complets de la démo ici : Synchro_Service_Broker_Scripts.zip
Bonne synchronisation avec Service Broker...