001package fr.ifremer.adagio.synchro.service.referential;
002
003/*
004 * #%L
005 * Tutti :: Persistence
006 * $Id: ReferentialSynchroServiceImpl.java 1573 2014-02-04 16:41:40Z tchemit $
007 * $HeadURL: http://svn.forge.codelutin.com/svn/tutti/trunk/tutti-persistence/src/main/java/fr/ifremer/adagio/core/service/technical/synchro/ReferentialSynchroServiceImpl.java $
008 * %%
009 * Copyright (C) 2012 - 2014 Ifremer
010 * %%
011 * This program is free software: you can redistribute it and/or modify
012 * it under the terms of the GNU Affero General Public License as published by
013 * the Free Software Foundation, either version 3 of the License, or
014 * (at your option) any later version.
015 * 
016 * This program is distributed in the hope that it will be useful,
017 * but WITHOUT ANY WARRANTY; without even the implied warranty of
018 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
019 * GNU General Public License for more details.
020 * 
021 * You should have received a copy of the GNU Affero General Public License
022 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
023 * #L%
024 */
025
026import static org.nuiton.i18n.I18n.t;
027
028import java.io.File;
029import java.sql.Connection;
030import java.sql.DriverManager;
031import java.sql.PreparedStatement;
032import java.sql.ResultSet;
033import java.sql.SQLException;
034import java.sql.Timestamp;
035import java.util.Date;
036import java.util.List;
037import java.util.Map;
038import java.util.Properties;
039import java.util.Set;
040
041import javax.sql.DataSource;
042
043import org.apache.commons.collections4.CollectionUtils;
044import org.apache.commons.io.IOUtils;
045import org.apache.commons.lang3.StringUtils;
046import org.apache.commons.lang3.time.DateUtils;
047import org.apache.commons.logging.Log;
048import org.apache.commons.logging.LogFactory;
049import org.hibernate.cfg.Environment;
050import org.hibernate.dialect.Dialect;
051import org.nuiton.util.TimeLog;
052import org.springframework.beans.factory.annotation.Autowired;
053import org.springframework.context.annotation.Lazy;
054import org.springframework.jdbc.datasource.DataSourceUtils;
055import org.springframework.stereotype.Service;
056
057import com.google.common.base.Preconditions;
058import com.google.common.base.Predicate;
059import com.google.common.collect.Maps;
060
061import fr.ifremer.adagio.synchro.config.SynchroConfiguration;
062import fr.ifremer.adagio.synchro.dao.DaoUtils;
063import fr.ifremer.adagio.synchro.dao.SynchroTableDao;
064import fr.ifremer.adagio.synchro.dao.SynchroTableDaoImpl;
065import fr.ifremer.adagio.synchro.intercept.SynchroInterceptor;
066import fr.ifremer.adagio.synchro.meta.SynchroColumnMetadata;
067import fr.ifremer.adagio.synchro.meta.SynchroDatabaseMetadata;
068import fr.ifremer.adagio.synchro.meta.SynchroMetadataUtils;
069import fr.ifremer.adagio.synchro.meta.SynchroTableMetadata;
070import fr.ifremer.adagio.synchro.service.SynchroBaseService;
071import fr.ifremer.adagio.synchro.service.SynchroContext;
072import fr.ifremer.adagio.synchro.service.SynchroResult;
073import fr.ifremer.adagio.synchro.service.SynchroServiceUtils;
074import fr.ifremer.adagio.synchro.type.ProgressionModel;
075
076/**
077 * Created on 1/14/14.
078 * 
079 * @author Tony Chemit <chemit@codelutin.com>
080 * @since 3.0
081 */
082@Service("referentialSynchroService")
083@Lazy
084public class ReferentialSynchroServiceImpl extends SynchroBaseService implements ReferentialSynchroService {
085
    /** Logger. */
    private static final Log log =
            LogFactory.getLog(ReferentialSynchroServiceImpl.class);

    /** Timing helper used to log how long each table takes to prepare and to synchronize. */
    private static final TimeLog TIME =
            new TimeLog(ReferentialSynchroServiceImpl.class);
092
    /**
     * Creates the service from the injected data source and configuration,
     * both of which are handed to the {@link SynchroBaseService} constructor.
     *
     * @param dataSource
     *            data source passed to the parent service
     * @param config
     *            synchronization configuration passed to the parent service
     */
    @Autowired
    public ReferentialSynchroServiceImpl(DataSource dataSource, SynchroConfiguration config) {
        super(dataSource, config);
    }
097
    /**
     * Creates the service through the no-argument {@link SynchroBaseService} constructor.
     */
    public ReferentialSynchroServiceImpl() {
        super();
    }
101
102    @Override
103    public SynchroContext createSynchroContext(File sourceDbDirectory) {
104
105        String dbName = config.getDbName();
106        Properties targetConnectionProperties = config.getConnectionProperties();
107
108        Properties sourceConnectionProperties = new Properties(targetConnectionProperties);
109        sourceConnectionProperties.setProperty(Environment.URL,
110                DaoUtils.getJdbcUrl(sourceDbDirectory, dbName));
111
112        Set<String> tableToIncludes = config.getImportReferentialTablesIncludes();
113
114        SynchroContext context = SynchroContext.newContext(
115                tableToIncludes,
116                sourceConnectionProperties,
117                targetConnectionProperties,
118                new SynchroResult());
119        return context;
120    }
121
122    @Override
123    public SynchroContext createSynchroContext(Properties sourceConnectionProperties) {
124
125        Properties targetConnectionProperties = config.getConnectionProperties();
126
127        Set<String> tableToIncludes = config.getImportReferentialTablesIncludes();
128
129        SynchroContext context = SynchroContext.newContext(
130                tableToIncludes,
131                sourceConnectionProperties,
132                targetConnectionProperties,
133                new SynchroResult());
134        return context;
135    }
136
137    @Override
138    public void prepare(SynchroContext synchroContext) {
139        Preconditions.checkNotNull(synchroContext);
140
141        Properties sourceConnectionProperties = synchroContext.getSourceConnectionProperties();
142        Preconditions.checkNotNull(sourceConnectionProperties);
143
144        Properties targetConnectionProperties = synchroContext.getTargetConnectionProperties();
145        Preconditions.checkNotNull(targetConnectionProperties);
146
147        Set<String> tableNames = synchroContext.getTableNames();
148        Predicate<String> tableFilter = synchroContext.getTableFilter();
149        if (CollectionUtils.isEmpty(tableNames) && tableFilter == null) {
150            log.info(t("adagio.persistence.synchronizeReferential.prepare.noTableFilter"));
151        }
152
153        SynchroResult result = synchroContext.getResult();
154        Preconditions.checkNotNull(result);
155
156        result.setLocalUrl(DaoUtils.getUrl(targetConnectionProperties));
157        result.setRemoteUrl(DaoUtils.getUrl(sourceConnectionProperties));
158
159        Connection targetConnection = null;
160        Connection sourceConnection = null;
161        try {
162
163            ProgressionModel progressionModel = result.getProgressionModel();
164            progressionModel.setMessage(t("adagio.persistence.synchronizeReferential.prepare.step1"));
165
166            // create target connection
167            targetConnection = createConnection(targetConnectionProperties);
168
169            progressionModel.setMessage(t("adagio.persistence.synchronizeReferential.prepare.step2"));
170
171            // create source Connection
172            sourceConnection = createConnection(sourceConnectionProperties);
173
174            // load metas
175            SynchroDatabaseMetadata targetMeta =
176                    SynchroDatabaseMetadata.loadDatabaseMetadata(
177                            targetConnection,
178                            DaoUtils.getDialect(targetConnectionProperties),
179                            DaoUtils.getConfiguration(targetConnectionProperties),
180                            synchroContext,
181                            tableNames,
182                            tableFilter,
183                            null /* no column filter */,
184                            false /* do not load joins */);
185
186            SynchroDatabaseMetadata sourceMeta =
187                    SynchroDatabaseMetadata.loadDatabaseMetadata(
188                            sourceConnection,
189                            DaoUtils.getDialect(sourceConnectionProperties),
190                            DaoUtils.getConfiguration(sourceConnectionProperties),
191                            synchroContext,
192                            tableNames,
193                            tableFilter,
194                            null /* no column filter */,
195                            false /* do not load joins */);
196
197            progressionModel.setMessage(t("adagio.persistence.synchronizeReferential.prepare.step3"));
198
199            // check schema
200            SynchroServiceUtils.checkSchemas(sourceMeta, targetMeta, true, true, result);
201
202            if (result.isSuccess()) {
203
204                // prepare model (compute update date, count rows to update,...)
205
206                for (String tableName : targetMeta.getLoadedTableNames()) {
207
208                    long t0 = TimeLog.getTime();
209
210                    progressionModel.setMessage(t("adagio.persistence.synchronizeReferential.prepare.step4", tableName));
211
212                    SynchroTableMetadata sourceTable = sourceMeta.getTable(tableName);
213
214                    SynchroTableMetadata targetTable = targetMeta.getTable(tableName);
215
216                    prepareTable(
217                            sourceTable,
218                            targetTable,
219                            synchroContext,
220                            targetConnection,
221                            sourceConnection,
222                            result);
223
224                    TIME.log(t0, "prepare table " + tableName);
225                }
226
227                long totalRows = result.getTotalRows();
228                if (log.isInfoEnabled()) {
229                    log.info("Total rows to update: " + totalRows);
230                }
231                targetConnection.rollback();
232            }
233        } catch (SQLException e) {
234            try {
235                if (targetConnection != null) {
236                    targetConnection.rollback();
237                }
238            } catch (SQLException e1) {
239
240                // ignore the rollback error
241            }
242            result.setError(e);
243        } finally {
244            closeSilently(sourceConnection);
245            closeSilently(targetConnection);
246        }
247    }
248
249    @Override
250    public void synchronize(SynchroContext synchroContext) {
251        Preconditions.checkNotNull(synchroContext);
252
253        Properties sourceConnectionProperties = synchroContext.getSourceConnectionProperties();
254        Preconditions.checkNotNull(sourceConnectionProperties);
255
256        Properties targetConnectionProperties = synchroContext.getTargetConnectionProperties();
257        Preconditions.checkNotNull(targetConnectionProperties);
258
259        Set<String> tableNames = synchroContext.getTableNames();
260        Predicate<String> tableFilter = synchroContext.getTableFilter();
261
262        SynchroResult result = synchroContext.getResult();
263        Preconditions.checkNotNull(result);
264
265        Connection targetConnection = null;
266        Connection sourceConnection = null;
267        try {
268
269            // create target connection
270            targetConnection = createConnection(targetConnectionProperties);
271
272            // create source Connection
273            sourceConnection = createConnection(sourceConnectionProperties);
274
275            // Create column filter (exclude missing optional column)
276            Predicate<SynchroColumnMetadata> columnFilter = null;
277            if (!result.getMissingOptionalColumnNameMaps().isEmpty()) {
278                columnFilter = SynchroMetadataUtils.newExcludeColumnPredicate(result.getMissingOptionalColumnNameMaps());
279            }
280
281            Dialect targetDialect = DaoUtils.getDialect(targetConnectionProperties);
282
283            // load metas
284            SynchroDatabaseMetadata dbMetas =
285                    SynchroDatabaseMetadata.loadDatabaseMetadata(
286                            targetConnection,
287                            DaoUtils.getDialect(targetConnectionProperties),
288                            DaoUtils.getConfiguration(targetConnectionProperties),
289                            synchroContext,
290                            tableNames,
291                            tableFilter,
292                            columnFilter,
293                            false /* do not load join metadata */);
294
295            // set total in progression model
296            ProgressionModel progressionModel = result.getProgressionModel();
297            progressionModel.setTotal(result.getTotalRows());
298
299            // prepare target (desactivate constraints)
300            prepareSynch(targetConnection);
301
302            try {
303
304                for (String tableName : dbMetas.getLoadedTableNames()) {
305
306                    long t0 = TimeLog.getTime();
307
308                    progressionModel.setMessage(t("adagio.persistence.synchronizeReferential.synchronize.step1", tableName));
309
310                    SynchroTableMetadata table = dbMetas.getTable(tableName);
311
312                    if (log.isInfoEnabled()) {
313                        log.info("Synchronize table: " + tableName);
314                    }
315                    long countToUpdate = result.getNbRows(tableName);
316
317                    if (countToUpdate > 0) {
318
319                        synchronizeTable(
320                                table,
321                                targetConnection,
322                                sourceConnection,
323                                result);
324                    }
325
326                    TIME.log(t0, "synchronize table " + tableName);
327                }
328                if (log.isInfoEnabled()) {
329                    long totalInserts = result.getTotalInserts();
330                    long totalUpdates = result.getTotalUpdates();
331                    log.info("Total rows to treat: " + result.getTotalRows());
332                    log.info("Total rows inserted: " + totalInserts);
333                    log.info("Total rows  updated: " + totalUpdates);
334                    log.info("Total rows  treated: " + (totalInserts + totalUpdates));
335                }
336            } finally {
337                releaseSynch(targetConnection);
338            }
339
340            progressionModel.setMessage(t("adagio.persistence.synchronizeReferential.synchronize.step2"));
341
342            targetConnection.commit();
343
344        } catch (SQLException e) {
345            try {
346                if (targetConnection != null) {
347                    targetConnection.rollback();
348                }
349            } catch (SQLException e1) {
350
351                // ignore the rolback error
352            }
353            result.setError(e);
354        } finally {
355            closeSilently(sourceConnection);
356            closeSilently(targetConnection);
357        }
358    }
359
360    protected void prepareTable(
361            SynchroTableMetadata sourceTable,
362            SynchroTableMetadata targetTable,
363            SynchroContext context,
364            Connection targetConnection,
365            Connection sourceConnection,
366            SynchroResult result) throws SQLException {
367
368        String tableName = sourceTable.getName();
369        String tablePrefix = sourceTable.getTableLogPrefix();
370
371        if (log.isDebugEnabled()) {
372            log.debug("Prepare table: " + tableName);
373        }
374
375        SynchroTableDao targetDao = new SynchroTableDaoImpl(targetConnection, targetTable, false);
376        SynchroTableDao sourceDao = new SynchroTableDaoImpl(sourceConnection, sourceTable, false);
377
378        try {
379            long targetCount = targetDao.count();
380
381            // get last updateDate used by target db
382            Timestamp updateDate = null;
383
384            if (targetCount < 50000) {
385
386                // only use the update date on small table, for big table we will re-insert all the table content
387                updateDate = targetDao.getLastUpdateDate();
388
389                if (updateDate != null) {
390
391                    // just inscrements of 1 milisecond to not having same
392                    // TODO BL : attention, a cause des transactions parfois longues sur le serveur Oracle, il faut
393                    // peut-etre justement prendre une date inférieure au max(update_date) ?? genre max(update_date) -
394                    // 2h ?
395                    // Ou mieux : stocker puis utiliser une date de dernière mise à jour (systimestamp côté serveur)
396                    updateDate = new Timestamp(DateUtils.setMilliseconds(updateDate, 0).getTime());
397                    updateDate = new Timestamp(DateUtils.addSeconds(updateDate, 1).getTime());
398                }
399            }
400
401            long countToUpdate = sourceDao.countDataToUpdate(updateDate);
402
403            if (log.isInfoEnabled()) {
404                log.info(String.format("%s nb rows to update: %s", tablePrefix, countToUpdate));
405            }
406
407            result.setUpdateDate(tableName, updateDate);
408            result.addRows(tableName, (int) countToUpdate);
409        } finally {
410            IOUtils.closeQuietly(targetDao);
411            IOUtils.closeQuietly(sourceDao);
412        }
413
414    }
415
416    protected void synchronizeTable(
417            SynchroTableMetadata table,
418            Connection targetConnection,
419            Connection sourceConnection,
420            SynchroResult result) throws SQLException {
421
422        String tableName = table.getName();
423
424        result.getProgressionModel().setMessage(t("adagio.persistence.synchronizeReferential.synchronizeTable", tableName));
425
426        SynchroTableDao sourceDao = new SynchroTableDaoImpl(sourceConnection, table, false);
427        SynchroTableDao targetDao = new SynchroTableDaoImpl(targetConnection, table, true);
428
429        // get last updateDate used by target db
430        Date updateDate = result.getUpdateDate(tableName);
431
432        // get table count
433        long count = targetDao.count();
434
435        boolean bigTable = count > 50000;
436
437        // get data to update from source db
438        ResultSet dataToUpdate = sourceDao.getDataToUpdate(
439                bigTable ? null : updateDate);
440
441        try {
442
443            if (bigTable) {
444
445                // big table update strategy
446                updateBigTable(
447                        targetDao,
448                        sourceDao,
449                        dataToUpdate,
450                        result);
451            } else {
452
453                // small table update strategy
454                updateTable(targetDao,
455                        dataToUpdate,
456                        result);
457            }
458            dataToUpdate.close();
459        } finally {
460
461            IOUtils.closeQuietly(targetDao);
462            IOUtils.closeQuietly(sourceDao);
463            DaoUtils.closeSilently(dataToUpdate);
464        }
465    }
466
467    /**
468     * To update the content of the given {@code table} on the target db,
469     * from the given {@code incomingData} of the source db.
470     * <p/>
471     * The algorithm is pretty simple, for each row of the {@code incomingData}, if exists on target table, then do an
472     * update, otherwise do a insert.
473     * <p/>
474     * As an update query is more expensive, we won't use this method for table with a lot of rows, we will prefer to
475     * use the {@code updateBigTable} method instead.
476     * 
477     * @param targetDao
478     *            connection on the target db
479     * @param incomingData
480     *            data to update from the source db
481     * @param result
482     *            where to store operation results
483     * @throws SQLException
484     *             if any sql errors
485     */
486    protected void updateTable(SynchroTableDao targetDao,
487            ResultSet incomingData,
488            SynchroResult result) throws SQLException {
489
490        SynchroTableMetadata table = targetDao.getTable();
491
492        String tableName = table.getName();
493        String tablePrefix = table.getTableLogPrefix() + " - " + result.getNbRows(tableName);
494
495        // get existing ids in the target db
496        Set<String> existingIds = targetDao.getExistingPrimaryKeys();
497
498        if (log.isDebugEnabled()) {
499            log.debug(tablePrefix + " existing rows: " + existingIds.size());
500        }
501
502        result.addTableName(tableName);
503
504        int countR = 0;
505
506        while (incomingData.next()) {
507
508            List<Object> pk = table.getPk(incomingData);
509            String pkStr = table.toPkStr(pk);
510
511            boolean doUpdate = existingIds.contains(pkStr);
512
513            if (doUpdate) {
514
515                targetDao.executeUpdate(pk, incomingData);
516
517            } else {
518
519                targetDao.executeInsert(incomingData);
520            }
521
522            countR++;
523
524            reportProgress(result, targetDao, countR, tablePrefix);
525        }
526
527        targetDao.flush();
528
529        int insertCount = targetDao.getInsertCount();
530        int updateCount = targetDao.getUpdateCount();
531
532        result.addInserts(tableName, insertCount);
533        result.addUpdates(tableName, updateCount);
534        if (log.isInfoEnabled()) {
535            log.info(String.format("%s done: %s (inserts: %s, updates: %s)", tablePrefix, countR, insertCount, updateCount));
536        }
537
538        if (log.isDebugEnabled()) {
539            log.debug(String.format("%s INSERT count: %s", tablePrefix, result.getNbInserts(tableName)));
540            log.debug(String.format("%s UPDATE count: %s", tablePrefix, result.getNbUpdates(tableName)));
541        }
542
543        result.getProgressionModel().increments(countR % 1000);
544    }
545
546    /**
547     * To update the content of the given {@code table} (with a lot of rows) on
548     * the target db, from the given {@code incomingData} of the source db.
549     * <p/>
550     * We can't use the simple algorithm, since update queries cost too much and is not acceptable when talking on huge
551     * numbers of rows.
552     * <p/>
553     * Here is what to do :
554     * <ul>
555     * <li>Get form the target db the data which are not in source db, keep them</li>
556     * <li>Delete target table content</li>
557     * <li>Insert source table in target table</li>
558     * <li>Insert the saved extra rows from original table</li>
559     * </ul>
560     * In that way we will only perform some insert queries.
561     * 
562     * @param dbMetas
563     * @param targetDao
564     *            connection on the target db
565     * @param sourceDao
566     *            connection on the target db
567     * @param incomingData
568     *            data to update from the source db
569     * @param interceptor
570     * @param result
571     *            where to store operation results @throws SQLException if any sql errors
572     */
573    protected void updateBigTable(
574            SynchroTableDao targetDao,
575            SynchroTableDao sourceDao,
576            ResultSet incomingData,
577            SynchroResult result) throws SQLException {
578
579        SynchroTableMetadata table = targetDao.getTable();
580        String tableName = targetDao.getTable().getName();
581
582        result.addTableName(tableName);
583
584        String tablePrefix = table.getTableLogPrefix() + " - " + result.getNbRows(tableName);
585
586        // get existing ids in the target db
587        Set<String> existingIds = targetDao.getExistingPrimaryKeys();
588
589        if (log.isDebugEnabled()) {
590            log.debug(tablePrefix + " target existing rows: " + existingIds.size());
591        }
592
593        Set<String> sourceExistingIds = sourceDao.getExistingPrimaryKeys();
594
595        if (log.isDebugEnabled()) {
596            log.debug(tablePrefix + " source existing rows: " + sourceExistingIds.size());
597        }
598
599        existingIds.removeAll(sourceExistingIds);
600
601        if (log.isDebugEnabled()) {
602            log.debug(tablePrefix + " target existing rows not in source: " + existingIds.size());
603        }
604        if (log.isTraceEnabled()) {
605            for (String existingId : existingIds) {
606                log.trace("- " + existingId);
607            }
608        }
609
610        // copy extra rows from target
611
612        Map<List<Object>, Object[]> extraRows = Maps.newLinkedHashMap();
613
614        for (String pkStr : existingIds) {
615
616            List<Object> pk = table.fromPkStr(pkStr);
617
618            Object[] extraRow = targetDao.findByPk(pk);
619
620            extraRows.put(pk, extraRow);
621        }
622
623        // remove obsolete extra rows
624
625        List<SynchroInterceptor> interceptors = table.getInterceptors();
626        for (SynchroInterceptor interceptor : interceptors) {
627            extraRows = interceptor.transformExtraLocalData(
628                    targetDao,
629                    sourceDao,
630                    extraRows);
631            if (log.isDebugEnabled()) {
632                log.debug(tablePrefix + " target data existingIds not in source (after apply task): " + extraRows.size());
633            }
634        }
635
636        // delete table
637        targetDao.deleteAll();
638
639        int countR = 0;
640
641        // add all data from source
642        while (incomingData.next()) {
643
644            targetDao.executeInsert(incomingData);
645
646            countR++;
647
648            reportProgress(result, targetDao, countR, tablePrefix);
649        }
650
651        // re-add extra target rows
652        for (Map.Entry<List<Object>, Object[]> entry : extraRows.entrySet()) {
653
654            Object[] row = entry.getValue();
655            targetDao.executeInsert(row);
656
657            countR++;
658
659            reportProgress(result, targetDao, countR, tablePrefix);
660        }
661
662        targetDao.flush();
663
664        int insertCount = targetDao.getInsertCount();
665        result.addInserts(tableName, insertCount);
666        if (log.isInfoEnabled()) {
667            log.info(String.format("%s done: %s (inserts: %s)", tablePrefix, countR, insertCount));
668        }
669
670        if (log.isDebugEnabled()) {
671            log.debug(String.format("%s INSERT count: %s", tablePrefix, result.getNbInserts(tableName)));
672        }
673
674        result.getProgressionModel().increments(countR % 1000);
675    }
676
677    protected void reportProgress(SynchroResult result, SynchroTableDao dao, int countR, String tablePrefix) {
678        if (countR % 1000 == 0) {
679            result.getProgressionModel().increments(1000);
680        }
681
682        if (countR % 10000 == 0) {
683            if (log.isInfoEnabled()) {
684                log.info(String.format("%s Done: %s (inserts: %s, updates: %s)", tablePrefix, countR, dao.getInsertCount(), dao.getUpdateCount()));
685            }
686        }
687    }
688
689    protected Connection createConnection(Properties connectionProperties) throws SQLException {
690        return createConnection(
691                connectionProperties.getProperty(Environment.URL),
692                connectionProperties.getProperty(Environment.USER),
693                connectionProperties.getProperty(Environment.PASS));
694    }
695
696    protected Connection createConnection(String jdbcUrl,
697            String user,
698            String password) throws SQLException {
699        Preconditions.checkArgument(StringUtils.isNotBlank(jdbcUrl));
700
701        // If same URL as datasource, use the dataSource
702        if (jdbcUrl.equals(config.getJdbcURL()) && this.dataSource != null) {
703            return DataSourceUtils.getConnection(this.dataSource);
704        }
705
706        Connection connection = DriverManager.getConnection(jdbcUrl,
707                user,
708                password);
709        connection.setAutoCommit(false);
710        return connection;
711    }
712
713    protected void closeSilently(Connection connection) {
714        String jdbcUrl = null;
715        try {
716            jdbcUrl = connection.getMetaData().getURL();
717        } catch (SQLException e) {
718            // TODO
719        }
720        // If same URL as datasource, use the dataSource
721        if (jdbcUrl != null
722                && jdbcUrl.equals(config.getJdbcURL())
723                && this.dataSource != null) {
724            DataSourceUtils.releaseConnection(connection, this.dataSource);
725        }
726        else {
727            DaoUtils.closeSilently(connection);
728        }
729    }
730
731    protected void prepareSynch(Connection connection) throws SQLException {
732        PreparedStatement statement = connection.prepareStatement("SET REFERENTIAL_INTEGRITY FALSE;");
733        statement.executeUpdate();
734        statement.close();
735    }
736
737    protected void releaseSynch(Connection connection) throws SQLException {
738        PreparedStatement statement = connection.prepareStatement("SET REFERENTIAL_INTEGRITY TRUE;");
739        statement.executeUpdate();
740        statement.close();
741    }
742}