/*
 * Distributed ridge regression sample: each MPI rank trains a partial model
 * on its own data block, and the root rank merges the partial results,
 * builds the final model, and runs prediction on the test data set.
 */
#include <mpi.h>
#include "daal.h"
#include "service.h"

using namespace std;
using namespace daal;
using namespace daal::algorithms::ridge_regression;
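
/* Input data set parameters */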
const string trainDatasetFileNames[] =
{
"./data/distributed/linear_regression_train_1.csv",
"./data/distributed/linear_regression_train_2.csv",
"./data/distributed/linear_regression_train_3.csv",
"./data/distributed/linear_regression_train_4.csv"
};

string testDatasetFileName = "./data/distributed/linear_regression_test.csv";

const size_t nBlocks             = 4;
const size_t nFeatures           = 10;
const size_t nDependentVariables = 2;

int rankId, comm_size;
#define mpi_root 0

void trainModel();
void testModel();

training::ResultPtr trainingResult;
prediction::ResultPtr predictionResult;

int main(int argc, char *argv[])
{
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &comm_size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rankId);
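
    /* Each rank trains a partial model on its local data block; only the root rank tests the final model */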
    trainModel();

    if (rankId == mpi_root)
    {
        testModel();
    }

    MPI_Finalize();
    return 0;
}

void trainModel()
{
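    /* Initialize FileDataSource<CSVFeatureManager> to retrieve the local input data from a .csv file */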
    FileDataSource<CSVFeatureManager> trainDataSource(trainDatasetFileNames[rankId],
        DataSource::notAllocateNumericTable,
        DataSource::doDictionaryFromContext);
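
    /* Create numeric tables for the features and the dependent variables, then load the local data block */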
    NumericTablePtr trainData(new HomogenNumericTable<>(nFeatures, 0, NumericTable::doNotAllocate));
    NumericTablePtr trainDependentVariables(new HomogenNumericTable<>(nDependentVariables, 0, NumericTable::doNotAllocate));
    NumericTablePtr mergedData(new MergedNumericTable(trainData, trainDependentVariables));

    trainDataSource.loadDataBlock(mergedData.get());
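
    /* Create an algorithm object and train a partial ridge regression model on the local data set */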
    training::Distributed<step1Local> localAlgorithm;
    localAlgorithm.input.set(training::data, trainData);
    localAlgorithm.input.set(training::dependentVariables, trainDependentVariables);

    localAlgorithm.compute();
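
    /* Serialize the partial result required by step 2 on the master node */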
    services::SharedPtr<byte> serializedData;
    InputDataArchive dataArch;
    localAlgorithm.getPartialResult()->serialize(dataArch);
    size_t perNodeArchLength = dataArch.getSizeOfArchive();
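
    /* The root rank allocates one buffer for all serialized partial results;
       the gather below assumes that the archive has the same size on every rank */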
    if (rankId == mpi_root)
    {
        serializedData = services::SharedPtr<byte>(new byte[perNodeArchLength * nBlocks]);
    }

    byte *nodeResults = new byte[perNodeArchLength];
    dataArch.copyArchiveToArray(nodeResults, perNodeArchLength);
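
    /* Transfer the serialized partial results to the master node */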
    MPI_Gather(nodeResults, perNodeArchLength, MPI_CHAR, serializedData.get(), perNodeArchLength, MPI_CHAR, mpi_root, MPI_COMM_WORLD);

    delete[] nodeResults;
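
    /* On the master node, merge the gathered partial results into the final ridge regression model */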
    if (rankId == mpi_root)
    {
        training::Distributed<step2Master> masterAlgorithm;

        for (size_t i = 0; i < nBlocks; ++i)
        {
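            /* Deserialize the partial result received from node i and add it to the master-node algorithm input */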
            OutputDataArchive dataArch(serializedData.get() + perNodeArchLength * i, perNodeArchLength);
            training::PartialResultPtr dataForStep2FromStep1(new training::PartialResult());
            dataForStep2FromStep1->deserialize(dataArch);

            masterAlgorithm.input.add(training::partialModels, dataForStep2FromStep1);
        }
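
        /* Merge the partial models, finalize the result, and print the model coefficients */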
        masterAlgorithm.compute();
        masterAlgorithm.finalizeCompute();

        trainingResult = masterAlgorithm.getResult();
        printNumericTable(trainingResult->get(training::model)->getBeta(), "Ridge Regression coefficients:");
    }
}

void testModel()
{
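    /* Initialize FileDataSource<CSVFeatureManager> to retrieve the test data from a .csv file */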
    FileDataSource<CSVFeatureManager> testDataSource(testDatasetFileName,
        DataSource::doAllocateNumericTable,
        DataSource::doDictionaryFromContext);
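
    /* Create numeric tables for the test features and the ground-truth values, then load the test data */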
    NumericTablePtr testData(new HomogenNumericTable<>(nFeatures, 0, NumericTable::doNotAllocate));
    NumericTablePtr testGroundTruth(new HomogenNumericTable<>(nDependentVariables, 0, NumericTable::doNotAllocate));
    NumericTablePtr mergedData(new MergedNumericTable(testData, testGroundTruth));

    testDataSource.loadDataBlock(mergedData.get());
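
    /* Create an algorithm object for ridge regression prediction and pass it the test data and the trained model */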
    prediction::Batch<> algorithm;
    algorithm.input.set(prediction::data, testData);
    algorithm.input.set(prediction::model, trainingResult->get(training::model));

    algorithm.compute();
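
    /* Retrieve and print the prediction results next to the ground truth */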
    predictionResult = algorithm.getResult();
    printNumericTable(predictionResult->get(prediction::prediction), "Ridge Regression prediction results: (first 10 rows):", 10);
    printNumericTable(testGroundTruth, "Ground truth (first 10 rows):", 10);
}