50 from daal
import step1Local, step2Local, step2Master, step3Local, step4Local
51 import daal.algorithms.implicit_als.prediction.ratings
as ratings
52 import daal.algorithms.implicit_als.training
as training
53 import daal.algorithms.implicit_als.training.init
as init
54 from daal.data_management
import KeyValueDataCollection, HomogenNumericTable
56 utils_folder = os.path.realpath(os.path.abspath(os.path.dirname(os.path.dirname(__file__))))
57 if utils_folder
not in sys.path:
58 sys.path.insert(0, utils_folder)
59 from utils
import createSparseTable, printALSRatings
# Relative location of the example data directory.
DAAL_PREFIX = os.path.join('..', 'data')

# Training input files, indexed by block: entry ``block`` is the file that
# is loaded into ``dataTable[block]``.
trainDatasetFileNames = [
    os.path.join(DAAL_PREFIX, 'distributed', 'implicit_als_trans_csr_1.csv'),
    os.path.join(DAAL_PREFIX, 'distributed', 'implicit_als_trans_csr_2.csv'),
    os.path.join(DAAL_PREFIX, 'distributed', 'implicit_als_trans_csr_3.csv'),
    os.path.join(DAAL_PREFIX, 'distributed', 'implicit_als_trans_csr_4.csv'),
]
# Single-entry partition description; its only value is the number of blocks.
usersPartition = [nBlocks]

# Per-block placeholders (one slot per block index). Every slot starts as 0
# and is overwritten by the data loading, initialization and training code.
userOffsets, itemOffsets = [0] * nBlocks, [0] * nBlocks

dataTable, transposedDataTable = [0] * nBlocks, [0] * nBlocks

# nBlocks x nBlocks grid addressed as predictedRatings[i][j]; each row is a
# separate list object.
predictedRatings = [[0] * nBlocks for _ in range(nBlocks)]

userStep3LocalInput, itemStep3LocalInput = [0] * nBlocks, [0] * nBlocks

itemsPartialResultLocal, usersPartialResultLocal = [0] * nBlocks, [0] * nBlocks
100 dataTable[block] = createSparseTable(trainDatasetFileNames[block])
def initializeStep1Local(block):
    """Run the step1Local part of model initialization for one block.

    Side effects on module-level per-block lists (all written at index
    ``block``):
      * ``itemStep3LocalInput`` - the ``outputOfInitForComputeStep3`` collection
      * ``userOffsets``         - the ``offsets`` collection entry for ``block``
      * ``itemsPartialResultLocal`` - a new ``DistributedPartialResultStep4``
        whose ``outputOfStep4ForStep1`` is the initial partial model

    :param block: index of the block to initialize
    :return: the ``outputOfStep1ForStep2`` tables collection of this block
    """
    # The module-level lists are only assigned by index, never rebound, so
    # no ``global`` declarations are required here.

    # The partition is handed over as a 1x1 float64 table.
    partitionValues = np.array(usersPartition, dtype=np.float64)
    partitionValues.shape = (1, 1)

    initializer = init.Distributed(step=step1Local, method=init.fastCSR)
    initializer.parameter.fullNUsers = nUsers
    initializer.parameter.nFactors = nFactors
    # Offset the seed by the block index so each block gets its own seed.
    initializer.parameter.seed += block
    initializer.parameter.partition = HomogenNumericTable(partitionValues)

    initializer.input.set(init.data, dataTable[block])

    initResult = initializer.compute()

    itemStep3LocalInput[block] = initResult.getCollection(init.outputOfInitForComputeStep3)
    userOffsets[block] = initResult.getCollection(init.offsets, block)

    # Wrap the initial partial model so it can be consumed like a step-4 result.
    wrappedModel = training.DistributedPartialResultStep4()
    wrappedModel.set(training.outputOfStep4ForStep1, initResult.getPartialModel(init.partialModel))
    itemsPartialResultLocal[block] = wrappedModel

    return initResult.getTablesCollection(init.outputOfStep1ForStep2)
def initializeStep2Local(block, initStep2LocalInput):
    """Run the step2Local part of model initialization for one block.

    Side effects on module-level per-block lists (all written at index
    ``block``):
      * ``transposedDataTable`` - the ``transposedData`` table
      * ``userStep3LocalInput`` - the ``outputOfInitForComputeStep3`` collection
      * ``itemOffsets``         - the ``offsets`` collection entry for ``block``

    :param block: index of the block being initialized
    :param initStep2LocalInput: collection passed to the algorithm as
        ``inputOfStep2FromStep1``
    """
    # Only indexed assignment is performed on the module-level lists, so no
    # ``global`` declarations are needed.
    initializer = init.Distributed(step=step2Local, method=init.fastCSR)
    initializer.input.set(init.inputOfStep2FromStep1, initStep2LocalInput)

    initResult = initializer.compute()

    transposedDataTable[block] = initResult.getTable(init.transposedData)
    userStep3LocalInput[block] = initResult.getCollection(init.outputOfInitForComputeStep3)
    itemOffsets[block] = initResult.getCollection(init.offsets, block)
def initializeModel():
    """Initialize the model for all blocks in three phases.

    1. Run ``initializeStep1Local`` for every block and keep its output.
    2. For every receiving block, gather entry ``receiver`` from each
       block's step-1 output into a fresh ``KeyValueDataCollection``.
    3. Run ``initializeStep2Local`` for every block with its gathered input.
    """
    # Phase 1: step 1 on every block, in block order.
    step1Outputs = [initializeStep1Local(blockId) for blockId in range(nBlocks)]

    # Phase 2: exchange - receiver ``r`` gets element ``r`` from every sender.
    step2Inputs = []
    for receiver in range(nBlocks):
        gathered = KeyValueDataCollection()
        for sender in range(nBlocks):
            gathered[sender] = step1Outputs[sender][receiver]
        step2Inputs.append(gathered)

    # Phase 3: step 2 on every block, only after all inputs have been built.
    for receiver, gathered in enumerate(step2Inputs):
        initializeStep2Local(receiver, gathered)
def computeStep1Local(partialResultLocal):
    """Run the step1Local training step on one block's partial result.

    :param partialResultLocal: object whose ``outputOfStep4ForStep1`` entry
        is used as the ``partialModel`` input
    :return: whatever the algorithm's ``compute()`` returns
    """
    step = training.Distributed(step=step1Local)
    step.parameter.nFactors = nFactors

    blockModel = partialResultLocal.get(training.outputOfStep4ForStep1)
    step.input.set(training.partialModel, blockModel)

    return step.compute()
def computeStep2Master(step1LocalResult):
    """Run the step2Master training step over the step-1 results of all blocks.

    :param step1LocalResult: indexable holding one step-1 result per block;
        entries ``0 .. nBlocks-1`` are added as ``inputOfStep2FromStep1``
    :return: the ``outputOfStep2ForStep4`` entry of the computed result
    """
    master = training.Distributed(step=step2Master)
    master.parameter.nFactors = nFactors

    # Feed the per-block results in block order.
    for blockId in range(nBlocks):
        master.input.add(training.inputOfStep2FromStep1, step1LocalResult[blockId])

    return master.compute().get(training.outputOfStep2ForStep4)
def computeStep3Local(offsets, partialResultLocal, step3LocalInput):
    """Run the step3Local training step for one block.

    :param offsets: passed to the algorithm as the ``offset`` table input
    :param partialResultLocal: object whose ``outputOfStep4ForStep3`` entry
        is used as the ``partialModel`` input
    :param step3LocalInput: passed as the ``inputOfStep3FromInit`` collection
    :return: the ``outputOfStep3ForStep4`` entry of the computed result
    """
    step = training.Distributed(step=step3Local)
    step.parameter.nFactors = nFactors

    blockModel = partialResultLocal.get(training.outputOfStep4ForStep3)
    step.input.setModel(training.partialModel, blockModel)
    step.input.setCollection(training.inputOfStep3FromInit, step3LocalInput)
    step.input.setTable(training.offset, offsets)

    return step.compute().get(training.outputOfStep3ForStep4)
def computeStep4Local(dataTable, step2MasterResult, step4LocalInput):
    """Run the step4Local training step for one block.

    :param dataTable: passed as the ``partialData`` table input (this
        parameter shadows the module-level list of the same name)
    :param step2MasterResult: passed as the ``inputOfStep4FromStep2`` table
    :param step4LocalInput: passed as the ``partialModels`` input
    :return: whatever the algorithm's ``compute()`` returns
    """
    step = training.Distributed(step=step4Local)
    step.parameter.nFactors = nFactors

    stepInput = step.input
    stepInput.setModels(training.partialModels, step4LocalInput)
    stepInput.setTable(training.partialData, dataTable)
    stepInput.setTable(training.inputOfStep4FromStep2, step2MasterResult)

    return step.compute()
# NOTE(review): no enclosing ``def`` is visible for these statements, so they
# are kept as a plain statement run. Confirm whether they belong inside a
# training function that is called after model initialization.

# Scratch storage, one slot per block, reused by every iteration.
step1LocalResult = [0] * nBlocks
step3LocalResult = [0] * nBlocks
step4LocalInput = [KeyValueDataCollection() for _ in range(nBlocks)]

for iteration in range(maxIterations):
    # --- First half: items partial results -> users partial results ---
    for blk in range(nBlocks):
        step1LocalResult[blk] = computeStep1Local(itemsPartialResultLocal[blk])

    step2MasterResult = computeStep2Master(step1LocalResult)

    for blk in range(nBlocks):
        step3LocalResult[blk] = computeStep3Local(
            itemOffsets[blk], itemsPartialResultLocal[blk], itemStep3LocalInput[blk])

    # Exchange: block ``dst`` receives entry ``dst`` of every block's
    # step-3 output.
    for dst in range(nBlocks):
        for src in range(nBlocks):
            step4LocalInput[dst][src] = step3LocalResult[src][dst]

    for blk in range(nBlocks):
        usersPartialResultLocal[blk] = computeStep4Local(
            transposedDataTable[blk], step2MasterResult, step4LocalInput[blk])

    # --- Second half: users partial results -> items partial results ---
    for blk in range(nBlocks):
        step1LocalResult[blk] = computeStep1Local(usersPartialResultLocal[blk])

    step2MasterResult = computeStep2Master(step1LocalResult)

    for blk in range(nBlocks):
        step3LocalResult[blk] = computeStep3Local(
            userOffsets[blk], usersPartialResultLocal[blk], userStep3LocalInput[blk])

    # Same exchange pattern; the step-4 input collections are overwritten.
    for dst in range(nBlocks):
        for src in range(nBlocks):
            step4LocalInput[dst][src] = step3LocalResult[src][dst]

    for blk in range(nBlocks):
        itemsPartialResultLocal[blk] = computeStep4Local(
            dataTable[blk], step2MasterResult, step4LocalInput[blk])
def testModel(i, j):
    """Predict ratings for users block ``i`` against items block ``j``.

    Reads the ``outputOfStep4`` entries of ``usersPartialResultLocal[i]`` and
    ``itemsPartialResultLocal[j]`` and feeds them to the distributed ratings
    prediction algorithm.

    NOTE(review): the ``def`` line was absent from the visible source; the
    name and parameters are taken from the ``testModel(i, j)`` call in the
    ``__main__`` section and from the body's use of ``i`` and ``j``.

    :param i: index of the users block
    :param j: index of the items block
    :return: the ``prediction`` entry of the finalized result
    """
    algorithm = ratings.Distributed(step=step1Local, method=ratings.defaultDense)
    algorithm.parameter.nFactors = nFactors

    algorithm.input.set(ratings.usersPartialModel, usersPartialResultLocal[i].get(training.outputOfStep4))
    algorithm.input.set(ratings.itemsPartialModel, itemsPartialResultLocal[j].get(training.outputOfStep4))

    # finalizeCompute() only finalizes an existing partial result, so the
    # partial result has to be produced by compute() first.
    algorithm.compute()
    res = algorithm.finalizeCompute()
    return res.get(ratings.prediction)
# NOTE(review): no enclosing ``def`` is visible for this loop, so it is kept
# as a plain statement run. Confirm whether it belongs inside a reporting
# function that is called after predictedRatings has been filled.

# Print one ratings section per (users block, items block) pair, users-major.
for userBlock in range(nBlocks):
    for itemBlock in range(nBlocks):
        heading = "Ratings for users block {}, items block {} :".format(userBlock, itemBlock)
        print(heading)
        printALSRatings(
            userOffsets[userBlock],
            itemOffsets[itemBlock],
            predictedRatings[userBlock][itemBlock])
299 if __name__ ==
"__main__":
300 for i
in range(nBlocks):
307 for i
in range(nBlocks):
308 for j
in range(nBlocks):
309 predictedRatings[i][j] = testModel(i, j)