C++ API Reference for Intel® Data Analytics Acceleration Library 2018 Update 1

kdb_data_source.h
1 /* file: kdb_data_source.h */
2 /*******************************************************************************
3 * Copyright 2014-2017 Intel Corporation
4 * All Rights Reserved.
5 *
6 * If this software was obtained under the Intel Simplified Software License,
7 * the following terms apply:
8 *
9 * The source code, information and material ("Material") contained herein is
10 * owned by Intel Corporation or its suppliers or licensors, and title to such
11 * Material remains with Intel Corporation or its suppliers or licensors. The
12 * Material contains proprietary information of Intel or its suppliers and
13 * licensors. The Material is protected by worldwide copyright laws and treaty
14 * provisions. No part of the Material may be used, copied, reproduced,
15 * modified, published, uploaded, posted, transmitted, distributed or disclosed
16 * in any way without Intel's prior express written permission. No license under
17 * any patent, copyright or other intellectual property rights in the Material
18 * is granted to or conferred upon you, either expressly, by implication,
19 * inducement, estoppel or otherwise. Any license under such intellectual
20 * property rights must be express and approved by Intel in writing.
21 *
22 * Unless otherwise agreed by Intel in writing, you may not remove or alter this
23 * notice or any other notice embedded in Materials by Intel or Intel's
24 * suppliers or licensors in any way.
25 *
26 *
27 * If this software was obtained under the Apache License, Version 2.0 (the
28 * "License"), the following terms apply:
29 *
30 * You may not use this file except in compliance with the License. You may
31 * obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
32 *
33 *
34 * Unless required by applicable law or agreed to in writing, software
35 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
36 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
37 *
38 * See the License for the specific language governing permissions and
39 * limitations under the License.
40 *******************************************************************************/
41 
42 /*
43 //++
44 // Implementation of the KDB data source class
45 //--
46 */
47 #ifndef __KDB_DATA_SOURCE_H__
48 #define __KDB_DATA_SOURCE_H__
49 
50 #include <sstream>
51 #include <fstream>
52 #include "services/daal_memory.h"
53 #include "data_management/data_source/data_source.h"
54 #include "data_management/data/data_dictionary.h"
55 #include "data_management/data/numeric_table.h"
56 #include "data_management/data/homogen_numeric_table.h"
57 
58 #include <k.h>
59 
60 #include "kdb_feature_manager.h"
61 
62 namespace daal
63 {
64 namespace data_management
65 {
66 
70 namespace interface1
71 {
79 template<typename _featureManager, typename summaryStatisticsType = DAAL_SUMMARY_STATISTICS_TYPE>
80 class KDBDataSource : public DataSourceTemplate<data_management::HomogenNumericTable<DAAL_DATA_TYPE>, summaryStatisticsType>
81 {
82 public:
83  typedef _featureManager FeatureManager;
84 
85  using DataSourceIface::NumericTableAllocationFlag;
86  using DataSourceIface::DictionaryCreationFlag;
87  using DataSourceIface::DataSourceStatus;
88 
89  using DataSource::checkDictionary;
90  using DataSource::checkNumericTable;
91  using DataSource::freeNumericTable;
92  using DataSource::_dict;
93  using DataSource::_initialMaxRows;
94 
95 protected:
96  typedef data_management::HomogenNumericTable<DAAL_DATA_TYPE> DefaultNumericTableType;
97 
98  FeatureManager featureManager;
99 
100 public:
117  KDBDataSource(const std::string &dbname, size_t port, const std::string &tablename, const std::string &username = "",
118  const std::string &password = "",
119  DataSourceIface::NumericTableAllocationFlag doAllocateNumericTable = DataSource::notAllocateNumericTable,
120  DataSourceIface::DictionaryCreationFlag doCreateDictionaryFromContext = DataSource::notDictionaryFromContext,
121  size_t initialMaxRows = 10) :
122  DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>(doAllocateNumericTable, doCreateDictionaryFromContext),
123  _dbname(dbname), _port(port), _username(username), _password(password), _tablename(tablename), _idx_last_read(0)
124  {
125  _query = _tablename;
126  _initialMaxRows = initialMaxRows;
127  }
128 
130  ~KDBDataSource() {}
131 
132  size_t loadDataBlock() DAAL_C11_OVERRIDE
133  {
134  checkDictionary();
135  if( this->_errors->size() != 0 ) { return 0; }
136 
137  checkNumericTable();
138  if( this->_errors->size() != 0 ) { return 0; }
139 
140  return loadDataBlock(0, this->DataSource::_spnt.get());
141  }
142 
143  size_t loadDataBlock(NumericTable* nt) DAAL_C11_OVERRIDE
144  {
145  checkDictionary();
146  if( this->_errors->size() != 0 ) { return 0; }
147 
148  return loadDataBlock(0, nt);
149  }
150 
151  virtual size_t loadDataBlock(size_t maxRows) DAAL_C11_OVERRIDE
152  {
153  checkDictionary();
154  if( !this->_errors->isEmpty() ) { return 0; }
155 
156  checkNumericTable();
157  if( !this->_errors->isEmpty() ) { return 0; }
158 
159  return loadDataBlock(maxRows, this->DataSource::_spnt.get());
160  }
161 
168  virtual size_t loadDataBlock(size_t maxRows, NumericTable *nt)
169  {
170  checkDictionary();
171 
172  if( this->_errors->size() != 0 ) { return 0; }
173 
174  if( nt == NULL ) { this->_errors->add(services::ErrorNullInputNumericTable); return 0; }
175 
176  I handle = _kdbConnect();
177 
178  if (handle <= 0) { return 0; }
179 
180  size_t nRows = getNumberOfAvailableRows();
181 
182  if (nRows == 0) {
183  DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>::resizeNumericTableImpl( 0, nt );
184  _kdbClose(handle);
185  return 0;
186  }
187 
188  if (maxRows != 0 && nRows > maxRows)
189  {
190  nRows = maxRows;
191  }
192 
193  ostringstream query;
194  query << "(" << _query << ")[(til " << nRows << ") + " << _idx_last_read << + "]";
195  std::string query_exec = query.str();
196 
197  K result = k(handle, const_cast<char*>(query_exec.c_str ()), (K)0);
198 
199  _kdbClose(handle);
200 
201  _idx_last_read += nRows;
202 
203  DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>::resizeNumericTableImpl( nRows, nt );
204 
205  if(nt->getDataMemoryStatus() == NumericTableIface::userAllocated)
206  {
207  if(nt->getNumberOfRows() < nRows)
208  {
209  r0(result);
210  this->_errors->add(services::ErrorIncorrectNumberOfObservations);
211  return 0;
212  }
213  if(nt->getNumberOfColumns() != _dict->getNumberOfFeatures())
214  {
215  r0(result);
216  this->_errors->add(services::ErrorIncorrectNumberOfFeatures);
217  return 0;
218  }
219  }
220 
221  if (result->t == XT)
222  {
223  K columnData = kK(result->k)[1];
224  featureManager.statementResultsNumericTableFromColumnData(columnData, nt, nRows);
225  }
226  else if (result->t == XD)
227  {
228  K columnData = kK(result)[1];
229  featureManager.statementResultsNumericTableFromColumnData(columnData, nt, nRows);
230  }
231  else
232  {
233  featureManager.statementResultsNumericTableFromList(result, nt, nRows);
234  }
235  r0(result);
236 
237  if(nt->basicStatistics.get(NumericTableIface::minimum ).get() != NULL &&
238  nt->basicStatistics.get(NumericTableIface::maximum ).get() != NULL &&
239  nt->basicStatistics.get(NumericTableIface::sum ).get() != NULL &&
240  nt->basicStatistics.get(NumericTableIface::sumSquares).get() != NULL)
241  {
242  for(size_t i = 0; i < nRows; i++)
243  {
244  DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>::updateStatistics( i, nt );
245  }
246  }
247 
248  NumericTableDictionaryPtr ntDict = nt->getDictionarySharedPtr();
249  size_t nFeatures = _dict->getNumberOfFeatures();
250  ntDict->setNumberOfFeatures(nFeatures);
251  for (size_t i = 0; i < nFeatures; i++)
252  {
253  ntDict->setFeature((*_dict)[i].ntFeature, i);
254  }
255 
256  return nRows;
257  }
258 
259  services::Status createDictionaryFromContext() DAAL_C11_OVERRIDE
260  {
261  if(_dict)
262  return services::Status(services::ErrorDictionaryAlreadyAvailable);
263 
264  I handle = _kdbConnect();
265 
266  std::string query_exec = "(" + _query + ")[til 1]";
267 
268  K result = k(handle, const_cast<char*>(query_exec.c_str ()), (K)0);
269 
270  if (!result)
271  {
272  _kdbClose(handle);
273  return services::Status(services::ErrorKDBNetworkError);
274  }
275 
276  if (result->t == -128)
277  {
278  r0(result);
279  _kdbClose(handle);
280  return services::Status(services::ErrorKDBServerError);
281  }
282 
283  services::Status status;
284  _dict = DataSourceDictionary::create(&status);
285  if (!status) return status;
286 
287  if (result->t == XT)
288  {
289  featureManager.createDictionaryFromTable(result->k, this->_dict.get());
290  }
291  else if (result->t == XD)
292  {
293  featureManager.createDictionaryFromTable(result, this->_dict.get());
294  }
295  else
296  {
297  featureManager.createDictionaryFromList(kK(result)[0], this->_dict.get());
298  }
299  r0(result);
300 
301  _kdbClose(handle);
302  return status;
303  }
304 
305  DataSourceIface::DataSourceStatus getStatus() DAAL_C11_OVERRIDE
306  {
307  return DataSourceIface::readyForLoad;
308  }
309 
310  size_t getNumberOfAvailableRows() DAAL_C11_OVERRIDE
311  {
312  I handle = _kdbConnect();
313 
314  if (handle <= 0) return 0;
315 
316  std::string query_exec = "count " + _query;
317 
318  K result = k(handle, const_cast<char*>(query_exec.c_str ()), (K)0);
319 
320  if (result->t != -KJ)
321  {
322  this->_errors->add(services::ErrorKDBWrongTypeOfOutput);
323  r0(result);
324  _kdbClose(handle);
325  return 0;
326  }
327 
328  size_t nRows = result->j;
329 
330  r0(result);
331 
332  _kdbClose(handle);
333 
334  return nRows - _idx_last_read;
335  }
336 
337  FeatureManager &getFeatureManager()
338  {
339  return featureManager;
340  }
341 
342 private:
343  std::string _dbname;
344  size_t _port;
345  std::string _username;
346  std::string _password;
347  std::string _tablename;
348  std::string _query;
349  size_t _idx_last_read;
350 
351  I _kdbConnect()
352  {
353  I handle = khpu(const_cast<char*>(_dbname.c_str ()), _port, const_cast<char*>((_username + ":" + _password).c_str ()));
354 
355  if (handle < 0)
356  {
357  this->_errors->add(services::ErrorKDBNoConnection);
358  return handle;
359  }
360 
361  if (handle == 0)
362  {
363  this->_errors->add(services::ErrorKDBWrongCredentials);
364  return handle;
365  }
366 
367  return handle;
368  }
369 
370  void _kdbClose(I handle)
371  {
372  kclose(handle);
373  }
374 };
375 } // namespace interface1
376 using interface1::KDBDataSource;
377 
378 }
379 }
380 #endif
daal::data_management::interface1::KDBDataSource::getStatus
DataSourceIface::DataSourceStatus getStatus() DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:305
daal::data_management::interface1::DataSource::checkDictionary
services::Status checkDictionary()
Definition: data_source.h:385
daal::services::ErrorDictionaryAlreadyAvailable
Definition: error_indexes.h:175
daal::data_management::interface1::KDBDataSource::loadDataBlock
size_t loadDataBlock() DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:132
daal::services::ErrorNullInputNumericTable
Definition: error_indexes.h:107
daal
Definition: algorithm_base_common.h:57
daal::data_management::interface1::KDBDataSource::createDictionaryFromContext
services::Status createDictionaryFromContext() DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:259
daal::data_management::interface1::DataSource::status
services::Status status() const
Definition: data_source.h:335
daal::services::ErrorKDBWrongTypeOfOutput
Definition: error_indexes.h:403
daal::data_management::interface1::NumericTable::getDictionarySharedPtr
virtual NumericTableDictionaryPtr getDictionarySharedPtr() const DAAL_C11_OVERRIDE
Definition: numeric_table.h:658
daal::data_management::interface1::KDBDataSource::loadDataBlock
virtual size_t loadDataBlock(size_t maxRows) DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:151
daal::data_management::interface1::DataSourceIface::DictionaryCreationFlag
DictionaryCreationFlag
Specifies whether a Data Dictionary is created from the context of a Data Source. ...
Definition: data_source.h:95
daal::data_management::interface1::DataSourceIface::notAllocateNumericTable
Definition: data_source.h:107
daal::data_management::interface1::NumericTable::getNumberOfColumns
size_t getNumberOfColumns() const
Definition: numeric_table.h:677
daal::data_management::interface1::DataSourceIface::freeNumericTable
virtual void freeNumericTable()=0
daal::data_management::interface1::KDBDataSource::getNumberOfAvailableRows
size_t getNumberOfAvailableRows() DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:310
daal::data_management::interface1::NumericTableIface::userAllocated
Definition: numeric_table.h:301
daal::services::ErrorIncorrectNumberOfFeatures
Definition: error_indexes.h:96
daal::services::ErrorKDBServerError
Definition: error_indexes.h:401
daal::data_management::interface1::KDBDataSource
Connects to data sources with the KDB API.
Definition: kdb_data_source.h:80
daal::data_management::interface1::DataSourceIface::doAllocateNumericTable
Definition: data_source.h:108
daal::data_management::interface1::DataSourceTemplate
Implements the abstract DataSourceIface interface.
Definition: data_source.h:489
daal::data_management::interface1::NumericTableIface::minimum
Definition: numeric_table.h:322
daal::data_management::interface1::DataSource::checkNumericTable
services::Status checkNumericTable()
Definition: data_source.h:371
daal::services::ErrorKDBWrongCredentials
Definition: error_indexes.h:399
daal::data_management::interface1::KDBDataSource::loadDataBlock
virtual size_t loadDataBlock(size_t maxRows, NumericTable *nt)
Definition: kdb_data_source.h:168
daal::services::ErrorKDBNoConnection
Definition: error_indexes.h:398
daal::data_management::interface1::NumericTable
Class for a data management component responsible for representation of data in the numeric format...
Definition: numeric_table.h:600
daal::data_management::interface1::KDBDataSource::KDBDataSource
KDBDataSource(const std::string &dbname, size_t port, const std::string &tablename, const std::string &username="", const std::string &password="", DataSourceIface::NumericTableAllocationFlag doAllocateNumericTable=DataSource::notAllocateNumericTable, DataSourceIface::DictionaryCreationFlag doCreateDictionaryFromContext=DataSource::notDictionaryFromContext, size_t initialMaxRows=10)
Definition: kdb_data_source.h:117
daal::services::ErrorKDBNetworkError
Definition: error_indexes.h:400
daal::data_management::interface1::DataSourceIface::DataSourceStatus
DataSourceStatus
Specifies the status of the Data Source.
Definition: data_source.h:83
daal::data_management::interface1::NumericTableIface::maximum
Definition: numeric_table.h:323
daal::data_management::interface1::DataSourceIface::readyForLoad
Definition: data_source.h:85
daal::data_management::interface1::NumericTable::getNumberOfRows
size_t getNumberOfRows() const
Definition: numeric_table.h:686
daal::data_management::interface1::DataSourceIface::notDictionaryFromContext
Definition: data_source.h:97
daal::services::ErrorIncorrectNumberOfObservations
Definition: error_indexes.h:97
daal::data_management::interface1::DataSourceIface::NumericTableAllocationFlag
NumericTableAllocationFlag
Specifies whether a Numeric Table is allocated inside of the Data Source object.
Definition: data_source.h:105
daal::data_management::interface1::NumericTableIface::sum
Definition: numeric_table.h:324
daal::data_management::interface1::Dictionary::create
static services::SharedPtr< Dictionary > create(size_t nfeat, FeaturesEqual featuresEqual=notEqual, services::Status *stat=NULL)
Definition: data_dictionary.h:209
daal::data_management::interface1::NumericTableIface::sumSquares
Definition: numeric_table.h:325
daal::data_management::interface1::NumericTable::getDataMemoryStatus
virtual MemoryStatus getDataMemoryStatus() const
Definition: numeric_table.h:745
daal::data_management::interface1::KDBDataSource::loadDataBlock
size_t loadDataBlock(NumericTable *nt) DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:143

For more complete information about compiler optimizations, see our Optimization Notice.