View Javadoc

1   package net.sf.appstatus.batch.jdbc;
2   
3   import java.io.IOException;
4   import java.io.InputStream;
5   import java.io.InputStreamReader;
6   import java.io.Reader;
7   import java.io.StringWriter;
8   import java.sql.Clob;
9   import java.sql.SQLException;
10  import java.util.ArrayList;
11  import java.util.List;
12  
13  import net.sf.appstatus.core.batch.IBatch;
14  
15  import org.apache.commons.lang3.StringUtils;
16  import org.joda.time.DateTime;
17  import org.slf4j.Logger;
18  import org.slf4j.LoggerFactory;
19  import org.springframework.dao.DataAccessException;
20  import org.springframework.jdbc.core.JdbcTemplate;
21  import org.springframework.jdbc.support.rowset.SqlRowSet;
22  import org.springframework.transaction.annotation.Propagation;
23  import org.springframework.transaction.annotation.Transactional;
24  
25  /**
26   * 
27   * <pre>
28   * &lt;bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"
29   * scope="singleton">
30   *     &lt;constructor-arg ref="dataSource" /> 
31   * &lt;/bean>
32   * 
33   * &lt;bean id="batchDao" class="net.sf.appstatus.batch.jdbc.BatchDao"
34   * scope="singleton"> &lt;property name="jdbcTemplate" ref="jdbcTemplate" />
35   * &lt;/bean>
36   * 
37   * 
38   * 
39   * &lt;bean id="jdbcBatchManager"
40   * class="net.sf.appstatus.batch.jdbc.JdbcBatchManager" scope="singleton">
41   * 		&lt;property name="batchDao" ref="batchDao" /> 
42   * &lt;/bean>
43   * 
44   * </pre>
45   * 
46   * Create table: BATCH
47   * 
48   * <p>
49   * <table>
50   * <tr>
51   * <td>UUID_BATCH</td>
52   * <td>varchar (256)</td>
53   * </tr>
54   * <tr>
55   * <td>GROUP_BATCH</td>
56   * <td>varchar (256)</td>
57   * </tr>
58   * <tr>
59   * <td>NAME_BATCH</td>
60   * <td>varchar (256)</td>
61   * </tr>
62   * <tr>
63   * <td>START_DATE</td>
64   * <td>DATETIME</td>
65   * </tr>
66   * <tr>
67   * <td>END_DATE</td>
68   * <td>DATETIME</td>
69   * </tr>
70   * <tr>
71   * <td>UPDATED</td>
72   * <td>DATETIME</td>
73   * </tr>
74   * <tr>
75   * <td>STATUS</td>
76   * <td>varchar (64)</td>
77   * </tr>
78   * <tr>
79   * <td>SUCCESS</td>
80   * <td>BOOLEAN</td>
81   * </tr>
82   * <tr>
83   * <td>ITEMCOUNT</td>
84   * <td>LONG</td>
85   * </tr>
86   * <tr>
87   * <td>ITEM</td>
88   * <td>varchar (256)</td>
89   * </tr>
90   * </tr>
91   * <tr>
92   * <td>CURRENT_TASK</td>
93   * <td>varchar (256)</td>
94   * </tr>
95   * <tr>
96   * <td>PROGRESS</td>
97   * <td>Float</td>
98   * </tr>
99   * <tr>
100  * <td>REJECT</td>
101  * <td>CLOB</td>
102  * </tr>
103  * <tr>
104  * <td>LAST_MSG</td>
105  * <td>varchar (1024)</td>
106  * </tr>
107  * </table>
108  */
109 public class BatchDao {
110 
111 	// Query codes
112 	public static final int BATCH_FETCH = 2;
113 	public static final int BATCH_DELETE = 3;
114 	public static final int BATCH_DELETE_OLD = 4;
115 	public static final int BATCH_DELETE_SUCCESS = 5;
116 	public static final int BATCH_INSERT = 6;
117 	public static final int BATCH_UPDATE = 7;
118 	public static final int BATCH_CREATE_TABLE = 1;
119 	public static final int BATCH_FETCH_BY_NAME = 8;
120 
121 	private static Logger logger = LoggerFactory.getLogger(BatchDao.class);
122 
123 	/**
124 	 * Spring JDBC template
125 	 */
126 	private JdbcTemplate jdbcTemplate;
127 	protected String tableName = "BATCH";
128 
129 	/**
130 	 * Check for storage table and create if necessary.
131 	 * 
132 	 * @return true if database was created.
133 	 */
134 	public boolean createDbIfNecessary() {
135 		logger.info("Looking for table {}...", tableName);
136 
137 		try {
138 			this.jdbcTemplate.execute("select count(*) from " + tableName);
139 			logger.info("Table {} found.", tableName);
140 			return false;
141 		} catch (DataAccessException e) {
142 			logger.warn("Table {} not found. Creating using \"{}\" ...",
143 					tableName, getSql(BATCH_CREATE_TABLE));
144 			jdbcTemplate.execute(getSql(BATCH_CREATE_TABLE));
145 			logger.info("Table {} created", tableName);
146 			return true;
147 		}
148 	}
149 
150 	@Transactional(propagation = Propagation.REQUIRES_NEW)
151 	public void deleteBatch(final String uuidBatch) {
152 		Object[] parameters = new Object[] { uuidBatch };
153 		this.jdbcTemplate.update(getSql(BATCH_DELETE), parameters);
154 		logger.info("Batch {} deleted.", uuidBatch);
155 	}
156 
157 	@Transactional(propagation = Propagation.REQUIRES_NEW)
158 	public void deleteOldBatches(final int delay) {
159 		Object[] parameters = new Object[] {
160 				new DateTime().minusMonths(delay).toDate(),
161 				IBatch.STATUS_RUNNING };
162 		this.jdbcTemplate.update(getSql(BATCH_DELETE_OLD), parameters);
163 		logger.info("Batchs older than {} months deleted.", delay);
164 	}
165 
166 	@Transactional(propagation = Propagation.REQUIRES_NEW)
167 	public void deleteSuccessBatches() {
168 		Object[] parameters = new Object[] { IBatch.STATUS_SUCCESS };
169 		this.jdbcTemplate.update(getSql(BATCH_DELETE_SUCCESS), parameters);
170 		logger.info("Batchs with success status deleted.");
171 	}
172 
173 	@Transactional(readOnly = true)
174 	private List<BdBatch> fetchBdBatch(final int max, String[] status) {
175 		SqlRowSet srs = this.jdbcTemplate.queryForRowSet(
176 				insertParametersFromList(getSql(BATCH_FETCH), status),
177 				new Object[] { max });
178 		return resultSet2Batches(srs);
179 	}
180 
181 	@Transactional(readOnly = true)
182 	private List<BdBatch> fetchBdBatch(String group, String name,
183 			final int max, String[] status) {
184 		SqlRowSet srs = this.jdbcTemplate.queryForRowSet(
185 				insertParametersFromList(getSql(BATCH_FETCH_BY_NAME), status),
186 				new Object[] { group, name, max });
187 		return resultSet2Batches(srs);
188 	}
189 
190 	@Transactional(readOnly = true)
191 	public List<BdBatch> fetchError(final int max) {
192 		return fetchBdBatch(max, new String[] { IBatch.STATUS_FAILURE });
193 	}
194 
195 	@Transactional(readOnly = true)
196 	public List<BdBatch> fetchFinished(final int max) {
197 		return fetchBdBatch(max, new String[] { IBatch.STATUS_SUCCESS,
198 				IBatch.STATUS_FAILURE });
199 	}
200 
201 	@Transactional(readOnly = true)
202 	public List<BdBatch> fetchRunning(final int max) {
203 		return fetchBdBatch(max, new String[] { IBatch.STATUS_RUNNING });
204 	}
205 
206 	/**
207 	 * Get SQL query for the requested action.
208 	 * <p>
209 	 * Override this method to adapt to a new SQL Dialect.
210 	 * 
211 	 * @param query
212 	 *            {@link #BATCH_FETCH} {@link #BATCH_CREATE_TABLE}
213 	 * @return the SQL query
214 	 */
215 	protected String getSql(int query) {
216 
217 		switch (query) {
218 
219 		case BATCH_UPDATE:
220 			return "UPDATE "
221 					+ tableName
222 					+ " set ITEM = ?, CURRENT_TASK = ?, END_DATE=?, GROUP_BATCH=?,  ITEMCOUNT=?, "
223 					+ "LAST_MSG = ?, UPDATED=?, NAME_BATCH=?, PROGRESS = ?, REJECT = ?, STATUS=?, "
224 					+ "SUCCESS=?  WHERE  UUID_BATCH=?";
225 
226 		case BATCH_DELETE_SUCCESS:
227 			return "delete from " + tableName
228 					+ " where STATUS = ? AND REJECT ='' ";
229 		case BATCH_DELETE:
230 			return "delete from " + tableName + " where UUID_BATCH = ?";
231 		case BATCH_DELETE_OLD:
232 			return "delete from " + tableName
233 					+ " where UPDATED < ? AND STATUS != ?";
234 		case BATCH_INSERT:
235 			return "INSERT into "
236 					+ tableName
237 					+ " (UUID_BATCH,GROUP_BATCH,NAME_BATCH,START_DATE,STATUS,ITEMCOUNT) values (?,?,?,?,?,0)";
238 
239 		case BATCH_FETCH:
240 			return "SELECT UUID_BATCH, ITEM, CURRENT_TASK, END_DATE, GROUP_BATCH, ITEMCOUNT, LAST_MSG, UPDATED,"
241 					+ " NAME_BATCH, PROGRESS, REJECT, START_DATE, STATUS,SUCCESS FROM "//
242 					+ tableName
243 					+ " WHERE STATUS IN ( %s ) ORDER BY UPDATED DESC LIMIT ? ";
244 		case BATCH_FETCH_BY_NAME:
245 			return "SELECT UUID_BATCH, ITEM, CURRENT_TASK, END_DATE, GROUP_BATCH, ITEMCOUNT, LAST_MSG, UPDATED,"
246 					+ " NAME_BATCH, PROGRESS, REJECT, START_DATE, STATUS,SUCCESS FROM "//
247 					+ tableName
248 					+ " WHERE  GROUP_BATCH = ? AND NAME_BATCH = ? AND STATUS IN ( %s ) ORDER BY UPDATED DESC LIMIT ? ";
249 		case BATCH_CREATE_TABLE:
250 			return "CREATE TABLE " + tableName + " (" //
251 					+ " UUID_BATCH varchar(256) NOT NULL," //
252 					+ "GROUP_BATCH varchar(256) NULL," //
253 					+ "NAME_BATCH varchar(256) NULL," //
254 					+ "START_DATE DATETIME  NULL," //
255 					+ "END_DATE DATETIME NULL," //
256 					+ "UPDATED DATETIME NULL," //
257 					+ "STATUS varchar(64) NULL," //
258 					+ "SUCCESS BOOLEAN NULL," //
259 					+ "ITEMCOUNT BIGINT NULL," //
260 					+ "ITEM varchar(256) NULL," //
261 					+ "CURRENT_TASK varchar(256) NULL," //
262 					+ "PROGRESS Float NULL," //
263 					+ "REJECT CLOB NULL," //
264 					+ "LAST_MSG varchar(1024) NULL," //
265 					+ "PRIMARY KEY (UUID_BATCH)" + ")  ";
266 
267 		default:
268 			return null;
269 		}
270 	}
271 
272 	@Transactional(propagation = Propagation.REQUIRES_NEW)
273 	public BdBatch save(BdBatch bdBatch) {
274 		Object[] parameters = new Object[] { bdBatch.getUuid(),
275 				bdBatch.getGroup(), bdBatch.getName(), bdBatch.getStartDate(),
276 				bdBatch.getStatus() };
277 		logger.debug("PARAMETERS UUID BATCH:{} NAME: {} GROUP: {}",
278 				bdBatch.getUuid(), bdBatch.getName(), bdBatch.getGroup());
279 		int result = this.jdbcTemplate.update(getSql(BATCH_INSERT), parameters);
280 		logger.debug("{} lines inserted.", result);
281 		return bdBatch;
282 	}
283 
284 	@Transactional(propagation = Propagation.REQUIRES_NEW)
285 	public void update(BdBatch bdBatch) {
286 		logger.debug("Batch {} update ", bdBatch.getUuid());
287 		Object[] parameters = new Object[] { bdBatch.getCurrentItem(),
288 				bdBatch.getCurrentTask(), bdBatch.getEndDate(),
289 				bdBatch.getGroup(), bdBatch.getItemCount(),
290 				bdBatch.getLastMessage(), bdBatch.getLastUpdate(),
291 				bdBatch.getName(), bdBatch.getProgress(), bdBatch.getReject(),
292 				bdBatch.getStatus(), bdBatch.getSuccess(), bdBatch.getUuid() };
293 		this.jdbcTemplate.update(getSql(BATCH_UPDATE), parameters);
294 	}
295 
296 	@Transactional(readOnly = true)
297 	public List<BdBatch> fetch(String group, String name, int max) {
298 		return fetchBdBatch(group, name, max, new String[] {
299 				IBatch.STATUS_SUCCESS, IBatch.STATUS_FAILURE });
300 	}
301 
302 	private List<BdBatch> resultSet2Batches(SqlRowSet srs) {
303 		List<BdBatch> results = new ArrayList<BdBatch>();
304 
305 		while (srs.next()) {
306 			BdBatch bdBatch;
307 			try {
308 				bdBatch = mappinpBdbatch(srs);
309 				results.add(bdBatch);
310 			} catch (SQLException e) {
311 				throw new RuntimeException(e);
312 			} catch (IOException e) {
313 				throw new RuntimeException(e);
314 			}
315 		}
316 
317 		return results;
318 	}
319 
320 	/**
321 	 * Replace %s by values passed as parameter
322 	 * 
323 	 * @param values
324 	 * @return String
325 	 */
326 	private String insertParametersFromList(String sql, String[] values) {
327 		for (int i = 0; i < values.length; ++i) {
328 			values[i] = String.format("'%s'", values[i]);
329 		}
330 
331 		return String.format(sql, StringUtils.join(values, ","));
332 	}
333 
334 	/**
335 	 * Read batch object from result set.
336 	 * 
337 	 * @param srs
338 	 * @return
339 	 * @throws IOException
340 	 * @throws SQLException
341 	 */
342 	private BdBatch mappinpBdbatch(SqlRowSet srs) throws SQLException,
343 			IOException {
344 		BdBatch bdBatch = new BdBatch();
345 		bdBatch.setUuid(srs.getString("UUID_BATCH"));
346 		bdBatch.setCurrentItem(srs.getString("ITEM"));
347 		bdBatch.setEndDate(srs.getDate("END_DATE"));
348 		bdBatch.setGroup(srs.getString("GROUP_BATCH"));
349 		bdBatch.setItemCount(srs.getLong("ITEMCOUNT"));
350 		bdBatch.setLastMessage(srs.getString("LAST_MSG"));
351 		bdBatch.setLastUpdate(srs.getDate("UPDATED"));
352 		bdBatch.setName(srs.getString("NAME_BATCH"));
353 		bdBatch.setProgress(srs.getFloat("PROGRESS"));
354 		bdBatch.setStartDate(srs.getDate("START_DATE"));
355 		bdBatch.setStatus(srs.getString("STATUS"));
356 		bdBatch.setSuccess(srs.getBoolean("SUCCESS"));
357 
358 		// Clob
359 		Clob reject = (Clob) srs.getObject("REJECT");
360 		bdBatch.setReject(clobToString(reject));
361 
362 		return bdBatch;
363 	}
364 
365 	private String clobToString(Clob clob) throws SQLException, IOException {
366 		if (clob == null)
367 			return null;
368 
369 		InputStream in = clob.getAsciiStream();
370 		Reader read = new InputStreamReader(in);
371 		StringWriter w = new StringWriter();
372 
373 		int c = -1;
374 		while ((c = read.read()) != -1) {
375 			w.write(c);
376 		}
377 		w.flush();
378 		return StringUtils.trim(w.toString());
379 	}
380 
381 	public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
382 		this.jdbcTemplate = jdbcTemplate;
383 	}
384 
385 	public void setTableName(String tableName) {
386 		this.tableName = tableName;
387 	}
388 }