001package fr.ifremer.adagio.synchro.dao;
002
003/*
004 * #%L
005 * SIH-Adagio :: Synchronization
006 * $Id:$
007 * $HeadURL:$
008 * %%
009 * Copyright (C) 2012 - 2014 Ifremer
010 * %%
011 * This program is free software: you can redistribute it and/or modify
012 * it under the terms of the GNU Affero General Public License as published by
013 * the Free Software Foundation, either version 3 of the License, or
014 * (at your option) any later version.
015 * 
016 * This program is distributed in the hope that it will be useful,
017 * but WITHOUT ANY WARRANTY; without even the implied warranty of
018 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
019 * GNU General Public License for more details.
020 * 
021 * You should have received a copy of the GNU Affero General Public License
022 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
023 * #L%
024 */
025
026import java.io.IOException;
027import java.sql.Connection;
028import java.sql.PreparedStatement;
029import java.sql.ResultSet;
030import java.sql.SQLException;
031import java.sql.Statement;
032import java.sql.Timestamp;
033import java.sql.Types;
034import java.util.Date;
035import java.util.List;
036import java.util.Map;
037import java.util.Set;
038
039import org.apache.commons.collections.CollectionUtils;
040import org.apache.commons.logging.Log;
041import org.apache.commons.logging.LogFactory;
042
043import com.google.common.base.Preconditions;
044import com.google.common.collect.Lists;
045import com.google.common.collect.Maps;
046import com.google.common.collect.Sets;
047
048import fr.ifremer.adagio.synchro.SynchroTechnicalException;
049import fr.ifremer.adagio.synchro.config.SynchroConfiguration;
050import fr.ifremer.adagio.synchro.intercept.SynchroInterceptor;
051import fr.ifremer.adagio.synchro.intercept.SynchroInterceptorBase;
052import fr.ifremer.adagio.synchro.intercept.SynchroInterceptorUtils;
053import fr.ifremer.adagio.synchro.intercept.SynchroWriteBuffer;
054import fr.ifremer.adagio.synchro.meta.SynchroDatabaseMetadata;
055import fr.ifremer.adagio.synchro.meta.SynchroTableMetadata;
056import fr.ifremer.adagio.synchro.meta.SynchroTableMetadata.TableInsertStrategy;
057import fr.ifremer.adagio.synchro.service.SynchroContext;
058import fr.ifremer.adagio.synchro.service.SynchroPendingOperationBuffer;
059
060public class SynchroTableDaoImpl implements SynchroTableDao {
061
062    /** Logger. */
063    private static final Log log =
064            LogFactory.getLog(SynchroTableDaoImpl.class);
065
066    public static final String TABLE_TEMP_QUERY_PARAMETER = "temp_query_parameter";
067
068    protected final Connection connection;
069
070    protected final SynchroDatabaseMetadata dbMeta;
071
072    protected final SynchroTableMetadata table;
073
074    protected final PreparedStatement insertStatement;
075
076    protected final PreparedStatement insertQueryParameterStatement;
077
078    protected final PreparedStatement updateStatement;
079
080    protected final List<PreparedStatement> selectStatements;
081
082    protected final boolean insertStrategyGenerateIdFirst;
083
084    protected final int columnCount;
085
086    protected final int batchSize;
087
088    protected final String tableName;
089
090    protected boolean debug;
091
092    protected int insertCount = 0;
093
094    protected int updateCount = 0;
095
096    protected final int idColumnIndex;
097
098    protected final SynchroInterceptor readInterceptor;
099
100    protected final SynchroInterceptor writeInterceptor;
101
102    protected SynchroPendingOperationBuffer pendingOperationBuffer;
103
104    public SynchroTableDaoImpl(
105            Connection connection,
106            SynchroTableMetadata table,
107            boolean enableWriteStatements) throws SQLException {
108        this.connection = connection;
109        this.table = table;
110        this.dbMeta = table.getDatabaseMetadata();
111        this.columnCount = table.getColumnsCount();
112        this.selectStatements = Lists.newArrayList();
113        this.tableName = table.getName();
114
115        // Batch size
116        this.batchSize = SynchroConfiguration.getInstance().getImportJdbcBatchSize();
117        Preconditions.checkArgument(this.batchSize > 0);
118
119        debug = log.isTraceEnabled();
120
121        if (table.isWithIdColumn()) {
122            this.idColumnIndex = table.getColumnIndex(SynchroTableMetadata.COLUMN_ID) + 1;
123        }
124        else {
125            this.idColumnIndex = -1;
126        }
127
128        if (enableWriteStatements) {
129            // Prepare insert statement
130            String insertSql = table.getInsertQuery();
131            Preconditions.checkArgument(insertSql.toUpperCase().startsWith("INSERT"),
132                    String.format("%s Insert SQL query should be like 'INSERT ...' but was: %s", this.tableName, insertSql));
133            this.insertStatement = connection.prepareStatement(insertSql);
134            this.insertStrategyGenerateIdFirst = table.getInsertStrategy() == TableInsertStrategy.GENERATE_ID_FIRST;
135
136            // Prepare update statement
137            String updateSql = table.getUpdateQuery();
138            Preconditions.checkArgument(updateSql.toUpperCase().startsWith("UPDATE"),
139                    String.format("%s Update SQL query should be like 'UPDATE ...' but was: %s", this.tableName, updateSql));
140            this.updateStatement = connection.prepareStatement(updateSql);
141
142            // Init write interceptors
143            this.writeInterceptor = initWriteInterceptor(table.getInterceptors());
144        }
145        else {
146            this.insertStatement = null;
147            this.updateStatement = null;
148            this.insertStrategyGenerateIdFirst = false;
149            this.writeInterceptor = null;
150        }
151
152        // Query temp parameter (for query parameter with many values)
153        String insertQueryParameterSql = createTempQueryParameterQuery(table);
154        if (insertQueryParameterSql != null) {
155            insertQueryParameterStatement = getConnection().prepareStatement(insertQueryParameterSql);
156        }
157        else {
158            insertQueryParameterStatement = null;
159        }
160
161        this.readInterceptor = initReadInterceptor(table.getInterceptors());
162    }
163
164    @Override
165    public Connection getConnection() {
166        return connection;
167    }
168
169    @Override
170    public SynchroTableMetadata getTable() {
171        return table;
172    }
173
174    public void setPendingOperationBuffer(SynchroPendingOperationBuffer pendingChangesBuffer) {
175        this.pendingOperationBuffer = pendingChangesBuffer;
176    }
177
178    public SynchroPendingOperationBuffer getPendingOperationBuffer() {
179        return this.pendingOperationBuffer;
180    }
181
182    public int getInsertCount() {
183        return insertCount;
184    }
185
186    public int getUpdateCount() {
187        return updateCount;
188    }
189
190    public void flush() throws SQLException {
191        if (!debug) {
192            if (insertCount > 0 && insertCount % batchSize != 0) {
193                insertStatement.executeBatch();
194                insertStatement.clearBatch();
195            }
196            if (updateCount > 0 && updateCount % batchSize != 0) {
197                updateStatement.executeBatch();
198                updateStatement.clearBatch();
199            }
200        }
201    }
202
203    @Override
204    public void close() throws IOException {
205        DaoUtils.closeSilently(insertStatement);
206        DaoUtils.closeSilently(updateStatement);
207        DaoUtils.closeSilently(insertQueryParameterStatement);
208        closeSelectStatements();
209    }
210
211    /**
212     * Gets the last updateDate for the given {@code table} using
213     * the given datasource.
214     * 
215     * @return the last update date of the given table, or {@code null} if table does not use a updateDate columns or if
216     *         there
217     *         is no data in table.
218     */
219    public Timestamp getLastUpdateDate() throws SQLException {
220        if (!table.isWithUpdateDateColumn()) {
221            return null;
222        }
223
224        String sql = table.getSelectMaxUpdateDateQuery();
225        if (sql == null) {
226            return null;
227        }
228
229        PreparedStatement statement = getConnection().prepareStatement(sql);
230        try {
231            ResultSet resultSet = statement.executeQuery();
232            if (!resultSet.next()) {
233                return null;
234            }
235            Timestamp result = resultSet.getTimestamp(1);
236            return result;
237        } finally {
238            DaoUtils.closeSilently(statement);
239        }
240    }
241
242    public long count() throws SQLException {
243
244        String sql = table.getCountQuery();
245
246        PreparedStatement statement = getConnection().prepareStatement(sql);
247
248        try {
249            ResultSet resultSet = statement.executeQuery();
250            resultSet.next();
251            long result = resultSet.getLong(1);
252            return result;
253        } finally {
254            DaoUtils.closeSilently(statement);
255        }
256    }
257
258    public long countDataToUpdate(Date fromDate) throws SQLException {
259
260        String sql = table.getCountDataToUpdateQuery(fromDate);
261
262        PreparedStatement statement = getConnection().prepareStatement(sql);
263        log.debug(sql);
264        try {
265            if (table.isWithUpdateDateColumn() &&
266                    fromDate != null) {
267                Timestamp updateDateValue = new Timestamp(fromDate.getTime());
268                // Set all query parameter (could have more than one,
269                // when query has been override by an interceptor)
270                for (int i = 1; i <= statement.getParameterMetaData().getParameterCount(); i++) {
271                    statement.setTimestamp(i, updateDateValue);
272                }
273            }
274
275            ResultSet queryResult = statement.executeQuery();
276            queryResult.next();
277            long result = queryResult.getLong(1);
278            return result;
279        } finally {
280            DaoUtils.closeSilently(statement);
281        }
282    }
283
284    public ResultSet getDataToUpdate(Date fromDate) throws SQLException {
285
286        String sql = table.getSelectDataToUpdateQuery(fromDate);
287
288        PreparedStatement statement = getConnection().prepareStatement(sql);
289        selectStatements.add(statement);
290
291        if (table.isWithUpdateDateColumn() &&
292                fromDate != null) {
293            statement.setTimestamp(1, new Timestamp(fromDate.getTime()));
294            if (statement.getParameterMetaData().getParameterCount() > 1) {
295                statement.setTimestamp(2, new Timestamp(fromDate.getTime()));
296            }
297        }
298        statement.setFetchSize(batchSize);
299
300        ResultSet result = statement.executeQuery();
301        return result;
302    }
303
304    public void deleteAll() throws SQLException {
305        PreparedStatement deleteStatement =
306                getConnection().prepareStatement(
307                        "DELETE FROM " + table.getName());
308        try {
309            deleteStatement.execute();
310        } finally {
311            DaoUtils.closeSilently(deleteStatement);
312        }
313    }
314
315    public Object[] findByPk(List<Object> pk) throws SQLException {
316        String selectDataSql = table.getSelectDataQueryFromPk();
317
318        PreparedStatement selectStatement = getConnection().prepareStatement(selectDataSql);
319        int columnCountIndex = 1;
320
321        for (Object pkColumn : pk) {
322            selectStatement.setObject(columnCountIndex++, pkColumn);
323        }
324
325        int columnsCount = table.getColumnsCount();
326
327        try {
328            ResultSet resultSet = selectStatement.executeQuery();
329            resultSet.next();
330
331            Object[] result = new Object[columnsCount];
332
333            for (int i = 1; i <= columnsCount; i++) {
334
335                result[i - 1] = resultSet.getObject(i);
336            }
337            return result;
338        } finally {
339            DaoUtils.closeSilently(selectStatement);
340        }
341    }
342
343    public Set<String> getExistingPrimaryKeys() throws SQLException {
344        Set<String> result = Sets.newHashSet();
345
346        // If simple SQL (select with only one output String column)
347        if (table.isSelectPrimaryKeysAsStringQueryEnable()) {
348            String sql = table.getSelectPrimaryKeysAsStringQuery();
349            PreparedStatement statement = getConnection().prepareStatement(sql);
350            try {
351                ResultSet resultSet = statement.executeQuery();
352
353                while (resultSet.next()) {
354                    String pk = resultSet.getString(1);
355                    result.add(pk);
356                }
357            } finally {
358                DaoUtils.closeSilently(statement);
359            }
360        }
361
362        // If more than one one column as String, in the select query
363        else {
364            String sql = table.getSelectPrimaryKeysQuery();
365            PreparedStatement statement = getConnection().prepareStatement(sql);
366
367            try {
368                ResultSet resultSet = statement.executeQuery();
369
370                int columnCount = table.getPkNames().size();
371                List<Object> pks = Lists.newArrayListWithCapacity(columnCount);
372                while (resultSet.next()) {
373                    for (int i = 1; i <= columnCount; i++) {
374                        Object pk = resultSet.getObject(i);
375                        pks.add(pk);
376                    }
377                    result.add(table.toPkStr(pks));
378                    pks.clear();
379                }
380            } finally {
381                DaoUtils.closeSilently(statement);
382            }
383        }
384
385        return result;
386    }
387
388    public Integer generateNewId() throws SQLException {
389        String sql = table.getSequenceNextValString();
390        Preconditions.checkNotNull(sql);
391
392        if (sql == null) {
393            return null;
394        }
395
396        PreparedStatement statement = getConnection().prepareStatement(sql);
397        try {
398            ResultSet resultSet = statement.executeQuery();
399            if (!resultSet.next()) {
400                return null;
401            }
402            Integer result = resultSet.getInt(1);
403            return result;
404        } finally {
405            DaoUtils.closeSilently(statement);
406        }
407    }
408
409    public void executeInsert(ResultSet incomingData) throws SQLException {
410        Preconditions.checkArgument(!this.insertStrategyGenerateIdFirst);
411
412        List<Object> params = null;
413        if (debug) {
414            params = Lists.newArrayList();
415        }
416
417        if (writeInterceptor != null) {
418            transformAndSetData(insertStatement, incomingData, pendingOperationBuffer, writeInterceptor, params);
419        }
420        else {
421            setData(insertStatement, incomingData, params);
422        }
423        insertCount++;
424
425        if (debug) {
426            List<Object> pk = table.getPk(incomingData);
427            log.debug(String.format("%s Execute insert query (pk:%s), params: %s", tableName, table.toPkStr(pk), params));
428            int nbRowInsert = insertStatement.executeUpdate();
429            if (nbRowInsert != 1) {
430                throw new SynchroTechnicalException(String.format("%s Could not insert a row into the table (pk:%s), params: %s", tableName,
431                        table.toPkStr(pk), params));
432            }
433        }
434        else {
435            insertStatement.addBatch();
436            if (insertCount > 0 && insertCount % batchSize == 0) {
437                insertStatement.executeBatch();
438                insertStatement.clearBatch();
439            }
440        }
441    }
442
443    public void executeInsert(Object[] incomingData) throws SQLException {
444        Preconditions.checkArgument(!this.insertStrategyGenerateIdFirst);
445
446        List<Object> params = null;
447        if (debug) {
448            params = Lists.newArrayList();
449        }
450
451        if (writeInterceptor != null) {
452            transformAndSetData(insertStatement, incomingData, pendingOperationBuffer, writeInterceptor, params);
453        }
454        else {
455            setData(insertStatement, incomingData, params);
456        }
457        insertCount++;
458
459        if (debug) {
460            List<Object> pk = table.getPk(incomingData);
461            log.debug(String.format("%s Execute insert query (pk:%s), params: %s", tableName, table.toPkStr(pk), params));
462            insertStatement.executeUpdate();
463        }
464        else {
465            insertStatement.addBatch();
466            if (insertCount > 0 && insertCount % batchSize == 0) {
467                insertStatement.executeBatch();
468                insertStatement.clearBatch();
469            }
470        }
471    }
472
473    @Override
474    public Integer executeInsertAndReturnId(ResultSet incomingData) throws SQLException {
475        Preconditions.checkArgument(this.insertStrategyGenerateIdFirst);
476
477        // Generate Id
478        Integer id = generateNewId();
479        List<Object> params = null;
480
481        if (debug) {
482            params = Lists.newArrayList();
483        }
484
485        if (writeInterceptor != null) {
486            transformAndSetData(insertStatement, incomingData, pendingOperationBuffer, writeInterceptor, params);
487        }
488        else {
489            setData(insertStatement, incomingData, params);
490        }
491
492        insertStatement.setObject(columnCount + 1, id);
493        insertCount++;
494
495        if (debug) {
496            log.debug(String.format("%s Execute insert query (pk:%s), params: %s", tableName, id, params));
497            insertStatement.executeUpdate();
498        }
499        else {
500            insertStatement.addBatch();
501            if (insertCount > 0 && insertCount % batchSize == 0) {
502                insertStatement.executeBatch();
503                insertStatement.clearBatch();
504            }
505        }
506
507        return id;
508    }
509
510    @Override
511    public void executeUpdate(List<Object> pk, ResultSet incomingData) throws SQLException {
512
513        List<Object> params = null;
514        if (debug) {
515            params = Lists.newArrayList();
516        }
517
518        if (writeInterceptor != null) {
519            transformAndSetData(updateStatement, incomingData, pendingOperationBuffer, writeInterceptor, params);
520        }
521        else {
522            setData(updateStatement, incomingData, params);
523        }
524
525        int columnCountIndex = columnCount + 1;
526        for (Object pkColumn : pk) {
527            updateStatement.setObject(columnCountIndex++, pkColumn);
528        }
529
530        updateCount++;
531
532        if (debug) {
533            log.debug(String.format("%s Execute update query (pk:%s), params: %s", tableName, pk, params));
534            int nbRowUpdated = updateStatement.executeUpdate();
535            Preconditions.checkArgument(nbRowUpdated == 1, String.format("%s rows has been updated, but expected 1 row.", nbRowUpdated));
536        }
537        else {
538            updateStatement.addBatch();
539            if (updateCount > 0 && updateCount % batchSize == 0) {
540                updateStatement.executeBatch();
541                updateStatement.clearBatch();
542            }
543        }
544    }
545
546    @Override
547    public void executeUpdate(List<Object> pk, Object[] row) throws SQLException {
548        List<Object> params = null;
549        if (debug) {
550            params = Lists.newArrayList();
551        }
552
553        if (writeInterceptor != null) {
554            transformAndSetData(updateStatement, row, pendingOperationBuffer, writeInterceptor, params);
555        }
556        else {
557            setData(updateStatement, row, params);
558        }
559
560        int columnCountIndex = columnCount + 1;
561
562        for (Object pkColumn : pk) {
563            updateStatement.setObject(columnCountIndex++, pkColumn);
564        }
565
566        updateCount++;
567
568        if (debug) {
569            log.debug(String.format("%s Execute update query (pk:%s), params: %s", tableName, pk, params));
570            int nbRowUpdated = updateStatement.executeUpdate();
571            Preconditions.checkArgument(nbRowUpdated == 1, String.format("%s rows has been updated, but expected 1 row.", nbRowUpdated));
572        }
573        else {
574            updateStatement.addBatch();
575            if (updateCount > 0 && updateCount % batchSize == 0) {
576                updateStatement.executeBatch();
577                updateStatement.clearBatch();
578            }
579        }
580    }
581
582    @Override
583    public ResultSet getDataByFk(String fkColumnName, Set<Integer> fkValues) throws SQLException {
584
585        boolean insertUsingTempQueryParameter = isTempQueryParameterEnable() && fkValues.size() > dbMeta.getInExpressionCountLimit();
586
587        if (insertUsingTempQueryParameter) {
588            return getDataByFkUsingTempParameterTable(fkColumnName, fkValues);
589        }
590
591        return getDataByFkWithInOperator(fkColumnName, fkValues);
592    }
593
594    @Override
595    public Integer getIdFromRemoteId(String tableName, Integer remoteId) throws SQLException {
596        String sql = getTable().getSelectIdFromRemoteIdQuery(tableName);
597        if (sql == null) {
598            return null;
599        }
600
601        PreparedStatement statement = getConnection().prepareStatement(sql);
602        statement.setInt(1, remoteId);
603        try {
604            ResultSet resultSet = statement.executeQuery();
605            if (!resultSet.next()) {
606                return null;
607            }
608            Integer result = resultSet.getInt(1);
609
610            return result;
611        } finally {
612            DaoUtils.closeSilently(statement);
613        }
614    }
615
616    @Override
617    public Map<Integer, Integer> getExistingRemoteIdsMap() throws SQLException {
618
619        String sql = getTable().getSelectRemoteIdsQuery();
620
621        PreparedStatement statement = getConnection().prepareStatement(sql);
622
623        Map<Integer, Integer> result = Maps.newHashMap();
624
625        try {
626            ResultSet resultSet = statement.executeQuery();
627            while (resultSet.next()) {
628                int id = resultSet.getInt(1);
629                int remoteId = resultSet.getInt(2);
630                result.put(remoteId, id);
631            }
632            statement.close();
633            return result;
634        } finally {
635            DaoUtils.closeSilently(statement);
636        }
637    }
638
639    public List<Object> getPk(ResultSet incomingData, SynchroWriteBuffer transformResult) throws SQLException {
640        List<Object> result;
641
642        if (readInterceptor != null) {
643            Object[] data = transformData(incomingData, readInterceptor, transformResult);
644            result = getTable().getPk(data);
645        }
646        else {
647            result = getTable().getPk(incomingData);
648        }
649
650        return result;
651    }
652
653    public boolean isTempQueryParameterEnable() {
654        return insertQueryParameterStatement != null;
655    }
656
657    public void clearCounts() {
658        insertCount = 0;
659        updateCount = 0;
660    }
661
662    /* -- Abstract method -- */
663
664    /* -- Protected methods -- */
665
666    protected void insertValuesIntoTempQueryParameter(Set<Integer> parameterValues, String queryParameterName, int queryPersonId) throws SQLException {
667        Preconditions.checkNotNull(insertQueryParameterStatement);
668        if (debug) {
669            log.debug(String.format("%s Setting query parameters into %s", tableName, TABLE_TEMP_QUERY_PARAMETER.toUpperCase()));
670        }
671
672        Statement stm = getConnection().createStatement();
673        try {
674            stm.executeUpdate(String.format("DELETE FROM %", TABLE_TEMP_QUERY_PARAMETER));
675        } finally {
676            DaoUtils.closeSilently(stm);
677        }
678
679        int i = 1;
680        for (Integer parameterValue : parameterValues) {
681            insertQueryParameterStatement.setInt(1, i++);
682            insertQueryParameterStatement.setString(2, queryParameterName);
683            insertQueryParameterStatement.setInt(3, parameterValue);
684            insertQueryParameterStatement.setNull(4, Types.VARCHAR);
685            insertQueryParameterStatement.setInt(5, queryPersonId);
686            insertQueryParameterStatement.addBatch();
687        }
688
689        insertQueryParameterStatement.executeBatch();
690        insertQueryParameterStatement.clearBatch();
691    }
692
693    protected String createTempQueryParameterQuery(SynchroTableMetadata table) {
694        SynchroDatabaseMetadata dbMeta = table.getDatabaseMetadata();
695        if (dbMeta.getTable(TABLE_TEMP_QUERY_PARAMETER) == null) {
696            return null;
697        }
698
699        return String.format("INSERT INTO %s (ID, PARAMETER_NAME, NUMERICAL_VALUE, ALPHANUMERICAL_VALUE, PERSON_FK)"
700                + " VALUES (?, ?, ?, ?, ?)",
701                TABLE_TEMP_QUERY_PARAMETER);
702
703    }
704
705    protected void closeSelectStatements() {
706        if (CollectionUtils.isNotEmpty(selectStatements)) {
707            for (PreparedStatement statement : selectStatements) {
708                DaoUtils.closeSilently(statement);
709            }
710        }
711        selectStatements.clear();
712    }
713
714    protected void closePreviousSelectStatements(PreparedStatement newStatement) {
715        closeSelectStatements();
716        selectStatements.add(newStatement);
717    }
718
719    protected void setData(PreparedStatement statement, Object[] values, List<Object> debugParams) throws SQLException {
720        for (int c = 1; c <= columnCount; c++) {
721            Object object = values[c - 1];
722            statement.setObject(c, object);
723            if (debug) {
724                debugParams.add(object);
725            }
726        }
727    }
728
729    protected void setData(PreparedStatement statement, ResultSet incomingData, List<Object> debugParams) throws SQLException {
730        for (int c = 1; c <= columnCount; c++) {
731            Object object = incomingData.getObject(c);
732            statement.setObject(c, object);
733            if (debug) {
734                debugParams.add(object);
735            }
736        }
737    }
738
739    protected void transformAndSetData(PreparedStatement statement, ResultSet incomingData,
740            SynchroWriteBuffer transformBuffer, SynchroInterceptor interceptor, List<Object> debugParams)
741            throws SQLException {
742        Preconditions.checkNotNull(transformBuffer);
743
744        // Transform data
745        Object[] row = transformData(incomingData, interceptor, transformBuffer);
746
747        // Set the data into the statement
748        setData(statement, row, debugParams);
749    }
750
751    protected void transformAndSetData(PreparedStatement statement, Object[] incomingData,
752            SynchroWriteBuffer transformBuffer, SynchroInterceptor interceptor, List<Object> debugParams)
753            throws SQLException {
754        Preconditions.checkNotNull(transformBuffer);
755
756        // Transform data
757        Object[] row = transformData(incomingData, interceptor, transformBuffer);
758
759        // Set the data into the statement
760        setData(statement, row, debugParams);
761    }
762
763    protected Object[] transformData(ResultSet incomingData, SynchroInterceptor interceptor, SynchroWriteBuffer buffer)
764            throws SQLException {
765        Preconditions.checkNotNull(interceptor);
766
767        // Get data
768        Object[] result = table.getData(incomingData);
769
770        // Transform values
771        try {
772            interceptor.onWrite(result, this, buffer);
773        } catch (Exception e) {
774            throw new SynchroTechnicalException(String.format("Error while transform row, for table %s", table.getName()), e);
775        }
776
777        return result;
778    }
779
780    protected Object[] transformData(Object[] incomingData, SynchroInterceptor interceptor, SynchroWriteBuffer buffer)
781            throws SQLException {
782        Preconditions.checkNotNull(interceptor);
783
784        // Transform values
785        try {
786            interceptor.onWrite(incomingData, this, buffer);
787        } catch (Exception e) {
788            throw new SynchroTechnicalException(String.format("Error while transform row, for table %s", table.getName()), e);
789        }
790
791        return incomingData;
792    }
793
794    protected ResultSet getDataByFkWithInOperator(String fkColumnName, Set<Integer> fkValues) throws SQLException {
795        String sql = table.getSelectDataFromFkQuery(fkColumnName, fkValues.size());
796
797        Preconditions.checkNotNull(sql, String.format("Columns %s is not referenced for table %s", fkColumnName, table.getName()));
798
799        PreparedStatement statement = getConnection().prepareStatement(sql);
800        closePreviousSelectStatements(statement);
801
802        int paramIndex = 1;
803        for (Object value : fkValues) {
804            statement.setObject(paramIndex, value);
805            paramIndex++;
806        }
807
808        statement.setFetchSize(batchSize);
809
810        ResultSet result = statement.executeQuery();
811        return result;
812    }
813
814    protected ResultSet getDataByFkUsingTempParameterTable(String fkColumnName, Set<Integer> fkValues) throws SQLException {
815
816        String sql = table.getSelectDataFromFkQueryBigParams(fkColumnName);
817        Preconditions.checkNotNull(sql);
818
819        PreparedStatement statement = getConnection().prepareStatement(sql);
820        closePreviousSelectStatements(statement);
821
822        String queryParameterName = "remoteIds";
823        int personFk = -1;
824        insertValuesIntoTempQueryParameter(fkValues, queryParameterName, personFk);
825        statement.setString(1, queryParameterName);
826        statement.setInt(2, personFk);
827
828        statement.setFetchSize(batchSize);
829
830        ResultSet result = statement.executeQuery();
831        return result;
832    }
833
834    @Override
835    public List<Object> getPk(ResultSet incomingData) throws SQLException {
836        return table.getPk(incomingData);
837    }
838
839    private SynchroInterceptor initReadInterceptor(List<SynchroInterceptor> interceptors) {
840        if (CollectionUtils.isEmpty(interceptors)) {
841            return null;
842        }
843        SynchroContext context = table.getDatabaseMetadata().getContext();
844        List<SynchroInterceptor> readInterceptors = Lists.newArrayList();
845
846        try {
847            for (SynchroInterceptor interceptor : interceptors) {
848                if (interceptor.enableOnRead()) {
849                    SynchroInterceptor newInterceptor;
850                    newInterceptor = interceptor.getClass().newInstance();
851
852                    newInterceptor.setContext(context);
853                    readInterceptors.add(interceptor);
854                }
855            }
856        } catch (Exception e) {
857            throw new SynchroTechnicalException("Could not initialize DAO read interceptors.", e);
858        }
859
860        if (CollectionUtils.isEmpty(readInterceptors)) {
861            return null;
862        }
863
864        return SynchroInterceptorUtils.chain(readInterceptors, SynchroInterceptorBase.class);
865    }
866
867    private SynchroInterceptor initWriteInterceptor(List<SynchroInterceptor> interceptors) {
868        if (CollectionUtils.isEmpty(interceptors)) {
869            return null;
870        }
871        SynchroContext context = table.getDatabaseMetadata().getContext();
872        List<SynchroInterceptor> writeInterceptors = Lists.newArrayList();
873
874        try {
875            for (SynchroInterceptor interceptor : interceptors) {
876                if (interceptor.enableOnWrite()) {
877                    SynchroInterceptor newInterceptor;
878                    newInterceptor = interceptor.getClass().newInstance();
879
880                    newInterceptor.setContext(context);
881                    writeInterceptors.add(interceptor);
882                }
883            }
884        } catch (Exception e) {
885            throw new SynchroTechnicalException("Could not initialize DAO read interceptors.", e);
886        }
887
888        if (CollectionUtils.isEmpty(writeInterceptors)) {
889            return null;
890        }
891
892        return SynchroInterceptorUtils.chain(writeInterceptors, SynchroInterceptorBase.class);
893    }
894
895}