47 #ifndef __ODBC_DATA_SOURCE_H__ 48 #define __ODBC_DATA_SOURCE_H__ 52 #include "services/daal_memory.h" 53 #include "data_management/data_source/data_source.h" 54 #include "data_management/data/data_dictionary.h" 55 #include "data_management/data/numeric_table.h" 56 #include "data_management/data/homogen_numeric_table.h" 61 #include "mysql_feature_manager.h" 65 namespace data_management
84 template<
typename _featureManager,
typename summaryStatisticsType = DAAL_SUMMARY_STATISTICS_TYPE>
85 class ODBCDataSource :
public DataSourceTemplate<data_management::HomogenNumericTable<DAAL_DATA_TYPE>, summaryStatisticsType>
88 typedef _featureManager FeatureManager;
90 using DataSourceIface::NumericTableAllocationFlag;
91 using DataSourceIface::DictionaryCreationFlag;
92 using DataSourceIface::DataSourceStatus;
94 using DataSource::checkDictionary;
95 using DataSource::checkNumericTable;
96 using DataSource::freeNumericTable;
97 using DataSource::_dict;
98 using DataSource::_initialMaxRows;
101 typedef data_management::HomogenNumericTable<DAAL_DATA_TYPE> DefaultNumericTableType;
103 FeatureManager featureManager;
121 ODBCDataSource(
const std::string &dbname,
const std::string &tablename,
const std::string &username =
"",
122 const std::string &password =
"",
123 DataSourceIface::NumericTableAllocationFlag doAllocateNumericTable = DataSource::notAllocateNumericTable,
124 DataSourceIface::DictionaryCreationFlag doCreateDictionaryFromContext = DataSource::notDictionaryFromContext,
125 size_t initialMaxRows = 10) :
126 DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>(doAllocateNumericTable, doCreateDictionaryFromContext),
127 _dbname(dbname), _username(username), _password(password), _tablename(tablename),
128 _idx_last_read(0), _hdlDbc(SQL_NULL_HDBC), _hdlEnv(SQL_NULL_HENV)
130 _query =
"SELECT * FROM " + _tablename;
131 _connectionStatus = DataSource::notReady;
132 _initialMaxRows = initialMaxRows;
146 SQLRETURN ret = _freeHandles();
147 if (!SQL_SUCCEEDED(ret))
149 this->_status.add(services::ErrorODBC);
153 virtual size_t loadDataBlock(
size_t maxRows) DAAL_C11_OVERRIDE
155 services::Status s = checkDictionary();
158 s = checkNumericTable();
161 return loadDataBlock(maxRows, this->DataSource::_spnt.
get());
170 virtual size_t loadDataBlock(
size_t maxRows, NumericTable *nt)
172 services::Status s = checkDictionary();
175 if( nt == NULL ) { this->_status.add(services::throwIfPossible(services::ErrorNullInputNumericTable));
return 0; }
177 DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>::resizeNumericTableImpl( maxRows, nt );
179 if(nt->getDataMemoryStatus() == NumericTableIface::userAllocated)
181 if(nt->getNumberOfRows() < maxRows)
183 this->_status.add(services::throwIfPossible(services::ErrorIncorrectNumberOfObservations));
186 if(nt->getNumberOfColumns() != _dict->getNumberOfFeatures())
188 this->_status.add(services::throwIfPossible(services::ErrorIncorrectNumberOfFeatures));
193 std::string query_exec = featureManager.setLimitQuery(_query, _idx_last_read, maxRows);
195 ret = _establishHandles();
196 if (!SQL_SUCCEEDED(ret)) { this->_status.add(services::throwIfPossible(services::ErrorHandlesSQL));
return 0; }
198 SQLHSTMT hdlStmt = SQL_NULL_HSTMT;
199 ret = SQLAllocHandle(SQL_HANDLE_STMT, _hdlDbc, &hdlStmt);
200 if (!SQL_SUCCEEDED(ret)) { this->_status.add(services::throwIfPossible(services::ErrorSQLstmtHandle));
return 0; }
202 ret = SQLExecDirect(hdlStmt, (SQLCHAR *)query_exec.c_str(), SQL_NTS);
203 if (!SQL_SUCCEEDED(ret)) { this->_status.add(services::throwIfPossible(services::ErrorODBC));
return 0; }
205 DataSourceIface::DataSourceStatus dataSourceStatus;
207 dataSourceStatus = featureManager.statementResultsNumericTable(hdlStmt, nt, maxRows);
209 size_t nRead = nt->getNumberOfRows();
210 _idx_last_read += nRead;
212 if(nt->basicStatistics.get(NumericTableIface::minimum ).get() != NULL &&
213 nt->basicStatistics.get(NumericTableIface::maximum ).get() != NULL &&
214 nt->basicStatistics.get(NumericTableIface::sum ).get() != NULL &&
215 nt->basicStatistics.get(NumericTableIface::sumSquares).get() != NULL)
217 for(
size_t i = 0; i < nRead; i++)
219 DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>::updateStatistics( i, nt );
223 ret = SQLFreeHandle(SQL_HANDLE_STMT, hdlStmt);
224 if (!SQL_SUCCEEDED(ret)) { this->_status.add(services::throwIfPossible(services::ErrorSQLstmtHandle));
return 0; }
226 if (dataSourceStatus == DataSource::endOfData) { _connectionStatus = DataSource::endOfData; }
228 NumericTableDictionaryPtr ntDict = nt->getDictionarySharedPtr();
229 size_t nFeatures = _dict->getNumberOfFeatures();
230 ntDict->setNumberOfFeatures(nFeatures);
231 for (
size_t i = 0; i < nFeatures; i++)
233 ntDict->setFeature((*_dict)[i].ntFeature, i);
239 size_t loadDataBlock() DAAL_C11_OVERRIDE
241 services::Status s = checkDictionary();
244 s = checkNumericTable();
247 return loadDataBlock(this->DataSource::_spnt.
get());
250 size_t loadDataBlock(NumericTable* nt) DAAL_C11_OVERRIDE
252 services::Status s = checkDictionary();
255 if( nt == NULL ) {this->_status.add(services::throwIfPossible(services::ErrorNullInputNumericTable));
return 0; }
257 size_t maxRows = (_initialMaxRows > 0 ? _initialMaxRows : 10);
259 size_t ncols = _dict->getNumberOfFeatures();
261 DataCollection tables;
265 NumericTablePtr ntCurrent = HomogenNumericTable<DAAL_DATA_TYPE>::create(ncols, maxRows, NumericTableIface::doAllocate, &s);
268 this->_status.add(services::throwIfPossible(services::ErrorNumericTableNotAllocated));
271 tables.push_back(ntCurrent);
272 size_t rows = loadDataBlock(maxRows, ntCurrent.get());
274 if (rows < maxRows) {
break; }
278 DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>::resizeNumericTableImpl( nrows, nt );
279 nt->setNormalizationFlag(NumericTable::nonNormalized);
281 BlockDescriptor<DAAL_DATA_TYPE> blockCurrent, block;
285 for (
size_t i = 0; i < tables.size(); i++) {
286 NumericTable *ntCurrent = (NumericTable*)(tables[i].
get());
287 size_t rows = ntCurrent->getNumberOfRows();
289 if (rows == 0) {
continue; }
291 ntCurrent->getBlockOfRows(0, rows, readOnly, blockCurrent);
292 nt->getBlockOfRows(pos, rows, writeOnly, block);
294 services::daal_memcpy_s(block.getBlockPtr(), rows * ncols *
sizeof(DAAL_DATA_TYPE), blockCurrent.getBlockPtr(), rows * ncols *
sizeof(DAAL_DATA_TYPE));
296 ntCurrent->releaseBlockOfRows(blockCurrent);
297 nt->releaseBlockOfRows(block);
299 DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>::combineStatistics( ntCurrent, nt, pos == 0);
303 NumericTableDictionaryPtr ntDict = nt->getDictionarySharedPtr();
304 size_t nFeatures = _dict->getNumberOfFeatures();
305 ntDict->setNumberOfFeatures(nFeatures);
306 for (
size_t i = 0; i < nFeatures; i++)
308 ntDict->setFeature((*_dict)[i].ntFeature, i);
314 services::Status createDictionaryFromContext() DAAL_C11_OVERRIDE
316 if (_dict) {
return services::throwIfPossible(services::Status(services::ErrorDictionaryAlreadyAvailable)); }
318 std::string query_exec = _query +
";";
321 ret = _establishHandles();
322 if (!SQL_SUCCEEDED(ret)) {
return services::throwIfPossible(services::Status(services::ErrorHandlesSQL)); }
324 SQLHSTMT hdlStmt = SQL_NULL_HSTMT;
325 ret = SQLAllocHandle(SQL_HANDLE_STMT, _hdlDbc, &hdlStmt);
326 if (!SQL_SUCCEEDED(ret)) {
return services::throwIfPossible(services::Status(services::ErrorSQLstmtHandle)); }
328 ret = SQLPrepare(hdlStmt, (SQLCHAR *)query_exec.c_str(), SQL_NTS);
329 if (!SQL_SUCCEEDED(ret))
331 return services::throwIfPossible(services::Status(services::ErrorODBC));
334 services::Status status;
335 _dict = DataSourceDictionary::create(&status);
336 if (!status)
return status;
338 featureManager.createDictionary(hdlStmt, this->_dict.get());
340 ret = SQLFreeHandle(SQL_HANDLE_STMT, hdlStmt);
341 if (!SQL_SUCCEEDED(ret)) {
return services::throwIfPossible(services::Status(services::ErrorSQLstmtHandle)); }
343 if (featureManager.getErrors()->size() == 0) { _connectionStatus = DataSource::readyForLoad; }
347 DataSourceIface::DataSourceStatus getStatus() DAAL_C11_OVERRIDE
349 return _connectionStatus;
352 size_t getNumberOfAvailableRows() DAAL_C11_OVERRIDE
357 ret = _establishHandles();
358 if (!SQL_SUCCEEDED(ret)) {
return 0; }
360 SQLHSTMT hdlStmt = SQL_NULL_HSTMT;
361 ret = SQLAllocHandle(SQL_HANDLE_STMT, _hdlDbc, &hdlStmt);
362 if (!SQL_SUCCEEDED(ret)) {
return 0; }
364 std::string query_exec =
"SELECT COUNT(*) FROM " + _tablename +
" ;";
365 ret = SQLExecDirect(hdlStmt, (SQLCHAR *)query_exec.c_str(), SQL_NTS);
366 if (!SQL_SUCCEEDED(ret)) {
return 0; }
368 ret = SQLBindCol(hdlStmt, 1, SQL_C_ULONG, (SQLPOINTER)&nRows, 0, NULL);
369 if (!SQL_SUCCEEDED(ret)) {
return 0; }
371 ret = SQLFetchScroll(hdlStmt, SQL_FETCH_NEXT, 1);
372 if (!SQL_SUCCEEDED(ret)) {
return 0; }
374 ret = SQLFreeHandle(SQL_HANDLE_STMT, hdlStmt);
375 if (!SQL_SUCCEEDED(ret)) {
return ret; }
380 FeatureManager &getFeatureManager()
382 return featureManager;
387 std::string _username;
388 std::string _password;
389 std::string _tablename;
391 size_t _idx_last_read;
392 DataSourceIface::DataSourceStatus _connectionStatus;
397 SQLRETURN _establishHandles()
399 if (_hdlEnv == SQL_NULL_HENV || _hdlDbc == SQL_NULL_HDBC)
402 ret = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &_hdlEnv);
403 if (!SQL_SUCCEEDED(ret)) {
return ret; }
405 ret = SQLSetEnvAttr(_hdlEnv, SQL_ATTR_ODBC_VERSION, (SQLPOINTER) SQL_OV_ODBC3, SQL_IS_UINTEGER);
406 if (!SQL_SUCCEEDED(ret)) {
return ret; }
408 ret = SQLAllocHandle(SQL_HANDLE_DBC, _hdlEnv, &_hdlDbc);
409 if (!SQL_SUCCEEDED(ret)) {
return ret; }
411 SQLCHAR *usernamePtr;
412 SQLCHAR *passwordPtr;
413 SQLSMALLINT usernameLen;
414 SQLSMALLINT passwordLen;
415 if (_username.empty())
422 usernamePtr = (SQLCHAR *)_username.c_str();
423 usernameLen = SQL_NTS;
425 if (_password.empty())
432 passwordPtr = (SQLCHAR *)_password.c_str();
433 passwordLen = SQL_NTS;
435 ret = SQLConnect(_hdlDbc, (SQLCHAR *)_dbname.c_str(), SQL_NTS, usernamePtr, usernameLen, passwordPtr, passwordLen);
436 if (!SQL_SUCCEEDED(ret)) {
return ret; }
441 SQLRETURN _freeHandles()
443 if (_hdlDbc == SQL_NULL_HDBC || _hdlEnv == SQL_NULL_HENV) {
return SQL_SUCCESS; }
447 ret = SQLDisconnect(_hdlDbc);
448 if (!SQL_SUCCEEDED(ret)) {
return ret; }
450 ret = SQLFreeHandle(SQL_HANDLE_DBC, _hdlDbc);
451 if (!SQL_SUCCEEDED(ret)) {
return ret; }
453 ret = SQLFreeHandle(SQL_HANDLE_ENV, _hdlEnv);
454 if (!SQL_SUCCEEDED(ret)) {
return ret; }
456 _hdlDbc = SQL_NULL_HDBC;
457 _hdlEnv = SQL_NULL_HENV;
464 using interface1::ODBCDataSource;
daal::data_management::interface1::ODBCDataSource::loadDataBlock
size_t loadDataBlock() DAAL_C11_OVERRIDE
Definition: odbc_data_source.h:239
daal::data_management::interface1::DataSource::checkDictionary
services::Status checkDictionary()
Definition: data_source.h:385
daal::services::ErrorDictionaryAlreadyAvailable
Definition: error_indexes.h:177
daal::data_management::interface1::DataSourceIface::readyForLoad
Definition: data_source.h:85
daal::services::ErrorNullInputNumericTable
Definition: error_indexes.h:107
daal::services::interface1::Status
Class that holds the results of API calls. In case of API routine failure it contains the list of err...
Definition: error_handling.h:491
daal::data_management::interface1::DataSourceIface::endOfData
Definition: data_source.h:87
daal
Definition: algorithm_base_common.h:57
daal::data_management::interface1::ODBCDataSource::createDictionaryFromContext
services::Status createDictionaryFromContext() DAAL_C11_OVERRIDE
Definition: odbc_data_source.h:314
daal::data_management::interface1::DataSourceIface::doAllocateNumericTable
Definition: data_source.h:108
daal::services::ErrorNumericTableNotAllocated
Definition: error_indexes.h:181
daal::data_management::interface1::DataSourceIface::NumericTableAllocationFlag
NumericTableAllocationFlag
Specifies whether a Numeric Table is allocated inside of the Data Source object.
Definition: data_source.h:105
daal::services::interface1::Status::add
Status & add(ErrorID id)
daal::data_management::interface1::DataCollection
Class that provides functionality of Collection container for objects derived from SerializationIface...
Definition: data_collection.h:71
daal::data_management::interface1::NumericTable::getDictionarySharedPtr
virtual NumericTableDictionaryPtr getDictionarySharedPtr() const DAAL_C11_OVERRIDE
Definition: numeric_table.h:658
daal::data_management::interface1::DataSourceIface::notDictionaryFromContext
Definition: data_source.h:97
daal::data_management::interface1::DataSourceIface::freeNumericTable
virtual void freeNumericTable()=0
daal::data_management::interface1::DataSourceIface::DataSourceStatus
DataSourceStatus
Specifies the status of the Data Source.
Definition: data_source.h:83
daal::data_management::interface1::DataCollection::push_back
DataCollection & push_back(const SerializationIfacePtr &x)
daal::data_management::interface1::BlockDescriptor::getBlockPtr
DataType * getBlockPtr() const
Definition: numeric_table.h:95
daal::data_management::interface1::NumericTableIface::userAllocated
Definition: numeric_table.h:301
daal::services::ErrorHandlesSQL
Definition: error_indexes.h:399
daal::services::ErrorIncorrectNumberOfFeatures
Definition: error_indexes.h:96
daal::data_management::interface1::ODBCDataSource::loadDataBlock
virtual size_t loadDataBlock(size_t maxRows) DAAL_C11_OVERRIDE
Definition: odbc_data_source.h:153
daal::data_management::interface1::DataSourceIface::notAllocateNumericTable
Definition: data_source.h:107
daal::data_management::interface1::ODBCDataSource::ODBCDataSource
ODBCDataSource(const std::string &dbname, const std::string &tablename, const std::string &username="", const std::string &password="", DataSourceIface::NumericTableAllocationFlag doAllocateNumericTable=DataSource::notAllocateNumericTable, DataSourceIface::DictionaryCreationFlag doCreateDictionaryFromContext=DataSource::notDictionaryFromContext, size_t initialMaxRows=10)
Definition: odbc_data_source.h:121
daal::data_management::interface1::DataSourceTemplate
Implements the abstract DataSourceIface interface.
Definition: data_source.h:489
daal::data_management::interface1::NumericTableIface::minimum
Definition: numeric_table.h:322
daal::services::daal_memcpy_s
DAAL_EXPORT void daal_memcpy_s(void *dest, size_t numberOfElements, const void *src, size_t count)
daal::data_management::interface1::ODBCDataSource::loadDataBlock
virtual size_t loadDataBlock(size_t maxRows, NumericTable *nt)
Definition: odbc_data_source.h:170
daal::data_management::interface1::DataSource::checkNumericTable
services::Status checkNumericTable()
Definition: data_source.h:371
daal::data_management::interface1::ODBCDataSource::getStatus
DataSourceIface::DataSourceStatus getStatus() DAAL_C11_OVERRIDE
Definition: odbc_data_source.h:347
daal::services::interface1::SharedPtr
Shared pointer that retains shared ownership of an object through a pointer. Several SharedPtr object...
Definition: daal_shared_ptr.h:187
daal::data_management::interface1::DataCollection::size
size_t size() const
daal::data_management::interface1::NumericTableIface::nonNormalized
Definition: numeric_table.h:343
daal::data_management::interface1::HomogenNumericTable::create
static services::SharedPtr< HomogenNumericTable< DataType > > create(NumericTableDictionaryPtr ddictForHomogenNumericTable, services::Status *stat=NULL)
Definition: homogen_numeric_table.h:117
daal::data_management::interface1::ODBCDataSource::getNumberOfAvailableRows
size_t getNumberOfAvailableRows() DAAL_C11_OVERRIDE
Definition: odbc_data_source.h:352
daal::data_management::interface1::DataSourceIface::DictionaryCreationFlag
DictionaryCreationFlag
Specifies whether a Data Dictionary is created from the context of a Data Source. ...
Definition: data_source.h:95
daal::services::interface1::SharedPtr::get
T * get() const
Definition: daal_shared_ptr.h:332
daal::data_management::interface1::NumericTable
Class for a data management component responsible for representation of data in the numeric format...
Definition: numeric_table.h:600
daal::data_management::interface1::ODBCDataSource::loadDataBlock
size_t loadDataBlock(NumericTable *nt) DAAL_C11_OVERRIDE
Definition: odbc_data_source.h:250
daal::services::ErrorSQLstmtHandle
Definition: error_indexes.h:401
daal::data_management::interface1::NumericTableIface::maximum
Definition: numeric_table.h:323
daal::data_management::interface1::NumericTable::getDataMemoryStatus
virtual MemoryStatus getDataMemoryStatus() const
Definition: numeric_table.h:745
daal::data_management::interface1::DataSource::status
services::Status status() const
Definition: data_source.h:335
daal::data_management::interface1::NumericTableIface::doAllocate
Definition: numeric_table.h:313
daal::data_management::interface1::NumericTable::getNumberOfRows
size_t getNumberOfRows() const
Definition: numeric_table.h:686
daal::data_management::interface1::BlockDescriptor< DAAL_DATA_TYPE >
daal::data_management::interface1::ODBCDataSource
Connects to data sources with the ODBC API.
Definition: odbc_data_source.h:85
daal::data_management::interface1::DenseNumericTableIface::releaseBlockOfRows
virtual services::Status releaseBlockOfRows(BlockDescriptor< double > &block)=0
daal::data_management::interface1::NumericTable::getNumberOfColumns
size_t getNumberOfColumns() const
Definition: numeric_table.h:677
daal::services::ErrorIncorrectNumberOfObservations
Definition: error_indexes.h:97
daal::services::ErrorODBC
Definition: error_indexes.h:400
daal::data_management::interface1::NumericTableIface::sum
Definition: numeric_table.h:324
daal::data_management::interface1::Dictionary::create
static services::SharedPtr< Dictionary > create(size_t nfeat, FeaturesEqual featuresEqual=notEqual, services::Status *stat=NULL)
Definition: data_dictionary.h:209
daal::data_management::interface1::ODBCDataSource::freeHandles
void freeHandles()
Definition: odbc_data_source.h:144
daal::data_management::interface1::NumericTableIface::sumSquares
Definition: numeric_table.h:325
daal::data_management::interface1::DataSourceIface::notReady
Definition: data_source.h:88
daal::data_management::interface1::HomogenNumericTable
Class that provides methods to access data stored as a contiguous array of homogeneous feature vector...
Definition: homogen_numeric_table.h:76
daal::data_management::interface1::DenseNumericTableIface::getBlockOfRows
virtual services::Status getBlockOfRows(size_t vector_idx, size_t vector_num, ReadWriteMode rwflag, BlockDescriptor< double > &block)=0