C++ API Reference for Intel® Data Analytics Acceleration Library 2018 Update 3

csv_data_source.h
1 /* file: csv_data_source.h */
2 /*******************************************************************************
3 * Copyright 2014-2018 Intel Corporation.
4 *
5 * This software and the related documents are Intel copyrighted materials, and
6 * your use of them is governed by the express license under which they were
7 * provided to you (License). Unless the License provides otherwise, you may not
8 * use, modify, copy, publish, distribute, disclose or transmit this software or
9 * the related documents without Intel's prior written permission.
10 *
11 * This software and the related documents are provided as is, with no express
12 * or implied warranties, other than those that are expressly stated in the
13 * License.
14 *******************************************************************************/
15 
16 /*
17 //++
18 // Implementation of the file data source class.
19 //--
20 */
21 
22 #ifndef __CSV_DATA_SOURCE_H__
23 #define __CSV_DATA_SOURCE_H__
24 
25 #include "services/daal_memory.h"
26 #include "data_management/data_source/data_source.h"
27 #include "data_management/data/data_dictionary.h"
28 #include "data_management/data/numeric_table.h"
29 #include "data_management/data/homogen_numeric_table.h"
30 
31 namespace daal
32 {
33 namespace data_management
34 {
35 
36 namespace interface1
37 {
/**
 * Specifies methods to access data stored in files.
 *
 * The class does not read the file itself: derived classes supply iseof() and
 * readLine(), and every raw line is handed to the feature manager for parsing.
 *
 * \tparam _featureManager        Type of the feature manager member; it parses raw lines
 *                                (parseRowIn, parseRowAsDictionary) and reports the number
 *                                of numeric table columns
 * \tparam _summaryStatisticsType Type forwarded to the DataSourceTemplate base class;
 *                                DAAL_SUMMARY_STATISTICS_TYPE by default
 */
47 template< typename _featureManager, typename _summaryStatisticsType = DAAL_SUMMARY_STATISTICS_TYPE >
48 class CsvDataSource : public DataSourceTemplate<data_management::HomogenNumericTable<DAAL_DATA_TYPE>, _summaryStatisticsType>
49 {
50 public:
     // Names taken from DataSource so they can be used unqualified inside this class template.
51  using DataSource::checkDictionary;
52  using DataSource::checkNumericTable;
53  using DataSource::freeNumericTable;
54  using DataSource::_dict;
55  using DataSource::_initialMaxRows;
56 
     /** Feature manager type, i.e. the first template argument */
60  typedef _featureManager FeatureManager;
61 
62 protected:
     /** Numeric table type passed to the DataSourceTemplate base class */
63  typedef data_management::HomogenNumericTable<DAAL_DATA_TYPE> DefaultNumericTableType;
64 
     /** Feature manager instance used to parse raw lines into dictionary entries and table rows */
65  FeatureManager featureManager;
66 
67 public:
/**
 * Constructs the data source and allocates the initial raw line buffer.
 *
 * \param doAllocateNumericTable        Flag forwarded to the DataSourceTemplate constructor
 * \param doCreateDictionaryFromContext Flag forwarded to the DataSourceTemplate constructor
 * \param initialMaxRows                Stored in _initialMaxRows; loadDataBlock(NumericTable*)
 *                                      uses it as the size of the first chunk it reads
 *
 * NOTE(review): the result of daal_malloc is not checked here, so _rawLineBuffer may be
 * NULL while _rawLineBufferLen is 1024 - confirm that readLine() implementations cope.
 */
76  CsvDataSource(DataSourceIface::NumericTableAllocationFlag doAllocateNumericTable = DataSource::notAllocateNumericTable,
77  DataSourceIface::DictionaryCreationFlag doCreateDictionaryFromContext = DataSource::notDictionaryFromContext,
78  size_t initialMaxRows = 10):
79  DataSourceTemplate<DefaultNumericTableType, _summaryStatisticsType>(doAllocateNumericTable, doCreateDictionaryFromContext),
80  _rawLineBuffer(NULL),
81  _rawLineLength(0)
82  {
     // Initial capacity of the line buffer in bytes; enlargeBuffer() doubles it on request.
83  _rawLineBufferLen = 1024;
84  _rawLineBuffer = (char *)daal::services::daal_malloc(_rawLineBufferLen);
85 
86  _contextDictFlag = false;
87  _initialMaxRows = initialMaxRows;
88  }
89 
/**
 * Releases the raw line buffer and calls freeNumericTable() of the
 * DataSourceTemplate base class.
 */
90  ~CsvDataSource()
91  {
92  daal::services::daal_free( _rawLineBuffer );
93  DataSourceTemplate<DefaultNumericTableType, _summaryStatisticsType>::freeNumericTable();
94  }
95 
/**
 * Gives access to the feature manager owned by this data source.
 * \return Reference to the featureManager member
 */
100  FeatureManager &getFeatureManager()
101  {
102  return featureManager;
103  }
104 
105 public:
/**
 * Returns the number of numeric table columns as reported by the feature manager.
 * \return Value of featureManager.getNumericTableNumberOfColumns()
 */
106  size_t getNumericTableNumberOfColumns() DAAL_C11_OVERRIDE
107  {
108  return featureManager.getNumericTableNumberOfColumns();
109  }
110 
/**
 * Sets the data source dictionary and passes it to the feature manager.
 *
 * \param dict Dictionary handed to DataSource::setDictionary() and to
 *             featureManager.setFeatureDetailsFromDictionary()
 * \return Status returned by DataSource::setDictionary()
 *
 * NOTE(review): the feature manager is updated even when the base class call
 * reports a failure - confirm this is intended.
 */
111  services::Status setDictionary(DataSourceDictionary *dict) DAAL_C11_OVERRIDE
112  {
113  services::Status s = DataSource::setDictionary(dict);
114  featureManager.setFeatureDetailsFromDictionary(dict);
115 
116  return s;
117  }
118 
/**
 * Reads rows into the numeric table nt until a chunk comes back shorter than requested.
 *
 * The number of rows is not known in advance, so the data is read chunk by chunk into
 * temporary homogeneous numeric tables; the chunk size starts at _initialMaxRows (10 if
 * that value is 0) and doubles on every iteration. Afterwards nt is reset to the total
 * number of rows read and the chunks are copied into it one after another.
 *
 * \param nt Numeric table that receives the data; validated by checkInputNumericTable()
 * \return Total number of rows read; 0 if the dictionary check, the input table check or
 *         resetNumericTable() fails (the failure is added to this->_status)
 */
119  size_t loadDataBlock(NumericTable* nt) DAAL_C11_OVERRIDE
120  {
121  services::Status s = checkDictionary();
122  if(!s)
123  {
124  this->_status.add(services::throwIfPossible(s));
125  return 0;
126  }
127  s = checkInputNumericTable(nt);
128  if(!s)
129  {
130  this->_status.add(services::throwIfPossible(s));
131  return 0;
132  }
133 
134  size_t maxRows = (_initialMaxRows > 0 ? _initialMaxRows : 10);
135  size_t nrows = 0;
136  const size_t ncols = getNumericTableNumberOfColumns();
     // Temporary tables, one per chunk, kept in the order in which they were read.
137  DataCollection tables;
138  for( ;; maxRows *= 2)
139  {
140  NumericTablePtr ntCurrent = HomogenNumericTable<DAAL_DATA_TYPE>::create(ncols, maxRows, NumericTableIface::doAllocate, &s);
141  if (!s)
142  {
     // The error is recorded and reading stops; chunks read so far are still copied below.
143  this->_status.add(services::throwIfPossible(services::Status(services::ErrorNumericTableNotAllocated)));
144  break;
145  }
146  tables.push_back(ntCurrent);
147  const size_t rows = loadDataBlock(maxRows, ntCurrent.get());
148  nrows += rows;
     // A chunk with fewer rows than requested ends the loop.
149  if (rows < maxRows)
150  break;
151  }
152 
153  s = resetNumericTable(nt, nrows);
154  if(!s)
155  {
156  this->_status.add(services::throwIfPossible(s));
157  return 0;
158  }
159 
160  BlockDescriptor<DAAL_DATA_TYPE> blockCurrent, block;
     // Index of the row in nt at which the next chunk is written.
161  size_t pos = 0;
162  for (size_t i = 0; i < tables.size(); i++)
163  {
164  NumericTable *ntCurrent = (NumericTable*)(tables[i].get());
165  size_t rows = ntCurrent->getNumberOfRows();
166 
167  if(!rows)
168  continue;
169 
     // NOTE(review): the statuses returned by getBlockOfRows/releaseBlockOfRows are ignored.
170  ntCurrent->getBlockOfRows(0, rows, readOnly, blockCurrent);
171  nt->getBlockOfRows(pos, rows, writeOnly, block);
172 
     // Copies rows * ncols elements of DAAL_DATA_TYPE from the chunk block into the block of nt.
173  services::daal_memcpy_s(block.getBlockPtr(), rows * ncols * sizeof(DAAL_DATA_TYPE), blockCurrent.getBlockPtr(), rows * ncols * sizeof(DAAL_DATA_TYPE));
174 
175  ntCurrent->releaseBlockOfRows(blockCurrent);
176  nt->releaseBlockOfRows(block);
177 
     // The last argument is true only for the first non-empty chunk (pos == 0).
178  DataSourceTemplate<DefaultNumericTableType, _summaryStatisticsType>::combineStatistics( ntCurrent, nt, pos == 0);
179  pos += rows;
180  }
181  return nrows;
182  }
183 
/**
 * Reads at most maxRows rows into nt, starting at row 0, and resizes nt to the
 * number of rows actually read.
 *
 * \param maxRows Maximum number of rows to read; nt is first reset to this many rows
 * \param nt      Numeric table that receives the data
 * \return Number of rows read; 0 if the four-argument overload reports a failure
 */
184  size_t loadDataBlock(size_t maxRows, NumericTable* nt) DAAL_C11_OVERRIDE
185  {
186  size_t nLines = loadDataBlock(maxRows, 0, maxRows, nt);
     // A null nt is rejected by the call above (it returns 0 and records the error),
     // so it must not be dereferenced here when exceptions are not thrown.
187  if(nt) { nt->resize( nLines ); }
188  return nLines;
189  }
190 
/**
 * Reads at most maxRows lines and parses them into consecutive rows of nt,
 * starting at row rowOffset.
 *
 * Reading stops early at end of input, when readLine() fails, or when a line
 * of zero length is read.
 *
 * \param maxRows   Maximum number of lines to read
 * \param rowOffset Index of the row of nt that receives the first parsed line
 * \param fullRows  Number of rows nt is reset to through resetNumericTable();
 *                  rowOffset + maxRows must not exceed it
 * \param nt        Numeric table that receives the data; validated by checkInputNumericTable()
 * \return rowOffset plus the number of lines parsed; 0 if a check or the table
 *         reset fails (the failure is added to this->_status)
 */
191  size_t loadDataBlock(size_t maxRows, size_t rowOffset, size_t fullRows, NumericTable *nt) DAAL_C11_OVERRIDE
192  {
193  services::Status s = checkDictionary();
194  if(!s)
195  {
196  this->_status.add(services::throwIfPossible(s));
197  return 0;
198  }
199  s = checkInputNumericTable(nt);
200  if(!s)
201  {
202  this->_status.add(services::throwIfPossible(s));
203  return 0;
204  }
205 
     // Equivalent to rowOffset + maxRows > fullRows, but written so that the unsigned sum
     // cannot wrap around and let an out-of-range request pass the check.
206  if (rowOffset > fullRows || maxRows > fullRows - rowOffset)
207  {
208  this->_status.add(services::throwIfPossible(services::ErrorIncorrectDataRange));
209  return 0;
210  }
211 
212  s = resetNumericTable(nt, fullRows);
213  if(!s)
214  {
215  this->_status.add(services::throwIfPossible(s));
216  return 0;
217  }
218 
     // j counts the lines parsed so far.
219  size_t j = 0;
220  for(; j < maxRows && !iseof() ; j++ )
221  {
222  s = readLine();
     // A failed read or an empty line ends the block; a failed status is not added to this->_status.
223  if(!s || !_rawLineLength)
224  break;
225  featureManager.parseRowIn( _rawLineBuffer, _rawLineLength, this->_dict.get(), nt, rowOffset + j );
226 
227  DataSourceTemplate<DefaultNumericTableType, _summaryStatisticsType>::updateStatistics( j, nt, rowOffset );
228  }
229 
230  return rowOffset + j;
231  }
232 
/**
 * Forwards to DataSource::loadDataBlock().
 * \return Value returned by the base class implementation
 */
233  size_t loadDataBlock() DAAL_C11_OVERRIDE
234  {
235  return DataSource::loadDataBlock();
236  }
237 
/**
 * Forwards to DataSource::loadDataBlock(maxRows).
 * \param maxRows Passed unchanged to the base class implementation
 * \return Value returned by the base class implementation
 */
238  size_t loadDataBlock(size_t maxRows) DAAL_C11_OVERRIDE
239  {
240  return DataSource::loadDataBlock(maxRows);
241  }
242 
/**
 * Forwards to DataSource::loadDataBlock(maxRows, rowOffset, fullRows).
 * \param maxRows   Passed unchanged to the base class implementation
 * \param rowOffset Passed unchanged to the base class implementation
 * \param fullRows  Passed unchanged to the base class implementation
 * \return Value returned by the base class implementation
 */
243  size_t loadDataBlock(size_t maxRows, size_t rowOffset, size_t fullRows) DAAL_C11_OVERRIDE
244  {
245  return DataSource::loadDataBlock(maxRows, rowOffset, fullRows);
246  }
247 
248 
/**
 * Creates the data source dictionary from the next line of the input.
 *
 * A new dictionary is created, one line is read with readLine() and handed to
 * featureManager.parseRowAsDictionary() to fill the dictionary.
 *
 * \return ErrorDictionaryAlreadyAvailable if a dictionary is already set; the failing
 *         status if the dictionary cannot be created or the line cannot be read
 *         (the dictionary is reset in the latter case); an OK status otherwise
 */
249  services::Status createDictionaryFromContext() DAAL_C11_OVERRIDE
250  {
251  if(_dict)
252  return services::throwIfPossible(services::Status(services::ErrorDictionaryAlreadyAvailable));
253 
     // The flag is set before the dictionary is created and stays set if a later step fails.
254  _contextDictFlag = true;
255  services::Status s;
256  _dict = DataSourceDictionary::create(&s);
257  if(!s) return s;
258 
259  s = readLine();
260  if(!s)
261  {
262  this->_dict.reset();
263  return services::throwIfPossible(s);
264  }
265 
266  featureManager.parseRowAsDictionary( _rawLineBuffer, _rawLineLength, this->_dict.get() );
267  return services::Status();
268  }
269 
/**
 * Returns the number of available rows.
 * \return Always 0 in this implementation
 */
270  size_t getNumberOfAvailableRows() DAAL_C11_OVERRIDE
271  {
272  return 0;
273  }
274 
275 protected:
/**
 * Tells whether the end of the input has been reached; implemented by derived classes.
 * loadDataBlock() stops reading as soon as this returns true.
 */
276  virtual bool iseof() const = 0;
/**
 * Reads the next raw line; implemented by derived classes.
 * Callers use _rawLineBuffer and _rawLineLength right after a successful call, so
 * implementations presumably fill both - verify against the derived classes.
 * \return Status of the read operation
 */
277  virtual services::Status readLine() = 0;
278 
/**
 * Prepares nt for loading: copies feature descriptions into its dictionary,
 * resizes it and marks it as not normalized.
 *
 * \param nt      Numeric table to reset; dereferenced without a null check
 * \param newSize Size passed to resizeNumericTableImpl()
 * \return Status of resizeNumericTableImpl() if it fails; an OK status otherwise
 */
279  virtual services::Status resetNumericTable(NumericTable *nt, const size_t newSize)
280  {
281  services::Status s;
282 
283  NumericTableDictionaryPtr ntDict = nt->getDictionarySharedPtr();
284  const size_t nFeatures = getNumericTableNumberOfColumns();
285  ntDict->setNumberOfFeatures(nFeatures);
     // Feature i of the table dictionary is taken from entry i of the data source dictionary.
     // NOTE(review): assumes _dict holds at least nFeatures entries - confirm.
286  for (size_t i = 0; i < nFeatures; i++)
287  ntDict->setFeature((*_dict)[i].ntFeature, i);
288 
289  s = DataSourceTemplate<DefaultNumericTableType, _summaryStatisticsType>::resizeNumericTableImpl(newSize, nt);
290  if(!s)
291  {
292  return s;
293  }
294 
295  nt->setNormalizationFlag(NumericTable::nonNormalized);
296  return services::Status();
297  }
298 
/**
 * Validates a numeric table supplied by the caller as the destination of loaded data.
 *
 * \param nt Numeric table to check
 * \return ErrorNullInputNumericTable if nt is null;
 *         ErrorIncorrectTypeOfInputNumericTable if the data layout of nt is csrArray;
 *         an OK status otherwise
 */
299  virtual services::Status checkInputNumericTable(const NumericTable* const nt) const
300  {
301  if(!nt)
302  {
303  return services::Status(services::ErrorNullInputNumericTable);
304  }
305 
306  const NumericTable::StorageLayout layout = nt->getDataLayout();
307  if (layout == NumericTable::csrArray)
308  {
309  return services::Status(services::ErrorIncorrectTypeOfInputNumericTable);
310  }
311 
312  return services::Status();
313  }
314 
/**
 * Doubles the capacity of the raw line buffer and keeps its current contents.
 *
 * \return true if the buffer was replaced by one of twice the capacity;
 *         false if the capacity cannot be doubled within the range of int or
 *         the allocation fails (the buffer is left unchanged in both cases)
 */
315  bool enlargeBuffer()
316  {
     // INT_MAX / 2, computed without <climits>: the largest capacity that can be doubled
     // without signed overflow. A non-positive capacity cannot grow by doubling either.
     const int maxDoublableLen = (int)(((unsigned int)-1) >> 2);
     if(_rawLineBufferLen <= 0 || _rawLineBufferLen > maxDoublableLen)
     return false;
317  int newRawLineBufferLen = _rawLineBufferLen * 2;
318  char* newRawLineBuffer = (char *)daal::services::daal_malloc( newRawLineBufferLen );
319  if(newRawLineBuffer == 0)
320  return false;
     // _rawLineBuffer is NULL when the allocation in the constructor failed; there is
     // nothing to copy in that case.
321  if(_rawLineBuffer) daal::services::daal_memcpy_s(newRawLineBuffer, newRawLineBufferLen, _rawLineBuffer, _rawLineBufferLen);
322  daal::services::daal_free( _rawLineBuffer );
323  _rawLineBuffer = newRawLineBuffer;
324  _rawLineBufferLen = newRawLineBufferLen;
325  return true;
326  }
327 
328 protected:
     // Buffer for one raw input line; allocated in the constructor, replaced by enlargeBuffer().
329  char *_rawLineBuffer;
     // Capacity of _rawLineBuffer in bytes.
330  int _rawLineBufferLen;
     // Length of the line last read; a value of 0 ends a loadDataBlock() read loop.
     // Presumably set by readLine() implementations - verify against the derived classes.
331  int _rawLineLength;
332 
     // Set to true by createDictionaryFromContext(); false after construction.
333  bool _contextDictFlag;
334 };
336 } // namespace interface1
337 using interface1::CsvDataSource;
338 
339 }
340 }
341 #endif
daal::data_management::interface1::CsvDataSource::loadDataBlock
size_t loadDataBlock(size_t maxRows, size_t rowOffset, size_t fullRows, NumericTable *nt) DAAL_C11_OVERRIDE
Definition: csv_data_source.h:191
daal::data_management::interface1::DataSource::checkDictionary
services::Status checkDictionary()
Definition: data_source.h:359
daal::services::ErrorDictionaryAlreadyAvailable
Definition: error_indexes.h:151
daal::services::ErrorNullInputNumericTable
Definition: error_indexes.h:81
daal::data_management::interface1::BlockDescriptor::getBlockPtr
DataType * getBlockPtr() const
Definition: numeric_table.h:69
daal::data_management::interface1::CsvDataSource::setDictionary
services::Status setDictionary(DataSourceDictionary *dict) DAAL_C11_OVERRIDE
Definition: csv_data_source.h:111
daal
Definition: algorithm_base_common.h:31
daal::services::ErrorNumericTableNotAllocated
Definition: error_indexes.h:155
daal::data_management::interface1::CsvDataSource::createDictionaryFromContext
services::Status createDictionaryFromContext() DAAL_C11_OVERRIDE
Definition: csv_data_source.h:249
daal::data_management::interface1::DataCollection
Class that provides functionality of Collection container for objects derived from SerializationIface...
Definition: data_collection.h:45
daal::data_management::interface1::NumericTable::getDictionarySharedPtr
virtual NumericTableDictionaryPtr getDictionarySharedPtr() const DAAL_C11_OVERRIDE
Definition: numeric_table.h:632
daal::data_management::interface1::CsvDataSource::getNumberOfAvailableRows
size_t getNumberOfAvailableRows() DAAL_C11_OVERRIDE
Definition: csv_data_source.h:270
daal::data_management::interface1::DataSourceIface::DictionaryCreationFlag
DictionaryCreationFlag
Specifies whether a Data Dictionary is created from the context of a Data Source. ...
Definition: data_source.h:69
daal::data_management::interface1::CsvDataSource::loadDataBlock
size_t loadDataBlock(NumericTable *nt) DAAL_C11_OVERRIDE
Definition: csv_data_source.h:119
daal::data_management::interface1::DataSourceIface::notAllocateNumericTable
Definition: data_source.h:81
daal::data_management::interface1::DataSourceIface::freeNumericTable
virtual void freeNumericTable()=0
daal::data_management::interface1::DataCollection::push_back
DataCollection & push_back(const SerializationIfacePtr &x)
daal::data_management::interface1::DataSourceIface::doAllocateNumericTable
Definition: data_source.h:82
daal::data_management::interface1::DataSourceTemplate
Implements the abstract DataSourceIface interface.
Definition: data_source.h:463
daal::services::daal_memcpy_s
DAAL_EXPORT void daal_memcpy_s(void *dest, size_t numberOfElements, const void *src, size_t count)
daal::data_management::interface1::DataSource::checkNumericTable
services::Status checkNumericTable()
Definition: data_source.h:345
daal::data_management::interface1::NumericTable::setNormalizationFlag
NormalizationType setNormalizationFlag(NormalizationType flag)
Definition: numeric_table.h:736
daal::data_management::interface1::CsvDataSource
Specifies methods to access data stored in files.
Definition: csv_data_source.h:48
daal::data_management::interface1::NumericTableIface::nonNormalized
Definition: numeric_table.h:317
daal::services::daal_malloc
DAAL_EXPORT void * daal_malloc(size_t size, size_t alignment=DAAL_MALLOC_DEFAULT_ALIGNMENT)
daal::data_management::interface1::HomogenNumericTable::create
static services::SharedPtr< HomogenNumericTable< DataType > > create(NumericTableDictionaryPtr ddictForHomogenNumericTable, services::Status *stat=NULL)
Definition: homogen_numeric_table.h:91
daal::data_management::interface1::NumericTable
Class for a data management component responsible for representation of data in the numeric format...
Definition: numeric_table.h:574
daal::data_management::interface1::NumericTableIface::StorageLayout
StorageLayout
Storage layouts that may need to be supported.
Definition: numeric_table.h:326
daal::data_management::interface1::DataSourceTemplate::freeNumericTable
void freeNumericTable() DAAL_C11_OVERRIDE
Definition: data_source.h:509
daal::data_management::interface1::DataSource::loadDataBlock
size_t loadDataBlock() DAAL_C11_OVERRIDE
Definition: data_source.h:272
daal::data_management::interface1::CsvDataSource::getFeatureManager
FeatureManager & getFeatureManager()
Definition: csv_data_source.h:100
daal::data_management::interface1::CsvDataSource::CsvDataSource
CsvDataSource(DataSourceIface::NumericTableAllocationFlag doAllocateNumericTable=DataSource::notAllocateNumericTable, DataSourceIface::DictionaryCreationFlag doCreateDictionaryFromContext=DataSource::notDictionaryFromContext, size_t initialMaxRows=10)
Definition: csv_data_source.h:76
daal::services::ErrorIncorrectTypeOfInputNumericTable
Definition: error_indexes.h:91
daal::data_management::interface1::CsvDataSource::loadDataBlock
size_t loadDataBlock() DAAL_C11_OVERRIDE
Definition: csv_data_source.h:233
daal::services::daal_free
DAAL_EXPORT void daal_free(void *ptr)
daal::services::ErrorIncorrectDataRange
Definition: error_indexes.h:77
daal::data_management::interface1::CsvDataSource::loadDataBlock
size_t loadDataBlock(size_t maxRows, size_t rowOffset, size_t fullRows) DAAL_C11_OVERRIDE
Definition: csv_data_source.h:243
daal::data_management::interface1::DataCollection::size
size_t size() const
daal::data_management::interface1::NumericTableIface::doAllocate
Definition: numeric_table.h:287
daal::data_management::interface1::CsvDataSource::loadDataBlock
size_t loadDataBlock(size_t maxRows) DAAL_C11_OVERRIDE
Definition: csv_data_source.h:238
daal::data_management::interface1::NumericTable::getNumberOfRows
size_t getNumberOfRows() const
Definition: numeric_table.h:660
daal::data_management::interface1::DataSourceIface::notDictionaryFromContext
Definition: data_source.h:71
daal::data_management::interface1::BlockDescriptor< DAAL_DATA_TYPE >
daal::data_management::interface1::CsvDataSource::loadDataBlock
size_t loadDataBlock(size_t maxRows, NumericTable *nt) DAAL_C11_OVERRIDE
Definition: csv_data_source.h:184
daal::data_management::interface1::DenseNumericTableIface::releaseBlockOfRows
virtual services::Status releaseBlockOfRows(BlockDescriptor< double > &block)=0
daal::data_management::interface1::DataSourceIface::NumericTableAllocationFlag
NumericTableAllocationFlag
Specifies whether a Numeric Table is allocated inside of the Data Source object.
Definition: data_source.h:79
daal::data_management::interface1::Dictionary::create
static services::SharedPtr< Dictionary > create(size_t nfeat, FeaturesEqual featuresEqual=notEqual, services::Status *stat=NULL)
Definition: data_dictionary.h:183
daal::data_management::interface1::CsvDataSource::FeatureManager
_featureManager FeatureManager
Definition: csv_data_source.h:60
daal::data_management::interface1::DataSource::setDictionary
services::Status setDictionary(DataSourceDictionary *dict) DAAL_C11_OVERRIDE
Definition: data_source.h:217
daal::data_management::interface1::CsvDataSource::getNumericTableNumberOfColumns
size_t getNumericTableNumberOfColumns() DAAL_C11_OVERRIDE
Definition: csv_data_source.h:106
daal::data_management::interface1::DenseNumericTableIface::getBlockOfRows
virtual services::Status getBlockOfRows(size_t vector_idx, size_t vector_num, ReadWriteMode rwflag, BlockDescriptor< double > &block)=0
daal::data_management::interface1::Dictionary
Class that represents a dictionary of a data set and provides methods to work with the data dictionary.
Definition: data_dictionary.h:158

For more complete information about compiler optimizations, see our Optimization Notice.