Python* API Reference for Intel® Data Analytics Acceleration Library 2018 Update 1

impl_als_csr_distr.py

1 #===============================================================================
2 # Copyright 2014-2017 Intel Corporation
3 # All Rights Reserved.
4 #
5 # If this software was obtained under the Intel Simplified Software License,
6 # the following terms apply:
7 #
8 # The source code, information and material ("Material") contained herein is
9 # owned by Intel Corporation or its suppliers or licensors, and title to such
10 # Material remains with Intel Corporation or its suppliers or licensors. The
11 # Material contains proprietary information of Intel or its suppliers and
12 # licensors. The Material is protected by worldwide copyright laws and treaty
13 # provisions. No part of the Material may be used, copied, reproduced,
14 # modified, published, uploaded, posted, transmitted, distributed or disclosed
15 # in any way without Intel's prior express written permission. No license under
16 # any patent, copyright or other intellectual property rights in the Material
17 # is granted to or conferred upon you, either expressly, by implication,
18 # inducement, estoppel or otherwise. Any license under such intellectual
19 # property rights must be express and approved by Intel in writing.
20 #
21 # Unless otherwise agreed by Intel in writing, you may not remove or alter this
22 # notice or any other notice embedded in Materials by Intel or Intel's
23 # suppliers or licensors in any way.
24 #
25 #
26 # If this software was obtained under the Apache License, Version 2.0 (the
27 # "License"), the following terms apply:
28 #
29 # You may not use this file except in compliance with the License. You may
30 # obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
31 #
32 #
33 # Unless required by applicable law or agreed to in writing, software
34 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
35 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
36 #
37 # See the License for the specific language governing permissions and
38 # limitations under the License.
39 #===============================================================================
40 
41 ## <a name="DAAL-EXAMPLE-PY-IMPLICIT_ALS_CSR_DISTRIBUTED"></a>
42 ## \example impl_als_csr_distr.py
43 
44 import os
45 import sys
46 
47 import numpy as np
48 
49 from daal import step1Local, step2Local, step2Master, step3Local, step4Local
50 import daal.algorithms.implicit_als.prediction.ratings as ratings
51 import daal.algorithms.implicit_als.training as training
52 import daal.algorithms.implicit_als.training.init as init
53 from daal.data_management import KeyValueDataCollection, HomogenNumericTable
54 
# Put the directory one level above this script's directory at the front of
# the module search path so that the `utils` import that follows can be
# resolved from there.
#
# The path is made absolute *before* the two dirname() calls.  With a bare
# relative __file__ (e.g. 'impl_als_csr_distr.py' when the script is started
# from its own directory on interpreters that keep __file__ relative),
# dirname(dirname(__file__)) would collapse to '' and resolve to the current
# working directory instead of the parent directory.
utils_folder = os.path.realpath(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
if utils_folder not in sys.path:
    sys.path.insert(0, utils_folder)
58 from utils import createSparseTable, printALSRatings
59 
# Relative location of the example data sets
DAAL_PREFIX = os.path.join('..', 'data')

# ---------------------------------------------------------------------------
# Input data set parameters
# ---------------------------------------------------------------------------

# Number of blocks the data set is split into
nBlocks = 4

# Training data set files, one entry per block
trainDatasetFileNames = [
    os.path.join(DAAL_PREFIX, 'distributed', 'implicit_als_trans_csr_1.csv'),
    os.path.join(DAAL_PREFIX, 'distributed', 'implicit_als_trans_csr_2.csv'),
    os.path.join(DAAL_PREFIX, 'distributed', 'implicit_als_trans_csr_3.csv'),
    os.path.join(DAAL_PREFIX, 'distributed', 'implicit_als_trans_csr_4.csv'),
]

# Single-element users partition holding the number of blocks
usersPartition = [nBlocks]

# ---------------------------------------------------------------------------
# Algorithm parameters
# ---------------------------------------------------------------------------
nUsers = 46        # Full number of users
nFactors = 2       # Number of factors
maxIterations = 5  # Number of iterations in the implicit ALS training algorithm

# ---------------------------------------------------------------------------
# Shared per-block state.  Every list has one slot per block; the slots start
# out as 0 placeholders and are filled in by the functions of this script.
# ---------------------------------------------------------------------------
userOffsets = [0] * nBlocks
itemOffsets = [0] * nBlocks

dataTable = [0] * nBlocks
transposedDataTable = [0] * nBlocks

userStep3LocalInput = [0] * nBlocks
itemStep3LocalInput = [0] * nBlocks

itemsPartialResultLocal = [0] * nBlocks
usersPartialResultLocal = [0] * nBlocks

# nBlocks x nBlocks grid of predictions, indexed [users block][items block];
# each row is its own list object
predictedRatings = [[0] * nBlocks for _ in range(nBlocks)]
def readData(block):
    """Load the training data of one block into the module-level ``dataTable``.

    :param block: index of the block; selects both the entry of
        ``trainDatasetFileNames`` to read and the slot of ``dataTable`` that
        receives the table built by ``createSparseTable``.
    """
    fileName = trainDatasetFileNames[block]
    dataTable[block] = createSparseTable(fileName)
100 
101 
def initializeStep1Local(block):
    """Run the first (local) initialization step of implicit ALS for one block.

    Besides returning a value, the function stores results at index ``block``
    of the module-level lists ``itemStep3LocalInput``, ``userOffsets`` and
    ``itemsPartialResultLocal``.

    :param block: index of the block in ``dataTable`` to initialize from; it
        is also added to the algorithm's seed.
    :return: the ``init.outputOfStep1ForStep2`` tables collection of the
        partial result.
    """
    # Initialization algorithm for the implicit ALS model, fastCSR method
    step1 = init.Distributed(step=step1Local, method=init.fastCSR)
    step1.parameter.fullNUsers = nUsers
    step1.parameter.nFactors = nFactors
    # Shift the seed by the block index so that every block uses its own seed
    step1.parameter.seed += block

    # The users partition is passed as a 1x1 float64 numeric table
    partitionValues = np.array(usersPartition, dtype=np.float64)
    partitionValues.shape = (1, 1)
    step1.parameter.partition = HomogenNumericTable(partitionValues)

    # Training data of this block
    step1.input.set(init.data, dataTable[block])

    initResult = step1.compute()

    # Keep the pieces of the partial result that are consumed later on
    itemStep3LocalInput[block] = initResult.getCollection(init.outputOfInitForComputeStep3)
    userOffsets[block] = initResult.getCollection(init.offsets, block)
    initialModel = initResult.getPartialModel(init.partialModel)

    # Store the initial partial model inside a step-4 partial result object,
    # which is what computeStep1Local reads its model from
    step4Result = training.DistributedPartialResultStep4()
    step4Result.set(training.outputOfStep4ForStep1, initialModel)
    itemsPartialResultLocal[block] = step4Result

    return initResult.getTablesCollection(init.outputOfStep1ForStep2)
130 
def initializeStep2Local(block, initStep2LocalInput):
    """Run the second (local) initialization step of implicit ALS for one block.

    The results are stored at index ``block`` of the module-level lists
    ``transposedDataTable``, ``userStep3LocalInput`` and ``itemOffsets``;
    nothing is returned.

    :param block: index of the block being initialized.
    :param initStep2LocalInput: collection passed to the algorithm as
        ``init.inputOfStep2FromStep1``.
    """
    # Initialization algorithm for the implicit ALS model, fastCSR method
    step2 = init.Distributed(step=step2Local, method=init.fastCSR)
    step2.input.set(init.inputOfStep2FromStep1, initStep2LocalInput)

    initResult = step2.compute()

    transposedDataTable[block] = initResult.getTable(init.transposedData)
    userStep3LocalInput[block] = initResult.getCollection(init.outputOfInitForComputeStep3)
    itemOffsets[block] = initResult.getCollection(init.offsets, block)
146 
def initializeModel():
    """Initialize the implicit ALS model on all blocks.

    Runs initialization step 1 on every block, regroups the step-1 outputs so
    that block ``i`` receives element ``i`` of every block's output, and then
    runs initialization step 2 on every block with the regrouped input.
    """
    step1Outputs = [initializeStep1Local(blk) for blk in range(nBlocks)]

    # Input of step 2 for block `dst`: key `src` holds what block `src`
    # produced for block `dst` in step 1
    step2Inputs = []
    for dst in range(nBlocks):
        collection = KeyValueDataCollection()
        for src in range(nBlocks):
            collection[src] = step1Outputs[src][dst]
        step2Inputs.append(collection)

    for dst, collection in enumerate(step2Inputs):
        initializeStep2Local(dst, collection)
162 
163 
def computeStep1Local(partialResultLocal):
    """Perform the first step of the implicit ALS training algorithm on local-node data.

    :param partialResultLocal: partial result whose
        ``training.outputOfStep4ForStep1`` entry is used as the partial model.
    :return: the value returned by the algorithm's ``compute()``
        (DistributedPartialResultStep1 class from training).
    """
    step1 = training.Distributed(step=step1Local)
    step1.parameter.nFactors = nFactors

    partialModel = partialResultLocal.get(training.outputOfStep4ForStep1)
    step1.input.set(training.partialModel, partialModel)

    return step1.compute()
176 
177 
def computeStep2Master(step1LocalResult):
    """Perform the second step of the implicit ALS training algorithm on the master node.

    :param step1LocalResult: indexable holding the step-1 partial result of
        each of the ``nBlocks`` blocks.
    :return: the ``training.outputOfStep2ForStep4`` entry of the partial
        result (DistributedPartialResultStep2 class from training).
    """
    master = training.Distributed(step=step2Master)
    master.parameter.nFactors = nFactors

    # Feed the local step-1 partial results of all blocks to the master-node algorithm
    for blk in range(nBlocks):
        master.input.add(training.inputOfStep2FromStep1, step1LocalResult[blk])

    masterResult = master.compute()
    return masterResult.get(training.outputOfStep2ForStep4)
193 
194 
def computeStep3Local(offsets, partialResultLocal, step3LocalInput):
    """Perform the third step of the implicit ALS training algorithm on local-node data.

    :param offsets: value passed to the algorithm as ``training.offset``.
    :param partialResultLocal: partial result whose
        ``training.outputOfStep4ForStep3`` entry is used as the partial model.
    :param step3LocalInput: collection passed as ``training.inputOfStep3FromInit``.
    :return: the ``training.outputOfStep3ForStep4`` entry of the partial
        result (DistributedPartialResultStep3 class from training).
    """
    step3 = training.Distributed(step=step3Local)
    step3.parameter.nFactors = nFactors

    partialModel = partialResultLocal.get(training.outputOfStep4ForStep3)
    step3.input.setModel(training.partialModel, partialModel)
    step3.input.setCollection(training.inputOfStep3FromInit, step3LocalInput)
    step3.input.setTable(training.offset, offsets)

    step3Result = step3.compute()
    return step3Result.get(training.outputOfStep3ForStep4)
210 
211 
def computeStep4Local(dataTable, step2MasterResult, step4LocalInput):
    """Perform the fourth step of the implicit ALS training algorithm on local-node data.

    Note that the ``dataTable`` parameter shadows the module-level list of the
    same name inside this function.

    :param dataTable: table passed to the algorithm as ``training.partialData``.
    :param step2MasterResult: table passed as ``training.inputOfStep4FromStep2``.
    :param step4LocalInput: collection passed as ``training.partialModels``.
    :return: the value returned by the algorithm's ``compute()``
        (DistributedPartialResultStep4 class from training).
    """
    step4 = training.Distributed(step=step4Local)
    step4.parameter.nFactors = nFactors

    step4.input.setModels(training.partialModels, step4LocalInput)
    step4.input.setTable(training.partialData, dataTable)
    step4.input.setTable(training.inputOfStep4FromStep2, step2MasterResult)

    # Build the implicit ALS partial model on the local node
    return step4.compute()
226 
227 
def trainModel():
    """Train the implicit ALS model for ``maxIterations`` iterations.

    Each iteration first recomputes ``usersPartialResultLocal`` from the
    current ``itemsPartialResultLocal`` and then recomputes
    ``itemsPartialResultLocal`` from the fresh ``usersPartialResultLocal``.
    Both module-level lists are updated in place; nothing is returned.
    """
    # One key-value collection per block, reused as the step-4 input in every pass
    step4LocalInput = [KeyValueDataCollection() for _ in range(nBlocks)]

    def updateFactors(sourcePartials, offsets, step3Inputs, data, targetPartials):
        # Steps 1 and 2: per-block partial results, then their master-node combination
        step1Results = [computeStep1Local(sourcePartials[blk]) for blk in range(nBlocks)]
        step2MasterResult = computeStep2Master(step1Results)

        # Step 3 on every block
        step3Results = [
            computeStep3Local(offsets[blk], sourcePartials[blk], step3Inputs[blk])
            for blk in range(nBlocks)
        ]

        # Prepare the step-4 input: block `dst` receives, under key `src`,
        # element `dst` of the step-3 output of block `src`
        for dst in range(nBlocks):
            for src in range(nBlocks):
                step4LocalInput[dst][src] = step3Results[src][dst]

        # Step 4 on every block yields the updated partial results
        for blk in range(nBlocks):
            targetPartials[blk] = computeStep4Local(data[blk], step2MasterResult, step4LocalInput[blk])

    for _ in range(maxIterations):
        # Update partial users factors
        updateFactors(itemsPartialResultLocal, itemOffsets, itemStep3LocalInput,
                      transposedDataTable, usersPartialResultLocal)

        # Update partial items factors
        updateFactors(usersPartialResultLocal, userOffsets, userStep3LocalInput,
                      dataTable, itemsPartialResultLocal)
272 
273 
def testModel(i, j):
    """Predict ratings from one users partial model and one items partial model.

    :param i: index into ``usersPartialResultLocal`` (users block).
    :param j: index into ``itemsPartialResultLocal`` (items block).
    :return: the ``ratings.prediction`` entry of the finalized result
        (Result class from ratings).
    """
    # Ratings prediction algorithm based on the implicit ALS partial models
    predictor = ratings.Distributed(step=step1Local, method=ratings.defaultDense)
    predictor.parameter.nFactors = nFactors

    usersModel = usersPartialResultLocal[i].get(training.outputOfStep4)
    predictor.input.set(ratings.usersPartialModel, usersModel)
    itemsModel = itemsPartialResultLocal[j].get(training.outputOfStep4)
    predictor.input.set(ratings.itemsPartialModel, itemsModel)

    # Predict the ratings, then finalize to obtain the result object
    predictor.compute()
    prediction = predictor.finalizeCompute()
    return prediction.get(ratings.prediction)
289 
290 
def printResults():
    """Print the predicted ratings for every (users block, items block) pair.

    Reads the module-level ``userOffsets``, ``itemOffsets`` and
    ``predictedRatings`` and delegates the formatting of each ratings table to
    ``printALSRatings``.
    """
    for userBlock in range(nBlocks):
        for itemBlock in range(nBlocks):
            header = "Ratings for users block {}, items block {} :".format(userBlock, itemBlock)
            print(header)
            printALSRatings(
                userOffsets[userBlock],
                itemOffsets[itemBlock],
                predictedRatings[userBlock][itemBlock],
            )
297 
if __name__ == "__main__":
    # Load the training data of every block
    for block in range(nBlocks):
        readData(block)

    # Initialize, then train, the distributed implicit ALS model
    initializeModel()
    trainModel()

    # Predict ratings for every (users block, items block) combination
    for userBlock in range(nBlocks):
        for itemBlock in range(nBlocks):
            predictedRatings[userBlock][itemBlock] = testModel(userBlock, itemBlock)

    printResults()

For more complete information about compiler optimizations, see our Optimization Notice.