47 #ifndef __KDB_DATA_SOURCE_H__
48 #define __KDB_DATA_SOURCE_H__
52 #include "services/daal_memory.h"
53 #include "data_management/data_source/data_source.h"
54 #include "data_management/data/data_dictionary.h"
55 #include "data_management/data/numeric_table.h"
56 #include "data_management/data/homogen_numeric_table.h"
60 #include "kdb_feature_manager.h"
64 namespace data_management
// Data source that connects to data via the KDB API.
// Template parameters:
//   _featureManager        - type of the helper object that builds the data
//                            dictionary and fills numeric tables from query results.
//   summaryStatisticsType  - forwarded to DataSourceTemplate; defaults to
//                            DAAL_SUMMARY_STATISTICS_TYPE.
// The base class is DataSourceTemplate specialized on
// HomogenNumericTable<DAAL_DATA_TYPE>.
// NOTE(review): this listing omits brace-only lines and access specifiers, so
// member visibility cannot be read off from here.
79 template<
typename _featureManager,
typename summaryStatisticsType = DAAL_SUMMARY_STATISTICS_TYPE>
80 class KDBDataSource :
public DataSourceTemplate<data_management::HomogenNumericTable<DAAL_DATA_TYPE>, summaryStatisticsType>
// Alias for the feature-manager template argument.
83 typedef _featureManager FeatureManager;
// Make the flag and status types of DataSourceIface nameable through this class.
85 using DataSourceIface::NumericTableAllocationFlag;
86 using DataSourceIface::DictionaryCreationFlag;
87 using DataSourceIface::DataSourceStatus;
// Bring inherited DataSource helpers and members into scope so the code below
// can use them without qualification.
89 using DataSource::checkDictionary;
90 using DataSource::checkNumericTable;
91 using DataSource::freeNumericTable;
92 using DataSource::_dict;
93 using DataSource::_initialMaxRows;
// Numeric table type used for the base-class specialization.
96 typedef data_management::HomogenNumericTable<DAAL_DATA_TYPE> DefaultNumericTableType;
// Feature manager instance; used below to create the dictionary and to copy
// query results into numeric tables. Exposed by reference via getFeatureManager().
98 FeatureManager featureManager;
// Constructs the data source and records the connection parameters.
//   dbname    - stored in _dbname; passed as the first argument to khpu() when connecting.
//   port      - stored in _port; passed as the second argument to khpu().
//   tablename - stored in _tablename.
//   username, password - stored; joined as username:password for khpu(). Both default to empty.
//   doAllocateNumericTable, doCreateDictionaryFromContext - forwarded unchanged
//               to the DataSourceTemplate base constructor.
//   initialMaxRows - assigned to the inherited _initialMaxRows (default 10).
// The read offset _idx_last_read starts at 0.
// NOTE(review): the queries below use a member named _query whose initialization
// is not visible in this listing (embedded line numbers jump 123 -> 126) - confirm
// it is set from tablename.
117 KDBDataSource(
const std::string &dbname,
size_t port,
const std::string &tablename,
const std::string &username =
"",
118 const std::string &password =
"",
119 DataSourceIface::NumericTableAllocationFlag doAllocateNumericTable = DataSource::notAllocateNumericTable,
120 DataSourceIface::DictionaryCreationFlag doCreateDictionaryFromContext = DataSource::notDictionaryFromContext,
121 size_t initialMaxRows = 10) :
122 DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>(doAllocateNumericTable, doCreateDictionaryFromContext),
123 _dbname(dbname), _port(port), _username(username), _password(password), _tablename(tablename), _idx_last_read(0)
126 _initialMaxRows = initialMaxRows;
// Loads data into the numeric table held by the data source (_spnt) with no
// row cap: delegates to loadDataBlock(maxRows, nt) with maxRows == 0, which
// that overload treats as no limit.
// Returns 0 without doing anything when errors are already recorded; otherwise
// returns the result of the two-argument overload.
// NOTE(review): the two identical error checks bracket statements that are not
// visible in this listing (embedded line numbers jump 135 -> 138) - presumably
// the dictionary / numeric table checks; confirm against the full header.
132 size_t loadDataBlock() DAAL_C11_OVERRIDE
135 if( this->_errors->size() != 0 ) {
return 0; }
138 if( this->_errors->size() != 0 ) {
return 0; }
140 return loadDataBlock(0, this->DataSource::_spnt.
get());
// Loads data into the caller-provided numeric table nt with no row cap
// (maxRows == 0 is passed to the two-argument overload).
// Returns 0 when errors are already recorded; otherwise returns the result of
// loadDataBlock(0, nt). The null check on nt happens in that overload.
143 size_t loadDataBlock(NumericTable* nt) DAAL_C11_OVERRIDE
146 if( this->_errors->size() != 0 ) {
return 0; }
148 return loadDataBlock(0, nt);
// Loads at most maxRows rows into the numeric table held by the data source
// (_spnt) by delegating to loadDataBlock(maxRows, nt).
// Returns 0 when errors are already recorded; otherwise returns the result of
// the two-argument overload.
// NOTE(review): as in the no-argument overload, statements between the two
// identical error checks are not visible here (line numbers jump 154 -> 157).
// This overload tests _errors with isEmpty() while its siblings use size() != 0.
151 virtual size_t loadDataBlock(
size_t maxRows) DAAL_C11_OVERRIDE
154 if( !this->_errors->isEmpty() ) {
return 0; }
157 if( !this->_errors->isEmpty() ) {
return 0; }
159 return loadDataBlock(maxRows, this->DataSource::_spnt.
get());
// Core loader: queries the next block of rows from KDB and copies it into nt.
//   maxRows - row cap; 0 means no cap (see the maxRows != 0 test below).
//   nt      - destination numeric table; must not be NULL.
// Returns 0 on the early-exit paths shown below; the final return statement is
// not visible in this listing.
// NOTE(review): brace-only lines and several statements are elided from this
// listing (see the gaps in the embedded line numbers), so the nesting of the
// statements below must be confirmed against the full header.
168 virtual size_t loadDataBlock(
size_t maxRows, NumericTable *nt)
// Bail out if an earlier operation already recorded an error.
172 if( this->_errors->size() != 0 ) {
return 0; }
// A NULL destination table is reported through the error collection.
174 if( nt == NULL ) { this->_errors->add(services::ErrorNullInputNumericTable);
return 0; }
176 I handle = _kdbConnect();
// Non-positive handle means the connection attempt failed.
178 if (handle <= 0) {
return 0; }
// Rows remaining past the current read offset.
// NOTE(review): getNumberOfAvailableRows() calls _kdbConnect() itself, so a
// second connection is opened while this handle is held.
180 size_t nRows = getNumberOfAvailableRows();
// Shrink the table to zero rows before the real size is known.
183 DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>::resizeNumericTableImpl( 0, nt );
// Body of this condition is not visible; presumably it clamps nRows to maxRows - confirm.
188 if (maxRows != 0 && nRows > maxRows)
// Builds the q expression (<_query>)[(til <nRows>) + <_idx_last_read>], i.e.
// index the query result with nRows consecutive indices starting at the read
// offset. The declaration of query is not visible here.
// NOTE(review): the unary plus before the closing-bracket literal is redundant.
194 query <<
"(" << _query <<
")[(til " << nRows <<
") + " << _idx_last_read << +
"]";
195 std::string query_exec = query.str();
// const_cast because the KDB C API takes a non-const char pointer.
// NOTE(review): no NULL check of result is visible before it is dereferenced
// below - confirm one exists in the elided lines.
197 K result = k(handle, const_cast<char*>(query_exec.c_str ()), (K)0);
// Advance the read offset so the next call continues after this block.
201 _idx_last_read += nRows;
203 DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>::resizeNumericTableImpl( nRows, nt );
// For user-allocated tables, validate the dimensions and record an error on
// mismatch: too few rows, or a column count different from the dictionary.
205 if(nt->getDataMemoryStatus() == NumericTableIface::userAllocated)
207 if(nt->getNumberOfRows() < nRows)
210 this->_errors->add(services::ErrorIncorrectNumberOfObservations);
213 if(nt->getNumberOfColumns() != _dict->getNumberOfFeatures())
216 this->_errors->add(services::ErrorIncorrectNumberOfFeatures);
// Three result shapes are handled. The condition of the first branch is not
// visible; in it the column data is read through result->k.
223 K columnData = kK(result->k)[1];
224 featureManager.statementResultsNumericTableFromColumnData(columnData, nt, nRows);
// XD is a type constant from the KDB C header - presumably the dictionary type; confirm.
226 else if (result->t == XD)
228 K columnData = kK(result)[1];
229 featureManager.statementResultsNumericTableFromColumnData(columnData, nt, nRows);
// Fallback branch: the result is handed over as a list.
233 featureManager.statementResultsNumericTableFromList(result, nt, nRows);
// Update summary statistics row by row, but only when all four statistics
// tables (minimum, maximum, sum, sumSquares) are present on nt.
237 if(nt->basicStatistics.get(NumericTableIface::minimum ).get() != NULL &&
238 nt->basicStatistics.get(NumericTableIface::maximum ).get() != NULL &&
239 nt->basicStatistics.get(NumericTableIface::sum ).get() != NULL &&
240 nt->basicStatistics.get(NumericTableIface::sumSquares).get() != NULL)
242 for(
size_t i = 0; i < nRows; i++)
244 DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>::updateStatistics( i, nt );
// Copy the feature descriptions from the data source dictionary into the
// dictionary of the numeric table.
248 NumericTableDictionaryPtr ntDict = nt->getDictionarySharedPtr();
249 size_t nFeatures = _dict->getNumberOfFeatures();
250 ntDict->setNumberOfFeatures(nFeatures);
251 for (
size_t i = 0; i < nFeatures; i++)
253 ntDict->setFeature((*_dict)[i].ntFeature, i);
// Creates the data dictionary by querying KDB for the first row of _query and
// handing the result to the feature manager.
// Returns a Status carrying ErrorDictionaryAlreadyAvailable, ErrorKDBNetworkError
// or ErrorKDBServerError on the failure paths shown below, or the failed status
// from DataSourceDictionary::create. The success return is not visible here.
// NOTE(review): brace-only lines and some conditions are elided from this listing.
259 services::Status createDictionaryFromContext() DAAL_C11_OVERRIDE
// Guard condition not visible; presumably taken when _dict is already set.
262 return services::Status(services::ErrorDictionaryAlreadyAvailable);
// NOTE(review): unlike the other callers of _kdbConnect(), no handle <= 0 check
// is visible before the handle is used (line numbers 264 -> 266 leave room for
// one elided line only) - confirm.
264 I handle = _kdbConnect();
// q expression (<_query>)[til 1]: index the query result with the single index 0.
266 std::string query_exec =
"(" + _query +
")[til 1]";
268 K result = k(handle, const_cast<char*>(query_exec.c_str ()), (K)0);
// Condition not visible; presumably taken when k() returned no result.
273 return services::Status(services::ErrorKDBNetworkError);
// A result of type -128 is reported as a server-side error.
276 if (result->t == -128)
280 return services::Status(services::ErrorKDBServerError);
283 services::Status status;
284 _dict = DataSourceDictionary::create(&status);
285 if (!status)
return status;
// Three result shapes, mirroring loadDataBlock: the first branch (condition not
// visible) reads the table through result->k, the XD branch uses result itself,
// and the fallback builds the dictionary from the first element of a list.
289 featureManager.createDictionaryFromTable(result->k, this->_dict.get());
291 else if (result->t == XD)
293 featureManager.createDictionaryFromTable(result, this->_dict.get());
297 featureManager.createDictionaryFromList(kK(result)[0], this->_dict.get());
// Reports the status of the data source. This implementation unconditionally
// returns readyForLoad; it does not inspect the connection or the read offset.
305 DataSourceIface::DataSourceStatus getStatus() DAAL_C11_OVERRIDE
307 return DataSourceIface::readyForLoad;
// Returns the number of rows not yet read: the row count reported by KDB for
// _query minus the current read offset _idx_last_read.
// Returns 0 when the connection attempt fails. When the result has an
// unexpected type, ErrorKDBWrongTypeOfOutput is recorded; the rest of that
// branch is not visible in this listing.
310 size_t getNumberOfAvailableRows() DAAL_C11_OVERRIDE
312 I handle = _kdbConnect();
314 if (handle <= 0)
return 0;
// q expression: count <_query>.
316 std::string query_exec =
"count " + _query;
318 K result = k(handle, const_cast<char*>(query_exec.c_str ()), (K)0);
// The count is expected as a long atom (negative type code -KJ).
// NOTE(review): result is dereferenced without a visible NULL check - confirm
// whether k() can return NULL here.
320 if (result->t != -KJ)
322 this->_errors->add(services::ErrorKDBWrongTypeOfOutput);
328 size_t nRows = result->j;
// NOTE(review): unsigned subtraction; if the table now holds fewer rows than
// _idx_last_read this wraps to a very large value - confirm callers cannot hit that.
334 return nRows - _idx_last_read;
// Returns a mutable reference to the featureManager member, so callers can
// configure the object that this data source uses for dictionary creation and
// table filling.
337 FeatureManager &getFeatureManager()
339 return featureManager;
// Connection credentials as passed to the constructor; joined as
// username:password when calling khpu().
345 std::string _username;
346 std::string _password;
// Table name as passed to the constructor.
347 std::string _tablename;
// Read offset: number of rows already consumed. Starts at 0 and is advanced by
// the two-argument loadDataBlock after each query.
349 size_t _idx_last_read;
// NOTE(review): the signature line of this helper is not visible in this
// listing; presumably this is the body of _kdbConnect() - confirm.
// Opens a connection with khpu() using _dbname, _port and a credential string
// of the form username:password. The temporary std::string holding the
// credentials lives until the end of the full expression, so the pointer passed
// to khpu() stays valid for the duration of the call.
353 I handle = khpu(const_cast<char*>(_dbname.c_str ()), _port, const_cast<char*>((_username +
":" + _password).c_str ()));
// The conditions guarding the two error reports below are not visible; they
// record a failed connection and rejected credentials respectively.
357 this->_errors->add(services::ErrorKDBNoConnection);
363 this->_errors->add(services::ErrorKDBWrongCredentials);
// Takes a connection handle of the kind returned by the connection helper.
// NOTE(review): the body is not visible in this listing; presumably it closes
// the handle - confirm against the full header.
370 void _kdbClose(I handle)
376 using interface1::KDBDataSource;
daal::data_management::interface1::KDBDataSource::getStatus
DataSourceIface::DataSourceStatus getStatus() DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:305
daal::data_management::interface1::DataSource::checkDictionary
services::Status checkDictionary()
Definition: data_source.h:385
daal::services::ErrorDictionaryAlreadyAvailable
Definition: error_indexes.h:175
daal::data_management::interface1::KDBDataSource::loadDataBlock
size_t loadDataBlock() DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:132
daal::services::ErrorNullInputNumericTable
Definition: error_indexes.h:107
daal
Definition: algorithm_base_common.h:57
daal::data_management::interface1::KDBDataSource::createDictionaryFromContext
services::Status createDictionaryFromContext() DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:259
daal::data_management::interface1::DataSource::status
services::Status status() const
Definition: data_source.h:335
daal::services::ErrorKDBWrongTypeOfOutput
Definition: error_indexes.h:403
daal::data_management::interface1::NumericTable::getDictionarySharedPtr
virtual NumericTableDictionaryPtr getDictionarySharedPtr() const DAAL_C11_OVERRIDE
Definition: numeric_table.h:658
daal::data_management::interface1::KDBDataSource::loadDataBlock
virtual size_t loadDataBlock(size_t maxRows) DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:151
daal::data_management::interface1::DataSourceIface::DictionaryCreationFlag
DictionaryCreationFlag
Specifies whether a Data Dictionary is created from the context of a Data Source. ...
Definition: data_source.h:95
daal::data_management::interface1::DataSourceIface::notAllocateNumericTable
Definition: data_source.h:107
daal::data_management::interface1::NumericTable::getNumberOfColumns
size_t getNumberOfColumns() const
Definition: numeric_table.h:677
daal::data_management::interface1::DataSourceIface::freeNumericTable
virtual void freeNumericTable()=0
daal::data_management::interface1::KDBDataSource::getNumberOfAvailableRows
size_t getNumberOfAvailableRows() DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:310
daal::data_management::interface1::NumericTableIface::userAllocated
Definition: numeric_table.h:301
daal::services::ErrorIncorrectNumberOfFeatures
Definition: error_indexes.h:96
daal::services::ErrorKDBServerError
Definition: error_indexes.h:401
daal::data_management::interface1::KDBDataSource
Connects to data sources with the KDB API.
Definition: kdb_data_source.h:80
daal::data_management::interface1::DataSourceIface::doAllocateNumericTable
Definition: data_source.h:108
daal::data_management::interface1::DataSourceTemplate
Implements the abstract DataSourceIface interface.
Definition: data_source.h:489
daal::data_management::interface1::NumericTableIface::minimum
Definition: numeric_table.h:322
daal::data_management::interface1::DataSource::checkNumericTable
services::Status checkNumericTable()
Definition: data_source.h:371
daal::services::ErrorKDBWrongCredentials
Definition: error_indexes.h:399
daal::data_management::interface1::KDBDataSource::loadDataBlock
virtual size_t loadDataBlock(size_t maxRows, NumericTable *nt)
Definition: kdb_data_source.h:168
daal::services::ErrorKDBNoConnection
Definition: error_indexes.h:398
daal::data_management::interface1::NumericTable
Class for a data management component responsible for representation of data in the numeric format...
Definition: numeric_table.h:600
daal::data_management::interface1::KDBDataSource::KDBDataSource
KDBDataSource(const std::string &dbname, size_t port, const std::string &tablename, const std::string &username="", const std::string &password="", DataSourceIface::NumericTableAllocationFlag doAllocateNumericTable=DataSource::notAllocateNumericTable, DataSourceIface::DictionaryCreationFlag doCreateDictionaryFromContext=DataSource::notDictionaryFromContext, size_t initialMaxRows=10)
Definition: kdb_data_source.h:117
daal::services::ErrorKDBNetworkError
Definition: error_indexes.h:400
daal::data_management::interface1::DataSourceIface::DataSourceStatus
DataSourceStatus
Specifies the status of the Data Source.
Definition: data_source.h:83
daal::data_management::interface1::NumericTableIface::maximum
Definition: numeric_table.h:323
daal::data_management::interface1::DataSourceIface::readyForLoad
Definition: data_source.h:85
daal::data_management::interface1::NumericTable::getNumberOfRows
size_t getNumberOfRows() const
Definition: numeric_table.h:686
daal::data_management::interface1::DataSourceIface::notDictionaryFromContext
Definition: data_source.h:97
daal::services::ErrorIncorrectNumberOfObservations
Definition: error_indexes.h:97
daal::data_management::interface1::DataSourceIface::NumericTableAllocationFlag
NumericTableAllocationFlag
Specifies whether a Numeric Table is allocated inside of the Data Source object.
Definition: data_source.h:105
daal::data_management::interface1::NumericTableIface::sum
Definition: numeric_table.h:324
daal::data_management::interface1::Dictionary::create
static services::SharedPtr< Dictionary > create(size_t nfeat, FeaturesEqual featuresEqual=notEqual, services::Status *stat=NULL)
Definition: data_dictionary.h:209
daal::data_management::interface1::NumericTableIface::sumSquares
Definition: numeric_table.h:325
daal::data_management::interface1::NumericTable::getDataMemoryStatus
virtual MemoryStatus getDataMemoryStatus() const
Definition: numeric_table.h:745
daal::data_management::interface1::KDBDataSource::loadDataBlock
size_t loadDataBlock(NumericTable *nt) DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:143