#include <mpi.h>
#include "daal.h"
#include "service.h"
#include "stdio.h"
using namespace std;
using namespace daal;
using namespace daal::algorithms;
// Byte buffer used to hold serialized DAAL archives exchanged over MPI.
typedef std::vector<byte> ByteBuffer;
// Floating-point type passed to the k-means algorithm templates.
typedef float algorithmFPType;

const size_t nClusters   = 20; // number of clusters passed to the k-means algorithms
const size_t nIterations = 5;  // number of compute() rounds executed by main()
const size_t nBlocks     = 4;  // number of data blocks; each MPI rank loads one block

// CSV input for each rank, indexed by rank id. Sized by nBlocks (not a literal)
// so that adding blocks without adding files is caught: too many initializers is
// a compile error. This sample points every rank at the same file.
const std::string dataFileNames[nBlocks] =
{
    "./data/distributed/kmeans_dense.csv", "./data/distributed/kmeans_dense.csv",
    "./data/distributed/kmeans_dense.csv", "./data/distributed/kmeans_dense.csv"
};

// Rank that gathers partial results and runs the step2Master computations.
// A typed constant rather than a macro so it obeys scoping and type checking.
const int mpi_root = 0;
/*
 * Loads the CSV file assigned to this rank (dataFileNames[rankId]) through a
 * DAAL FileDataSource and returns the numeric table it allocated.
 * rankId must be a valid index into dataFileNames.
 */
NumericTablePtr loadData(int rankId)
{
    const string &fileName = dataFileNames[rankId];

    FileDataSource<CSVFeatureManager> csvSource(fileName,
                                                DataSource::doAllocateNumericTable,
                                                DataSource::doDictionaryFromContext);
    csvSource.loadDataBlock();

    NumericTablePtr table = csvSource.getNumericTable();
    return table;
}
NumericTablePtr init(int rankId, const NumericTablePtr& pData);
NumericTablePtr compute(int rankId, const NumericTablePtr& pData, const NumericTablePtr& initialCentroids);
int main(int argc, char *argv[])
{
int rankId, comm_size;
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &comm_size);
MPI_Comm_rank(MPI_COMM_WORLD, &rankId);
NumericTablePtr pData = loadData(rankId);
NumericTablePtr centroids = init(rankId, pData);
for(size_t it = 0; it < nIterations; it++)
centroids = compute(rankId, pData, centroids);
if(rankId == mpi_root)
printNumericTable(centroids, "First 10 dimensions of centroids:", 20, 10);
MPI_Finalize();
return 0;
}
/*
 * Computes the initial k-means centroids across all ranks.
 *
 * Each rank runs the step1Local randomDense initialization on its own block and
 * serializes the partial result. The block's rows are addressed at offset
 * rankId * nVectorsInBlock out of nBlocks * nVectorsInBlock total rows, which
 * assumes every block has the same number of rows. The archives are gathered on
 * the root with MPI_Gatherv, so they may differ in size. The root deserializes
 * them, combines them with step2Master, and returns the centroids table.
 *
 * Collective: must be called by every rank of MPI_COMM_WORLD.
 * Returns the centroids on the root rank and an empty NumericTablePtr elsewhere.
 */
NumericTablePtr init(int rankId, const NumericTablePtr& pData)
{
    const bool isRoot = (rankId == mpi_root);

    /* Step 1 (every rank): local initialization on this rank's block */
    const size_t nVectorsInBlock = pData->getNumberOfRows();
    kmeans::init::Distributed<step1Local, algorithmFPType, kmeans::init::randomDense>
        localInit(nClusters, nBlocks * nVectorsInBlock, rankId * nVectorsInBlock);
    localInit.input.set(kmeans::init::data, pData);
    localInit.compute();

    InputDataArchive localArch;
    localInit.getPartialResult()->serialize(localArch);
    const int perNodeArchLength = static_cast<int>(localArch.getSizeOfArchive());

    /* Gather every rank's archive length on the root (receive buffer is ignored elsewhere) */
    int aPerNodeArchLength[nBlocks] = {0};
    MPI_Gather(&perNodeArchLength, 1, MPI_INT, aPerNodeArchLength, 1, MPI_INT,
               mpi_root, MPI_COMM_WORLD);

    /* Root: lay the archives out back to back and size the receive buffer */
    int displs[nBlocks] = {0};
    int totalArchLength = 0;
    ByteBuffer serializedData;
    if (isRoot)
    {
        for (size_t i = 0; i < nBlocks; ++i)
        {
            displs[i] = totalArchLength;
            totalArchLength += aPerNodeArchLength[i];
        }
        serializedData.resize(totalArchLength);
    }

    ByteBuffer nodeResults(perNodeArchLength);
    if (!nodeResults.empty())
        localArch.copyArchiveToArray(&nodeResults[0], perNodeArchLength);

    // Never take &v[0] of an empty vector; pass NULL instead.
    MPI_Gatherv(nodeResults.empty() ? NULL : &nodeResults[0], perNodeArchLength, MPI_CHAR,
                serializedData.empty() ? NULL : &serializedData[0],
                aPerNodeArchLength, displs, MPI_CHAR, mpi_root, MPI_COMM_WORLD);

    if (!isRoot)
        return NumericTablePtr();

    /* Step 2 (root only): merge all partial results into the initial centroids */
    kmeans::init::Distributed<step2Master, algorithmFPType, kmeans::init::randomDense> masterInit(nClusters);
    for (size_t i = 0; i < nBlocks; ++i)
    {
        OutputDataArchive blockArch(&serializedData[displs[i]], aPerNodeArchLength[i]);
        kmeans::init::PartialResultPtr dataForStep2FromStep1(new kmeans::init::PartialResult());
        dataForStep2FromStep1->deserialize(blockArch);
        masterInit.input.add(kmeans::init::partialResults, dataForStep2FromStep1);
    }
    masterInit.compute();
    masterInit.finalizeCompute();
    return masterInit.getResult()->get(kmeans::init::centroids);
}
/*
 * Runs one distributed k-means iteration.
 *
 * The root serializes the current centroids and broadcasts them to all ranks.
 * Every rank computes a step1Local partial result on its block. The partial
 * results are gathered on the root, which combines them with step2Master and
 * returns the updated centroids.
 *
 * Every stage uses algorithmFPType, the same type init() uses, and the
 * broadcast centroids are deserialized into a HomogenNumericTable of that type.
 * This keeps the whole pipeline at one floating-point type.
 *
 * Collective: must be called by every rank of MPI_COMM_WORLD.
 * initialCentroids is only read on the root. Returns the new centroids on the
 * root and an empty NumericTablePtr elsewhere.
 */
NumericTablePtr compute(int rankId, const NumericTablePtr& pData, const NumericTablePtr& initialCentroids)
{
    const bool isRoot = (rankId == mpi_root);

    /* Root: serialize the current centroids */
    size_t centroidsArchLength = 0;
    InputDataArchive centroidsArch;
    if (isRoot)
    {
        initialCentroids->serialize(centroidsArch);
        centroidsArchLength = centroidsArch.getSizeOfArchive();
    }

    /* Broadcast the archive length, then the archive bytes */
    MPI_Bcast(&centroidsArchLength, sizeof(size_t), MPI_CHAR, mpi_root, MPI_COMM_WORLD);
    ByteBuffer nodeCentroids(centroidsArchLength);
    // Never take &v[0] of an empty vector; use NULL instead.
    byte *const centroidsBytes = nodeCentroids.empty() ? NULL : &nodeCentroids[0];
    if (isRoot && centroidsBytes)
        centroidsArch.copyArchiveToArray(centroidsBytes, centroidsArchLength);
    MPI_Bcast(centroidsBytes, static_cast<int>(centroidsArchLength), MPI_CHAR, mpi_root, MPI_COMM_WORLD);

    /* Every rank: rebuild the centroids table from the broadcast archive */
    OutputDataArchive outArch(centroidsBytes, centroidsArchLength);
    NumericTablePtr centroids(new HomogenNumericTable<algorithmFPType>());
    centroids->deserialize(outArch);

    /* Step 1 (every rank): partial result on this rank's block */
    kmeans::Distributed<step1Local, algorithmFPType> localAlgorithm(nClusters);
    localAlgorithm.input.set(kmeans::data, pData);
    localAlgorithm.input.set(kmeans::inputCentroids, centroids);
    localAlgorithm.compute();

    InputDataArchive localArch;
    localAlgorithm.getPartialResult()->serialize(localArch);
    const size_t perNodeArchLength = localArch.getSizeOfArchive();

    /* Gather the partial results on the root. MPI_Gather needs every rank to send
       the same byte count, and the root sizes its buffer from its own archive, so
       this assumes all partial-result archives have the same size. */
    ByteBuffer serializedData;
    if (isRoot)
        serializedData.resize(perNodeArchLength * nBlocks);
    ByteBuffer nodeResults(perNodeArchLength);
    if (!nodeResults.empty())
        localArch.copyArchiveToArray(&nodeResults[0], perNodeArchLength);
    MPI_Gather(nodeResults.empty() ? NULL : &nodeResults[0], static_cast<int>(perNodeArchLength), MPI_CHAR,
               serializedData.empty() ? NULL : &serializedData[0], static_cast<int>(perNodeArchLength), MPI_CHAR,
               mpi_root, MPI_COMM_WORLD);

    if (!isRoot)
        return NumericTablePtr();

    /* Step 2 (root only): merge the partial results into the updated centroids */
    kmeans::Distributed<step2Master, algorithmFPType> masterAlgorithm(nClusters);
    for (size_t i = 0; i < nBlocks; ++i)
    {
        OutputDataArchive blockArch(&serializedData[perNodeArchLength * i], perNodeArchLength);
        kmeans::PartialResultPtr dataForStep2FromStep1(new kmeans::PartialResult());
        dataForStep2FromStep1->deserialize(blockArch);
        masterAlgorithm.input.add(kmeans::partialResults, dataForStep2FromStep1);
    }
    masterAlgorithm.compute();
    masterAlgorithm.finalizeCompute();
    return masterAlgorithm.getResult()->get(kmeans::centroids);
}