21 #ifndef __ODBC_DATA_SOURCE_H__
22 #define __ODBC_DATA_SOURCE_H__
26 #include "services/daal_memory.h"
27 #include "data_management/data_source/data_source.h"
28 #include "data_management/data/data_dictionary.h"
29 #include "data_management/data/numeric_table.h"
30 #include "data_management/data/homogen_numeric_table.h"
35 #include "mysql_feature_manager.h"
// NOTE(review): this listing appears to omit brace-only lines and access specifiers;
// confirm scope and visibility against the full header.
39 namespace data_management
// ODBCDataSource is a class template parameterized by a feature-manager type and a
// summary-statistics type. It derives from DataSourceTemplate instantiated for a
// homogeneous numeric table of DAAL_DATA_TYPE.
58 template<
typename _featureManager,
typename summaryStatisticsType = DAAL_SUMMARY_STATISTICS_TYPE>
59 class ODBCDataSource :
public DataSourceTemplate<data_management::HomogenNumericTable<DAAL_DATA_TYPE>, summaryStatisticsType>
// Alias for the feature-manager template argument.
62 typedef _featureManager FeatureManager;
// Bring the flag and status enumerations of the interface into class scope.
64 using DataSourceIface::NumericTableAllocationFlag;
65 using DataSourceIface::DictionaryCreationFlag;
66 using DataSourceIface::DataSourceStatus;
// Bring the base-class helpers and members used by the methods below into class scope.
68 using DataSource::checkDictionary;
69 using DataSource::checkNumericTable;
70 using DataSource::freeNumericTable;
71 using DataSource::_dict;
72 using DataSource::_initialMaxRows;
// Table type used when naming the DataSourceTemplate base in qualified calls.
75 typedef data_management::HomogenNumericTable<DAAL_DATA_TYPE> DefaultNumericTableType;
// Feature-manager instance; it builds limited queries, fills numeric tables from
// statement results, and creates the dictionary (see its uses in the methods below).
77 FeatureManager featureManager;
95 ODBCDataSource(
const std::string &dbname,
const std::string &tablename,
const std::string &username =
"",
96 const std::string &password =
"",
97 DataSourceIface::NumericTableAllocationFlag doAllocateNumericTable = DataSource::notAllocateNumericTable,
98 DataSourceIface::DictionaryCreationFlag doCreateDictionaryFromContext = DataSource::notDictionaryFromContext,
99 size_t initialMaxRows = 10) :
100 DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>(doAllocateNumericTable, doCreateDictionaryFromContext),
101 _dbname(dbname), _username(username), _password(password), _tablename(tablename),
102 _idx_last_read(0), _hdlDbc(SQL_NULL_HDBC), _hdlEnv(SQL_NULL_HENV)
104 _query =
"SELECT * FROM " + _tablename;
105 _connectionStatus = DataSource::notReady;
106 _initialMaxRows = initialMaxRows;
// Releases the ODBC handles through _freeHandles() and records services::ErrorODBC
// in the status collection when that call does not succeed.
// NOTE(review): the enclosing function header is not visible in this listing;
// confirm its signature against the full header.
120 SQLRETURN ret = _freeHandles();
121 if (!SQL_SUCCEEDED(ret))
123 this->_status.add(services::ErrorODBC);
127 virtual size_t loadDataBlock(
size_t maxRows) DAAL_C11_OVERRIDE
129 services::Status s = checkDictionary();
132 s = checkNumericTable();
135 return loadDataBlock(maxRows, this->DataSource::_spnt.
get());
// Loads up to maxRows rows into the caller-supplied table nt, using a query limited
// by the running row counter _idx_last_read.
//   maxRows - row limit passed to setLimitQuery and to the feature manager
//   nt      - destination table; NULL is reported as ErrorNullInputNumericTable and 0 is returned
// Failures of the ODBC calls are added to this->_status and make the function return 0.
// NOTE(review): this listing shows no declaration of `ret`, no check of `s`, and no
// final return statement; some lines appear to be elided — confirm against the full header.
144 virtual size_t loadDataBlock(
size_t maxRows, NumericTable *nt)
146 services::Status s = checkDictionary();
149 if( nt == NULL ) { this->_status.add(services::throwIfPossible(services::ErrorNullInputNumericTable));
return 0; }
// Let the base class size nt for maxRows rows.
151 DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>::resizeNumericTableImpl( maxRows, nt );
// For user-allocated tables, report an error when the table has fewer than maxRows
// rows or a column count different from the dictionary's feature count.
153 if(nt->getDataMemoryStatus() == NumericTableIface::userAllocated)
155 if(nt->getNumberOfRows() < maxRows)
157 this->_status.add(services::throwIfPossible(services::ErrorIncorrectNumberOfObservations));
160 if(nt->getNumberOfColumns() != _dict->getNumberOfFeatures())
162 this->_status.add(services::throwIfPossible(services::ErrorIncorrectNumberOfFeatures));
// The feature manager derives the query to execute from the base query, the number
// of rows already read, and the row limit.
167 std::string query_exec = featureManager.setLimitQuery(_query, _idx_last_read, maxRows);
// Make sure the environment and connection handles exist before allocating a statement.
169 ret = _establishHandles();
170 if (!SQL_SUCCEEDED(ret)) { this->_status.add(services::throwIfPossible(services::ErrorHandlesSQL));
return 0; }
172 SQLHSTMT hdlStmt = SQL_NULL_HSTMT;
173 ret = SQLAllocHandle(SQL_HANDLE_STMT, _hdlDbc, &hdlStmt);
174 if (!SQL_SUCCEEDED(ret)) { this->_status.add(services::throwIfPossible(services::ErrorSQLstmtHandle));
return 0; }
// NOTE(review): the failure return below leaves hdlStmt allocated (no SQLFreeHandle on this path).
176 ret = SQLExecDirect(hdlStmt, (SQLCHAR *)query_exec.c_str(), SQL_NTS);
177 if (!SQL_SUCCEEDED(ret)) { this->_status.add(services::throwIfPossible(services::ErrorODBC));
return 0; }
179 DataSourceIface::DataSourceStatus dataSourceStatus;
// The feature manager transfers the statement results into nt and reports a data-source status.
181 dataSourceStatus = featureManager.statementResultsNumericTable(hdlStmt, nt, maxRows);
// The row count of nt after the transfer is taken as the number of rows read.
183 size_t nRead = nt->getNumberOfRows();
184 _idx_last_read += nRead;
// Per-row statistics are updated only when all four basic-statistics tables
// (minimum, maximum, sum, sumSquares) are present on nt.
186 if(nt->basicStatistics.get(NumericTableIface::minimum ).get() != NULL &&
187 nt->basicStatistics.get(NumericTableIface::maximum ).get() != NULL &&
188 nt->basicStatistics.get(NumericTableIface::sum ).get() != NULL &&
189 nt->basicStatistics.get(NumericTableIface::sumSquares).get() != NULL)
191 for(
size_t i = 0; i < nRead; i++)
193 DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>::updateStatistics( i, nt );
197 ret = SQLFreeHandle(SQL_HANDLE_STMT, hdlStmt);
198 if (!SQL_SUCCEEDED(ret)) { this->_status.add(services::throwIfPossible(services::ErrorSQLstmtHandle));
return 0; }
// Remember that the feature manager reported the end of the data.
200 if (dataSourceStatus == DataSource::endOfData) { _connectionStatus = DataSource::endOfData; }
// Copy every feature description from the data-source dictionary into nt's dictionary.
202 NumericTableDictionaryPtr ntDict = nt->getDictionarySharedPtr();
203 size_t nFeatures = _dict->getNumberOfFeatures();
204 ntDict->setNumberOfFeatures(nFeatures);
205 for (
size_t i = 0; i < nFeatures; i++)
207 ntDict->setFeature((*_dict)[i].ntFeature, i);
213 size_t loadDataBlock() DAAL_C11_OVERRIDE
215 services::Status s = checkDictionary();
218 s = checkNumericTable();
221 return loadDataBlock(this->DataSource::_spnt.
get());
// Loads data into the caller-supplied table nt in chunks: each chunk is read into a
// temporary homogeneous table, then the chunks are copied into nt one after another
// and their statistics are combined.
//   nt - destination table; NULL is reported as ErrorNullInputNumericTable and 0 is returned
// NOTE(review): this listing shows no loop header around the chunk-loading statements
// (only the `break`), no declaration or update of `nrows` and `pos`, no check of `s`,
// and no final return; some lines appear to be elided — confirm against the full header.
224 size_t loadDataBlock(NumericTable* nt) DAAL_C11_OVERRIDE
226 services::Status s = checkDictionary();
229 if( nt == NULL ) {this->_status.add(services::throwIfPossible(services::ErrorNullInputNumericTable));
return 0; }
// Chunk size: _initialMaxRows when positive, otherwise 10.
231 size_t maxRows = (_initialMaxRows > 0 ? _initialMaxRows : 10);
233 size_t ncols = _dict->getNumberOfFeatures();
// Holds the temporary per-chunk tables until they are merged into nt.
235 DataCollection tables;
// Allocate a temporary table of ncols columns and maxRows rows for the next chunk.
239 NumericTablePtr ntCurrent = HomogenNumericTable<DAAL_DATA_TYPE>::create(ncols, maxRows, NumericTableIface::doAllocate, &s);
242 this->_status.add(services::throwIfPossible(services::ErrorNumericTableNotAllocated));
245 tables.push_back(ntCurrent);
246 size_t rows = loadDataBlock(maxRows, ntCurrent.get());
// A chunk shorter than requested ends the loading.
248 if (rows < maxRows) {
break; }
// Size the destination for nrows rows and mark it as not normalized.
252 DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>::resizeNumericTableImpl( nrows, nt );
253 nt->setNormalizationFlag(NumericTable::nonNormalized);
255 BlockDescriptor<DAAL_DATA_TYPE> blockCurrent, block;
// Copy each non-empty chunk into nt starting at row `pos`.
259 for (
size_t i = 0; i < tables.size(); i++) {
260 NumericTable *ntCurrent = (NumericTable*)(tables[i].
get());
261 size_t rows = ntCurrent->getNumberOfRows();
263 if (rows == 0) {
continue; }
265 ntCurrent->getBlockOfRows(0, rows, readOnly, blockCurrent);
266 nt->getBlockOfRows(pos, rows, writeOnly, block);
// Both blocks are copied as rows * ncols elements of DAAL_DATA_TYPE.
268 services::daal_memcpy_s(block.getBlockPtr(), rows * ncols *
sizeof(DAAL_DATA_TYPE), blockCurrent.getBlockPtr(), rows * ncols *
sizeof(DAAL_DATA_TYPE));
270 ntCurrent->releaseBlockOfRows(blockCurrent);
271 nt->releaseBlockOfRows(block);
// The last argument is true only for the chunk copied at row 0.
273 DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>::combineStatistics( ntCurrent, nt, pos == 0);
// Copy every feature description from the data-source dictionary into nt's dictionary.
277 NumericTableDictionaryPtr ntDict = nt->getDictionarySharedPtr();
278 size_t nFeatures = _dict->getNumberOfFeatures();
279 ntDict->setNumberOfFeatures(nFeatures);
280 for (
size_t i = 0; i < nFeatures; i++)
282 ntDict->setFeature((*_dict)[i].ntFeature, i);
// Creates the data-source dictionary by preparing the base query and letting the
// feature manager fill the dictionary from the prepared statement.
// Returns an error status when a dictionary already exists, when the handles or the
// statement cannot be set up, or when the dictionary object cannot be created.
// NOTE(review): this listing shows no declaration of `ret` and no final return
// statement; some lines appear to be elided — confirm against the full header.
288 services::Status createDictionaryFromContext() DAAL_C11_OVERRIDE
290 if (_dict) {
return services::throwIfPossible(services::Status(services::ErrorDictionaryAlreadyAvailable)); }
292 std::string query_exec = _query +
";";
295 ret = _establishHandles();
296 if (!SQL_SUCCEEDED(ret)) {
return services::throwIfPossible(services::Status(services::ErrorHandlesSQL)); }
298 SQLHSTMT hdlStmt = SQL_NULL_HSTMT;
299 ret = SQLAllocHandle(SQL_HANDLE_STMT, _hdlDbc, &hdlStmt);
300 if (!SQL_SUCCEEDED(ret)) {
return services::throwIfPossible(services::Status(services::ErrorSQLstmtHandle)); }
// The query is only prepared here; no execute call appears in the visible lines.
// NOTE(review): the failure return below leaves hdlStmt allocated (no SQLFreeHandle on this path).
302 ret = SQLPrepare(hdlStmt, (SQLCHAR *)query_exec.c_str(), SQL_NTS);
303 if (!SQL_SUCCEEDED(ret))
305 return services::throwIfPossible(services::Status(services::ErrorODBC));
308 services::Status status;
309 _dict = DataSourceDictionary::create(&status);
// NOTE(review): this return also leaves hdlStmt allocated.
310 if (!status)
return status;
312 featureManager.createDictionary(hdlStmt, this->_dict.get());
314 ret = SQLFreeHandle(SQL_HANDLE_STMT, hdlStmt);
315 if (!SQL_SUCCEEDED(ret)) {
return services::throwIfPossible(services::Status(services::ErrorSQLstmtHandle)); }
// The source becomes ready for loading only when the feature manager reports no errors.
317 if (featureManager.getErrors()->size() == 0) { _connectionStatus = DataSource::readyForLoad; }
321 DataSourceIface::DataSourceStatus getStatus() DAAL_C11_OVERRIDE
323 return _connectionStatus;
326 size_t getNumberOfAvailableRows() DAAL_C11_OVERRIDE
331 ret = _establishHandles();
332 if (!SQL_SUCCEEDED(ret)) {
return 0; }
334 SQLHSTMT hdlStmt = SQL_NULL_HSTMT;
335 ret = SQLAllocHandle(SQL_HANDLE_STMT, _hdlDbc, &hdlStmt);
336 if (!SQL_SUCCEEDED(ret)) {
return 0; }
338 std::string query_exec =
"SELECT COUNT(*) FROM " + _tablename +
" ;";
339 ret = SQLExecDirect(hdlStmt, (SQLCHAR *)query_exec.c_str(), SQL_NTS);
340 if (!SQL_SUCCEEDED(ret)) {
return 0; }
342 ret = SQLBindCol(hdlStmt, 1, SQL_C_ULONG, (SQLPOINTER)&nRows, 0, NULL);
343 if (!SQL_SUCCEEDED(ret)) {
return 0; }
345 ret = SQLFetchScroll(hdlStmt, SQL_FETCH_NEXT, 1);
346 if (!SQL_SUCCEEDED(ret)) {
return 0; }
348 ret = SQLFreeHandle(SQL_HANDLE_STMT, hdlStmt);
349 if (!SQL_SUCCEEDED(ret)) {
return ret; }
354 FeatureManager &getFeatureManager()
356 return featureManager;
// NOTE(review): _dbname, _query, _hdlDbc and _hdlEnv are used by the methods but
// their declarations are not visible in this listing; confirm against the full header.
// Credentials handed to SQLConnect in _establishHandles().
361 std::string _username;
362 std::string _password;
// Table name used to build the "SELECT * FROM" and "SELECT COUNT(*) FROM" queries.
363 std::string _tablename;
// Running total of rows read so far; passed to setLimitQuery for the next chunk.
365 size_t _idx_last_read;
// Status returned by getStatus().
366 DataSourceIface::DataSourceStatus _connectionStatus;
371 SQLRETURN _establishHandles()
373 if (_hdlEnv == SQL_NULL_HENV || _hdlDbc == SQL_NULL_HDBC)
376 ret = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &_hdlEnv);
377 if (!SQL_SUCCEEDED(ret)) {
return ret; }
379 ret = SQLSetEnvAttr(_hdlEnv, SQL_ATTR_ODBC_VERSION, (SQLPOINTER) SQL_OV_ODBC3, SQL_IS_UINTEGER);
380 if (!SQL_SUCCEEDED(ret)) {
return ret; }
382 ret = SQLAllocHandle(SQL_HANDLE_DBC, _hdlEnv, &_hdlDbc);
383 if (!SQL_SUCCEEDED(ret)) {
return ret; }
385 SQLCHAR *usernamePtr;
386 SQLCHAR *passwordPtr;
387 SQLSMALLINT usernameLen;
388 SQLSMALLINT passwordLen;
389 if (_username.empty())
396 usernamePtr = (SQLCHAR *)_username.c_str();
397 usernameLen = SQL_NTS;
399 if (_password.empty())
406 passwordPtr = (SQLCHAR *)_password.c_str();
407 passwordLen = SQL_NTS;
409 ret = SQLConnect(_hdlDbc, (SQLCHAR *)_dbname.c_str(), SQL_NTS, usernamePtr, usernameLen, passwordPtr, passwordLen);
410 if (!SQL_SUCCEEDED(ret)) {
return ret; }
415 SQLRETURN _freeHandles()
417 if (_hdlDbc == SQL_NULL_HDBC || _hdlEnv == SQL_NULL_HENV) {
return SQL_SUCCESS; }
421 ret = SQLDisconnect(_hdlDbc);
422 if (!SQL_SUCCEEDED(ret)) {
return ret; }
424 ret = SQLFreeHandle(SQL_HANDLE_DBC, _hdlDbc);
425 if (!SQL_SUCCEEDED(ret)) {
return ret; }
427 ret = SQLFreeHandle(SQL_HANDLE_ENV, _hdlEnv);
428 if (!SQL_SUCCEEDED(ret)) {
return ret; }
430 _hdlDbc = SQL_NULL_HDBC;
431 _hdlEnv = SQL_NULL_HENV;
// Makes interface1::ODBCDataSource available by its unqualified name in the enclosing namespace.
438 using interface1::ODBCDataSource;
daal::data_management::interface1::ODBCDataSource::loadDataBlock
size_t loadDataBlock() DAAL_C11_OVERRIDE
Definition: odbc_data_source.h:213
daal::data_management::interface1::DataSource::checkDictionary
services::Status checkDictionary()
Definition: data_source.h:359
daal::services::ErrorDictionaryAlreadyAvailable
Definition: error_indexes.h:151
daal::services::ErrorNullInputNumericTable
Definition: error_indexes.h:81
daal::data_management::interface1::BlockDescriptor::getBlockPtr
DataType * getBlockPtr() const
Definition: numeric_table.h:69
daal
Definition: algorithm_base_common.h:31
daal::data_management::interface1::ODBCDataSource::createDictionaryFromContext
services::Status createDictionaryFromContext() DAAL_C11_OVERRIDE
Definition: odbc_data_source.h:288
daal::services::ErrorNumericTableNotAllocated
Definition: error_indexes.h:155
daal::data_management::interface1::DataSource::status
services::Status status() const
Definition: data_source.h:309
daal::data_management::interface1::DataCollection
Class that provides functionality of Collection container for objects derived from SerializationIface...
Definition: data_collection.h:45
daal::data_management::interface1::NumericTable::getDictionarySharedPtr
virtual NumericTableDictionaryPtr getDictionarySharedPtr() const DAAL_C11_OVERRIDE
Definition: numeric_table.h:632
daal::data_management::interface1::DataSourceIface::DictionaryCreationFlag
DictionaryCreationFlag
Specifies whether a Data Dictionary is created from the context of a Data Source. ...
Definition: data_source.h:69
daal::data_management::interface1::DataSourceIface::notAllocateNumericTable
Definition: data_source.h:81
daal::data_management::interface1::NumericTable::getNumberOfColumns
size_t getNumberOfColumns() const
Definition: numeric_table.h:651
daal::data_management::interface1::DataSourceIface::freeNumericTable
virtual void freeNumericTable()=0
daal::data_management::interface1::DataCollection::push_back
DataCollection & push_back(const SerializationIfacePtr &x)
daal::data_management::interface1::NumericTableIface::userAllocated
Definition: numeric_table.h:275
daal::services::ErrorHandlesSQL
Definition: error_indexes.h:375
daal::services::ErrorIncorrectNumberOfFeatures
Definition: error_indexes.h:70
daal::data_management::interface1::ODBCDataSource::loadDataBlock
virtual size_t loadDataBlock(size_t maxRows) DAAL_C11_OVERRIDE
Definition: odbc_data_source.h:127
daal::data_management::interface1::ODBCDataSource::ODBCDataSource
ODBCDataSource(const std::string &dbname, const std::string &tablename, const std::string &username="", const std::string &password="", DataSourceIface::NumericTableAllocationFlag doAllocateNumericTable=DataSource::notAllocateNumericTable, DataSourceIface::DictionaryCreationFlag doCreateDictionaryFromContext=DataSource::notDictionaryFromContext, size_t initialMaxRows=10)
Definition: odbc_data_source.h:95
daal::data_management::interface1::DataSourceIface::doAllocateNumericTable
Definition: data_source.h:82
daal::data_management::interface1::DataSourceTemplate
Implements the abstract DataSourceIface interface.
Definition: data_source.h:463
daal::data_management::interface1::NumericTableIface::minimum
Definition: numeric_table.h:296
daal::services::daal_memcpy_s
DAAL_EXPORT void daal_memcpy_s(void *dest, size_t numberOfElements, const void *src, size_t count)
daal::data_management::interface1::ODBCDataSource::loadDataBlock
virtual size_t loadDataBlock(size_t maxRows, NumericTable *nt)
Definition: odbc_data_source.h:144
daal::data_management::interface1::DataSource::checkNumericTable
services::Status checkNumericTable()
Definition: data_source.h:345
daal::data_management::interface1::ODBCDataSource::getStatus
DataSourceIface::DataSourceStatus getStatus() DAAL_C11_OVERRIDE
Definition: odbc_data_source.h:321
daal::data_management::interface1::NumericTableIface::nonNormalized
Definition: numeric_table.h:317
daal::data_management::interface1::HomogenNumericTable::create
static services::SharedPtr< HomogenNumericTable< DataType > > create(NumericTableDictionaryPtr ddictForHomogenNumericTable, services::Status *stat=NULL)
Definition: homogen_numeric_table.h:91
daal::data_management::interface1::ODBCDataSource::getNumberOfAvailableRows
size_t getNumberOfAvailableRows() DAAL_C11_OVERRIDE
Definition: odbc_data_source.h:326
daal::data_management::interface1::NumericTable
Class for a data management component responsible for representation of data in the numeric format...
Definition: numeric_table.h:574
daal::data_management::interface1::ODBCDataSource::loadDataBlock
size_t loadDataBlock(NumericTable *nt) DAAL_C11_OVERRIDE
Definition: odbc_data_source.h:224
daal::services::ErrorSQLstmtHandle
Definition: error_indexes.h:377
daal::data_management::interface1::DataSourceIface::DataSourceStatus
DataSourceStatus
Specifies the status of the Data Source.
Definition: data_source.h:57
daal::data_management::interface1::NumericTableIface::maximum
Definition: numeric_table.h:297
daal::data_management::interface1::DataSourceIface::readyForLoad
Definition: data_source.h:59
daal::data_management::interface1::DataCollection::size
size_t size() const
daal::data_management::interface1::NumericTableIface::doAllocate
Definition: numeric_table.h:287
daal::data_management::interface1::NumericTable::getNumberOfRows
size_t getNumberOfRows() const
Definition: numeric_table.h:660
daal::data_management::interface1::DataSourceIface::notDictionaryFromContext
Definition: data_source.h:71
daal::data_management::interface1::BlockDescriptor< DAAL_DATA_TYPE >
daal::data_management::interface1::ODBCDataSource
Connects to data sources with the ODBC API.
Definition: odbc_data_source.h:59
daal::data_management::interface1::DataSourceIface::endOfData
Definition: data_source.h:61
daal::data_management::interface1::DenseNumericTableIface::releaseBlockOfRows
virtual services::Status releaseBlockOfRows(BlockDescriptor< double > &block)=0
daal::services::ErrorIncorrectNumberOfObservations
Definition: error_indexes.h:71
daal::services::ErrorODBC
Definition: error_indexes.h:376
daal::data_management::interface1::DataSourceIface::NumericTableAllocationFlag
NumericTableAllocationFlag
Specifies whether a Numeric Table is allocated inside of the Data Source object.
Definition: data_source.h:79
daal::data_management::interface1::NumericTableIface::sum
Definition: numeric_table.h:298
daal::data_management::interface1::Dictionary::create
static services::SharedPtr< Dictionary > create(size_t nfeat, FeaturesEqual featuresEqual=notEqual, services::Status *stat=NULL)
Definition: data_dictionary.h:183
daal::data_management::interface1::ODBCDataSource::freeHandles
void freeHandles()
Definition: odbc_data_source.h:118
daal::data_management::interface1::NumericTableIface::sumSquares
Definition: numeric_table.h:299
daal::data_management::interface1::DataSourceIface::notReady
Definition: data_source.h:62
daal::data_management::interface1::NumericTable::getDataMemoryStatus
virtual MemoryStatus getDataMemoryStatus() const
Definition: numeric_table.h:719
daal::data_management::interface1::DenseNumericTableIface::getBlockOfRows
virtual services::Status getBlockOfRows(size_t vector_idx, size_t vector_num, ReadWriteMode rwflag, BlockDescriptor< double > &block)=0