#include "daal.h"
#include "service.h"
using namespace std;
using namespace daal;
using namespace daal::algorithms;
/* Floating-point type used by every DAAL algorithm object in this example */
typedef float algorithmFPType;

/* Number of centroids produced by the initialization and refined by Lloyd iterations */
const size_t nClusters = 20;
/* Number of distributed Lloyd iterations run in calculateCentroids() */
const size_t nIterations = 5;

/* Simulated cluster layout: nBlocks nodes, each assumed to hold nVectorsInBlock rows */
const size_t nBlocks = 4;
const size_t nVectorsInBlock = 8000;

/* One CSR input file per simulated node, indexed 0 .. nBlocks-1 */
const string dataFileNames[nBlocks] =
{
    "../data/distributed/kmeans_csr_1.csv",
    "../data/distributed/kmeans_csr_2.csv",
    "../data/distributed/kmeans_csr_3.csv",
    "../data/distributed/kmeans_csr_4.csv"
};
/*
 * Loads one CSR file per simulated node: data[i] is read from dataFileNames[i].
 * The tables are parsed with algorithmFPType (not a hard-coded float) so that the
 * input precision always matches the precision the algorithms are instantiated with.
 * NOTE(review): assumes createSparseTable (service.h) returns a heap-allocated table
 * whose ownership CSRNumericTablePtr takes over - confirm in service.h.
 */
void loadData(NumericTablePtr data[nBlocks])
{
    for(size_t i = 0; i < nBlocks; i++)
    {
        data[i] = CSRNumericTablePtr(createSparseTable<algorithmFPType>(dataFileNames[i]));
    }
}
/*
 * Computes initial centroids over all blocks with the given CSR initialization method.
 * The primary template is never defined; only the explicit specializations for
 * plusPlusCSR and parallelPlusCSR (defined later in this file) exist.
 */
template <kmeans::init::Method method>
NumericTablePtr initCentroids(const NumericTablePtr data[nBlocks]);

/*
 * runKMeans<method>() is instantiated from main() before the specializations are defined.
 * An explicit specialization must be declared before the first use that would cause an
 * implicit instantiation, so both are declared here, ahead of any such use.
 */
template <>
NumericTablePtr initCentroids<kmeans::init::plusPlusCSR>(const NumericTablePtr data[nBlocks]);
template <>
NumericTablePtr initCentroids<kmeans::init::parallelPlusCSR>(const NumericTablePtr data[nBlocks]);

/* Refines initialCentroids with distributed Lloyd iterations over data and prints the results. */
void calculateCentroids(const NumericTablePtr& initialCentroids, const NumericTablePtr data[nBlocks]);
/*
 * Runs one complete pipeline for a single initialization method:
 * prints the method name, computes initial centroids with initCentroids<method>,
 * then hands them to calculateCentroids for Lloyd refinement and reporting.
 */
template <kmeans::init::Method method>
void runKMeans(const NumericTablePtr data[nBlocks], const char* methodName)
{
    std::cout << "K-means init parameters: method = " << methodName << std::endl;

    /* The initializer's result feeds straight into the refinement step */
    calculateCentroids(initCentroids<method>(data), data);
}
int main(int argc, char *argv[])
{
checkArguments(argc, argv, 4, &dataFileNames[0], &dataFileNames[1], &dataFileNames[2], &dataFileNames[3]);
NumericTablePtr data[nBlocks];
loadData(data);
runKMeans<kmeans::init::plusPlusCSR>(data, "plusPlusCSR");
runKMeans<kmeans::init::parallelPlusCSR>(data, "parallelPlusCSR");
return 0;
}
/*
 * Step 1 of the distributed initialization.
 * Runs step1Local on each block in turn. Each block is given the global row count and
 * its own starting row offset. The function returns the first non-empty partialCentroids
 * table. Blocks after that one are not run in this single-process loop.
 * Returns an empty NumericTablePtr if no block produced partial centroids.
 *
 * The row count and offsets are derived from the loaded tables instead of assuming
 * nVectorsInBlock rows per block, so inputs of any size get correct global offsets.
 */
template <kmeans::init::Method method>
NumericTablePtr initStep1(const NumericTablePtr data[nBlocks])
{
    size_t nRowsTotal = 0;
    for(size_t i = 0; i < nBlocks; i++)
        nRowsTotal += data[i]->getNumberOfRows();

    size_t offset = 0;
    for(size_t i = 0; i < nBlocks; i++)
    {
        kmeans::init::Distributed<step1Local, algorithmFPType, method> local(nClusters, nRowsTotal, offset);
        local.input.set(kmeans::init::data, data[i]);
        local.compute();

        NumericTablePtr pNewCenters = local.getPartialResult()->get(kmeans::init::partialCentroids);
        if(pNewCenters.get())
            return pNewCenters;

        /* The next block's rows start right after this block's rows */
        offset += data[i]->getNumberOfRows();
    }
    return NumericTablePtr();
}
/*
 * Steps 2 and 3 of the distributed initialization, for one round.
 * For every block, step2Local is run with step2Input. Callers pass the most recently
 * selected centroids. The block's outputOfStep2ForStep3 is registered with the step-3
 * master under key = block index, and step3Master is then computed once for all blocks.
 * When bFirstIteration is true, each block's internalResult is stored into localNodeData.
 * Later rounds and step 4 pass it back as internalInput.
 */
template <kmeans::init::Method method>
void initStep23(const NumericTablePtr data[nBlocks], DataCollectionPtr localNodeData[nBlocks], const NumericTablePtr& step2Input,
                kmeans::init::Distributed<step3Master, algorithmFPType, method>& step3, bool bFirstIteration)
{
    for(size_t node = 0; node < nBlocks; ++node)
    {
        kmeans::init::Distributed<step2Local, algorithmFPType, method> localStep(nClusters, bFirstIteration);
        localStep.input.set(kmeans::init::data, data[node]);
        localStep.input.set(kmeans::init::internalInput, localNodeData[node]);
        localStep.input.set(kmeans::init::inputOfStep2, step2Input);
        localStep.compute();

        /* Per-node state is captured only on the first round */
        if(bFirstIteration)
        {
            localNodeData[node] = localStep.getPartialResult()->get(kmeans::init::internalResult);
        }

        NumericTablePtr forMaster = localStep.getPartialResult()->get(kmeans::init::outputOfStep2ForStep3);
        step3.input.add(kmeans::init::inputOfStep3FromStep2, node, forMaster);
    }

    step3.compute();
}
/*
 * Step 4 of the distributed initialization.
 * step4Local runs only on the blocks for which the step-3 master produced an
 * outputOfStep3ForStep4 table; blocks with no such table are skipped.
 * Returns:
 *   - an empty NumericTablePtr when no block ran step 4,
 *   - the single step-4 output when exactly one block ran,
 *   - otherwise all outputs stacked row-wise in a RowMergedNumericTable (in block order).
 */
template <kmeans::init::Method method>
NumericTablePtr initStep4(const NumericTablePtr data[nBlocks], DataCollectionPtr localNodeData[nBlocks],
                          kmeans::init::Distributed<step3Master, algorithmFPType, method>& step3)
{
    std::vector<NumericTablePtr> picked;
    picked.reserve(nBlocks);

    for(size_t node = 0; node < nBlocks; ++node)
    {
        NumericTablePtr fromMaster = step3.getPartialResult()->get(kmeans::init::outputOfStep3ForStep4, node);
        if(fromMaster.get())
        {
            kmeans::init::Distributed<step4Local, algorithmFPType, method> localStep(nClusters);
            localStep.input.set(kmeans::init::data, data[node]);
            localStep.input.set(kmeans::init::internalInput, localNodeData[node]);
            localStep.input.set(kmeans::init::inputOfStep4FromStep3, fromMaster);
            localStep.compute();
            picked.push_back(localStep.getPartialResult()->get(kmeans::init::outputOfStep4));
        }
    }

    if(picked.empty())
        return NumericTablePtr();

    if(picked.size() == 1)
        return picked.front();

    RowMergedNumericTablePtr merged(new RowMergedNumericTable());
    for(size_t k = 0; k < picked.size(); ++k)
        merged->addNumericTable(picked[k]);
    return NumericTable::cast(merged);
}
/*
 * plusPlusCSR initialization.
 * Starts from the step-1 result, then runs nClusters-1 rounds of steps 2-3 and step 4.
 * Each round feeds the previous round's output back in as step2Input and appends the
 * new step-4 output. All collected tables are returned stacked row-wise in a
 * RowMergedNumericTable.
 */
template<>
NumericTablePtr initCentroids<kmeans::init::plusPlusCSR>(const NumericTablePtr data[nBlocks])
{
    typedef kmeans::init::Distributed<step3Master, algorithmFPType, kmeans::init::plusPlusCSR> Step3Master;

    DataCollectionPtr nodeState[nBlocks];
    RowMergedNumericTablePtr allCentroids(new RowMergedNumericTable());

    NumericTablePtr lastCentroid = initStep1<kmeans::init::plusPlusCSR>(data);
    allCentroids->addNumericTable(lastCentroid);

    Step3Master master(nClusters);
    for(size_t chosen = 1; chosen < nClusters; ++chosen)
    {
        /* nodeState is filled in by initStep23 on the first round only */
        const bool firstRound = (chosen == 1);
        initStep23<kmeans::init::plusPlusCSR>(data, nodeState, lastCentroid, master, firstRound);
        lastCentroid = initStep4<kmeans::init::plusPlusCSR>(data, nodeState, master);
        allCentroids->addNumericTable(lastCentroid);
    }

    return NumericTable::cast(allCentroids);
}
/*
 * parallelPlusCSR initialization.
 *  1. Step 1 produces the starting centroids; they go to the step-5 master as inputCentroids.
 *  2. For step5.parameter.nRounds rounds, steps 2-3 and step 4 run. Each round's step-4
 *     output is both the next round's step2Input and another inputCentroids entry for step 5.
 *  3. A final step2Local pass per block, with outputForStep5Required set, feeds
 *     outputOfStep2ForStep5 to step 5.
 *  4. Step 5 also receives the step-3 master's outputOfStep3ForStep5. It is then computed
 *     and finalized, and its centroids result is returned.
 */
template<>
NumericTablePtr initCentroids<kmeans::init::parallelPlusCSR>(const NumericTablePtr data[nBlocks])
{
    typedef kmeans::init::Distributed<step2Local,  algorithmFPType, kmeans::init::parallelPlusCSR> Step2Local;
    typedef kmeans::init::Distributed<step3Master, algorithmFPType, kmeans::init::parallelPlusCSR> Step3Master;
    typedef kmeans::init::Distributed<step5Master, algorithmFPType, kmeans::init::parallelPlusCSR> Step5Master;

    DataCollectionPtr nodeState[nBlocks];

    NumericTablePtr candidates = initStep1<kmeans::init::parallelPlusCSR>(data);
    Step5Master finalizer(nClusters);
    finalizer.input.add(kmeans::init::inputCentroids, candidates);

    Step3Master master(nClusters);
    for(size_t roundIdx = 0; roundIdx < finalizer.parameter.nRounds; ++roundIdx)
    {
        initStep23<kmeans::init::parallelPlusCSR>(data, nodeState, candidates, master, roundIdx == 0);
        candidates = initStep4<kmeans::init::parallelPlusCSR>(data, nodeState, master);
        finalizer.input.add(kmeans::init::inputCentroids, candidates);
    }

    for(size_t node = 0; node < nBlocks; node++)
    {
        Step2Local localStep(nClusters, false);
        localStep.parameter.outputForStep5Required = true;
        localStep.input.set(kmeans::init::data, data[node]);
        localStep.input.set(kmeans::init::internalInput, nodeState[node]);
        localStep.input.set(kmeans::init::inputOfStep2, candidates);
        localStep.compute();
        finalizer.input.add(kmeans::init::inputOfStep5FromStep2, localStep.getPartialResult()->get(kmeans::init::outputOfStep2ForStep5));
    }

    finalizer.input.set(kmeans::init::inputOfStep5FromStep3, master.getPartialResult()->get(kmeans::init::outputOfStep3ForStep5));
    finalizer.compute();
    finalizer.finalizeCompute();
    return finalizer.getResult()->get(kmeans::init::centroids);
}
void calculateCentroids(const NumericTablePtr& initialCentroids, const NumericTablePtr data[nBlocks])
{
kmeans::Distributed<step2Master, algorithmFPType, kmeans::lloydCSR> masterAlgorithm(nClusters);
const size_t nRows = initialCentroids->getNumberOfRows();
const size_t nCols = initialCentroids->getNumberOfColumns();
NumericTablePtr assignments[nBlocks];
NumericTablePtr centroids = initialCentroids;
NumericTablePtr objectiveFunction;
for(size_t it = 0; it < nIterations; it++)
{
for(size_t i = 0; i < nBlocks; i++)
{
kmeans::Distributed<step1Local, algorithmFPType, kmeans::lloydCSR> localAlgorithm(nClusters, false);
localAlgorithm.input.set(kmeans::data, data[i]);
localAlgorithm.input.set(kmeans::inputCentroids, centroids);
localAlgorithm.compute();
masterAlgorithm.input.add(kmeans::partialResults, localAlgorithm.getPartialResult());
}
masterAlgorithm.compute();
masterAlgorithm.finalizeCompute();
centroids = masterAlgorithm.getResult()->get(kmeans::centroids);
objectiveFunction = masterAlgorithm.getResult()->get(kmeans::objectiveFunction);
}
for(size_t i = 0; i < nBlocks; i++)
{
kmeans::Batch<algorithmFPType, kmeans::lloydCSR> localAlgorithm(nClusters, 0);
localAlgorithm.input.set(kmeans::data, data[i]);
localAlgorithm.input.set(kmeans::inputCentroids, centroids);
localAlgorithm.compute();
assignments[i] = localAlgorithm.getResult()->get(kmeans::assignments);
}
printNumericTable(assignments[0], "First 10 cluster assignments from 1st node:", 10);
printNumericTable(centroids, "First 10 dimensions of centroids:", 20, 10);
printNumericTable(objectiveFunction, "Objective function value:");
}