C++ API Reference for Intel® Data Analytics Acceleration Library 2019 Update 5

arrow_numeric_table.h
1 /* file: arrow_numeric_table.h */
2 /*******************************************************************************
3 * Copyright 2014-2019 Intel Corporation.
4 *
5 * This software and the related documents are Intel copyrighted materials, and
6 * your use of them is governed by the express license under which they were
7 * provided to you (License). Unless the License provides otherwise, you may not
8 * use, modify, copy, publish, distribute, disclose or transmit this software or
9 * the related documents without Intel's prior written permission.
10 *
11 * This software and the related documents are provided as is, with no express
12 * or implied warranties, other than those that are expressly stated in the
13 * License.
14 *******************************************************************************/
15 
16 /*
17 //++
18 // Implementation of a numeric table stored as an Apache Arrow table.
19 //--
20 */
21 
22 #ifndef __ARROW_NUMERIC_TABLE_H__
23 #define __ARROW_NUMERIC_TABLE_H__
24 
25 #include "data_management/data/numeric_table.h"
26 #include "data_management/data/internal/conversion.h"
27 #include "data_management/data/internal/base_arrow_numeric_table.h"
28 #include <memory>
29 #include <arrow/table.h>
30 
31 namespace daal
32 {
33 namespace data_management
34 {
35 
36 namespace interface1
37 {
46 class DAAL_EXPORT ArrowImmutableNumericTable : public BaseArrowImmutableNumericTable
47 {
48 public:
49  DECLARE_SERIALIZABLE_IMPL();
50 
57  static DAAL_FORCEINLINE services::SharedPtr<ArrowImmutableNumericTable> create(const std::shared_ptr<arrow::Table> & table,
58  services::Status * stat = NULL)
59  {
60  if (!table)
61  {
62  if (stat) { stat->add(services::ErrorNullPtr); }
63  return services::SharedPtr<ArrowImmutableNumericTable>();
64  }
65 
66  DAAL_DEFAULT_CREATE_IMPL_EX(ArrowImmutableNumericTable, table);
67  }
68 
75  static DAAL_FORCEINLINE services::SharedPtr<ArrowImmutableNumericTable> create(const std::shared_ptr<const arrow::Table> & table,
76  services::Status * stat = NULL)
77  {
78  if (!table)
79  {
80  if (stat) { stat->add(services::ErrorNullPtr); }
81  return services::SharedPtr<ArrowImmutableNumericTable>();
82  }
83 
84  DAAL_DEFAULT_CREATE_IMPL_EX(ArrowImmutableNumericTable, table);
85  }
86 
87  services::Status getBlockOfRows(size_t vectorIdx, size_t vector_num, ReadWriteMode rwflag, BlockDescriptor<double>& block) DAAL_C11_OVERRIDE
88  {
89  return getTBlock<double>(vectorIdx, vector_num, rwflag, block);
90  }
91 
92  services::Status getBlockOfRows(size_t vectorIdx, size_t vector_num, ReadWriteMode rwflag, BlockDescriptor<float>& block) DAAL_C11_OVERRIDE
93  {
94  return getTBlock<float>(vectorIdx, vector_num, rwflag, block);
95  }
96 
97  services::Status getBlockOfRows(size_t vectorIdx, size_t vector_num, ReadWriteMode rwflag, BlockDescriptor<int>& block) DAAL_C11_OVERRIDE
98  {
99  return getTBlock<int>(vectorIdx, vector_num, rwflag, block);
100  }
101 
102  services::Status releaseBlockOfRows(BlockDescriptor<double>& block) DAAL_C11_OVERRIDE
103  {
104  return releaseTBlock<double>(block);
105  }
106 
107  services::Status releaseBlockOfRows(BlockDescriptor<float>& block) DAAL_C11_OVERRIDE
108  {
109  return releaseTBlock<float>(block);
110  }
111 
112  services::Status releaseBlockOfRows(BlockDescriptor<int>& block) DAAL_C11_OVERRIDE
113  {
114  return releaseTBlock<int>(block);
115  }
116 
117  services::Status getBlockOfColumnValues(size_t featureIdx, size_t vectorIdx, size_t valueNum,
118  ReadWriteMode rwflag, BlockDescriptor<double>& block) DAAL_C11_OVERRIDE
119  {
120  return getTFeature<double>(featureIdx, vectorIdx, valueNum, rwflag, block);
121  }
122 
123  services::Status getBlockOfColumnValues(size_t featureIdx, size_t vectorIdx, size_t valueNum,
124  ReadWriteMode rwflag, BlockDescriptor<float>& block) DAAL_C11_OVERRIDE
125  {
126  return getTFeature<float>(featureIdx, vectorIdx, valueNum, rwflag, block);
127  }
128 
129  services::Status getBlockOfColumnValues(size_t featureIdx, size_t vectorIdx, size_t valueNum,
130  ReadWriteMode rwflag, BlockDescriptor<int>& block) DAAL_C11_OVERRIDE
131  {
132  return getTFeature<int>(featureIdx, vectorIdx, valueNum, rwflag, block);
133  }
134 
135  services::Status releaseBlockOfColumnValues(BlockDescriptor<double>& block) DAAL_C11_OVERRIDE
136  {
137  return releaseTFeature<double>(block);
138  }
139  services::Status releaseBlockOfColumnValues(BlockDescriptor<float>& block) DAAL_C11_OVERRIDE
140  {
141  return releaseTFeature<float>(block);
142  }
143  services::Status releaseBlockOfColumnValues(BlockDescriptor<int>& block) DAAL_C11_OVERRIDE
144  {
145  return releaseTFeature<int>(block);
146  }
147 
148 protected:
149  services::Status setNumberOfColumnsImpl(size_t ncol) DAAL_C11_OVERRIDE
150  {
151  if (ncol == getNumberOfColumns()) return services::Status();
152  return services::Status(services::ErrorMethodNotSupported);
153  }
154 
155  services::Status allocateDataMemoryImpl(daal::MemType type = daal::dram) DAAL_C11_OVERRIDE
156  {
157  return services::Status(services::ErrorMethodNotSupported);
158  }
159 
160  void freeDataMemoryImpl() DAAL_C11_OVERRIDE
161  {
162  _table.reset();
163  _memStatus = notAllocated;
164  }
165 
166  template<typename Archive, bool onDeserialize>
167  services::Status serialImpl(Archive* arch)
168  {
169  if (onDeserialize)
170  {
171  return services::Status(services::ErrorMethodNotSupported);
172  }
173 
174  NumericTable::serialImpl<Archive, onDeserialize>(arch);
175 
176  const size_t ncol = _ddict->getNumberOfFeatures();
177  const size_t nrows = getNumberOfRows();
178 
179  for (size_t i = 0; i < ncol; ++i)
180  {
181  const NumericTableFeature& f = (*_ddict)[i];
182 
183  const std::shared_ptr<const arrow::Column> columnPtr = _table->column(i);
184  DAAL_ASSERT(columnPtr);
185  const std::shared_ptr<const arrow::ChunkedArray> columnChunkedArrayPtr = columnPtr->data();
186  DAAL_ASSERT(columnChunkedArrayPtr);
187  const arrow::ChunkedArray& columnChunkedArray = *columnChunkedArrayPtr;
188  const int chunkCount = columnChunkedArray.num_chunks();
189  DAAL_ASSERT(chunkCount > 0);
190 
191  for (int chunk = 0; chunk < chunkCount; ++chunk)
192  {
193  const std::shared_ptr<const arrow::Array> arrayPtr = columnChunkedArray.chunk(chunk);
194  DAAL_ASSERT(arrayPtr);
195  const int64_t chunkLength = arrayPtr->length();
196  DAAL_ASSERT(chunkLength > 0);
197  arch->set(getPtr(arrayPtr, f), chunkLength * f.typeSize);
198  }
199  }
200 
201  return services::Status();
202  }
203 
204 private:
205  DAAL_FORCEINLINE ArrowImmutableNumericTable(const std::shared_ptr<const arrow::Table> & table, services::Status & st)
206  : BaseArrowImmutableNumericTable(table->num_columns(), table->num_rows(), st), _table(table)
207  {
208  _layout = arrow;
209  _memStatus = internallyAllocated;
210  if (st) st |= updateFeatures(*table);
211  }
212 
213  std::shared_ptr<const arrow::Table> _table;
214 
215  DAAL_FORCEINLINE services::Status updateFeatures(const arrow::Table & table)
216  {
217  services::Status s;
218  if (_ddict.get() == NULL)
219  {
220  _ddict = NumericTableDictionary::create(&s);
221  }
222  if (!s) return s;
223 
224  const std::shared_ptr<const arrow::Schema> schemaPtr = table.schema();
225  DAAL_ASSERT(schemaPtr);
226  const int ncols = schemaPtr->num_fields();
227  for (int col = 0; col < ncols; ++col)
228  {
229  const arrow::Type::type type = schemaPtr->field(col)->type()->id();
230  switch (type)
231  {
232  case arrow::Type::UINT8: s |= setFeature<unsigned char>(col); break;
233  case arrow::Type::INT8: s |= setFeature<char>(col); break;
234  case arrow::Type::UINT16: s |= setFeature<unsigned short>(col); break;
235  case arrow::Type::INT16: s |= setFeature<short>(col); break;
236  case arrow::Type::UINT32: s |= setFeature<unsigned int>(col); break;
237  case arrow::Type::DATE32:
238  case arrow::Type::TIME32:
239  case arrow::Type::INT32: s |= setFeature<int>(col); break;
240  case arrow::Type::UINT64: s |= setFeature<DAAL_UINT64>(col); break;
241  case arrow::Type::DATE64:
242  case arrow::Type::TIMESTAMP:
243  case arrow::Type::TIME64:
244  case arrow::Type::INT64: s |= setFeature<DAAL_INT64>(col); break;
245  case arrow::Type::FLOAT: s |= setFeature<float>(col); break;
246  case arrow::Type::DOUBLE: s |= setFeature<double>(col); break;
247  default: s.add(services::ErrorDataTypeNotSupported); return s;
248  }
249  }
250  return s;
251  }
252 
253  template <typename T>
254  services::Status setFeature(size_t idx, features::FeatureType featureType = features::DAAL_CONTINUOUS, size_t categoryNumber = 0)
255  {
256  DAAL_ASSERT(_ddict);
257  services::Status s = _ddict->setFeature<T>(idx);
258  if (!s) return s;
259  (*_ddict)[idx].featureType = featureType;
260  (*_ddict)[idx].categoryNumber = categoryNumber;
261  return s;
262  }
263 
264  template <typename T>
265  services::Status getTBlock(size_t idx, size_t nrows, ReadWriteMode rwFlag, BlockDescriptor<T>& block)
266  {
267  if (block.getRWFlag() & (int)writeOnly)
268  {
269  return services::Status(services::ErrorMethodNotSupported);
270  }
271 
272  const size_t ncols = getNumberOfColumns();
273  const size_t nobs = getNumberOfRows();
274  block.setDetails(0, idx, rwFlag);
275 
276  if (idx >= nobs)
277  {
278  block.resizeBuffer(ncols, 0);
279  return services::Status();
280  }
281 
282  nrows = (idx + nrows < nobs) ? nrows : nobs - idx;
283 
284  if (!block.resizeBuffer(ncols, nrows)) { return services::Status(services::ErrorMemoryAllocationFailed); }
285 
286  T lbuf[32];
287  size_t di = 32;
288  T* const buffer = block.getBlockPtr();
289 
290  for (size_t i = 0; i < nrows; i += di)
291  {
292  if (i + di > nrows) { di = nrows - i; }
293 
294  for (size_t j = 0; j < ncols; ++j)
295  {
296  const NumericTableFeature& f = (*_ddict)[j];
297  const std::shared_ptr<const arrow::Column> columnPtr = _table->column(j);
298  DAAL_ASSERT(columnPtr);
299  const std::shared_ptr<const arrow::ChunkedArray> columnChunkedArrayPtr = columnPtr->data();
300  DAAL_ASSERT(columnChunkedArrayPtr);
301  const std::shared_ptr<const arrow::ChunkedArray> sliceChunkedArrayPtr = columnChunkedArrayPtr->Slice(idx + i, di);
302  DAAL_ASSERT(sliceChunkedArrayPtr);
303  const arrow::ChunkedArray& sliceChunkedArray = *sliceChunkedArrayPtr;
304  const int chunkCount = sliceChunkedArray.num_chunks();
305  DAAL_ASSERT(chunkCount > 0);
306  if (chunkCount == 1)
307  {
308  const char* const ptr = getPtr(sliceChunkedArray.chunk(0), f);
309  DAAL_ASSERT(ptr);
310  internal::getVectorUpCast(f.indexType, internal::getConversionDataType<T>())(di, ptr, lbuf);
311  }
312  else
313  {
314  size_t offset = 0;
315  for (int chunk = 0; chunk < chunkCount; ++chunk)
316  {
317  const std::shared_ptr<const arrow::Array> arrayPtr = sliceChunkedArray.chunk(chunk);
318  DAAL_ASSERT(arrayPtr);
319  const int64_t chunkLength = arrayPtr->length();
320  DAAL_ASSERT(chunkLength > 0);
321  const char* const ptr = getPtr(arrayPtr, f);
322  DAAL_ASSERT(ptr);
323  internal::getVectorUpCast(f.indexType, internal::getConversionDataType<T>())(chunkLength, ptr, &(lbuf[offset]));
324  offset += chunkLength;
325  }
326  DAAL_ASSERT(offset == di);
327  }
328 
329  for (size_t ii = 0; ii < di; ++ii)
330  {
331  buffer[(i + ii) * ncols + j] = lbuf[ii];
332  }
333  }
334  }
335 
336  return services::Status();
337  }
338 
339  template <typename T>
340  services::Status releaseTBlock(BlockDescriptor<T>& block)
341  {
342  if (block.getRWFlag() & (int)writeOnly)
343  {
344  return services::Status(services::ErrorMethodNotSupported);
345  }
346 
347  block.reset();
348  return services::Status();
349  }
350 
351  template <typename T>
352  services::Status getTFeature(size_t featIdx, size_t idx, size_t nrows, int rwFlag, BlockDescriptor<T>& block)
353  {
354  if (block.getRWFlag() & (int)writeOnly)
355  {
356  return services::Status(services::ErrorMethodNotSupported);
357  }
358 
359  const size_t ncols = getNumberOfColumns();
360  const size_t nobs = getNumberOfRows();
361  block.setDetails(featIdx, idx, rwFlag);
362 
363  if (idx >= nobs)
364  {
365  block.resizeBuffer(1, 0);
366  return services::Status();
367  }
368 
369  nrows = (idx + nrows < nobs) ? nrows : nobs - idx;
370 
371  const NumericTableFeature& f = (*_ddict)[featIdx];
372 
373  const std::shared_ptr<const arrow::Column> columnPtr = _table->column(featIdx);
374  DAAL_ASSERT(columnPtr);
375  const std::shared_ptr<const arrow::ChunkedArray> columnChunkedArrayPtr = columnPtr->data();
376  DAAL_ASSERT(columnChunkedArrayPtr);
377  const std::shared_ptr<const arrow::ChunkedArray> sliceChunkedArrayPtr = columnChunkedArrayPtr->Slice(idx, nrows);
378  DAAL_ASSERT(sliceChunkedArrayPtr);
379  const arrow::ChunkedArray& sliceChunkedArray = *sliceChunkedArrayPtr;
380  const int chunkCount = sliceChunkedArray.num_chunks();
381  DAAL_ASSERT(chunkCount > 0);
382 
383  if (features::internal::getIndexNumType<T>() == f.indexType && chunkCount == 1)
384  {
385  const T* const ptr = getPtr<T>(sliceChunkedArray.chunk(0), f);
386  DAAL_ASSERT(ptr);
387  block.setPtr(const_cast<T* const>(ptr), 1, nrows);
388  }
389  else
390  {
391  if (!block.resizeBuffer(1, nrows))
392  {
393  return services::Status(services::ErrorMemoryAllocationFailed);
394  }
395 
396  if (!(block.getRWFlag() & (int)readOnly)) return services::Status();
397 
398  if (chunkCount == 1)
399  {
400  const char* const ptr = getPtr(sliceChunkedArray.chunk(0), f);
401  DAAL_ASSERT(ptr);
402  internal::getVectorUpCast(f.indexType, internal::getConversionDataType<T>())(nrows, ptr, block.getBlockPtr());
403  }
404  else
405  {
406  size_t offset = 0;
407  T* const destPtr = block.getBlockPtr();
408  for (int chunk = 0; chunk < chunkCount; ++chunk)
409  {
410  const std::shared_ptr<const arrow::Array> arrayPtr = sliceChunkedArray.chunk(chunk);
411  DAAL_ASSERT(arrayPtr);
412  const int64_t chunkLength = arrayPtr->length();
413  DAAL_ASSERT(chunkLength > 0);
414  const char* const ptr = getPtr(arrayPtr, f);
415  DAAL_ASSERT(ptr);
416  internal::getVectorUpCast(f.indexType, internal::getConversionDataType<T>())(chunkLength, ptr, destPtr + offset);
417  offset += chunkLength;
418  }
419  DAAL_ASSERT(offset == di);
420  }
421  }
422  return services::Status();
423  }
424 
425  template <typename T>
426  services::Status releaseTFeature(BlockDescriptor<T>& block)
427  {
428  if (block.getRWFlag() & (int)writeOnly)
429  {
430  return services::Status(services::ErrorMethodNotSupported);
431  }
432 
433  block.reset();
434  return services::Status();
435  }
436 
437  template <typename T = char>
438  const T* getPtr(const arrow::Array& array, const NumericTableFeature& f, int bufferIndex = 1) const
439  {
440  const std::shared_ptr<const arrow::ArrayData> arrayDataPtr = array.data();
441  DAAL_ASSERT(arrayDataPtr);
442  const arrow::ArrayData& arrayData = *arrayDataPtr;
443  return reinterpret_cast<const T*>(arrayData.template GetValues<char>(bufferIndex, arrayData.offset * f.typeSize));
444  }
445 
446  template <typename T = char>
447  const T* getPtr(const std::shared_ptr<const arrow::Array>& array, const NumericTableFeature& f, int bufferIndex = 1) const
448  {
449  return getPtr<T>(*array, f, bufferIndex);
450  }
451 };
452 typedef services::SharedPtr<ArrowImmutableNumericTable> ArrowImmutableNumericTablePtr;
454 } // namespace interface1
455 using interface1::ArrowImmutableNumericTable;
456 using interface1::ArrowImmutableNumericTablePtr;
457 
458 } // namespace data_management
459 } // namespace daal
460 #endif
daal::data_management::interface1::ArrowImmutableNumericTable::create
static DAAL_FORCEINLINE services::SharedPtr< ArrowImmutableNumericTable > create(const std::shared_ptr< const arrow::Table > &table, services::Status *stat=NULL)
Definition: arrow_numeric_table.h:75
daal::data_management::interface1::ArrowImmutableNumericTable::releaseBlockOfColumnValues
services::Status releaseBlockOfColumnValues(BlockDescriptor< double > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:135
daal
Definition: algorithm_base_common.h:31
daal::data_management::interface1::ArrowImmutableNumericTable::releaseBlockOfColumnValues
services::Status releaseBlockOfColumnValues(BlockDescriptor< int > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:143
daal::data_management::interface1::ArrowImmutableNumericTable::releaseBlockOfRows
services::Status releaseBlockOfRows(BlockDescriptor< float > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:107
daal::services::ErrorMemoryAllocationFailed
Definition: error_indexes.h:147
daal::dram
Definition: daal_defines.h:147
daal::data_management::interface1::ArrowImmutableNumericTable::releaseBlockOfRows
services::Status releaseBlockOfRows(BlockDescriptor< double > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:102
daal::services::ErrorDataTypeNotSupported
Definition: error_indexes.h:142
daal::data_management::interface1::ArrowImmutableNumericTable::releaseBlockOfColumnValues
services::Status releaseBlockOfColumnValues(BlockDescriptor< float > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:139
daal::services::ErrorNullPtr
Definition: error_indexes.h:139
daal::data_management::interface1::ArrowImmutableNumericTable
Class that provides methods to access data stored as an Apache Arrow table.
Definition: arrow_numeric_table.h:46
daal::data_management::interface1::ArrowImmutableNumericTable::releaseBlockOfRows
services::Status releaseBlockOfRows(BlockDescriptor< int > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:112
daal::data_management::interface1::BaseArrowImmutableNumericTable
Base class that provides methods to access data stored as an immutable Apache Arrow table...
Definition: base_arrow_numeric_table.h:55
daal::data_management::interface1::ArrowImmutableNumericTable::create
static DAAL_FORCEINLINE services::SharedPtr< ArrowImmutableNumericTable > create(const std::shared_ptr< arrow::Table > &table, services::Status *stat=NULL)
Definition: arrow_numeric_table.h:57
daal::data_management::interface1::ArrowImmutableNumericTable::getBlockOfRows
services::Status getBlockOfRows(size_t vectorIdx, size_t vector_num, ReadWriteMode rwflag, BlockDescriptor< int > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:97
daal::data_management::interface1::ArrowImmutableNumericTable::getBlockOfRows
services::Status getBlockOfRows(size_t vectorIdx, size_t vector_num, ReadWriteMode rwflag, BlockDescriptor< float > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:92
daal::data_management::interface1::ArrowImmutableNumericTable::getBlockOfRows
services::Status getBlockOfRows(size_t vectorIdx, size_t vector_num, ReadWriteMode rwflag, BlockDescriptor< double > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:87
daal::data_management::interface1::BlockDescriptor
Base class that manages buffer memory for read/write operations required by numeric tables...
Definition: numeric_table.h:55
daal::data_management::interface1::ArrowImmutableNumericTable::getBlockOfColumnValues
services::Status getBlockOfColumnValues(size_t featureIdx, size_t vectorIdx, size_t valueNum, ReadWriteMode rwflag, BlockDescriptor< float > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:123
daal::algorithms::implicit_als::training::offset
Definition: implicit_als_training_types.h:148
daal::data_management::interface1::Dictionary::create
static services::SharedPtr< Dictionary > create(size_t nfeat, FeaturesEqual featuresEqual=notEqual, services::Status *stat=NULL)
Definition: data_dictionary.h:186
daal::MemType
MemType
Definition: daal_defines.h:145
daal::data_management::interface1::ArrowImmutableNumericTable::getBlockOfColumnValues
services::Status getBlockOfColumnValues(size_t featureIdx, size_t vectorIdx, size_t valueNum, ReadWriteMode rwflag, BlockDescriptor< int > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:129
daal::data_management::interface1::ArrowImmutableNumericTable::getBlockOfColumnValues
services::Status getBlockOfColumnValues(size_t featureIdx, size_t vectorIdx, size_t valueNum, ReadWriteMode rwflag, BlockDescriptor< double > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:117
daal::services::ErrorMethodNotSupported
Definition: error_indexes.h:69

For more complete information about compiler optimizations, see our Optimization Notice.