#include <mpi.h>
#include <cstddef>
#include <iostream>
#include <vector>
#include "daal.h"
#include "service.h"
using namespace std;
using namespace daal;
using namespace daal::algorithms;
/* Input partitions: the MPI process with rank i reads datasetFileNames[i]. */
const string datasetFileNames[] =
{
    "./data/distributed/covcormoments_csr_1.csv",
    "./data/distributed/covcormoments_csr_2.csv",
    "./data/distributed/covcormoments_csr_3.csv",
    "./data/distributed/covcormoments_csr_4.csv"
};

/* Number of data blocks: one per entry of datasetFileNames (currently 4). */
const size_t nBlocks = sizeof(datasetFileNames) / sizeof(datasetFileNames[0]);

/* Floating-point type the DAAL algorithms are instantiated with. */
typedef float algorithmFPType;

/* This process's rank and the size of MPI_COMM_WORLD; set by main() after MPI_Init. */
int rankId;
int comm_size;

/* Rank that gathers the partial results and runs the master step. */
#define mpi_root 0
int main(int argc, char *argv[])
{
checkArguments(argc, argv, 4, &datasetFileNames[0], &datasetFileNames[1], &datasetFileNames[2], &datasetFileNames[3]);
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &comm_size);
MPI_Comm_rank(MPI_COMM_WORLD, &rankId);
CSRNumericTable *dataTable = createSparseTable<float>(datasetFileNames[rankId]);
low_order_moments::Distributed<step1Local, algorithmFPType, low_order_moments::fastCSR> localAlgorithm;
localAlgorithm.input.set(low_order_moments::data, CSRNumericTablePtr(dataTable));
localAlgorithm.compute();
services::SharedPtr<byte> serializedData;
InputDataArchive dataArch;
localAlgorithm.getPartialResult()->serialize( dataArch );
size_t perNodeArchLength = dataArch.getSizeOfArchive();
if (rankId == mpi_root)
{
serializedData = services::SharedPtr<byte>( new byte[ perNodeArchLength * nBlocks ] );
}
byte *nodeResults = new byte[ perNodeArchLength ];
dataArch.copyArchiveToArray( nodeResults, perNodeArchLength );
MPI_Gather( nodeResults, perNodeArchLength, MPI_CHAR, serializedData.get(), perNodeArchLength, MPI_CHAR, mpi_root,
MPI_COMM_WORLD);
delete[] nodeResults;
if(rankId == mpi_root)
{
low_order_moments::Distributed<step2Master, algorithmFPType, low_order_moments::fastCSR> masterAlgorithm;
for( size_t i = 0; i < nBlocks ; i++ )
{
OutputDataArchive dataArch( serializedData.get() + perNodeArchLength * i, perNodeArchLength );
low_order_moments::PartialResultPtr dataForStep2FromStep1 = low_order_moments::PartialResultPtr(
new low_order_moments::PartialResult() );
dataForStep2FromStep1->deserialize(dataArch);
masterAlgorithm.input.add(low_order_moments::partialResults, dataForStep2FromStep1 );
}
masterAlgorithm.compute();
masterAlgorithm.finalizeCompute();
low_order_moments::ResultPtr res = masterAlgorithm.getResult();
printNumericTable(res->get(low_order_moments::minimum), "Minimum:");
printNumericTable(res->get(low_order_moments::maximum), "Maximum:");
printNumericTable(res->get(low_order_moments::sum), "Sum:");
printNumericTable(res->get(low_order_moments::sumSquares), "Sum of squares:");
printNumericTable(res->get(low_order_moments::sumSquaresCentered), "Sum of squared difference from the means:");
printNumericTable(res->get(low_order_moments::mean), "Mean:");
printNumericTable(res->get(low_order_moments::secondOrderRawMoment), "Second order raw moment:");
printNumericTable(res->get(low_order_moments::variance), "Variance:");
printNumericTable(res->get(low_order_moments::standardDeviation), "Standard deviation:");
printNumericTable(res->get(low_order_moments::variation), "Variation:");
}
MPI_Finalize();
return 0;
}