C++ API Reference for Intel® Data Analytics Acceleration Library 2019 Update 5

kdb_data_source.h
1 /* file: kdb_data_source.h */
2 /*******************************************************************************
3 * Copyright 2014-2019 Intel Corporation.
4 *
5 * This software and the related documents are Intel copyrighted materials, and
6 * your use of them is governed by the express license under which they were
7 * provided to you (License). Unless the License provides otherwise, you may not
8 * use, modify, copy, publish, distribute, disclose or transmit this software or
9 * the related documents without Intel's prior written permission.
10 *
11 * This software and the related documents are provided as is, with no express
12 * or implied warranties, other than those that are expressly stated in the
13 * License.
14 *******************************************************************************/
15 
16 /*
17 //++
18 // Implementation of the KDB data source class
19 //--
20 */
21 #ifndef __KDB_DATA_SOURCE_H__
22 #define __KDB_DATA_SOURCE_H__
23 
24 #include <sstream>
25 #include <fstream>
26 #include "services/daal_memory.h"
27 #include "data_management/data_source/data_source.h"
28 #include "data_management/data/data_dictionary.h"
29 #include "data_management/data/numeric_table.h"
30 #include "data_management/data/homogen_numeric_table.h"
31 
32 #include <k.h>
33 
34 #include "kdb_feature_manager.h"
35 
36 namespace daal
37 {
38 namespace data_management
39 {
40 
44 namespace interface1
45 {
53 template<typename _featureManager, typename summaryStatisticsType = DAAL_SUMMARY_STATISTICS_TYPE>
54 class KDBDataSource : public DataSourceTemplate<data_management::HomogenNumericTable<DAAL_DATA_TYPE>, summaryStatisticsType>
55 {
56 public:
57  typedef _featureManager FeatureManager;
58 
59  using DataSourceIface::NumericTableAllocationFlag;
60  using DataSourceIface::DictionaryCreationFlag;
61  using DataSourceIface::DataSourceStatus;
62 
63  using DataSource::checkDictionary;
64  using DataSource::checkNumericTable;
65  using DataSource::freeNumericTable;
66  using DataSource::_dict;
67  using DataSource::_initialMaxRows;
68 
69 protected:
70  typedef data_management::HomogenNumericTable<DAAL_DATA_TYPE> DefaultNumericTableType;
71 
72  FeatureManager featureManager;
73 
74 public:
91  KDBDataSource(const std::string &dbname, size_t port, const std::string &tablename, const std::string &username = "",
92  const std::string &password = "",
93  DataSourceIface::NumericTableAllocationFlag doAllocateNumericTable = DataSource::notAllocateNumericTable,
94  DataSourceIface::DictionaryCreationFlag doCreateDictionaryFromContext = DataSource::notDictionaryFromContext,
95  size_t initialMaxRows = 10) :
96  DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>(doAllocateNumericTable, doCreateDictionaryFromContext),
97  _dbname(dbname), _port(port), _username(username), _password(password), _tablename(tablename), _idx_last_read(0)
98  {
99  _query = _tablename;
100  _initialMaxRows = initialMaxRows;
101  }
102 
104  ~KDBDataSource() {}
105 
106  size_t loadDataBlock() DAAL_C11_OVERRIDE
107  {
108  checkDictionary();
109  if( this->_errors->size() != 0 ) { return 0; }
110 
111  checkNumericTable();
112  if( this->_errors->size() != 0 ) { return 0; }
113 
114  return loadDataBlock(0, this->DataSource::_spnt.get());
115  }
116 
117  size_t loadDataBlock(NumericTable* nt) DAAL_C11_OVERRIDE
118  {
119  checkDictionary();
120  if( this->_errors->size() != 0 ) { return 0; }
121 
122  return loadDataBlock(0, nt);
123  }
124 
125  virtual size_t loadDataBlock(size_t maxRows) DAAL_C11_OVERRIDE
126  {
127  checkDictionary();
128  if( !this->_errors->isEmpty() ) { return 0; }
129 
130  checkNumericTable();
131  if( !this->_errors->isEmpty() ) { return 0; }
132 
133  return loadDataBlock(maxRows, this->DataSource::_spnt.get());
134  }
135 
142  virtual size_t loadDataBlock(size_t maxRows, NumericTable *nt)
143  {
144  checkDictionary();
145 
146  if( this->_errors->size() != 0 ) { return 0; }
147 
148  if( nt == NULL ) { this->_errors->add(services::ErrorNullInputNumericTable); return 0; }
149 
150  I handle = _kdbConnect();
151 
152  if (handle <= 0) { return 0; }
153 
154  size_t nRows = getNumberOfAvailableRows();
155 
156  if (nRows == 0) {
157  DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>::resizeNumericTableImpl( 0, nt );
158  _kdbClose(handle);
159  return 0;
160  }
161 
162  if (maxRows != 0 && nRows > maxRows)
163  {
164  nRows = maxRows;
165  }
166 
167  std::ostringstream query;
168  query << "(" << _query << ")[(til " << nRows << ") + " << _idx_last_read << + "]";
169  std::string query_exec = query.str();
170 
171  K result = k(handle, const_cast<char*>(query_exec.c_str ()), (K)0);
172 
173  _kdbClose(handle);
174 
175  _idx_last_read += nRows;
176 
177  DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>::resizeNumericTableImpl( nRows, nt );
178 
179  if(nt->getDataMemoryStatus() == NumericTableIface::userAllocated)
180  {
181  if(nt->getNumberOfRows() < nRows)
182  {
183  r0(result);
184  this->_errors->add(services::ErrorIncorrectNumberOfObservations);
185  return 0;
186  }
187  if(nt->getNumberOfColumns() != _dict->getNumberOfFeatures())
188  {
189  r0(result);
190  this->_errors->add(services::ErrorIncorrectNumberOfFeatures);
191  return 0;
192  }
193  }
194 
195  if (result->t == XT)
196  {
197  K columnData = kK(result->k)[1];
198  featureManager.statementResultsNumericTableFromColumnData(columnData, nt, nRows);
199  }
200  else if (result->t == XD)
201  {
202  K columnData = kK(result)[1];
203  featureManager.statementResultsNumericTableFromColumnData(columnData, nt, nRows);
204  }
205  else
206  {
207  featureManager.statementResultsNumericTableFromList(result, nt, nRows);
208  }
209  r0(result);
210 
211  if(nt->basicStatistics.get(NumericTableIface::minimum ).get() != NULL &&
212  nt->basicStatistics.get(NumericTableIface::maximum ).get() != NULL &&
213  nt->basicStatistics.get(NumericTableIface::sum ).get() != NULL &&
214  nt->basicStatistics.get(NumericTableIface::sumSquares).get() != NULL)
215  {
216  for(size_t i = 0; i < nRows; i++)
217  {
218  DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>::updateStatistics( i, nt );
219  }
220  }
221 
222  NumericTableDictionaryPtr ntDict = nt->getDictionarySharedPtr();
223  size_t nFeatures = _dict->getNumberOfFeatures();
224  ntDict->setNumberOfFeatures(nFeatures);
225  for (size_t i = 0; i < nFeatures; i++)
226  {
227  ntDict->setFeature((*_dict)[i].ntFeature, i);
228  }
229 
230  return nRows;
231  }
232 
233  services::Status createDictionaryFromContext() DAAL_C11_OVERRIDE
234  {
235  if(_dict)
236  return services::Status(services::ErrorDictionaryAlreadyAvailable);
237 
238  I handle = _kdbConnect();
239 
240  std::string query_exec = "(" + _query + ")[til 1]";
241 
242  K result = k(handle, const_cast<char*>(query_exec.c_str ()), (K)0);
243 
244  if (!result)
245  {
246  _kdbClose(handle);
247  return services::Status(services::ErrorKDBNetworkError);
248  }
249 
250  if (result->t == -128)
251  {
252  r0(result);
253  _kdbClose(handle);
254  return services::Status(services::ErrorKDBServerError);
255  }
256 
257  services::Status status;
258  _dict = DataSourceDictionary::create(&status);
259  if (!status) return status;
260 
261  if (result->t == XT)
262  {
263  featureManager.createDictionaryFromTable(result->k, this->_dict.get());
264  }
265  else if (result->t == XD)
266  {
267  featureManager.createDictionaryFromTable(result, this->_dict.get());
268  }
269  else
270  {
271  featureManager.createDictionaryFromList(kK(result)[0], this->_dict.get());
272  }
273  r0(result);
274 
275  _kdbClose(handle);
276  return status;
277  }
278 
279  DataSourceIface::DataSourceStatus getStatus() DAAL_C11_OVERRIDE
280  {
281  return DataSourceIface::readyForLoad;
282  }
283 
284  size_t getNumberOfAvailableRows() DAAL_C11_OVERRIDE
285  {
286  I handle = _kdbConnect();
287 
288  if (handle <= 0) return 0;
289 
290  std::string query_exec = "count " + _query;
291 
292  K result = k(handle, const_cast<char*>(query_exec.c_str ()), (K)0);
293 
294  if (result->t != -KJ)
295  {
296  this->_errors->add(services::ErrorKDBWrongTypeOfOutput);
297  r0(result);
298  _kdbClose(handle);
299  return 0;
300  }
301 
302  size_t nRows = result->j;
303 
304  r0(result);
305 
306  _kdbClose(handle);
307 
308  return nRows - _idx_last_read;
309  }
310 
311  FeatureManager &getFeatureManager()
312  {
313  return featureManager;
314  }
315 
316 private:
317  std::string _dbname;
318  size_t _port;
319  std::string _username;
320  std::string _password;
321  std::string _tablename;
322  std::string _query;
323  size_t _idx_last_read;
324 
325  I _kdbConnect()
326  {
327  I handle = khpu(const_cast<char*>(_dbname.c_str ()), _port, const_cast<char*>((_username + ":" + _password).c_str ()));
328 
329  if (handle < 0)
330  {
331  this->_errors->add(services::ErrorKDBNoConnection);
332  return handle;
333  }
334 
335  if (handle == 0)
336  {
337  this->_errors->add(services::ErrorKDBWrongCredentials);
338  return handle;
339  }
340 
341  return handle;
342  }
343 
344  void _kdbClose(I handle)
345  {
346  kclose(handle);
347  }
348 };
349 } // namespace interface1
350 using interface1::KDBDataSource;
351 
352 }
353 }
354 #endif
daal::data_management::interface1::KDBDataSource::getStatus
DataSourceIface::DataSourceStatus getStatus() DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:279
daal::data_management::interface1::DataSource::checkDictionary
services::Status checkDictionary()
Definition: data_source.h:359
daal::services::ErrorDictionaryAlreadyAvailable
Definition: error_indexes.h:152
daal::data_management::interface1::KDBDataSource::loadDataBlock
size_t loadDataBlock() DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:106
daal::services::ErrorNullInputNumericTable
Definition: error_indexes.h:81
daal
Definition: algorithm_base_common.h:31
daal::data_management::interface1::KDBDataSource::createDictionaryFromContext
services::Status createDictionaryFromContext() DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:233
daal::data_management::interface1::DataSource::status
services::Status status() const
Definition: data_source.h:309
daal::services::ErrorKDBWrongTypeOfOutput
Definition: error_indexes.h:390
daal::data_management::interface1::NumericTable::getDictionarySharedPtr
virtual NumericTableDictionaryPtr getDictionarySharedPtr() const DAAL_C11_OVERRIDE
Definition: numeric_table.h:633
daal::data_management::interface1::KDBDataSource::loadDataBlock
virtual size_t loadDataBlock(size_t maxRows) DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:125
daal::data_management::interface1::DataSourceIface::DictionaryCreationFlag
DictionaryCreationFlag
Specifies whether a Data Dictionary is created from the context of a Data Source. ...
Definition: data_source.h:69
daal::data_management::interface1::DataSourceIface::notAllocateNumericTable
Definition: data_source.h:81
daal::data_management::interface1::NumericTable::getNumberOfColumns
size_t getNumberOfColumns() const
Definition: numeric_table.h:652
daal::data_management::interface1::DataSourceIface::freeNumericTable
virtual void freeNumericTable()=0
daal::data_management::interface1::KDBDataSource::getNumberOfAvailableRows
size_t getNumberOfAvailableRows() DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:284
daal::data_management::interface1::NumericTableIface::userAllocated
Definition: numeric_table.h:275
daal::services::ErrorIncorrectNumberOfFeatures
Definition: error_indexes.h:70
daal::services::ErrorKDBServerError
Definition: error_indexes.h:388
daal::data_management::interface1::KDBDataSource
Connects to data sources with the KDB API.
Definition: kdb_data_source.h:54
daal::data_management::interface1::DataSourceIface::doAllocateNumericTable
Definition: data_source.h:82
daal::data_management::interface1::DataSourceTemplate
Implements the abstract DataSourceIface interface.
Definition: data_source.h:463
daal::data_management::interface1::NumericTableIface::minimum
Definition: numeric_table.h:296
daal::data_management::interface1::DataSource::checkNumericTable
services::Status checkNumericTable()
Definition: data_source.h:345
daal::services::ErrorKDBWrongCredentials
Definition: error_indexes.h:386
daal::data_management::interface1::KDBDataSource::loadDataBlock
virtual size_t loadDataBlock(size_t maxRows, NumericTable *nt)
Definition: kdb_data_source.h:142
daal::services::ErrorKDBNoConnection
Definition: error_indexes.h:385
daal::data_management::interface1::NumericTable
Class for a data management component responsible for representation of data in the numeric format...
Definition: numeric_table.h:575
daal::data_management::interface1::KDBDataSource::KDBDataSource
KDBDataSource(const std::string &dbname, size_t port, const std::string &tablename, const std::string &username="", const std::string &password="", DataSourceIface::NumericTableAllocationFlag doAllocateNumericTable=DataSource::notAllocateNumericTable, DataSourceIface::DictionaryCreationFlag doCreateDictionaryFromContext=DataSource::notDictionaryFromContext, size_t initialMaxRows=10)
Definition: kdb_data_source.h:91
daal::services::ErrorKDBNetworkError
Definition: error_indexes.h:387
daal::data_management::interface1::DataSourceIface::DataSourceStatus
DataSourceStatus
Specifies the status of the Data Source.
Definition: data_source.h:57
daal::data_management::interface1::NumericTableIface::maximum
Definition: numeric_table.h:297
daal::data_management::interface1::DataSourceIface::readyForLoad
Definition: data_source.h:59
daal::data_management::interface1::NumericTable::getNumberOfRows
size_t getNumberOfRows() const
Definition: numeric_table.h:661
daal::data_management::interface1::DataSourceIface::notDictionaryFromContext
Definition: data_source.h:71
daal::services::ErrorIncorrectNumberOfObservations
Definition: error_indexes.h:71
daal::data_management::interface1::DataSourceIface::NumericTableAllocationFlag
NumericTableAllocationFlag
Specifies whether a Numeric Table is allocated inside of the Data Source object.
Definition: data_source.h:79
daal::data_management::interface1::NumericTableIface::sum
Definition: numeric_table.h:298
daal::data_management::interface1::Dictionary::create
static services::SharedPtr< Dictionary > create(size_t nfeat, FeaturesEqual featuresEqual=notEqual, services::Status *stat=NULL)
Definition: data_dictionary.h:186
daal::data_management::interface1::NumericTableIface::sumSquares
Definition: numeric_table.h:299
daal::data_management::interface1::NumericTable::getDataMemoryStatus
virtual MemoryStatus getDataMemoryStatus() const
Definition: numeric_table.h:720
daal::data_management::interface1::KDBDataSource::loadDataBlock
size_t loadDataBlock(NumericTable *nt) DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:117

For more complete information about compiler optimizations, see our Optimization Notice.