21 #ifndef __ODBC_DATA_SOURCE_H__
22 #define __ODBC_DATA_SOURCE_H__
30 #include "services/daal_memory.h"
31 #include "data_management/data_source/data_source.h"
32 #include "data_management/data/data_dictionary.h"
33 #include "data_management/data/numeric_table.h"
34 #include "data_management/data/homogen_numeric_table.h"
35 #include "data_management/data_source/mysql_feature_manager.h"
36 #include "data_management/data_source/internal/data_source_options.h"
40 namespace data_management
// Options of the ODBC data source, expressed as combinable flags.
57 class ODBCDataSourceOptions
// Each flag occupies its own bit so that values can be combined (see unite and operator|).
// NOTE(review): the enclosing enum declaration line is not visible in this listing.
63 allocateNumericTable = 1 << 0,
64 createDictionaryFromContext = 1 << 1
67 static ODBCDataSourceOptions::Value unite(
const ODBCDataSourceOptions::Value &lhs,
68 const ODBCDataSourceOptions::Value &rhs)
70 return internal::DataSourceOptionsImpl<Value>::unite(lhs, rhs);
// Constructs the options from a flag value; the argument defaults to byDefault.
// NOTE(review): the member-initializer list and the body are on lines not visible in this listing.
73 ODBCDataSourceOptions(Value flags = byDefault) :
76 DataSource::NumericTableAllocationFlag getNumericTableAllocationFlag()
const
78 return (_impl.getFlag(allocateNumericTable))
79 ? DataSource::doAllocateNumericTable
80 : DataSource::notAllocateNumericTable;
83 DataSource::DictionaryCreationFlag getDictionaryCreationFlag()
const
85 return (_impl.getFlag(createDictionaryFromContext))
86 ? DataSource::doDictionaryFromContext
87 : DataSource::notDictionaryFromContext;
// Holds the option flags; the accessors of this class query it through getFlag().
91 internal::DataSourceOptionsImpl<Value> _impl;
// Data source that connects to a database through the ODBC API.
// FeatureManager is the type of the _featureManager member, to which dictionary creation and
// row loading are delegated. SummaryStatisticsType is forwarded to DataSourceTemplate and
// defaults to DAAL_SUMMARY_STATISTICS_TYPE.
100 template<
typename FeatureManager,
typename SummaryStatisticsType = DAAL_SUMMARY_STATISTICS_TYPE>
101 class ODBCDataSource :
public DataSourceTemplate<data_management::HomogenNumericTable<DAAL_DATA_TYPE>, SummaryStatisticsType>
// Numeric table type handed to the base template.
104 typedef data_management::HomogenNumericTable<DAAL_DATA_TYPE> DefaultNumericTableType;
// Shorthand for the base class.
105 typedef DataSourceTemplate<DefaultNumericTableType, SummaryStatisticsType> super;
// Base-class members that this class template refers to without qualification.
110 using super::_initialMaxRows;
111 using super::_autoNumericTableFlag;
112 using super::_autoDictionaryFlag;
113 using super::_status;
129 ODBCDataSource(
const std::string &dbname,
130 const std::string &tableName =
"",
131 const std::string &userName =
"",
132 const std::string &password =
"",
133 DataSourceIface::NumericTableAllocationFlag doAllocateNumericTable = DataSource::notAllocateNumericTable,
134 DataSourceIface::DictionaryCreationFlag doCreateDictionaryFromContext = DataSource::notDictionaryFromContext,
135 size_t initialMaxRows = 10) :
136 super(doAllocateNumericTable,
137 doCreateDictionaryFromContext)
139 initialize(initialMaxRows);
140 _status |= connectUsingUserNameAndPassword(dbname, userName, password);
141 if (!_status) {
return; }
142 _status |= executeSelectAllQuery(tableName);
155 ODBCDataSource(
const std::string &dbname,
156 const std::string &tableName,
157 const std::string &userName,
158 const std::string &password,
159 const ODBCDataSourceOptions &options,
160 size_t initialMaxRows = 10) :
161 super(options.getNumericTableAllocationFlag(),
162 options.getDictionaryCreationFlag())
164 initialize(initialMaxRows);
165 _status |= connectUsingUserNameAndPassword(dbname, userName, password);
166 if (!_status) {
return; }
167 _status |= executeSelectAllQuery(tableName);
177 ODBCDataSource(
const std::string &connectionString,
178 const ODBCDataSourceOptions &options,
179 size_t initialMaxRows = 10) :
180 super(options.getNumericTableAllocationFlag(),
181 options.getDictionaryCreationFlag())
183 initialize(initialMaxRows);
184 _status |= connectUsingConnectionString(connectionString);
187 virtual ~ODBCDataSource()
189 freeHandlesInternal();
// Executes an SQL query on the current connection and, on success, marks the data source
// as ready for loading. Returns ErrorSQLstmtHandle when the statement handle cannot be
// freed or allocated, ErrorODBC when SQLExecDirect fails, and a default Status otherwise.
// NOTE(review): the embedded line numbers jump (192 -> 196 -> 199 -> 204 -> 209), so the
// bodies of the two flag checks and any guard around the handle release are not visible here.
192 services::Status executeQuery(
const std::string &query)
// Taken when the data source allocates its own numeric table (body not visible).
196 if (_autoNumericTableFlag == DataSource::doAllocateNumericTable)
// Taken when the dictionary is created from context (body not visible).
199 if (_autoDictionaryFlag == DataSource::doDictionaryFromContext)
// Release the previous statement handle and reset it to the null value.
204 SQLRETURN ret = SQLFreeHandle(SQL_HANDLE_STMT, _hdlStmt);
205 if (!SQL_SUCCEEDED(ret)) {
return services::throwIfPossible(services::ErrorSQLstmtHandle); }
206 _hdlStmt = SQL_NULL_HSTMT;
// Allocate a fresh statement handle on the connection.
209 SQLRETURN ret = SQLAllocHandle(SQL_HANDLE_STMT, _hdlDbc, &_hdlStmt);
210 if (!SQL_SUCCEEDED(ret)) {
return services::throwIfPossible(services::ErrorSQLstmtHandle); }
// Run the query text directly; SQL_NTS tells ODBC the string is null-terminated.
212 ret = SQLExecDirect(_hdlStmt, (SQLCHAR *)query.c_str(), SQL_NTS);
213 if (!SQL_SUCCEEDED(ret)) {
return services::throwIfPossible(services::ErrorODBC); }
215 _connectionStatus = DataSource::readyForLoad;
216 return services::Status();
222 services::Status freeHandles()
224 SQLRETURN ret = freeHandlesInternal();
225 if (!SQL_SUCCEEDED(ret)) {
return services::throwIfPossible(services::ErrorODBC); }
227 _connectionStatus = DataSource::notReady;
228 return services::Status();
231 virtual size_t loadDataBlock(
size_t maxRows) DAAL_C11_OVERRIDE
233 services::Status s = checkConnection();
234 if (!s) {
return 0; }
236 s = super::checkDictionary();
237 if (!s) {
return 0; }
239 s = super::checkNumericTable();
240 if (!s) {
return 0; }
242 return loadDataBlock(maxRows, _spnt.get());
// Loads up to maxRows rows from the executed statement into the caller-provided table nt,
// then copies the data source dictionary features into the table's dictionary.
// NOTE(review): gaps in the embedded line numbers (253 -> 256 -> 259, 267 -> 270, past 298)
// show that lines are omitted here, including the handling of the two status checks,
// the early returns after the validation errors and the final return statement.
251 virtual size_t loadDataBlock(
size_t maxRows, NumericTable *nt)
253 services::Status s = checkConnection();
256 s = super::checkDictionary();
// A null table is recorded in _status and nothing is loaded.
259 if( nt == NULL ) { this->_status.add(services::throwIfPossible(services::ErrorNullInputNumericTable));
return 0; }
261 super::resizeNumericTableImpl( maxRows, nt );
// Validation: fewer rows than maxRows, or a column count different from the dictionary's
// feature count, is recorded as an error.
// NOTE(review): brace lines are omitted, so whether both checks sit inside the
// userAllocated branch cannot be confirmed from this listing.
263 if(nt->getDataMemoryStatus() == NumericTableIface::userAllocated)
265 if(nt->getNumberOfRows() < maxRows)
267 this->_status.add(services::throwIfPossible(services::ErrorIncorrectNumberOfObservations));
270 if(nt->getNumberOfColumns() != _dict->getNumberOfFeatures())
272 this->_status.add(services::throwIfPossible(services::ErrorIncorrectNumberOfFeatures));
// Reading the rows is delegated to the feature manager; its result becomes the connection status.
277 _connectionStatus = _featureManager.statementResultsNumericTable(_hdlStmt, nt, maxRows);
// The row count of the table after loading is taken as the number of rows read.
279 size_t nRead = nt->getNumberOfRows();
280 _idxLastRead += nRead;
// Statistics are updated per row only when all four basic statistics tables are present.
282 if(nt->basicStatistics.get(NumericTableIface::minimum ).get() != NULL &&
283 nt->basicStatistics.get(NumericTableIface::maximum ).get() != NULL &&
284 nt->basicStatistics.get(NumericTableIface::sum ).get() != NULL &&
285 nt->basicStatistics.get(NumericTableIface::sumSquares).get() != NULL)
287 for(
size_t i = 0; i < nRead; i++)
289 super::updateStatistics( i, nt );
// Mirror the data source dictionary into the numeric table dictionary, feature by feature.
293 NumericTableDictionaryPtr ntDict = nt->getDictionarySharedPtr();
294 size_t nFeatures = _dict->getNumberOfFeatures();
295 ntDict->setNumberOfFeatures(nFeatures);
296 for (
size_t i = 0; i < nFeatures; i++)
298 ntDict->setFeature((*_dict)[i].ntFeature, i);
// Checks the connection, the dictionary and the numeric table, then forwards to
// loadDataBlock(NumericTable*) with the table stored in the data source.
// NOTE(review): the declaration of s and the handling of each check result are on
// lines omitted from this listing (embedded numbers jump 304 -> 308 -> 311 -> 314 -> 317).
304 size_t loadDataBlock() DAAL_C11_OVERRIDE
308 s = checkConnection();
311 s = super::checkDictionary();
314 s = super::checkNumericTable();
317 return loadDataBlock(_spnt.get());
// Reads the statement results in chunks into temporary tables, then copies the chunks
// one after another into nt and fills nt's dictionary from the data source dictionary.
// NOTE(review): several lines are omitted from this listing (the declarations of s, nrows
// and pos, the chunk-reading loop header, the handling of the status checks and the
// final return), so the comments below describe only the visible statements.
320 size_t loadDataBlock(NumericTable* nt) DAAL_C11_OVERRIDE
324 s = checkConnection();
327 s = super::checkDictionary();
// A null table is recorded in _status and nothing is loaded.
330 if( nt == NULL ) {this->_status.add(services::throwIfPossible(services::ErrorNullInputNumericTable));
return 0; }
// Chunk size: the stored initial row limit, or 10 when that limit is zero.
332 size_t maxRows = (_initialMaxRows > 0 ? _initialMaxRows : 10);
334 size_t ncols = _dict->getNumberOfFeatures();
// Collects the temporary per-chunk tables until they are merged.
336 DataCollection tables;
// Allocate a chunk table from (ncols, maxRows); an allocation problem is recorded in _status.
340 NumericTablePtr ntCurrent = HomogenNumericTable<DAAL_DATA_TYPE>::create(ncols, maxRows, NumericTableIface::doAllocate, &s);
343 this->_status.add(services::throwIfPossible(services::ErrorNumericTableNotAllocated));
346 tables.push_back(ntCurrent);
347 size_t rows = loadDataBlock(maxRows, ntCurrent.get());
// Fewer rows than requested ends the chunk-reading loop.
349 if (rows < maxRows) {
break; }
// Size the destination table for nrows rows (nrows is declared on an omitted line).
353 super::resizeNumericTableImpl( nrows, nt );
354 nt->setNormalizationFlag(NumericTable::nonNormalized);
356 BlockDescriptor<DAAL_DATA_TYPE> blockCurrent, block;
// Copy each non-empty chunk into nt starting at row pos and combine its statistics into nt's.
360 for (
size_t i = 0; i < tables.size(); i++) {
361 NumericTable *ntCurrent = (NumericTable*)(tables[i].
get());
362 size_t rows = ntCurrent->getNumberOfRows();
364 if (rows == 0) {
continue; }
366 ntCurrent->getBlockOfRows(0, rows, readOnly, blockCurrent);
367 nt->getBlockOfRows(pos, rows, writeOnly, block);
// Source and destination byte counts are the same: rows * ncols elements of DAAL_DATA_TYPE.
369 services::daal_memcpy_s(block.getBlockPtr(), rows * ncols *
sizeof(DAAL_DATA_TYPE),
370 blockCurrent.getBlockPtr(), rows * ncols *
sizeof(DAAL_DATA_TYPE));
372 ntCurrent->releaseBlockOfRows(blockCurrent);
373 nt->releaseBlockOfRows(block);
// NOTE(review): the third argument (pos == 0) presumably flags the first chunk — confirm in combineStatistics.
375 super::combineStatistics( ntCurrent, nt, pos == 0);
// Mirror the data source dictionary into the numeric table dictionary, feature by feature.
379 NumericTableDictionaryPtr ntDict = nt->getDictionarySharedPtr();
380 size_t nFeatures = _dict->getNumberOfFeatures();
381 ntDict->setNumberOfFeatures(nFeatures);
382 for (
size_t i = 0; i < nFeatures; i++)
384 ntDict->setFeature((*_dict)[i].ntFeature, i);
// Creates the data source dictionary and lets the feature manager fill it from the
// executed statement. Returns early with the failing status at each checked step.
// NOTE(review): the condition guarding the ErrorDictionaryAlreadyAvailable return
// (between embedded lines 395 and 398) and the final return (after 406) are omitted here.
390 services::Status createDictionaryFromContext() DAAL_C11_OVERRIDE
392 services::Status status = checkConnection();
393 DAAL_CHECK_STATUS_VAR(status);
// The source is marked not ready while the dictionary is being built.
395 _connectionStatus = DataSource::notReady;
398 {
return services::throwIfPossible(services::ErrorDictionaryAlreadyAvailable); }
400 _dict = DataSourceDictionary::create(&status);
401 DAAL_CHECK_STATUS_VAR(status);
403 status |= _featureManager.createDictionary(_hdlStmt, _dict.get());
404 DAAL_CHECK_STATUS_VAR(status);
// The dictionary is in place, so the source is ready for loading again.
406 _connectionStatus = DataSource::readyForLoad;
410 DataSourceIface::DataSourceStatus getStatus() DAAL_C11_OVERRIDE
412 return _connectionStatus;
// NOTE(review): only the signature of this override is visible; its body is on lines
// omitted from this listing, so its return value cannot be described here.
415 size_t getNumberOfAvailableRows() DAAL_C11_OVERRIDE
420 FeatureManager &getFeatureManager()
422 return _featureManager;
// Puts the object into its initial state: all three ODBC handles are set to their null
// values, the initial row limit is stored and the source is marked as not ready.
// NOTE(review): the embedded numbers skip 431-432; a further statement may be omitted there.
426 void initialize(
size_t initialMaxRows)
428 _hdlDbc = SQL_NULL_HDBC;
429 _hdlEnv = SQL_NULL_HENV;
430 _hdlStmt = SQL_NULL_HSTMT;
433 _initialMaxRows = initialMaxRows;
434 _connectionStatus = DataSource::notReady;
437 services::Status connectUsingUserNameAndPassword(
const std::string &dbname,
438 const std::string &username,
439 const std::string &password)
441 SQLRETURN ret = setupHandlesInternal();
442 if (!SQL_SUCCEEDED(ret)) {
return services::throwIfPossible(services::ErrorHandlesSQL); }
444 ret = connectInternal(dbname, username, password);
445 if (!SQL_SUCCEEDED(ret)) {
return services::throwIfPossible(services::ErrorODBC); }
447 return services::Status();
450 services::Status connectUsingConnectionString(
const std::string &connectionString)
452 SQLRETURN ret = setupHandlesInternal();
453 if (!SQL_SUCCEEDED(ret)) {
return services::throwIfPossible(services::ErrorHandlesSQL); }
455 ret = connectDriverInternal(connectionString);
456 if (!SQL_SUCCEEDED(ret)) {
return services::throwIfPossible(services::ErrorODBC); }
458 return services::Status();
461 services::Status executeSelectAllQuery(
const std::string &tableName)
463 if (!tableName.empty())
465 return executeQuery(
"SELECT * FROM " + tableName);
467 return services::Status();
470 SQLRETURN connectInternal(
const std::string &dbname,
471 const std::string &username,
472 const std::string &password)
474 return SQLConnect(_hdlDbc, (SQLCHAR *)dbname.c_str(), (SQLSMALLINT)dbname.size(),
475 (SQLCHAR *)username.c_str(), (SQLSMALLINT)username.size(),
476 (SQLCHAR *)password.c_str(), (SQLSMALLINT)password.size());
// Connects through SQLDriverConnect with a full connection string and no parent window,
// using SQL_DRIVER_NOPROMPT as the completion mode. Returns the SQLDriverConnect return code.
// NOTE(review): the embedded numbers skip 485-486; those lines presumably carry the
// output connection string buffer and its length — confirm against the original header.
479 SQLRETURN connectDriverInternal(
const std::string &connectionString)
// Receives the length reported by the driver; the value is not read afterwards.
481 SQLSMALLINT outConnectionStringLength;
482 return SQLDriverConnect(_hdlDbc, SQL_NULL_HANDLE,
483 (SQLCHAR *)connectionString.c_str(),
484 (SQLSMALLINT)connectionString.size(),
487 &outConnectionStringLength,
488 SQL_DRIVER_NOPROMPT);
491 SQLRETURN setupHandlesInternal()
493 SQLRETURN ret = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &_hdlEnv);
494 if (!SQL_SUCCEEDED(ret)) {
return ret; }
496 ret = SQLSetEnvAttr(_hdlEnv, SQL_ATTR_ODBC_VERSION, (SQLPOINTER) SQL_OV_ODBC3, SQL_IS_UINTEGER);
497 if (!SQL_SUCCEEDED(ret)) {
return ret; }
499 ret = SQLAllocHandle(SQL_HANDLE_DBC, _hdlEnv, &_hdlDbc);
500 if (!SQL_SUCCEEDED(ret)) {
return ret; }
505 SQLRETURN freeHandlesInternal()
507 if (_hdlDbc == SQL_NULL_HDBC || _hdlEnv == SQL_NULL_HENV) {
return SQL_SUCCESS; }
509 SQLRETURN ret = SQLDisconnect(_hdlDbc);
510 if (!SQL_SUCCEEDED(ret)) {
return ret; }
512 ret = SQLFreeHandle(SQL_HANDLE_DBC, _hdlDbc);
513 if (!SQL_SUCCEEDED(ret)) {
return ret; }
515 ret = SQLFreeHandle(SQL_HANDLE_ENV, _hdlEnv);
516 if (!SQL_SUCCEEDED(ret)) {
return ret; }
518 _hdlDbc = SQL_NULL_HDBC;
519 _hdlEnv = SQL_NULL_HENV;
524 services::Status checkConnection()
526 if (_connectionStatus == DataSource::notReady)
527 {
return services::throwIfPossible(services::ErrorSourceDataNotAvailable); }
529 return services::Status();
// Feature manager that creates the dictionary and fills numeric tables from statement results.
534 FeatureManager _featureManager;
// Current state of the data source; set to notReady or readyForLoad by the methods of this class.
535 DataSourceIface::DataSourceStatus _connectionStatus;
545 using interface1::ODBCDataSource;
546 using interface1::ODBCDataSourceOptions;
548 inline ODBCDataSourceOptions::Value operator |(
const ODBCDataSourceOptions::Value &lhs,
549 const ODBCDataSourceOptions::Value &rhs)
550 {
return ODBCDataSourceOptions::unite(lhs, rhs); }
daal::data_management::interface1::ODBCDataSource::loadDataBlock
size_t loadDataBlock() DAAL_C11_OVERRIDE
Definition: odbc_data_source.h:304
daal::data_management::interface1::DataSource::checkDictionary
services::Status checkDictionary()
Definition: data_source.h:359
daal::services::ErrorDictionaryAlreadyAvailable
Definition: error_indexes.h:152
daal::services::ErrorNullInputNumericTable
Definition: error_indexes.h:81
daal::data_management::interface1::BlockDescriptor::getBlockPtr
DataType * getBlockPtr() const
Definition: numeric_table.h:69
daal
Definition: algorithm_base_common.h:31
daal::data_management::interface1::ODBCDataSource::createDictionaryFromContext
services::Status createDictionaryFromContext() DAAL_C11_OVERRIDE
Definition: odbc_data_source.h:390
daal::data_management::interface1::ODBCDataSource::freeHandles
services::Status freeHandles()
Definition: odbc_data_source.h:222
daal::services::ErrorNumericTableNotAllocated
Definition: error_indexes.h:156
daal::data_management::interface1::DataSource::status
services::Status status() const
Definition: data_source.h:309
daal::data_management::interface1::DataCollection
Class that provides functionality of Collection container for objects derived from SerializationIface...
Definition: data_collection.h:45
daal::data_management::interface1::NumericTable::getDictionarySharedPtr
virtual NumericTableDictionaryPtr getDictionarySharedPtr() const DAAL_C11_OVERRIDE
Definition: numeric_table.h:633
daal::data_management::interface1::DataSourceIface::DictionaryCreationFlag
DictionaryCreationFlag
Specifies whether a Data Dictionary is created from the context of a Data Source. ...
Definition: data_source.h:69
daal::data_management::interface1::DataSourceIface::notAllocateNumericTable
Definition: data_source.h:81
daal::data_management::interface1::NumericTable::getNumberOfColumns
size_t getNumberOfColumns() const
Definition: numeric_table.h:652
daal::data_management::interface1::DataCollection::push_back
DataCollection & push_back(const SerializationIfacePtr &x)
daal::data_management::interface1::NumericTableIface::userAllocated
Definition: numeric_table.h:275
daal::services::ErrorHandlesSQL
Definition: error_indexes.h:379
daal::services::ErrorIncorrectNumberOfFeatures
Definition: error_indexes.h:70
daal::data_management::interface1::ODBCDataSource::loadDataBlock
virtual size_t loadDataBlock(size_t maxRows) DAAL_C11_OVERRIDE
Definition: odbc_data_source.h:231
daal::data_management::interface1::DataSourceIface::doAllocateNumericTable
Definition: data_source.h:82
daal::data_management::interface1::DataSourceTemplate
Implements the abstract DataSourceIface interface.
Definition: data_source.h:463
daal::data_management::interface1::NumericTableIface::minimum
Definition: numeric_table.h:296
daal::services::daal_memcpy_s
DAAL_EXPORT void daal_memcpy_s(void *dest, size_t numberOfElements, const void *src, size_t count)
daal::data_management::interface1::ODBCDataSource::loadDataBlock
virtual size_t loadDataBlock(size_t maxRows, NumericTable *nt)
Definition: odbc_data_source.h:251
daal::data_management::interface1::DataSource::checkNumericTable
services::Status checkNumericTable()
Definition: data_source.h:345
daal::data_management::interface1::ODBCDataSource::getStatus
DataSourceIface::DataSourceStatus getStatus() DAAL_C11_OVERRIDE
Definition: odbc_data_source.h:410
daal::data_management::interface1::NumericTableIface::nonNormalized
Definition: numeric_table.h:317
daal::data_management::interface1::HomogenNumericTable::create
static services::SharedPtr< HomogenNumericTable< DataType > > create(NumericTableDictionaryPtr ddictForHomogenNumericTable, services::Status *stat=NULL)
Definition: homogen_numeric_table.h:93
daal::data_management::interface1::ODBCDataSource::ODBCDataSource
ODBCDataSource(const std::string &dbname, const std::string &tableName="", const std::string &userName="", const std::string &password="", DataSourceIface::NumericTableAllocationFlag doAllocateNumericTable=DataSource::notAllocateNumericTable, DataSourceIface::DictionaryCreationFlag doCreateDictionaryFromContext=DataSource::notDictionaryFromContext, size_t initialMaxRows=10)
Definition: odbc_data_source.h:129
daal::data_management::interface1::ODBCDataSource::getNumberOfAvailableRows
size_t getNumberOfAvailableRows() DAAL_C11_OVERRIDE
Definition: odbc_data_source.h:415
daal::data_management::interface1::NumericTable
Class for a data management component responsible for representation of data in the numeric format...
Definition: numeric_table.h:575
daal::data_management::interface1::ODBCDataSource::loadDataBlock
size_t loadDataBlock(NumericTable *nt) DAAL_C11_OVERRIDE
Definition: odbc_data_source.h:320
daal::data_management::interface1::DataSourceIface::doDictionaryFromContext
Definition: data_source.h:72
daal::services::ErrorSQLstmtHandle
Definition: error_indexes.h:381
daal::data_management::interface1::DataSourceIface::DataSourceStatus
DataSourceStatus
Specifies the status of the Data Source.
Definition: data_source.h:57
daal::data_management::interface1::NumericTableIface::maximum
Definition: numeric_table.h:297
daal::data_management::interface1::DataSourceIface::readyForLoad
Definition: data_source.h:59
daal::data_management::interface1::DataCollection::size
size_t size() const
daal::data_management::interface1::NumericTableIface::doAllocate
Definition: numeric_table.h:287
daal::data_management::interface1::ODBCDataSource::ODBCDataSource
ODBCDataSource(const std::string &connectionString, const ODBCDataSourceOptions &options, size_t initialMaxRows=10)
Definition: odbc_data_source.h:177
daal::data_management::interface1::NumericTable::getNumberOfRows
size_t getNumberOfRows() const
Definition: numeric_table.h:661
daal::data_management::interface1::DataSourceIface::notDictionaryFromContext
Definition: data_source.h:71
daal::data_management::interface1::BlockDescriptor< DAAL_DATA_TYPE >
daal::data_management::interface1::ODBCDataSource
Connects to data sources with the ODBC API.
Definition: odbc_data_source.h:101
daal::data_management::interface1::DenseNumericTableIface::releaseBlockOfRows
virtual services::Status releaseBlockOfRows(BlockDescriptor< double > &block)=0
daal::services::ErrorIncorrectNumberOfObservations
Definition: error_indexes.h:71
daal::services::ErrorODBC
Definition: error_indexes.h:380
daal::data_management::interface1::ODBCDataSourceOptions
Options of ODBC data source.
Definition: odbc_data_source.h:57
daal::data_management::interface1::DataSourceIface::NumericTableAllocationFlag
NumericTableAllocationFlag
Specifies whether a Numeric Table is allocated inside of the Data Source object.
Definition: data_source.h:79
daal::data_management::interface1::NumericTableIface::sum
Definition: numeric_table.h:298
daal::data_management::interface1::Dictionary::create
static services::SharedPtr< Dictionary > create(size_t nfeat, FeaturesEqual featuresEqual=notEqual, services::Status *stat=NULL)
Definition: data_dictionary.h:186
daal::data_management::interface1::NumericTableIface::sumSquares
Definition: numeric_table.h:299
daal::services::ErrorSourceDataNotAvailable
Definition: error_indexes.h:163
daal::data_management::interface1::DataSourceIface::notReady
Definition: data_source.h:62
daal::data_management::interface1::NumericTable::getDataMemoryStatus
virtual MemoryStatus getDataMemoryStatus() const
Definition: numeric_table.h:720
daal::data_management::interface1::DenseNumericTableIface::getBlockOfRows
virtual services::Status getBlockOfRows(size_t vector_idx, size_t vector_num, ReadWriteMode rwflag, BlockDescriptor< double > &block)=0
daal::data_management::internal::DataSourceOptionsImpl
Class that helps to define data source options.
Definition: data_source_options.h:31
daal::data_management::interface1::ODBCDataSource::ODBCDataSource
ODBCDataSource(const std::string &dbname, const std::string &tableName, const std::string &userName, const std::string &password, const ODBCDataSourceOptions &options, size_t initialMaxRows=10)
Definition: odbc_data_source.h:155