C++ API Reference for Intel® Data Analytics Acceleration Library 2018 Update 3

odbc_data_source.h
1 /* file: odbc_data_source.h */
2 /*******************************************************************************
3 * Copyright 2014-2018 Intel Corporation.
4 *
5 * This software and the related documents are Intel copyrighted materials, and
6 * your use of them is governed by the express license under which they were
7 * provided to you (License). Unless the License provides otherwise, you may not
8 * use, modify, copy, publish, distribute, disclose or transmit this software or
9 * the related documents without Intel's prior written permission.
10 *
11 * This software and the related documents are provided as is, with no express
12 * or implied warranties, other than those that are expressly stated in the
13 * License.
14 *******************************************************************************/
15 
16 /*
17 //++
18 // Implementation of the ODBC data source class
19 //--
20 */
21 #ifndef __ODBC_DATA_SOURCE_H__
22 #define __ODBC_DATA_SOURCE_H__
23 
24 #include <sstream>
25 #include <fstream>
26 #include "services/daal_memory.h"
27 #include "data_management/data_source/data_source.h"
28 #include "data_management/data/data_dictionary.h"
29 #include "data_management/data/numeric_table.h"
30 #include "data_management/data/homogen_numeric_table.h"
31 #include <sql.h>
32 #include <sqltypes.h>
33 #include <sqlext.h>
34 
35 #include "mysql_feature_manager.h"
36 
37 namespace daal
38 {
39 namespace data_management
40 {
41 
45 namespace interface1
46 {
58 template<typename _featureManager, typename summaryStatisticsType = DAAL_SUMMARY_STATISTICS_TYPE>
59 class ODBCDataSource : public DataSourceTemplate<data_management::HomogenNumericTable<DAAL_DATA_TYPE>, summaryStatisticsType>
60 {
61 public:
62  typedef _featureManager FeatureManager;
63 
64  using DataSourceIface::NumericTableAllocationFlag;
65  using DataSourceIface::DictionaryCreationFlag;
66  using DataSourceIface::DataSourceStatus;
67 
68  using DataSource::checkDictionary;
69  using DataSource::checkNumericTable;
70  using DataSource::freeNumericTable;
71  using DataSource::_dict;
72  using DataSource::_initialMaxRows;
73 
74 protected:
75  typedef data_management::HomogenNumericTable<DAAL_DATA_TYPE> DefaultNumericTableType;
76 
77  FeatureManager featureManager;
78 
79 public:
95  ODBCDataSource(const std::string &dbname, const std::string &tablename, const std::string &username = "",
96  const std::string &password = "",
97  DataSourceIface::NumericTableAllocationFlag doAllocateNumericTable = DataSource::notAllocateNumericTable,
98  DataSourceIface::DictionaryCreationFlag doCreateDictionaryFromContext = DataSource::notDictionaryFromContext,
99  size_t initialMaxRows = 10) :
100  DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>(doAllocateNumericTable, doCreateDictionaryFromContext),
101  _dbname(dbname), _username(username), _password(password), _tablename(tablename),
102  _idx_last_read(0), _hdlDbc(SQL_NULL_HDBC), _hdlEnv(SQL_NULL_HENV)
103  {
104  _query = "SELECT * FROM " + _tablename;
105  _connectionStatus = DataSource::notReady;
106  _initialMaxRows = initialMaxRows;
107  }
108 
110  ~ODBCDataSource()
111  {
112  _freeHandles();
113  }
114 
118  void freeHandles()
119  {
120  SQLRETURN ret = _freeHandles();
121  if (!SQL_SUCCEEDED(ret))
122  {
123  this->_status.add(services::ErrorODBC);
124  }
125  }
126 
127  virtual size_t loadDataBlock(size_t maxRows) DAAL_C11_OVERRIDE
128  {
129  services::Status s = checkDictionary();
130  if(!s) { return 0; }
131 
132  s = checkNumericTable();
133  if(!s) { return 0; }
134 
135  return loadDataBlock(maxRows, this->DataSource::_spnt.get());
136  }
137 
144  virtual size_t loadDataBlock(size_t maxRows, NumericTable *nt)
145  {
146  services::Status s = checkDictionary();
147  if(!s) { return 0; }
148 
149  if( nt == NULL ) { this->_status.add(services::throwIfPossible(services::ErrorNullInputNumericTable)); return 0; }
150 
151  DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>::resizeNumericTableImpl( maxRows, nt );
152 
153  if(nt->getDataMemoryStatus() == NumericTableIface::userAllocated)
154  {
155  if(nt->getNumberOfRows() < maxRows)
156  {
157  this->_status.add(services::throwIfPossible(services::ErrorIncorrectNumberOfObservations));
158  return 0;
159  }
160  if(nt->getNumberOfColumns() != _dict->getNumberOfFeatures())
161  {
162  this->_status.add(services::throwIfPossible(services::ErrorIncorrectNumberOfFeatures));
163  return 0;
164  }
165  }
166 
167  std::string query_exec = featureManager.setLimitQuery(_query, _idx_last_read, maxRows);
168  SQLRETURN ret;
169  ret = _establishHandles();
170  if (!SQL_SUCCEEDED(ret)) { this->_status.add(services::throwIfPossible(services::ErrorHandlesSQL)); return 0; }
171 
172  SQLHSTMT hdlStmt = SQL_NULL_HSTMT;
173  ret = SQLAllocHandle(SQL_HANDLE_STMT, _hdlDbc, &hdlStmt);
174  if (!SQL_SUCCEEDED(ret)) { this->_status.add(services::throwIfPossible(services::ErrorSQLstmtHandle)); return 0; }
175 
176  ret = SQLExecDirect(hdlStmt, (SQLCHAR *)query_exec.c_str(), SQL_NTS);
177  if (!SQL_SUCCEEDED(ret)) { this->_status.add(services::throwIfPossible(services::ErrorODBC)); return 0; }
178 
179  DataSourceIface::DataSourceStatus dataSourceStatus;
180 
181  dataSourceStatus = featureManager.statementResultsNumericTable(hdlStmt, nt, maxRows);
182 
183  size_t nRead = nt->getNumberOfRows();
184  _idx_last_read += nRead;
185 
186  if(nt->basicStatistics.get(NumericTableIface::minimum ).get() != NULL &&
187  nt->basicStatistics.get(NumericTableIface::maximum ).get() != NULL &&
188  nt->basicStatistics.get(NumericTableIface::sum ).get() != NULL &&
189  nt->basicStatistics.get(NumericTableIface::sumSquares).get() != NULL)
190  {
191  for(size_t i = 0; i < nRead; i++)
192  {
193  DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>::updateStatistics( i, nt );
194  }
195  }
196 
197  ret = SQLFreeHandle(SQL_HANDLE_STMT, hdlStmt);
198  if (!SQL_SUCCEEDED(ret)) { this->_status.add(services::throwIfPossible(services::ErrorSQLstmtHandle)); return 0; }
199 
200  if (dataSourceStatus == DataSource::endOfData) { _connectionStatus = DataSource::endOfData; }
201 
202  NumericTableDictionaryPtr ntDict = nt->getDictionarySharedPtr();
203  size_t nFeatures = _dict->getNumberOfFeatures();
204  ntDict->setNumberOfFeatures(nFeatures);
205  for (size_t i = 0; i < nFeatures; i++)
206  {
207  ntDict->setFeature((*_dict)[i].ntFeature, i);
208  }
209 
210  return nRead;
211  }
212 
213  size_t loadDataBlock() DAAL_C11_OVERRIDE
214  {
215  services::Status s = checkDictionary();
216  if(!s) { return 0; }
217 
218  s = checkNumericTable();
219  if(!s) { return 0; }
220 
221  return loadDataBlock(this->DataSource::_spnt.get());
222  }
223 
224  size_t loadDataBlock(NumericTable* nt) DAAL_C11_OVERRIDE
225  {
226  services::Status s = checkDictionary();
227  if(!s) { return 0; }
228 
229  if( nt == NULL ) {this->_status.add(services::throwIfPossible(services::ErrorNullInputNumericTable)); return 0; }
230 
231  size_t maxRows = (_initialMaxRows > 0 ? _initialMaxRows : 10);
232  size_t nrows = 0;
233  size_t ncols = _dict->getNumberOfFeatures();
234 
235  DataCollection tables;
236 
237  for( ; ; )
238  {
239  NumericTablePtr ntCurrent = HomogenNumericTable<DAAL_DATA_TYPE>::create(ncols, maxRows, NumericTableIface::doAllocate, &s);
240  if (!s)
241  {
242  this->_status.add(services::throwIfPossible(services::ErrorNumericTableNotAllocated));
243  break;
244  }
245  tables.push_back(ntCurrent);
246  size_t rows = loadDataBlock(maxRows, ntCurrent.get());
247  nrows += rows;
248  if (rows < maxRows) { break; }
249  maxRows *= 2;
250  }
251 
252  DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>::resizeNumericTableImpl( nrows, nt );
253  nt->setNormalizationFlag(NumericTable::nonNormalized);
254 
255  BlockDescriptor<DAAL_DATA_TYPE> blockCurrent, block;
256 
257  size_t pos = 0;
258 
259  for (size_t i = 0; i < tables.size(); i++) {
260  NumericTable *ntCurrent = (NumericTable*)(tables[i].get());
261  size_t rows = ntCurrent->getNumberOfRows();
262 
263  if (rows == 0) { continue; }
264 
265  ntCurrent->getBlockOfRows(0, rows, readOnly, blockCurrent);
266  nt->getBlockOfRows(pos, rows, writeOnly, block);
267 
268  services::daal_memcpy_s(block.getBlockPtr(), rows * ncols * sizeof(DAAL_DATA_TYPE), blockCurrent.getBlockPtr(), rows * ncols * sizeof(DAAL_DATA_TYPE));
269 
270  ntCurrent->releaseBlockOfRows(blockCurrent);
271  nt->releaseBlockOfRows(block);
272 
273  DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>::combineStatistics( ntCurrent, nt, pos == 0);
274  pos += rows;
275  }
276 
277  NumericTableDictionaryPtr ntDict = nt->getDictionarySharedPtr();
278  size_t nFeatures = _dict->getNumberOfFeatures();
279  ntDict->setNumberOfFeatures(nFeatures);
280  for (size_t i = 0; i < nFeatures; i++)
281  {
282  ntDict->setFeature((*_dict)[i].ntFeature, i);
283  }
284 
285  return nrows;
286  }
287 
288  services::Status createDictionaryFromContext() DAAL_C11_OVERRIDE
289  {
290  if (_dict) { return services::throwIfPossible(services::Status(services::ErrorDictionaryAlreadyAvailable)); }
291 
292  std::string query_exec = _query + ";";
293 
294  SQLRETURN ret;
295  ret = _establishHandles();
296  if (!SQL_SUCCEEDED(ret)) { return services::throwIfPossible(services::Status(services::ErrorHandlesSQL)); }
297 
298  SQLHSTMT hdlStmt = SQL_NULL_HSTMT;
299  ret = SQLAllocHandle(SQL_HANDLE_STMT, _hdlDbc, &hdlStmt);
300  if (!SQL_SUCCEEDED(ret)) { return services::throwIfPossible(services::Status(services::ErrorSQLstmtHandle)); }
301 
302  ret = SQLPrepare(hdlStmt, (SQLCHAR *)query_exec.c_str(), SQL_NTS);
303  if (!SQL_SUCCEEDED(ret))
304  {
305  return services::throwIfPossible(services::Status(services::ErrorODBC));
306  }
307 
308  services::Status status;
309  _dict = DataSourceDictionary::create(&status);
310  if (!status) return status;
311 
312  featureManager.createDictionary(hdlStmt, this->_dict.get());
313 
314  ret = SQLFreeHandle(SQL_HANDLE_STMT, hdlStmt);
315  if (!SQL_SUCCEEDED(ret)) { return services::throwIfPossible(services::Status(services::ErrorSQLstmtHandle)); }
316 
317  if (featureManager.getErrors()->size() == 0) { _connectionStatus = DataSource::readyForLoad; }
318  return status;
319  }
320 
321  DataSourceIface::DataSourceStatus getStatus() DAAL_C11_OVERRIDE
322  {
323  return _connectionStatus;
324  }
325 
326  size_t getNumberOfAvailableRows() DAAL_C11_OVERRIDE
327  {
328  SQLLEN nRows = 0;
329  SQLRETURN ret;
330 
331  ret = _establishHandles();
332  if (!SQL_SUCCEEDED(ret)) { return 0; }
333 
334  SQLHSTMT hdlStmt = SQL_NULL_HSTMT;
335  ret = SQLAllocHandle(SQL_HANDLE_STMT, _hdlDbc, &hdlStmt);
336  if (!SQL_SUCCEEDED(ret)) { return 0; }
337 
338  std::string query_exec = "SELECT COUNT(*) FROM " + _tablename + " ;";
339  ret = SQLExecDirect(hdlStmt, (SQLCHAR *)query_exec.c_str(), SQL_NTS);
340  if (!SQL_SUCCEEDED(ret)) { return 0; }
341 
342  ret = SQLBindCol(hdlStmt, 1, SQL_C_ULONG, (SQLPOINTER)&nRows, 0, NULL);
343  if (!SQL_SUCCEEDED(ret)) { return 0; }
344 
345  ret = SQLFetchScroll(hdlStmt, SQL_FETCH_NEXT, 1);
346  if (!SQL_SUCCEEDED(ret)) { return 0; }
347 
348  ret = SQLFreeHandle(SQL_HANDLE_STMT, hdlStmt);
349  if (!SQL_SUCCEEDED(ret)) { return ret; }
350 
351  return nRows;
352  }
353 
354  FeatureManager &getFeatureManager()
355  {
356  return featureManager;
357  }
358 
359 private:
360  std::string _dbname;
361  std::string _username;
362  std::string _password;
363  std::string _tablename;
364  std::string _query;
365  size_t _idx_last_read;
366  DataSourceIface::DataSourceStatus _connectionStatus;
367 
368  SQLHENV _hdlEnv;
369  SQLHDBC _hdlDbc;
370 
371  SQLRETURN _establishHandles()
372  {
373  if (_hdlEnv == SQL_NULL_HENV || _hdlDbc == SQL_NULL_HDBC)
374  {
375  SQLRETURN ret;
376  ret = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &_hdlEnv);
377  if (!SQL_SUCCEEDED(ret)) { return ret; }
378 
379  ret = SQLSetEnvAttr(_hdlEnv, SQL_ATTR_ODBC_VERSION, (SQLPOINTER) SQL_OV_ODBC3, SQL_IS_UINTEGER);
380  if (!SQL_SUCCEEDED(ret)) { return ret; }
381 
382  ret = SQLAllocHandle(SQL_HANDLE_DBC, _hdlEnv, &_hdlDbc);
383  if (!SQL_SUCCEEDED(ret)) { return ret; }
384 
385  SQLCHAR *usernamePtr;
386  SQLCHAR *passwordPtr;
387  SQLSMALLINT usernameLen;
388  SQLSMALLINT passwordLen;
389  if (_username.empty())
390  {
391  usernamePtr = NULL;
392  usernameLen = 0;
393  }
394  else
395  {
396  usernamePtr = (SQLCHAR *)_username.c_str();
397  usernameLen = SQL_NTS;
398  }
399  if (_password.empty())
400  {
401  passwordPtr = NULL;
402  passwordLen = 0;
403  }
404  else
405  {
406  passwordPtr = (SQLCHAR *)_password.c_str();
407  passwordLen = SQL_NTS;
408  }
409  ret = SQLConnect(_hdlDbc, (SQLCHAR *)_dbname.c_str(), SQL_NTS, usernamePtr, usernameLen, passwordPtr, passwordLen);
410  if (!SQL_SUCCEEDED(ret)) { return ret; }
411  }
412  return SQL_SUCCESS;
413  }
414 
415  SQLRETURN _freeHandles()
416  {
417  if (_hdlDbc == SQL_NULL_HDBC || _hdlEnv == SQL_NULL_HENV) { return SQL_SUCCESS; }
418 
419  SQLRETURN ret;
420 
421  ret = SQLDisconnect(_hdlDbc);
422  if (!SQL_SUCCEEDED(ret)) { return ret; }
423 
424  ret = SQLFreeHandle(SQL_HANDLE_DBC, _hdlDbc);
425  if (!SQL_SUCCEEDED(ret)) { return ret; }
426 
427  ret = SQLFreeHandle(SQL_HANDLE_ENV, _hdlEnv);
428  if (!SQL_SUCCEEDED(ret)) { return ret; }
429 
430  _hdlDbc = SQL_NULL_HDBC;
431  _hdlEnv = SQL_NULL_HENV;
432 
433  return SQL_SUCCESS;
434  }
435 };
437 } // namespace interface1
438 using interface1::ODBCDataSource;
439 
440 }
441 }
442 #endif
daal::data_management::interface1::ODBCDataSource::loadDataBlock
size_t loadDataBlock() DAAL_C11_OVERRIDE
Definition: odbc_data_source.h:213
daal::data_management::interface1::DataSource::checkDictionary
services::Status checkDictionary()
Definition: data_source.h:359
daal::services::ErrorDictionaryAlreadyAvailable
Definition: error_indexes.h:151
daal::services::ErrorNullInputNumericTable
Definition: error_indexes.h:81
daal::data_management::interface1::BlockDescriptor::getBlockPtr
DataType * getBlockPtr() const
Definition: numeric_table.h:69
daal
Definition: algorithm_base_common.h:31
daal::data_management::interface1::ODBCDataSource::createDictionaryFromContext
services::Status createDictionaryFromContext() DAAL_C11_OVERRIDE
Definition: odbc_data_source.h:288
daal::services::ErrorNumericTableNotAllocated
Definition: error_indexes.h:155
daal::data_management::interface1::DataSource::status
services::Status status() const
Definition: data_source.h:309
daal::data_management::interface1::DataCollection
Class that provides functionality of Collection container for objects derived from SerializationIface...
Definition: data_collection.h:45
daal::data_management::interface1::NumericTable::getDictionarySharedPtr
virtual NumericTableDictionaryPtr getDictionarySharedPtr() const DAAL_C11_OVERRIDE
Definition: numeric_table.h:632
daal::data_management::interface1::DataSourceIface::DictionaryCreationFlag
DictionaryCreationFlag
Specifies whether a Data Dictionary is created from the context of a Data Source. ...
Definition: data_source.h:69
daal::data_management::interface1::DataSourceIface::notAllocateNumericTable
Definition: data_source.h:81
daal::data_management::interface1::NumericTable::getNumberOfColumns
size_t getNumberOfColumns() const
Definition: numeric_table.h:651
daal::data_management::interface1::DataSourceIface::freeNumericTable
virtual void freeNumericTable()=0
daal::data_management::interface1::DataCollection::push_back
DataCollection & push_back(const SerializationIfacePtr &x)
daal::data_management::interface1::NumericTableIface::userAllocated
Definition: numeric_table.h:275
daal::services::ErrorHandlesSQL
Definition: error_indexes.h:375
daal::services::ErrorIncorrectNumberOfFeatures
Definition: error_indexes.h:70
daal::data_management::interface1::ODBCDataSource::loadDataBlock
virtual size_t loadDataBlock(size_t maxRows) DAAL_C11_OVERRIDE
Definition: odbc_data_source.h:127
daal::data_management::interface1::ODBCDataSource::ODBCDataSource
ODBCDataSource(const std::string &dbname, const std::string &tablename, const std::string &username="", const std::string &password="", DataSourceIface::NumericTableAllocationFlag doAllocateNumericTable=DataSource::notAllocateNumericTable, DataSourceIface::DictionaryCreationFlag doCreateDictionaryFromContext=DataSource::notDictionaryFromContext, size_t initialMaxRows=10)
Definition: odbc_data_source.h:95
daal::data_management::interface1::DataSourceIface::doAllocateNumericTable
Definition: data_source.h:82
daal::data_management::interface1::DataSourceTemplate
Implements the abstract DataSourceIface interface.
Definition: data_source.h:463
daal::data_management::interface1::NumericTableIface::minimum
Definition: numeric_table.h:296
daal::services::daal_memcpy_s
DAAL_EXPORT void daal_memcpy_s(void *dest, size_t numberOfElements, const void *src, size_t count)
daal::data_management::interface1::ODBCDataSource::loadDataBlock
virtual size_t loadDataBlock(size_t maxRows, NumericTable *nt)
Definition: odbc_data_source.h:144
daal::data_management::interface1::DataSource::checkNumericTable
services::Status checkNumericTable()
Definition: data_source.h:345
daal::data_management::interface1::ODBCDataSource::getStatus
DataSourceIface::DataSourceStatus getStatus() DAAL_C11_OVERRIDE
Definition: odbc_data_source.h:321
daal::data_management::interface1::NumericTableIface::nonNormalized
Definition: numeric_table.h:317
daal::data_management::interface1::HomogenNumericTable::create
static services::SharedPtr< HomogenNumericTable< DataType > > create(NumericTableDictionaryPtr ddictForHomogenNumericTable, services::Status *stat=NULL)
Definition: homogen_numeric_table.h:91
daal::data_management::interface1::ODBCDataSource::getNumberOfAvailableRows
size_t getNumberOfAvailableRows() DAAL_C11_OVERRIDE
Definition: odbc_data_source.h:326
daal::data_management::interface1::NumericTable
Class for a data management component responsible for representation of data in the numeric format...
Definition: numeric_table.h:574
daal::data_management::interface1::ODBCDataSource::loadDataBlock
size_t loadDataBlock(NumericTable *nt) DAAL_C11_OVERRIDE
Definition: odbc_data_source.h:224
daal::services::ErrorSQLstmtHandle
Definition: error_indexes.h:377
daal::data_management::interface1::DataSourceIface::DataSourceStatus
DataSourceStatus
Specifies the status of the Data Source.
Definition: data_source.h:57
daal::data_management::interface1::NumericTableIface::maximum
Definition: numeric_table.h:297
daal::data_management::interface1::DataSourceIface::readyForLoad
Definition: data_source.h:59
daal::data_management::interface1::DataCollection::size
size_t size() const
daal::data_management::interface1::NumericTableIface::doAllocate
Definition: numeric_table.h:287
daal::data_management::interface1::NumericTable::getNumberOfRows
size_t getNumberOfRows() const
Definition: numeric_table.h:660
daal::data_management::interface1::DataSourceIface::notDictionaryFromContext
Definition: data_source.h:71
daal::data_management::interface1::BlockDescriptor< DAAL_DATA_TYPE >
daal::data_management::interface1::ODBCDataSource
Connects to data sources with the ODBC API.
Definition: odbc_data_source.h:59
daal::data_management::interface1::DataSourceIface::endOfData
Definition: data_source.h:61
daal::data_management::interface1::DenseNumericTableIface::releaseBlockOfRows
virtual services::Status releaseBlockOfRows(BlockDescriptor< double > &block)=0
daal::services::ErrorIncorrectNumberOfObservations
Definition: error_indexes.h:71
daal::services::ErrorODBC
Definition: error_indexes.h:376
daal::data_management::interface1::DataSourceIface::NumericTableAllocationFlag
NumericTableAllocationFlag
Specifies whether a Numeric Table is allocated inside of the Data Source object.
Definition: data_source.h:79
daal::data_management::interface1::NumericTableIface::sum
Definition: numeric_table.h:298
daal::data_management::interface1::Dictionary::create
static services::SharedPtr< Dictionary > create(size_t nfeat, FeaturesEqual featuresEqual=notEqual, services::Status *stat=NULL)
Definition: data_dictionary.h:183
daal::data_management::interface1::ODBCDataSource::freeHandles
void freeHandles()
Definition: odbc_data_source.h:118
daal::data_management::interface1::NumericTableIface::sumSquares
Definition: numeric_table.h:299
daal::data_management::interface1::DataSourceIface::notReady
Definition: data_source.h:62
daal::data_management::interface1::NumericTable::getDataMemoryStatus
virtual MemoryStatus getDataMemoryStatus() const
Definition: numeric_table.h:719
daal::data_management::interface1::DenseNumericTableIface::getBlockOfRows
virtual services::Status getBlockOfRows(size_t vector_idx, size_t vector_num, ReadWriteMode rwflag, BlockDescriptor< double > &block)=0

For more complete information about compiler optimizations, see our Optimization Notice.