C++ API Reference for Intel® Data Analytics Acceleration Library 2019 Update 4

mysql_feature_manager.h
1 /* file: mysql_feature_manager.h */
2 /*******************************************************************************
3 * Copyright 2014-2019 Intel Corporation.
4 *
5 * This software and the related documents are Intel copyrighted materials, and
6 * your use of them is governed by the express license under which they were
7 * provided to you (License). Unless the License provides otherwise, you may not
8 * use, modify, copy, publish, distribute, disclose or transmit this software or
9 * the related documents without Intel's prior written permission.
10 *
11 * This software and the related documents are provided as is, with no express
12 * or implied warranties, other than those that are expressly stated in the
13 * License.
14 *******************************************************************************/
15 
16 /*
17 //++
18 // Implementation of the SQL feature manager class for the MySQL data source.
19 //--
20 */
21 #ifndef __MYSQL_FEATURE_MANAGER_H__
22 #define __MYSQL_FEATURE_MANAGER_H__
23 
24 #include <sstream>
25 
26 #include "data_management/data/numeric_table.h"
27 #include "data_management/features/shortcuts.h"
28 #include "data_management/data_source/data_source.h"
29 #include "data_management/data_source/internal/sql_feature_utils.h"
30 #include "data_management/data_source/modifiers/sql/shortcuts.h"
31 #include "data_management/data_source/modifiers/sql/internal/engine.h"
32 
33 namespace daal
34 {
35 namespace data_management
36 {
37 namespace interface1
38 {
48 class SQLFeatureManager
49 {
50 public:
58  SQLFeatureManager &addModifier(const features::FeatureIdCollectionIfacePtr &featureIds,
59  const modifiers::sql::FeatureModifierIfacePtr &modifier,
60  services::Status *status = NULL)
61  {
62  services::Status localStatus;
63  if (!_modifiersManager)
64  {
65  _modifiersManager = modifiers::sql::internal::ModifiersManager::create(&localStatus);
66  if (!localStatus)
67  {
68  services::internal::tryAssignStatusAndThrow(status, localStatus);
69  return *this;
70  }
71  }
72 
73  localStatus |= _modifiersManager->addModifier(featureIds, modifier);
74  if (!localStatus)
75  {
76  services::internal::tryAssignStatusAndThrow(status, localStatus);
77  return *this;
78  }
79 
80  return *this;
81  }
82 
89  DataSourceIface::DataSourceStatus statementResultsNumericTable(SQLHSTMT hdlStmt, NumericTable *nt, size_t maxRows)
90  {
91  DAAL_ASSERT( nt );
92  DAAL_ASSERT( hdlStmt );
93 
94  nt->resize(maxRows);
95 
96  nt->getBlockOfRows(0, maxRows, writeOnly, _currentRowBlock);
97  DAAL_DATA_TYPE *ntBuffer = _currentRowBlock.getBlockPtr();
98  const size_t nColumns = _currentRowBlock.getNumberOfColumns();
99 
100  SQLRETURN ret;
101  size_t read = 0;
102  while (SQL_SUCCEEDED(ret = SQLFetchScroll(hdlStmt, SQL_FETCH_NEXT, 1)))
103  {
104  services::BufferView<DAAL_DATA_TYPE> rowBuffer(ntBuffer + read * nColumns, nColumns);
105 
106  if (_modifiersManager)
107  {
108  _modifiersManager->applyModifiers(rowBuffer);
109  }
110  else
111  {
112  _fetchBuffer->copyTo(rowBuffer);
113  }
114 
115  read++;
116  if (read >= maxRows) { break; }
117  }
118 
119  nt->releaseBlockOfRows(_currentRowBlock);
120  nt->resize(read);
121 
122  DataSourceIface::DataSourceStatus status = DataSourceIface::readyForLoad;
123  if (ret != SQL_NO_DATA)
124  {
125  if (!SQL_SUCCEEDED(ret))
126  {
127  status = DataSourceIface::notReady;
128  _errors->add(services::ErrorODBC);
129  }
130  }
131  else
132  {
133  if (read < maxRows)
134  {
135  status = DataSourceIface::endOfData;
136  }
137  }
138  return status;
139  }
140 
146  services::Status createDictionary(SQLHSTMT hdlStmt, DataSourceDictionary *dictionary)
147  {
148  DAAL_ASSERT( dictionary );
149  DAAL_ASSERT( hdlStmt );
150 
151  services::Status status;
152 
153  const internal::SQLFeaturesInfo &featuresInfo = getFeaturesInfo(hdlStmt, &status);
154  DAAL_CHECK_STATUS_VAR( status );
155 
156  DAAL_CHECK_STATUS( status, bindSQLColumns(hdlStmt, featuresInfo) );
157  DAAL_CHECK_STATUS( status, fillDictionary(*dictionary, featuresInfo) );
158 
159  return status;
160  }
161 
/**
 * Builds a query that restricts the given query with a LIMIT clause.
 *
 * The result is `<query> LIMIT <idx_last_read>, <maxRows>;`. The input query
 * is not modified and is expected not to end with a semicolon itself.
 *
 * \param[in] query          Base SQL query
 * \param[in] idx_last_read  First value written after LIMIT
 * \param[in] maxRows        Second value written after LIMIT
 * \return The query with the LIMIT clause appended
 */
std::string setLimitQuery(const std::string &query, size_t idx_last_read, size_t maxRows)
{
    std::stringstream ss;
    ss << query << " LIMIT " << idx_last_read << ", " << maxRows << ";";
    return ss.str();
}
175 
176  services::ErrorCollectionPtr getErrors()
177  {
178  return services::ErrorCollectionPtr(new services::ErrorCollection());
179  }
180 
181 private:
182  internal::SQLFeaturesInfo getFeaturesInfo(SQLHSTMT hdlStmt, services::Status *status = NULL)
183  {
184  SQLSMALLINT nFeatures = 0;
185  SQLRETURN ret = SQLNumResultCols(hdlStmt, &nFeatures);
186  if (!SQL_SUCCEEDED(ret))
187  {
188  services::internal::tryAssignStatusAndThrow(status, services::ErrorODBC);
189  return internal::SQLFeaturesInfo();
190  }
191 
192  SQLLEN sqlType;
193  SQLLEN sqlIsUnsigned;
194  SQLLEN sqlTypeLength;
195  internal::SQLFeaturesInfo featuresInfo;
196 
197  for (int i = 0 ; i < nFeatures; i++)
198  {
199  const int bufferSize = 128;
200  char label[bufferSize];
201 
202  SQLLEN sqlType;
203  SQLLEN sqlOctetLength;
204  SQLSMALLINT labelLenUsed;
205 
206  SQLLEN sqlIsUnsigned;
207  ret = SQLColAttributes(hdlStmt, (SQLUSMALLINT)(i + 1), SQL_DESC_UNSIGNED, NULL, 0, NULL, &sqlIsUnsigned);
208  if (!SQL_SUCCEEDED(ret))
209  {
210  services::internal::tryAssignStatusAndThrow(status, services::ErrorODBC);
211  return internal::SQLFeaturesInfo();
212  }
213 
214  ret = SQLColAttributes(hdlStmt, (SQLUSMALLINT)(i + 1), SQL_DESC_TYPE, NULL, 0, NULL, &sqlType);
215  if (!SQL_SUCCEEDED(ret))
216  {
217  services::internal::tryAssignStatusAndThrow(status, services::ErrorODBC);
218  return internal::SQLFeaturesInfo();
219  }
220 
221  ret = SQLColAttributes(hdlStmt, (SQLUSMALLINT)(i + 1), SQL_DESC_OCTET_LENGTH, NULL, 0, NULL, &sqlOctetLength);
222  if (!SQL_SUCCEEDED(ret))
223  {
224  services::internal::tryAssignStatusAndThrow(status, services::ErrorODBC);
225  return internal::SQLFeaturesInfo();
226  }
227 
228  ret = SQLColAttributes(hdlStmt, (SQLUSMALLINT)(i + 1), SQL_DESC_NAME, (SQLPOINTER)label,
229  (SQLSMALLINT)bufferSize, &labelLenUsed, NULL);
230  if (!SQL_SUCCEEDED(ret))
231  {
232  services::internal::tryAssignStatusAndThrow(status, services::ErrorODBC);
233  return internal::SQLFeaturesInfo();
234  }
235 
236  const services::String labelStr(label, label + labelLenUsed);
237  const bool isSigned = sqlIsUnsigned == SQL_FALSE;
238 
239  featuresInfo.add( internal::SQLFeatureInfo(labelStr,
240  sqlType,
241  sqlOctetLength,
242  isSigned) );
243  }
244 
245  return featuresInfo;
246  }
247 
248  services::Status bindSQLColumns(SQLHSTMT hdlStmt, const internal::SQLFeaturesInfo &featuresInfo)
249  {
250  DAAL_ASSERT( hdlStmt );
251 
252  services::Status status;
253 
254  const internal::SQLFetchMode::Value fetchMode = _modifiersManager
255  ? internal::SQLFetchMode::useNativeSQLTypes
256  : internal::SQLFetchMode::castToFloatingPointType;
257  _fetchBuffer = internal::SQLFetchBuffer::create(featuresInfo, fetchMode, &status);
258  DAAL_CHECK_STATUS_VAR(status);
259 
260  SQLRETURN ret = SQLFreeStmt(hdlStmt, SQL_UNBIND);
261  if (!SQL_SUCCEEDED(ret)) { return services::throwIfPossible(services::ErrorODBC); }
262 
263  const SQLSMALLINT targetSQLType = internal::SQLFetchMode::getTargetType(fetchMode);
264  for (size_t i = 0; i < featuresInfo.getNumberOfFeatures(); i++)
265  {
266  char * const buffer = _fetchBuffer->getBufferForFeature(i);
267  const SQLLEN bufferSize = _fetchBuffer->getBufferSizeForFeature(i);
268  SQLLEN * const actualSizeBuffer = _fetchBuffer->getActualDataSizeBufferForFeature(i);
269 
270  SQLLEN strLenOrIndPtr;
271  ret = SQLBindCol(hdlStmt, (SQLUSMALLINT)(i + 1), targetSQLType,
272  (SQLPOINTER)buffer, bufferSize, actualSizeBuffer);
273  if (!SQL_SUCCEEDED(ret)) { return services::throwIfPossible(services::ErrorODBC); }
274  }
275 
276  if (_modifiersManager)
277  {
278  DAAL_CHECK_STATUS( status, _modifiersManager->prepare(featuresInfo, *_fetchBuffer) );
279  }
280 
281  return status;
282  }
283 
284  services::Status fillDictionary(DataSourceDictionary &dictionary,
285  const internal::SQLFeaturesInfo &featuresInfo)
286  {
287  if (_modifiersManager)
288  {
289  return _modifiersManager->fillDictionary(dictionary);
290  }
291 
292  const size_t nFeatures = featuresInfo.getNumberOfFeatures();
293  services::Status status = dictionary.setNumberOfFeatures(nFeatures);
294  if (!status) { return services::throwIfPossible(status); }
295 
296  for (size_t i = 0; i < nFeatures; i++)
297  {
298  dictionary[i].ntFeature.setType<DAAL_DATA_TYPE>();
299  dictionary[i].ntFeature.featureType = features::DAAL_CONTINUOUS;
300  }
301 
302  return status;
303  }
304 
305 private:
306  internal::SQLFetchBufferPtr _fetchBuffer; // Created in bindSQLColumns(); its per-feature buffers are bound to the statement columns
307  BlockDescriptor<DAAL_DATA_TYPE> _currentRowBlock; // Block used by statementResultsNumericTable() to get and release the rows of the table
308  services::SharedPtr<services::ErrorCollection> _errors; // Collection that statementResultsNumericTable() adds ErrorODBC to when a fetch fails
309  modifiers::sql::internal::ModifiersManagerPtr _modifiersManager; // Created by addModifier(); when set, rows and the dictionary are filled through it
310 };
311 
312 typedef SQLFeatureManager MySQLFeatureManager; // MySQLFeatureManager is another name for the same SQLFeatureManager type
313 
315 } // namespace interface1
316 
317 using interface1::SQLFeatureManager; // Make the interface1 class visible directly in daal::data_management
318 using interface1::MySQLFeatureManager; // Same for the MySQLFeatureManager alias
319 
320 } // namespace data_management
321 } // namespace daal
322 
323 #endif
daal::data_management::interface1::SQLFeatureManager
Interprets the response of an SQL database and fills the provided numeric table and dictionary.
Definition: mysql_feature_manager.h:48
daal::data_management::interface1::SQLFeatureManager::addModifier
SQLFeatureManager & addModifier(const features::FeatureIdCollectionIfacePtr &featureIds, const modifiers::sql::FeatureModifierIfacePtr &modifier, services::Status *status=NULL)
Definition: mysql_feature_manager.h:58
daal::data_management::interface1::BlockDescriptor::getBlockPtr
DataType * getBlockPtr() const
Definition: numeric_table.h:69
daal
Definition: algorithm_base_common.h:31
daal::data_management::interface1::SQLFeatureManager::setLimitQuery
std::string setLimitQuery(std::string &query, size_t idx_last_read, size_t maxRows)
Definition: mysql_feature_manager.h:169
daal::data_management::interface1::BlockDescriptor::getNumberOfColumns
size_t getNumberOfColumns() const
Definition: numeric_table.h:95
daal::data_management::interface1::SQLFeatureManager::statementResultsNumericTable
DataSourceIface::DataSourceStatus statementResultsNumericTable(SQLHSTMT hdlStmt, NumericTable *nt, size_t maxRows)
Definition: mysql_feature_manager.h:89
daal::data_management::interface1::NumericTable
Class for a data management component responsible for representation of data in the numeric format...
Definition: numeric_table.h:574
daal::data_management::interface1::NumericTable::resize
virtual services::Status resize(size_t nrows) DAAL_C11_OVERRIDE
Definition: numeric_table.h:636
daal::data_management::interface1::DataSourceIface::DataSourceStatus
DataSourceStatus
Specifies the status of the Data Source.
Definition: data_source.h:57
daal::data_management::interface1::DataSourceIface::readyForLoad
Definition: data_source.h:59
daal::data_management::interface1::DataSourceIface::endOfData
Definition: data_source.h:61
daal::data_management::interface1::DenseNumericTableIface::releaseBlockOfRows
virtual services::Status releaseBlockOfRows(BlockDescriptor< double > &block)=0
daal::services::ErrorODBC
Definition: error_indexes.h:379
daal::data_management::interface1::SQLFeatureManager::createDictionary
services::Status createDictionary(SQLHSTMT hdlStmt, DataSourceDictionary *dictionary)
Definition: mysql_feature_manager.h:146
daal::data_management::interface1::DataSourceIface::notReady
Definition: data_source.h:62
daal::data_management::internal::SQLFeaturesInfo
Class that holds auxiliary information about multiple SQL columns.
Definition: sql_feature_utils.h:70
daal::data_management::interface1::DenseNumericTableIface::getBlockOfRows
virtual services::Status getBlockOfRows(size_t vector_idx, size_t vector_num, ReadWriteMode rwflag, BlockDescriptor< double > &block)=0
daal::data_management::interface1::Dictionary
Class that represents a dictionary of a data set and provides methods to work with the data dictionar...
Definition: data_dictionary.h:161

For more complete information about compiler optimizations, see our Optimization Notice.