24 from daal
import step1Local, step2Local, step2Master, step3Local, step4Local
25 import daal.algorithms.implicit_als.prediction.ratings
as ratings
26 import daal.algorithms.implicit_als.training
as training
27 import daal.algorithms.implicit_als.training.init
as init
28 from daal.data_management
import KeyValueDataCollection, HomogenNumericTable
# Make the shared example helpers (utils.py, one directory above this
# script's folder) importable.
#
# __file__ is made absolute *before* dirname is applied twice. If __file__ is
# a bare relative name such as "script.py" (what Python < 3.9 passes for a
# script launched from its own directory), dirname(dirname("script.py")) is
# "" and abspath("") would resolve to the current directory rather than its
# parent. The outer realpath is kept so symlink resolution is unchanged.
utils_folder = os.path.realpath(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if utils_folder not in sys.path:
    sys.path.insert(0, utils_folder)
from utils import createSparseTable, printALSRatings

# Root folder of the example data sets, relative to the current working
# directory (not to this script).
DAAL_PREFIX = os.path.join('..', 'data')
41 trainDatasetFileNames = [
42 os.path.join(DAAL_PREFIX,
'distributed',
'implicit_als_trans_csr_1.csv'),
43 os.path.join(DAAL_PREFIX,
'distributed',
'implicit_als_trans_csr_2.csv'),
44 os.path.join(DAAL_PREFIX,
'distributed',
'implicit_als_trans_csr_3.csv'),
45 os.path.join(DAAL_PREFIX,
'distributed',
'implicit_als_trans_csr_4.csv')
# Single-entry partition descriptor holding the number of blocks; consumed by
# initializeStep1Local, which passes it to DAAL as a 1 x 1 table.
usersPartition = [nBlocks]

# Per-block offsets collections: userOffsets is filled by initializeStep1Local,
# itemOffsets by initializeStep2Local.
userOffsets = [0] * nBlocks
itemOffsets = [0] * nBlocks

# Per-block input tables: dataTable holds each block's sparse training table,
# transposedDataTable is filled by initializeStep2Local.
dataTable = [0] * nBlocks
transposedDataTable = [0] * nBlocks

# nBlocks x nBlocks grid; cell [i][j] receives the prediction for users
# block i against items block j. Each row is a distinct list.
predictedRatings = [[0] * nBlocks for _ in range(nBlocks)]

# Step-3 inputs produced by initialization: itemStep3LocalInput comes from
# initializeStep1Local, userStep3LocalInput from initializeStep2Local.
userStep3LocalInput = [0] * nBlocks
itemStep3LocalInput = [0] * nBlocks

# Step-4 partial results per block; items are seeded by initializeStep1Local
# and both lists are replaced on every training iteration.
itemsPartialResultLocal = [0] * nBlocks
usersPartialResultLocal = [0] * nBlocks
74 dataTable[block] = createSparseTable(trainDatasetFileNames[block])
def initializeStep1Local(block):
    """Run step 1 of the distributed fastCSR model initialization for one block.

    Module-level per-block lists updated in place (item assignment only, so
    no ``global`` declaration is required):
      * itemStep3LocalInput[block]     -- init.outputOfInitForComputeStep3 collection
      * userOffsets[block]             -- entry ``block`` of the init.offsets collection
      * itemsPartialResultLocal[block] -- a new DistributedPartialResultStep4 holding
        the initial partial model under training.outputOfStep4ForStep1

    :param block: index of the local block; selects ``dataTable[block]``.
    :return: the init.outputOfStep1ForStep2 tables collection, which
        initializeModel redistributes across the step-2 calls.
    """
    algo = init.Distributed(step=step1Local, method=init.fastCSR)
    algo.parameter.fullNUsers = nUsers
    algo.parameter.nFactors = nFactors
    # Offset the seed by the block index, giving every block a distinct seed.
    algo.parameter.seed += block

    # usersPartition holds a single value; it is handed over as a 1 x 1 table.
    partition = np.array(usersPartition, dtype=np.float64)
    partition.shape = (1, 1)
    algo.parameter.partition = HomogenNumericTable(partition)

    algo.input.set(init.data, dataTable[block])
    result = algo.compute()

    itemStep3LocalInput[block] = result.getCollection(init.outputOfInitForComputeStep3)
    userOffsets[block] = result.getCollection(init.offsets, block)
    initialModel = result.getPartialModel(init.partialModel)

    step4Result = training.DistributedPartialResultStep4()
    itemsPartialResultLocal[block] = step4Result
    step4Result.set(training.outputOfStep4ForStep1, initialModel)

    return result.getTablesCollection(init.outputOfStep1ForStep2)
def initializeStep2Local(block, initStep2LocalInput):
    """Run step 2 of the distributed fastCSR model initialization for one block.

    :param block: index of the local block being initialized.
    :param initStep2LocalInput: KeyValueDataCollection that initializeModel
        assembles from every block's step-1 output.

    Module-level per-block lists updated in place (item assignment only):
      * transposedDataTable[block] -- the init.transposedData table
      * userStep3LocalInput[block] -- the init.outputOfInitForComputeStep3 collection
      * itemOffsets[block]         -- entry ``block`` of the init.offsets collection
    """
    algo = init.Distributed(step=step2Local, method=init.fastCSR)
    algo.input.set(init.inputOfStep2FromStep1, initStep2LocalInput)
    result = algo.compute()

    transposedDataTable[block] = result.getTable(init.transposedData)
    userStep3LocalInput[block] = result.getCollection(init.outputOfInitForComputeStep3)
    itemOffsets[block] = result.getCollection(init.offsets, block)
def initializeModel():
    """Initialize the implicit ALS model across all ``nBlocks`` blocks.

    1. Run initialization step 1 on every block.
    2. Transpose the step-1 outputs: the collection for block ``i`` maps key
       ``j`` to entry ``i`` of block ``j``'s step-1 output.
    3. Once all collections are built, run initialization step 2 on each block.
    """
    blockIds = range(nBlocks)
    step1Outputs = [initializeStep1Local(b) for b in blockIds]

    step2Inputs = []
    for dst in blockIds:
        collection = KeyValueDataCollection()
        for src in blockIds:
            collection[src] = step1Outputs[src][dst]
        step2Inputs.append(collection)

    for dst, collection in enumerate(step2Inputs):
        initializeStep2Local(dst, collection)
def computeStep1Local(partialResultLocal):
    """Run training step 1 (local) for one block.

    :param partialResultLocal: a step-4 partial result (an element of
        itemsPartialResultLocal or usersPartialResultLocal); its
        training.outputOfStep4ForStep1 entry is used as the partial model.
    :return: the step-1 result from ``compute()``; a list of these, one per
        block, is what computeStep2Master receives.
    """
    step1 = training.Distributed(step=step1Local)
    step1.parameter.nFactors = nFactors

    model = partialResultLocal.get(training.outputOfStep4ForStep1)
    step1.input.set(training.partialModel, model)

    return step1.compute()
def computeStep2Master(step1LocalResult):
    """Run training step 2 on the master node, combining all step-1 results.

    :param step1LocalResult: sequence of step-1 results (as returned by
        computeStep1Local), one per block.
    :return: the training.outputOfStep2ForStep4 entry of the step-2 result,
        which is then passed to computeStep4Local for every block.
    """
    algorithm = training.Distributed(step=step2Master)
    algorithm.parameter.nFactors = nFactors

    # Feed exactly the results the caller supplied instead of indexing with the
    # global nBlocks. A length mismatch then can neither silently drop
    # results nor raise IndexError.
    for partial in step1LocalResult:
        algorithm.input.add(training.inputOfStep2FromStep1, partial)

    res = algorithm.compute()
    return res.get(training.outputOfStep2ForStep4)
def computeStep3Local(offsets, partialResultLocal, step3LocalInput):
    """Run training step 3 (local) for one block.

    :param offsets: the block's offsets table, set as training.offset
        (callers pass itemOffsets[i] or userOffsets[i]).
    :param partialResultLocal: step-4 partial result whose
        training.outputOfStep4ForStep3 entry is used as the partial model.
    :param step3LocalInput: the block's init.outputOfInitForComputeStep3
        collection, set as training.inputOfStep3FromInit.
    :return: the training.outputOfStep3ForStep4 entry of the step-3 result.
        The training loop indexes it as ``[j][i]`` when assembling each
        block's step-4 input.
    """
    step3 = training.Distributed(step=step3Local)
    step3.parameter.nFactors = nFactors

    model = partialResultLocal.get(training.outputOfStep4ForStep3)
    step3.input.setModel(training.partialModel, model)
    step3.input.setCollection(training.inputOfStep3FromInit, step3LocalInput)
    step3.input.setTable(training.offset, offsets)

    step3Result = step3.compute()
    return step3Result.get(training.outputOfStep3ForStep4)
def computeStep4Local(dataTable, step2MasterResult, step4LocalInput):
    """Run training step 4 (local) for one block.

    :param dataTable: the block's data table, set as training.partialData.
        Note: this parameter shadows the module-level ``dataTable`` list;
        callers pass a single element of ``dataTable`` or ``transposedDataTable``.
    :param step2MasterResult: value returned by computeStep2Master, set as
        training.inputOfStep4FromStep2.
    :param step4LocalInput: KeyValueDataCollection of step-3 outputs gathered
        from every block, set as training.partialModels.
    :return: the step-4 result from ``compute()``. The training loop stores it in
        usersPartialResultLocal or itemsPartialResultLocal.
    """
    step4 = training.Distributed(step=step4Local)
    step4.parameter.nFactors = nFactors

    step4.input.setModels(training.partialModels, step4LocalInput)
    step4.input.setTable(training.partialData, dataTable)
    step4.input.setTable(training.inputOfStep4FromStep2, step2MasterResult)

    return step4.compute()
205 step1LocalResult = [0] * nBlocks
206 step3LocalResult = [0] * nBlocks
207 step4LocalInput = [0] * nBlocks
209 for i
in range(nBlocks):
210 step4LocalInput[i] = KeyValueDataCollection()
212 for iteration
in range(maxIterations):
215 for i
in range(nBlocks):
216 step1LocalResult[i] = computeStep1Local(itemsPartialResultLocal[i])
218 step2MasterResult = computeStep2Master(step1LocalResult)
220 for i
in range(nBlocks):
221 step3LocalResult[i] = computeStep3Local(itemOffsets[i], itemsPartialResultLocal[i], itemStep3LocalInput[i])
224 for i
in range(nBlocks):
225 for j
in range(nBlocks):
226 step4LocalInput[i][j] = step3LocalResult[j][i]
228 for i
in range(nBlocks):
229 usersPartialResultLocal[i] = computeStep4Local(transposedDataTable[i], step2MasterResult, step4LocalInput[i])
232 for i
in range(nBlocks):
233 step1LocalResult[i] = computeStep1Local(usersPartialResultLocal[i])
235 step2MasterResult = computeStep2Master(step1LocalResult)
237 for i
in range(nBlocks):
238 step3LocalResult[i] = computeStep3Local(userOffsets[i], usersPartialResultLocal[i], userStep3LocalInput[i])
241 for i
in range(nBlocks):
242 for j
in range(nBlocks):
243 step4LocalInput[i][j] = step3LocalResult[j][i]
245 for i
in range(nBlocks):
246 itemsPartialResultLocal[i] = computeStep4Local(dataTable[i], step2MasterResult, step4LocalInput[i])
251 algorithm = ratings.Distributed(step=step1Local, method=ratings.defaultDense)
252 algorithm.parameter.nFactors = nFactors
255 algorithm.input.set(ratings.usersPartialModel, usersPartialResultLocal[i].get(training.outputOfStep4))
256 algorithm.input.set(ratings.itemsPartialModel, itemsPartialResultLocal[j].get(training.outputOfStep4))
262 res = algorithm.finalizeCompute()
263 return res.get(ratings.prediction)
268 for i
in range(nBlocks):
269 for j
in range(nBlocks):
270 print(
"Ratings for users block {}, items block {} :".format(i, j))
271 printALSRatings(userOffsets[i], itemOffsets[j], predictedRatings[i][j])
273 if __name__ ==
"__main__":
274 for i
in range(nBlocks):
281 for i
in range(nBlocks):
282 for j
in range(nBlocks):
283 predictedRatings[i][j] = testModel(i, j)