#include <mpi.h>

#include <iostream>
#include <vector>

#include "daal.h"
#include "service.h"
using namespace std;
using namespace daal;
using namespace daal::algorithms;
using namespace daal::algorithms::multinomial_naive_bayes;
/* Training feature files; trainModel() picks the entry at index rankId.
 * All four entries currently name the same file. */
const string trainDatasetFileNames[4] =
{
"./data/distributed/naivebayes_train_dense.csv", "./data/distributed/naivebayes_train_dense.csv",
"./data/distributed/naivebayes_train_dense.csv", "./data/distributed/naivebayes_train_dense.csv"
};
/* Training label files; indexed by rankId in the same way as the feature files. */
const string trainGroundTruthFileNames[4] =
{
"./data/distributed/naivebayes_train_labels.csv", "./data/distributed/naivebayes_train_labels.csv",
"./data/distributed/naivebayes_train_labels.csv", "./data/distributed/naivebayes_train_labels.csv"
};
/* Test features (read by testModel()) and test labels (read by printResults()). */
string testDatasetFileName = "./data/distributed/naivebayes_test_dense.csv";
string testGroundTruthFileName = "./data/distributed/naivebayes_test_labels.csv";
/* Number of classes passed to the training and prediction algorithm constructors. */
const size_t nClasses = 20;
/* Number of serialized partial results the root rank gathers and merges.
 * NOTE(review): the code indexes the file tables by rankId, so this presumably
 * must equal the number of MPI processes -- confirm against the launch command. */
const size_t nBlocks = 4;
/* This process's rank and the communicator size; both are filled in by main(). */
int rankId, comm_size;
/* Rank that gathers the partial results, runs prediction and prints the output. */
#define mpi_root 0
void trainModel();
void testModel();
void printResults();
/* Final training result; assigned only on the root rank inside trainModel(). */
training::ResultPtr trainingResult;
/* Prediction result; assigned by testModel(), which main() calls on the root rank only. */
classifier::prediction::ResultPtr predictionResult;
/*
 * Program entry point.
 *
 * Starts MPI, records the communicator size and this process's rank in the
 * file-level globals, runs the training step on every rank, and then lets
 * the root rank alone run prediction and print the results before MPI is
 * shut down.
 */
int main(int argc, char *argv[])
{
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &comm_size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rankId);

    /* Every rank takes part in training */
    trainModel();

    /* Only the root rank holds the trained model, so only it predicts and reports */
    const bool isRootRank = (mpi_root == rankId);
    if (isRootRank)
    {
        testModel();
        printResults();
    }

    MPI_Finalize();
    return 0;
}
void trainModel()
{
FileDataSource<CSVFeatureManager> trainDataSource(trainDatasetFileNames[rankId], DataSource::doAllocateNumericTable,
DataSource::doDictionaryFromContext);
FileDataSource<CSVFeatureManager> trainLabelsSource(trainGroundTruthFileNames[rankId], DataSource::doAllocateNumericTable,
DataSource::doDictionaryFromContext);
trainDataSource.loadDataBlock();
trainLabelsSource.loadDataBlock();
training::Distributed<step1Local> localAlgorithm(nClasses);
localAlgorithm.input.set(classifier::training::data, trainDataSource.getNumericTable());
localAlgorithm.input.set(classifier::training::labels, trainLabelsSource.getNumericTable());
localAlgorithm.compute();
services::SharedPtr<byte> serializedData;
InputDataArchive dataArch;
localAlgorithm.getPartialResult()->serialize(dataArch);
size_t perNodeArchLength = dataArch.getSizeOfArchive();
if (rankId == mpi_root)
{
serializedData.reset(new byte[perNodeArchLength * nBlocks]);
}
{
services::SharedPtr<byte> nodeResults(new byte[perNodeArchLength]);
dataArch.copyArchiveToArray(nodeResults.get(), perNodeArchLength );
MPI_Gather(nodeResults.get(), perNodeArchLength, MPI_CHAR, serializedData.get(), perNodeArchLength, MPI_CHAR, mpi_root,
MPI_COMM_WORLD);
}
if(rankId == mpi_root)
{
training::Distributed<step2Master> masterAlgorithm(nClasses);
for(size_t i = 0; i < nBlocks ; i++)
{
OutputDataArchive dataArch(serializedData.get() + perNodeArchLength * i, perNodeArchLength);
training::PartialResultPtr dataForStep2FromStep1(new training::PartialResult());
dataForStep2FromStep1->deserialize(dataArch);
masterAlgorithm.input.add(training::partialModels, dataForStep2FromStep1);
}
masterAlgorithm.compute();
masterAlgorithm.finalizeCompute();
trainingResult = masterAlgorithm.getResult();
}
}
void testModel()
{
FileDataSource<CSVFeatureManager> testDataSource(testDatasetFileName, DataSource::doAllocateNumericTable,
DataSource::doDictionaryFromContext);
testDataSource.loadDataBlock();
prediction::Batch<> algorithm(nClasses);
algorithm.input.set(classifier::prediction::data, testDataSource.getNumericTable());
algorithm.input.set(classifier::prediction::model, trainingResult->get(classifier::training::model));
algorithm.compute();
predictionResult = algorithm.getResult();
}
void printResults()
{
FileDataSource<CSVFeatureManager> testGroundTruth(testGroundTruthFileName, DataSource::doAllocateNumericTable,
DataSource::doDictionaryFromContext);
testGroundTruth.loadDataBlock();
printNumericTables<int, int>(testGroundTruth.getNumericTable().get(),
predictionResult->get(classifier::prediction::prediction).get(),
"Ground truth", "Classification results",
"NaiveBayes classification results (first 20 observations):", 20);
}