C++ API Reference for Intel® Data Analytics Acceleration Library 2019 Update 4

csv_data_source.h
1 /* file: csv_data_source.h */
2 /*******************************************************************************
3 * Copyright 2014-2019 Intel Corporation.
4 *
5 * This software and the related documents are Intel copyrighted materials, and
6 * your use of them is governed by the express license under which they were
7 * provided to you (License). Unless the License provides otherwise, you may not
8 * use, modify, copy, publish, distribute, disclose or transmit this software or
9 * the related documents without Intel's prior written permission.
10 *
11 * This software and the related documents are provided as is, with no express
12 * or implied warranties, other than those that are expressly stated in the
13 * License.
14 *******************************************************************************/
15 
16 /*
17 //++
18 // Implementation of the CSV data source class.
19 //--
20 */
21 
22 #ifndef __CSV_DATA_SOURCE_H__
23 #define __CSV_DATA_SOURCE_H__
24 
25 #include "services/daal_memory.h"
26 #include "data_management/data_source/data_source.h"
27 #include "data_management/data/data_dictionary.h"
28 #include "data_management/data/numeric_table.h"
29 #include "data_management/data/homogen_numeric_table.h"
30 #include "data_management/data_source/internal/data_source_options.h"
31 
32 namespace daal
33 {
34 namespace data_management
35 {
36 namespace interface1
37 {
47 class CsvDataSourceOptions
48 {
49 public:
50  enum Value
51  {
52  byDefault = 0,
53  allocateNumericTable = 1 << 0,
54  createDictionaryFromContext = 1 << 1,
55  parseHeader = 1 << 2
56  };
57 
58  static CsvDataSourceOptions::Value unite(const CsvDataSourceOptions::Value &lhs,
59  const CsvDataSourceOptions::Value &rhs)
60  {
61  return internal::DataSourceOptionsImpl<Value>::unite(lhs, rhs);
62  }
63 
64  CsvDataSourceOptions(Value flags = byDefault) :
65  _impl(flags) { }
66 
67  DataSource::NumericTableAllocationFlag getNumericTableAllocationFlag() const
68  {
69  return (_impl.getFlag(allocateNumericTable))
70  ? DataSource::doAllocateNumericTable
71  : DataSource::notAllocateNumericTable;
72  }
73 
74  DataSource::DictionaryCreationFlag getDictionaryCreationFlag() const
75  {
76  return (_impl.getFlag(createDictionaryFromContext))
77  ? DataSource::doDictionaryFromContext
78  : DataSource::notDictionaryFromContext;
79  }
80 
81  bool getParseHeaderFlag() const
82  {
83  return _impl.getFlag(parseHeader);
84  }
85 
86 private:
87  internal::DataSourceOptionsImpl<Value> _impl;
88 };
89 
96 template< typename FeatureManager, typename SummaryStatisticsType = DAAL_SUMMARY_STATISTICS_TYPE>
97 class CsvDataSource : public DataSourceTemplate<data_management::HomogenNumericTable<DAAL_DATA_TYPE>, SummaryStatisticsType>
98 {
99 private:
100  typedef data_management::HomogenNumericTable<DAAL_DATA_TYPE> DefaultNumericTableType;
101  typedef DataSourceTemplate<DefaultNumericTableType, SummaryStatisticsType> super;
102 
103 protected:
104  using super::_dict;
105  using super::_initialMaxRows;
106 
107 public:
116  CsvDataSource(DataSourceIface::NumericTableAllocationFlag doAllocateNumericTable = DataSource::notAllocateNumericTable,
117  DataSourceIface::DictionaryCreationFlag doCreateDictionaryFromContext = DataSource::notDictionaryFromContext,
118  size_t initialMaxRows = 10) :
119  super(doAllocateNumericTable,
120  doCreateDictionaryFromContext)
121  {
122  initialize(initialMaxRows);
123  }
124 
130  CsvDataSource(const CsvDataSourceOptions &options, size_t initialMaxRows = 10) :
131  super(options.getNumericTableAllocationFlag(),
132  options.getDictionaryCreationFlag())
133  {
134  initialize(initialMaxRows);
135  _parseHeader = options.getParseHeaderFlag();
136  }
137 
138  virtual ~CsvDataSource()
139  {
140  daal::services::daal_free(_rawLineBuffer);
141  }
142 
146  FeatureManager &getFeatureManager()
147  {
148  return _featureManager;
149  }
150 
151  size_t getNumericTableNumberOfColumns() DAAL_C11_OVERRIDE
152  {
153  return _featureManager.getNumericTableNumberOfColumns();
154  }
155 
156  services::Status setDictionary(DataSourceDictionary *dict) DAAL_C11_OVERRIDE
157  {
158  services::Status s = DataSource::setDictionary(dict);
159  _featureManager.setFeatureDetailsFromDictionary(dict);
160 
161  return s;
162  }
163 
164  size_t loadDataBlock(NumericTable* nt) DAAL_C11_OVERRIDE
165  {
166  services::Status s = super::checkDictionary();
167  if(!s)
168  {
169  this->_status.add(services::throwIfPossible(s));
170  return 0;
171  }
172  s = checkInputNumericTable(nt);
173  if(!s)
174  {
175  this->_status.add(services::throwIfPossible(s));
176  return 0;
177  }
178 
179  size_t maxRows = (_initialMaxRows > 0 ? _initialMaxRows : 10);
180  size_t nrows = 0;
181  const size_t ncols = getNumericTableNumberOfColumns();
182  DataCollection tables;
183  for( ;; maxRows *= 2)
184  {
185  NumericTablePtr ntCurrent = HomogenNumericTable<DAAL_DATA_TYPE>::create(ncols, maxRows, NumericTableIface::doAllocate, &s);
186  if (!s)
187  {
188  this->_status.add(services::throwIfPossible(services::Status(services::ErrorNumericTableNotAllocated)));
189  break;
190  }
191  tables.push_back(ntCurrent);
192  const size_t rows = loadDataBlock(maxRows, ntCurrent.get());
193  nrows += rows;
194  if (rows < maxRows)
195  break;
196  }
197 
198  s = resetNumericTable(nt, nrows);
199  if(!s)
200  {
201  this->_status.add(services::throwIfPossible(s));
202  return 0;
203  }
204 
205  BlockDescriptor<DAAL_DATA_TYPE> blockCurrent, block;
206  size_t pos = 0;
207  for (size_t i = 0; i < tables.size(); i++)
208  {
209  NumericTable *ntCurrent = (NumericTable*)(tables[i].get());
210  size_t rows = ntCurrent->getNumberOfRows();
211 
212  if(!rows)
213  continue;
214 
215  ntCurrent->getBlockOfRows(0, rows, readOnly, blockCurrent);
216  nt->getBlockOfRows(pos, rows, writeOnly, block);
217 
218  services::daal_memcpy_s(block.getBlockPtr(), rows * ncols * sizeof(DAAL_DATA_TYPE), blockCurrent.getBlockPtr(), rows * ncols * sizeof(DAAL_DATA_TYPE));
219 
220  ntCurrent->releaseBlockOfRows(blockCurrent);
221  nt->releaseBlockOfRows(block);
222 
223  super::combineStatistics( ntCurrent, nt, pos == 0);
224  pos += rows;
225  }
226  return nrows;
227  }
228 
229  size_t loadDataBlock(size_t maxRows, NumericTable* nt) DAAL_C11_OVERRIDE
230  {
231  size_t nLines = loadDataBlock(maxRows, 0, maxRows, nt);
232  nt->resize( nLines );
233  return nLines;
234  }
235 
236  size_t loadDataBlock(size_t maxRows, size_t rowOffset, size_t fullRows, NumericTable *nt) DAAL_C11_OVERRIDE
237  {
238  services::Status s = super::checkDictionary();
239  if(!s)
240  {
241  this->_status.add(services::throwIfPossible(s));
242  return 0;
243  }
244  s = checkInputNumericTable(nt);
245  if(!s)
246  {
247  this->_status.add(services::throwIfPossible(s));
248  return 0;
249  }
250 
251  if (rowOffset + maxRows > fullRows)
252  {
253  this->_status.add(services::throwIfPossible(services::ErrorIncorrectDataRange));
254  return 0;
255  }
256 
257  s = resetNumericTable(nt, fullRows);
258  if(!s)
259  {
260  this->_status.add(services::throwIfPossible(s));
261  return 0;
262  }
263 
264  if (_parseHeader && !_firstRowRead)
265  {
266  // Skip header
267  readLine();
268  _firstRowRead = true;
269  }
270 
271  size_t j = 0;
272  for(; j < maxRows && !iseof() ; j++ )
273  {
274  s = readLine();
275  if(!s || !_rawLineLength)
276  {
277  break;
278  }
279 
280  _featureManager.parseRowIn( _rawLineBuffer, _rawLineLength, this->_dict.get(), nt, rowOffset + j );
281 
282  super::updateStatistics( j, nt, rowOffset );
283  }
284  _featureManager.finalize(this->_dict.get());
285 
286  return rowOffset + j;
287  }
288 
289  size_t loadDataBlock() DAAL_C11_OVERRIDE
290  {
291  return DataSource::loadDataBlock();
292  }
293 
294  size_t loadDataBlock(size_t maxRows) DAAL_C11_OVERRIDE
295  {
296  return DataSource::loadDataBlock(maxRows);
297  }
298 
299  size_t loadDataBlock(size_t maxRows, size_t rowOffset, size_t fullRows) DAAL_C11_OVERRIDE
300  {
301  return DataSource::loadDataBlock(maxRows, rowOffset, fullRows);
302  }
303 
304 
305  services::Status createDictionaryFromContext() DAAL_C11_OVERRIDE
306  {
307  services::Status s;
308 
309  if (_dict)
310  {
311  return services::throwIfPossible(services::Status(services::ErrorDictionaryAlreadyAvailable));
312  }
313 
314  _dict = DataSourceDictionary::create(&s);
315  if (!s) { return s; }
316  _contextDictFlag = true;
317 
318  if (_parseHeader)
319  {
320  s = readLine();
321  if (!s) { return services::throwIfPossible(s); }
322  _featureManager.parseRowAsHeader(_rawLineBuffer, _rawLineLength);
323  }
324 
325  s = readLine();
326  if (!s) { return services::throwIfPossible(s); }
327  _featureManager.parseRowAsDictionary(_rawLineBuffer, _rawLineLength, this->_dict.get());
328 
329  return services::Status();
330  }
331 
332  size_t getNumberOfAvailableRows() DAAL_C11_OVERRIDE
333  {
334  return 0;
335  }
336 
337 protected:
338  virtual bool iseof() const = 0;
339  virtual services::Status readLine() = 0;
340 
341  virtual services::Status resetNumericTable(NumericTable *nt, const size_t newSize)
342  {
343  services::Status s;
344 
345  NumericTableDictionaryPtr ntDict = nt->getDictionarySharedPtr();
346  const size_t nFeatures = getNumericTableNumberOfColumns();
347  ntDict->setNumberOfFeatures(nFeatures);
348  for (size_t i = 0; i < nFeatures; i++)
349  ntDict->setFeature((*_dict)[i].ntFeature, i);
350 
351  s = super::resizeNumericTableImpl(newSize, nt);
352  if(!s)
353  {
354  return s;
355  }
356 
357  nt->setNormalizationFlag(NumericTable::nonNormalized);
358  return services::Status();
359  }
360 
361  virtual services::Status checkInputNumericTable(const NumericTable* const nt) const
362  {
363  if(!nt)
364  {
365  return services::Status(services::ErrorNullInputNumericTable);
366  }
367 
368  const NumericTable::StorageLayout layout = nt->getDataLayout();
369  if (layout == NumericTable::csrArray)
370  {
371  return services::Status(services::ErrorIncorrectTypeOfInputNumericTable);
372  }
373 
374  return services::Status();
375  }
376 
377  bool enlargeBuffer()
378  {
379  int newRawLineBufferLen = _rawLineBufferLen * 2;
380  char* newRawLineBuffer = (char *)daal::services::daal_malloc( newRawLineBufferLen );
381  if(newRawLineBuffer == 0)
382  return false;
383  daal::services::daal_memcpy_s(newRawLineBuffer, newRawLineBufferLen, _rawLineBuffer, _rawLineBufferLen);
384  daal::services::daal_free( _rawLineBuffer );
385  _rawLineBuffer = newRawLineBuffer;
386  _rawLineBufferLen = newRawLineBufferLen;
387  return true;
388  }
389 
390 private:
391  services::Status initialize(size_t initialMaxRows)
392  {
393  _parseHeader = false;
394  _firstRowRead = false;
395  _contextDictFlag = false;
396  _rawLineLength = 0;
397  _initialMaxRows = initialMaxRows;
398 
399  _rawLineBufferLen = (int)INITIAL_LINE_BUFFER_LENGTH;
400  _rawLineBuffer = (char *)daal::services::daal_malloc(_rawLineBufferLen);
401  if (!_rawLineBuffer) { return services::throwIfPossible(services::ErrorMemoryAllocationFailed); }
402 
403  return services::Status();
404  }
405 
406 protected:
407  char *_rawLineBuffer;
408  int _rawLineBufferLen;
409  int _rawLineLength;
410 
411 private:
412  bool _parseHeader;
413  bool _firstRowRead;
414  bool _contextDictFlag;
415  FeatureManager _featureManager;
416 
417  static const size_t INITIAL_LINE_BUFFER_LENGTH = 1024;
418 };
419 
421 } // namespace interface1
422 
423 using interface1::CsvDataSource;
424 using interface1::CsvDataSourceOptions;
425 
426 inline CsvDataSourceOptions::Value operator |(const CsvDataSourceOptions::Value &lhs,
427  const CsvDataSourceOptions::Value &rhs)
428 { return CsvDataSourceOptions::unite(lhs, rhs); }
429 
430 } // namespace data_management
431 } // namespace daal
432 
433 #endif
daal::data_management::interface1::CsvDataSource::loadDataBlock
size_t loadDataBlock(size_t maxRows, size_t rowOffset, size_t fullRows, NumericTable *nt) DAAL_C11_OVERRIDE
Definition: csv_data_source.h:236
daal::data_management::interface1::DataSource::checkDictionary
services::Status checkDictionary()
Definition: data_source.h:359
daal::services::ErrorDictionaryAlreadyAvailable
Definition: error_indexes.h:151
daal::services::ErrorNullInputNumericTable
Definition: error_indexes.h:81
daal::data_management::interface1::BlockDescriptor::getBlockPtr
DataType * getBlockPtr() const
Definition: numeric_table.h:69
daal::data_management::interface1::CsvDataSource::setDictionary
services::Status setDictionary(DataSourceDictionary *dict) DAAL_C11_OVERRIDE
Definition: csv_data_source.h:156
daal
Definition: algorithm_base_common.h:31
daal::services::ErrorNumericTableNotAllocated
Definition: error_indexes.h:155
daal::data_management::interface1::CsvDataSource::createDictionaryFromContext
services::Status createDictionaryFromContext() DAAL_C11_OVERRIDE
Definition: csv_data_source.h:305
daal::data_management::interface1::DataCollection
Class that provides functionality of Collection container for objects derived from SerializationIface...
Definition: data_collection.h:45
daal::data_management::interface1::CsvDataSource::CsvDataSource
CsvDataSource(const CsvDataSourceOptions &options, size_t initialMaxRows=10)
Definition: csv_data_source.h:130
daal::data_management::interface1::NumericTable::getDictionarySharedPtr
virtual NumericTableDictionaryPtr getDictionarySharedPtr() const DAAL_C11_OVERRIDE
Definition: numeric_table.h:632
daal::data_management::interface1::CsvDataSource::getNumberOfAvailableRows
size_t getNumberOfAvailableRows() DAAL_C11_OVERRIDE
Definition: csv_data_source.h:332
daal::data_management::interface1::DataSourceIface::DictionaryCreationFlag
DictionaryCreationFlag
Specifies whether a Data Dictionary is created from the context of a Data Source. ...
Definition: data_source.h:69
daal::services::ErrorMemoryAllocationFailed
Definition: error_indexes.h:146
daal::data_management::interface1::CsvDataSource::loadDataBlock
size_t loadDataBlock(NumericTable *nt) DAAL_C11_OVERRIDE
Definition: csv_data_source.h:164
daal::data_management::interface1::DataSourceIface::notAllocateNumericTable
Definition: data_source.h:81
daal::data_management::interface1::DataCollection::push_back
DataCollection & push_back(const SerializationIfacePtr &x)
daal::data_management::interface1::DataSourceIface::doAllocateNumericTable
Definition: data_source.h:82
daal::data_management::interface1::DataSourceTemplate
Implements the abstract DataSourceIface interface.
Definition: data_source.h:463
daal::services::daal_memcpy_s
DAAL_EXPORT void daal_memcpy_s(void *dest, size_t numberOfElements, const void *src, size_t count)
daal::data_management::interface1::NumericTable::setNormalizationFlag
NormalizationType setNormalizationFlag(NormalizationType flag)
Definition: numeric_table.h:736
daal::data_management::interface1::CsvDataSource
Specifies methods to access data stored in files.
Definition: csv_data_source.h:97
daal::data_management::interface1::NumericTableIface::nonNormalized
Definition: numeric_table.h:317
daal::services::daal_malloc
DAAL_EXPORT void * daal_malloc(size_t size, size_t alignment=DAAL_MALLOC_DEFAULT_ALIGNMENT)
daal::data_management::interface1::HomogenNumericTable::create
static services::SharedPtr< HomogenNumericTable< DataType > > create(NumericTableDictionaryPtr ddictForHomogenNumericTable, services::Status *stat=NULL)
Definition: homogen_numeric_table.h:93
daal::data_management::interface1::NumericTable
Class for a data management component responsible for representation of data in the numeric format...
Definition: numeric_table.h:574
daal::data_management::interface1::NumericTableIface::StorageLayout
StorageLayout
Storage layouts that may need to be supported.
Definition: numeric_table.h:326
daal::data_management::interface1::DataSource::loadDataBlock
size_t loadDataBlock() DAAL_C11_OVERRIDE
Definition: data_source.h:272
daal::data_management::interface1::CsvDataSource::getFeatureManager
FeatureManager & getFeatureManager()
Definition: csv_data_source.h:146
daal::data_management::interface1::DataSourceIface::doDictionaryFromContext
Definition: data_source.h:72
daal::data_management::interface1::CsvDataSource::CsvDataSource
CsvDataSource(DataSourceIface::NumericTableAllocationFlag doAllocateNumericTable=DataSource::notAllocateNumericTable, DataSourceIface::DictionaryCreationFlag doCreateDictionaryFromContext=DataSource::notDictionaryFromContext, size_t initialMaxRows=10)
Definition: csv_data_source.h:116
daal::services::ErrorIncorrectTypeOfInputNumericTable
Definition: error_indexes.h:91
daal::data_management::interface1::CsvDataSource::loadDataBlock
size_t loadDataBlock() DAAL_C11_OVERRIDE
Definition: csv_data_source.h:289
daal::services::daal_free
DAAL_EXPORT void daal_free(void *ptr)
daal::services::ErrorIncorrectDataRange
Definition: error_indexes.h:77
daal::data_management::interface1::CsvDataSource::loadDataBlock
size_t loadDataBlock(size_t maxRows, size_t rowOffset, size_t fullRows) DAAL_C11_OVERRIDE
Definition: csv_data_source.h:299
daal::data_management::interface1::DataCollection::size
size_t size() const
daal::data_management::interface1::NumericTableIface::doAllocate
Definition: numeric_table.h:287
daal::data_management::interface1::CsvDataSource::loadDataBlock
size_t loadDataBlock(size_t maxRows) DAAL_C11_OVERRIDE
Definition: csv_data_source.h:294
daal::data_management::interface1::CsvDataSourceOptions
Options of CSV data source.
Definition: csv_data_source.h:47
daal::data_management::interface1::NumericTable::getNumberOfRows
size_t getNumberOfRows() const
Definition: numeric_table.h:660
daal::data_management::interface1::DataSourceIface::notDictionaryFromContext
Definition: data_source.h:71
daal::data_management::interface1::BlockDescriptor< DAAL_DATA_TYPE >
daal::data_management::interface1::CsvDataSource::loadDataBlock
size_t loadDataBlock(size_t maxRows, NumericTable *nt) DAAL_C11_OVERRIDE
Definition: csv_data_source.h:229
daal::data_management::interface1::DenseNumericTableIface::releaseBlockOfRows
virtual services::Status releaseBlockOfRows(BlockDescriptor< double > &block)=0
daal::data_management::interface1::DataSourceIface::NumericTableAllocationFlag
NumericTableAllocationFlag
Specifies whether a Numeric Table is allocated inside of the Data Source object.
Definition: data_source.h:79
daal::data_management::interface1::Dictionary::create
static services::SharedPtr< Dictionary > create(size_t nfeat, FeaturesEqual featuresEqual=notEqual, services::Status *stat=NULL)
Definition: data_dictionary.h:186
daal::data_management::interface1::DataSource::setDictionary
services::Status setDictionary(DataSourceDictionary *dict) DAAL_C11_OVERRIDE
Definition: data_source.h:217
daal::data_management::interface1::CsvDataSource::getNumericTableNumberOfColumns
size_t getNumericTableNumberOfColumns() DAAL_C11_OVERRIDE
Definition: csv_data_source.h:151
daal::data_management::interface1::DenseNumericTableIface::getBlockOfRows
virtual services::Status getBlockOfRows(size_t vector_idx, size_t vector_num, ReadWriteMode rwflag, BlockDescriptor< double > &block)=0
daal::data_management::internal::DataSourceOptionsImpl
Class that helps to define data source options.
Definition: data_source_options.h:31
daal::data_management::interface1::Dictionary
Class that represents a dictionary of a data set and provides methods to work with the data dictionar...
Definition: data_dictionary.h:161

For more complete information about compiler optimizations, see our Optimization Notice.