#include <mpi.h>
#include "daal.h"
#include "service.h"
using namespace std;
using namespace daal;
using namespace daal::algorithms;
/* Number of input data blocks; main() requires the MPI communicator size to equal this value */
const size_t nBlocks = 4;
/* Input CSV files; computestep1Local() opens the entry indexed by the calling rank's id */
const string datasetFileNames[] =
{
"./data/distributed/svd_1.csv",
"./data/distributed/svd_2.csv",
"./data/distributed/svd_3.csv",
"./data/distributed/svd_4.csv"
};
/* Step 1: run on every rank; partial results are gathered on the root */
void computestep1Local();
/* Step 2: run on the root rank only (see main) */
void computeOnMasterNode();
/* Step 3: run on every rank after the root has scattered the step-2 output */
void finalizeComputestep1Local();
/* Rank of this process and size of MPI_COMM_WORLD; both filled in by main() */
int rankId;
int commSize;
/* Rank that plays the root role in the gather/broadcast/scatter calls and runs step 2 */
#define mpiRoot 0
/* Step-1 output kept locally on each rank and passed to step 3 */
data_management::DataCollectionPtr dataFromStep1ForStep3;
/* Singular values and right singular matrix; assigned in computeOnMasterNode(), i.e. on the root rank only */
NumericTablePtr Sigma;
NumericTablePtr V ;
/* Left singular matrix result of step 3; assigned on every rank in finalizeComputestep1Local() */
NumericTablePtr Ui ;
/* Root-side buffer: first the receive buffer of the step-1 MPI_Gather, then re-allocated in
   computeOnMasterNode() and used as the send buffer of the MPI_Scatter. Never allocated on non-root ranks. */
services::SharedPtr<byte> serializedData;
/* Length in bytes of one serialized archive. Set locally in step 1, overwritten on the root in step 2,
   then broadcast from the root to all ranks at the start of finalizeComputestep1Local() */
size_t perNodeArchLength;
/**
 * Program entry point: drives the three-step distributed SVD across MPI ranks.
 *
 * Flow:
 *   1. every rank runs computestep1Local();
 *   2. the root rank runs computeOnMasterNode();
 *   3. every rank runs finalizeComputestep1Local();
 *   4. the root rank prints Sigma, V and its own part of U.
 *
 * The number of ranks must equal nBlocks; otherwise the root prints a hint and
 * every rank finalizes MPI and exits with status 0.
 *
 * \param argc  Argument count, forwarded to checkArguments() and MPI_Init()
 * \param argv  Argument vector, forwarded to checkArguments() and MPI_Init()
 * \return      Always 0
 */
int main(int argc, char *argv[])
{
    /* checkArguments() is defined outside this file; presumably it validates or overrides the
       four dataset paths from the command line - confirm against service.h */
    checkArguments(argc, argv, 4, &datasetFileNames[0], &datasetFileNames[1], &datasetFileNames[2], &datasetFileNames[3]);

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &commSize);
    MPI_Comm_rank(MPI_COMM_WORLD, &rankId);

    /* Each rank processes exactly one dataset file, so the counts must match.
       The cast makes the signed/unsigned comparison explicit. */
    if (static_cast<size_t>(commSize) != nBlocks)
    {
        if (rankId == mpiRoot)
        {
            /* size_t is not guaranteed to be unsigned long, so convert explicitly to match %lu */
            printf("%d MPI ranks != %lu datasets available, so please start exactly %lu ranks.\n", commSize,
                   static_cast<unsigned long>(nBlocks), static_cast<unsigned long>(nBlocks));
        }
        MPI_Finalize();
        return 0;
    }

    computestep1Local();

    if (rankId == mpiRoot)
    {
        computeOnMasterNode();
    }

    finalizeComputestep1Local();

    /* Sigma and V are only populated on the root rank */
    if (rankId == mpiRoot)
    {
        printNumericTable(Sigma, "Singular values:");
        printNumericTable(V, "Right orthogonal matrix V:");
        printNumericTable(Ui , "Part of left orthogonal matrix U from root node:", 10);
    }

    MPI_Finalize();
    return 0;
}
void computestep1Local()
{
FileDataSource<CSVFeatureManager> dataSource(datasetFileNames[rankId], DataSource::doAllocateNumericTable,
DataSource::doDictionaryFromContext);
dataSource.loadDataBlock();
svd::Distributed<step1Local> alg;
alg.input.set( svd::data, dataSource.getNumericTable() );
alg.compute();
data_management::DataCollectionPtr dataFromStep1ForStep2;
dataFromStep1ForStep2 = alg.getPartialResult()->get( svd::outputOfStep1ForStep2 );
dataFromStep1ForStep3 = alg.getPartialResult()->get( svd::outputOfStep1ForStep3 );
InputDataArchive dataArch;
dataFromStep1ForStep2->serialize( dataArch );
perNodeArchLength = dataArch.getSizeOfArchive();
if (rankId == mpiRoot)
{
serializedData = services::SharedPtr<byte>( new byte[ perNodeArchLength * nBlocks ] );
}
byte *nodeResults = new byte[ perNodeArchLength ];
dataArch.copyArchiveToArray( nodeResults, perNodeArchLength );
MPI_Gather( nodeResults, perNodeArchLength, MPI_CHAR, serializedData.get(), perNodeArchLength, MPI_CHAR, mpiRoot,
MPI_COMM_WORLD);
delete[] nodeResults;
}
void computeOnMasterNode()
{
svd::Distributed<step2Master> alg;
for (size_t i = 0; i < nBlocks; i++)
{
OutputDataArchive dataArch( serializedData.get() + perNodeArchLength * i, perNodeArchLength );
data_management::DataCollectionPtr dataForStep2FromStep1 =
data_management::DataCollectionPtr( new data_management::DataCollection() );
dataForStep2FromStep1->deserialize(dataArch);
alg.input.add( svd::inputOfStep2FromStep1, i, dataForStep2FromStep1 );
}
alg.compute();
svd::DistributedPartialResultPtr pres = alg.getPartialResult();
KeyValueDataCollectionPtr inputForStep3FromStep2 = pres->get( svd::outputOfStep2ForStep3 );
for (size_t i = 0; i < nBlocks; i++)
{
InputDataArchive dataArch;
(*inputForStep3FromStep2)[i]->serialize( dataArch );
if( i == 0 )
{
perNodeArchLength = dataArch.getSizeOfArchive();
serializedData = services::SharedPtr<byte>( new byte[ perNodeArchLength * nBlocks ] );
}
dataArch.copyArchiveToArray( serializedData.get() + perNodeArchLength * i, perNodeArchLength );
}
svd::ResultPtr res = alg.getResult();
Sigma = res->get(svd::singularValues );
V = res->get(svd::rightSingularMatrix);
}
void finalizeComputestep1Local()
{
MPI_Bcast( &perNodeArchLength, sizeof(size_t), MPI_CHAR, mpiRoot, MPI_COMM_WORLD );
byte *nodeResults = new byte[ perNodeArchLength ];
MPI_Scatter(serializedData.get(), perNodeArchLength, MPI_CHAR, nodeResults, perNodeArchLength, MPI_CHAR, mpiRoot,
MPI_COMM_WORLD);
OutputDataArchive dataArch( nodeResults, perNodeArchLength );
data_management::DataCollectionPtr dataFromStep2ForStep3 =
data_management::DataCollectionPtr( new data_management::DataCollection() );
dataFromStep2ForStep3->deserialize(dataArch);
delete[] nodeResults;
svd::Distributed<step3Local> alg;
alg.input.set( svd::inputOfStep3FromStep1, dataFromStep1ForStep3 );
alg.input.set( svd::inputOfStep3FromStep2, dataFromStep2ForStep3 );
alg.compute();
alg.finalizeCompute();
svd::ResultPtr res = alg.getResult();
Ui = res->get(svd::leftSingularMatrix);
}