View Javadoc
1   package fr.ifremer.adagio.synchro.service.referential;
2   
3   /*
4    * #%L
5    * Tutti :: Persistence
6    * $Id: ReferentialSynchroServiceImpl.java 1573 2014-02-04 16:41:40Z tchemit $
7    * $HeadURL: http://svn.forge.codelutin.com/svn/tutti/trunk/tutti-persistence/src/main/java/fr/ifremer/adagio/core/service/technical/synchro/ReferentialSynchroServiceImpl.java $
8    * %%
9    * Copyright (C) 2012 - 2014 Ifremer
10   * %%
11   * This program is free software: you can redistribute it and/or modify
12   * it under the terms of the GNU Affero General Public License as published by
13   * the Free Software Foundation, either version 3 of the License, or
14   * (at your option) any later version.
15   * 
16   * This program is distributed in the hope that it will be useful,
17   * but WITHOUT ANY WARRANTY; without even the implied warranty of
18   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19   * GNU General Public License for more details.
20   * 
21   * You should have received a copy of the GNU Affero General Public License
22   * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23   * #L%
24   */
25  
26  import static org.nuiton.i18n.I18n.t;
27  
28  import java.io.File;
29  import java.sql.Connection;
30  import java.sql.DriverManager;
31  import java.sql.PreparedStatement;
32  import java.sql.ResultSet;
33  import java.sql.SQLException;
34  import java.sql.Timestamp;
35  import java.util.Date;
36  import java.util.List;
37  import java.util.Map;
38  import java.util.Properties;
39  import java.util.Set;
40  
41  import javax.sql.DataSource;
42  
43  import org.apache.commons.collections4.CollectionUtils;
44  import org.apache.commons.io.IOUtils;
45  import org.apache.commons.lang3.StringUtils;
46  import org.apache.commons.lang3.time.DateUtils;
47  import org.apache.commons.logging.Log;
48  import org.apache.commons.logging.LogFactory;
49  import org.hibernate.cfg.Environment;
50  import org.hibernate.dialect.Dialect;
51  import org.nuiton.util.TimeLog;
52  import org.springframework.beans.factory.annotation.Autowired;
53  import org.springframework.context.annotation.Lazy;
54  import org.springframework.jdbc.datasource.DataSourceUtils;
55  import org.springframework.stereotype.Service;
56  
57  import com.google.common.base.Preconditions;
58  import com.google.common.base.Predicate;
59  import com.google.common.collect.Maps;
60  
61  import fr.ifremer.adagio.synchro.config.SynchroConfiguration;
62  import fr.ifremer.adagio.synchro.dao.DaoUtils;
63  import fr.ifremer.adagio.synchro.dao.SynchroTableDao;
64  import fr.ifremer.adagio.synchro.dao.SynchroTableDaoImpl;
65  import fr.ifremer.adagio.synchro.intercept.SynchroInterceptor;
66  import fr.ifremer.adagio.synchro.meta.SynchroColumnMetadata;
67  import fr.ifremer.adagio.synchro.meta.SynchroDatabaseMetadata;
68  import fr.ifremer.adagio.synchro.meta.SynchroMetadataUtils;
69  import fr.ifremer.adagio.synchro.meta.SynchroTableMetadata;
70  import fr.ifremer.adagio.synchro.service.SynchroBaseService;
71  import fr.ifremer.adagio.synchro.service.SynchroContext;
72  import fr.ifremer.adagio.synchro.service.SynchroResult;
73  import fr.ifremer.adagio.synchro.service.SynchroServiceUtils;
74  import fr.ifremer.adagio.synchro.type.ProgressionModel;
75  
76  /**
77   * Created on 1/14/14.
78   * 
79   * @author Tony Chemit <chemit@codelutin.com>
80   * @since 3.0
81   */
82  @Service("referentialSynchroService")
83  @Lazy
84  public class ReferentialSynchroServiceImpl extends SynchroBaseService implements ReferentialSynchroService {
85  
86  	/** Logger. */
87  	private static final Log log =
88  			LogFactory.getLog(ReferentialSynchroServiceImpl.class);
89  
90  	private static final TimeLog TIME =
91  			new TimeLog(ReferentialSynchroServiceImpl.class);
92  
93  	@Autowired
94  	public ReferentialSynchroServiceImpl(DataSource dataSource, SynchroConfiguration config) {
95  		super(dataSource, config);
96  	}
97  
98  	public ReferentialSynchroServiceImpl() {
99  		super();
100 	}
101 
102 	@Override
103 	public SynchroContext createSynchroContext(File sourceDbDirectory) {
104 
105 		String dbName = config.getDbName();
106 		Properties targetConnectionProperties = config.getConnectionProperties();
107 
108 		Properties sourceConnectionProperties = new Properties(targetConnectionProperties);
109 		sourceConnectionProperties.setProperty(Environment.URL,
110 				DaoUtils.getJdbcUrl(sourceDbDirectory, dbName));
111 
112 		Set<String> tableToIncludes = config.getImportReferentialTablesIncludes();
113 
114 		SynchroContext context = SynchroContext.newContext(
115 				tableToIncludes,
116 				sourceConnectionProperties,
117 				targetConnectionProperties,
118 				new SynchroResult());
119 		return context;
120 	}
121 
122 	@Override
123 	public SynchroContext createSynchroContext(Properties sourceConnectionProperties) {
124 
125 		Properties targetConnectionProperties = config.getConnectionProperties();
126 
127 		Set<String> tableToIncludes = config.getImportReferentialTablesIncludes();
128 
129 		SynchroContext context = SynchroContext.newContext(
130 				tableToIncludes,
131 				sourceConnectionProperties,
132 				targetConnectionProperties,
133 				new SynchroResult());
134 		return context;
135 	}
136 
137 	@Override
138 	public void prepare(SynchroContext synchroContext) {
139 		Preconditions.checkNotNull(synchroContext);
140 
141 		Properties sourceConnectionProperties = synchroContext.getSourceConnectionProperties();
142 		Preconditions.checkNotNull(sourceConnectionProperties);
143 
144 		Properties targetConnectionProperties = synchroContext.getTargetConnectionProperties();
145 		Preconditions.checkNotNull(targetConnectionProperties);
146 
147 		Set<String> tableNames = synchroContext.getTableNames();
148 		Predicate<String> tableFilter = synchroContext.getTableFilter();
149 		if (CollectionUtils.isEmpty(tableNames) && tableFilter == null) {
150 			log.info(t("adagio.persistence.synchronizeReferential.prepare.noTableFilter"));
151 		}
152 
153 		SynchroResult result = synchroContext.getResult();
154 		Preconditions.checkNotNull(result);
155 
156 		result.setLocalUrl(DaoUtils.getUrl(targetConnectionProperties));
157 		result.setRemoteUrl(DaoUtils.getUrl(sourceConnectionProperties));
158 
159 		Connection targetConnection = null;
160 		Connection sourceConnection = null;
161 		try {
162 
163 			ProgressionModel progressionModel = result.getProgressionModel();
164 			progressionModel.setMessage(t("adagio.persistence.synchronizeReferential.prepare.step1"));
165 
166 			// create target connection
167 			targetConnection = createConnection(targetConnectionProperties);
168 
169 			progressionModel.setMessage(t("adagio.persistence.synchronizeReferential.prepare.step2"));
170 
171 			// create source Connection
172 			sourceConnection = createConnection(sourceConnectionProperties);
173 
174 			// load metas
175 			SynchroDatabaseMetadata targetMeta =
176 					SynchroDatabaseMetadata.loadDatabaseMetadata(
177 							targetConnection,
178 							DaoUtils.getDialect(targetConnectionProperties),
179 							DaoUtils.getConfiguration(targetConnectionProperties),
180 							synchroContext,
181 							tableNames,
182 							tableFilter,
183 							null /* no column filter */,
184 							false /* do not load joins */);
185 
186 			SynchroDatabaseMetadata sourceMeta =
187 					SynchroDatabaseMetadata.loadDatabaseMetadata(
188 							sourceConnection,
189 							DaoUtils.getDialect(sourceConnectionProperties),
190 							DaoUtils.getConfiguration(sourceConnectionProperties),
191 							synchroContext,
192 							tableNames,
193 							tableFilter,
194 							null /* no column filter */,
195 							false /* do not load joins */);
196 
197 			progressionModel.setMessage(t("adagio.persistence.synchronizeReferential.prepare.step3"));
198 
199 			// check schema
200 			SynchroServiceUtils.checkSchemas(sourceMeta, targetMeta, true, true, result);
201 
202 			if (result.isSuccess()) {
203 
204 				// prepare model (compute update date, count rows to update,...)
205 
206 				for (String tableName : targetMeta.getLoadedTableNames()) {
207 
208 					long t0 = TimeLog.getTime();
209 
210 					progressionModel.setMessage(t("adagio.persistence.synchronizeReferential.prepare.step4", tableName));
211 
212 					SynchroTableMetadata sourceTable = sourceMeta.getTable(tableName);
213 
214 					SynchroTableMetadata targetTable = targetMeta.getTable(tableName);
215 
216 					prepareTable(
217 							sourceTable,
218 							targetTable,
219 							synchroContext,
220 							targetConnection,
221 							sourceConnection,
222 							result);
223 
224 					TIME.log(t0, "prepare table " + tableName);
225 				}
226 
227 				long totalRows = result.getTotalRows();
228 				if (log.isInfoEnabled()) {
229 					log.info("Total rows to update: " + totalRows);
230 				}
231 				targetConnection.rollback();
232 			}
233 		} catch (SQLException e) {
234 			try {
235 				if (targetConnection != null) {
236 					targetConnection.rollback();
237 				}
238 			} catch (SQLException e1) {
239 
240 				// ignore the rollback error
241 			}
242 			result.setError(e);
243 		} finally {
244 			closeSilently(sourceConnection);
245 			closeSilently(targetConnection);
246 		}
247 	}
248 
249 	@Override
250 	public void synchronize(SynchroContext synchroContext) {
251 		Preconditions.checkNotNull(synchroContext);
252 
253 		Properties sourceConnectionProperties = synchroContext.getSourceConnectionProperties();
254 		Preconditions.checkNotNull(sourceConnectionProperties);
255 
256 		Properties targetConnectionProperties = synchroContext.getTargetConnectionProperties();
257 		Preconditions.checkNotNull(targetConnectionProperties);
258 
259 		Set<String> tableNames = synchroContext.getTableNames();
260 		Predicate<String> tableFilter = synchroContext.getTableFilter();
261 
262 		SynchroResult result = synchroContext.getResult();
263 		Preconditions.checkNotNull(result);
264 
265 		Connection targetConnection = null;
266 		Connection sourceConnection = null;
267 		try {
268 
269 			// create target connection
270 			targetConnection = createConnection(targetConnectionProperties);
271 
272 			// create source Connection
273 			sourceConnection = createConnection(sourceConnectionProperties);
274 
275 			// Create column filter (exclude missing optional column)
276 			Predicate<SynchroColumnMetadata> columnFilter = null;
277 			if (!result.getMissingOptionalColumnNameMaps().isEmpty()) {
278 				columnFilter = SynchroMetadataUtils.newExcludeColumnPredicate(result.getMissingOptionalColumnNameMaps());
279 			}
280 
281 			Dialect targetDialect = DaoUtils.getDialect(targetConnectionProperties);
282 
283 			// load metas
284 			SynchroDatabaseMetadata dbMetas =
285 					SynchroDatabaseMetadata.loadDatabaseMetadata(
286 							targetConnection,
287 							DaoUtils.getDialect(targetConnectionProperties),
288 							DaoUtils.getConfiguration(targetConnectionProperties),
289 							synchroContext,
290 							tableNames,
291 							tableFilter,
292 							columnFilter,
293 							false /* do not load join metadata */);
294 
295 			// set total in progression model
296 			ProgressionModel progressionModel = result.getProgressionModel();
297 			progressionModel.setTotal(result.getTotalRows());
298 
299 			// prepare target (desactivate constraints)
300 			prepareSynch(targetConnection);
301 
302 			try {
303 
304 				for (String tableName : dbMetas.getLoadedTableNames()) {
305 
306 					long t0 = TimeLog.getTime();
307 
308 					progressionModel.setMessage(t("adagio.persistence.synchronizeReferential.synchronize.step1", tableName));
309 
310 					SynchroTableMetadata table = dbMetas.getTable(tableName);
311 
312 					if (log.isInfoEnabled()) {
313 						log.info("Synchronize table: " + tableName);
314 					}
315 					long countToUpdate = result.getNbRows(tableName);
316 
317 					if (countToUpdate > 0) {
318 
319 						synchronizeTable(
320 								table,
321 								targetConnection,
322 								sourceConnection,
323 								result);
324 					}
325 
326 					TIME.log(t0, "synchronize table " + tableName);
327 				}
328 				if (log.isInfoEnabled()) {
329 					long totalInserts = result.getTotalInserts();
330 					long totalUpdates = result.getTotalUpdates();
331 					log.info("Total rows to treat: " + result.getTotalRows());
332 					log.info("Total rows inserted: " + totalInserts);
333 					log.info("Total rows  updated: " + totalUpdates);
334 					log.info("Total rows  treated: " + (totalInserts + totalUpdates));
335 				}
336 			} finally {
337 				releaseSynch(targetConnection);
338 			}
339 
340 			progressionModel.setMessage(t("adagio.persistence.synchronizeReferential.synchronize.step2"));
341 
342 			targetConnection.commit();
343 
344 		} catch (SQLException e) {
345 			try {
346 				if (targetConnection != null) {
347 					targetConnection.rollback();
348 				}
349 			} catch (SQLException e1) {
350 
351 				// ignore the rolback error
352 			}
353 			result.setError(e);
354 		} finally {
355 			closeSilently(sourceConnection);
356 			closeSilently(targetConnection);
357 		}
358 	}
359 
360 	protected void prepareTable(
361 			SynchroTableMetadata sourceTable,
362 			SynchroTableMetadata targetTable,
363 			SynchroContext context,
364 			Connection targetConnection,
365 			Connection sourceConnection,
366 			SynchroResult result) throws SQLException {
367 
368 		String tableName = sourceTable.getName();
369 		String tablePrefix = sourceTable.getTableLogPrefix();
370 
371 		if (log.isDebugEnabled()) {
372 			log.debug("Prepare table: " + tableName);
373 		}
374 
375 		SynchroTableDao targetDao = new SynchroTableDaoImpl(targetConnection, targetTable, false);
376 		SynchroTableDao sourceDao = new SynchroTableDaoImpl(sourceConnection, sourceTable, false);
377 
378 		try {
379 			long targetCount = targetDao.count();
380 
381 			// get last updateDate used by target db
382 			Timestamp updateDate = null;
383 
384 			if (targetCount < 50000) {
385 
386 				// only use the update date on small table, for big table we will re-insert all the table content
387 				updateDate = targetDao.getLastUpdateDate();
388 
389 				if (updateDate != null) {
390 
391 					// just inscrements of 1 milisecond to not having same
392 					// TODO BL : attention, a cause des transactions parfois longues sur le serveur Oracle, il faut
393 					// peut-etre justement prendre une date inférieure au max(update_date) ?? genre max(update_date) -
394 					// 2h ?
395 					// Ou mieux : stocker puis utiliser une date de dernière mise à jour (systimestamp côté serveur)
396 					updateDate = new Timestamp(DateUtils.setMilliseconds(updateDate, 0).getTime());
397 					updateDate = new Timestamp(DateUtils.addSeconds(updateDate, 1).getTime());
398 				}
399 			}
400 
401 			long countToUpdate = sourceDao.countDataToUpdate(updateDate);
402 
403 			if (log.isInfoEnabled()) {
404 				log.info(String.format("%s nb rows to update: %s", tablePrefix, countToUpdate));
405 			}
406 
407 			result.setUpdateDate(tableName, updateDate);
408 			result.addRows(tableName, (int) countToUpdate);
409 		} finally {
410 			IOUtils.closeQuietly(targetDao);
411 			IOUtils.closeQuietly(sourceDao);
412 		}
413 
414 	}
415 
416 	protected void synchronizeTable(
417 			SynchroTableMetadata table,
418 			Connection targetConnection,
419 			Connection sourceConnection,
420 			SynchroResult result) throws SQLException {
421 
422 		String tableName = table.getName();
423 
424 		result.getProgressionModel().setMessage(t("adagio.persistence.synchronizeReferential.synchronizeTable", tableName));
425 
426 		SynchroTableDao sourceDao = new SynchroTableDaoImpl(sourceConnection, table, false);
427 		SynchroTableDao targetDao = new SynchroTableDaoImpl(targetConnection, table, true);
428 
429 		// get last updateDate used by target db
430 		Date updateDate = result.getUpdateDate(tableName);
431 
432 		// get table count
433 		long count = targetDao.count();
434 
435 		boolean bigTable = count > 50000;
436 
437 		// get data to update from source db
438 		ResultSet dataToUpdate = sourceDao.getDataToUpdate(
439 				bigTable ? null : updateDate);
440 
441 		try {
442 
443 			if (bigTable) {
444 
445 				// big table update strategy
446 				updateBigTable(
447 						targetDao,
448 						sourceDao,
449 						dataToUpdate,
450 						result);
451 			} else {
452 
453 				// small table update strategy
454 				updateTable(targetDao,
455 						dataToUpdate,
456 						result);
457 			}
458 			dataToUpdate.close();
459 		} finally {
460 
461 			IOUtils.closeQuietly(targetDao);
462 			IOUtils.closeQuietly(sourceDao);
463 			DaoUtils.closeSilently(dataToUpdate);
464 		}
465 	}
466 
467 	/**
468 	 * To update the content of the given {@code table} on the target db,
469 	 * from the given {@code incomingData} of the source db.
470 	 * <p/>
471 	 * The algorithm is pretty simple, for each row of the {@code incomingData}, if exists on target table, then do an
472 	 * update, otherwise do a insert.
473 	 * <p/>
474 	 * As an update query is more expensive, we won't use this method for table with a lot of rows, we will prefer to
475 	 * use the {@code updateBigTable} method instead.
476 	 * 
477 	 * @param targetDao
478 	 *            connection on the target db
479 	 * @param incomingData
480 	 *            data to update from the source db
481 	 * @param result
482 	 *            where to store operation results
483 	 * @throws SQLException
484 	 *             if any sql errors
485 	 */
486 	protected void updateTable(SynchroTableDao targetDao,
487 			ResultSet incomingData,
488 			SynchroResult result) throws SQLException {
489 
490 		SynchroTableMetadata table = targetDao.getTable();
491 
492 		String tableName = table.getName();
493 		String tablePrefix = table.getTableLogPrefix() + " - " + result.getNbRows(tableName);
494 
495 		// get existing ids in the target db
496 		Set<String> existingIds = targetDao.getExistingPrimaryKeys();
497 
498 		if (log.isDebugEnabled()) {
499 			log.debug(tablePrefix + " existing rows: " + existingIds.size());
500 		}
501 
502 		result.addTableName(tableName);
503 
504 		int countR = 0;
505 
506 		while (incomingData.next()) {
507 
508 			List<Object> pk = table.getPk(incomingData);
509 			String pkStr = table.toPkStr(pk);
510 
511 			boolean doUpdate = existingIds.contains(pkStr);
512 
513 			if (doUpdate) {
514 
515 				targetDao.executeUpdate(pk, incomingData);
516 
517 			} else {
518 
519 				targetDao.executeInsert(incomingData);
520 			}
521 
522 			countR++;
523 
524 			reportProgress(result, targetDao, countR, tablePrefix);
525 		}
526 
527 		targetDao.flush();
528 
529 		int insertCount = targetDao.getInsertCount();
530 		int updateCount = targetDao.getUpdateCount();
531 
532 		result.addInserts(tableName, insertCount);
533 		result.addUpdates(tableName, updateCount);
534 		if (log.isInfoEnabled()) {
535 			log.info(String.format("%s done: %s (inserts: %s, updates: %s)", tablePrefix, countR, insertCount, updateCount));
536 		}
537 
538 		if (log.isDebugEnabled()) {
539 			log.debug(String.format("%s INSERT count: %s", tablePrefix, result.getNbInserts(tableName)));
540 			log.debug(String.format("%s UPDATE count: %s", tablePrefix, result.getNbUpdates(tableName)));
541 		}
542 
543 		result.getProgressionModel().increments(countR % 1000);
544 	}
545 
546 	/**
547 	 * To update the content of the given {@code table} (with a lot of rows) on
548 	 * the target db, from the given {@code incomingData} of the source db.
549 	 * <p/>
550 	 * We can't use the simple algorithm, since update queries cost too much and is not acceptable when talking on huge
551 	 * numbers of rows.
552 	 * <p/>
553 	 * Here is what to do :
554 	 * <ul>
555 	 * <li>Get form the target db the data which are not in source db, keep them</li>
556 	 * <li>Delete target table content</li>
557 	 * <li>Insert source table in target table</li>
558 	 * <li>Insert the saved extra rows from original table</li>
559 	 * </ul>
560 	 * In that way we will only perform some insert queries.
561 	 * 
562 	 * @param dbMetas
563 	 * @param targetDao
564 	 *            connection on the target db
565 	 * @param sourceDao
566 	 *            connection on the target db
567 	 * @param incomingData
568 	 *            data to update from the source db
569 	 * @param interceptor
570 	 * @param result
571 	 *            where to store operation results @throws SQLException if any sql errors
572 	 */
573 	protected void updateBigTable(
574 			SynchroTableDao targetDao,
575 			SynchroTableDao sourceDao,
576 			ResultSet incomingData,
577 			SynchroResult result) throws SQLException {
578 
579 		SynchroTableMetadata table = targetDao.getTable();
580 		String tableName = targetDao.getTable().getName();
581 
582 		result.addTableName(tableName);
583 
584 		String tablePrefix = table.getTableLogPrefix() + " - " + result.getNbRows(tableName);
585 
586 		// get existing ids in the target db
587 		Set<String> existingIds = targetDao.getExistingPrimaryKeys();
588 
589 		if (log.isDebugEnabled()) {
590 			log.debug(tablePrefix + " target existing rows: " + existingIds.size());
591 		}
592 
593 		Set<String> sourceExistingIds = sourceDao.getExistingPrimaryKeys();
594 
595 		if (log.isDebugEnabled()) {
596 			log.debug(tablePrefix + " source existing rows: " + sourceExistingIds.size());
597 		}
598 
599 		existingIds.removeAll(sourceExistingIds);
600 
601 		if (log.isDebugEnabled()) {
602 			log.debug(tablePrefix + " target existing rows not in source: " + existingIds.size());
603 		}
604 		if (log.isTraceEnabled()) {
605 			for (String existingId : existingIds) {
606 				log.trace("- " + existingId);
607 			}
608 		}
609 
610 		// copy extra rows from target
611 
612 		Map<List<Object>, Object[]> extraRows = Maps.newLinkedHashMap();
613 
614 		for (String pkStr : existingIds) {
615 
616 			List<Object> pk = table.fromPkStr(pkStr);
617 
618 			Object[] extraRow = targetDao.findByPk(pk);
619 
620 			extraRows.put(pk, extraRow);
621 		}
622 
623 		// remove obsolete extra rows
624 
625 		List<SynchroInterceptor> interceptors = table.getInterceptors();
626 		for (SynchroInterceptor interceptor : interceptors) {
627 			extraRows = interceptor.transformExtraLocalData(
628 					targetDao,
629 					sourceDao,
630 					extraRows);
631 			if (log.isDebugEnabled()) {
632 				log.debug(tablePrefix + " target data existingIds not in source (after apply task): " + extraRows.size());
633 			}
634 		}
635 
636 		// delete table
637 		targetDao.deleteAll();
638 
639 		int countR = 0;
640 
641 		// add all data from source
642 		while (incomingData.next()) {
643 
644 			targetDao.executeInsert(incomingData);
645 
646 			countR++;
647 
648 			reportProgress(result, targetDao, countR, tablePrefix);
649 		}
650 
651 		// re-add extra target rows
652 		for (Map.Entry<List<Object>, Object[]> entry : extraRows.entrySet()) {
653 
654 			Object[] row = entry.getValue();
655 			targetDao.executeInsert(row);
656 
657 			countR++;
658 
659 			reportProgress(result, targetDao, countR, tablePrefix);
660 		}
661 
662 		targetDao.flush();
663 
664 		int insertCount = targetDao.getInsertCount();
665 		result.addInserts(tableName, insertCount);
666 		if (log.isInfoEnabled()) {
667 			log.info(String.format("%s done: %s (inserts: %s)", tablePrefix, countR, insertCount));
668 		}
669 
670 		if (log.isDebugEnabled()) {
671 			log.debug(String.format("%s INSERT count: %s", tablePrefix, result.getNbInserts(tableName)));
672 		}
673 
674 		result.getProgressionModel().increments(countR % 1000);
675 	}
676 
677 	protected void reportProgress(SynchroResult result, SynchroTableDao dao, int countR, String tablePrefix) {
678 		if (countR % 1000 == 0) {
679 			result.getProgressionModel().increments(1000);
680 		}
681 
682 		if (countR % 10000 == 0) {
683 			if (log.isInfoEnabled()) {
684 				log.info(String.format("%s Done: %s (inserts: %s, updates: %s)", tablePrefix, countR, dao.getInsertCount(), dao.getUpdateCount()));
685 			}
686 		}
687 	}
688 
689 	protected Connection createConnection(Properties connectionProperties) throws SQLException {
690 		return createConnection(
691 				connectionProperties.getProperty(Environment.URL),
692 				connectionProperties.getProperty(Environment.USER),
693 				connectionProperties.getProperty(Environment.PASS));
694 	}
695 
696 	protected Connection createConnection(String jdbcUrl,
697 			String user,
698 			String password) throws SQLException {
699 		Preconditions.checkArgument(StringUtils.isNotBlank(jdbcUrl));
700 
701 		// If same URL as datasource, use the dataSource
702 		if (jdbcUrl.equals(config.getJdbcURL()) && this.dataSource != null) {
703 			return DataSourceUtils.getConnection(this.dataSource);
704 		}
705 
706 		Connection connection = DriverManager.getConnection(jdbcUrl,
707 				user,
708 				password);
709 		connection.setAutoCommit(false);
710 		return connection;
711 	}
712 
713 	protected void closeSilently(Connection connection) {
714 		String jdbcUrl = null;
715 		try {
716 			jdbcUrl = connection.getMetaData().getURL();
717 		} catch (SQLException e) {
718 			// TODO
719 		}
720 		// If same URL as datasource, use the dataSource
721 		if (jdbcUrl != null
722 				&& jdbcUrl.equals(config.getJdbcURL())
723 				&& this.dataSource != null) {
724 			DataSourceUtils.releaseConnection(connection, this.dataSource);
725 		}
726 		else {
727 			DaoUtils.closeSilently(connection);
728 		}
729 	}
730 
731 	protected void prepareSynch(Connection connection) throws SQLException {
732 		PreparedStatement statement = connection.prepareStatement("SET REFERENTIAL_INTEGRITY FALSE;");
733 		statement.executeUpdate();
734 		statement.close();
735 	}
736 
737 	protected void releaseSynch(Connection connection) throws SQLException {
738 		PreparedStatement statement = connection.prepareStatement("SET REFERENTIAL_INTEGRITY TRUE;");
739 		statement.executeUpdate();
740 		statement.close();
741 	}
742 }