Python* API Reference for Intel® Data Analytics Acceleration Library 2019

impl_als_csr_distr.py

1 # file: impl_als_csr_distr.py
2 #===============================================================================
3 # Copyright 2014-2018 Intel Corporation.
4 #
5 # This software and the related documents are Intel copyrighted materials, and
6 # your use of them is governed by the express license under which they were
7 # provided to you (License). Unless the License provides otherwise, you may not
8 # use, modify, copy, publish, distribute, disclose or transmit this software or
9 # the related documents without Intel's prior written permission.
10 #
11 # This software and the related documents are provided as is, with no express
12 # or implied warranties, other than those that are expressly stated in the
13 # License.
14 #===============================================================================
15 
16 ## <a name="DAAL-EXAMPLE-PY-IMPLICIT_ALS_CSR_DISTRIBUTED"></a>
17 ## \example impl_als_csr_distr.py
18 
19 import os
20 import sys
21 
22 import numpy as np
23 
24 from daal import step1Local, step2Local, step2Master, step3Local, step4Local
25 import daal.algorithms.implicit_als.prediction.ratings as ratings
26 import daal.algorithms.implicit_als.training as training
27 import daal.algorithms.implicit_als.training.init as init
28 from daal.data_management import KeyValueDataCollection, HomogenNumericTable
29 
30 utils_folder = os.path.realpath(os.path.abspath(os.path.dirname(os.path.dirname(__file__))))
31 if utils_folder not in sys.path:
32  sys.path.insert(0, utils_folder)
33 from utils import createSparseTable, printALSRatings
34 
35 DAAL_PREFIX = os.path.join('..', 'data')
36 
37 # Input data set parameters
38 nBlocks = 4
39 
40 # Number of observations in transposed training data set blocks
41 trainDatasetFileNames = [
42  os.path.join(DAAL_PREFIX, 'distributed', 'implicit_als_trans_csr_1.csv'),
43  os.path.join(DAAL_PREFIX, 'distributed', 'implicit_als_trans_csr_2.csv'),
44  os.path.join(DAAL_PREFIX, 'distributed', 'implicit_als_trans_csr_3.csv'),
45  os.path.join(DAAL_PREFIX, 'distributed', 'implicit_als_trans_csr_4.csv')
46 ]
47 
48 usersPartition = [0] * 1
49 usersPartition[0] = nBlocks
50 
51 userOffsets = [0] * nBlocks
52 itemOffsets = [0] * nBlocks
53 
54 # Algorithm parameters
55 nUsers = 46 # Full number of users
56 nFactors = 2 # Number of factors
57 maxIterations = 5 # Number of iterations in the implicit ALS training algorithm
58 
59 dataTable = [0] * nBlocks
60 transposedDataTable = [0] * nBlocks
61 
62 predictedRatings = [[0] * nBlocks for x in range(nBlocks)]
63 
64 userStep3LocalInput = [0] * nBlocks
65 itemStep3LocalInput = [0] * nBlocks
66 
67 itemsPartialResultLocal = [0] * nBlocks
68 usersPartialResultLocal = [0] * nBlocks
69 
70 def readData(block):
71  global dataTable
72 
73  # Read trainDatasetFileName from a file and create a numeric table to store the input data
74  dataTable[block] = createSparseTable(trainDatasetFileNames[block])
75 
76 
77 def initializeStep1Local(block):
78  global itemsPartialResultLocal
79  global itemStep3LocalInput
80  global userOffsets
81 
82  # Create an algorithm object to initialize the implicit ALS model with the fastCSR method
83  initAlgorithm = init.Distributed(step=step1Local, method=init.fastCSR)
84  initAlgorithm.parameter.fullNUsers = nUsers
85  initAlgorithm.parameter.nFactors = nFactors
86  initAlgorithm.parameter.seed += block
87  usersPartitionArray = np.array(usersPartition, dtype=np.float64)
88  usersPartitionArray.shape = (1, 1)
89 
90  initAlgorithm.parameter.partition = HomogenNumericTable(usersPartitionArray)
91 
92  # Pass a training data set to the algorithm
93  initAlgorithm.input.set(init.data, dataTable[block])
94 
95  # Initialize the implicit ALS model
96  partialResult = initAlgorithm.compute()
97  itemStep3LocalInput[block] = partialResult.getCollection(init.outputOfInitForComputeStep3)
98  userOffsets[block] = partialResult.getCollection(init.offsets, block)
99  partialModelLocal = partialResult.getPartialModel(init.partialModel)
100 
101  itemsPartialResultLocal[block] = training.DistributedPartialResultStep4()
102  itemsPartialResultLocal[block].set(training.outputOfStep4ForStep1, partialModelLocal)
103 
104  return partialResult.getTablesCollection(init.outputOfStep1ForStep2)
105 
106 def initializeStep2Local(block, initStep2LocalInput):
107  global transposedDataTable
108  global userStep3LocalInput
109  global itemOffsets
110  # Create an algorithm object to initialize the implicit ALS model with the fastCSR method
111  initAlgorithm = init.Distributed(step=step2Local, method=init.fastCSR)
112 
113  initAlgorithm.input.set(init.inputOfStep2FromStep1, initStep2LocalInput)
114 
115  # Initialize the implicit ALS model
116  partialResult = initAlgorithm.compute()
117 
118  transposedDataTable[block] = partialResult.getTable(init.transposedData)
119  userStep3LocalInput[block] = partialResult.getCollection(init.outputOfInitForComputeStep3)
120  itemOffsets[block] = partialResult.getCollection(init.offsets, block)
121 
122 def initializeModel():
123  initStep1LocalResult = [0] * nBlocks
124 
125  for i in range(nBlocks):
126  initStep1LocalResult[i] = initializeStep1Local(i)
127 
128  initStep2LocalInput = [0] * nBlocks
129 
130  for i in range(nBlocks):
131  initStep2LocalInput[i] = KeyValueDataCollection()
132  for j in range(nBlocks):
133  initStep2LocalInput[i][j] = initStep1LocalResult[j][i]
134 
135  for i in range(nBlocks):
136  initializeStep2Local(i, initStep2LocalInput[i])
137 
138 
139 def computeStep1Local(partialResultLocal):
140 
141  # Create an algorithm object to perform first step of the implicit ALS training algorithm on local-node data
142  algorithm = training.Distributed(step=step1Local)
143  algorithm.parameter.nFactors = nFactors
144 
145  # Set input objects for the algorithm
146  algorithm.input.set(training.partialModel, partialResultLocal.get(training.outputOfStep4ForStep1))
147 
148  # Compute partial results of the first step on local nodes
149  # DistributedPartialResultStep1 class from training
150  return algorithm.compute()
151 
152 
153 def computeStep2Master(step1LocalResult):
154 
155  # Create an algorithm object to perform second step of the implicit ALS training algorithm
156  algorithm = training.Distributed(step=step2Master)
157  algorithm.parameter.nFactors = nFactors
158 
159  # Set the partial results of the first local step of distributed computations
160  # as input for the master-node algorithm
161  for i in range(nBlocks):
162  algorithm.input.add(training.inputOfStep2FromStep1, step1LocalResult[i])
163 
164  # Compute a partial result on the master node from the partial results on local nodes
165  # DistributedPartialResultStep2 class from training
166  res = algorithm.compute()
167  return res.get(training.outputOfStep2ForStep4)
168 
169 
170 def computeStep3Local(offsets, partialResultLocal, step3LocalInput):
171 
172  # Create an algorithm object to perform third step of the implicit ALS training algorithm on local-node data
173  algorithm = training.Distributed(step=step3Local)
174  algorithm.parameter.nFactors = nFactors
175 
176  # Set input objects for the algorithm
177  algorithm.input.setModel(training.partialModel, partialResultLocal.get(training.outputOfStep4ForStep3))
178  algorithm.input.setCollection(training.inputOfStep3FromInit, step3LocalInput)
179  algorithm.input.setTable(training.offset, offsets)
180 
181  # Compute partial results of the third step on local nodes
182  # DistributedPartialResultStep3 class from training
183  res = algorithm.compute()
184  return res.get(training.outputOfStep3ForStep4)
185 
186 
187 def computeStep4Local(dataTable, step2MasterResult, step4LocalInput):
188 
189  # Create an algorithm object to perform fourth step of the implicit ALS training algorithm on local-node data
190  algorithm = training.Distributed(step=step4Local)
191  algorithm.parameter.nFactors = nFactors
192 
193  # Set input objects for the algorithm
194  algorithm.input.setModels(training.partialModels, step4LocalInput)
195  algorithm.input.setTable(training.partialData, dataTable)
196  algorithm.input.setTable(training.inputOfStep4FromStep2, step2MasterResult)
197 
198  # Build the implicit ALS partial model on the local node
199  # DistributedPartialResultStep4 class from training
200  return algorithm.compute()
201 
202 
203 def trainModel():
204 
205  step1LocalResult = [0] * nBlocks
206  step3LocalResult = [0] * nBlocks
207  step4LocalInput = [0] * nBlocks
208 
209  for i in range(nBlocks):
210  step4LocalInput[i] = KeyValueDataCollection()
211 
212  for iteration in range(maxIterations):
213 
214  # Update partial users factors
215  for i in range(nBlocks):
216  step1LocalResult[i] = computeStep1Local(itemsPartialResultLocal[i])
217 
218  step2MasterResult = computeStep2Master(step1LocalResult)
219 
220  for i in range(nBlocks):
221  step3LocalResult[i] = computeStep3Local(itemOffsets[i], itemsPartialResultLocal[i], itemStep3LocalInput[i])
222 
223  # Prepare input objects for the fourth step of the distributed algorithm
224  for i in range(nBlocks):
225  for j in range(nBlocks):
226  step4LocalInput[i][j] = step3LocalResult[j][i]
227 
228  for i in range(nBlocks):
229  usersPartialResultLocal[i] = computeStep4Local(transposedDataTable[i], step2MasterResult, step4LocalInput[i])
230 
231  # Update partial items factors
232  for i in range(nBlocks):
233  step1LocalResult[i] = computeStep1Local(usersPartialResultLocal[i])
234 
235  step2MasterResult = computeStep2Master(step1LocalResult)
236 
237  for i in range(nBlocks):
238  step3LocalResult[i] = computeStep3Local(userOffsets[i], usersPartialResultLocal[i], userStep3LocalInput[i])
239 
240  # Prepare input objects for the fourth step of the distributed algorithm
241  for i in range(nBlocks):
242  for j in range(nBlocks):
243  step4LocalInput[i][j] = step3LocalResult[j][i]
244 
245  for i in range(nBlocks):
246  itemsPartialResultLocal[i] = computeStep4Local(dataTable[i], step2MasterResult, step4LocalInput[i])
247 
248 
249 def testModel(i, j):
250  # Create an algorithm object to predict ratings based in the implicit ALS partial models
251  algorithm = ratings.Distributed(step=step1Local, method=ratings.defaultDense)
252  algorithm.parameter.nFactors = nFactors
253 
254  # Set input objects for the algorithm
255  algorithm.input.set(ratings.usersPartialModel, usersPartialResultLocal[i].get(training.outputOfStep4))
256  algorithm.input.set(ratings.itemsPartialModel, itemsPartialResultLocal[j].get(training.outputOfStep4))
257 
258  # Predict ratings and retrieve the algorithm results
259  algorithm.compute()
260 
261  # Result class from ratings
262  res = algorithm.finalizeCompute()
263  return res.get(ratings.prediction)
264 
265 
266 def printResults():
267 
268  for i in range(nBlocks):
269  for j in range(nBlocks):
270  print("Ratings for users block {}, items block {} :".format(i, j))
271  printALSRatings(userOffsets[i], itemOffsets[j], predictedRatings[i][j])
272 
273 if __name__ == "__main__":
274  for i in range(nBlocks):
275  readData(i)
276 
277  initializeModel()
278 
279  trainModel()
280 
281  for i in range(nBlocks):
282  for j in range(nBlocks):
283  predictedRatings[i][j] = testModel(i, j)
284 
285  printResults()

For more complete information about compiler optimizations, see our Optimization Notice.