#include <mpi.h>
#include <iostream>
#include <limits>
#include "daal.h"
#include "service.h"
using namespace std;
using namespace daal;
using namespace daal::algorithms;
const size_t nBlocks = 4;
int rankId, comm_size;
#define mpi_root 0
const string datasetFileNames[] =
{
"./data/distributed/covcormoments_dense_1.csv",
"./data/distributed/covcormoments_dense_2.csv",
"./data/distributed/covcormoments_dense_3.csv",
"./data/distributed/covcormoments_dense_4.csv"
};
int main(int argc, char *argv[])
{
checkArguments(argc, argv, 4, &datasetFileNames[0], &datasetFileNames[1], &datasetFileNames[2], &datasetFileNames[3]);
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &comm_size);
MPI_Comm_rank(MPI_COMM_WORLD, &rankId);
FileDataSource<CSVFeatureManager> dataSource(datasetFileNames[rankId], DataSource::doAllocateNumericTable,
DataSource::doDictionaryFromContext);
dataSource.loadDataBlock();
covariance::Distributed<step1Local> localAlgorithm;
localAlgorithm.input.set(covariance::data, dataSource.getNumericTable());
localAlgorithm.compute();
services::SharedPtr<byte> serializedData;
InputDataArchive dataArch;
localAlgorithm.getPartialResult()->serialize( dataArch );
size_t perNodeArchLength = dataArch.getSizeOfArchive();
if (rankId == mpi_root)
{
serializedData = services::SharedPtr<byte>( new byte[ perNodeArchLength * nBlocks ] );
}
byte *nodeResults = new byte[ perNodeArchLength ];
dataArch.copyArchiveToArray( nodeResults, perNodeArchLength );
MPI_Gather( nodeResults, perNodeArchLength, MPI_CHAR, serializedData.get(), perNodeArchLength, MPI_CHAR, mpi_root,
MPI_COMM_WORLD);
delete[] nodeResults;
if(rankId == mpi_root)
{
covariance::Distributed<step2Master> masterAlgorithm;
for( size_t i = 0; i < nBlocks ; i++ )
{
OutputDataArchive dataArch( serializedData.get() + perNodeArchLength * i, perNodeArchLength );
covariance::PartialResultPtr dataForStep2FromStep1 =
covariance::PartialResultPtr( new covariance::PartialResult() );
dataForStep2FromStep1->deserialize(dataArch);
masterAlgorithm.input.add(covariance::partialResults, dataForStep2FromStep1 );
}
masterAlgorithm.compute();
masterAlgorithm.finalizeCompute();
covariance::ResultPtr result = masterAlgorithm.getResult();
printNumericTable(result->get(covariance::covariance), "Covariance matrix:");
printNumericTable(result->get(covariance::mean), "Mean vector:");
}
MPI_Finalize();
return 0;
}