21 #ifndef __KDB_DATA_SOURCE_H__
22 #define __KDB_DATA_SOURCE_H__
26 #include "services/daal_memory.h"
27 #include "data_management/data_source/data_source.h"
28 #include "data_management/data/data_dictionary.h"
29 #include "data_management/data/numeric_table.h"
30 #include "data_management/data/homogen_numeric_table.h"
34 #include "kdb_feature_manager.h"
// Declaration header of the KDBDataSource class template.
// NOTE(review): this is a rendered listing; the embedded original line numbers jump
// (38 -> 53 -> 54 -> 57 ...), so braces, access specifiers and the original doc
// comments are not visible here.
38 namespace data_management
// Template parameters: the feature-manager type and the summary-statistics type,
// the latter defaulting to DAAL_SUMMARY_STATISTICS_TYPE.
53 template<
typename _featureManager,
typename summaryStatisticsType = DAAL_SUMMARY_STATISTICS_TYPE>
// Publicly derives from DataSourceTemplate instantiated with a homogeneous
// numeric table of DAAL_DATA_TYPE and the chosen summary-statistics type.
54 class KDBDataSource :
public DataSourceTemplate<data_management::HomogenNumericTable<DAAL_DATA_TYPE>, summaryStatisticsType>
// Alias exposing the feature-manager template argument under a readable name.
57 typedef _featureManager FeatureManager;
// Bring the flag/status enumerations of DataSourceIface into class scope.
59 using DataSourceIface::NumericTableAllocationFlag;
60 using DataSourceIface::DictionaryCreationFlag;
61 using DataSourceIface::DataSourceStatus;
// Bring DataSource helpers and data members into class scope so they can be
// named without qualification inside this template.
63 using DataSource::checkDictionary;
64 using DataSource::checkNumericTable;
65 using DataSource::freeNumericTable;
66 using DataSource::_dict;
67 using DataSource::_initialMaxRows;
// Numeric table type used for the base-class instantiation in the constructor
// and in the qualified base-class calls made by the methods below.
70 typedef data_management::HomogenNumericTable<DAAL_DATA_TYPE> DefaultNumericTableType;
// Feature manager instance that builds dictionaries and fills numeric tables
// from KDB results (see its uses in createDictionaryFromContext/loadDataBlock).
72 FeatureManager featureManager;
// Constructor: stores the KDB connection parameters and forwards the two
// creation flags to the DataSourceTemplate base.
//
// Parameters:
//   dbname                         - stored in _dbname; passed as the first argument to khpu() when connecting
//   port                           - stored in _port; passed as the second argument to khpu()
//   tablename                      - stored in _tablename
//   username, password             - credentials, both defaulting to ""
//   doAllocateNumericTable         - forwarded to the base; defaults to DataSource::notAllocateNumericTable
//   doCreateDictionaryFromContext  - forwarded to the base; defaults to DataSource::notDictionaryFromContext
//   initialMaxRows                 - assigned to _initialMaxRows; defaults to 10
//
// _idx_last_read starts at 0.
// NOTE(review): other methods read _query, but no assignment to it is visible
// here (listing lines 98-99 are not shown) — confirm where it is initialized.
91 KDBDataSource(
const std::string &dbname,
size_t port,
const std::string &tablename,
const std::string &username =
"",
92 const std::string &password =
"",
93 DataSourceIface::NumericTableAllocationFlag doAllocateNumericTable = DataSource::notAllocateNumericTable,
94 DataSourceIface::DictionaryCreationFlag doCreateDictionaryFromContext = DataSource::notDictionaryFromContext,
95 size_t initialMaxRows = 10) :
96 DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>(doAllocateNumericTable, doCreateDictionaryFromContext),
97 _dbname(dbname), _port(port), _username(username), _password(password), _tablename(tablename), _idx_last_read(0)
100 _initialMaxRows = initialMaxRows;
// Loads rows into the numeric table held by the data source itself.
// Returns 0 as soon as the error collection is non-empty; otherwise delegates
// to loadDataBlock(maxRows, nt) with maxRows == 0 and the internally held table.
// NOTE(review): listing lines 108 and 111 are not shown; the two identical
// checks below presumably each follow an omitted call (checkDictionary /
// checkNumericTable are imported into the class) — confirm against the header.
106 size_t loadDataBlock() DAAL_C11_OVERRIDE
109 if( this->_errors->size() != 0 ) {
return 0; }
112 if( this->_errors->size() != 0 ) {
return 0; }
114 return loadDataBlock(0, this->DataSource::_spnt.
get());
// Loads rows into the caller-supplied numeric table `nt`.
// Returns 0 if the error collection is non-empty; otherwise delegates to
// loadDataBlock(maxRows, nt) with maxRows == 0.
// NOTE(review): listing lines 118-119 are not shown; a call may precede the
// error check — confirm.
117 size_t loadDataBlock(NumericTable* nt) DAAL_C11_OVERRIDE
120 if( this->_errors->size() != 0 ) {
return 0; }
122 return loadDataBlock(0, nt);
// Loads rows into the numeric table held by the data source itself, passing
// `maxRows` through to loadDataBlock(maxRows, nt).
// Returns 0 whenever the error collection is not empty.
// NOTE(review): listing lines 127 and 130 are not shown; the two identical
// checks presumably each follow an omitted call — confirm against the header.
125 virtual size_t loadDataBlock(
size_t maxRows) DAAL_C11_OVERRIDE
128 if( !this->_errors->isEmpty() ) {
return 0; }
131 if( !this->_errors->isEmpty() ) {
return 0; }
133 return loadDataBlock(maxRows, this->DataSource::_spnt.
get());
// Core loader: queries KDB for the next slice of rows and writes them into `nt`.
//
// Parameters:
//   maxRows - row limit used by the `maxRows != 0 && nRows > maxRows` test below
//   nt      - destination numeric table; a NULL value is reported as
//             ErrorNullInputNumericTable and 0 is returned
//
// Returns 0 on the early-exit paths shown (pre-existing errors, NULL table,
// non-positive connection handle).
// NOTE(review): this listing omits many original lines (braces, some branch
// conditions, the final return statement); comments below describe only what
// is visible and mark the gaps.
142 virtual size_t loadDataBlock(
size_t maxRows, NumericTable *nt)
// Bail out if errors were already recorded.
146 if( this->_errors->size() != 0 ) {
return 0; }
148 if( nt == NULL ) { this->_errors->add(services::ErrorNullInputNumericTable);
return 0; }
// Open a connection; a non-positive handle aborts the load.
150 I handle = _kdbConnect();
152 if (handle <= 0) {
return 0; }
// Rows still unread according to the server-side count.
154 size_t nRows = getNumberOfAvailableRows();
// Resize the table to zero rows before deciding how many rows to read.
157 DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>::resizeNumericTableImpl( 0, nt );
// NOTE(review): the body of this `if` (listing lines 163-166) is not shown;
// presumably it limits nRows to maxRows — confirm.
162 if (maxRows != 0 && nRows > maxRows)
// Build the query text "(<_query>)[(til <nRows>) + <_idx_last_read>]".
// The unary '+' in front of "]" only decays the literal to a pointer and does
// not change what is streamed.
167 std::ostringstream query;
168 query <<
"(" << _query <<
")[(til " << nRows <<
") + " << _idx_last_read << +
"]";
169 std::string query_exec = query.str();
// Execute the query on the connection, passing (K)0 as the last argument.
171 K result = k(handle, const_cast<char*>(query_exec.c_str ()), (K)0);
// Advance the read cursor by the number of rows requested.
175 _idx_last_read += nRows;
177 DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>::resizeNumericTableImpl( nRows, nt );
// For user-allocated tables, validate that the table is large enough and has
// as many columns as the dictionary has features.
// NOTE(review): what follows each error (e.g. an early return) is not visible.
179 if(nt->getDataMemoryStatus() == NumericTableIface::userAllocated)
181 if(nt->getNumberOfRows() < nRows)
184 this->_errors->add(services::ErrorIncorrectNumberOfObservations);
187 if(nt->getNumberOfColumns() != _dict->getNumberOfFeatures())
190 this->_errors->add(services::ErrorIncorrectNumberOfFeatures);
// Three fill paths depending on the result type.
// NOTE(review): the condition guarding the first path (listing line 195) is
// not shown; it reads column data through result->k — confirm which type it tests.
197 K columnData = kK(result->k)[1];
198 featureManager.statementResultsNumericTableFromColumnData(columnData, nt, nRows);
// Result of type XD: column data is taken from the result object directly.
200 else if (result->t == XD)
202 K columnData = kK(result)[1];
203 featureManager.statementResultsNumericTableFromColumnData(columnData, nt, nRows);
// Remaining case: the result is handed to the list-based filler.
207 featureManager.statementResultsNumericTableFromList(result, nt, nRows);
// Update summary statistics row by row, but only when all four statistics
// entries (minimum, maximum, sum, sumSquares) are present on the table.
211 if(nt->basicStatistics.get(NumericTableIface::minimum ).get() != NULL &&
212 nt->basicStatistics.get(NumericTableIface::maximum ).get() != NULL &&
213 nt->basicStatistics.get(NumericTableIface::sum ).get() != NULL &&
214 nt->basicStatistics.get(NumericTableIface::sumSquares).get() != NULL)
216 for(
size_t i = 0; i < nRows; i++)
218 DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>::updateStatistics( i, nt );
// Copy the feature descriptors of the data-source dictionary into the
// dictionary of the numeric table.
222 NumericTableDictionaryPtr ntDict = nt->getDictionarySharedPtr();
223 size_t nFeatures = _dict->getNumberOfFeatures();
224 ntDict->setNumberOfFeatures(nFeatures);
225 for (
size_t i = 0; i < nFeatures; i++)
227 ntDict->setFeature((*_dict)[i].ntFeature, i);
// Builds the data-source dictionary by fetching the first row of the query
// result ("(<_query>)[til 1]") and handing it to the feature manager.
// Returns a services::Status; the error statuses visible here are
// ErrorDictionaryAlreadyAvailable, ErrorKDBNetworkError, ErrorKDBServerError
// and any failure reported by DataSourceDictionary::create.
// NOTE(review): the listing omits the conditions guarding the first two error
// returns, the condition of the first dictionary branch, and the final return.
233 services::Status createDictionaryFromContext() DAAL_C11_OVERRIDE
// NOTE(review): guard not shown (listing line 235); presumably taken when a
// dictionary already exists — confirm.
236 return services::Status(services::ErrorDictionaryAlreadyAvailable);
// NOTE(review): no validity check of `handle` is visible before it is used.
238 I handle = _kdbConnect();
240 std::string query_exec =
"(" + _query +
")[til 1]";
242 K result = k(handle, const_cast<char*>(query_exec.c_str ()), (K)0);
// NOTE(review): guard not shown (listing lines 244-246).
247 return services::Status(services::ErrorKDBNetworkError);
// A result type of -128 leads to the server-error status below
// (intermediate listing lines 251-253 are not shown).
250 if (result->t == -128)
254 return services::Status(services::ErrorKDBServerError);
// Create a fresh dictionary and propagate a creation failure to the caller.
257 services::Status status;
258 _dict = DataSourceDictionary::create(&status);
259 if (!status)
return status;
// Three population paths: from result->k (condition not shown), from the
// result itself when its type is XD, otherwise from the first list element.
263 featureManager.createDictionaryFromTable(result->k, this->_dict.get());
265 else if (result->t == XD)
267 featureManager.createDictionaryFromTable(result, this->_dict.get());
271 featureManager.createDictionaryFromList(kK(result)[0], this->_dict.get());
// Reports the data-source status; the visible implementation always returns
// DataSourceIface::readyForLoad.
279 DataSourceIface::DataSourceStatus getStatus() DAAL_C11_OVERRIDE
281 return DataSourceIface::readyForLoad;
// Returns the number of rows not yet read: the server-side "count <_query>"
// minus _idx_last_read. Returns 0 when the connection handle is non-positive.
// NOTE(review): several listing lines are not shown (e.g. 297-301, 303-307),
// so what follows the wrong-type error and any clean-up is not visible here.
284 size_t getNumberOfAvailableRows() DAAL_C11_OVERRIDE
286 I handle = _kdbConnect();
288 if (handle <= 0)
return 0;
290 std::string query_exec =
"count " + _query;
292 K result = k(handle, const_cast<char*>(query_exec.c_str ()), (K)0);
// Any result whose type is not -KJ is recorded as ErrorKDBWrongTypeOfOutput.
294 if (result->t != -KJ)
296 this->_errors->add(services::ErrorKDBWrongTypeOfOutput);
302 size_t nRows = result->j;
// NOTE(review): both operands are size_t, so this wraps around if
// _idx_last_read ever exceeds the reported count — confirm that cannot happen.
308 return nRows - _idx_last_read;
// Gives callers mutable access to the feature manager member by reference.
311 FeatureManager &getFeatureManager()
313 return featureManager;
// Connection credentials; joined as "<username>:<password>" for khpu().
// NOTE(review): declarations of _dbname, _port and _query (listing lines
// 316-318) are not shown in this view.
319 std::string _username;
320 std::string _password;
// Table name received by the constructor; not read by any other visible line.
321 std::string _tablename;
// Read cursor: used as the row offset in the load query, advanced after each
// load, and subtracted from the row count in getNumberOfAvailableRows().
323 size_t _idx_last_read;
// NOTE(review): the signature line of this helper is not part of the listing;
// presumably this is the body of _kdbConnect(), which the methods above call
// to obtain a handle — confirm.
// Opens a connection with khpu(), passing _dbname as the first argument, _port
// as the second and the credentials as one "<username>:<password>" string.
327 I handle = khpu(const_cast<char*>(_dbname.c_str ()), _port, const_cast<char*>((_username +
":" + _password).c_str ()));
// NOTE(review): the conditions selecting between the two errors below
// (listing lines 329-330 and 335-336) and the return statement are not shown.
331 this->_errors->add(services::ErrorKDBNoConnection);
337 this->_errors->add(services::ErrorKDBWrongCredentials);
// Helper taking a connection handle of type I.
// NOTE(review): the body is not part of this listing; presumably it closes the
// connection. No call to it appears in the visible lines of the methods above
// — confirm that handles from _kdbConnect() are released.
344 void _kdbClose(I handle)
350 using interface1::KDBDataSource;
daal::data_management::interface1::KDBDataSource::getStatus
DataSourceIface::DataSourceStatus getStatus() DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:279
daal::data_management::interface1::DataSource::checkDictionary
services::Status checkDictionary()
Definition: data_source.h:359
daal::services::ErrorDictionaryAlreadyAvailable
Definition: error_indexes.h:152
daal::data_management::interface1::KDBDataSource::loadDataBlock
size_t loadDataBlock() DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:106
daal::services::ErrorNullInputNumericTable
Definition: error_indexes.h:81
daal
Definition: algorithm_base_common.h:31
daal::data_management::interface1::KDBDataSource::createDictionaryFromContext
services::Status createDictionaryFromContext() DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:233
daal::data_management::interface1::DataSource::status
services::Status status() const
Definition: data_source.h:309
daal::services::ErrorKDBWrongTypeOfOutput
Definition: error_indexes.h:390
daal::data_management::interface1::NumericTable::getDictionarySharedPtr
virtual NumericTableDictionaryPtr getDictionarySharedPtr() const DAAL_C11_OVERRIDE
Definition: numeric_table.h:633
daal::data_management::interface1::KDBDataSource::loadDataBlock
virtual size_t loadDataBlock(size_t maxRows) DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:125
daal::data_management::interface1::DataSourceIface::DictionaryCreationFlag
DictionaryCreationFlag
Specifies whether a Data Dictionary is created from the context of a Data Source. ...
Definition: data_source.h:69
daal::data_management::interface1::DataSourceIface::notAllocateNumericTable
Definition: data_source.h:81
daal::data_management::interface1::NumericTable::getNumberOfColumns
size_t getNumberOfColumns() const
Definition: numeric_table.h:652
daal::data_management::interface1::DataSourceIface::freeNumericTable
virtual void freeNumericTable()=0
daal::data_management::interface1::KDBDataSource::getNumberOfAvailableRows
size_t getNumberOfAvailableRows() DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:284
daal::data_management::interface1::NumericTableIface::userAllocated
Definition: numeric_table.h:275
daal::services::ErrorIncorrectNumberOfFeatures
Definition: error_indexes.h:70
daal::services::ErrorKDBServerError
Definition: error_indexes.h:388
daal::data_management::interface1::KDBDataSource
Connects to data sources with the KDB API.
Definition: kdb_data_source.h:54
daal::data_management::interface1::DataSourceIface::doAllocateNumericTable
Definition: data_source.h:82
daal::data_management::interface1::DataSourceTemplate
Implements the abstract DataSourceIface interface.
Definition: data_source.h:463
daal::data_management::interface1::NumericTableIface::minimum
Definition: numeric_table.h:296
daal::data_management::interface1::DataSource::checkNumericTable
services::Status checkNumericTable()
Definition: data_source.h:345
daal::services::ErrorKDBWrongCredentials
Definition: error_indexes.h:386
daal::data_management::interface1::KDBDataSource::loadDataBlock
virtual size_t loadDataBlock(size_t maxRows, NumericTable *nt)
Definition: kdb_data_source.h:142
daal::services::ErrorKDBNoConnection
Definition: error_indexes.h:385
daal::data_management::interface1::NumericTable
Class for a data management component responsible for representation of data in the numeric format...
Definition: numeric_table.h:575
daal::data_management::interface1::KDBDataSource::KDBDataSource
KDBDataSource(const std::string &dbname, size_t port, const std::string &tablename, const std::string &username="", const std::string &password="", DataSourceIface::NumericTableAllocationFlag doAllocateNumericTable=DataSource::notAllocateNumericTable, DataSourceIface::DictionaryCreationFlag doCreateDictionaryFromContext=DataSource::notDictionaryFromContext, size_t initialMaxRows=10)
Definition: kdb_data_source.h:91
daal::services::ErrorKDBNetworkError
Definition: error_indexes.h:387
daal::data_management::interface1::DataSourceIface::DataSourceStatus
DataSourceStatus
Specifies the status of the Data Source.
Definition: data_source.h:57
daal::data_management::interface1::NumericTableIface::maximum
Definition: numeric_table.h:297
daal::data_management::interface1::DataSourceIface::readyForLoad
Definition: data_source.h:59
daal::data_management::interface1::NumericTable::getNumberOfRows
size_t getNumberOfRows() const
Definition: numeric_table.h:661
daal::data_management::interface1::DataSourceIface::notDictionaryFromContext
Definition: data_source.h:71
daal::services::ErrorIncorrectNumberOfObservations
Definition: error_indexes.h:71
daal::data_management::interface1::DataSourceIface::NumericTableAllocationFlag
NumericTableAllocationFlag
Specifies whether a Numeric Table is allocated inside of the Data Source object.
Definition: data_source.h:79
daal::data_management::interface1::NumericTableIface::sum
Definition: numeric_table.h:298
daal::data_management::interface1::Dictionary::create
static services::SharedPtr< Dictionary > create(size_t nfeat, FeaturesEqual featuresEqual=notEqual, services::Status *stat=NULL)
Definition: data_dictionary.h:186
daal::data_management::interface1::NumericTableIface::sumSquares
Definition: numeric_table.h:299
daal::data_management::interface1::NumericTable::getDataMemoryStatus
virtual MemoryStatus getDataMemoryStatus() const
Definition: numeric_table.h:720
daal::data_management::interface1::KDBDataSource::loadDataBlock
size_t loadDataBlock(NumericTable *nt) DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:117