49 from daal
import step1Local, step2Local, step2Master, step3Local, step4Local
50 import daal.algorithms.implicit_als.prediction.ratings
as ratings
51 import daal.algorithms.implicit_als.training
as training
52 import daal.algorithms.implicit_als.training.init
as init
53 from daal.data_management
import KeyValueDataCollection, HomogenNumericTable
# Make the shared example helpers (the `utils` module located one directory
# above this script's folder) importable before `from utils import ...` runs.
utils_folder = os.path.realpath(
    os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
)
if utils_folder not in sys.path:
    sys.path.insert(0, utils_folder)
58 from utils
import createSparseTable, printALSRatings
# Root of the example data sets (a relative path).
DAAL_PREFIX = os.path.join('..', 'data')

# Training data: one file per data block; block k is read from
# trainDatasetFileNames[k].
trainDatasetFileNames = [
    os.path.join(DAAL_PREFIX, 'distributed', 'implicit_als_trans_csr_1.csv'),
    os.path.join(DAAL_PREFIX, 'distributed', 'implicit_als_trans_csr_2.csv'),
    os.path.join(DAAL_PREFIX, 'distributed', 'implicit_als_trans_csr_3.csv'),
    os.path.join(DAAL_PREFIX, 'distributed', 'implicit_als_trans_csr_4.csv'),
]

# Number of data blocks. Per-block code indexes trainDatasetFileNames with
# range(nBlocks), so derive the count from the file list rather than keeping
# a separate constant that can drift out of sync with it.
nBlocks = len(trainDatasetFileNames)

# NOTE(review): nUsers, nFactors and maxIterations are read by the
# initialization/training code but are not defined anywhere in this listing;
# presumably module-level constants in an omitted section -- TODO confirm.

# Users partition handed to init step 1 (wrapped there in a 1x1 table):
# a single entry holding the number of blocks.
usersPartition = [nBlocks]

# Per-block offsets collections, filled in by the initialization steps.
userOffsets = [0] * nBlocks
itemOffsets = [0] * nBlocks

dataTable = [0] * nBlocks            # training data table, one per block
transposedDataTable = [0] * nBlocks  # transposed data from init step 2

# predictedRatings[i][j]: ratings for users block i and items block j.
# The comprehension gives each row its own list ([[0] * n] * n would alias rows).
predictedRatings = [[0] * nBlocks for _ in range(nBlocks)]

# Inputs for training step 3, produced during initialization.
userStep3LocalInput = [0] * nBlocks
itemStep3LocalInput = [0] * nBlocks

# Per-block partial training results (what training step 4 produces).
itemsPartialResultLocal = [0] * nBlocks
usersPartialResultLocal = [0] * nBlocks
# NOTE(review): this statement is the body of a per-block loader whose
# `def ...(block):` header is not present in this listing -- TODO restore it.
# Loads training file trainDatasetFileNames[block] via createSparseTable and
# stores the resulting table in dataTable[block].
99 dataTable[block] = createSparseTable(trainDatasetFileNames[block])
def initializeStep1Local(block):
    """Run step 1 of the distributed implicit-ALS initialization for one block.

    Uses dataTable[block] as input. Writes these module-level slots at
    index ``block``:

    * itemStep3LocalInput -- the ``init.outputOfInitForComputeStep3``
      collection, later fed to training step 3 for items;
    * userOffsets -- the ``init.offsets`` collection for this block;
    * itemsPartialResultLocal -- a fresh ``DistributedPartialResultStep4``
      whose ``outputOfStep4ForStep1`` entry is this block's partial model.

    Returns:
        The ``init.outputOfStep1ForStep2`` tables collection, which
        initializeModel redistributes across the step-2 calls.
    """
    # The lists above are only written through subscripts, so no `global`
    # declaration is needed.
    partitionValues = np.array(usersPartition, dtype=np.float64)
    partitionValues.shape = (1, 1)

    step1Init = init.Distributed(step=step1Local, method=init.fastCSR)
    step1Init.parameter.fullNUsers = nUsers
    step1Init.parameter.nFactors = nFactors
    # Vary the seed per block (presumably so blocks draw different initial
    # factors).
    step1Init.parameter.seed += block
    step1Init.parameter.partition = HomogenNumericTable(partitionValues)

    step1Init.input.set(init.data, dataTable[block])

    initResult = step1Init.compute()
    itemStep3LocalInput[block] = initResult.getCollection(init.outputOfInitForComputeStep3)
    userOffsets[block] = initResult.getCollection(init.offsets, block)
    localModel = initResult.getPartialModel(init.partialModel)

    wrapped = training.DistributedPartialResultStep4()
    wrapped.set(training.outputOfStep4ForStep1, localModel)
    itemsPartialResultLocal[block] = wrapped

    return initResult.getTablesCollection(init.outputOfStep1ForStep2)
def initializeStep2Local(block, initStep2LocalInput):
    """Run step 2 of the distributed implicit-ALS initialization for one block.

    ``initStep2LocalInput`` is the KeyValueDataCollection that
    initializeModel assembles for this block from every block's step-1
    output.

    Writes these module-level slots at index ``block``:

    * transposedDataTable -- the ``init.transposedData`` table;
    * userStep3LocalInput -- the ``init.outputOfInitForComputeStep3``
      collection, later fed to training step 3 for users;
    * itemOffsets -- the ``init.offsets`` collection for this block.
    """
    step2Init = init.Distributed(step=step2Local, method=init.fastCSR)
    step2Init.input.set(init.inputOfStep2FromStep1, initStep2LocalInput)

    step2Result = step2Init.compute()

    transposedDataTable[block] = step2Result.getTable(init.transposedData)
    userStep3LocalInput[block] = step2Result.getCollection(init.outputOfInitForComputeStep3)
    itemOffsets[block] = step2Result.getCollection(init.offsets, block)
def initializeModel():
    """Initialize the implicit-ALS model across all nBlocks data blocks.

    1. Run init step 1 on every block, in block order.
    2. Transpose the step-1 outputs: block ``dst`` receives, under key
       ``src``, element ``dst`` of block ``src``'s output.
    3. Run init step 2 on every block, in block order.
    """
    step1Outputs = [initializeStep1Local(blk) for blk in range(nBlocks)]

    # Build every step-2 input before running any step-2 computation.
    step2Inputs = []
    for dst in range(nBlocks):
        gathered = KeyValueDataCollection()
        for src in range(nBlocks):
            gathered[src] = step1Outputs[src][dst]
        step2Inputs.append(gathered)

    for dst, gathered in enumerate(step2Inputs):
        initializeStep2Local(dst, gathered)
def computeStep1Local(partialResultLocal):
    """Run step 1 of distributed implicit-ALS training for one block.

    The partial model stored under ``training.outputOfStep4ForStep1`` in
    ``partialResultLocal`` becomes the step-1 ``training.partialModel``
    input. Returns whatever the step-1 ``compute()`` returns.
    """
    localStep1 = training.Distributed(step=step1Local)
    localStep1.parameter.nFactors = nFactors

    previousModel = partialResultLocal.get(training.outputOfStep4ForStep1)
    localStep1.input.set(training.partialModel, previousModel)

    return localStep1.compute()
def computeStep2Master(step1LocalResult):
    """Run the master step (step 2) of distributed implicit-ALS training.

    Entries 0..nBlocks-1 of ``step1LocalResult`` are added, in order, as
    ``training.inputOfStep2FromStep1`` inputs.

    Returns:
        The ``training.outputOfStep2ForStep4`` part of the master result.
    """
    masterStep = training.Distributed(step=step2Master)
    masterStep.parameter.nFactors = nFactors

    for blk in range(nBlocks):
        blockResult = step1LocalResult[blk]
        masterStep.input.add(training.inputOfStep2FromStep1, blockResult)

    masterResult = masterStep.compute()
    return masterResult.get(training.outputOfStep2ForStep4)
def computeStep3Local(offsets, partialResultLocal, step3LocalInput):
    """Run step 3 of distributed implicit-ALS training for one block.

    Args:
        offsets: set as the ``training.offset`` input table.
        partialResultLocal: its ``training.outputOfStep4ForStep3`` entry is
            set as the ``training.partialModel`` input.
        step3LocalInput: set as the ``training.inputOfStep3FromInit``
            collection (produced during initialization).

    Returns:
        The ``training.outputOfStep3ForStep4`` part of the step-3 result.
    """
    localStep3 = training.Distributed(step=step3Local)
    localStep3.parameter.nFactors = nFactors

    localStep3.input.setModel(training.partialModel, partialResultLocal.get(training.outputOfStep4ForStep3))
    localStep3.input.setCollection(training.inputOfStep3FromInit, step3LocalInput)
    localStep3.input.setTable(training.offset, offsets)

    step3Result = localStep3.compute()
    return step3Result.get(training.outputOfStep3ForStep4)
def computeStep4Local(dataTable, step2MasterResult, step4LocalInput):
    """Run step 4 of distributed implicit-ALS training for one block.

    Args:
        dataTable: this block's data table, set as ``training.partialData``
            (callers pass either transposedDataTable[i] or dataTable[i]).
        step2MasterResult: the master-step output, set as
            ``training.inputOfStep4FromStep2``.
        step4LocalInput: the per-block collection of step-3 pieces, set as
            ``training.partialModels``.

    Returns:
        Whatever the step-4 ``compute()`` returns; callers store it as the
        block's new partial result.
    """
    localStep4 = training.Distributed(step=step4Local)
    localStep4.parameter.nFactors = nFactors

    localStep4.input.setModels(training.partialModels, step4LocalInput)
    localStep4.input.setTable(training.partialData, dataTable)
    localStep4.input.setTable(training.inputOfStep4FromStep2, step2MasterResult)

    return localStep4.compute()
# NOTE(review): the lines below are the body of the distributed training
# routine; its `def` header (listing line ~228) is not present in this
# extract -- TODO restore it.
# Per-block scratch lists, reused on every iteration.
230 step1LocalResult = [0] * nBlocks
231 step3LocalResult = [0] * nBlocks
232 step4LocalInput = [0] * nBlocks
# One key/value collection per block for the step-4 inputs; rebuilt in place
# (same collection objects) during each half-iteration below.
234 for i
in range(nBlocks):
235 step4LocalInput[i] = KeyValueDataCollection()
# NOTE(review): maxIterations is not defined anywhere in this listing --
# presumably a module-level constant; confirm.
237 for iteration
in range(maxIterations):
# --- Half-iteration A: recompute usersPartialResultLocal from the current
# itemsPartialResultLocal, using item offsets/step-3 inputs and the
# transposed data.
# Step 1 on every block's items partial result.
240 for i
in range(nBlocks):
241 step1LocalResult[i] = computeStep1Local(itemsPartialResultLocal[i])
# Master step over all step-1 results.
243 step2MasterResult = computeStep2Master(step1LocalResult)
# Step 3 on every block.
245 for i
in range(nBlocks):
246 step3LocalResult[i] = computeStep3Local(itemOffsets[i], itemsPartialResultLocal[i], itemStep3LocalInput[i])
# Redistribute: block i receives, under key j, element i of block j's
# step-3 output.
249 for i
in range(nBlocks):
250 for j
in range(nBlocks):
251 step4LocalInput[i][j] = step3LocalResult[j][i]
# Step 4 with the transposed data produces the new users partial results.
253 for i
in range(nBlocks):
254 usersPartialResultLocal[i] = computeStep4Local(transposedDataTable[i], step2MasterResult, step4LocalInput[i])
# --- Half-iteration B: the same four steps with roles swapped: recompute
# itemsPartialResultLocal from usersPartialResultLocal, using user
# offsets/step-3 inputs and the original (non-transposed) data.
257 for i
in range(nBlocks):
258 step1LocalResult[i] = computeStep1Local(usersPartialResultLocal[i])
260 step2MasterResult = computeStep2Master(step1LocalResult)
262 for i
in range(nBlocks):
263 step3LocalResult[i] = computeStep3Local(userOffsets[i], usersPartialResultLocal[i], userStep3LocalInput[i])
266 for i
in range(nBlocks):
267 for j
in range(nBlocks):
268 step4LocalInput[i][j] = step3LocalResult[j][i]
270 for i
in range(nBlocks):
271 itemsPartialResultLocal[i] = computeStep4Local(dataTable[i], step2MasterResult, step4LocalInput[i])
# NOTE(review): body of testModel(i, j) (called that way from the __main__
# section); its `def` header is not present in this extract -- TODO restore.
# Predicts ratings for users block i against items block j from the two
# blocks' trained partial models.
276 algorithm = ratings.Distributed(step=step1Local, method=ratings.defaultDense)
277 algorithm.parameter.nFactors = nFactors
# Inputs: the users partial model of block i and the items partial model of
# block j (each taken from training.outputOfStep4 of the stored result).
280 algorithm.input.set(ratings.usersPartialModel, usersPartialResultLocal[i].get(training.outputOfStep4))
281 algorithm.input.set(ratings.itemsPartialModel, itemsPartialResultLocal[j].get(training.outputOfStep4))
# NOTE(review): no `algorithm.compute()` call is visible before
# finalizeCompute() here, while every other helper in this file calls
# compute(); confirm one was not dropped from this listing.
287 res = algorithm.finalizeCompute()
288 return res.get(ratings.prediction)
# NOTE(review): body of the results-printing routine; its `def` header is not
# present in this extract -- TODO restore it.
# Prints the predicted ratings for every (users block, items block) pair,
# using that pair's user and item offsets.
293 for i
in range(nBlocks):
294 for j
in range(nBlocks):
295 print(
"Ratings for users block {}, items block {} :".format(i, j))
296 printALSRatings(userOffsets[i], itemOffsets[j], predictedRatings[i][j])
# Script entry point (partial: listing lines 300-305 and anything after 308
# are missing from this extract).
298 if __name__ ==
"__main__":
# NOTE(review): this loop's body is not present in this listing; presumably it
# loads each block's data (the dataTable[block] loader above), and the model
# initialization/training calls likely followed -- TODO restore and confirm.
299 for i
in range(nBlocks):
# Predict ratings for every (users block i, items block j) pair.
306 for i
in range(nBlocks):
307 for j
in range(nBlocks):
308 predictedRatings[i][j] = testModel(i, j)