001package fr.ifremer.adagio.synchro.service.data;
002
003/*
004 * #%L
005 * Tutti :: Persistence
006 * $Id: DataSynchroServiceImpl.java 1573 2014-02-04 16:41:40Z tchemit $
007 * $HeadURL: http://svn.forge.codelutin.com/svn/tutti/trunk/tutti-persistence/src/main/java/fr/ifremer/adagio/core/service/technical/synchro/DataSynchroServiceImpl.java $
008 * %%
009 * Copyright (C) 2012 - 2014 Ifremer
010 * %%
011 * This program is free software: you can redistribute it and/or modify
012 * it under the terms of the GNU Affero General Public License as published by
013 * the Free Software Foundation, either version 3 of the License, or
014 * (at your option) any later version.
015 * 
016 * This program is distributed in the hope that it will be useful,
017 * but WITHOUT ANY WARRANTY; without even the implied warranty of
018 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
019 * GNU General Public License for more details.
020 * 
021 * You should have received a copy of the GNU Affero General Public License
022 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
023 * #L%
024 */
025
026import static org.nuiton.i18n.I18n.t;
027
028import java.io.File;
029import java.sql.Connection;
030import java.sql.DriverManager;
031import java.sql.ResultSet;
032import java.sql.SQLException;
033import java.sql.Timestamp;
034import java.util.Date;
035import java.util.List;
036import java.util.Map;
037import java.util.Properties;
038import java.util.Set;
039
040import javax.sql.DataSource;
041
042import org.apache.commons.collections4.CollectionUtils;
043import org.apache.commons.io.IOUtils;
044import org.apache.commons.lang3.StringUtils;
045import org.apache.commons.lang3.time.DateUtils;
046import org.apache.commons.logging.Log;
047import org.apache.commons.logging.LogFactory;
048import org.hibernate.cfg.Configuration;
049import org.hibernate.cfg.Environment;
050import org.hibernate.dialect.Dialect;
051import org.nuiton.util.TimeLog;
052import org.springframework.beans.factory.annotation.Autowired;
053import org.springframework.context.annotation.Lazy;
054import org.springframework.jdbc.datasource.DataSourceUtils;
055import org.springframework.stereotype.Service;
056
057import com.google.common.base.Preconditions;
058import com.google.common.base.Predicate;
059import com.google.common.collect.Lists;
060import com.google.common.collect.Maps;
061
062import fr.ifremer.adagio.synchro.config.SynchroConfiguration;
063import fr.ifremer.adagio.synchro.dao.DaoUtils;
064import fr.ifremer.adagio.synchro.dao.SynchroTableDao;
065import fr.ifremer.adagio.synchro.dao.SynchroTableDaoImpl;
066import fr.ifremer.adagio.synchro.intercept.SynchroInterceptor;
067import fr.ifremer.adagio.synchro.meta.SynchroColumnMetadata;
068import fr.ifremer.adagio.synchro.meta.SynchroDatabaseMetadata;
069import fr.ifremer.adagio.synchro.meta.SynchroJoinMetadata;
070import fr.ifremer.adagio.synchro.meta.SynchroMetadataUtils;
071import fr.ifremer.adagio.synchro.meta.SynchroTableMetadata;
072import fr.ifremer.adagio.synchro.meta.SynchroTableMetadata.TableInsertStrategy;
073import fr.ifremer.adagio.synchro.service.SynchroBaseService;
074import fr.ifremer.adagio.synchro.service.SynchroContext;
075import fr.ifremer.adagio.synchro.service.SynchroResult;
076import fr.ifremer.adagio.synchro.service.SynchroServiceUtils;
077import fr.ifremer.adagio.synchro.service.SynchroPendingOperationBuffer;
078import fr.ifremer.adagio.synchro.type.ProgressionModel;
079
080/**
081 * Created on 1/14/14.
082 * 
083 * @author Benoit Lavenier <benoit.lavenier@e-is.pro>
084 * @since 3.5.2
085 */
086@Service("dataSynchroService")
087@Lazy
088public class DataSynchroServiceImpl extends SynchroBaseService implements DataSynchroService {
089
090    /** Logger. */
091    private static final Log log =
092            LogFactory.getLog(DataSynchroServiceImpl.class);
093
094    private static final TimeLog TIME =
095            new TimeLog(DataSynchroServiceImpl.class);
096
097    public DataSynchroServiceImpl(DataSource dataSource, SynchroConfiguration config) {
098        super(dataSource, config);
099    }
100
101    public DataSynchroServiceImpl() {
102        super();
103    }
104
105    @Override
106    public SynchroContext createSynchroContext(File sourceDbDirectory) {
107
108        String dbName = config.getDbName();
109        Properties targetConnectionProperties = config.getConnectionProperties();
110
111        Properties sourceConnectionProperties = new Properties(targetConnectionProperties);
112        sourceConnectionProperties.setProperty(Environment.URL,
113                DaoUtils.getJdbcUrl(sourceDbDirectory, dbName));
114
115        Set<String> tableToIncludes = config.getImportDataTablesIncludes();
116
117        SynchroContext context = SynchroContext.newContext(
118                tableToIncludes,
119                sourceConnectionProperties,
120                targetConnectionProperties,
121                new SynchroResult());
122        return context;
123    }
124
125    @Override
126    public SynchroContext createSynchroContext(Properties sourceConnectionProperties) {
127
128        Properties targetConnectionProperties = config.getConnectionProperties();
129
130        Set<String> tableToIncludes = config.getImportDataTablesIncludes();
131
132        SynchroContext context = SynchroContext.newContext(
133                tableToIncludes,
134                sourceConnectionProperties,
135                targetConnectionProperties,
136                new SynchroResult());
137        return context;
138    }
139
140    @Override
141    public void prepare(SynchroContext synchroContext) {
142
143        Preconditions.checkNotNull(synchroContext);
144
145        Properties sourceConnectionProperties = synchroContext.getSourceConnectionProperties();
146        Preconditions.checkNotNull(sourceConnectionProperties);
147
148        Properties targetConnectionProperties = synchroContext.getTargetConnectionProperties();
149        Preconditions.checkNotNull(targetConnectionProperties);
150
151        Set<String> tableNames = synchroContext.getTableNames();
152        Predicate<String> tableFilter = synchroContext.getTableFilter();
153        if (CollectionUtils.isEmpty(tableNames) && tableFilter == null) {
154            log.info(t("adagio.persistence.synchronizeData.prepare.noTableFilter"));
155        }
156
157        SynchroResult result = synchroContext.getResult();
158        Preconditions.checkNotNull(result);
159
160        result.setLocalUrl(getUrl(targetConnectionProperties));
161        result.setRemoteUrl(getUrl(sourceConnectionProperties));
162
163        Connection targetConnection = null;
164        Connection sourceConnection = null;
165        try {
166
167            ProgressionModel progressionModel = result.getProgressionModel();
168            progressionModel.setMessage(t("adagio.persistence.synchronizeData.prepare.step1"));
169
170            // create target connection
171            targetConnection = createConnection(targetConnectionProperties);
172
173            progressionModel.setMessage(t("adagio.persistence.synchronizeData.prepare.step2"));
174
175            // create source Connection
176            sourceConnection = createConnection(sourceConnectionProperties);
177
178            // load metas
179            SynchroDatabaseMetadata targetMeta =
180                    SynchroDatabaseMetadata.loadDatabaseMetadata(
181                            targetConnection,
182                            getDialect(targetConnectionProperties),
183                            getConfiguration(targetConnectionProperties),
184                            synchroContext,
185                            tableNames,
186                            tableFilter,
187                            null, /* no column filter */
188                            true /* load join metadata */);
189
190            SynchroDatabaseMetadata sourceMeta =
191                    SynchroDatabaseMetadata.loadDatabaseMetadata(
192                            sourceConnection,
193                            getDialect(sourceConnectionProperties),
194                            getConfiguration(sourceConnectionProperties),
195                            synchroContext,
196                            tableNames,
197                            tableFilter,
198                            null, /* no column filter */
199                            true /* load join metadata */);
200
201            progressionModel.setMessage(t("adagio.persistence.synchronizeData.prepare.step3"));
202
203            // check schema
204            SynchroServiceUtils.checkSchemas(sourceMeta, targetMeta, true, false, result);
205
206            if (result.isSuccess()) {
207
208                // prepare model (compute update date, count rows to update,...)
209                Set<String> rootTableNames = targetMeta.getLoadedRootTableNames();
210                if (rootTableNames.size() == 0 && log.isWarnEnabled()) {
211                    log.warn(t("adagio.persistence.synchronizeData.prepare.noRootTable"));
212                }
213
214                for (String tableName : rootTableNames) {
215
216                    long t0 = TimeLog.getTime();
217
218                    progressionModel.setMessage(t("adagio.persistence.synchronizeData.prepare.step4", tableName));
219
220                    SynchroTableMetadata sourceTable = sourceMeta.getTable(tableName);
221                    SynchroTableMetadata targetTable = targetMeta.getTable(tableName);
222
223                    if (log.isDebugEnabled()) {
224                        log.debug("Prepare table: " + tableName);
225                    }
226                    prepareRootTable(
227                            sourceTable,
228                            targetTable,
229                            sourceConnection,
230                            targetConnection,
231                            result);
232
233                    TIME.log(t0, "prepare table " + tableName);
234                }
235
236                long totalRows = result.getTotalRows();
237                if (log.isInfoEnabled()) {
238                    log.info("Total rows to update: " + totalRows);
239                }
240                targetConnection.rollback();
241            }
242        } catch (SQLException e) {
243            try {
244                if (targetConnection != null) {
245                    targetConnection.rollback();
246                }
247            } catch (SQLException e1) {
248
249                // ignore the rolback error
250            }
251            result.setError(e);
252        } finally {
253            releaseConnection(sourceConnection);
254            releaseConnection(targetConnection);
255        }
256    }
257
258    @Override
259    public void synchronize(SynchroContext synchroContext) {
260
261        Preconditions.checkNotNull(synchroContext);
262
263        Properties sourceConnectionProperties = synchroContext.getSourceConnectionProperties();
264        Preconditions.checkNotNull(sourceConnectionProperties);
265
266        Properties targetConnectionProperties = synchroContext.getTargetConnectionProperties();
267        Preconditions.checkNotNull(targetConnectionProperties);
268
269        Set<String> tableNames = synchroContext.getTableNames();
270        Predicate<String> tableFilter = synchroContext.getTableFilter();
271
272        SynchroResult result = synchroContext.getResult();
273        Preconditions.checkNotNull(result);
274
275        Connection targetConnection = null;
276        Connection sourceConnection = null;
277        try {
278
279            // create source Connection
280            sourceConnection = createConnection(sourceConnectionProperties);
281
282            // create target connection
283            targetConnection = createConnection(targetConnectionProperties);
284
285            // Create column filter (exclude missing optional column)
286            Predicate<SynchroColumnMetadata> columnFilter = null;
287            if (!result.getMissingOptionalColumnNameMaps().isEmpty()) {
288                columnFilter = SynchroMetadataUtils.newExcludeColumnPredicate(result.getMissingOptionalColumnNameMaps());
289            }
290
291            // load metas
292            SynchroDatabaseMetadata dbMetas =
293                    SynchroDatabaseMetadata.loadDatabaseMetadata(
294                            targetConnection,
295                            getDialect(targetConnectionProperties),
296                            getConfiguration(targetConnectionProperties),
297                            synchroContext,
298                            tableNames,
299                            tableFilter,
300                            columnFilter,
301                            true /* load join metadata */
302                            );
303
304            // set total in progression model
305            ProgressionModel progressionModel = result.getProgressionModel();
306            progressionModel.setTotal(result.getTotalRows());
307
308            // For all root table
309            for (String tableName : dbMetas.getLoadedRootTableNames()) {
310
311                SynchroTableMetadata table = dbMetas.getTable(tableName);
312                long t0 = TimeLog.getTime();
313
314                progressionModel.setMessage(t("adagio.persistence.synchronizeData.synchronize.step1", tableName));
315
316                if (log.isInfoEnabled()) {
317                    log.info("Synchronize root table: " + tableName);
318                }
319                long countToUpdate = result.getNbRows(tableName);
320
321                if (countToUpdate > 0) {
322
323                    SynchroPendingOperationBuffer tableBuffer = new SynchroPendingOperationBuffer(tableName);
324
325                    synchronizeRootTable(
326                            table,
327                            synchroContext,
328                            sourceConnection,
329                            targetConnection,
330                            result,
331                            tableBuffer);
332
333                    TIME.log(t0, "synchronize table " + tableName);
334
335                    // Retrieve parent table context. If empty (=no row updated) then exit
336                    if (!tableBuffer.isEmpty()) {
337                        Set<Integer> updatedRemoteIds = tableBuffer.getRemoteIdsMap().keySet();
338
339                        // Synchronize childs tables
340                        synchronizeChildTables(table, updatedRemoteIds, synchroContext, sourceConnection, targetConnection, result, true);
341                    }
342                }
343            }
344
345            if (log.isInfoEnabled()) {
346                long totalInserts = result.getTotalInserts();
347                long totalUpdates = result.getTotalUpdates();
348                log.info("Total rows to treat: " + result.getTotalRows());
349                log.info("Total rows inserted: " + totalInserts);
350                log.info("Total rows  updated: " + totalUpdates);
351                log.info("Total rows  treated: " + (totalInserts + totalUpdates));
352            }
353
354            progressionModel.setMessage(t("adagio.persistence.synchronizeData.synchronize.step2"));
355
356            targetConnection.commit();
357
358        } catch (SQLException e) {
359            try {
360                if (targetConnection != null) {
361                    targetConnection.rollback();
362                }
363            } catch (SQLException e1) {
364
365                // ignore the rollback error
366            }
367            result.setError(e);
368        } catch (Exception e) {
369            try {
370                if (targetConnection != null) {
371                    targetConnection.rollback();
372                }
373            } catch (SQLException e1) {
374
375                // ignore the rollback error
376            }
377            result.setError(e);
378        } finally {
379            releaseConnection(sourceConnection);
380            releaseConnection(targetConnection);
381        }
382    }
383
384    protected void prepareRootTable(
385            SynchroTableMetadata sourceTable,
386            SynchroTableMetadata targetTable,
387            Connection sourceConnection,
388            Connection targetConnection,
389            SynchroResult result) throws SQLException {
390
391        String tablePrefix = sourceTable.getTableLogPrefix();
392        String tableName = sourceTable.getName();
393
394        SynchroTableDao targetDao = new SynchroTableDaoImpl(targetConnection, targetTable, false);
395        SynchroTableDao sourceDao = new SynchroTableDaoImpl(sourceConnection, sourceTable, false);
396
397        try {
398            // get last updateDate used by target db
399            Timestamp updateDate = targetDao.getLastUpdateDate();
400
401            if (updateDate != null) {
402
403                // just inscrements of 1 milisecond to not having same
404                updateDate = new Timestamp(DateUtils.setMilliseconds(updateDate, 0).getTime());
405                updateDate = new Timestamp(DateUtils.addSeconds(updateDate, 1).getTime());
406            }
407
408            long countToUpdate = sourceDao.countDataToUpdate(updateDate);
409
410            if (log.isInfoEnabled()) {
411                log.info(String.format("%s nb rows to update: %s", tablePrefix, countToUpdate));
412            }
413
414            result.setUpdateDate(tableName, updateDate);
415            result.addRows(tableName, (int) countToUpdate);
416        } finally {
417            IOUtils.closeQuietly(targetDao);
418            IOUtils.closeQuietly(sourceDao);
419        }
420    }
421
422    protected void synchronizeRootTable(
423            SynchroTableMetadata table,
424            SynchroContext context,
425            Connection sourceConnection,
426            Connection targetConnection,
427            SynchroResult result,
428            SynchroPendingOperationBuffer tableBuffer) throws SQLException {
429
430        String tableName = table.getName();
431
432        result.getProgressionModel().setMessage(t("adagio.persistence.synchronizeData.synchronizeTable", tableName));
433
434        SynchroTableDao sourceDao = new SynchroTableDaoImpl(sourceConnection, table, false);
435        SynchroTableDao targetDao = new SynchroTableDaoImpl(targetConnection, table, true);
436
437        // get last updateDate used by target db
438        Date updateDate = result.getUpdateDate(tableName);
439
440        ResultSet dataToUpdate = null;
441        try {
442            // get data to update from source db
443            dataToUpdate = sourceDao.getDataToUpdate(updateDate);
444
445            updateTableUsingRemoteId(
446                    targetDao,
447                    dataToUpdate,
448                    result,
449                    tableBuffer);
450        } finally {
451            DaoUtils.closeSilently(dataToUpdate);
452
453            IOUtils.closeQuietly(targetDao);
454            IOUtils.closeQuietly(sourceDao);
455        }
456    }
457
458    protected void synchronizeChildTable(
459            SynchroTableMetadata table,
460            String joinColumnName,
461            Set<Integer> joinColumnIds,
462            Connection sourceConnection,
463            Connection targetConnection,
464            SynchroResult result,
465            SynchroPendingOperationBuffer tableBuffer) throws SQLException {
466
467        String tableName = table.getName();
468
469        result.getProgressionModel().setMessage(t("adagio.persistence.synchronizeData.synchronizeTable", tableName));
470
471        SynchroTableDao sourceDao = new SynchroTableDaoImpl(sourceConnection, table, false);
472        SynchroTableDao targetDao = new SynchroTableDaoImpl(targetConnection, table, true);
473
474        try {
475            ResultSet dataToUpdate = sourceDao.getDataByFk(joinColumnName, joinColumnIds);
476
477            try {
478                // Table with a REMOTE_ID (and ID)
479                if (table.isWithRemoteIdColumn()) {
480
481                    // small table update strategy
482                    updateTableUsingRemoteId(
483                            targetDao,
484                            dataToUpdate,
485                            result,
486                            tableBuffer);
487                }
488
489                // Association tables, ...
490                else {
491                    updateTableNoRemoteId(
492                            targetDao,
493                            dataToUpdate,
494                            result,
495                            tableBuffer);
496                }
497
498            } finally {
499                DaoUtils.closeSilently(dataToUpdate);
500            }
501
502        } finally {
503            IOUtils.closeQuietly(targetDao);
504            IOUtils.closeQuietly(sourceDao);
505        }
506    }
507
508    /**
509     * To update the content of the given {@code table} on the target db,
510     * from the given {@code incomingData} of the source db.
511     * <p/>
512     * The algorithm use remote_id : for each row of the {@code incomingData}, if exists on target table, then do an
513     * update, otherwise do a insert.
514     * <p/>
515     * 
516     * @param synchroContext
517     *            Synchronization context
518     * @param targetDao
519     *            connection on the target db
520     * @param incomingData
521     *            data to update from the source db
522     * @param result
523     *            where to store operation results
524     * @throws SQLException
525     *             if any sql errors
526     */
527    protected void updateTableUsingRemoteId(
528            SynchroTableDao targetDao,
529            ResultSet incomingData,
530            SynchroResult result,
531            SynchroPendingOperationBuffer tableBuffer) throws SQLException {
532
533        SynchroTableMetadata table = targetDao.getTable();
534        Preconditions.checkArgument(table.isWithRemoteIdColumn());
535        boolean enableGeneratedIdFirst = table.getInsertStrategy() == TableInsertStrategy.GENERATE_ID_FIRST;
536
537        String tableName = table.getName();
538        String tablePrefix = table.getTableLogPrefix() + " - " + result.getNbRows(tableName);
539
540        // get existing ids in the target db
541        Map<Integer, Integer> existingRemoteIdsMap = targetDao.getExistingRemoteIdsMap();
542        if (log.isDebugEnabled()) {
543            log.debug(tablePrefix + " existing rows: " + existingRemoteIdsMap.size());
544        }
545
546        result.addTableName(tableName);
547
548        int countR = 0;
549
550        // boolean hasChildTables = table.hasChildJoins();
551        // Map<Integer, Integer> updatedRemoteIds = null;
552        // if (hasChildTables) {
553        // updatedRemoteIds = Maps.newHashMap();
554        // }
555
556        while (incomingData.next()) {
557
558            Integer remoteId = table.getId(incomingData);
559            Integer localId = existingRemoteIdsMap.get(remoteId);
560            boolean doUpdate = localId != null;
561
562            if (doUpdate) {
563                List<Object> pk = Lists.<Object> newArrayList(localId);
564                targetDao.executeUpdate(pk, incomingData);
565
566            } else {
567                if (enableGeneratedIdFirst) {
568                    localId = targetDao.executeInsertAndReturnId(incomingData);
569                    // updatedRemoteIds.put(remoteId, localId);
570                }
571                else {
572                    targetDao.executeInsert(incomingData);
573                }
574            }
575
576            countR++;
577
578            reportProgress(result, targetDao, countR, tablePrefix);
579        }
580
581        targetDao.flush();
582
583        int insertCount = targetDao.getInsertCount();
584        int updateCount = targetDao.getUpdateCount();
585
586        result.addInserts(tableName, insertCount);
587        result.addUpdates(tableName, updateCount);
588
589        if (log.isInfoEnabled()) {
590            log.info(String.format("%s done: %s (inserts: %s, updates: %s)", tablePrefix, insertCount + updateCount, insertCount, updateCount));
591        }
592
593        if (log.isDebugEnabled()) {
594            log.debug(String.format("%s INSERT count: %s", tablePrefix, insertCount));
595            log.debug(String.format("%s UPDATE count: %s", tablePrefix, updateCount));
596        }
597
598        result.getProgressionModel().increments(countR % 1000);
599    }
600
601    /**
602     * To update the content of the given {@code table} on the target db,
603     * from the given {@code incomingData} of the source db.
604     * The algorithm use a standard update strategy, using primary key.
605     * 
606     * @param synchroContext
607     * @param targetDao
608     * @param incomingData
609     * @param result
610     * @throws SQLException
611     */
612    protected void updateTableNoRemoteId(
613            SynchroTableDao targetDao,
614            ResultSet incomingData,
615            SynchroResult result,
616            SynchroPendingOperationBuffer tableBuffer) throws SQLException {
617        SynchroTableMetadata table = targetDao.getTable();
618        Preconditions.checkArgument(!table.isWithRemoteIdColumn());
619
620        String tableName = table.getName();
621        String tablePrefix = table.getTableLogPrefix() + " - " + result.getNbRows(tableName);
622
623        // get existing ids in the target db
624        Set<String> existingIds = targetDao.getExistingPrimaryKeys();
625        if (log.isDebugEnabled()) {
626            log.debug(tablePrefix + " existing rows: " + existingIds.size());
627        }
628
629        result.addTableName(tableName);
630
631        int countR = 0;
632
633        boolean hasChildTables = table.hasChildJoins();
634        List<List<Object>> updatedPks = null;
635        if (hasChildTables) {
636            updatedPks = Lists.newArrayList();
637        }
638
639        while (incomingData.next()) {
640            List<Object> pk = targetDao.getPk(incomingData);
641            String pkStr = table.toPkStr(pk);
642
643            boolean doUpdate = existingIds.contains(pkStr);
644
645            if (doUpdate) {
646
647                targetDao.executeUpdate(pk, incomingData);
648
649            } else {
650
651                targetDao.executeInsert(incomingData);
652            }
653
654            if (hasChildTables) {
655                updatedPks.add(pk);
656            }
657
658            countR++;
659
660            reportProgress(result, targetDao, countR, tablePrefix);
661        }
662
663        targetDao.flush();
664
665        // Put in context (to be used by child join tables)
666        if (hasChildTables && !updatedPks.isEmpty()) {
667            tableBuffer.addPks(updatedPks);
668        }
669
670        int insertCount = targetDao.getInsertCount();
671        int updateCount = targetDao.getUpdateCount();
672
673        result.addInserts(tableName, insertCount);
674        result.addUpdates(tableName, updateCount);
675
676        if (log.isInfoEnabled()) {
677            log.info(String.format("%s done: %s (inserts: %s, updates: %s)", tablePrefix, insertCount + updateCount, insertCount, updateCount));
678        }
679
680        if (log.isDebugEnabled()) {
681            log.debug(String.format("%s INSERT count: %s", tablePrefix, insertCount));
682            log.debug(String.format("%s UPDATE count: %s", tablePrefix, updateCount));
683        }
684
685        result.getProgressionModel().increments(countR % 1000);
686    }
687
688    Connection createConnection(Properties connectionProperties) throws SQLException {
689        return createConnection(
690                connectionProperties.getProperty(Environment.URL),
691                connectionProperties.getProperty(Environment.USER),
692                connectionProperties.getProperty(Environment.PASS));
693    }
694
695    String getUrl(Properties connectionProperties) {
696        return connectionProperties.getProperty(Environment.URL);
697    }
698
699    Dialect getDialect(Properties connectionProperties) {
700        return Dialect.getDialect(connectionProperties);
701    }
702
703    Configuration getConfiguration(Properties connectionProperties) {
704        return new Configuration().setProperties(connectionProperties);
705    }
706
707    Connection createConnection(String jdbcUrl,
708            String user,
709            String password) throws SQLException {
710        Preconditions.checkArgument(StringUtils.isNotBlank(jdbcUrl));
711
712        // If same URL as datasource, use the dataSource
713        if (jdbcUrl.equals(config.getJdbcURL()) && this.dataSource != null) {
714            return DataSourceUtils.getConnection(this.dataSource);
715        }
716
717        Connection connection = DriverManager.getConnection(jdbcUrl,
718                user,
719                password);
720        connection.setAutoCommit(false);
721        return connection;
722    }
723
724    void releaseConnection(Connection connection) {
725        DaoUtils.closeSilently(connection);
726    }
727
728    protected Properties getRemoteProperties(File dbDirectory) {
729        Properties sourceConnectionProperties = new Properties();
730        SynchroConfiguration config = SynchroConfiguration.getInstance();
731
732        String jdbcUrl = DaoUtils.getJdbcUrl(dbDirectory,
733                config.getDbName());
734
735        DaoUtils.fillConnectionProperties(sourceConnectionProperties,
736                jdbcUrl,
737                config.getJdbcUsername(),
738                config.getJdbcPassword());
739        return sourceConnectionProperties;
740    }
741
742    protected void synchronizeChildTables(
743            SynchroTableMetadata parentTable,
744            Set<Integer> parentRemoteIds,
745            SynchroContext context,
746            Connection sourceConnection,
747            Connection targetConnection,
748            SynchroResult result,
749            boolean enableLogCount) throws SQLException {
750
751        Preconditions.checkNotNull(parentTable);
752        Preconditions.checkNotNull(parentRemoteIds);
753        Preconditions.checkArgument(!parentRemoteIds.isEmpty());
754
755        List<SynchroTableMetadata> updatedTables = Lists.newArrayList();
756        List<SynchroPendingOperationBuffer> updatedTablesBuffers = Lists.newArrayList();
757
758        for (SynchroJoinMetadata join : parentTable.getChildJoins()) {
759            long t0 = TimeLog.getTime();
760
761            SynchroTableMetadata table = join.getTargetTable();
762            String tableName = table.getName();
763            if (log.isInfoEnabled()) {
764                log.info(String.format("Synchronize table: %s (as child of %s)", tableName, parentTable.getName()));
765            }
766
767            SynchroPendingOperationBuffer penginOperationBuffer = new SynchroPendingOperationBuffer(tableName);
768
769            // Retrieve the table to update, from the join
770            String joinColumnName = join.getTargetColumn().getName();
771
772            synchronizeChildTable(
773                    table,
774                    joinColumnName,
775                    parentRemoteIds,
776                    sourceConnection,
777                    targetConnection,
778                    result,
779                    penginOperationBuffer);
780
781            TIME.log(t0, "synchronize table " + tableName);
782
783            if (!penginOperationBuffer.isEmpty()) {
784                updatedTables.add(table);
785                updatedTablesBuffers.add(penginOperationBuffer);
786            }
787        }
788
789        // Recursive call, for each child of the processed child tables
790        for (int i = 0; i < updatedTables.size(); i++) {
791            SynchroTableMetadata table = updatedTables.get(i);
792            SynchroPendingOperationBuffer tableBuffer = updatedTablesBuffers.get(i);
793
794            Set<Integer> updatedRemoteIds = tableBuffer.getRemoteIdsMap().keySet();
795            synchronizeChildTables(table, updatedRemoteIds, context, sourceConnection, targetConnection, result, false);
796        }
797    }
798}