1 package fr.ifremer.adagio.synchro.service.referential;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 import static org.nuiton.i18n.I18n.t;
27
28 import java.io.File;
29 import java.sql.Connection;
30 import java.sql.DriverManager;
31 import java.sql.PreparedStatement;
32 import java.sql.ResultSet;
33 import java.sql.SQLException;
34 import java.sql.Timestamp;
35 import java.util.Date;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.Properties;
39 import java.util.Set;
40
41 import javax.sql.DataSource;
42
43 import org.apache.commons.collections4.CollectionUtils;
44 import org.apache.commons.io.IOUtils;
45 import org.apache.commons.lang3.StringUtils;
46 import org.apache.commons.lang3.time.DateUtils;
47 import org.apache.commons.logging.Log;
48 import org.apache.commons.logging.LogFactory;
49 import org.hibernate.cfg.Environment;
50 import org.hibernate.dialect.Dialect;
51 import org.nuiton.util.TimeLog;
52 import org.springframework.beans.factory.annotation.Autowired;
53 import org.springframework.context.annotation.Lazy;
54 import org.springframework.jdbc.datasource.DataSourceUtils;
55 import org.springframework.stereotype.Service;
56
57 import com.google.common.base.Preconditions;
58 import com.google.common.base.Predicate;
59 import com.google.common.collect.Maps;
60
61 import fr.ifremer.adagio.synchro.config.SynchroConfiguration;
62 import fr.ifremer.adagio.synchro.dao.DaoUtils;
63 import fr.ifremer.adagio.synchro.dao.SynchroTableDao;
64 import fr.ifremer.adagio.synchro.dao.SynchroTableDaoImpl;
65 import fr.ifremer.adagio.synchro.intercept.SynchroInterceptor;
66 import fr.ifremer.adagio.synchro.meta.SynchroColumnMetadata;
67 import fr.ifremer.adagio.synchro.meta.SynchroDatabaseMetadata;
68 import fr.ifremer.adagio.synchro.meta.SynchroMetadataUtils;
69 import fr.ifremer.adagio.synchro.meta.SynchroTableMetadata;
70 import fr.ifremer.adagio.synchro.service.SynchroBaseService;
71 import fr.ifremer.adagio.synchro.service.SynchroContext;
72 import fr.ifremer.adagio.synchro.service.SynchroResult;
73 import fr.ifremer.adagio.synchro.service.SynchroServiceUtils;
74 import fr.ifremer.adagio.synchro.type.ProgressionModel;
75
76
77
78
79
80
81
82 @Service("referentialSynchroService")
83 @Lazy
84 public class ReferentialSynchroServiceImpl extends SynchroBaseService implements ReferentialSynchroService {
85
86
87 private static final Log log =
88 LogFactory.getLog(ReferentialSynchroServiceImpl.class);
89
90 private static final TimeLog TIME =
91 new TimeLog(ReferentialSynchroServiceImpl.class);
92
/**
 * Creates the service from an injected data source and configuration.
 * Both arguments are handed unchanged to the {@link SynchroBaseService} constructor.
 *
 * @param dataSource data source forwarded to the parent service
 * @param config synchro configuration forwarded to the parent service
 */
@Autowired
public ReferentialSynchroServiceImpl(
        DataSource dataSource,
        SynchroConfiguration config) {
    super(dataSource, config);
}
97
98 public ReferentialSynchroServiceImpl() {
99 super();
100 }
101
102 @Override
103 public SynchroContext createSynchroContext(File sourceDbDirectory) {
104
105 String dbName = config.getDbName();
106 Properties targetConnectionProperties = config.getConnectionProperties();
107
108 Properties sourceConnectionProperties = new Properties(targetConnectionProperties);
109 sourceConnectionProperties.setProperty(Environment.URL,
110 DaoUtils.getJdbcUrl(sourceDbDirectory, dbName));
111
112 Set<String> tableToIncludes = config.getImportReferentialTablesIncludes();
113
114 SynchroContext context = SynchroContext.newContext(
115 tableToIncludes,
116 sourceConnectionProperties,
117 targetConnectionProperties,
118 new SynchroResult());
119 return context;
120 }
121
122 @Override
123 public SynchroContext createSynchroContext(Properties sourceConnectionProperties) {
124
125 Properties targetConnectionProperties = config.getConnectionProperties();
126
127 Set<String> tableToIncludes = config.getImportReferentialTablesIncludes();
128
129 SynchroContext context = SynchroContext.newContext(
130 tableToIncludes,
131 sourceConnectionProperties,
132 targetConnectionProperties,
133 new SynchroResult());
134 return context;
135 }
136
137 @Override
138 public void prepare(SynchroContext synchroContext) {
139 Preconditions.checkNotNull(synchroContext);
140
141 Properties sourceConnectionProperties = synchroContext.getSourceConnectionProperties();
142 Preconditions.checkNotNull(sourceConnectionProperties);
143
144 Properties targetConnectionProperties = synchroContext.getTargetConnectionProperties();
145 Preconditions.checkNotNull(targetConnectionProperties);
146
147 Set<String> tableNames = synchroContext.getTableNames();
148 Predicate<String> tableFilter = synchroContext.getTableFilter();
149 if (CollectionUtils.isEmpty(tableNames) && tableFilter == null) {
150 log.info(t("adagio.persistence.synchronizeReferential.prepare.noTableFilter"));
151 }
152
153 SynchroResult result = synchroContext.getResult();
154 Preconditions.checkNotNull(result);
155
156 result.setLocalUrl(DaoUtils.getUrl(targetConnectionProperties));
157 result.setRemoteUrl(DaoUtils.getUrl(sourceConnectionProperties));
158
159 Connection targetConnection = null;
160 Connection sourceConnection = null;
161 try {
162
163 ProgressionModel progressionModel = result.getProgressionModel();
164 progressionModel.setMessage(t("adagio.persistence.synchronizeReferential.prepare.step1"));
165
166
167 targetConnection = createConnection(targetConnectionProperties);
168
169 progressionModel.setMessage(t("adagio.persistence.synchronizeReferential.prepare.step2"));
170
171
172 sourceConnection = createConnection(sourceConnectionProperties);
173
174
175 SynchroDatabaseMetadata targetMeta =
176 SynchroDatabaseMetadata.loadDatabaseMetadata(
177 targetConnection,
178 DaoUtils.getDialect(targetConnectionProperties),
179 DaoUtils.getConfiguration(targetConnectionProperties),
180 synchroContext,
181 tableNames,
182 tableFilter,
183 null ,
184 false );
185
186 SynchroDatabaseMetadata sourceMeta =
187 SynchroDatabaseMetadata.loadDatabaseMetadata(
188 sourceConnection,
189 DaoUtils.getDialect(sourceConnectionProperties),
190 DaoUtils.getConfiguration(sourceConnectionProperties),
191 synchroContext,
192 tableNames,
193 tableFilter,
194 null ,
195 false );
196
197 progressionModel.setMessage(t("adagio.persistence.synchronizeReferential.prepare.step3"));
198
199
200 SynchroServiceUtils.checkSchemas(sourceMeta, targetMeta, true, true, result);
201
202 if (result.isSuccess()) {
203
204
205
206 for (String tableName : targetMeta.getLoadedTableNames()) {
207
208 long t0 = TimeLog.getTime();
209
210 progressionModel.setMessage(t("adagio.persistence.synchronizeReferential.prepare.step4", tableName));
211
212 SynchroTableMetadata sourceTable = sourceMeta.getTable(tableName);
213
214 SynchroTableMetadata targetTable = targetMeta.getTable(tableName);
215
216 prepareTable(
217 sourceTable,
218 targetTable,
219 synchroContext,
220 targetConnection,
221 sourceConnection,
222 result);
223
224 TIME.log(t0, "prepare table " + tableName);
225 }
226
227 long totalRows = result.getTotalRows();
228 if (log.isInfoEnabled()) {
229 log.info("Total rows to update: " + totalRows);
230 }
231 targetConnection.rollback();
232 }
233 } catch (SQLException e) {
234 try {
235 if (targetConnection != null) {
236 targetConnection.rollback();
237 }
238 } catch (SQLException e1) {
239
240
241 }
242 result.setError(e);
243 } finally {
244 closeSilently(sourceConnection);
245 closeSilently(targetConnection);
246 }
247 }
248
249 @Override
250 public void synchronize(SynchroContext synchroContext) {
251 Preconditions.checkNotNull(synchroContext);
252
253 Properties sourceConnectionProperties = synchroContext.getSourceConnectionProperties();
254 Preconditions.checkNotNull(sourceConnectionProperties);
255
256 Properties targetConnectionProperties = synchroContext.getTargetConnectionProperties();
257 Preconditions.checkNotNull(targetConnectionProperties);
258
259 Set<String> tableNames = synchroContext.getTableNames();
260 Predicate<String> tableFilter = synchroContext.getTableFilter();
261
262 SynchroResult result = synchroContext.getResult();
263 Preconditions.checkNotNull(result);
264
265 Connection targetConnection = null;
266 Connection sourceConnection = null;
267 try {
268
269
270 targetConnection = createConnection(targetConnectionProperties);
271
272
273 sourceConnection = createConnection(sourceConnectionProperties);
274
275
276 Predicate<SynchroColumnMetadata> columnFilter = null;
277 if (!result.getMissingOptionalColumnNameMaps().isEmpty()) {
278 columnFilter = SynchroMetadataUtils.newExcludeColumnPredicate(result.getMissingOptionalColumnNameMaps());
279 }
280
281 Dialect targetDialect = DaoUtils.getDialect(targetConnectionProperties);
282
283
284 SynchroDatabaseMetadata dbMetas =
285 SynchroDatabaseMetadata.loadDatabaseMetadata(
286 targetConnection,
287 DaoUtils.getDialect(targetConnectionProperties),
288 DaoUtils.getConfiguration(targetConnectionProperties),
289 synchroContext,
290 tableNames,
291 tableFilter,
292 columnFilter,
293 false );
294
295
296 ProgressionModel progressionModel = result.getProgressionModel();
297 progressionModel.setTotal(result.getTotalRows());
298
299
300 prepareSynch(targetConnection);
301
302 try {
303
304 for (String tableName : dbMetas.getLoadedTableNames()) {
305
306 long t0 = TimeLog.getTime();
307
308 progressionModel.setMessage(t("adagio.persistence.synchronizeReferential.synchronize.step1", tableName));
309
310 SynchroTableMetadata table = dbMetas.getTable(tableName);
311
312 if (log.isInfoEnabled()) {
313 log.info("Synchronize table: " + tableName);
314 }
315 long countToUpdate = result.getNbRows(tableName);
316
317 if (countToUpdate > 0) {
318
319 synchronizeTable(
320 table,
321 targetConnection,
322 sourceConnection,
323 result);
324 }
325
326 TIME.log(t0, "synchronize table " + tableName);
327 }
328 if (log.isInfoEnabled()) {
329 long totalInserts = result.getTotalInserts();
330 long totalUpdates = result.getTotalUpdates();
331 log.info("Total rows to treat: " + result.getTotalRows());
332 log.info("Total rows inserted: " + totalInserts);
333 log.info("Total rows updated: " + totalUpdates);
334 log.info("Total rows treated: " + (totalInserts + totalUpdates));
335 }
336 } finally {
337 releaseSynch(targetConnection);
338 }
339
340 progressionModel.setMessage(t("adagio.persistence.synchronizeReferential.synchronize.step2"));
341
342 targetConnection.commit();
343
344 } catch (SQLException e) {
345 try {
346 if (targetConnection != null) {
347 targetConnection.rollback();
348 }
349 } catch (SQLException e1) {
350
351
352 }
353 result.setError(e);
354 } finally {
355 closeSilently(sourceConnection);
356 closeSilently(targetConnection);
357 }
358 }
359
360 protected void prepareTable(
361 SynchroTableMetadata sourceTable,
362 SynchroTableMetadata targetTable,
363 SynchroContext context,
364 Connection targetConnection,
365 Connection sourceConnection,
366 SynchroResult result) throws SQLException {
367
368 String tableName = sourceTable.getName();
369 String tablePrefix = sourceTable.getTableLogPrefix();
370
371 if (log.isDebugEnabled()) {
372 log.debug("Prepare table: " + tableName);
373 }
374
375 SynchroTableDao targetDao = new SynchroTableDaoImpl(targetConnection, targetTable, false);
376 SynchroTableDao sourceDao = new SynchroTableDaoImpl(sourceConnection, sourceTable, false);
377
378 try {
379 long targetCount = targetDao.count();
380
381
382 Timestamp updateDate = null;
383
384 if (targetCount < 50000) {
385
386
387 updateDate = targetDao.getLastUpdateDate();
388
389 if (updateDate != null) {
390
391
392
393
394
395
396 updateDate = new Timestamp(DateUtils.setMilliseconds(updateDate, 0).getTime());
397 updateDate = new Timestamp(DateUtils.addSeconds(updateDate, 1).getTime());
398 }
399 }
400
401 long countToUpdate = sourceDao.countDataToUpdate(updateDate);
402
403 if (log.isInfoEnabled()) {
404 log.info(String.format("%s nb rows to update: %s", tablePrefix, countToUpdate));
405 }
406
407 result.setUpdateDate(tableName, updateDate);
408 result.addRows(tableName, (int) countToUpdate);
409 } finally {
410 IOUtils.closeQuietly(targetDao);
411 IOUtils.closeQuietly(sourceDao);
412 }
413
414 }
415
416 protected void synchronizeTable(
417 SynchroTableMetadata table,
418 Connection targetConnection,
419 Connection sourceConnection,
420 SynchroResult result) throws SQLException {
421
422 String tableName = table.getName();
423
424 result.getProgressionModel().setMessage(t("adagio.persistence.synchronizeReferential.synchronizeTable", tableName));
425
426 SynchroTableDao sourceDao = new SynchroTableDaoImpl(sourceConnection, table, false);
427 SynchroTableDao targetDao = new SynchroTableDaoImpl(targetConnection, table, true);
428
429
430 Date updateDate = result.getUpdateDate(tableName);
431
432
433 long count = targetDao.count();
434
435 boolean bigTable = count > 50000;
436
437
438 ResultSet dataToUpdate = sourceDao.getDataToUpdate(
439 bigTable ? null : updateDate);
440
441 try {
442
443 if (bigTable) {
444
445
446 updateBigTable(
447 targetDao,
448 sourceDao,
449 dataToUpdate,
450 result);
451 } else {
452
453
454 updateTable(targetDao,
455 dataToUpdate,
456 result);
457 }
458 dataToUpdate.close();
459 } finally {
460
461 IOUtils.closeQuietly(targetDao);
462 IOUtils.closeQuietly(sourceDao);
463 DaoUtils.closeSilently(dataToUpdate);
464 }
465 }
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486 protected void updateTable(SynchroTableDao targetDao,
487 ResultSet incomingData,
488 SynchroResult result) throws SQLException {
489
490 SynchroTableMetadata table = targetDao.getTable();
491
492 String tableName = table.getName();
493 String tablePrefix = table.getTableLogPrefix() + " - " + result.getNbRows(tableName);
494
495
496 Set<String> existingIds = targetDao.getExistingPrimaryKeys();
497
498 if (log.isDebugEnabled()) {
499 log.debug(tablePrefix + " existing rows: " + existingIds.size());
500 }
501
502 result.addTableName(tableName);
503
504 int countR = 0;
505
506 while (incomingData.next()) {
507
508 List<Object> pk = table.getPk(incomingData);
509 String pkStr = table.toPkStr(pk);
510
511 boolean doUpdate = existingIds.contains(pkStr);
512
513 if (doUpdate) {
514
515 targetDao.executeUpdate(pk, incomingData);
516
517 } else {
518
519 targetDao.executeInsert(incomingData);
520 }
521
522 countR++;
523
524 reportProgress(result, targetDao, countR, tablePrefix);
525 }
526
527 targetDao.flush();
528
529 int insertCount = targetDao.getInsertCount();
530 int updateCount = targetDao.getUpdateCount();
531
532 result.addInserts(tableName, insertCount);
533 result.addUpdates(tableName, updateCount);
534 if (log.isInfoEnabled()) {
535 log.info(String.format("%s done: %s (inserts: %s, updates: %s)", tablePrefix, countR, insertCount, updateCount));
536 }
537
538 if (log.isDebugEnabled()) {
539 log.debug(String.format("%s INSERT count: %s", tablePrefix, result.getNbInserts(tableName)));
540 log.debug(String.format("%s UPDATE count: %s", tablePrefix, result.getNbUpdates(tableName)));
541 }
542
543 result.getProgressionModel().increments(countR % 1000);
544 }
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573 protected void updateBigTable(
574 SynchroTableDao targetDao,
575 SynchroTableDao sourceDao,
576 ResultSet incomingData,
577 SynchroResult result) throws SQLException {
578
579 SynchroTableMetadata table = targetDao.getTable();
580 String tableName = targetDao.getTable().getName();
581
582 result.addTableName(tableName);
583
584 String tablePrefix = table.getTableLogPrefix() + " - " + result.getNbRows(tableName);
585
586
587 Set<String> existingIds = targetDao.getExistingPrimaryKeys();
588
589 if (log.isDebugEnabled()) {
590 log.debug(tablePrefix + " target existing rows: " + existingIds.size());
591 }
592
593 Set<String> sourceExistingIds = sourceDao.getExistingPrimaryKeys();
594
595 if (log.isDebugEnabled()) {
596 log.debug(tablePrefix + " source existing rows: " + sourceExistingIds.size());
597 }
598
599 existingIds.removeAll(sourceExistingIds);
600
601 if (log.isDebugEnabled()) {
602 log.debug(tablePrefix + " target existing rows not in source: " + existingIds.size());
603 }
604 if (log.isTraceEnabled()) {
605 for (String existingId : existingIds) {
606 log.trace("- " + existingId);
607 }
608 }
609
610
611
612 Map<List<Object>, Object[]> extraRows = Maps.newLinkedHashMap();
613
614 for (String pkStr : existingIds) {
615
616 List<Object> pk = table.fromPkStr(pkStr);
617
618 Object[] extraRow = targetDao.findByPk(pk);
619
620 extraRows.put(pk, extraRow);
621 }
622
623
624
625 List<SynchroInterceptor> interceptors = table.getInterceptors();
626 for (SynchroInterceptor interceptor : interceptors) {
627 extraRows = interceptor.transformExtraLocalData(
628 targetDao,
629 sourceDao,
630 extraRows);
631 if (log.isDebugEnabled()) {
632 log.debug(tablePrefix + " target data existingIds not in source (after apply task): " + extraRows.size());
633 }
634 }
635
636
637 targetDao.deleteAll();
638
639 int countR = 0;
640
641
642 while (incomingData.next()) {
643
644 targetDao.executeInsert(incomingData);
645
646 countR++;
647
648 reportProgress(result, targetDao, countR, tablePrefix);
649 }
650
651
652 for (Map.Entry<List<Object>, Object[]> entry : extraRows.entrySet()) {
653
654 Object[] row = entry.getValue();
655 targetDao.executeInsert(row);
656
657 countR++;
658
659 reportProgress(result, targetDao, countR, tablePrefix);
660 }
661
662 targetDao.flush();
663
664 int insertCount = targetDao.getInsertCount();
665 result.addInserts(tableName, insertCount);
666 if (log.isInfoEnabled()) {
667 log.info(String.format("%s done: %s (inserts: %s)", tablePrefix, countR, insertCount));
668 }
669
670 if (log.isDebugEnabled()) {
671 log.debug(String.format("%s INSERT count: %s", tablePrefix, result.getNbInserts(tableName)));
672 }
673
674 result.getProgressionModel().increments(countR % 1000);
675 }
676
677 protected void reportProgress(SynchroResult result, SynchroTableDao dao, int countR, String tablePrefix) {
678 if (countR % 1000 == 0) {
679 result.getProgressionModel().increments(1000);
680 }
681
682 if (countR % 10000 == 0) {
683 if (log.isInfoEnabled()) {
684 log.info(String.format("%s Done: %s (inserts: %s, updates: %s)", tablePrefix, countR, dao.getInsertCount(), dao.getUpdateCount()));
685 }
686 }
687 }
688
689 protected Connection createConnection(Properties connectionProperties) throws SQLException {
690 return createConnection(
691 connectionProperties.getProperty(Environment.URL),
692 connectionProperties.getProperty(Environment.USER),
693 connectionProperties.getProperty(Environment.PASS));
694 }
695
696 protected Connection createConnection(String jdbcUrl,
697 String user,
698 String password) throws SQLException {
699 Preconditions.checkArgument(StringUtils.isNotBlank(jdbcUrl));
700
701
702 if (jdbcUrl.equals(config.getJdbcURL()) && this.dataSource != null) {
703 return DataSourceUtils.getConnection(this.dataSource);
704 }
705
706 Connection connection = DriverManager.getConnection(jdbcUrl,
707 user,
708 password);
709 connection.setAutoCommit(false);
710 return connection;
711 }
712
713 protected void closeSilently(Connection connection) {
714 String jdbcUrl = null;
715 try {
716 jdbcUrl = connection.getMetaData().getURL();
717 } catch (SQLException e) {
718
719 }
720
721 if (jdbcUrl != null
722 && jdbcUrl.equals(config.getJdbcURL())
723 && this.dataSource != null) {
724 DataSourceUtils.releaseConnection(connection, this.dataSource);
725 }
726 else {
727 DaoUtils.closeSilently(connection);
728 }
729 }
730
731 protected void prepareSynch(Connection connection) throws SQLException {
732 PreparedStatement statement = connection.prepareStatement("SET REFERENTIAL_INTEGRITY FALSE;");
733 statement.executeUpdate();
734 statement.close();
735 }
736
737 protected void releaseSynch(Connection connection) throws SQLException {
738 PreparedStatement statement = connection.prepareStatement("SET REFERENTIAL_INTEGRITY TRUE;");
739 statement.executeUpdate();
740 statement.close();
741 }
742 }