#include "daal.h"
#include "service.h"
using namespace std;
using namespace daal;
using namespace daal::algorithms::implicit_als;
/* Number of data blocks; sizes every per-block array declared in this file */
const size_t nBlocks = 4;
/* Paths of the training data files, one per block (loaded by readData) */
const string trainDatasetFileNames[nBlocks] =
{
"../data/distributed/implicit_als_trans_csr_1.csv",
"../data/distributed/implicit_als_trans_csr_2.csv",
"../data/distributed/implicit_als_trans_csr_3.csv",
"../data/distributed/implicit_als_trans_csr_4.csv"
};
/* Single-element array holding nBlocks; initializeStep1Local wraps it in a
   1x1 HomogenNumericTable<int> and assigns it to the 'partition' parameter */
static int usersPartition[] = { nBlocks };
/* Per-block offset tables: userOffsets is filled by initializeStep1Local,
   itemOffsets by initializeStep2Local */
NumericTablePtr userOffsets[nBlocks];
NumericTablePtr itemOffsets[nBlocks];
/* Floating-point type passed to the initialization algorithm templates */
typedef float algorithmFPType;
/* Algorithm parameters: nUsers is assigned to 'fullNUsers' of the step 1
   initialization, nFactors to 'nFactors' of every algorithm object, and
   maxIterations bounds the training loop in trainModel */
const size_t nUsers = 46;
const size_t nFactors = 2;
const size_t maxIterations = 5;
/* Per-block input data (set by readData) and the transposed data produced
   by initializeStep2Local */
CSRNumericTablePtr dataTable[nBlocks];
CSRNumericTablePtr transposedDataTable[nBlocks];
/* Prediction results for each (users block, items block) pair, set by testModel */
NumericTablePtr predictedRatings[nBlocks][nBlocks];
/* Per-block 'outputOfInitForComputeStep3' collections: the user one comes from
   initialization step 2, the item one from initialization step 1 */
KeyValueDataCollectionPtr userStep3LocalInput[nBlocks];
KeyValueDataCollectionPtr itemStep3LocalInput[nBlocks];
/* Per-block partial results holding the current items/users partial models;
   seeded during initialization and overwritten on every training iteration */
training::DistributedPartialResultStep4Ptr itemsPartialResultLocal[nBlocks];
training::DistributedPartialResultStep4Ptr usersPartialResultLocal[nBlocks];
/* Forward declarations of the stages invoked from main() */
/* Runs both initialization steps for all blocks */
void initializeModel();
/* Loads the training data of one block into dataTable[block] */
void readData(size_t block);
/* Runs maxIterations training iterations over all blocks */
void trainModel();
/* Computes predictedRatings for every pair of users/items blocks */
void testModel();
/* Prints predictedRatings for every pair of users/items blocks */
void printResults();
int main(int argc, char *argv[])
{
for (size_t i = 0; i < nBlocks; i++)
{
readData(i);
}
initializeModel();
trainModel();
testModel();
printResults();
return 0;
}
/*
 * Runs the first (local) initialization step for one block.
 *
 * Side effects on file-level state for the given block:
 *   - itemStep3LocalInput[block] receives 'outputOfInitForComputeStep3'
 *   - userOffsets[block] receives the 'offsets' entry with key 'block'
 *   - itemsPartialResultLocal[block] is replaced by a new partial result whose
 *     'outputOfStep4ForStep1' is the partial model produced here
 *
 * block   Index of the data block to process
 * return  The 'outputOfStep1ForStep2' collection of the partial result
 */
KeyValueDataCollectionPtr initializeStep1Local(size_t block)
{
    training::init::Distributed<step1Local, algorithmFPType, training::init::fastCSR> localInit;

    /* Configure the algorithm; the seed is shifted by the block index */
    localInit.parameter.nFactors   = nFactors;
    localInit.parameter.fullNUsers = nUsers;
    localInit.parameter.seed += block;
    localInit.parameter.partition.reset(new HomogenNumericTable<int>((int *)usersPartition, 1, 1));

    /* Feed the block's data and run the computation */
    localInit.input.set(training::init::data, dataTable[block]);
    localInit.compute();

    training::init::PartialResultPtr initResult = localInit.getPartialResult();

    /* Keep the pieces needed later by the training steps */
    itemStep3LocalInput[block] = initResult->get(training::init::outputOfInitForComputeStep3);
    userOffsets[block]         = initResult->get(training::init::offsets, block);

    /* Wrap the initial partial model so that it can be consumed like a step 4 result */
    training::DistributedPartialResultStep4Ptr &itemsSlot = itemsPartialResultLocal[block];
    itemsSlot.reset(new training::DistributedPartialResultStep4());
    itemsSlot->set(training::outputOfStep4ForStep1, initResult->get(training::init::partialModel));

    return initResult->get(training::init::outputOfStep1ForStep2);
}
void initializeStep2Local(size_t block, KeyValueDataCollectionPtr initStep2LocalInput)
{
training::init::Distributed<step2Local, algorithmFPType, training::init::fastCSR> initAlgorithm;
initAlgorithm.input.set(training::init::inputOfStep2FromStep1, initStep2LocalInput);
initAlgorithm.compute();
training::init::DistributedPartialResultStep2Ptr partialResult = initAlgorithm.getPartialResult();
transposedDataTable[block] = CSRNumericTable::cast(partialResult->get(training::init::transposedData));
userStep3LocalInput[block] = partialResult->get(training::init::outputOfInitForComputeStep3);
itemOffsets[block] = partialResult->get(training::init::offsets, block);
}
void initializeModel()
{
KeyValueDataCollectionPtr initStep1LocalResult[nBlocks];
for (size_t i = 0; i < nBlocks; i++)
{
initStep1LocalResult[i] = initializeStep1Local(i);
}
KeyValueDataCollectionPtr initStep2LocalInput[nBlocks];
for (size_t i = 0; i < nBlocks; i++)
{
initStep2LocalInput[i].reset(new KeyValueDataCollection());
for (size_t j = 0; j < nBlocks; j++)
{
(*initStep2LocalInput[i])[j] = (*initStep1LocalResult[j])[i];
}
}
for (size_t i = 0; i < nBlocks; i++)
{
initializeStep2Local(i, initStep2LocalInput[i]);
}
}
/*
 * Runs training step 1 (local) on one partial model.
 *
 * partialResultLocal  Partial result whose 'outputOfStep4ForStep1' entry is
 *                     used as the 'partialModel' input
 * return              The partial result of the step 1 algorithm
 */
training::DistributedPartialResultStep1Ptr computeStep1Local(
    const training::DistributedPartialResultStep4Ptr &partialResultLocal)
{
    training::Distributed<step1Local> localStep;
    localStep.parameter.nFactors = nFactors;

    localStep.input.set(
        training::partialModel,
        partialResultLocal->get(training::outputOfStep4ForStep1));

    localStep.compute();

    return localStep.getPartialResult();
}
/*
 * Runs training step 2 (master) over the step 1 results of all blocks.
 *
 * step1LocalResult  Pointer to an array of nBlocks step 1 partial results;
 *                   each is added as 'inputOfStep2FromStep1'
 * return            The 'outputOfStep2ForStep4' table of the partial result
 */
NumericTablePtr computeStep2Master(const training::DistributedPartialResultStep1Ptr *step1LocalResult)
{
    training::Distributed<step2Master> masterStep;
    masterStep.parameter.nFactors = nFactors;

    /* Register the local results in block order */
    const training::DistributedPartialResultStep1Ptr *const lastResult = step1LocalResult + nBlocks;
    for (const training::DistributedPartialResultStep1Ptr *current = step1LocalResult; current != lastResult; ++current)
    {
        masterStep.input.add(training::inputOfStep2FromStep1, *current);
    }

    masterStep.compute();

    return masterStep.getPartialResult()->get(training::outputOfStep2ForStep4);
}
/*
 * Runs training step 3 (local) for one block.
 *
 * offsetTable         Table passed as the 'offset' input
 * partialResultLocal  Partial result whose 'outputOfStep4ForStep3' entry is
 *                     used as the 'partialModel' input
 * step3LocalInput     Collection passed as 'inputOfStep3FromInit'
 * return              The 'outputOfStep3ForStep4' collection of the partial result
 */
KeyValueDataCollectionPtr computeStep3Local(const NumericTablePtr &offsetTable,
                                            const training::DistributedPartialResultStep4Ptr &partialResultLocal,
                                            const KeyValueDataCollectionPtr &step3LocalInput)
{
    training::Distributed<step3Local> localStep;
    localStep.parameter.nFactors = nFactors;

    /* Wire up the three inputs of the step */
    localStep.input.set(training::offset, offsetTable);
    localStep.input.set(training::inputOfStep3FromInit, step3LocalInput);
    localStep.input.set(
        training::partialModel,
        partialResultLocal->get(training::outputOfStep4ForStep3));

    localStep.compute();

    return localStep.getPartialResult()->get(training::outputOfStep3ForStep4);
}
/*
 * Runs training step 4 (local) for one block.
 *
 * dataTable          CSR table passed as the 'partialData' input
 * step2MasterResult  Table passed as 'inputOfStep4FromStep2'
 * step4LocalInput    Collection passed as the 'partialModels' input
 * return             The partial result of the step 4 algorithm
 */
training::DistributedPartialResultStep4Ptr computeStep4Local(
    const CSRNumericTablePtr &dataTable,
    const NumericTablePtr &step2MasterResult,
    const KeyValueDataCollectionPtr &step4LocalInput)
{
    training::Distributed<step4Local> localStep;
    localStep.parameter.nFactors = nFactors;

    /* Wire up the three inputs of the step */
    localStep.input.set(training::partialData, dataTable);
    localStep.input.set(training::inputOfStep4FromStep2, step2MasterResult);
    localStep.input.set(training::partialModels, step4LocalInput);

    localStep.compute();

    return localStep.getPartialResult();
}
void trainModel()
{
training::DistributedPartialResultStep1Ptr step1LocalResult[nBlocks];
NumericTablePtr step2MasterResult;
KeyValueDataCollectionPtr step3LocalResult[nBlocks];
KeyValueDataCollectionPtr step4LocalInput[nBlocks];
for (size_t i = 0; i < nBlocks; i++)
{
step4LocalInput[i].reset(new KeyValueDataCollection());
}
for (size_t iteration = 0; iteration < maxIterations; iteration++)
{
for (size_t i = 0; i < nBlocks; i++)
{
step1LocalResult[i] = computeStep1Local(itemsPartialResultLocal[i]);
}
step2MasterResult = computeStep2Master(step1LocalResult);
for (size_t i = 0; i < nBlocks; i++)
{
step3LocalResult[i] = computeStep3Local(itemOffsets[i], itemsPartialResultLocal[i], itemStep3LocalInput[i]);
}
for (size_t i = 0; i < nBlocks; i++)
{
for (size_t j = 0; j < nBlocks; j++)
{
(*step4LocalInput[i])[j] = (*step3LocalResult[j])[i];
}
}
for (size_t i = 0; i < nBlocks; i++)
{
usersPartialResultLocal[i] = computeStep4Local(transposedDataTable[i], step2MasterResult, step4LocalInput[i]);
}
for (size_t i = 0; i < nBlocks; i++)
{
step1LocalResult[i] = computeStep1Local(usersPartialResultLocal[i]);
}
step2MasterResult = computeStep2Master(step1LocalResult);
for (size_t i = 0; i < nBlocks; i++)
{
step3LocalResult[i] = computeStep3Local(userOffsets[i], usersPartialResultLocal[i], userStep3LocalInput[i]);
}
for (size_t i = 0; i < nBlocks; i++)
{
for (size_t j = 0; j < nBlocks; j++)
{
(*step4LocalInput[i])[j] = (*step3LocalResult[j])[i];
}
}
for (size_t i = 0; i < nBlocks; i++)
{
itemsPartialResultLocal[i] = computeStep4Local(dataTable[i], step2MasterResult, step4LocalInput[i]);
}
}
}
void testModel()
{
for (size_t i = 0; i < nBlocks; i++)
{
for (size_t j = 0; j < nBlocks; j++)
{
prediction::ratings::Distributed<step1Local> algorithm;
algorithm.parameter.nFactors = nFactors;
algorithm.input.set(prediction::ratings::usersPartialModel, usersPartialResultLocal[i]->get(training::outputOfStep4));
algorithm.input.set(prediction::ratings::itemsPartialModel, itemsPartialResultLocal[j]->get(training::outputOfStep4));
algorithm.compute();
predictedRatings[i][j] = algorithm.getResult()->get(prediction::ratings::prediction);
}
}
}
void readData(size_t block)
{
dataTable[block] = CSRNumericTablePtr(createSparseTable<float>(trainDatasetFileNames[block]));
}
/*
 * Prints the predictions for every combination of a users block and an
 * items block: a header line naming the two blocks, followed by the output
 * of printALSRatings for the matching offsets and predictedRatings entry.
 */
void printResults()
{
    for (size_t usersBlock = 0; usersBlock != nBlocks; ++usersBlock)
    {
        for (size_t itemsBlock = 0; itemsBlock != nBlocks; ++itemsBlock)
        {
            cout << "Ratings for users block " << usersBlock << ", items block " << itemsBlock << " :" << endl;

            printALSRatings(userOffsets[usersBlock], itemOffsets[itemsBlock], predictedRatings[usersBlock][itemsBlock]);
        }
    }
}