Python* API Reference for Intel® Data Analytics Acceleration Library 2019 Update 5

impl_als_csr_distr.py

Deprecation Notice: With the introduction of daal4py, a package that supersedes PyDAAL, Intel is deprecating PyDAAL and will discontinue support starting with Intel® DAAL 2021 and Intel® Distribution for Python 2021. Until then, Intel will continue to provide compatible PyDAAL pip and conda packages for newer releases of Intel DAAL and make them available in open source. However, Intel will not add the new features of Intel DAAL to PyDAAL. Intel recommends that developers switch to and use daal4py.

Note: To find daal4py examples, refer to the daal4py documentation or browse the GitHub repository.

1 # file: impl_als_csr_distr.py
2 #===============================================================================
3 # Copyright 2014-2019 Intel Corporation.
4 #
5 # This software and the related documents are Intel copyrighted materials, and
6 # your use of them is governed by the express license under which they were
7 # provided to you (License). Unless the License provides otherwise, you may not
8 # use, modify, copy, publish, distribute, disclose or transmit this software or
9 # the related documents without Intel's prior written permission.
10 #
11 # This software and the related documents are provided as is, with no express
12 # or implied warranties, other than those that are expressly stated in the
13 # License.
14 #===============================================================================
15 
16 ## <a name="DAAL-EXAMPLE-PY-IMPLICIT_ALS_CSR_DISTRIBUTED"></a>
17 ## \example impl_als_csr_distr.py
18 
19 import os
20 import sys
21 
22 import numpy as np
23 
24 from daal import step1Local, step2Local, step2Master, step3Local, step4Local
25 import daal.algorithms.implicit_als.prediction.ratings as ratings
26 import daal.algorithms.implicit_als.training as training
27 import daal.algorithms.implicit_als.training.init as init
28 from daal.data_management import KeyValueDataCollection, HomogenNumericTable
29 
30 utils_folder = os.path.realpath(os.path.abspath(os.path.dirname(os.path.dirname(__file__))))
31 if utils_folder not in sys.path:
32  sys.path.insert(0, utils_folder)
33 from utils import createSparseTable, printALSRatings
34 
# Directory that holds the example data sets, relative to the working directory
DAAL_PREFIX = os.path.join('..', 'data')

# Input data set parameters
nBlocks = 4  # Number of data blocks processed by the example

# Paths to the training data set blocks, one file per block
trainDatasetFileNames = [
    os.path.join(DAAL_PREFIX, 'distributed', 'implicit_als_trans_csr_1.csv'),
    os.path.join(DAAL_PREFIX, 'distributed', 'implicit_als_trans_csr_2.csv'),
    os.path.join(DAAL_PREFIX, 'distributed', 'implicit_als_trans_csr_3.csv'),
    os.path.join(DAAL_PREFIX, 'distributed', 'implicit_als_trans_csr_4.csv'),
]

# Single-element partition description passed to the initialization algorithm
usersPartition = [nBlocks]

# Per-block offsets, filled in by the initialization steps
userOffsets = [0] * nBlocks
itemOffsets = [0] * nBlocks

# Algorithm parameters
nUsers = 46  # Full number of users
nFactors = 2  # Number of factors
maxIterations = 5  # Number of iterations in the implicit ALS training algorithm

# Per-block input tables: filled by readData() and by initialization step 2
dataTable = [0] * nBlocks
transposedDataTable = [0] * nBlocks

# nBlocks x nBlocks grid of prediction results; every row is a distinct list
predictedRatings = [[0] * nBlocks for _ in range(nBlocks)]

# Per-block inputs for the third training step, produced during initialization
userStep3LocalInput = [0] * nBlocks
itemStep3LocalInput = [0] * nBlocks

# Per-block partial results (partial models) updated on every training iteration
itemsPartialResultLocal = [0] * nBlocks
usersPartialResultLocal = [0] * nBlocks
69 
def readData(block):
    """Load the training data of one block into ``dataTable[block]``.

    The file named by ``trainDatasetFileNames[block]`` is handed to
    ``createSparseTable`` and the resulting table is stored in the
    module-level ``dataTable`` list at index ``block``.
    """
    sparseBlock = createSparseTable(trainDatasetFileNames[block])
    dataTable[block] = sparseBlock
75 
76 
def initializeStep1Local(block):
    """Run the first local step of implicit ALS initialization for one block.

    Side effects on module-level lists, all at index ``block``:
      * ``itemStep3LocalInput`` receives the ``outputOfInitForComputeStep3`` collection,
      * ``userOffsets`` receives the ``offsets`` entry requested for this block,
      * ``itemsPartialResultLocal`` receives a new ``DistributedPartialResultStep4``
        that wraps the partial model produced by the initialization.

    Returns the ``outputOfStep1ForStep2`` tables collection of the partial result.
    """
    # Algorithm object that initializes the implicit ALS model with the fastCSR method
    step1Init = init.Distributed(step=step1Local, method=init.fastCSR)
    step1Init.parameter.fullNUsers = nUsers
    step1Init.parameter.nFactors = nFactors
    # Shift the seed by the block index so every block uses a different seed
    step1Init.parameter.seed += block

    # The partition is passed as a 1 x 1 numeric table
    partitionValues = np.array(usersPartition, dtype=np.float64)
    partitionValues.shape = (1, 1)
    step1Init.parameter.partition = HomogenNumericTable(partitionValues)

    # Pass the training data of this block to the algorithm
    step1Init.input.set(init.data, dataTable[block])

    # Initialize the implicit ALS model
    initResult = step1Init.compute()
    itemStep3LocalInput[block] = initResult.getCollection(init.outputOfInitForComputeStep3)
    userOffsets[block] = initResult.getCollection(init.offsets, block)
    initialModel = initResult.getPartialModel(init.partialModel)

    # Wrap the initial partial model so the training steps can consume it
    wrappedModel = training.DistributedPartialResultStep4()
    itemsPartialResultLocal[block] = wrappedModel
    wrappedModel.set(training.outputOfStep4ForStep1, initialModel)

    return initResult.getTablesCollection(init.outputOfStep1ForStep2)
105 
def initializeStep2Local(block, initStep2LocalInput):
    """Run the second local step of implicit ALS initialization for one block.

    ``initStep2LocalInput`` is set as the ``inputOfStep2FromStep1`` input of the
    algorithm. The results are stored in module-level lists at index ``block``:
    ``transposedDataTable``, ``userStep3LocalInput`` and ``itemOffsets``.
    """
    # Algorithm object that initializes the implicit ALS model with the fastCSR method
    step2Init = init.Distributed(step=step2Local, method=init.fastCSR)
    step2Init.input.set(init.inputOfStep2FromStep1, initStep2LocalInput)

    # Initialize the implicit ALS model
    initResult = step2Init.compute()

    transposedDataTable[block] = initResult.getTable(init.transposedData)
    userStep3LocalInput[block] = initResult.getCollection(init.outputOfInitForComputeStep3)
    itemOffsets[block] = initResult.getCollection(init.offsets, block)
121 
def initializeModel():
    """Initialize the implicit ALS model on all blocks.

    Runs initialization step 1 on every block, regroups the step 1 outputs so
    that block ``i`` receives element ``i`` of every block's output, and then
    runs initialization step 2 on every block with its regrouped input.
    """
    # Step 1 on every block, in block order
    step1Outputs = [initializeStep1Local(blockIdx) for blockIdx in range(nBlocks)]

    # Regroup: the input of block `dst` holds, under key `src`, the piece that
    # block `src` produced for block `dst`
    step2Inputs = []
    for dst in range(nBlocks):
        regrouped = KeyValueDataCollection()
        for src in range(nBlocks):
            regrouped[src] = step1Outputs[src][dst]
        step2Inputs.append(regrouped)

    # Step 2 on every block, in block order
    for dst, regrouped in enumerate(step2Inputs):
        initializeStep2Local(dst, regrouped)
137 
138 
def computeStep1Local(partialResultLocal):
    """Perform the first step of the implicit ALS training on local-node data.

    The partial model is read from ``partialResultLocal`` under the
    ``outputOfStep4ForStep1`` id. Returns whatever ``compute()`` yields for the
    step 1 algorithm (a DistributedPartialResultStep1 from ``training``).
    """
    step1 = training.Distributed(step=step1Local)
    step1.parameter.nFactors = nFactors

    # Set the partial model as the input object of the algorithm
    localModel = partialResultLocal.get(training.outputOfStep4ForStep1)
    step1.input.set(training.partialModel, localModel)

    # Compute partial results of the first step on the local node
    return step1.compute()
151 
152 
def computeStep2Master(step1LocalResult):
    """Perform the second step of the implicit ALS training on the master node.

    ``step1LocalResult`` is indexed by block; its first ``nBlocks`` entries are
    added as ``inputOfStep2FromStep1`` inputs. Returns the
    ``outputOfStep2ForStep4`` entry of the computed partial result.
    """
    step2 = training.Distributed(step=step2Master)
    step2.parameter.nFactors = nFactors

    # Feed the partial results of the first local step, block by block,
    # into the master-node algorithm
    for blockIdx in range(nBlocks):
        step2.input.add(training.inputOfStep2FromStep1, step1LocalResult[blockIdx])

    # Compute a partial result on the master node from the local partial results
    # (DistributedPartialResultStep2 class from training)
    masterResult = step2.compute()
    return masterResult.get(training.outputOfStep2ForStep4)
168 
169 
def computeStep3Local(offsets, partialResultLocal, step3LocalInput):
    """Perform the third step of the implicit ALS training on local-node data.

    Inputs set on the algorithm:
      * the partial model read from ``partialResultLocal`` under ``outputOfStep4ForStep3``,
      * ``step3LocalInput`` as the ``inputOfStep3FromInit`` collection,
      * ``offsets`` as the ``offset`` table.

    Returns the ``outputOfStep3ForStep4`` entry of the computed partial result.
    """
    step3 = training.Distributed(step=step3Local)
    step3.parameter.nFactors = nFactors

    # Set input objects for the algorithm
    localModel = partialResultLocal.get(training.outputOfStep4ForStep3)
    step3.input.setModel(training.partialModel, localModel)
    step3.input.setCollection(training.inputOfStep3FromInit, step3LocalInput)
    step3.input.setTable(training.offset, offsets)

    # Compute partial results of the third step on the local node
    # (DistributedPartialResultStep3 class from training)
    localResult = step3.compute()
    return localResult.get(training.outputOfStep3ForStep4)
185 
186 
def computeStep4Local(dataTable, step2MasterResult, step4LocalInput):
    """Perform the fourth step of the implicit ALS training on local-node data.

    Inputs set on the algorithm:
      * ``step4LocalInput`` as the ``partialModels`` input,
      * ``dataTable`` as the ``partialData`` table (this parameter shadows the
        module-level list of the same name inside the function),
      * ``step2MasterResult`` as the ``inputOfStep4FromStep2`` table.

    Returns whatever ``compute()`` yields for the step 4 algorithm
    (a DistributedPartialResultStep4 from ``training``).
    """
    step4 = training.Distributed(step=step4Local)
    step4.parameter.nFactors = nFactors

    # Set input objects for the algorithm
    step4Input = step4.input
    step4Input.setModels(training.partialModels, step4LocalInput)
    step4Input.setTable(training.partialData, dataTable)
    step4Input.setTable(training.inputOfStep4FromStep2, step2MasterResult)

    # Build the implicit ALS partial model on the local node
    return step4.compute()
201 
202 
def trainModel():
    """Train the implicit ALS model for ``maxIterations`` iterations.

    Every iteration runs the four-step distributed sequence twice: first to
    refresh ``usersPartialResultLocal`` from the items partial results, then to
    refresh ``itemsPartialResultLocal`` from the users partial results. Both
    module-level lists are updated in place.
    """
    # One key-value collection per block, reused as step 4 input in every pass
    step4LocalInput = [KeyValueDataCollection() for _ in range(nBlocks)]

    def updateFactors(sourcePartials, offsets, step3Inputs, dataBlocks, targetPartials):
        # Steps 1-4 for one half-iteration: read sourcePartials, write targetPartials
        step1Results = [computeStep1Local(sourcePartials[b]) for b in range(nBlocks)]

        step2MasterResult = computeStep2Master(step1Results)

        step3Results = [
            computeStep3Local(offsets[b], sourcePartials[b], step3Inputs[b])
            for b in range(nBlocks)
        ]

        # Prepare input objects for the fourth step of the distributed algorithm:
        # block `dst` receives, under key `src`, the piece block `src` made for it
        for dst in range(nBlocks):
            for src in range(nBlocks):
                step4LocalInput[dst][src] = step3Results[src][dst]

        for b in range(nBlocks):
            targetPartials[b] = computeStep4Local(dataBlocks[b], step2MasterResult, step4LocalInput[b])

    for _ in range(maxIterations):
        # Update partial users factors
        updateFactors(itemsPartialResultLocal, itemOffsets, itemStep3LocalInput,
                      transposedDataTable, usersPartialResultLocal)

        # Update partial items factors
        updateFactors(usersPartialResultLocal, userOffsets, userStep3LocalInput,
                      dataTable, itemsPartialResultLocal)
247 
248 
def testModel(i, j):
    """Predict ratings for users block ``i`` against items block ``j``.

    The ``outputOfStep4`` entries of ``usersPartialResultLocal[i]`` and
    ``itemsPartialResultLocal[j]`` are used as the users and items partial
    models. Returns the ``prediction`` entry of the finalized result.
    """
    # Algorithm object that predicts ratings based on the implicit ALS partial models
    predictor = ratings.Distributed(step=step1Local, method=ratings.defaultDense)
    predictor.parameter.nFactors = nFactors

    # Set input objects for the algorithm
    usersModel = usersPartialResultLocal[i].get(training.outputOfStep4)
    predictor.input.set(ratings.usersPartialModel, usersModel)
    itemsModel = itemsPartialResultLocal[j].get(training.outputOfStep4)
    predictor.input.set(ratings.itemsPartialModel, itemsModel)

    # Predict ratings; the partial result returned by compute() is not used
    predictor.compute()

    # Retrieve the final result (Result class from ratings)
    finalResult = predictor.finalizeCompute()
    return finalResult.get(ratings.prediction)
264 
265 
def printResults():
    """Print the predicted ratings for every (users block, items block) pair.

    For each pair a header line is printed, then ``printALSRatings`` is called
    with the users offsets, the items offsets and the stored prediction.
    """
    for userBlock in range(nBlocks):
        for itemBlock in range(nBlocks):
            header = "Ratings for users block {}, items block {} :".format(userBlock, itemBlock)
            print(header)
            printALSRatings(
                userOffsets[userBlock],
                itemOffsets[itemBlock],
                predictedRatings[userBlock][itemBlock],
            )
272 
if __name__ == "__main__":
    # Load every training data block
    for blockIdx in range(nBlocks):
        readData(blockIdx)

    # Initialize, then train, the implicit ALS model
    initializeModel()
    trainModel()

    # Predict ratings for every (users block, items block) pair
    for userBlock in range(nBlocks):
        for itemBlock in range(nBlocks):
            predictedRatings[userBlock][itemBlock] = testModel(userBlock, itemBlock)

    printResults()

For more complete information about compiler optimizations, see our Optimization Notice.