22 #ifndef __ARROW_NUMERIC_TABLE_H__
23 #define __ARROW_NUMERIC_TABLE_H__
25 #include "data_management/data/numeric_table.h"
26 #include "data_management/data/internal/conversion.h"
27 #include "data_management/data/internal/base_arrow_numeric_table.h"
29 #include <arrow/table.h>
33 namespace data_management
// ArrowImmutableNumericTable: numeric table whose data is held in an
// arrow::Table (see the _table member). Derives from
// BaseArrowImmutableNumericTable.
46 class DAAL_EXPORT ArrowImmutableNumericTable :
public BaseArrowImmutableNumericTable
// Library macro; presumably declares the serialization interface of the
// class - see the macro definition to confirm.
49 DECLARE_SERIALIZABLE_IMPL();
// Factory overload taking a shared pointer to a non-const arrow::Table.
//   table - Arrow table to wrap.
//   stat  - optional status object; when non-null, errors are added to it.
// Returns a services::SharedPtr to the created object, or an empty pointer
// on the error path below.
57 static DAAL_FORCEINLINE services::SharedPtr<ArrowImmutableNumericTable> create(
const std::shared_ptr<arrow::Table> & table,
58 services::Status * stat = NULL)
// Error path: record ErrorNullPtr (only if a status object was supplied) and
// hand back an empty pointer.
// NOTE(review): the condition guarding this path is not visible in this
// listing - presumably a null check on `table`; confirm against the header.
62 if (stat) { stat->add(services::ErrorNullPtr); }
63 return services::SharedPtr<ArrowImmutableNumericTable>();
// Normal path: object construction is delegated to the library macro.
66 DAAL_DEFAULT_CREATE_IMPL_EX(ArrowImmutableNumericTable, table);
// Factory overload taking a shared pointer to a const arrow::Table.
//   table - Arrow table to wrap.
//   stat  - optional status object; when non-null, errors are added to it.
// Returns a services::SharedPtr to the created object, or an empty pointer
// on the error path below.
75 static DAAL_FORCEINLINE services::SharedPtr<ArrowImmutableNumericTable> create(
const std::shared_ptr<const arrow::Table> & table,
76 services::Status * stat = NULL)
// Error path: record ErrorNullPtr (only if a status object was supplied) and
// hand back an empty pointer.
// NOTE(review): the condition guarding this path is not visible in this
// listing - presumably a null check on `table`; confirm against the header.
80 if (stat) { stat->add(services::ErrorNullPtr); }
81 return services::SharedPtr<ArrowImmutableNumericTable>();
// Normal path: object construction is delegated to the library macro.
84 DAAL_DEFAULT_CREATE_IMPL_EX(ArrowImmutableNumericTable, table);
// Row-block accessor for double: forwards all arguments unchanged to
// getTBlock<double> and returns its status.
//   vectorIdx  - index of the first row requested.
//   vector_num - number of rows requested.
//   rwflag     - requested access mode.
//   block      - descriptor that receives the data.
87 services::Status getBlockOfRows(
size_t vectorIdx,
size_t vector_num, ReadWriteMode rwflag, BlockDescriptor<double>& block) DAAL_C11_OVERRIDE
89 return getTBlock<double>(vectorIdx, vector_num, rwflag, block);
// Row-block accessor for float: forwards all arguments unchanged to
// getTBlock<float> and returns its status.
//   vectorIdx  - index of the first row requested.
//   vector_num - number of rows requested.
//   rwflag     - requested access mode.
//   block      - descriptor that receives the data.
92 services::Status getBlockOfRows(
size_t vectorIdx,
size_t vector_num, ReadWriteMode rwflag, BlockDescriptor<float>& block) DAAL_C11_OVERRIDE
94 return getTBlock<float>(vectorIdx, vector_num, rwflag, block);
// Row-block accessor for int: forwards all arguments unchanged to
// getTBlock<int> and returns its status.
//   vectorIdx  - index of the first row requested.
//   vector_num - number of rows requested.
//   rwflag     - requested access mode.
//   block      - descriptor that receives the data.
97 services::Status getBlockOfRows(
size_t vectorIdx,
size_t vector_num, ReadWriteMode rwflag, BlockDescriptor<int>& block) DAAL_C11_OVERRIDE
99 return getTBlock<int>(vectorIdx, vector_num, rwflag, block);
// Releases a double row block: forwards to releaseTBlock<double> and returns
// its status.
102 services::Status releaseBlockOfRows(BlockDescriptor<double>& block) DAAL_C11_OVERRIDE
104 return releaseTBlock<double>(block);
// Releases a float row block: forwards to releaseTBlock<float> and returns
// its status.
107 services::Status releaseBlockOfRows(BlockDescriptor<float>& block) DAAL_C11_OVERRIDE
109 return releaseTBlock<float>(block);
// Releases an int row block: forwards to releaseTBlock<int> and returns its
// status.
112 services::Status releaseBlockOfRows(BlockDescriptor<int>& block) DAAL_C11_OVERRIDE
114 return releaseTBlock<int>(block);
// Column-block accessor for double: forwards all arguments unchanged to
// getTFeature<double> and returns its status.
//   featureIdx - index of the column requested.
//   vectorIdx  - index of the first row requested.
//   valueNum   - number of values requested.
//   rwflag     - requested access mode.
//   block      - descriptor that receives the data.
117 services::Status getBlockOfColumnValues(
size_t featureIdx,
size_t vectorIdx,
size_t valueNum,
118 ReadWriteMode rwflag, BlockDescriptor<double>& block) DAAL_C11_OVERRIDE
120 return getTFeature<double>(featureIdx, vectorIdx, valueNum, rwflag, block);
// Column-block accessor for float: forwards all arguments unchanged to
// getTFeature<float> and returns its status.
//   featureIdx - index of the column requested.
//   vectorIdx  - index of the first row requested.
//   valueNum   - number of values requested.
//   rwflag     - requested access mode.
//   block      - descriptor that receives the data.
123 services::Status getBlockOfColumnValues(
size_t featureIdx,
size_t vectorIdx,
size_t valueNum,
124 ReadWriteMode rwflag, BlockDescriptor<float>& block) DAAL_C11_OVERRIDE
126 return getTFeature<float>(featureIdx, vectorIdx, valueNum, rwflag, block);
// Column-block accessor for int: forwards all arguments unchanged to
// getTFeature<int> and returns its status.
//   featureIdx - index of the column requested.
//   vectorIdx  - index of the first row requested.
//   valueNum   - number of values requested.
//   rwflag     - requested access mode.
//   block      - descriptor that receives the data.
129 services::Status getBlockOfColumnValues(
size_t featureIdx,
size_t vectorIdx,
size_t valueNum,
130 ReadWriteMode rwflag, BlockDescriptor<int>& block) DAAL_C11_OVERRIDE
132 return getTFeature<int>(featureIdx, vectorIdx, valueNum, rwflag, block);
// Releases a double column block: forwards to releaseTFeature<double> and
// returns its status.
135 services::Status releaseBlockOfColumnValues(BlockDescriptor<double>& block) DAAL_C11_OVERRIDE
137 return releaseTFeature<double>(block);
// Releases a float column block: forwards to releaseTFeature<float> and
// returns its status.
139 services::Status releaseBlockOfColumnValues(BlockDescriptor<float>& block) DAAL_C11_OVERRIDE
141 return releaseTFeature<float>(block);
// Releases an int column block: forwards to releaseTFeature<int> and returns
// its status.
143 services::Status releaseBlockOfColumnValues(BlockDescriptor<int>& block) DAAL_C11_OVERRIDE
145 return releaseTFeature<int>(block);
// Column-count setter for a table whose column count cannot be changed.
//   ncol - requested number of columns.
// Returns an OK status when ncol equals the current column count (nothing to
// do); any other value yields ErrorMethodNotSupported.
149 services::Status setNumberOfColumnsImpl(
size_t ncol) DAAL_C11_OVERRIDE
151 if (ncol == getNumberOfColumns())
return services::Status();
152 return services::Status(services::ErrorMethodNotSupported);
// Memory allocation is not offered by this class: the call unconditionally
// returns ErrorMethodNotSupported and ignores `type`.
155 services::Status allocateDataMemoryImpl(daal::MemType type = daal::dram) DAAL_C11_OVERRIDE
157 return services::Status(services::ErrorMethodNotSupported);
// Marks the table's memory status as notAllocated.
// NOTE(review): original line 162 is not shown in this listing, so any
// additional release step performed there cannot be documented from here.
160 void freeDataMemoryImpl() DAAL_C11_OVERRIDE
163 _memStatus = notAllocated;
// Serialization routine, parameterized by archive type and by a compile-time
// direction flag.
//   arch - archive that receives the data.
// Visible flow: one path returns ErrorMethodNotSupported; otherwise the
// NumericTable base routine runs and the raw values of every chunk of every
// column are passed to arch->set(). Returns an OK status at the end.
166 template<
typename Archive,
bool onDeserialize>
167 services::Status serialImpl(Archive* arch)
// NOTE(review): the guard for this early return is not visible in the
// listing - presumably it rejects deserialization (onDeserialize); confirm.
171 return services::Status(services::ErrorMethodNotSupported);
// Any value returned by the base-class routine is discarded here.
174 NumericTable::serialImpl<Archive, onDeserialize>(arch);
176 const size_t ncol = _ddict->getNumberOfFeatures();
// nrows is not referenced in the visible lines of this function.
177 const size_t nrows = getNumberOfRows();
// Walk the columns in dictionary order.
179 for (
size_t i = 0; i < ncol; ++i)
181 const NumericTableFeature& f = (*_ddict)[i];
183 const std::shared_ptr<const arrow::Column> columnPtr = _table->column(i);
184 DAAL_ASSERT(columnPtr);
185 const std::shared_ptr<const arrow::ChunkedArray> columnChunkedArrayPtr = columnPtr->data();
186 DAAL_ASSERT(columnChunkedArrayPtr);
187 const arrow::ChunkedArray& columnChunkedArray = *columnChunkedArrayPtr;
188 const int chunkCount = columnChunkedArray.num_chunks();
189 DAAL_ASSERT(chunkCount > 0);
// Emit each chunk separately; a column is not assumed to be contiguous.
191 for (
int chunk = 0; chunk < chunkCount; ++chunk)
193 const std::shared_ptr<const arrow::Array> arrayPtr = columnChunkedArray.chunk(chunk);
194 DAAL_ASSERT(arrayPtr);
195 const int64_t chunkLength = arrayPtr->length();
196 DAAL_ASSERT(chunkLength > 0);
// Size passed to the archive is in bytes: element count times the size of
// the feature's storage type.
197 arch->set(getPtr(arrayPtr, f), chunkLength * f.typeSize);
201 return services::Status();
// Constructor.
//   table - Arrow table to wrap; it is dereferenced in the initializer list,
//           so it must not be null when this constructor runs.
//   st    - status object, passed to the base constructor and updated with
//           the result of updateFeatures().
// The base class is initialized with the table's column and row counts and
// the shared pointer is stored in _table.
205 DAAL_FORCEINLINE ArrowImmutableNumericTable(
const std::shared_ptr<const arrow::Table> & table, services::Status & st)
206 : BaseArrowImmutableNumericTable(table->num_columns(), table->num_rows(), st), _table(table)
209 _memStatus = internallyAllocated;
// Fill the data dictionary only if everything so far succeeded.
210 if (st) st |= updateFeatures(*table);
// Shared pointer to the wrapped Arrow table (const access only); set in the
// constructor and read by the block accessors and serialImpl.
213 std::shared_ptr<const arrow::Table> _table;
// Fills the data dictionary from the schema of the given Arrow table: each
// column's Arrow type id selects the C++ type registered via setFeature<T>().
//   table - Arrow table whose schema is inspected.
// Returns the accumulated status `s`; an Arrow type without a mapping below
// adds ErrorDataTypeNotSupported and returns immediately.
// NOTE(review): the declaration of `s` and the switch header are not visible
// in this listing.
215 DAAL_FORCEINLINE services::Status updateFeatures(
const arrow::Table & table)
// Create the dictionary on demand if none exists yet.
218 if (_ddict.get() == NULL)
220 _ddict = NumericTableDictionary::create(&s);
224 const std::shared_ptr<const arrow::Schema> schemaPtr = table.schema();
225 DAAL_ASSERT(schemaPtr);
226 const int ncols = schemaPtr->num_fields();
227 for (
int col = 0; col < ncols; ++col)
229 const arrow::Type::type type = schemaPtr->field(col)->type()->id();
// Arrow type id -> storage type recorded in the dictionary.
232 case arrow::Type::UINT8: s |= setFeature<unsigned char>(col);
break;
233 case arrow::Type::INT8: s |= setFeature<char>(col);
break;
234 case arrow::Type::UINT16: s |= setFeature<unsigned short>(col);
break;
235 case arrow::Type::INT16: s |= setFeature<short>(col);
break;
236 case arrow::Type::UINT32: s |= setFeature<unsigned int>(col);
break;
// DATE32 and TIME32 share the int mapping with INT32.
237 case arrow::Type::DATE32:
238 case arrow::Type::TIME32:
239 case arrow::Type::INT32: s |= setFeature<int>(col);
break;
240 case arrow::Type::UINT64: s |= setFeature<DAAL_UINT64>(col);
break;
// DATE64, TIMESTAMP and TIME64 share the DAAL_INT64 mapping with INT64.
241 case arrow::Type::DATE64:
242 case arrow::Type::TIMESTAMP:
243 case arrow::Type::TIME64:
244 case arrow::Type::INT64: s |= setFeature<DAAL_INT64>(col);
break;
245 case arrow::Type::FLOAT: s |= setFeature<float>(col);
break;
246 case arrow::Type::DOUBLE: s |= setFeature<double>(col);
break;
// Every other Arrow type is rejected.
247 default: s.add(services::ErrorDataTypeNotSupported);
return s;
// Registers column `idx` in the data dictionary with storage type T, then
// records its feature type and category count.
//   idx            - column index in the dictionary.
//   featureType    - feature kind; defaults to features::DAAL_CONTINUOUS.
//   categoryNumber - number of categories; defaults to 0.
// NOTE(review): the return statement is not visible in this listing;
// presumably the status `s` obtained below is returned - confirm.
253 template <
typename T>
254 services::Status setFeature(
size_t idx, features::FeatureType featureType = features::DAAL_CONTINUOUS,
size_t categoryNumber = 0)
257 services::Status s = _ddict->setFeature<T>(idx);
259 (*_ddict)[idx].featureType = featureType;
260 (*_ddict)[idx].categoryNumber = categoryNumber;
// Copies a block of rows into the descriptor's buffer, converting every
// column from its stored type to T.
//   idx    - index of the first row requested.
//   nrows  - number of rows requested; clamped below to the rows available.
//   rwFlag - requested access mode, stored in the descriptor via setDetails().
//   block  - descriptor whose buffer receives ncols values per row.
// Returns ErrorMethodNotSupported for a write-only descriptor,
// ErrorMemoryAllocationFailed when the buffer cannot be resized, and an OK
// status otherwise.
// NOTE(review): several statements (the early-exit guard, the declarations
// of `di`, `lbuf` and `offset`, and the single/multi-chunk branching) are
// not visible in this listing.
264 template <
typename T>
265 services::Status getTBlock(
size_t idx,
size_t nrows, ReadWriteMode rwFlag, BlockDescriptor<T>& block)
// NOTE(review): this tests the flag already stored in `block`; the requested
// rwFlag is only applied by setDetails() further down - confirm the order is
// intended.
267 if (block.getRWFlag() & (int)writeOnly)
269 return services::Status(services::ErrorMethodNotSupported);
272 const size_t ncols = getNumberOfColumns();
273 const size_t nobs = getNumberOfRows();
274 block.setDetails(0, idx, rwFlag);
// Early exit with a zero-row buffer; the guard is not shown (presumably
// idx >= nobs - confirm).
278 block.resizeBuffer(ncols, 0);
279 return services::Status();
// Clamp the request so it does not run past the last row.
282 nrows = (idx + nrows < nobs) ? nrows : nobs - idx;
284 if (!block.resizeBuffer(ncols, nrows)) {
return services::Status(services::ErrorMemoryAllocationFailed); }
288 T*
const buffer = block.getBlockPtr();
// Process the rows in steps of `di`; the last step is shortened to fit.
290 for (
size_t i = 0; i < nrows; i += di)
292 if (i + di > nrows) { di = nrows - i; }
294 for (
size_t j = 0; j < ncols; ++j)
296 const NumericTableFeature& f = (*_ddict)[j];
297 const std::shared_ptr<const arrow::Column> columnPtr = _table->column(j);
298 DAAL_ASSERT(columnPtr);
299 const std::shared_ptr<const arrow::ChunkedArray> columnChunkedArrayPtr = columnPtr->data();
300 DAAL_ASSERT(columnChunkedArrayPtr);
// Restrict the column to the rows of the current step.
301 const std::shared_ptr<const arrow::ChunkedArray> sliceChunkedArrayPtr = columnChunkedArrayPtr->Slice(idx + i, di);
302 DAAL_ASSERT(sliceChunkedArrayPtr);
303 const arrow::ChunkedArray& sliceChunkedArray = *sliceChunkedArrayPtr;
304 const int chunkCount = sliceChunkedArray.num_chunks();
305 DAAL_ASSERT(chunkCount > 0);
// Conversion of the first chunk for all `di` rows in one call (presumably
// the single-chunk branch - its condition is not shown).
308 const char*
const ptr = getPtr(sliceChunkedArray.chunk(0), f);
310 internal::getVectorUpCast(f.indexType, internal::getConversionDataType<T>())(di, ptr, lbuf);
// Chunk-by-chunk conversion, appending each chunk to lbuf at `offset`.
315 for (
int chunk = 0; chunk < chunkCount; ++chunk)
317 const std::shared_ptr<const arrow::Array> arrayPtr = sliceChunkedArray.chunk(chunk);
318 DAAL_ASSERT(arrayPtr);
319 const int64_t chunkLength = arrayPtr->length();
320 DAAL_ASSERT(chunkLength > 0);
321 const char*
const ptr = getPtr(arrayPtr, f);
323 internal::getVectorUpCast(f.indexType, internal::getConversionDataType<T>())(chunkLength, ptr, &(lbuf[offset]));
324 offset += chunkLength;
326 DAAL_ASSERT(offset == di);
// Scatter the converted column values into the output buffer: row (i + ii),
// column j, with ncols values per row.
329 for (
size_t ii = 0; ii < di; ++ii)
331 buffer[(i + ii) * ncols + j] = lbuf[ii];
336 return services::Status();
// Releases a row block obtained through getTBlock().
//   block - descriptor to release.
// Returns ErrorMethodNotSupported when the descriptor's flag has the
// writeOnly bit set, otherwise an OK status.
// NOTE(review): original line 346 is not shown in this listing, so any reset
// of the descriptor performed there cannot be documented from here.
339 template <
typename T>
340 services::Status releaseTBlock(BlockDescriptor<T>& block)
342 if (block.getRWFlag() & (int)writeOnly)
344 return services::Status(services::ErrorMethodNotSupported);
348 return services::Status();
// Provides the values of one column, as type T, through the descriptor.
//   featIdx - index of the column requested.
//   idx     - index of the first row requested.
//   nrows   - number of values requested; clamped below to the rows available.
//   rwFlag  - requested access mode, stored in the descriptor via setDetails().
//   block   - descriptor that receives either a pointer into the Arrow data
//             or a converted copy in its own buffer.
// Returns ErrorMethodNotSupported for a write-only descriptor,
// ErrorMemoryAllocationFailed when the buffer cannot be resized, and an OK
// status otherwise.
// NOTE(review): the early-exit guard, the else/branch structure and the
// declaration of `offset` are not visible in this listing.
351 template <
typename T>
352 services::Status getTFeature(
size_t featIdx,
size_t idx,
size_t nrows,
int rwFlag, BlockDescriptor<T>& block)
// NOTE(review): this tests the flag already stored in `block`; the requested
// rwFlag is only applied by setDetails() further down - confirm the order is
// intended.
354 if (block.getRWFlag() & (int)writeOnly)
356 return services::Status(services::ErrorMethodNotSupported);
// ncols is not referenced in the visible lines of this function.
359 const size_t ncols = getNumberOfColumns();
360 const size_t nobs = getNumberOfRows();
361 block.setDetails(featIdx, idx, rwFlag);
// Early exit with a zero-row buffer; the guard is not shown (presumably
// idx >= nobs - confirm).
365 block.resizeBuffer(1, 0);
366 return services::Status();
// Clamp the request so it does not run past the last row.
369 nrows = (idx + nrows < nobs) ? nrows : nobs - idx;
371 const NumericTableFeature& f = (*_ddict)[featIdx];
373 const std::shared_ptr<const arrow::Column> columnPtr = _table->column(featIdx);
374 DAAL_ASSERT(columnPtr);
375 const std::shared_ptr<const arrow::ChunkedArray> columnChunkedArrayPtr = columnPtr->data();
376 DAAL_ASSERT(columnChunkedArrayPtr);
377 const std::shared_ptr<const arrow::ChunkedArray> sliceChunkedArrayPtr = columnChunkedArrayPtr->Slice(idx, nrows);
378 DAAL_ASSERT(sliceChunkedArrayPtr);
379 const arrow::ChunkedArray& sliceChunkedArray = *sliceChunkedArrayPtr;
380 const int chunkCount = sliceChunkedArray.num_chunks();
381 DAAL_ASSERT(chunkCount > 0);
// No-copy path: the stored type already equals T and the slice is a single
// chunk, so the descriptor is pointed directly at the Arrow data.
383 if (features::internal::getIndexNumType<T>() == f.indexType && chunkCount == 1)
385 const T*
const ptr = getPtr<T>(sliceChunkedArray.chunk(0), f);
// Constness is cast away because setPtr() is given a non-const pointer.
387 block.setPtr(const_cast<T* const>(ptr), 1, nrows);
// Copy path: the values are converted into the descriptor's own buffer.
391 if (!block.resizeBuffer(1, nrows))
393 return services::Status(services::ErrorMemoryAllocationFailed);
// Without the readOnly bit there is nothing to fill in; return early.
396 if (!(block.getRWFlag() & (int)readOnly))
return services::Status();
// Conversion of the first chunk for all nrows values in one call (presumably
// the single-chunk branch - its condition is not shown).
400 const char*
const ptr = getPtr(sliceChunkedArray.chunk(0), f);
402 internal::getVectorUpCast(f.indexType, internal::getConversionDataType<T>())(nrows, ptr, block.getBlockPtr());
// Chunk-by-chunk conversion, appending each chunk at `offset`.
407 T*
const destPtr = block.getBlockPtr();
408 for (
int chunk = 0; chunk < chunkCount; ++chunk)
410 const std::shared_ptr<const arrow::Array> arrayPtr = sliceChunkedArray.chunk(chunk);
411 DAAL_ASSERT(arrayPtr);
412 const int64_t chunkLength = arrayPtr->length();
413 DAAL_ASSERT(chunkLength > 0);
414 const char*
const ptr = getPtr(arrayPtr, f);
416 internal::getVectorUpCast(f.indexType, internal::getConversionDataType<T>())(chunkLength, ptr, destPtr + offset);
417 offset += chunkLength;
// NOTE(review): no `di` is declared in the visible lines of this function;
// the row count here is `nrows`, so this assertion presumably should compare
// against nrows. It would only matter where DAAL_ASSERT expands to real code
// - confirm and fix in the header.
419 DAAL_ASSERT(offset == di);
422 return services::Status();
// Releases a column block obtained through getTFeature().
//   block - descriptor to release.
// Returns ErrorMethodNotSupported when the descriptor's flag has the
// writeOnly bit set, otherwise an OK status.
// NOTE(review): original line 432 is not shown in this listing, so any reset
// of the descriptor performed there cannot be documented from here.
425 template <
typename T>
426 services::Status releaseTFeature(BlockDescriptor<T>& block)
428 if (block.getRWFlag() & (int)writeOnly)
430 return services::Status(services::ErrorMethodNotSupported);
434 return services::Status();
// Returns a typed pointer to the first value of an Arrow array, taking the
// array's own offset into account.
//   T           - pointee type of the result; defaults to char.
//   array       - Arrow array to read from.
//   f           - dictionary entry of the column; only f.typeSize is used.
//   bufferIndex - index of the Arrow buffer to read; defaults to 1
//                 (presumably the values buffer of a primitive array -
//                 confirm against the Arrow layout documentation).
// The validity bitmap is not consulted here.
437 template <
typename T =
char>
438 const T* getPtr(
const arrow::Array& array,
const NumericTableFeature& f,
int bufferIndex = 1)
const
440 const std::shared_ptr<const arrow::ArrayData> arrayDataPtr = array.data();
441 DAAL_ASSERT(arrayDataPtr);
442 const arrow::ArrayData& arrayData = *arrayDataPtr;
// The buffer is addressed as char, so the offset passed is in bytes: the
// array's element offset times the size of the stored type.
443 return reinterpret_cast<const T*
>(arrayData.template GetValues<char>(bufferIndex, arrayData.offset * f.typeSize));
// Convenience overload for a shared pointer to an Arrow array: dereferences
// it and forwards to the reference overload with the same T, f and
// bufferIndex. The pointer is dereferenced without a null check.
446 template <
typename T =
char>
447 const T* getPtr(
const std::shared_ptr<const arrow::Array>& array,
const NumericTableFeature& f,
int bufferIndex = 1)
const
449 return getPtr<T>(*array, f, bufferIndex);
// Shared-pointer alias for ArrowImmutableNumericTable.
452 typedef services::SharedPtr<ArrowImmutableNumericTable> ArrowImmutableNumericTablePtr;
// Make the interface1 names available in the enclosing namespace.
455 using interface1::ArrowImmutableNumericTable;
456 using interface1::ArrowImmutableNumericTablePtr;
daal::data_management::interface1::ArrowImmutableNumericTable::create
static DAAL_FORCEINLINE services::SharedPtr< ArrowImmutableNumericTable > create(const std::shared_ptr< const arrow::Table > &table, services::Status *stat=NULL)
Definition: arrow_numeric_table.h:75
daal::data_management::interface1::ArrowImmutableNumericTable::releaseBlockOfColumnValues
services::Status releaseBlockOfColumnValues(BlockDescriptor< double > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:135
daal
Definition: algorithm_base_common.h:31
daal::data_management::interface1::ArrowImmutableNumericTable::releaseBlockOfColumnValues
services::Status releaseBlockOfColumnValues(BlockDescriptor< int > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:143
daal::data_management::interface1::ArrowImmutableNumericTable::releaseBlockOfRows
services::Status releaseBlockOfRows(BlockDescriptor< float > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:107
daal::services::ErrorMemoryAllocationFailed
Definition: error_indexes.h:147
daal::dram
Definition: daal_defines.h:147
daal::data_management::interface1::ArrowImmutableNumericTable::releaseBlockOfRows
services::Status releaseBlockOfRows(BlockDescriptor< double > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:102
daal::services::ErrorDataTypeNotSupported
Definition: error_indexes.h:142
daal::data_management::interface1::ArrowImmutableNumericTable::releaseBlockOfColumnValues
services::Status releaseBlockOfColumnValues(BlockDescriptor< float > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:139
daal::services::ErrorNullPtr
Definition: error_indexes.h:139
daal::data_management::interface1::ArrowImmutableNumericTable
Class that provides methods to access data stored as an Apache Arrow table.
Definition: arrow_numeric_table.h:46
daal::data_management::interface1::ArrowImmutableNumericTable::releaseBlockOfRows
services::Status releaseBlockOfRows(BlockDescriptor< int > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:112
daal::data_management::interface1::BaseArrowImmutableNumericTable
Base class that provides methods to access data stored as an immutable Apache Arrow table...
Definition: base_arrow_numeric_table.h:55
daal::data_management::interface1::ArrowImmutableNumericTable::create
static DAAL_FORCEINLINE services::SharedPtr< ArrowImmutableNumericTable > create(const std::shared_ptr< arrow::Table > &table, services::Status *stat=NULL)
Definition: arrow_numeric_table.h:57
daal::data_management::interface1::ArrowImmutableNumericTable::getBlockOfRows
services::Status getBlockOfRows(size_t vectorIdx, size_t vector_num, ReadWriteMode rwflag, BlockDescriptor< int > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:97
daal::data_management::interface1::ArrowImmutableNumericTable::getBlockOfRows
services::Status getBlockOfRows(size_t vectorIdx, size_t vector_num, ReadWriteMode rwflag, BlockDescriptor< float > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:92
daal::data_management::interface1::ArrowImmutableNumericTable::getBlockOfRows
services::Status getBlockOfRows(size_t vectorIdx, size_t vector_num, ReadWriteMode rwflag, BlockDescriptor< double > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:87
daal::data_management::interface1::BlockDescriptor
Base class that manages buffer memory for read/write operations required by numeric tables...
Definition: numeric_table.h:55
daal::data_management::interface1::ArrowImmutableNumericTable::getBlockOfColumnValues
services::Status getBlockOfColumnValues(size_t featureIdx, size_t vectorIdx, size_t valueNum, ReadWriteMode rwflag, BlockDescriptor< float > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:123
daal::algorithms::implicit_als::training::offset
Definition: implicit_als_training_types.h:148
daal::data_management::interface1::Dictionary::create
static services::SharedPtr< Dictionary > create(size_t nfeat, FeaturesEqual featuresEqual=notEqual, services::Status *stat=NULL)
Definition: data_dictionary.h:186
daal::MemType
MemType
Definition: daal_defines.h:145
daal::data_management::interface1::ArrowImmutableNumericTable::getBlockOfColumnValues
services::Status getBlockOfColumnValues(size_t featureIdx, size_t vectorIdx, size_t valueNum, ReadWriteMode rwflag, BlockDescriptor< int > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:129
daal::data_management::interface1::ArrowImmutableNumericTable::getBlockOfColumnValues
services::Status getBlockOfColumnValues(size_t featureIdx, size_t vectorIdx, size_t valueNum, ReadWriteMode rwflag, BlockDescriptor< double > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:117
daal::services::ErrorMethodNotSupported
Definition: error_indexes.h:69