001package fr.ifremer.adagio.synchro.service.data; 002 003/* 004 * #%L 005 * Tutti :: Persistence 006 * $Id: DataSynchroServiceImpl.java 1573 2014-02-04 16:41:40Z tchemit $ 007 * $HeadURL: http://svn.forge.codelutin.com/svn/tutti/trunk/tutti-persistence/src/main/java/fr/ifremer/adagio/core/service/technical/synchro/DataSynchroServiceImpl.java $ 008 * %% 009 * Copyright (C) 2012 - 2014 Ifremer 010 * %% 011 * This program is free software: you can redistribute it and/or modify 012 * it under the terms of the GNU Affero General Public License as published by 013 * the Free Software Foundation, either version 3 of the License, or 014 * (at your option) any later version. 015 * 016 * This program is distributed in the hope that it will be useful, 017 * but WITHOUT ANY WARRANTY; without even the implied warranty of 018 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 019 * GNU General Public License for more details. 020 * 021 * You should have received a copy of the GNU Affero General Public License 022 * along with this program. If not, see <http://www.gnu.org/licenses/>. 023 * #L% 024 */ 025 026import static org.nuiton.i18n.I18n.t; 027 028import java.io.File; 029import java.sql.Connection; 030import java.sql.DriverManager; 031import java.sql.ResultSet; 032import java.sql.SQLException; 033import java.sql.Timestamp; 034import java.util.Date; 035import java.util.List; 036import java.util.Map; 037import java.util.Properties; 038import java.util.Set; 039 040import javax.sql.DataSource; 041 042import org.apache.commons.collections4.CollectionUtils; 043import org.apache.commons.io.IOUtils; 044import org.apache.commons.lang3.StringUtils; 045import org.apache.commons.lang3.time.DateUtils; 046import org.apache.commons.logging.Log; 047import org.apache.commons.logging.LogFactory; 048import org.hibernate.cfg.Configuration; 049import org.hibernate.cfg.Environment; 050import org.hibernate.dialect.Dialect; 051import org.nuiton.util.TimeLog; 052import org.springframework.beans.factory.annotation.Autowired; 053import org.springframework.context.annotation.Lazy; 054import org.springframework.jdbc.datasource.DataSourceUtils; 055import org.springframework.stereotype.Service; 056 057import com.google.common.base.Preconditions; 058import com.google.common.base.Predicate; 059import com.google.common.collect.Lists; 060import com.google.common.collect.Maps; 061 062import fr.ifremer.adagio.synchro.config.SynchroConfiguration; 063import fr.ifremer.adagio.synchro.dao.DaoUtils; 064import fr.ifremer.adagio.synchro.dao.SynchroTableDao; 065import fr.ifremer.adagio.synchro.dao.SynchroTableDaoImpl; 066import fr.ifremer.adagio.synchro.intercept.SynchroInterceptor; 067import fr.ifremer.adagio.synchro.meta.SynchroColumnMetadata; 068import fr.ifremer.adagio.synchro.meta.SynchroDatabaseMetadata; 069import fr.ifremer.adagio.synchro.meta.SynchroJoinMetadata; 070import fr.ifremer.adagio.synchro.meta.SynchroMetadataUtils; 071import fr.ifremer.adagio.synchro.meta.SynchroTableMetadata; 072import fr.ifremer.adagio.synchro.meta.SynchroTableMetadata.TableInsertStrategy; 073import fr.ifremer.adagio.synchro.service.SynchroBaseService; 074import fr.ifremer.adagio.synchro.service.SynchroContext; 075import fr.ifremer.adagio.synchro.service.SynchroResult; 076import fr.ifremer.adagio.synchro.service.SynchroServiceUtils; 077import fr.ifremer.adagio.synchro.service.SynchroPendingOperationBuffer; 078import fr.ifremer.adagio.synchro.type.ProgressionModel; 079 080/** 081 * Created on 1/14/14. 082 * 083 * @author Benoit Lavenier <benoit.lavenier@e-is.pro> 084 * @since 3.5.2 085 */ 086@Service("dataSynchroService") 087@Lazy 088public class DataSynchroServiceImpl extends SynchroBaseService implements DataSynchroService { 089 090 /** Logger. */ 091 private static final Log log = 092 LogFactory.getLog(DataSynchroServiceImpl.class); 093 094 private static final TimeLog TIME = 095 new TimeLog(DataSynchroServiceImpl.class); 096 097 public DataSynchroServiceImpl(DataSource dataSource, SynchroConfiguration config) { 098 super(dataSource, config); 099 } 100 101 public DataSynchroServiceImpl() { 102 super(); 103 } 104 105 @Override 106 public SynchroContext createSynchroContext(File sourceDbDirectory) { 107 108 String dbName = config.getDbName(); 109 Properties targetConnectionProperties = config.getConnectionProperties(); 110 111 Properties sourceConnectionProperties = new Properties(targetConnectionProperties); 112 sourceConnectionProperties.setProperty(Environment.URL, 113 DaoUtils.getJdbcUrl(sourceDbDirectory, dbName)); 114 115 Set<String> tableToIncludes = config.getImportDataTablesIncludes(); 116 117 SynchroContext context = SynchroContext.newContext( 118 tableToIncludes, 119 sourceConnectionProperties, 120 targetConnectionProperties, 121 new SynchroResult()); 122 return context; 123 } 124 125 @Override 126 public SynchroContext createSynchroContext(Properties sourceConnectionProperties) { 127 128 Properties targetConnectionProperties = config.getConnectionProperties(); 129 130 Set<String> tableToIncludes = config.getImportDataTablesIncludes(); 131 132 SynchroContext context = SynchroContext.newContext( 133 tableToIncludes, 134 sourceConnectionProperties, 135 targetConnectionProperties, 136 new SynchroResult()); 137 return context; 138 } 139 140 @Override 141 public void prepare(SynchroContext synchroContext) { 142 143 Preconditions.checkNotNull(synchroContext); 144 145 Properties sourceConnectionProperties = synchroContext.getSourceConnectionProperties(); 146 Preconditions.checkNotNull(sourceConnectionProperties); 147 148 Properties targetConnectionProperties = synchroContext.getTargetConnectionProperties(); 149 Preconditions.checkNotNull(targetConnectionProperties); 150 151 Set<String> tableNames = synchroContext.getTableNames(); 152 Predicate<String> tableFilter = synchroContext.getTableFilter(); 153 if (CollectionUtils.isEmpty(tableNames) && tableFilter == null) { 154 log.info(t("adagio.persistence.synchronizeData.prepare.noTableFilter")); 155 } 156 157 SynchroResult result = synchroContext.getResult(); 158 Preconditions.checkNotNull(result); 159 160 result.setLocalUrl(getUrl(targetConnectionProperties)); 161 result.setRemoteUrl(getUrl(sourceConnectionProperties)); 162 163 Connection targetConnection = null; 164 Connection sourceConnection = null; 165 try { 166 167 ProgressionModel progressionModel = result.getProgressionModel(); 168 progressionModel.setMessage(t("adagio.persistence.synchronizeData.prepare.step1")); 169 170 // create target connection 171 targetConnection = createConnection(targetConnectionProperties); 172 173 progressionModel.setMessage(t("adagio.persistence.synchronizeData.prepare.step2")); 174 175 // create source Connection 176 sourceConnection = createConnection(sourceConnectionProperties); 177 178 // load metas 179 SynchroDatabaseMetadata targetMeta = 180 SynchroDatabaseMetadata.loadDatabaseMetadata( 181 targetConnection, 182 getDialect(targetConnectionProperties), 183 getConfiguration(targetConnectionProperties), 184 synchroContext, 185 tableNames, 186 tableFilter, 187 null, /* no column filter */ 188 true /* load join metadata */); 189 190 SynchroDatabaseMetadata sourceMeta = 191 SynchroDatabaseMetadata.loadDatabaseMetadata( 192 sourceConnection, 193 getDialect(sourceConnectionProperties), 194 getConfiguration(sourceConnectionProperties), 195 synchroContext, 196 tableNames, 197 tableFilter, 198 null, /* no column filter */ 199 true /* load join metadata */); 200 201 progressionModel.setMessage(t("adagio.persistence.synchronizeData.prepare.step3")); 202 203 // check schema 204 SynchroServiceUtils.checkSchemas(sourceMeta, targetMeta, true, false, result); 205 206 if (result.isSuccess()) { 207 208 // prepare model (compute update date, count rows to update,...) 209 Set<String> rootTableNames = targetMeta.getLoadedRootTableNames(); 210 if (rootTableNames.size() == 0 && log.isWarnEnabled()) { 211 log.warn(t("adagio.persistence.synchronizeData.prepare.noRootTable")); 212 } 213 214 for (String tableName : rootTableNames) { 215 216 long t0 = TimeLog.getTime(); 217 218 progressionModel.setMessage(t("adagio.persistence.synchronizeData.prepare.step4", tableName)); 219 220 SynchroTableMetadata sourceTable = sourceMeta.getTable(tableName); 221 SynchroTableMetadata targetTable = targetMeta.getTable(tableName); 222 223 if (log.isDebugEnabled()) { 224 log.debug("Prepare table: " + tableName); 225 } 226 prepareRootTable( 227 sourceTable, 228 targetTable, 229 sourceConnection, 230 targetConnection, 231 result); 232 233 TIME.log(t0, "prepare table " + tableName); 234 } 235 236 long totalRows = result.getTotalRows(); 237 if (log.isInfoEnabled()) { 238 log.info("Total rows to update: " + totalRows); 239 } 240 targetConnection.rollback(); 241 } 242 } catch (SQLException e) { 243 try { 244 if (targetConnection != null) { 245 targetConnection.rollback(); 246 } 247 } catch (SQLException e1) { 248 249 // ignore the rolback error 250 } 251 result.setError(e); 252 } finally { 253 releaseConnection(sourceConnection); 254 releaseConnection(targetConnection); 255 } 256 } 257 258 @Override 259 public void synchronize(SynchroContext synchroContext) { 260 261 Preconditions.checkNotNull(synchroContext); 262 263 Properties sourceConnectionProperties = synchroContext.getSourceConnectionProperties(); 264 Preconditions.checkNotNull(sourceConnectionProperties); 265 266 Properties targetConnectionProperties = synchroContext.getTargetConnectionProperties(); 267 Preconditions.checkNotNull(targetConnectionProperties); 268 269 Set<String> tableNames = synchroContext.getTableNames(); 270 Predicate<String> tableFilter = synchroContext.getTableFilter(); 271 272 SynchroResult result = synchroContext.getResult(); 273 Preconditions.checkNotNull(result); 274 275 Connection targetConnection = null; 276 Connection sourceConnection = null; 277 try { 278 279 // create source Connection 280 sourceConnection = createConnection(sourceConnectionProperties); 281 282 // create target connection 283 targetConnection = createConnection(targetConnectionProperties); 284 285 // Create column filter (exclude missing optional column) 286 Predicate<SynchroColumnMetadata> columnFilter = null; 287 if (!result.getMissingOptionalColumnNameMaps().isEmpty()) { 288 columnFilter = SynchroMetadataUtils.newExcludeColumnPredicate(result.getMissingOptionalColumnNameMaps()); 289 } 290 291 // load metas 292 SynchroDatabaseMetadata dbMetas = 293 SynchroDatabaseMetadata.loadDatabaseMetadata( 294 targetConnection, 295 getDialect(targetConnectionProperties), 296 getConfiguration(targetConnectionProperties), 297 synchroContext, 298 tableNames, 299 tableFilter, 300 columnFilter, 301 true /* load join metadata */ 302 ); 303 304 // set total in progression model 305 ProgressionModel progressionModel = result.getProgressionModel(); 306 progressionModel.setTotal(result.getTotalRows()); 307 308 // For all root table 309 for (String tableName : dbMetas.getLoadedRootTableNames()) { 310 311 SynchroTableMetadata table = dbMetas.getTable(tableName); 312 long t0 = TimeLog.getTime(); 313 314 progressionModel.setMessage(t("adagio.persistence.synchronizeData.synchronize.step1", tableName)); 315 316 if (log.isInfoEnabled()) { 317 log.info("Synchronize root table: " + tableName); 318 } 319 long countToUpdate = result.getNbRows(tableName); 320 321 if (countToUpdate > 0) { 322 323 SynchroPendingOperationBuffer tableBuffer = new SynchroPendingOperationBuffer(tableName); 324 325 synchronizeRootTable( 326 table, 327 synchroContext, 328 sourceConnection, 329 targetConnection, 330 result, 331 tableBuffer); 332 333 TIME.log(t0, "synchronize table " + tableName); 334 335 // Retrieve parent table context. If empty (=no row updated) then exit 336 if (!tableBuffer.isEmpty()) { 337 Set<Integer> updatedRemoteIds = tableBuffer.getRemoteIdsMap().keySet(); 338 339 // Synchronize childs tables 340 synchronizeChildTables(table, updatedRemoteIds, synchroContext, sourceConnection, targetConnection, result, true); 341 } 342 } 343 } 344 345 if (log.isInfoEnabled()) { 346 long totalInserts = result.getTotalInserts(); 347 long totalUpdates = result.getTotalUpdates(); 348 log.info("Total rows to treat: " + result.getTotalRows()); 349 log.info("Total rows inserted: " + totalInserts); 350 log.info("Total rows updated: " + totalUpdates); 351 log.info("Total rows treated: " + (totalInserts + totalUpdates)); 352 } 353 354 progressionModel.setMessage(t("adagio.persistence.synchronizeData.synchronize.step2")); 355 356 targetConnection.commit(); 357 358 } catch (SQLException e) { 359 try { 360 if (targetConnection != null) { 361 targetConnection.rollback(); 362 } 363 } catch (SQLException e1) { 364 365 // ignore the rollback error 366 } 367 result.setError(e); 368 } catch (Exception e) { 369 try { 370 if (targetConnection != null) { 371 targetConnection.rollback(); 372 } 373 } catch (SQLException e1) { 374 375 // ignore the rollback error 376 } 377 result.setError(e); 378 } finally { 379 releaseConnection(sourceConnection); 380 releaseConnection(targetConnection); 381 } 382 } 383 384 protected void prepareRootTable( 385 SynchroTableMetadata sourceTable, 386 SynchroTableMetadata targetTable, 387 Connection sourceConnection, 388 Connection targetConnection, 389 SynchroResult result) throws SQLException { 390 391 String tablePrefix = sourceTable.getTableLogPrefix(); 392 String tableName = sourceTable.getName(); 393 394 SynchroTableDao targetDao = new SynchroTableDaoImpl(targetConnection, targetTable, false); 395 SynchroTableDao sourceDao = new SynchroTableDaoImpl(sourceConnection, sourceTable, false); 396 397 try { 398 // get last updateDate used by target db 399 Timestamp updateDate = targetDao.getLastUpdateDate(); 400 401 if (updateDate != null) { 402 403 // just inscrements of 1 milisecond to not having same 404 updateDate = new Timestamp(DateUtils.setMilliseconds(updateDate, 0).getTime()); 405 updateDate = new Timestamp(DateUtils.addSeconds(updateDate, 1).getTime()); 406 } 407 408 long countToUpdate = sourceDao.countDataToUpdate(updateDate); 409 410 if (log.isInfoEnabled()) { 411 log.info(String.format("%s nb rows to update: %s", tablePrefix, countToUpdate)); 412 } 413 414 result.setUpdateDate(tableName, updateDate); 415 result.addRows(tableName, (int) countToUpdate); 416 } finally { 417 IOUtils.closeQuietly(targetDao); 418 IOUtils.closeQuietly(sourceDao); 419 } 420 } 421 422 protected void synchronizeRootTable( 423 SynchroTableMetadata table, 424 SynchroContext context, 425 Connection sourceConnection, 426 Connection targetConnection, 427 SynchroResult result, 428 SynchroPendingOperationBuffer tableBuffer) throws SQLException { 429 430 String tableName = table.getName(); 431 432 result.getProgressionModel().setMessage(t("adagio.persistence.synchronizeData.synchronizeTable", tableName)); 433 434 SynchroTableDao sourceDao = new SynchroTableDaoImpl(sourceConnection, table, false); 435 SynchroTableDao targetDao = new SynchroTableDaoImpl(targetConnection, table, true); 436 437 // get last updateDate used by target db 438 Date updateDate = result.getUpdateDate(tableName); 439 440 ResultSet dataToUpdate = null; 441 try { 442 // get data to update from source db 443 dataToUpdate = sourceDao.getDataToUpdate(updateDate); 444 445 updateTableUsingRemoteId( 446 targetDao, 447 dataToUpdate, 448 result, 449 tableBuffer); 450 } finally { 451 DaoUtils.closeSilently(dataToUpdate); 452 453 IOUtils.closeQuietly(targetDao); 454 IOUtils.closeQuietly(sourceDao); 455 } 456 } 457 458 protected void synchronizeChildTable( 459 SynchroTableMetadata table, 460 String joinColumnName, 461 Set<Integer> joinColumnIds, 462 Connection sourceConnection, 463 Connection targetConnection, 464 SynchroResult result, 465 SynchroPendingOperationBuffer tableBuffer) throws SQLException { 466 467 String tableName = table.getName(); 468 469 result.getProgressionModel().setMessage(t("adagio.persistence.synchronizeData.synchronizeTable", tableName)); 470 471 SynchroTableDao sourceDao = new SynchroTableDaoImpl(sourceConnection, table, false); 472 SynchroTableDao targetDao = new SynchroTableDaoImpl(targetConnection, table, true); 473 474 try { 475 ResultSet dataToUpdate = sourceDao.getDataByFk(joinColumnName, joinColumnIds); 476 477 try { 478 // Table with a REMOTE_ID (and ID) 479 if (table.isWithRemoteIdColumn()) { 480 481 // small table update strategy 482 updateTableUsingRemoteId( 483 targetDao, 484 dataToUpdate, 485 result, 486 tableBuffer); 487 } 488 489 // Association tables, ... 490 else { 491 updateTableNoRemoteId( 492 targetDao, 493 dataToUpdate, 494 result, 495 tableBuffer); 496 } 497 498 } finally { 499 DaoUtils.closeSilently(dataToUpdate); 500 } 501 502 } finally { 503 IOUtils.closeQuietly(targetDao); 504 IOUtils.closeQuietly(sourceDao); 505 } 506 } 507 508 /** 509 * To update the content of the given {@code table} on the target db, 510 * from the given {@code incomingData} of the source db. 511 * <p/> 512 * The algorithm use remote_id : for each row of the {@code incomingData}, if exists on target table, then do an 513 * update, otherwise do a insert. 514 * <p/> 515 * 516 * @param synchroContext 517 * Synchronization context 518 * @param targetDao 519 * connection on the target db 520 * @param incomingData 521 * data to update from the source db 522 * @param result 523 * where to store operation results 524 * @throws SQLException 525 * if any sql errors 526 */ 527 protected void updateTableUsingRemoteId( 528 SynchroTableDao targetDao, 529 ResultSet incomingData, 530 SynchroResult result, 531 SynchroPendingOperationBuffer tableBuffer) throws SQLException { 532 533 SynchroTableMetadata table = targetDao.getTable(); 534 Preconditions.checkArgument(table.isWithRemoteIdColumn()); 535 boolean enableGeneratedIdFirst = table.getInsertStrategy() == TableInsertStrategy.GENERATE_ID_FIRST; 536 537 String tableName = table.getName(); 538 String tablePrefix = table.getTableLogPrefix() + " - " + result.getNbRows(tableName); 539 540 // get existing ids in the target db 541 Map<Integer, Integer> existingRemoteIdsMap = targetDao.getExistingRemoteIdsMap(); 542 if (log.isDebugEnabled()) { 543 log.debug(tablePrefix + " existing rows: " + existingRemoteIdsMap.size()); 544 } 545 546 result.addTableName(tableName); 547 548 int countR = 0; 549 550 // boolean hasChildTables = table.hasChildJoins(); 551 // Map<Integer, Integer> updatedRemoteIds = null; 552 // if (hasChildTables) { 553 // updatedRemoteIds = Maps.newHashMap(); 554 // } 555 556 while (incomingData.next()) { 557 558 Integer remoteId = table.getId(incomingData); 559 Integer localId = existingRemoteIdsMap.get(remoteId); 560 boolean doUpdate = localId != null; 561 562 if (doUpdate) { 563 List<Object> pk = Lists.<Object> newArrayList(localId); 564 targetDao.executeUpdate(pk, incomingData); 565 566 } else { 567 if (enableGeneratedIdFirst) { 568 localId = targetDao.executeInsertAndReturnId(incomingData); 569 // updatedRemoteIds.put(remoteId, localId); 570 } 571 else { 572 targetDao.executeInsert(incomingData); 573 } 574 } 575 576 countR++; 577 578 reportProgress(result, targetDao, countR, tablePrefix); 579 } 580 581 targetDao.flush(); 582 583 int insertCount = targetDao.getInsertCount(); 584 int updateCount = targetDao.getUpdateCount(); 585 586 result.addInserts(tableName, insertCount); 587 result.addUpdates(tableName, updateCount); 588 589 if (log.isInfoEnabled()) { 590 log.info(String.format("%s done: %s (inserts: %s, updates: %s)", tablePrefix, insertCount + updateCount, insertCount, updateCount)); 591 } 592 593 if (log.isDebugEnabled()) { 594 log.debug(String.format("%s INSERT count: %s", tablePrefix, insertCount)); 595 log.debug(String.format("%s UPDATE count: %s", tablePrefix, updateCount)); 596 } 597 598 result.getProgressionModel().increments(countR % 1000); 599 } 600 601 /** 602 * To update the content of the given {@code table} on the target db, 603 * from the given {@code incomingData} of the source db. 604 * The algorithm use a standard update strategy, using primary key. 605 * 606 * @param synchroContext 607 * @param targetDao 608 * @param incomingData 609 * @param result 610 * @throws SQLException 611 */ 612 protected void updateTableNoRemoteId( 613 SynchroTableDao targetDao, 614 ResultSet incomingData, 615 SynchroResult result, 616 SynchroPendingOperationBuffer tableBuffer) throws SQLException { 617 SynchroTableMetadata table = targetDao.getTable(); 618 Preconditions.checkArgument(!table.isWithRemoteIdColumn()); 619 620 String tableName = table.getName(); 621 String tablePrefix = table.getTableLogPrefix() + " - " + result.getNbRows(tableName); 622 623 // get existing ids in the target db 624 Set<String> existingIds = targetDao.getExistingPrimaryKeys(); 625 if (log.isDebugEnabled()) { 626 log.debug(tablePrefix + " existing rows: " + existingIds.size()); 627 } 628 629 result.addTableName(tableName); 630 631 int countR = 0; 632 633 boolean hasChildTables = table.hasChildJoins(); 634 List<List<Object>> updatedPks = null; 635 if (hasChildTables) { 636 updatedPks = Lists.newArrayList(); 637 } 638 639 while (incomingData.next()) { 640 List<Object> pk = targetDao.getPk(incomingData); 641 String pkStr = table.toPkStr(pk); 642 643 boolean doUpdate = existingIds.contains(pkStr); 644 645 if (doUpdate) { 646 647 targetDao.executeUpdate(pk, incomingData); 648 649 } else { 650 651 targetDao.executeInsert(incomingData); 652 } 653 654 if (hasChildTables) { 655 updatedPks.add(pk); 656 } 657 658 countR++; 659 660 reportProgress(result, targetDao, countR, tablePrefix); 661 } 662 663 targetDao.flush(); 664 665 // Put in context (to be used by child join tables) 666 if (hasChildTables && !updatedPks.isEmpty()) { 667 tableBuffer.addPks(updatedPks); 668 } 669 670 int insertCount = targetDao.getInsertCount(); 671 int updateCount = targetDao.getUpdateCount(); 672 673 result.addInserts(tableName, insertCount); 674 result.addUpdates(tableName, updateCount); 675 676 if (log.isInfoEnabled()) { 677 log.info(String.format("%s done: %s (inserts: %s, updates: %s)", tablePrefix, insertCount + updateCount, insertCount, updateCount)); 678 } 679 680 if (log.isDebugEnabled()) { 681 log.debug(String.format("%s INSERT count: %s", tablePrefix, insertCount)); 682 log.debug(String.format("%s UPDATE count: %s", tablePrefix, updateCount)); 683 } 684 685 result.getProgressionModel().increments(countR % 1000); 686 } 687 688 Connection createConnection(Properties connectionProperties) throws SQLException { 689 return createConnection( 690 connectionProperties.getProperty(Environment.URL), 691 connectionProperties.getProperty(Environment.USER), 692 connectionProperties.getProperty(Environment.PASS)); 693 } 694 695 String getUrl(Properties connectionProperties) { 696 return connectionProperties.getProperty(Environment.URL); 697 } 698 699 Dialect getDialect(Properties connectionProperties) { 700 return Dialect.getDialect(connectionProperties); 701 } 702 703 Configuration getConfiguration(Properties connectionProperties) { 704 return new Configuration().setProperties(connectionProperties); 705 } 706 707 Connection createConnection(String jdbcUrl, 708 String user, 709 String password) throws SQLException { 710 Preconditions.checkArgument(StringUtils.isNotBlank(jdbcUrl)); 711 712 // If same URL as datasource, use the dataSource 713 if (jdbcUrl.equals(config.getJdbcURL()) && this.dataSource != null) { 714 return DataSourceUtils.getConnection(this.dataSource); 715 } 716 717 Connection connection = DriverManager.getConnection(jdbcUrl, 718 user, 719 password); 720 connection.setAutoCommit(false); 721 return connection; 722 } 723 724 void releaseConnection(Connection connection) { 725 DaoUtils.closeSilently(connection); 726 } 727 728 protected Properties getRemoteProperties(File dbDirectory) { 729 Properties sourceConnectionProperties = new Properties(); 730 SynchroConfiguration config = SynchroConfiguration.getInstance(); 731 732 String jdbcUrl = DaoUtils.getJdbcUrl(dbDirectory, 733 config.getDbName()); 734 735 DaoUtils.fillConnectionProperties(sourceConnectionProperties, 736 jdbcUrl, 737 config.getJdbcUsername(), 738 config.getJdbcPassword()); 739 return sourceConnectionProperties; 740 } 741 742 protected void synchronizeChildTables( 743 SynchroTableMetadata parentTable, 744 Set<Integer> parentRemoteIds, 745 SynchroContext context, 746 Connection sourceConnection, 747 Connection targetConnection, 748 SynchroResult result, 749 boolean enableLogCount) throws SQLException { 750 751 Preconditions.checkNotNull(parentTable); 752 Preconditions.checkNotNull(parentRemoteIds); 753 Preconditions.checkArgument(!parentRemoteIds.isEmpty()); 754 755 List<SynchroTableMetadata> updatedTables = Lists.newArrayList(); 756 List<SynchroPendingOperationBuffer> updatedTablesBuffers = Lists.newArrayList(); 757 758 for (SynchroJoinMetadata join : parentTable.getChildJoins()) { 759 long t0 = TimeLog.getTime(); 760 761 SynchroTableMetadata table = join.getTargetTable(); 762 String tableName = table.getName(); 763 if (log.isInfoEnabled()) { 764 log.info(String.format("Synchronize table: %s (as child of %s)", tableName, parentTable.getName())); 765 } 766 767 SynchroPendingOperationBuffer penginOperationBuffer = new SynchroPendingOperationBuffer(tableName); 768 769 // Retrieve the table to update, from the join 770 String joinColumnName = join.getTargetColumn().getName(); 771 772 synchronizeChildTable( 773 table, 774 joinColumnName, 775 parentRemoteIds, 776 sourceConnection, 777 targetConnection, 778 result, 779 penginOperationBuffer); 780 781 TIME.log(t0, "synchronize table " + tableName); 782 783 if (!penginOperationBuffer.isEmpty()) { 784 updatedTables.add(table); 785 updatedTablesBuffers.add(penginOperationBuffer); 786 } 787 } 788 789 // Recursive call, for each child of the processed child tables 790 for (int i = 0; i < updatedTables.size(); i++) { 791 SynchroTableMetadata table = updatedTables.get(i); 792 SynchroPendingOperationBuffer tableBuffer = updatedTablesBuffers.get(i); 793 794 Set<Integer> updatedRemoteIds = tableBuffer.getRemoteIdsMap().keySet(); 795 synchronizeChildTables(table, updatedRemoteIds, context, sourceConnection, targetConnection, result, false); 796 } 797 } 798}