View Javadoc
1   package fr.ifremer.adagio.synchro.dao;
2   
3   /*
4    * #%L
5    * SIH-Adagio :: Synchronization
6    * $Id:$
7    * $HeadURL:$
8    * %%
9    * Copyright (C) 2012 - 2014 Ifremer
10   * %%
11   * This program is free software: you can redistribute it and/or modify
12   * it under the terms of the GNU Affero General Public License as published by
13   * the Free Software Foundation, either version 3 of the License, or
14   * (at your option) any later version.
15   * 
16   * This program is distributed in the hope that it will be useful,
17   * but WITHOUT ANY WARRANTY; without even the implied warranty of
18   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19   * GNU General Public License for more details.
20   * 
21   * You should have received a copy of the GNU Affero General Public License
22   * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23   * #L%
24   */
25  
26  import java.io.IOException;
27  import java.sql.Connection;
28  import java.sql.PreparedStatement;
29  import java.sql.ResultSet;
30  import java.sql.SQLException;
31  import java.sql.Statement;
32  import java.sql.Timestamp;
33  import java.sql.Types;
34  import java.util.Date;
35  import java.util.List;
36  import java.util.Map;
37  import java.util.Set;
38  
39  import org.apache.commons.collections.CollectionUtils;
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  
43  import com.google.common.base.Preconditions;
44  import com.google.common.collect.Lists;
45  import com.google.common.collect.Maps;
46  import com.google.common.collect.Sets;
47  
48  import fr.ifremer.adagio.synchro.SynchroTechnicalException;
49  import fr.ifremer.adagio.synchro.config.SynchroConfiguration;
50  import fr.ifremer.adagio.synchro.intercept.SynchroInterceptor;
51  import fr.ifremer.adagio.synchro.intercept.SynchroInterceptorBase;
52  import fr.ifremer.adagio.synchro.intercept.SynchroInterceptorUtils;
53  import fr.ifremer.adagio.synchro.intercept.SynchroWriteBuffer;
54  import fr.ifremer.adagio.synchro.meta.SynchroDatabaseMetadata;
55  import fr.ifremer.adagio.synchro.meta.SynchroTableMetadata;
56  import fr.ifremer.adagio.synchro.meta.SynchroTableMetadata.TableInsertStrategy;
57  import fr.ifremer.adagio.synchro.service.SynchroContext;
58  import fr.ifremer.adagio.synchro.service.SynchroPendingOperationBuffer;
59  
60  public class SynchroTableDaoImpl implements SynchroTableDao {
61  
62  	/** Logger. */
63  	private static final Log log =
64  			LogFactory.getLog(SynchroTableDaoImpl.class);
65  
66  	public static final String TABLE_TEMP_QUERY_PARAMETER = "temp_query_parameter";
67  
68  	protected final Connection connection;
69  
70  	protected final SynchroDatabaseMetadata dbMeta;
71  
72  	protected final SynchroTableMetadata table;
73  
74  	protected final PreparedStatement insertStatement;
75  
76  	protected final PreparedStatement insertQueryParameterStatement;
77  
78  	protected final PreparedStatement updateStatement;
79  
80  	protected final List<PreparedStatement> selectStatements;
81  
82  	protected final boolean insertStrategyGenerateIdFirst;
83  
84  	protected final int columnCount;
85  
86  	protected final int batchSize;
87  
88  	protected final String tableName;
89  
90  	protected boolean debug;
91  
92  	protected int insertCount = 0;
93  
94  	protected int updateCount = 0;
95  
96  	protected final int idColumnIndex;
97  
98  	protected final SynchroInterceptor readInterceptor;
99  
100 	protected final SynchroInterceptor writeInterceptor;
101 
102 	protected SynchroPendingOperationBuffer pendingOperationBuffer;
103 
104 	public SynchroTableDaoImpl(
105 			Connection connection,
106 			SynchroTableMetadata table,
107 			boolean enableWriteStatements) throws SQLException {
108 		this.connection = connection;
109 		this.table = table;
110 		this.dbMeta = table.getDatabaseMetadata();
111 		this.columnCount = table.getColumnsCount();
112 		this.selectStatements = Lists.newArrayList();
113 		this.tableName = table.getName();
114 
115 		// Batch size
116 		this.batchSize = SynchroConfiguration.getInstance().getImportJdbcBatchSize();
117 		Preconditions.checkArgument(this.batchSize > 0);
118 
119 		debug = log.isTraceEnabled();
120 
121 		if (table.isWithIdColumn()) {
122 			this.idColumnIndex = table.getColumnIndex(SynchroTableMetadata.COLUMN_ID) + 1;
123 		}
124 		else {
125 			this.idColumnIndex = -1;
126 		}
127 
128 		if (enableWriteStatements) {
129 			// Prepare insert statement
130 			String insertSql = table.getInsertQuery();
131 			Preconditions.checkArgument(insertSql.toUpperCase().startsWith("INSERT"),
132 					String.format("%s Insert SQL query should be like 'INSERT ...' but was: %s", this.tableName, insertSql));
133 			this.insertStatement = connection.prepareStatement(insertSql);
134 			this.insertStrategyGenerateIdFirst = table.getInsertStrategy() == TableInsertStrategy.GENERATE_ID_FIRST;
135 
136 			// Prepare update statement
137 			String updateSql = table.getUpdateQuery();
138 			Preconditions.checkArgument(updateSql.toUpperCase().startsWith("UPDATE"),
139 					String.format("%s Update SQL query should be like 'UPDATE ...' but was: %s", this.tableName, updateSql));
140 			this.updateStatement = connection.prepareStatement(updateSql);
141 
142 			// Init write interceptors
143 			this.writeInterceptor = initWriteInterceptor(table.getInterceptors());
144 		}
145 		else {
146 			this.insertStatement = null;
147 			this.updateStatement = null;
148 			this.insertStrategyGenerateIdFirst = false;
149 			this.writeInterceptor = null;
150 		}
151 
152 		// Query temp parameter (for query parameter with many values)
153 		String insertQueryParameterSql = createTempQueryParameterQuery(table);
154 		if (insertQueryParameterSql != null) {
155 			insertQueryParameterStatement = getConnection().prepareStatement(insertQueryParameterSql);
156 		}
157 		else {
158 			insertQueryParameterStatement = null;
159 		}
160 
161 		this.readInterceptor = initReadInterceptor(table.getInterceptors());
162 	}
163 
164 	@Override
165 	public Connection getConnection() {
166 		return connection;
167 	}
168 
169 	@Override
170 	public SynchroTableMetadata getTable() {
171 		return table;
172 	}
173 
174 	public void setPendingOperationBuffer(SynchroPendingOperationBuffer pendingChangesBuffer) {
175 		this.pendingOperationBuffer = pendingChangesBuffer;
176 	}
177 
178 	public SynchroPendingOperationBuffer getPendingOperationBuffer() {
179 		return this.pendingOperationBuffer;
180 	}
181 
182 	public int getInsertCount() {
183 		return insertCount;
184 	}
185 
186 	public int getUpdateCount() {
187 		return updateCount;
188 	}
189 
190 	public void flush() throws SQLException {
191 		if (!debug) {
192 			if (insertCount > 0 && insertCount % batchSize != 0) {
193 				insertStatement.executeBatch();
194 				insertStatement.clearBatch();
195 			}
196 			if (updateCount > 0 && updateCount % batchSize != 0) {
197 				updateStatement.executeBatch();
198 				updateStatement.clearBatch();
199 			}
200 		}
201 	}
202 
203 	@Override
204 	public void close() throws IOException {
205 		DaoUtils.closeSilently(insertStatement);
206 		DaoUtils.closeSilently(updateStatement);
207 		DaoUtils.closeSilently(insertQueryParameterStatement);
208 		closeSelectStatements();
209 	}
210 
211 	/**
212 	 * Gets the last updateDate for the given {@code table} using
213 	 * the given datasource.
214 	 * 
215 	 * @return the last update date of the given table, or {@code null} if table does not use a updateDate columns or if
216 	 *         there
217 	 *         is no data in table.
218 	 */
219 	public Timestamp getLastUpdateDate() throws SQLException {
220 		if (!table.isWithUpdateDateColumn()) {
221 			return null;
222 		}
223 
224 		String sql = table.getSelectMaxUpdateDateQuery();
225 		if (sql == null) {
226 			return null;
227 		}
228 
229 		PreparedStatement statement = getConnection().prepareStatement(sql);
230 		try {
231 			ResultSet resultSet = statement.executeQuery();
232 			if (!resultSet.next()) {
233 				return null;
234 			}
235 			Timestamp result = resultSet.getTimestamp(1);
236 			return result;
237 		} finally {
238 			DaoUtils.closeSilently(statement);
239 		}
240 	}
241 
242 	public long count() throws SQLException {
243 
244 		String sql = table.getCountQuery();
245 
246 		PreparedStatement statement = getConnection().prepareStatement(sql);
247 
248 		try {
249 			ResultSet resultSet = statement.executeQuery();
250 			resultSet.next();
251 			long result = resultSet.getLong(1);
252 			return result;
253 		} finally {
254 			DaoUtils.closeSilently(statement);
255 		}
256 	}
257 
258 	public long countDataToUpdate(Date fromDate) throws SQLException {
259 
260 		String sql = table.getCountDataToUpdateQuery(fromDate);
261 
262 		PreparedStatement statement = getConnection().prepareStatement(sql);
263 		log.debug(sql);
264 		try {
265 			if (table.isWithUpdateDateColumn() &&
266 					fromDate != null) {
267 				Timestamp updateDateValue = new Timestamp(fromDate.getTime());
268 				// Set all query parameter (could have more than one,
269 				// when query has been override by an interceptor)
270 				for (int i = 1; i <= statement.getParameterMetaData().getParameterCount(); i++) {
271 					statement.setTimestamp(i, updateDateValue);
272 				}
273 			}
274 
275 			ResultSet queryResult = statement.executeQuery();
276 			queryResult.next();
277 			long result = queryResult.getLong(1);
278 			return result;
279 		} finally {
280 			DaoUtils.closeSilently(statement);
281 		}
282 	}
283 
284 	public ResultSet getDataToUpdate(Date fromDate) throws SQLException {
285 
286 		String sql = table.getSelectDataToUpdateQuery(fromDate);
287 
288 		PreparedStatement statement = getConnection().prepareStatement(sql);
289 		selectStatements.add(statement);
290 
291 		if (table.isWithUpdateDateColumn() &&
292 				fromDate != null) {
293 			statement.setTimestamp(1, new Timestamp(fromDate.getTime()));
294 			if (statement.getParameterMetaData().getParameterCount() > 1) {
295 				statement.setTimestamp(2, new Timestamp(fromDate.getTime()));
296 			}
297 		}
298 		statement.setFetchSize(batchSize);
299 
300 		ResultSet result = statement.executeQuery();
301 		return result;
302 	}
303 
304 	public void deleteAll() throws SQLException {
305 		PreparedStatement deleteStatement =
306 				getConnection().prepareStatement(
307 						"DELETE FROM " + table.getName());
308 		try {
309 			deleteStatement.execute();
310 		} finally {
311 			DaoUtils.closeSilently(deleteStatement);
312 		}
313 	}
314 
315 	public Object[] findByPk(List<Object> pk) throws SQLException {
316 		String selectDataSql = table.getSelectDataQueryFromPk();
317 
318 		PreparedStatement selectStatement = getConnection().prepareStatement(selectDataSql);
319 		int columnCountIndex = 1;
320 
321 		for (Object pkColumn : pk) {
322 			selectStatement.setObject(columnCountIndex++, pkColumn);
323 		}
324 
325 		int columnsCount = table.getColumnsCount();
326 
327 		try {
328 			ResultSet resultSet = selectStatement.executeQuery();
329 			resultSet.next();
330 
331 			Object[] result = new Object[columnsCount];
332 
333 			for (int i = 1; i <= columnsCount; i++) {
334 
335 				result[i - 1] = resultSet.getObject(i);
336 			}
337 			return result;
338 		} finally {
339 			DaoUtils.closeSilently(selectStatement);
340 		}
341 	}
342 
343 	public Set<String> getExistingPrimaryKeys() throws SQLException {
344 		Set<String> result = Sets.newHashSet();
345 
346 		// If simple SQL (select with only one output String column)
347 		if (table.isSelectPrimaryKeysAsStringQueryEnable()) {
348 			String sql = table.getSelectPrimaryKeysAsStringQuery();
349 			PreparedStatement statement = getConnection().prepareStatement(sql);
350 			try {
351 				ResultSet resultSet = statement.executeQuery();
352 
353 				while (resultSet.next()) {
354 					String pk = resultSet.getString(1);
355 					result.add(pk);
356 				}
357 			} finally {
358 				DaoUtils.closeSilently(statement);
359 			}
360 		}
361 
362 		// If more than one one column as String, in the select query
363 		else {
364 			String sql = table.getSelectPrimaryKeysQuery();
365 			PreparedStatement statement = getConnection().prepareStatement(sql);
366 
367 			try {
368 				ResultSet resultSet = statement.executeQuery();
369 
370 				int columnCount = table.getPkNames().size();
371 				List<Object> pks = Lists.newArrayListWithCapacity(columnCount);
372 				while (resultSet.next()) {
373 					for (int i = 1; i <= columnCount; i++) {
374 						Object pk = resultSet.getObject(i);
375 						pks.add(pk);
376 					}
377 					result.add(table.toPkStr(pks));
378 					pks.clear();
379 				}
380 			} finally {
381 				DaoUtils.closeSilently(statement);
382 			}
383 		}
384 
385 		return result;
386 	}
387 
388 	public Integer generateNewId() throws SQLException {
389 		String sql = table.getSequenceNextValString();
390 		Preconditions.checkNotNull(sql);
391 
392 		if (sql == null) {
393 			return null;
394 		}
395 
396 		PreparedStatement statement = getConnection().prepareStatement(sql);
397 		try {
398 			ResultSet resultSet = statement.executeQuery();
399 			if (!resultSet.next()) {
400 				return null;
401 			}
402 			Integer result = resultSet.getInt(1);
403 			return result;
404 		} finally {
405 			DaoUtils.closeSilently(statement);
406 		}
407 	}
408 
409 	public void executeInsert(ResultSet incomingData) throws SQLException {
410 		Preconditions.checkArgument(!this.insertStrategyGenerateIdFirst);
411 
412 		List<Object> params = null;
413 		if (debug) {
414 			params = Lists.newArrayList();
415 		}
416 
417 		if (writeInterceptor != null) {
418 			transformAndSetData(insertStatement, incomingData, pendingOperationBuffer, writeInterceptor, params);
419 		}
420 		else {
421 			setData(insertStatement, incomingData, params);
422 		}
423 		insertCount++;
424 
425 		if (debug) {
426 			List<Object> pk = table.getPk(incomingData);
427 			log.debug(String.format("%s Execute insert query (pk:%s), params: %s", tableName, table.toPkStr(pk), params));
428 			int nbRowInsert = insertStatement.executeUpdate();
429 			if (nbRowInsert != 1) {
430 				throw new SynchroTechnicalException(String.format("%s Could not insert a row into the table (pk:%s), params: %s", tableName,
431 						table.toPkStr(pk), params));
432 			}
433 		}
434 		else {
435 			insertStatement.addBatch();
436 			if (insertCount > 0 && insertCount % batchSize == 0) {
437 				insertStatement.executeBatch();
438 				insertStatement.clearBatch();
439 			}
440 		}
441 	}
442 
443 	public void executeInsert(Object[] incomingData) throws SQLException {
444 		Preconditions.checkArgument(!this.insertStrategyGenerateIdFirst);
445 
446 		List<Object> params = null;
447 		if (debug) {
448 			params = Lists.newArrayList();
449 		}
450 
451 		if (writeInterceptor != null) {
452 			transformAndSetData(insertStatement, incomingData, pendingOperationBuffer, writeInterceptor, params);
453 		}
454 		else {
455 			setData(insertStatement, incomingData, params);
456 		}
457 		insertCount++;
458 
459 		if (debug) {
460 			List<Object> pk = table.getPk(incomingData);
461 			log.debug(String.format("%s Execute insert query (pk:%s), params: %s", tableName, table.toPkStr(pk), params));
462 			insertStatement.executeUpdate();
463 		}
464 		else {
465 			insertStatement.addBatch();
466 			if (insertCount > 0 && insertCount % batchSize == 0) {
467 				insertStatement.executeBatch();
468 				insertStatement.clearBatch();
469 			}
470 		}
471 	}
472 
473 	@Override
474 	public Integer executeInsertAndReturnId(ResultSet incomingData) throws SQLException {
475 		Preconditions.checkArgument(this.insertStrategyGenerateIdFirst);
476 
477 		// Generate Id
478 		Integer id = generateNewId();
479 		List<Object> params = null;
480 
481 		if (debug) {
482 			params = Lists.newArrayList();
483 		}
484 
485 		if (writeInterceptor != null) {
486 			transformAndSetData(insertStatement, incomingData, pendingOperationBuffer, writeInterceptor, params);
487 		}
488 		else {
489 			setData(insertStatement, incomingData, params);
490 		}
491 
492 		insertStatement.setObject(columnCount + 1, id);
493 		insertCount++;
494 
495 		if (debug) {
496 			log.debug(String.format("%s Execute insert query (pk:%s), params: %s", tableName, id, params));
497 			insertStatement.executeUpdate();
498 		}
499 		else {
500 			insertStatement.addBatch();
501 			if (insertCount > 0 && insertCount % batchSize == 0) {
502 				insertStatement.executeBatch();
503 				insertStatement.clearBatch();
504 			}
505 		}
506 
507 		return id;
508 	}
509 
510 	@Override
511 	public void executeUpdate(List<Object> pk, ResultSet incomingData) throws SQLException {
512 
513 		List<Object> params = null;
514 		if (debug) {
515 			params = Lists.newArrayList();
516 		}
517 
518 		if (writeInterceptor != null) {
519 			transformAndSetData(updateStatement, incomingData, pendingOperationBuffer, writeInterceptor, params);
520 		}
521 		else {
522 			setData(updateStatement, incomingData, params);
523 		}
524 
525 		int columnCountIndex = columnCount + 1;
526 		for (Object pkColumn : pk) {
527 			updateStatement.setObject(columnCountIndex++, pkColumn);
528 		}
529 
530 		updateCount++;
531 
532 		if (debug) {
533 			log.debug(String.format("%s Execute update query (pk:%s), params: %s", tableName, pk, params));
534 			int nbRowUpdated = updateStatement.executeUpdate();
535 			Preconditions.checkArgument(nbRowUpdated == 1, String.format("%s rows has been updated, but expected 1 row.", nbRowUpdated));
536 		}
537 		else {
538 			updateStatement.addBatch();
539 			if (updateCount > 0 && updateCount % batchSize == 0) {
540 				updateStatement.executeBatch();
541 				updateStatement.clearBatch();
542 			}
543 		}
544 	}
545 
546 	@Override
547 	public void executeUpdate(List<Object> pk, Object[] row) throws SQLException {
548 		List<Object> params = null;
549 		if (debug) {
550 			params = Lists.newArrayList();
551 		}
552 
553 		if (writeInterceptor != null) {
554 			transformAndSetData(updateStatement, row, pendingOperationBuffer, writeInterceptor, params);
555 		}
556 		else {
557 			setData(updateStatement, row, params);
558 		}
559 
560 		int columnCountIndex = columnCount + 1;
561 
562 		for (Object pkColumn : pk) {
563 			updateStatement.setObject(columnCountIndex++, pkColumn);
564 		}
565 
566 		updateCount++;
567 
568 		if (debug) {
569 			log.debug(String.format("%s Execute update query (pk:%s), params: %s", tableName, pk, params));
570 			int nbRowUpdated = updateStatement.executeUpdate();
571 			Preconditions.checkArgument(nbRowUpdated == 1, String.format("%s rows has been updated, but expected 1 row.", nbRowUpdated));
572 		}
573 		else {
574 			updateStatement.addBatch();
575 			if (updateCount > 0 && updateCount % batchSize == 0) {
576 				updateStatement.executeBatch();
577 				updateStatement.clearBatch();
578 			}
579 		}
580 	}
581 
582 	@Override
583 	public ResultSet getDataByFk(String fkColumnName, Set<Integer> fkValues) throws SQLException {
584 
585 		boolean insertUsingTempQueryParameter = isTempQueryParameterEnable() && fkValues.size() > dbMeta.getInExpressionCountLimit();
586 
587 		if (insertUsingTempQueryParameter) {
588 			return getDataByFkUsingTempParameterTable(fkColumnName, fkValues);
589 		}
590 
591 		return getDataByFkWithInOperator(fkColumnName, fkValues);
592 	}
593 
594 	@Override
595 	public Integer getIdFromRemoteId(String tableName, Integer remoteId) throws SQLException {
596 		String sql = getTable().getSelectIdFromRemoteIdQuery(tableName);
597 		if (sql == null) {
598 			return null;
599 		}
600 
601 		PreparedStatement statement = getConnection().prepareStatement(sql);
602 		statement.setInt(1, remoteId);
603 		try {
604 			ResultSet resultSet = statement.executeQuery();
605 			if (!resultSet.next()) {
606 				return null;
607 			}
608 			Integer result = resultSet.getInt(1);
609 
610 			return result;
611 		} finally {
612 			DaoUtils.closeSilently(statement);
613 		}
614 	}
615 
616 	@Override
617 	public Map<Integer, Integer> getExistingRemoteIdsMap() throws SQLException {
618 
619 		String sql = getTable().getSelectRemoteIdsQuery();
620 
621 		PreparedStatement statement = getConnection().prepareStatement(sql);
622 
623 		Map<Integer, Integer> result = Maps.newHashMap();
624 
625 		try {
626 			ResultSet resultSet = statement.executeQuery();
627 			while (resultSet.next()) {
628 				int id = resultSet.getInt(1);
629 				int remoteId = resultSet.getInt(2);
630 				result.put(remoteId, id);
631 			}
632 			statement.close();
633 			return result;
634 		} finally {
635 			DaoUtils.closeSilently(statement);
636 		}
637 	}
638 
639 	public List<Object> getPk(ResultSet incomingData, SynchroWriteBuffer transformResult) throws SQLException {
640 		List<Object> result;
641 
642 		if (readInterceptor != null) {
643 			Object[] data = transformData(incomingData, readInterceptor, transformResult);
644 			result = getTable().getPk(data);
645 		}
646 		else {
647 			result = getTable().getPk(incomingData);
648 		}
649 
650 		return result;
651 	}
652 
653 	public boolean isTempQueryParameterEnable() {
654 		return insertQueryParameterStatement != null;
655 	}
656 
657 	public void clearCounts() {
658 		insertCount = 0;
659 		updateCount = 0;
660 	}
661 
662 	/* -- Abstract method -- */
663 
664 	/* -- Protected methods -- */
665 
666 	protected void insertValuesIntoTempQueryParameter(Set<Integer> parameterValues, String queryParameterName, int queryPersonId) throws SQLException {
667 		Preconditions.checkNotNull(insertQueryParameterStatement);
668 		if (debug) {
669 			log.debug(String.format("%s Setting query parameters into %s", tableName, TABLE_TEMP_QUERY_PARAMETER.toUpperCase()));
670 		}
671 
672 		Statement stm = getConnection().createStatement();
673 		try {
674 			stm.executeUpdate(String.format("DELETE FROM %", TABLE_TEMP_QUERY_PARAMETER));
675 		} finally {
676 			DaoUtils.closeSilently(stm);
677 		}
678 
679 		int i = 1;
680 		for (Integer parameterValue : parameterValues) {
681 			insertQueryParameterStatement.setInt(1, i++);
682 			insertQueryParameterStatement.setString(2, queryParameterName);
683 			insertQueryParameterStatement.setInt(3, parameterValue);
684 			insertQueryParameterStatement.setNull(4, Types.VARCHAR);
685 			insertQueryParameterStatement.setInt(5, queryPersonId);
686 			insertQueryParameterStatement.addBatch();
687 		}
688 
689 		insertQueryParameterStatement.executeBatch();
690 		insertQueryParameterStatement.clearBatch();
691 	}
692 
693 	protected String createTempQueryParameterQuery(SynchroTableMetadata table) {
694 		SynchroDatabaseMetadata dbMeta = table.getDatabaseMetadata();
695 		if (dbMeta.getTable(TABLE_TEMP_QUERY_PARAMETER) == null) {
696 			return null;
697 		}
698 
699 		return String.format("INSERT INTO %s (ID, PARAMETER_NAME, NUMERICAL_VALUE, ALPHANUMERICAL_VALUE, PERSON_FK)"
700 				+ " VALUES (?, ?, ?, ?, ?)",
701 				TABLE_TEMP_QUERY_PARAMETER);
702 
703 	}
704 
705 	protected void closeSelectStatements() {
706 		if (CollectionUtils.isNotEmpty(selectStatements)) {
707 			for (PreparedStatement statement : selectStatements) {
708 				DaoUtils.closeSilently(statement);
709 			}
710 		}
711 		selectStatements.clear();
712 	}
713 
714 	protected void closePreviousSelectStatements(PreparedStatement newStatement) {
715 		closeSelectStatements();
716 		selectStatements.add(newStatement);
717 	}
718 
719 	protected void setData(PreparedStatement statement, Object[] values, List<Object> debugParams) throws SQLException {
720 		for (int c = 1; c <= columnCount; c++) {
721 			Object object = values[c - 1];
722 			statement.setObject(c, object);
723 			if (debug) {
724 				debugParams.add(object);
725 			}
726 		}
727 	}
728 
729 	protected void setData(PreparedStatement statement, ResultSet incomingData, List<Object> debugParams) throws SQLException {
730 		for (int c = 1; c <= columnCount; c++) {
731 			Object object = incomingData.getObject(c);
732 			statement.setObject(c, object);
733 			if (debug) {
734 				debugParams.add(object);
735 			}
736 		}
737 	}
738 
739 	protected void transformAndSetData(PreparedStatement statement, ResultSet incomingData,
740 			SynchroWriteBuffer transformBuffer, SynchroInterceptor interceptor, List<Object> debugParams)
741 			throws SQLException {
742 		Preconditions.checkNotNull(transformBuffer);
743 
744 		// Transform data
745 		Object[] row = transformData(incomingData, interceptor, transformBuffer);
746 
747 		// Set the data into the statement
748 		setData(statement, row, debugParams);
749 	}
750 
751 	protected void transformAndSetData(PreparedStatement statement, Object[] incomingData,
752 			SynchroWriteBuffer transformBuffer, SynchroInterceptor interceptor, List<Object> debugParams)
753 			throws SQLException {
754 		Preconditions.checkNotNull(transformBuffer);
755 
756 		// Transform data
757 		Object[] row = transformData(incomingData, interceptor, transformBuffer);
758 
759 		// Set the data into the statement
760 		setData(statement, row, debugParams);
761 	}
762 
763 	protected Object[] transformData(ResultSet incomingData, SynchroInterceptor interceptor, SynchroWriteBuffer buffer)
764 			throws SQLException {
765 		Preconditions.checkNotNull(interceptor);
766 
767 		// Get data
768 		Object[] result = table.getData(incomingData);
769 
770 		// Transform values
771 		try {
772 			interceptor.onWrite(result, this, buffer);
773 		} catch (Exception e) {
774 			throw new SynchroTechnicalException(String.format("Error while transform row, for table %s", table.getName()), e);
775 		}
776 
777 		return result;
778 	}
779 
780 	protected Object[] transformData(Object[] incomingData, SynchroInterceptor interceptor, SynchroWriteBuffer buffer)
781 			throws SQLException {
782 		Preconditions.checkNotNull(interceptor);
783 
784 		// Transform values
785 		try {
786 			interceptor.onWrite(incomingData, this, buffer);
787 		} catch (Exception e) {
788 			throw new SynchroTechnicalException(String.format("Error while transform row, for table %s", table.getName()), e);
789 		}
790 
791 		return incomingData;
792 	}
793 
794 	protected ResultSet getDataByFkWithInOperator(String fkColumnName, Set<Integer> fkValues) throws SQLException {
795 		String sql = table.getSelectDataFromFkQuery(fkColumnName, fkValues.size());
796 
797 		Preconditions.checkNotNull(sql, String.format("Columns %s is not referenced for table %s", fkColumnName, table.getName()));
798 
799 		PreparedStatement statement = getConnection().prepareStatement(sql);
800 		closePreviousSelectStatements(statement);
801 
802 		int paramIndex = 1;
803 		for (Object value : fkValues) {
804 			statement.setObject(paramIndex, value);
805 			paramIndex++;
806 		}
807 
808 		statement.setFetchSize(batchSize);
809 
810 		ResultSet result = statement.executeQuery();
811 		return result;
812 	}
813 
814 	protected ResultSet getDataByFkUsingTempParameterTable(String fkColumnName, Set<Integer> fkValues) throws SQLException {
815 
816 		String sql = table.getSelectDataFromFkQueryBigParams(fkColumnName);
817 		Preconditions.checkNotNull(sql);
818 
819 		PreparedStatement statement = getConnection().prepareStatement(sql);
820 		closePreviousSelectStatements(statement);
821 
822 		String queryParameterName = "remoteIds";
823 		int personFk = -1;
824 		insertValuesIntoTempQueryParameter(fkValues, queryParameterName, personFk);
825 		statement.setString(1, queryParameterName);
826 		statement.setInt(2, personFk);
827 
828 		statement.setFetchSize(batchSize);
829 
830 		ResultSet result = statement.executeQuery();
831 		return result;
832 	}
833 
834 	@Override
835 	public List<Object> getPk(ResultSet incomingData) throws SQLException {
836 		return table.getPk(incomingData);
837 	}
838 
839 	private SynchroInterceptor initReadInterceptor(List<SynchroInterceptor> interceptors) {
840 		if (CollectionUtils.isEmpty(interceptors)) {
841 			return null;
842 		}
843 		SynchroContext context = table.getDatabaseMetadata().getContext();
844 		List<SynchroInterceptor> readInterceptors = Lists.newArrayList();
845 
846 		try {
847 			for (SynchroInterceptor interceptor : interceptors) {
848 				if (interceptor.enableOnRead()) {
849 					SynchroInterceptor newInterceptor;
850 					newInterceptor = interceptor.getClass().newInstance();
851 
852 					newInterceptor.setContext(context);
853 					readInterceptors.add(interceptor);
854 				}
855 			}
856 		} catch (Exception e) {
857 			throw new SynchroTechnicalException("Could not initialize DAO read interceptors.", e);
858 		}
859 
860 		if (CollectionUtils.isEmpty(readInterceptors)) {
861 			return null;
862 		}
863 
864 		return SynchroInterceptorUtils.chain(readInterceptors, SynchroInterceptorBase.class);
865 	}
866 
867 	private SynchroInterceptor initWriteInterceptor(List<SynchroInterceptor> interceptors) {
868 		if (CollectionUtils.isEmpty(interceptors)) {
869 			return null;
870 		}
871 		SynchroContext context = table.getDatabaseMetadata().getContext();
872 		List<SynchroInterceptor> writeInterceptors = Lists.newArrayList();
873 
874 		try {
875 			for (SynchroInterceptor interceptor : interceptors) {
876 				if (interceptor.enableOnWrite()) {
877 					SynchroInterceptor newInterceptor;
878 					newInterceptor = interceptor.getClass().newInstance();
879 
880 					newInterceptor.setContext(context);
881 					writeInterceptors.add(interceptor);
882 				}
883 			}
884 		} catch (Exception e) {
885 			throw new SynchroTechnicalException("Could not initialize DAO read interceptors.", e);
886 		}
887 
888 		if (CollectionUtils.isEmpty(writeInterceptors)) {
889 			return null;
890 		}
891 
892 		return SynchroInterceptorUtils.chain(writeInterceptors, SynchroInterceptorBase.class);
893 	}
894 
895 }