C++ API Reference for Intel® Data Analytics Acceleration Library 2019

kmeans_init_csr_distr.cpp

/* file: kmeans_init_csr_distr.cpp */
/*******************************************************************************
* Copyright 2014-2018 Intel Corporation.
*
* This software and the related documents are Intel copyrighted materials, and
* your use of them is governed by the express license under which they were
* provided to you (License). Unless the License provides otherwise, you may not
* use, modify, copy, publish, distribute, disclose or transmit this software or
* the related documents without Intel's prior written permission.
*
* This software and the related documents are provided as is, with no express
* or implied warranties, other than those that are expressly stated in the
* License.
*******************************************************************************/
/*
! Content:
! C++ example of CSR K-Means clustering in the distributed processing mode
!******************************************************************************/
#include "daal.h"
#include "service.h"
using namespace std;
using namespace daal;
using namespace daal::algorithms;
typedef float algorithmFPType; /* Algorithm floating-point type */
/* K-Means algorithm parameters */
const size_t nClusters = 20;
const size_t nIterations = 5;
const size_t nBlocks = 4;
const size_t nVectorsInBlock = 8000;
const string dataFileNames[] =
{
"../data/distributed/kmeans_csr_1.csv", "../data/distributed/kmeans_csr_2.csv",
"../data/distributed/kmeans_csr_3.csv", "../data/distributed/kmeans_csr_4.csv"
};
void loadData(NumericTablePtr data[nBlocks])
{
for(size_t i = 0; i < nBlocks; i++)
data[i] = CSRNumericTablePtr(createSparseTable<float>(dataFileNames[i]));
}
template <kmeans::init::Method method>
NumericTablePtr initCentroids(const NumericTablePtr data[nBlocks]);
void calculateCentroids(const NumericTablePtr& initialCentroids, const NumericTablePtr data[nBlocks]);
template <kmeans::init::Method method>
void runKMeans(const NumericTablePtr data[nBlocks], const char* methodName)
{
std::cout << "K-means init parameters: method = " << methodName << std::endl;
NumericTablePtr centroids = initCentroids<method>(data);
calculateCentroids(centroids, data);
}
int main(int argc, char *argv[])
{
checkArguments(argc, argv, 4, &dataFileNames[0], &dataFileNames[1], &dataFileNames[2], &dataFileNames[3]);
NumericTablePtr data[nBlocks];
loadData(data);
runKMeans<kmeans::init::plusPlusCSR>(data, "plusPlusCSR");
runKMeans<kmeans::init::parallelPlusCSR>(data, "parallelPlusCSR");
return 0;
}
template <kmeans::init::Method method>
NumericTablePtr initStep1(const NumericTablePtr data[nBlocks])
{
for(size_t i = 0; i < nBlocks; i++)
{
/* Create an algorithm object for the K-Means algorithm */
kmeans::init::Distributed<step1Local, algorithmFPType, method> local(nClusters, nBlocks*nVectorsInBlock, i*nVectorsInBlock);
local.input.set(kmeans::init::data, data[i]);
local.compute();
NumericTablePtr pNewCenters = local.getPartialResult()->get(kmeans::init::partialCentroids);
if(pNewCenters.get())
return pNewCenters;
}
return NumericTablePtr();
}
template <kmeans::init::Method method>
void initStep23(const NumericTablePtr data[nBlocks], DataCollectionPtr localNodeData[nBlocks], const NumericTablePtr& step2Input,
kmeans::init::Distributed<step3Master, algorithmFPType, method>& step3, bool bFirstIteration)
{
for(size_t i = 0; i < nBlocks; i++)
{
kmeans::init::Distributed<step2Local, algorithmFPType, method> step2(nClusters, bFirstIteration);
step2.input.set(kmeans::init::data, data[i]);
step2.input.set(kmeans::init::internalInput, localNodeData[i]);
step2.input.set(kmeans::init::inputOfStep2, step2Input);
step2.compute();
if(bFirstIteration)
localNodeData[i] = step2.getPartialResult()->get(kmeans::init::internalResult);
step3.input.add(kmeans::init::inputOfStep3FromStep2, i, step2.getPartialResult()->get(kmeans::init::outputOfStep2ForStep3));
}
step3.compute();
}
template <kmeans::init::Method method>
NumericTablePtr initStep4(const NumericTablePtr data[nBlocks], DataCollectionPtr localNodeData[nBlocks],
kmeans::init::Distributed<step3Master, algorithmFPType, method>& step3)
{
std::vector<NumericTablePtr> aRes;
for(size_t i = 0; i < nBlocks; ++i)
{
/* Get an input for step 4 on this node if any */
NumericTablePtr step3Output = step3.getPartialResult()->get(kmeans::init::outputOfStep3ForStep4, i);
if(!step3Output)
continue; /* can be null */
/* Create an algorithm object for the step 4 */
kmeans::init::Distributed<step4Local, algorithmFPType, method> step4(nClusters);
/* Set the input data to the algorithm */
step4.input.set(kmeans::init::data, data[i]);
step4.input.set(kmeans::init::internalInput, localNodeData[i]);
step4.input.set(kmeans::init::inputOfStep4FromStep3, step3Output);
/* Compute and get the result */
step4.compute();
aRes.push_back(step4.getPartialResult()->get(kmeans::init::outputOfStep4));
}
if(!aRes.size())
return NumericTablePtr();
if(aRes.size() == 1)
return aRes[0];
/* For parallelPlus algorithm */
RowMergedNumericTablePtr pMerged(new RowMergedNumericTable());
for(size_t i = 0; i < aRes.size(); ++i)
pMerged->addNumericTable(aRes[i]);
return NumericTable::cast(pMerged);
}
template<>
NumericTablePtr initCentroids<kmeans::init::plusPlusCSR>(const NumericTablePtr data[nBlocks])
{
/* Internal data to be stored on the local nodes */
DataCollectionPtr localNodeData[nBlocks];
/* Numeric table to collect the results */
RowMergedNumericTablePtr pCentroids(new RowMergedNumericTable());
/* First step on the local nodes */
NumericTablePtr pNewCentroids = initStep1<kmeans::init::plusPlusCSR>(data);
pCentroids->addNumericTable(pNewCentroids);
/* Create an algorithm object for the step 3 */
kmeans::init::Distributed<step3Master, algorithmFPType, kmeans::init::plusPlusCSR> step3(nClusters);
for(size_t iCenter = 1; iCenter < nClusters; ++iCenter)
{
/* Perform steps 2 and 3 */
initStep23<kmeans::init::plusPlusCSR>(data, localNodeData, pNewCentroids, step3, iCenter == 1);
/* Perform steps 4 */
pNewCentroids = initStep4<kmeans::init::plusPlusCSR>(data, localNodeData, step3);
pCentroids->addNumericTable(pNewCentroids);
}
return NumericTable::cast(pCentroids);
}
template<>
NumericTablePtr initCentroids<kmeans::init::parallelPlusCSR>(const NumericTablePtr data[nBlocks])
{
/* Internal data to be stored on the local nodes */
DataCollectionPtr localNodeData[nBlocks];
/* First step on the local nodes */
NumericTablePtr pNewCentroids = initStep1<kmeans::init::parallelPlusCSR>(data);
/* Create an algorithm object for the step 5 */
kmeans::init::Distributed<step5Master, algorithmFPType, kmeans::init::parallelPlusCSR> step5(nClusters);
step5.input.add(kmeans::init::inputCentroids, pNewCentroids);
/* Create an algorithm object for the step 3 */
kmeans::init::Distributed<step3Master, algorithmFPType, kmeans::init::parallelPlusCSR> step3(nClusters);
for(size_t iRound = 0; iRound < step5.parameter.nRounds; ++iRound)
{
/* Perform steps 2 and 3 */
initStep23<kmeans::init::parallelPlusCSR>(data, localNodeData, pNewCentroids, step3, iRound == 0);
/* Perform step 4 */
pNewCentroids = initStep4<kmeans::init::parallelPlusCSR>(data, localNodeData, step3);
step5.input.add(kmeans::init::inputCentroids, pNewCentroids);
}
/* One more step 2 */
for(size_t i = 0; i < nBlocks; i++)
{
/* Create an algorithm object for the step 2 */
kmeans::init::Distributed<step2Local, algorithmFPType, kmeans::init::parallelPlusCSR> local(nClusters, false);
local.parameter.outputForStep5Required = true;
/* Set the input data to the algorithm */
local.input.set(kmeans::init::data, data[i]);
local.input.set(kmeans::init::internalInput, localNodeData[i]);
local.input.set(kmeans::init::inputOfStep2, pNewCentroids);
/* Compute and get the result */
local.compute();
/* Add the result to the input of step 5 */
step5.input.add(kmeans::init::inputOfStep5FromStep2, local.getPartialResult()->get(kmeans::init::outputOfStep2ForStep5));
}
step5.input.set(kmeans::init::inputOfStep5FromStep3, step3.getPartialResult()->get(kmeans::init::outputOfStep3ForStep5));
step5.compute();
step5.finalizeCompute();
return step5.getResult()->get(kmeans::init::centroids);
}
void calculateCentroids(const NumericTablePtr& initialCentroids, const NumericTablePtr data[nBlocks])
{
kmeans::Distributed<step2Master, algorithmFPType, kmeans::lloydCSR> masterAlgorithm(nClusters);
const size_t nRows = initialCentroids->getNumberOfRows();
const size_t nCols = initialCentroids->getNumberOfColumns();
NumericTablePtr assignments[nBlocks];
NumericTablePtr centroids = initialCentroids;
NumericTablePtr objectiveFunction;
/* Calculate centroids */
for(size_t it = 0; it < nIterations; it++)
{
for(size_t i = 0; i < nBlocks; i++)
{
/* Create an algorithm object for the K-Means algorithm */
kmeans::Distributed<step1Local, algorithmFPType, kmeans::lloydCSR> localAlgorithm(nClusters, false);
/* Set the input data to the algorithm */
localAlgorithm.input.set(kmeans::data, data[i]);
localAlgorithm.input.set(kmeans::inputCentroids, centroids);
localAlgorithm.compute();
masterAlgorithm.input.add(kmeans::partialResults, localAlgorithm.getPartialResult());
}
masterAlgorithm.compute();
masterAlgorithm.finalizeCompute();
centroids = masterAlgorithm.getResult()->get(kmeans::centroids);
objectiveFunction = masterAlgorithm.getResult()->get(kmeans::objectiveFunction);
}
/* Calculate assignments */
for(size_t i = 0; i < nBlocks; i++)
{
/* Create an algorithm object for the K-Means algorithm */
kmeans::Batch<algorithmFPType, kmeans::lloydCSR> localAlgorithm(nClusters, 0);
/* Set the input data to the algorithm */
localAlgorithm.input.set(kmeans::data, data[i]);
localAlgorithm.input.set(kmeans::inputCentroids, centroids);
localAlgorithm.compute();
assignments[i] = localAlgorithm.getResult()->get(kmeans::assignments);
}
/* Print the clusterization results */
printNumericTable(assignments[0], "First 10 cluster assignments from 1st node:", 10);
printNumericTable(centroids, "First 10 dimensions of centroids:", 20, 10);
printNumericTable(objectiveFunction, "Objective function value:");
}

For more complete information about compiler optimizations, see our Optimization Notice.