#include <mpi.h>

#include <climits>
#include <iostream>
#include <vector>

#include "daal.h"
#include "service.h"
using namespace std;
using namespace daal;
using namespace daal::algorithms;
using namespace daal::algorithms::multinomial_naive_bayes;
/* Floating-point type passed as a template argument to the training and prediction algorithms */
typedef float algorithmFPType;
/* Training data sets in CSR format; trainModel() picks the entry at index rankId.
   Note that all four entries currently name the same file. */
const string trainDatasetFileNames[4] =
{
"./data/distributed/naivebayes_train_csr.csv", "./data/distributed/naivebayes_train_csr.csv",
"./data/distributed/naivebayes_train_csr.csv", "./data/distributed/naivebayes_train_csr.csv"
};
/* Label files matching the training data sets; also indexed by rankId in trainModel() */
const string trainGroundTruthFileNames[4] =
{
"./data/distributed/naivebayes_train_labels.csv", "./data/distributed/naivebayes_train_labels.csv",
"./data/distributed/naivebayes_train_labels.csv", "./data/distributed/naivebayes_train_labels.csv"
};
/* Test data set (CSR) read by testModel() and its ground-truth labels read by printResults() */
string testDatasetFileName = "./data/distributed/naivebayes_test_csr.csv";
string testGroundTruthFileName = "./data/distributed/naivebayes_test_labels.csv";
/* Number of classes handed to the constructors of the training and prediction algorithms */
const size_t nClasses = 20;
/* Number of serialized partial results the root rank allocates room for and merges in trainModel() */
const size_t nBlocks = 4;
/* Rank of this process and size of MPI_COMM_WORLD; both are filled in by main() */
int rankId, comm_size;
/* Rank that gathers the partial results, builds the final model, runs prediction and prints the results */
#define mpi_root 0
/* Forward declarations of the stages invoked from main() */
void trainModel();
void testModel();
void printResults();
/* Final training result; assigned by trainModel() on the root rank only */
training::ResultPtr trainingResult;
/* Prediction result; assigned by testModel() */
classifier::prediction::ResultPtr predictionResult;
int main(int argc, char *argv[])
{
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &comm_size);
MPI_Comm_rank(MPI_COMM_WORLD, &rankId);
trainModel();
if(rankId == mpi_root)
{
testModel();
printResults();
}
MPI_Finalize();
return 0;
}
void trainModel()
{
CSRNumericTable *trainDataTable = createSparseTable<float>(trainDatasetFileNames[rankId]);
FileDataSource<CSVFeatureManager> trainLabelsSource(trainGroundTruthFileNames[rankId], DataSource::doAllocateNumericTable,
DataSource::doDictionaryFromContext);
trainLabelsSource.loadDataBlock();
training::Distributed<step1Local, algorithmFPType, training::fastCSR> localAlgorithm(nClasses);
localAlgorithm.input.set(classifier::training::data, CSRNumericTablePtr(trainDataTable));
localAlgorithm.input.set(classifier::training::labels, trainLabelsSource.getNumericTable());
localAlgorithm.compute();
services::SharedPtr<byte> serializedData;
InputDataArchive dataArch;
localAlgorithm.getPartialResult()->serialize(dataArch);
size_t perNodeArchLength = dataArch.getSizeOfArchive();
if (rankId == mpi_root)
{
serializedData.reset(new byte[perNodeArchLength * nBlocks]);
}
{
services::SharedPtr<byte> nodeResults(new byte[perNodeArchLength]);
dataArch.copyArchiveToArray(nodeResults.get(), perNodeArchLength );
MPI_Gather(nodeResults.get(), perNodeArchLength, MPI_CHAR, serializedData.get(), perNodeArchLength, MPI_CHAR, mpi_root,
MPI_COMM_WORLD);
}
if(rankId == mpi_root)
{
training::Distributed<step2Master, algorithmFPType, training::fastCSR> masterAlgorithm(nClasses);
for(size_t i = 0; i < nBlocks ; i++)
{
OutputDataArchive dataArch(serializedData.get() + perNodeArchLength * i, perNodeArchLength);
training::PartialResultPtr dataForStep2FromStep1(new training::PartialResult());
dataForStep2FromStep1->deserialize(dataArch);
masterAlgorithm.input.add(training::partialModels, dataForStep2FromStep1);
}
masterAlgorithm.compute();
masterAlgorithm.finalizeCompute();
trainingResult = masterAlgorithm.getResult();
}
}
void testModel()
{
CSRNumericTable *testDataTable = createSparseTable<float>(testDatasetFileName);
prediction::Batch<algorithmFPType, prediction::fastCSR> algorithm(nClasses);
algorithm.input.set(classifier::prediction::data, CSRNumericTablePtr(testDataTable));
algorithm.input.set(classifier::prediction::model, trainingResult->get(classifier::training::model));
algorithm.compute();
predictionResult = algorithm.getResult();
}
void printResults()
{
FileDataSource<CSVFeatureManager> testGroundTruth(testGroundTruthFileName, DataSource::doAllocateNumericTable,
DataSource::doDictionaryFromContext);
testGroundTruth.loadDataBlock();
printNumericTables<int, int>(testGroundTruth.getNumericTable().get(),
predictionResult->get(classifier::prediction::prediction).get(),
"Ground truth", "Classification results",
"NaiveBayes classification results (first 20 observations):", 20);
}