001package fr.ifremer.adagio.synchro.dao; 002 003/* 004 * #%L 005 * SIH-Adagio :: Synchronization 006 * $Id:$ 007 * $HeadURL:$ 008 * %% 009 * Copyright (C) 2012 - 2014 Ifremer 010 * %% 011 * This program is free software: you can redistribute it and/or modify 012 * it under the terms of the GNU Affero General Public License as published by 013 * the Free Software Foundation, either version 3 of the License, or 014 * (at your option) any later version. 015 * 016 * This program is distributed in the hope that it will be useful, 017 * but WITHOUT ANY WARRANTY; without even the implied warranty of 018 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 019 * GNU General Public License for more details. 020 * 021 * You should have received a copy of the GNU Affero General Public License 022 * along with this program. If not, see <http://www.gnu.org/licenses/>. 023 * #L% 024 */ 025 026import java.io.IOException; 027import java.sql.Connection; 028import java.sql.PreparedStatement; 029import java.sql.ResultSet; 030import java.sql.SQLException; 031import java.sql.Statement; 032import java.sql.Timestamp; 033import java.sql.Types; 034import java.util.Date; 035import java.util.List; 036import java.util.Map; 037import java.util.Set; 038 039import org.apache.commons.collections.CollectionUtils; 040import org.apache.commons.logging.Log; 041import org.apache.commons.logging.LogFactory; 042 043import com.google.common.base.Preconditions; 044import com.google.common.collect.Lists; 045import com.google.common.collect.Maps; 046import com.google.common.collect.Sets; 047 048import fr.ifremer.adagio.synchro.SynchroTechnicalException; 049import fr.ifremer.adagio.synchro.config.SynchroConfiguration; 050import fr.ifremer.adagio.synchro.intercept.SynchroInterceptor; 051import fr.ifremer.adagio.synchro.intercept.SynchroInterceptorBase; 052import fr.ifremer.adagio.synchro.intercept.SynchroInterceptorUtils; 053import fr.ifremer.adagio.synchro.intercept.SynchroWriteBuffer; 054import fr.ifremer.adagio.synchro.meta.SynchroDatabaseMetadata; 055import fr.ifremer.adagio.synchro.meta.SynchroTableMetadata; 056import fr.ifremer.adagio.synchro.meta.SynchroTableMetadata.TableInsertStrategy; 057import fr.ifremer.adagio.synchro.service.SynchroContext; 058import fr.ifremer.adagio.synchro.service.SynchroPendingOperationBuffer; 059 060public class SynchroTableDaoImpl implements SynchroTableDao { 061 062 /** Logger. */ 063 private static final Log log = 064 LogFactory.getLog(SynchroTableDaoImpl.class); 065 066 public static final String TABLE_TEMP_QUERY_PARAMETER = "temp_query_parameter"; 067 068 protected final Connection connection; 069 070 protected final SynchroDatabaseMetadata dbMeta; 071 072 protected final SynchroTableMetadata table; 073 074 protected final PreparedStatement insertStatement; 075 076 protected final PreparedStatement insertQueryParameterStatement; 077 078 protected final PreparedStatement updateStatement; 079 080 protected final List<PreparedStatement> selectStatements; 081 082 protected final boolean insertStrategyGenerateIdFirst; 083 084 protected final int columnCount; 085 086 protected final int batchSize; 087 088 protected final String tableName; 089 090 protected boolean debug; 091 092 protected int insertCount = 0; 093 094 protected int updateCount = 0; 095 096 protected final int idColumnIndex; 097 098 protected final SynchroInterceptor readInterceptor; 099 100 protected final SynchroInterceptor writeInterceptor; 101 102 protected SynchroPendingOperationBuffer pendingOperationBuffer; 103 104 public SynchroTableDaoImpl( 105 Connection connection, 106 SynchroTableMetadata table, 107 boolean enableWriteStatements) throws SQLException { 108 this.connection = connection; 109 this.table = table; 110 this.dbMeta = table.getDatabaseMetadata(); 111 this.columnCount = table.getColumnsCount(); 112 this.selectStatements = Lists.newArrayList(); 113 this.tableName = table.getName(); 114 115 // Batch size 116 this.batchSize = SynchroConfiguration.getInstance().getImportJdbcBatchSize(); 117 Preconditions.checkArgument(this.batchSize > 0); 118 119 debug = log.isTraceEnabled(); 120 121 if (table.isWithIdColumn()) { 122 this.idColumnIndex = table.getColumnIndex(SynchroTableMetadata.COLUMN_ID) + 1; 123 } 124 else { 125 this.idColumnIndex = -1; 126 } 127 128 if (enableWriteStatements) { 129 // Prepare insert statement 130 String insertSql = table.getInsertQuery(); 131 Preconditions.checkArgument(insertSql.toUpperCase().startsWith("INSERT"), 132 String.format("%s Insert SQL query should be like 'INSERT ...' but was: %s", this.tableName, insertSql)); 133 this.insertStatement = connection.prepareStatement(insertSql); 134 this.insertStrategyGenerateIdFirst = table.getInsertStrategy() == TableInsertStrategy.GENERATE_ID_FIRST; 135 136 // Prepare update statement 137 String updateSql = table.getUpdateQuery(); 138 Preconditions.checkArgument(updateSql.toUpperCase().startsWith("UPDATE"), 139 String.format("%s Update SQL query should be like 'UPDATE ...' but was: %s", this.tableName, updateSql)); 140 this.updateStatement = connection.prepareStatement(updateSql); 141 142 // Init write interceptors 143 this.writeInterceptor = initWriteInterceptor(table.getInterceptors()); 144 } 145 else { 146 this.insertStatement = null; 147 this.updateStatement = null; 148 this.insertStrategyGenerateIdFirst = false; 149 this.writeInterceptor = null; 150 } 151 152 // Query temp parameter (for query parameter with many values) 153 String insertQueryParameterSql = createTempQueryParameterQuery(table); 154 if (insertQueryParameterSql != null) { 155 insertQueryParameterStatement = getConnection().prepareStatement(insertQueryParameterSql); 156 } 157 else { 158 insertQueryParameterStatement = null; 159 } 160 161 this.readInterceptor = initReadInterceptor(table.getInterceptors()); 162 } 163 164 @Override 165 public Connection getConnection() { 166 return connection; 167 } 168 169 @Override 170 public SynchroTableMetadata getTable() { 171 return table; 172 } 173 174 public void setPendingOperationBuffer(SynchroPendingOperationBuffer pendingChangesBuffer) { 175 this.pendingOperationBuffer = pendingChangesBuffer; 176 } 177 178 public SynchroPendingOperationBuffer getPendingOperationBuffer() { 179 return this.pendingOperationBuffer; 180 } 181 182 public int getInsertCount() { 183 return insertCount; 184 } 185 186 public int getUpdateCount() { 187 return updateCount; 188 } 189 190 public void flush() throws SQLException { 191 if (!debug) { 192 if (insertCount > 0 && insertCount % batchSize != 0) { 193 insertStatement.executeBatch(); 194 insertStatement.clearBatch(); 195 } 196 if (updateCount > 0 && updateCount % batchSize != 0) { 197 updateStatement.executeBatch(); 198 updateStatement.clearBatch(); 199 } 200 } 201 } 202 203 @Override 204 public void close() throws IOException { 205 DaoUtils.closeSilently(insertStatement); 206 DaoUtils.closeSilently(updateStatement); 207 DaoUtils.closeSilently(insertQueryParameterStatement); 208 closeSelectStatements(); 209 } 210 211 /** 212 * Gets the last updateDate for the given {@code table} using 213 * the given datasource. 214 * 215 * @return the last update date of the given table, or {@code null} if table does not use a updateDate columns or if 216 * there 217 * is no data in table. 218 */ 219 public Timestamp getLastUpdateDate() throws SQLException { 220 if (!table.isWithUpdateDateColumn()) { 221 return null; 222 } 223 224 String sql = table.getSelectMaxUpdateDateQuery(); 225 if (sql == null) { 226 return null; 227 } 228 229 PreparedStatement statement = getConnection().prepareStatement(sql); 230 try { 231 ResultSet resultSet = statement.executeQuery(); 232 if (!resultSet.next()) { 233 return null; 234 } 235 Timestamp result = resultSet.getTimestamp(1); 236 return result; 237 } finally { 238 DaoUtils.closeSilently(statement); 239 } 240 } 241 242 public long count() throws SQLException { 243 244 String sql = table.getCountQuery(); 245 246 PreparedStatement statement = getConnection().prepareStatement(sql); 247 248 try { 249 ResultSet resultSet = statement.executeQuery(); 250 resultSet.next(); 251 long result = resultSet.getLong(1); 252 return result; 253 } finally { 254 DaoUtils.closeSilently(statement); 255 } 256 } 257 258 public long countDataToUpdate(Date fromDate) throws SQLException { 259 260 String sql = table.getCountDataToUpdateQuery(fromDate); 261 262 PreparedStatement statement = getConnection().prepareStatement(sql); 263 log.debug(sql); 264 try { 265 if (table.isWithUpdateDateColumn() && 266 fromDate != null) { 267 Timestamp updateDateValue = new Timestamp(fromDate.getTime()); 268 // Set all query parameter (could have more than one, 269 // when query has been override by an interceptor) 270 for (int i = 1; i <= statement.getParameterMetaData().getParameterCount(); i++) { 271 statement.setTimestamp(i, updateDateValue); 272 } 273 } 274 275 ResultSet queryResult = statement.executeQuery(); 276 queryResult.next(); 277 long result = queryResult.getLong(1); 278 return result; 279 } finally { 280 DaoUtils.closeSilently(statement); 281 } 282 } 283 284 public ResultSet getDataToUpdate(Date fromDate) throws SQLException { 285 286 String sql = table.getSelectDataToUpdateQuery(fromDate); 287 288 PreparedStatement statement = getConnection().prepareStatement(sql); 289 selectStatements.add(statement); 290 291 if (table.isWithUpdateDateColumn() && 292 fromDate != null) { 293 statement.setTimestamp(1, new Timestamp(fromDate.getTime())); 294 if (statement.getParameterMetaData().getParameterCount() > 1) { 295 statement.setTimestamp(2, new Timestamp(fromDate.getTime())); 296 } 297 } 298 statement.setFetchSize(batchSize); 299 300 ResultSet result = statement.executeQuery(); 301 return result; 302 } 303 304 public void deleteAll() throws SQLException { 305 PreparedStatement deleteStatement = 306 getConnection().prepareStatement( 307 "DELETE FROM " + table.getName()); 308 try { 309 deleteStatement.execute(); 310 } finally { 311 DaoUtils.closeSilently(deleteStatement); 312 } 313 } 314 315 public Object[] findByPk(List<Object> pk) throws SQLException { 316 String selectDataSql = table.getSelectDataQueryFromPk(); 317 318 PreparedStatement selectStatement = getConnection().prepareStatement(selectDataSql); 319 int columnCountIndex = 1; 320 321 for (Object pkColumn : pk) { 322 selectStatement.setObject(columnCountIndex++, pkColumn); 323 } 324 325 int columnsCount = table.getColumnsCount(); 326 327 try { 328 ResultSet resultSet = selectStatement.executeQuery(); 329 resultSet.next(); 330 331 Object[] result = new Object[columnsCount]; 332 333 for (int i = 1; i <= columnsCount; i++) { 334 335 result[i - 1] = resultSet.getObject(i); 336 } 337 return result; 338 } finally { 339 DaoUtils.closeSilently(selectStatement); 340 } 341 } 342 343 public Set<String> getExistingPrimaryKeys() throws SQLException { 344 Set<String> result = Sets.newHashSet(); 345 346 // If simple SQL (select with only one output String column) 347 if (table.isSelectPrimaryKeysAsStringQueryEnable()) { 348 String sql = table.getSelectPrimaryKeysAsStringQuery(); 349 PreparedStatement statement = getConnection().prepareStatement(sql); 350 try { 351 ResultSet resultSet = statement.executeQuery(); 352 353 while (resultSet.next()) { 354 String pk = resultSet.getString(1); 355 result.add(pk); 356 } 357 } finally { 358 DaoUtils.closeSilently(statement); 359 } 360 } 361 362 // If more than one one column as String, in the select query 363 else { 364 String sql = table.getSelectPrimaryKeysQuery(); 365 PreparedStatement statement = getConnection().prepareStatement(sql); 366 367 try { 368 ResultSet resultSet = statement.executeQuery(); 369 370 int columnCount = table.getPkNames().size(); 371 List<Object> pks = Lists.newArrayListWithCapacity(columnCount); 372 while (resultSet.next()) { 373 for (int i = 1; i <= columnCount; i++) { 374 Object pk = resultSet.getObject(i); 375 pks.add(pk); 376 } 377 result.add(table.toPkStr(pks)); 378 pks.clear(); 379 } 380 } finally { 381 DaoUtils.closeSilently(statement); 382 } 383 } 384 385 return result; 386 } 387 388 public Integer generateNewId() throws SQLException { 389 String sql = table.getSequenceNextValString(); 390 Preconditions.checkNotNull(sql); 391 392 if (sql == null) { 393 return null; 394 } 395 396 PreparedStatement statement = getConnection().prepareStatement(sql); 397 try { 398 ResultSet resultSet = statement.executeQuery(); 399 if (!resultSet.next()) { 400 return null; 401 } 402 Integer result = resultSet.getInt(1); 403 return result; 404 } finally { 405 DaoUtils.closeSilently(statement); 406 } 407 } 408 409 public void executeInsert(ResultSet incomingData) throws SQLException { 410 Preconditions.checkArgument(!this.insertStrategyGenerateIdFirst); 411 412 List<Object> params = null; 413 if (debug) { 414 params = Lists.newArrayList(); 415 } 416 417 if (writeInterceptor != null) { 418 transformAndSetData(insertStatement, incomingData, pendingOperationBuffer, writeInterceptor, params); 419 } 420 else { 421 setData(insertStatement, incomingData, params); 422 } 423 insertCount++; 424 425 if (debug) { 426 List<Object> pk = table.getPk(incomingData); 427 log.debug(String.format("%s Execute insert query (pk:%s), params: %s", tableName, table.toPkStr(pk), params)); 428 int nbRowInsert = insertStatement.executeUpdate(); 429 if (nbRowInsert != 1) { 430 throw new SynchroTechnicalException(String.format("%s Could not insert a row into the table (pk:%s), params: %s", tableName, 431 table.toPkStr(pk), params)); 432 } 433 } 434 else { 435 insertStatement.addBatch(); 436 if (insertCount > 0 && insertCount % batchSize == 0) { 437 insertStatement.executeBatch(); 438 insertStatement.clearBatch(); 439 } 440 } 441 } 442 443 public void executeInsert(Object[] incomingData) throws SQLException { 444 Preconditions.checkArgument(!this.insertStrategyGenerateIdFirst); 445 446 List<Object> params = null; 447 if (debug) { 448 params = Lists.newArrayList(); 449 } 450 451 if (writeInterceptor != null) { 452 transformAndSetData(insertStatement, incomingData, pendingOperationBuffer, writeInterceptor, params); 453 } 454 else { 455 setData(insertStatement, incomingData, params); 456 } 457 insertCount++; 458 459 if (debug) { 460 List<Object> pk = table.getPk(incomingData); 461 log.debug(String.format("%s Execute insert query (pk:%s), params: %s", tableName, table.toPkStr(pk), params)); 462 insertStatement.executeUpdate(); 463 } 464 else { 465 insertStatement.addBatch(); 466 if (insertCount > 0 && insertCount % batchSize == 0) { 467 insertStatement.executeBatch(); 468 insertStatement.clearBatch(); 469 } 470 } 471 } 472 473 @Override 474 public Integer executeInsertAndReturnId(ResultSet incomingData) throws SQLException { 475 Preconditions.checkArgument(this.insertStrategyGenerateIdFirst); 476 477 // Generate Id 478 Integer id = generateNewId(); 479 List<Object> params = null; 480 481 if (debug) { 482 params = Lists.newArrayList(); 483 } 484 485 if (writeInterceptor != null) { 486 transformAndSetData(insertStatement, incomingData, pendingOperationBuffer, writeInterceptor, params); 487 } 488 else { 489 setData(insertStatement, incomingData, params); 490 } 491 492 insertStatement.setObject(columnCount + 1, id); 493 insertCount++; 494 495 if (debug) { 496 log.debug(String.format("%s Execute insert query (pk:%s), params: %s", tableName, id, params)); 497 insertStatement.executeUpdate(); 498 } 499 else { 500 insertStatement.addBatch(); 501 if (insertCount > 0 && insertCount % batchSize == 0) { 502 insertStatement.executeBatch(); 503 insertStatement.clearBatch(); 504 } 505 } 506 507 return id; 508 } 509 510 @Override 511 public void executeUpdate(List<Object> pk, ResultSet incomingData) throws SQLException { 512 513 List<Object> params = null; 514 if (debug) { 515 params = Lists.newArrayList(); 516 } 517 518 if (writeInterceptor != null) { 519 transformAndSetData(updateStatement, incomingData, pendingOperationBuffer, writeInterceptor, params); 520 } 521 else { 522 setData(updateStatement, incomingData, params); 523 } 524 525 int columnCountIndex = columnCount + 1; 526 for (Object pkColumn : pk) { 527 updateStatement.setObject(columnCountIndex++, pkColumn); 528 } 529 530 updateCount++; 531 532 if (debug) { 533 log.debug(String.format("%s Execute update query (pk:%s), params: %s", tableName, pk, params)); 534 int nbRowUpdated = updateStatement.executeUpdate(); 535 Preconditions.checkArgument(nbRowUpdated == 1, String.format("%s rows has been updated, but expected 1 row.", nbRowUpdated)); 536 } 537 else { 538 updateStatement.addBatch(); 539 if (updateCount > 0 && updateCount % batchSize == 0) { 540 updateStatement.executeBatch(); 541 updateStatement.clearBatch(); 542 } 543 } 544 } 545 546 @Override 547 public void executeUpdate(List<Object> pk, Object[] row) throws SQLException { 548 List<Object> params = null; 549 if (debug) { 550 params = Lists.newArrayList(); 551 } 552 553 if (writeInterceptor != null) { 554 transformAndSetData(updateStatement, row, pendingOperationBuffer, writeInterceptor, params); 555 } 556 else { 557 setData(updateStatement, row, params); 558 } 559 560 int columnCountIndex = columnCount + 1; 561 562 for (Object pkColumn : pk) { 563 updateStatement.setObject(columnCountIndex++, pkColumn); 564 } 565 566 updateCount++; 567 568 if (debug) { 569 log.debug(String.format("%s Execute update query (pk:%s), params: %s", tableName, pk, params)); 570 int nbRowUpdated = updateStatement.executeUpdate(); 571 Preconditions.checkArgument(nbRowUpdated == 1, String.format("%s rows has been updated, but expected 1 row.", nbRowUpdated)); 572 } 573 else { 574 updateStatement.addBatch(); 575 if (updateCount > 0 && updateCount % batchSize == 0) { 576 updateStatement.executeBatch(); 577 updateStatement.clearBatch(); 578 } 579 } 580 } 581 582 @Override 583 public ResultSet getDataByFk(String fkColumnName, Set<Integer> fkValues) throws SQLException { 584 585 boolean insertUsingTempQueryParameter = isTempQueryParameterEnable() && fkValues.size() > dbMeta.getInExpressionCountLimit(); 586 587 if (insertUsingTempQueryParameter) { 588 return getDataByFkUsingTempParameterTable(fkColumnName, fkValues); 589 } 590 591 return getDataByFkWithInOperator(fkColumnName, fkValues); 592 } 593 594 @Override 595 public Integer getIdFromRemoteId(String tableName, Integer remoteId) throws SQLException { 596 String sql = getTable().getSelectIdFromRemoteIdQuery(tableName); 597 if (sql == null) { 598 return null; 599 } 600 601 PreparedStatement statement = getConnection().prepareStatement(sql); 602 statement.setInt(1, remoteId); 603 try { 604 ResultSet resultSet = statement.executeQuery(); 605 if (!resultSet.next()) { 606 return null; 607 } 608 Integer result = resultSet.getInt(1); 609 610 return result; 611 } finally { 612 DaoUtils.closeSilently(statement); 613 } 614 } 615 616 @Override 617 public Map<Integer, Integer> getExistingRemoteIdsMap() throws SQLException { 618 619 String sql = getTable().getSelectRemoteIdsQuery(); 620 621 PreparedStatement statement = getConnection().prepareStatement(sql); 622 623 Map<Integer, Integer> result = Maps.newHashMap(); 624 625 try { 626 ResultSet resultSet = statement.executeQuery(); 627 while (resultSet.next()) { 628 int id = resultSet.getInt(1); 629 int remoteId = resultSet.getInt(2); 630 result.put(remoteId, id); 631 } 632 statement.close(); 633 return result; 634 } finally { 635 DaoUtils.closeSilently(statement); 636 } 637 } 638 639 public List<Object> getPk(ResultSet incomingData, SynchroWriteBuffer transformResult) throws SQLException { 640 List<Object> result; 641 642 if (readInterceptor != null) { 643 Object[] data = transformData(incomingData, readInterceptor, transformResult); 644 result = getTable().getPk(data); 645 } 646 else { 647 result = getTable().getPk(incomingData); 648 } 649 650 return result; 651 } 652 653 public boolean isTempQueryParameterEnable() { 654 return insertQueryParameterStatement != null; 655 } 656 657 public void clearCounts() { 658 insertCount = 0; 659 updateCount = 0; 660 } 661 662 /* -- Abstract method -- */ 663 664 /* -- Protected methods -- */ 665 666 protected void insertValuesIntoTempQueryParameter(Set<Integer> parameterValues, String queryParameterName, int queryPersonId) throws SQLException { 667 Preconditions.checkNotNull(insertQueryParameterStatement); 668 if (debug) { 669 log.debug(String.format("%s Setting query parameters into %s", tableName, TABLE_TEMP_QUERY_PARAMETER.toUpperCase())); 670 } 671 672 Statement stm = getConnection().createStatement(); 673 try { 674 stm.executeUpdate(String.format("DELETE FROM %", TABLE_TEMP_QUERY_PARAMETER)); 675 } finally { 676 DaoUtils.closeSilently(stm); 677 } 678 679 int i = 1; 680 for (Integer parameterValue : parameterValues) { 681 insertQueryParameterStatement.setInt(1, i++); 682 insertQueryParameterStatement.setString(2, queryParameterName); 683 insertQueryParameterStatement.setInt(3, parameterValue); 684 insertQueryParameterStatement.setNull(4, Types.VARCHAR); 685 insertQueryParameterStatement.setInt(5, queryPersonId); 686 insertQueryParameterStatement.addBatch(); 687 } 688 689 insertQueryParameterStatement.executeBatch(); 690 insertQueryParameterStatement.clearBatch(); 691 } 692 693 protected String createTempQueryParameterQuery(SynchroTableMetadata table) { 694 SynchroDatabaseMetadata dbMeta = table.getDatabaseMetadata(); 695 if (dbMeta.getTable(TABLE_TEMP_QUERY_PARAMETER) == null) { 696 return null; 697 } 698 699 return String.format("INSERT INTO %s (ID, PARAMETER_NAME, NUMERICAL_VALUE, ALPHANUMERICAL_VALUE, PERSON_FK)" 700 + " VALUES (?, ?, ?, ?, ?)", 701 TABLE_TEMP_QUERY_PARAMETER); 702 703 } 704 705 protected void closeSelectStatements() { 706 if (CollectionUtils.isNotEmpty(selectStatements)) { 707 for (PreparedStatement statement : selectStatements) { 708 DaoUtils.closeSilently(statement); 709 } 710 } 711 selectStatements.clear(); 712 } 713 714 protected void closePreviousSelectStatements(PreparedStatement newStatement) { 715 closeSelectStatements(); 716 selectStatements.add(newStatement); 717 } 718 719 protected void setData(PreparedStatement statement, Object[] values, List<Object> debugParams) throws SQLException { 720 for (int c = 1; c <= columnCount; c++) { 721 Object object = values[c - 1]; 722 statement.setObject(c, object); 723 if (debug) { 724 debugParams.add(object); 725 } 726 } 727 } 728 729 protected void setData(PreparedStatement statement, ResultSet incomingData, List<Object> debugParams) throws SQLException { 730 for (int c = 1; c <= columnCount; c++) { 731 Object object = incomingData.getObject(c); 732 statement.setObject(c, object); 733 if (debug) { 734 debugParams.add(object); 735 } 736 } 737 } 738 739 protected void transformAndSetData(PreparedStatement statement, ResultSet incomingData, 740 SynchroWriteBuffer transformBuffer, SynchroInterceptor interceptor, List<Object> debugParams) 741 throws SQLException { 742 Preconditions.checkNotNull(transformBuffer); 743 744 // Transform data 745 Object[] row = transformData(incomingData, interceptor, transformBuffer); 746 747 // Set the data into the statement 748 setData(statement, row, debugParams); 749 } 750 751 protected void transformAndSetData(PreparedStatement statement, Object[] incomingData, 752 SynchroWriteBuffer transformBuffer, SynchroInterceptor interceptor, List<Object> debugParams) 753 throws SQLException { 754 Preconditions.checkNotNull(transformBuffer); 755 756 // Transform data 757 Object[] row = transformData(incomingData, interceptor, transformBuffer); 758 759 // Set the data into the statement 760 setData(statement, row, debugParams); 761 } 762 763 protected Object[] transformData(ResultSet incomingData, SynchroInterceptor interceptor, SynchroWriteBuffer buffer) 764 throws SQLException { 765 Preconditions.checkNotNull(interceptor); 766 767 // Get data 768 Object[] result = table.getData(incomingData); 769 770 // Transform values 771 try { 772 interceptor.onWrite(result, this, buffer); 773 } catch (Exception e) { 774 throw new SynchroTechnicalException(String.format("Error while transform row, for table %s", table.getName()), e); 775 } 776 777 return result; 778 } 779 780 protected Object[] transformData(Object[] incomingData, SynchroInterceptor interceptor, SynchroWriteBuffer buffer) 781 throws SQLException { 782 Preconditions.checkNotNull(interceptor); 783 784 // Transform values 785 try { 786 interceptor.onWrite(incomingData, this, buffer); 787 } catch (Exception e) { 788 throw new SynchroTechnicalException(String.format("Error while transform row, for table %s", table.getName()), e); 789 } 790 791 return incomingData; 792 } 793 794 protected ResultSet getDataByFkWithInOperator(String fkColumnName, Set<Integer> fkValues) throws SQLException { 795 String sql = table.getSelectDataFromFkQuery(fkColumnName, fkValues.size()); 796 797 Preconditions.checkNotNull(sql, String.format("Columns %s is not referenced for table %s", fkColumnName, table.getName())); 798 799 PreparedStatement statement = getConnection().prepareStatement(sql); 800 closePreviousSelectStatements(statement); 801 802 int paramIndex = 1; 803 for (Object value : fkValues) { 804 statement.setObject(paramIndex, value); 805 paramIndex++; 806 } 807 808 statement.setFetchSize(batchSize); 809 810 ResultSet result = statement.executeQuery(); 811 return result; 812 } 813 814 protected ResultSet getDataByFkUsingTempParameterTable(String fkColumnName, Set<Integer> fkValues) throws SQLException { 815 816 String sql = table.getSelectDataFromFkQueryBigParams(fkColumnName); 817 Preconditions.checkNotNull(sql); 818 819 PreparedStatement statement = getConnection().prepareStatement(sql); 820 closePreviousSelectStatements(statement); 821 822 String queryParameterName = "remoteIds"; 823 int personFk = -1; 824 insertValuesIntoTempQueryParameter(fkValues, queryParameterName, personFk); 825 statement.setString(1, queryParameterName); 826 statement.setInt(2, personFk); 827 828 statement.setFetchSize(batchSize); 829 830 ResultSet result = statement.executeQuery(); 831 return result; 832 } 833 834 @Override 835 public List<Object> getPk(ResultSet incomingData) throws SQLException { 836 return table.getPk(incomingData); 837 } 838 839 private SynchroInterceptor initReadInterceptor(List<SynchroInterceptor> interceptors) { 840 if (CollectionUtils.isEmpty(interceptors)) { 841 return null; 842 } 843 SynchroContext context = table.getDatabaseMetadata().getContext(); 844 List<SynchroInterceptor> readInterceptors = Lists.newArrayList(); 845 846 try { 847 for (SynchroInterceptor interceptor : interceptors) { 848 if (interceptor.enableOnRead()) { 849 SynchroInterceptor newInterceptor; 850 newInterceptor = interceptor.getClass().newInstance(); 851 852 newInterceptor.setContext(context); 853 readInterceptors.add(interceptor); 854 } 855 } 856 } catch (Exception e) { 857 throw new SynchroTechnicalException("Could not initialize DAO read interceptors.", e); 858 } 859 860 if (CollectionUtils.isEmpty(readInterceptors)) { 861 return null; 862 } 863 864 return SynchroInterceptorUtils.chain(readInterceptors, SynchroInterceptorBase.class); 865 } 866 867 private SynchroInterceptor initWriteInterceptor(List<SynchroInterceptor> interceptors) { 868 if (CollectionUtils.isEmpty(interceptors)) { 869 return null; 870 } 871 SynchroContext context = table.getDatabaseMetadata().getContext(); 872 List<SynchroInterceptor> writeInterceptors = Lists.newArrayList(); 873 874 try { 875 for (SynchroInterceptor interceptor : interceptors) { 876 if (interceptor.enableOnWrite()) { 877 SynchroInterceptor newInterceptor; 878 newInterceptor = interceptor.getClass().newInstance(); 879 880 newInterceptor.setContext(context); 881 writeInterceptors.add(interceptor); 882 } 883 } 884 } catch (Exception e) { 885 throw new SynchroTechnicalException("Could not initialize DAO read interceptors.", e); 886 } 887 888 if (CollectionUtils.isEmpty(writeInterceptors)) { 889 return null; 890 } 891 892 return SynchroInterceptorUtils.chain(writeInterceptors, SynchroInterceptorBase.class); 893 } 894 895}