Synchroniser ses données avec Service Broker

Encore une autre suite de la conférence du SQL Saturday : "Synchroniser ses données, c'est plus pas facile que c'est compliqué". Nous parlons ici de l'utilisation du Service Broker pour effectuer une synchronisation de données.

Nous avons déjà vu comment nous pouvions utiliser le Change Tracking ou le Change Data Capture pour synchroniser des données.

Ici, l'approche est un peu différente : la synchronisation de données se fera au fil de l'eau grâce à l'émission de messages asynchrones entre l'instance source et l'instance destination, tout cela orchestré par le Service Broker.

Pour définir le Service Broker : c'est un middleware orienté messages (MOM), entièrement intégré dans SQL Server : un outil de messagerie de base de données permettant de gérer des flux de messages de manière asynchrone entre bases ou instances SQL Server. Basé sur des files d'attentes, la remise des message est garantie (si le destinataire est hors ligne, le message va être conservé jusqu'à ce qu'on puisse le transmettre).

Le Service Broker est déjà utilisé en interne dans SQL Server pour certaines de ses fonctionnalités : citons la messagerie de bases de données (database mail) ou les notifications d'évènements.

Le Service Broker va donc pouvoir s'adresser à des scénarios de synchronisation différents : bases de données à structures différentes, la fonctionnalité d'envoi de données et d'instructions par des messages au fil de l'eau offrant toutes les possibilités de traitement personnalisé...

Autre avantage du Service Broker, il est disponible dès l'édition Standard. Sa mise en oeuvre se fait intégralement en code TSQL.

Voyons quels sont les composantes et métadonnées du service Broker :

Le Service Broker va nous permettre d'envoyer des messages d'un service émetteur (notre base source) vers un service destinataire (notre base destination, qui peut être sur une autre instance ou un autre serveur.

Pour faire le faire fonctionner et permettre les conversations entre nos deux services, nous avons un peu de configuration ainsi que différents objets à créer dans les bases de données source et destination :

Activation du Service Broker (sur les deux bases, source et destination)

-- Instance et Base source
ALTER DATABASE SBSource SET ENABLE_BROKER;
-- Instance et Base destination
ALTER DATABASE SBDest SET ENABLE_BROKER;

Message Types

Les types de messages (message types) vont nous permettre de définir quels types de messages nous voulons transmettre : EMPTY, NONE, WELL_FORMED_XML, VALID_XML WHITH SCHEMA COLLECTION. Nous devons créer les types de message sur la source et la destination, ici nous choisirons une validation du contenu (XML) avec la clause WELL_FORMED_XML.

-- Côté source et destination à l'identique
CREATE MESSAGE TYPE [DataModification_MessageType]
VALIDATION = WELL_FORMED_XML ;

Contrats

Les contrats déterminent quel partenaire (initiateur ou destinataire) peut envoyer quel type(s) de message(s).

-- Côté source et destination à l'identique
CREATE CONTRACT [DataModification_Contract]
(
[DataModification_MessageType]
SENT BY INITIATOR
)

Files d'attentes

Les files d'attentes sont le stockage des messages reçus (ce sont des tables créées dans la base de données). Pour rendre plus lisible notre configuration, nous allons les nommer différemment côté émetteur et destinataire.

-- Côté source
CREATE QUEUE DataModification_Queue 
WITH STATUS = ON
-- Côté destination
CREATE QUEUE DataSynchro_Queue 
WITH STATUS = ON

Services

Les services sont les points de connexion logiques entre lequels les partenaires vont établir une conversation pour se transmettre des messages. Dans la définition du service, on précise quelle est la file d'attente et le contrat qui s'y rapportent. Nous allons créer un service pour chaque partenaire (les noms sont nécessairement différents...). Nous octroyons au rôle public la permission d'envoi par le service.

-- Côté source : ce service initiera la communication avec le service destinataire.

CREATE SERVICE [DataModification_Service] 
ON QUEUE DataModification_Queue (DataModification_Contract)
GRANT SEND ON SERVICE::DataModification_Service TO PUBLIC
-- Côté destination : ce service recevra les messages concernant les données à synchroniser
CREATE SERVICE [DataSynchro_Service] 
ON QUEUE DataSynchro_Queue (DataModification_Contract)
grant send on service::DataSynchro_Service to PUBLIC

Configuration de la couche réseau : EndPoints et routes

Pour faire converser nos deux services, nous devons créer des points de connexion TCP.

Pour que le service local puisse localiser l'adresse et le port du point de connexion du service distant, nous allons créer des routes.

 -- Côté source : endpoint et route vers le service distant
CREATE ENDPOINT BrokerEndpoint STATE = STARTED AS TCP ( LISTENER_PORT = 4037 )
    FOR SERVICE_BROKER ( AUTHENTICATION = WINDOWS, ENCRYPTION = DISABLED );
CREATE ROUTE DataModification_Route  -- route to destination
WITH
SERVICE_NAME = 'DataSynchro_Service',
ADDRESS = 'TCP://192.168.56.2:4038';  -- adresse et port du endpoint distant
-- Côté destination : endpoint et route vers le service distant
CREATE ENDPOINT BrokerEndpoint
    STATE = STARTED
    AS TCP ( LISTENER_PORT = 4038 )
    FOR SERVICE_BROKER ( AUTHENTICATION = WINDOWS, ENCRYPTION = DISABLED ) ;
CREATE ROUTE DataModification_Route   -- route to source
    WITH
    SERVICE_NAME = 'DataModification_Service',
    ADDRESS = 'TCP://192.168.56.1:4037' ;

Gestion des messages de synchronisation

Pour assurer la synchronisation de données dans notre exemple, nous avons besoin de différents éléments dans le message que nous allons transmettre à la base destinataire :

  • le schéma et le nom de la table modifiée,
  • l'action effectuée (insert, delete, update),
  • la clé primaire de la table,
  • la structure de la table,
  • les lignes de données modifiées.

L'objectif pour le destinataire sera de reconstruire la commande TSQL à jouer pour assurer la synchronisation.

Constituons un message de test, avec pour exemple les 3 premières lignes de la table dbo.Person modifiées par un UPDATE :

DECLARE @Msg xml;
SET @Msg = (SELECT 'dbo' AS '@TableSchema'
                  ,'person' AS '@TableName'
                  ,'update' AS '@Action'
                  ,'BusinessEntityID' AS '@KeyCol'
                  , (
                     SELECT column_name
                           ,data_type
                           ,CASE is_nullable
                            WHEN 'YES' THEN 'true' ELSE 'false'
                            END AS is_nullable
                           ,CHARACTER_MAXIMUM_LENGTH AS Charlen
                       FROM information_schema.columns [column]
                       WHERE table_name = 'person'
                       FOR XML AUTO,ROOT ('TableStructure') ,TYPE) 
                  , (
                     SELECT TOP (3) *
                       FROM Person Row
                       FOR XML AUTO,BINARY BASE64,ROOT ('TableData') ,TYPE)
                    FOR XML PATH ('DataModification')) ; 
SELECT @Msg;

Ceci va nous produire un joli message, entièrement en XML, qui contient tous ces éléments :

<DataModification TableSchema="dbo" TableName="person" Action="update" KeyCol="BusinessEntityID">
  <TableStructure>
    <column column_name="BusinessEntityID" data_type="int" is_nullable="false" />
    <column column_name="PersonType" data_type="nchar" is_nullable="false" Charlen="2" />
    <column column_name="FirstName" data_type="nvarchar" is_nullable="false" Charlen="50" />
    <column column_name="LastName" data_type="nvarchar" is_nullable="false" Charlen="50" />
  </TableStructure>
  <TableData>
    <Row BusinessEntityID="4" PersonType="EM" FirstName="Rob" LastName="Walters" />
    <Row BusinessEntityID="5" PersonType="EM" FirstName="Gail" LastName="Erickson" />
    <Row BusinessEntityID="6" PersonType="EM" FirstName="Jossef" LastName="Goldberg" />
  </TableData>
</DataModification>

Côté destinataire, c'est un peu plus compliqué : nous devons décompiler le XML et produire la commande TSQL qui va faire la synchronisation. Croyez-moi sur parole (et faites donc le test), le code ci-dessous produit la commande nécessaire pour faire l'UPDATE des 3 lignes concernées dans la table destination !...

-- Extraction et interprétation du message XML (contenu dans la variable @msg)

DECLARE @XML_NATURE nvarchar (30) ,
        @TABLE_SCHEMA sysname,
        @TABLE_NAME sysname,
        @SQL_ACTION nvarchar (30) ,
        @KEY_COLUMN sysname,
        @ColumnList nvarchar (max) = '',
        @SQLCmd nvarchar (max) ;

SELECT @XML_NATURE = Txml1.XdataHead.value ('fn:local-name(.)','NVARCHAR(128)') 
      ,@TABLE_SCHEMA = Txml1.XdataHead.value ('@TableSchema','NVARCHAR(128)') 
      ,@TABLE_NAME = Txml1.XdataHead.value ('@TableName','NVARCHAR(128)') 
      ,@SQL_ACTION = Txml1.XdataHead.value ('@Action','NVARCHAR(128)') 
      ,@KEY_COLUMN = Txml1.XdataHead.value ('@KeyCol','NVARCHAR(128)')
  FROM @Msg.nodes ('./*') AS Txml1 (XdataHead) ;

-- structure de la table
IF OBJECT_ID ('tempdb..#XMLColumns') IS NOT NULL
    BEGIN
        DROP TABLE #XMLColumns;
    END;
SELECT x.value ('@column_name','sysname') AS column_name
      ,x.value ('@data_type','sysname') AS data_type
      ,x.value ('@is_nullable','VARCHAR(20)') AS is_nullable
      ,x.value ('@Charlen','VARCHAR(20)') AS Charlen INTO #XMLColumns
  FROM @Msg.nodes ('/DataModification/TableStructure/column') TempXML (x) ;

-- extraction des lignes
DECLARE @SQLExtract nvarchar (max) = 'SELECT ';
SELECT @SQLExtract = @SQLExtract + '
x.value(''@' + column_name + ''', ''' + data_type + CASE
                                                    WHEN Charlen IS NULL THEN '' ELSE '(' + Charlen + ')'
                                                    END + '''' + ') AS [' + column_name + '],'
  FROM #XMLColumns;
SET @SQLExtract = LEFT (@SQLExtract,LEN (@SQLExtract) - 1) ;
SELECT @SQLExtract = @SQLExtract + ' FROM @Msg.nodes(''/DataModification/TableData/Row'') TempXML (x)';

-- Génération des commandes INSERT, UPDATE, DELETE
IF @SQL_ACTION = 'insert'
    BEGIN
        SET @SQLCmd = 'INSERT ' + @TABLE_SCHEMA + '.' + @TABLE_NAME + ' ' + @SQLExtract;
    END;
IF @SQL_ACTION = 'update'
    BEGIN
        SELECT @ColumnList = @ColumnList + column_name + '=E.' + column_name + ','
          FROM #XMLColumns
          WHERE column_name <> @KEY_COLUMN;
        SET @ColumnList = LEFT (@ColumnList,LEN (@ColumnList) - 1) ;
        SET @SQLCmd = 'UPDATE ' + @TABLE_SCHEMA + '.' + @TABLE_NAME + ' SET ' + @ColumnList + ' FROM ' + @TABLE_SCHEMA + '.' + @TABLE_NAME + ' AS T JOIN (' + @SQLExtract + ') AS E ON T.' + @KEY_COLUMN + '=E.' + @KEY_COLUMN;
    END;
IF @SQL_ACTION = 'delete'
    BEGIN
        SET @SQLCmd = 'DELETE ' + @TABLE_SCHEMA + '.' + @TABLE_NAME + ' FROM ' + @TABLE_SCHEMA + '.' + @TABLE_NAME + ' AS T JOIN (' + @SQLExtract + ') AS E ON T.' + @KEY_COLUMN + '=E.' + @KEY_COLUMN;
    END;

PRINT @SQLCmd -- soyons curieux
-- Pour exécuter la commande 
-- EXEC sp_executeSQL @SQLCmd,N'@Msg xml',@Msg = @Msg;

Automatisation

Il nous faut bien entendu rendre cela automatique. Pour cela, 3 étapes : 

  • Créer des trigger sur INSERT, UPDATE et DELETE qui déclenchent l'envoi du message sur les modifications
  • Créer sur le destinataire une procédure stockée de réception et traitement du message (qui intègrera le code vu plus haut)
  • Configurer sur le destinataire l'activation automatique à la reception de messages pour notre file d'attente.

De plus, côté émetteur du message, il y a également réception de messages de contrôle. Une activation automatique avec une procédure stockée sera également nécessaire pour traiter les éventuelles erreurs et sortir ces messages de la file d'attente et éviter ainsi qu'ils ne s'y accumulent.

Je ne vais pas publier ici l'ensemble de ce code (triggers, activation), mais vous trouverez les scripts complets de la démo ici : Synchro_Service_Broker_Scripts.zip

Bonne synchronisation avec Service Broker...