1 package fr.ifremer.adagio.synchro.dao;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 import java.io.IOException;
27 import java.sql.Connection;
28 import java.sql.PreparedStatement;
29 import java.sql.ResultSet;
30 import java.sql.SQLException;
31 import java.sql.Statement;
32 import java.sql.Timestamp;
33 import java.sql.Types;
34 import java.util.Date;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.Set;
38
39 import org.apache.commons.collections.CollectionUtils;
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42
43 import com.google.common.base.Preconditions;
44 import com.google.common.collect.Lists;
45 import com.google.common.collect.Maps;
46 import com.google.common.collect.Sets;
47
48 import fr.ifremer.adagio.synchro.SynchroTechnicalException;
49 import fr.ifremer.adagio.synchro.config.SynchroConfiguration;
50 import fr.ifremer.adagio.synchro.intercept.SynchroInterceptor;
51 import fr.ifremer.adagio.synchro.intercept.SynchroInterceptorBase;
52 import fr.ifremer.adagio.synchro.intercept.SynchroInterceptorUtils;
53 import fr.ifremer.adagio.synchro.intercept.SynchroWriteBuffer;
54 import fr.ifremer.adagio.synchro.meta.SynchroDatabaseMetadata;
55 import fr.ifremer.adagio.synchro.meta.SynchroTableMetadata;
56 import fr.ifremer.adagio.synchro.meta.SynchroTableMetadata.TableInsertStrategy;
57 import fr.ifremer.adagio.synchro.service.SynchroContext;
58 import fr.ifremer.adagio.synchro.service.SynchroPendingOperationBuffer;
59
60 public class SynchroTableDaoImpl implements SynchroTableDao {
61
62
63 private static final Log log =
64 LogFactory.getLog(SynchroTableDaoImpl.class);
65
66 public static final String TABLE_TEMP_QUERY_PARAMETER = "temp_query_parameter";
67
68 protected final Connection connection;
69
70 protected final SynchroDatabaseMetadata dbMeta;
71
72 protected final SynchroTableMetadata table;
73
74 protected final PreparedStatement insertStatement;
75
76 protected final PreparedStatement insertQueryParameterStatement;
77
78 protected final PreparedStatement updateStatement;
79
80 protected final List<PreparedStatement> selectStatements;
81
82 protected final boolean insertStrategyGenerateIdFirst;
83
84 protected final int columnCount;
85
86 protected final int batchSize;
87
88 protected final String tableName;
89
90 protected boolean debug;
91
92 protected int insertCount = 0;
93
94 protected int updateCount = 0;
95
96 protected final int idColumnIndex;
97
98 protected final SynchroInterceptor readInterceptor;
99
100 protected final SynchroInterceptor writeInterceptor;
101
102 protected SynchroPendingOperationBuffer pendingOperationBuffer;
103
104 public SynchroTableDaoImpl(
105 Connection connection,
106 SynchroTableMetadata table,
107 boolean enableWriteStatements) throws SQLException {
108 this.connection = connection;
109 this.table = table;
110 this.dbMeta = table.getDatabaseMetadata();
111 this.columnCount = table.getColumnsCount();
112 this.selectStatements = Lists.newArrayList();
113 this.tableName = table.getName();
114
115
116 this.batchSize = SynchroConfiguration.getInstance().getImportJdbcBatchSize();
117 Preconditions.checkArgument(this.batchSize > 0);
118
119 debug = log.isTraceEnabled();
120
121 if (table.isWithIdColumn()) {
122 this.idColumnIndex = table.getColumnIndex(SynchroTableMetadata.COLUMN_ID) + 1;
123 }
124 else {
125 this.idColumnIndex = -1;
126 }
127
128 if (enableWriteStatements) {
129
130 String insertSql = table.getInsertQuery();
131 Preconditions.checkArgument(insertSql.toUpperCase().startsWith("INSERT"),
132 String.format("%s Insert SQL query should be like 'INSERT ...' but was: %s", this.tableName, insertSql));
133 this.insertStatement = connection.prepareStatement(insertSql);
134 this.insertStrategyGenerateIdFirst = table.getInsertStrategy() == TableInsertStrategy.GENERATE_ID_FIRST;
135
136
137 String updateSql = table.getUpdateQuery();
138 Preconditions.checkArgument(updateSql.toUpperCase().startsWith("UPDATE"),
139 String.format("%s Update SQL query should be like 'UPDATE ...' but was: %s", this.tableName, updateSql));
140 this.updateStatement = connection.prepareStatement(updateSql);
141
142
143 this.writeInterceptor = initWriteInterceptor(table.getInterceptors());
144 }
145 else {
146 this.insertStatement = null;
147 this.updateStatement = null;
148 this.insertStrategyGenerateIdFirst = false;
149 this.writeInterceptor = null;
150 }
151
152
153 String insertQueryParameterSql = createTempQueryParameterQuery(table);
154 if (insertQueryParameterSql != null) {
155 insertQueryParameterStatement = getConnection().prepareStatement(insertQueryParameterSql);
156 }
157 else {
158 insertQueryParameterStatement = null;
159 }
160
161 this.readInterceptor = initReadInterceptor(table.getInterceptors());
162 }
163
164 @Override
165 public Connection getConnection() {
166 return connection;
167 }
168
169 @Override
170 public SynchroTableMetadata getTable() {
171 return table;
172 }
173
174 public void setPendingOperationBuffer(SynchroPendingOperationBuffer pendingChangesBuffer) {
175 this.pendingOperationBuffer = pendingChangesBuffer;
176 }
177
178 public SynchroPendingOperationBuffer getPendingOperationBuffer() {
179 return this.pendingOperationBuffer;
180 }
181
182 public int getInsertCount() {
183 return insertCount;
184 }
185
186 public int getUpdateCount() {
187 return updateCount;
188 }
189
190 public void flush() throws SQLException {
191 if (!debug) {
192 if (insertCount > 0 && insertCount % batchSize != 0) {
193 insertStatement.executeBatch();
194 insertStatement.clearBatch();
195 }
196 if (updateCount > 0 && updateCount % batchSize != 0) {
197 updateStatement.executeBatch();
198 updateStatement.clearBatch();
199 }
200 }
201 }
202
203 @Override
204 public void close() throws IOException {
205 DaoUtils.closeSilently(insertStatement);
206 DaoUtils.closeSilently(updateStatement);
207 DaoUtils.closeSilently(insertQueryParameterStatement);
208 closeSelectStatements();
209 }
210
211
212
213
214
215
216
217
218
219 public Timestamp getLastUpdateDate() throws SQLException {
220 if (!table.isWithUpdateDateColumn()) {
221 return null;
222 }
223
224 String sql = table.getSelectMaxUpdateDateQuery();
225 if (sql == null) {
226 return null;
227 }
228
229 PreparedStatement statement = getConnection().prepareStatement(sql);
230 try {
231 ResultSet resultSet = statement.executeQuery();
232 if (!resultSet.next()) {
233 return null;
234 }
235 Timestamp result = resultSet.getTimestamp(1);
236 return result;
237 } finally {
238 DaoUtils.closeSilently(statement);
239 }
240 }
241
242 public long count() throws SQLException {
243
244 String sql = table.getCountQuery();
245
246 PreparedStatement statement = getConnection().prepareStatement(sql);
247
248 try {
249 ResultSet resultSet = statement.executeQuery();
250 resultSet.next();
251 long result = resultSet.getLong(1);
252 return result;
253 } finally {
254 DaoUtils.closeSilently(statement);
255 }
256 }
257
258 public long countDataToUpdate(Date fromDate) throws SQLException {
259
260 String sql = table.getCountDataToUpdateQuery(fromDate);
261
262 PreparedStatement statement = getConnection().prepareStatement(sql);
263 log.debug(sql);
264 try {
265 if (table.isWithUpdateDateColumn() &&
266 fromDate != null) {
267 Timestamp updateDateValue = new Timestamp(fromDate.getTime());
268
269
270 for (int i = 1; i <= statement.getParameterMetaData().getParameterCount(); i++) {
271 statement.setTimestamp(i, updateDateValue);
272 }
273 }
274
275 ResultSet queryResult = statement.executeQuery();
276 queryResult.next();
277 long result = queryResult.getLong(1);
278 return result;
279 } finally {
280 DaoUtils.closeSilently(statement);
281 }
282 }
283
284 public ResultSet getDataToUpdate(Date fromDate) throws SQLException {
285
286 String sql = table.getSelectDataToUpdateQuery(fromDate);
287
288 PreparedStatement statement = getConnection().prepareStatement(sql);
289 selectStatements.add(statement);
290
291 if (table.isWithUpdateDateColumn() &&
292 fromDate != null) {
293 statement.setTimestamp(1, new Timestamp(fromDate.getTime()));
294 if (statement.getParameterMetaData().getParameterCount() > 1) {
295 statement.setTimestamp(2, new Timestamp(fromDate.getTime()));
296 }
297 }
298 statement.setFetchSize(batchSize);
299
300 ResultSet result = statement.executeQuery();
301 return result;
302 }
303
304 public void deleteAll() throws SQLException {
305 PreparedStatement deleteStatement =
306 getConnection().prepareStatement(
307 "DELETE FROM " + table.getName());
308 try {
309 deleteStatement.execute();
310 } finally {
311 DaoUtils.closeSilently(deleteStatement);
312 }
313 }
314
315 public Object[] findByPk(List<Object> pk) throws SQLException {
316 String selectDataSql = table.getSelectDataQueryFromPk();
317
318 PreparedStatement selectStatement = getConnection().prepareStatement(selectDataSql);
319 int columnCountIndex = 1;
320
321 for (Object pkColumn : pk) {
322 selectStatement.setObject(columnCountIndex++, pkColumn);
323 }
324
325 int columnsCount = table.getColumnsCount();
326
327 try {
328 ResultSet resultSet = selectStatement.executeQuery();
329 resultSet.next();
330
331 Object[] result = new Object[columnsCount];
332
333 for (int i = 1; i <= columnsCount; i++) {
334
335 result[i - 1] = resultSet.getObject(i);
336 }
337 return result;
338 } finally {
339 DaoUtils.closeSilently(selectStatement);
340 }
341 }
342
343 public Set<String> getExistingPrimaryKeys() throws SQLException {
344 Set<String> result = Sets.newHashSet();
345
346
347 if (table.isSelectPrimaryKeysAsStringQueryEnable()) {
348 String sql = table.getSelectPrimaryKeysAsStringQuery();
349 PreparedStatement statement = getConnection().prepareStatement(sql);
350 try {
351 ResultSet resultSet = statement.executeQuery();
352
353 while (resultSet.next()) {
354 String pk = resultSet.getString(1);
355 result.add(pk);
356 }
357 } finally {
358 DaoUtils.closeSilently(statement);
359 }
360 }
361
362
363 else {
364 String sql = table.getSelectPrimaryKeysQuery();
365 PreparedStatement statement = getConnection().prepareStatement(sql);
366
367 try {
368 ResultSet resultSet = statement.executeQuery();
369
370 int columnCount = table.getPkNames().size();
371 List<Object> pks = Lists.newArrayListWithCapacity(columnCount);
372 while (resultSet.next()) {
373 for (int i = 1; i <= columnCount; i++) {
374 Object pk = resultSet.getObject(i);
375 pks.add(pk);
376 }
377 result.add(table.toPkStr(pks));
378 pks.clear();
379 }
380 } finally {
381 DaoUtils.closeSilently(statement);
382 }
383 }
384
385 return result;
386 }
387
388 public Integer generateNewId() throws SQLException {
389 String sql = table.getSequenceNextValString();
390 Preconditions.checkNotNull(sql);
391
392 if (sql == null) {
393 return null;
394 }
395
396 PreparedStatement statement = getConnection().prepareStatement(sql);
397 try {
398 ResultSet resultSet = statement.executeQuery();
399 if (!resultSet.next()) {
400 return null;
401 }
402 Integer result = resultSet.getInt(1);
403 return result;
404 } finally {
405 DaoUtils.closeSilently(statement);
406 }
407 }
408
409 public void executeInsert(ResultSet incomingData) throws SQLException {
410 Preconditions.checkArgument(!this.insertStrategyGenerateIdFirst);
411
412 List<Object> params = null;
413 if (debug) {
414 params = Lists.newArrayList();
415 }
416
417 if (writeInterceptor != null) {
418 transformAndSetData(insertStatement, incomingData, pendingOperationBuffer, writeInterceptor, params);
419 }
420 else {
421 setData(insertStatement, incomingData, params);
422 }
423 insertCount++;
424
425 if (debug) {
426 List<Object> pk = table.getPk(incomingData);
427 log.debug(String.format("%s Execute insert query (pk:%s), params: %s", tableName, table.toPkStr(pk), params));
428 int nbRowInsert = insertStatement.executeUpdate();
429 if (nbRowInsert != 1) {
430 throw new SynchroTechnicalException(String.format("%s Could not insert a row into the table (pk:%s), params: %s", tableName,
431 table.toPkStr(pk), params));
432 }
433 }
434 else {
435 insertStatement.addBatch();
436 if (insertCount > 0 && insertCount % batchSize == 0) {
437 insertStatement.executeBatch();
438 insertStatement.clearBatch();
439 }
440 }
441 }
442
443 public void executeInsert(Object[] incomingData) throws SQLException {
444 Preconditions.checkArgument(!this.insertStrategyGenerateIdFirst);
445
446 List<Object> params = null;
447 if (debug) {
448 params = Lists.newArrayList();
449 }
450
451 if (writeInterceptor != null) {
452 transformAndSetData(insertStatement, incomingData, pendingOperationBuffer, writeInterceptor, params);
453 }
454 else {
455 setData(insertStatement, incomingData, params);
456 }
457 insertCount++;
458
459 if (debug) {
460 List<Object> pk = table.getPk(incomingData);
461 log.debug(String.format("%s Execute insert query (pk:%s), params: %s", tableName, table.toPkStr(pk), params));
462 insertStatement.executeUpdate();
463 }
464 else {
465 insertStatement.addBatch();
466 if (insertCount > 0 && insertCount % batchSize == 0) {
467 insertStatement.executeBatch();
468 insertStatement.clearBatch();
469 }
470 }
471 }
472
473 @Override
474 public Integer executeInsertAndReturnId(ResultSet incomingData) throws SQLException {
475 Preconditions.checkArgument(this.insertStrategyGenerateIdFirst);
476
477
478 Integer id = generateNewId();
479 List<Object> params = null;
480
481 if (debug) {
482 params = Lists.newArrayList();
483 }
484
485 if (writeInterceptor != null) {
486 transformAndSetData(insertStatement, incomingData, pendingOperationBuffer, writeInterceptor, params);
487 }
488 else {
489 setData(insertStatement, incomingData, params);
490 }
491
492 insertStatement.setObject(columnCount + 1, id);
493 insertCount++;
494
495 if (debug) {
496 log.debug(String.format("%s Execute insert query (pk:%s), params: %s", tableName, id, params));
497 insertStatement.executeUpdate();
498 }
499 else {
500 insertStatement.addBatch();
501 if (insertCount > 0 && insertCount % batchSize == 0) {
502 insertStatement.executeBatch();
503 insertStatement.clearBatch();
504 }
505 }
506
507 return id;
508 }
509
510 @Override
511 public void executeUpdate(List<Object> pk, ResultSet incomingData) throws SQLException {
512
513 List<Object> params = null;
514 if (debug) {
515 params = Lists.newArrayList();
516 }
517
518 if (writeInterceptor != null) {
519 transformAndSetData(updateStatement, incomingData, pendingOperationBuffer, writeInterceptor, params);
520 }
521 else {
522 setData(updateStatement, incomingData, params);
523 }
524
525 int columnCountIndex = columnCount + 1;
526 for (Object pkColumn : pk) {
527 updateStatement.setObject(columnCountIndex++, pkColumn);
528 }
529
530 updateCount++;
531
532 if (debug) {
533 log.debug(String.format("%s Execute update query (pk:%s), params: %s", tableName, pk, params));
534 int nbRowUpdated = updateStatement.executeUpdate();
535 Preconditions.checkArgument(nbRowUpdated == 1, String.format("%s rows has been updated, but expected 1 row.", nbRowUpdated));
536 }
537 else {
538 updateStatement.addBatch();
539 if (updateCount > 0 && updateCount % batchSize == 0) {
540 updateStatement.executeBatch();
541 updateStatement.clearBatch();
542 }
543 }
544 }
545
546 @Override
547 public void executeUpdate(List<Object> pk, Object[] row) throws SQLException {
548 List<Object> params = null;
549 if (debug) {
550 params = Lists.newArrayList();
551 }
552
553 if (writeInterceptor != null) {
554 transformAndSetData(updateStatement, row, pendingOperationBuffer, writeInterceptor, params);
555 }
556 else {
557 setData(updateStatement, row, params);
558 }
559
560 int columnCountIndex = columnCount + 1;
561
562 for (Object pkColumn : pk) {
563 updateStatement.setObject(columnCountIndex++, pkColumn);
564 }
565
566 updateCount++;
567
568 if (debug) {
569 log.debug(String.format("%s Execute update query (pk:%s), params: %s", tableName, pk, params));
570 int nbRowUpdated = updateStatement.executeUpdate();
571 Preconditions.checkArgument(nbRowUpdated == 1, String.format("%s rows has been updated, but expected 1 row.", nbRowUpdated));
572 }
573 else {
574 updateStatement.addBatch();
575 if (updateCount > 0 && updateCount % batchSize == 0) {
576 updateStatement.executeBatch();
577 updateStatement.clearBatch();
578 }
579 }
580 }
581
582 @Override
583 public ResultSet getDataByFk(String fkColumnName, Set<Integer> fkValues) throws SQLException {
584
585 boolean insertUsingTempQueryParameter = isTempQueryParameterEnable() && fkValues.size() > dbMeta.getInExpressionCountLimit();
586
587 if (insertUsingTempQueryParameter) {
588 return getDataByFkUsingTempParameterTable(fkColumnName, fkValues);
589 }
590
591 return getDataByFkWithInOperator(fkColumnName, fkValues);
592 }
593
594 @Override
595 public Integer getIdFromRemoteId(String tableName, Integer remoteId) throws SQLException {
596 String sql = getTable().getSelectIdFromRemoteIdQuery(tableName);
597 if (sql == null) {
598 return null;
599 }
600
601 PreparedStatement statement = getConnection().prepareStatement(sql);
602 statement.setInt(1, remoteId);
603 try {
604 ResultSet resultSet = statement.executeQuery();
605 if (!resultSet.next()) {
606 return null;
607 }
608 Integer result = resultSet.getInt(1);
609
610 return result;
611 } finally {
612 DaoUtils.closeSilently(statement);
613 }
614 }
615
616 @Override
617 public Map<Integer, Integer> getExistingRemoteIdsMap() throws SQLException {
618
619 String sql = getTable().getSelectRemoteIdsQuery();
620
621 PreparedStatement statement = getConnection().prepareStatement(sql);
622
623 Map<Integer, Integer> result = Maps.newHashMap();
624
625 try {
626 ResultSet resultSet = statement.executeQuery();
627 while (resultSet.next()) {
628 int id = resultSet.getInt(1);
629 int remoteId = resultSet.getInt(2);
630 result.put(remoteId, id);
631 }
632 statement.close();
633 return result;
634 } finally {
635 DaoUtils.closeSilently(statement);
636 }
637 }
638
639 public List<Object> getPk(ResultSet incomingData, SynchroWriteBuffer transformResult) throws SQLException {
640 List<Object> result;
641
642 if (readInterceptor != null) {
643 Object[] data = transformData(incomingData, readInterceptor, transformResult);
644 result = getTable().getPk(data);
645 }
646 else {
647 result = getTable().getPk(incomingData);
648 }
649
650 return result;
651 }
652
653 public boolean isTempQueryParameterEnable() {
654 return insertQueryParameterStatement != null;
655 }
656
657 public void clearCounts() {
658 insertCount = 0;
659 updateCount = 0;
660 }
661
662
663
664
665
666 protected void insertValuesIntoTempQueryParameter(Set<Integer> parameterValues, String queryParameterName, int queryPersonId) throws SQLException {
667 Preconditions.checkNotNull(insertQueryParameterStatement);
668 if (debug) {
669 log.debug(String.format("%s Setting query parameters into %s", tableName, TABLE_TEMP_QUERY_PARAMETER.toUpperCase()));
670 }
671
672 Statement stm = getConnection().createStatement();
673 try {
674 stm.executeUpdate(String.format("DELETE FROM %", TABLE_TEMP_QUERY_PARAMETER));
675 } finally {
676 DaoUtils.closeSilently(stm);
677 }
678
679 int i = 1;
680 for (Integer parameterValue : parameterValues) {
681 insertQueryParameterStatement.setInt(1, i++);
682 insertQueryParameterStatement.setString(2, queryParameterName);
683 insertQueryParameterStatement.setInt(3, parameterValue);
684 insertQueryParameterStatement.setNull(4, Types.VARCHAR);
685 insertQueryParameterStatement.setInt(5, queryPersonId);
686 insertQueryParameterStatement.addBatch();
687 }
688
689 insertQueryParameterStatement.executeBatch();
690 insertQueryParameterStatement.clearBatch();
691 }
692
693 protected String createTempQueryParameterQuery(SynchroTableMetadata table) {
694 SynchroDatabaseMetadata dbMeta = table.getDatabaseMetadata();
695 if (dbMeta.getTable(TABLE_TEMP_QUERY_PARAMETER) == null) {
696 return null;
697 }
698
699 return String.format("INSERT INTO %s (ID, PARAMETER_NAME, NUMERICAL_VALUE, ALPHANUMERICAL_VALUE, PERSON_FK)"
700 + " VALUES (?, ?, ?, ?, ?)",
701 TABLE_TEMP_QUERY_PARAMETER);
702
703 }
704
705 protected void closeSelectStatements() {
706 if (CollectionUtils.isNotEmpty(selectStatements)) {
707 for (PreparedStatement statement : selectStatements) {
708 DaoUtils.closeSilently(statement);
709 }
710 }
711 selectStatements.clear();
712 }
713
714 protected void closePreviousSelectStatements(PreparedStatement newStatement) {
715 closeSelectStatements();
716 selectStatements.add(newStatement);
717 }
718
719 protected void setData(PreparedStatement statement, Object[] values, List<Object> debugParams) throws SQLException {
720 for (int c = 1; c <= columnCount; c++) {
721 Object object = values[c - 1];
722 statement.setObject(c, object);
723 if (debug) {
724 debugParams.add(object);
725 }
726 }
727 }
728
729 protected void setData(PreparedStatement statement, ResultSet incomingData, List<Object> debugParams) throws SQLException {
730 for (int c = 1; c <= columnCount; c++) {
731 Object object = incomingData.getObject(c);
732 statement.setObject(c, object);
733 if (debug) {
734 debugParams.add(object);
735 }
736 }
737 }
738
739 protected void transformAndSetData(PreparedStatement statement, ResultSet incomingData,
740 SynchroWriteBuffer transformBuffer, SynchroInterceptor interceptor, List<Object> debugParams)
741 throws SQLException {
742 Preconditions.checkNotNull(transformBuffer);
743
744
745 Object[] row = transformData(incomingData, interceptor, transformBuffer);
746
747
748 setData(statement, row, debugParams);
749 }
750
751 protected void transformAndSetData(PreparedStatement statement, Object[] incomingData,
752 SynchroWriteBuffer transformBuffer, SynchroInterceptor interceptor, List<Object> debugParams)
753 throws SQLException {
754 Preconditions.checkNotNull(transformBuffer);
755
756
757 Object[] row = transformData(incomingData, interceptor, transformBuffer);
758
759
760 setData(statement, row, debugParams);
761 }
762
763 protected Object[] transformData(ResultSet incomingData, SynchroInterceptor interceptor, SynchroWriteBuffer buffer)
764 throws SQLException {
765 Preconditions.checkNotNull(interceptor);
766
767
768 Object[] result = table.getData(incomingData);
769
770
771 try {
772 interceptor.onWrite(result, this, buffer);
773 } catch (Exception e) {
774 throw new SynchroTechnicalException(String.format("Error while transform row, for table %s", table.getName()), e);
775 }
776
777 return result;
778 }
779
780 protected Object[] transformData(Object[] incomingData, SynchroInterceptor interceptor, SynchroWriteBuffer buffer)
781 throws SQLException {
782 Preconditions.checkNotNull(interceptor);
783
784
785 try {
786 interceptor.onWrite(incomingData, this, buffer);
787 } catch (Exception e) {
788 throw new SynchroTechnicalException(String.format("Error while transform row, for table %s", table.getName()), e);
789 }
790
791 return incomingData;
792 }
793
794 protected ResultSet getDataByFkWithInOperator(String fkColumnName, Set<Integer> fkValues) throws SQLException {
795 String sql = table.getSelectDataFromFkQuery(fkColumnName, fkValues.size());
796
797 Preconditions.checkNotNull(sql, String.format("Columns %s is not referenced for table %s", fkColumnName, table.getName()));
798
799 PreparedStatement statement = getConnection().prepareStatement(sql);
800 closePreviousSelectStatements(statement);
801
802 int paramIndex = 1;
803 for (Object value : fkValues) {
804 statement.setObject(paramIndex, value);
805 paramIndex++;
806 }
807
808 statement.setFetchSize(batchSize);
809
810 ResultSet result = statement.executeQuery();
811 return result;
812 }
813
814 protected ResultSet getDataByFkUsingTempParameterTable(String fkColumnName, Set<Integer> fkValues) throws SQLException {
815
816 String sql = table.getSelectDataFromFkQueryBigParams(fkColumnName);
817 Preconditions.checkNotNull(sql);
818
819 PreparedStatement statement = getConnection().prepareStatement(sql);
820 closePreviousSelectStatements(statement);
821
822 String queryParameterName = "remoteIds";
823 int personFk = -1;
824 insertValuesIntoTempQueryParameter(fkValues, queryParameterName, personFk);
825 statement.setString(1, queryParameterName);
826 statement.setInt(2, personFk);
827
828 statement.setFetchSize(batchSize);
829
830 ResultSet result = statement.executeQuery();
831 return result;
832 }
833
834 @Override
835 public List<Object> getPk(ResultSet incomingData) throws SQLException {
836 return table.getPk(incomingData);
837 }
838
839 private SynchroInterceptor initReadInterceptor(List<SynchroInterceptor> interceptors) {
840 if (CollectionUtils.isEmpty(interceptors)) {
841 return null;
842 }
843 SynchroContext context = table.getDatabaseMetadata().getContext();
844 List<SynchroInterceptor> readInterceptors = Lists.newArrayList();
845
846 try {
847 for (SynchroInterceptor interceptor : interceptors) {
848 if (interceptor.enableOnRead()) {
849 SynchroInterceptor newInterceptor;
850 newInterceptor = interceptor.getClass().newInstance();
851
852 newInterceptor.setContext(context);
853 readInterceptors.add(interceptor);
854 }
855 }
856 } catch (Exception e) {
857 throw new SynchroTechnicalException("Could not initialize DAO read interceptors.", e);
858 }
859
860 if (CollectionUtils.isEmpty(readInterceptors)) {
861 return null;
862 }
863
864 return SynchroInterceptorUtils.chain(readInterceptors, SynchroInterceptorBase.class);
865 }
866
867 private SynchroInterceptor initWriteInterceptor(List<SynchroInterceptor> interceptors) {
868 if (CollectionUtils.isEmpty(interceptors)) {
869 return null;
870 }
871 SynchroContext context = table.getDatabaseMetadata().getContext();
872 List<SynchroInterceptor> writeInterceptors = Lists.newArrayList();
873
874 try {
875 for (SynchroInterceptor interceptor : interceptors) {
876 if (interceptor.enableOnWrite()) {
877 SynchroInterceptor newInterceptor;
878 newInterceptor = interceptor.getClass().newInstance();
879
880 newInterceptor.setContext(context);
881 writeInterceptors.add(interceptor);
882 }
883 }
884 } catch (Exception e) {
885 throw new SynchroTechnicalException("Could not initialize DAO read interceptors.", e);
886 }
887
888 if (CollectionUtils.isEmpty(writeInterceptors)) {
889 return null;
890 }
891
892 return SynchroInterceptorUtils.chain(writeInterceptors, SynchroInterceptorBase.class);
893 }
894
895 }