Python* API Reference for Intel® Data Analytics Acceleration Library 2018 Update 2

impl_als_csr_distr.py

1 # file: impl_als_csr_distr.py
2 #===============================================================================
3 # Copyright 2014-2018 Intel Corporation
4 # All Rights Reserved.
5 #
6 # If this software was obtained under the Intel Simplified Software License,
7 # the following terms apply:
8 #
9 # The source code, information and material ("Material") contained herein is
10 # owned by Intel Corporation or its suppliers or licensors, and title to such
11 # Material remains with Intel Corporation or its suppliers or licensors. The
12 # Material contains proprietary information of Intel or its suppliers and
13 # licensors. The Material is protected by worldwide copyright laws and treaty
14 # provisions. No part of the Material may be used, copied, reproduced,
15 # modified, published, uploaded, posted, transmitted, distributed or disclosed
16 # in any way without Intel's prior express written permission. No license under
17 # any patent, copyright or other intellectual property rights in the Material
18 # is granted to or conferred upon you, either expressly, by implication,
19 # inducement, estoppel or otherwise. Any license under such intellectual
20 # property rights must be express and approved by Intel in writing.
21 #
22 # Unless otherwise agreed by Intel in writing, you may not remove or alter this
23 # notice or any other notice embedded in Materials by Intel or Intel's
24 # suppliers or licensors in any way.
25 #
26 #
27 # If this software was obtained under the Apache License, Version 2.0 (the
28 # "License"), the following terms apply:
29 #
30 # You may not use this file except in compliance with the License. You may
31 # obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
32 #
33 #
34 # Unless required by applicable law or agreed to in writing, software
35 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
36 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
37 #
38 # See the License for the specific language governing permissions and
39 # limitations under the License.
40 #===============================================================================
41 
42 
43 
44 
45 import os
46 import sys
47 
48 import numpy as np
49 
50 from daal import step1Local, step2Local, step2Master, step3Local, step4Local
51 import daal.algorithms.implicit_als.prediction.ratings as ratings
52 import daal.algorithms.implicit_als.training as training
53 import daal.algorithms.implicit_als.training.init as init
54 from daal.data_management import KeyValueDataCollection, HomogenNumericTable
55 
56 utils_folder = os.path.realpath(os.path.abspath(os.path.dirname(os.path.dirname(__file__))))
57 if utils_folder not in sys.path:
58  sys.path.insert(0, utils_folder)
59 from utils import createSparseTable, printALSRatings
60 
# Directory that holds the sample data sets, relative to the working directory
DAAL_PREFIX = os.path.join('..', 'data')

# Input data set parameters
nBlocks = 4

# File names of the training data set, one CSV file per block
trainDatasetFileNames = [
    os.path.join(DAAL_PREFIX, 'distributed', 'implicit_als_trans_csr_1.csv'),
    os.path.join(DAAL_PREFIX, 'distributed', 'implicit_als_trans_csr_2.csv'),
    os.path.join(DAAL_PREFIX, 'distributed', 'implicit_als_trans_csr_3.csv'),
    os.path.join(DAAL_PREFIX, 'distributed', 'implicit_als_trans_csr_4.csv'),
]

# Single-element partition description; its only entry is the number of blocks
usersPartition = [nBlocks]

# Per-block offsets, filled in during model initialization
userOffsets = [0] * nBlocks
itemOffsets = [0] * nBlocks

# Algorithm parameters
nUsers = 46  # Full number of users
nFactors = 2  # Number of factors
maxIterations = 5  # Number of iterations in the implicit ALS training algorithm

# Per-block input tables (placeholders until the data is read / transposed)
dataTable = [0] * nBlocks
transposedDataTable = [0] * nBlocks

# nBlocks x nBlocks grid of prediction results; each row is a distinct list
predictedRatings = [[0] * nBlocks for _ in range(nBlocks)]

# Per-block inputs for the third training step, produced by the initialization
userStep3LocalInput = [0] * nBlocks
itemStep3LocalInput = [0] * nBlocks

# Per-block partial results (partial models) exchanged between training steps
itemsPartialResultLocal = [0] * nBlocks
usersPartialResultLocal = [0] * nBlocks
95 
def readData(block):
    """Load the training data of one block into ``dataTable[block]``.

    :param block: index of the block; selects the entry of
        ``trainDatasetFileNames`` to read and the slot of ``dataTable`` to fill
    """
    # Item assignment mutates the module-level list in place
    sourceFile = trainDatasetFileNames[block]

    # Build a sparse numeric table from the file to store the input data
    dataTable[block] = createSparseTable(sourceFile)
101 
102 
def initializeStep1Local(block):
    """Run the first local step of the implicit ALS model initialization.

    Stores, for the given block, the step-3 input collection in
    ``itemStep3LocalInput``, the offsets in ``userOffsets`` and a step-4
    partial result wrapping the initial partial model in
    ``itemsPartialResultLocal``.

    :param block: index of the block to initialize; also added to the seed
    :return: the ``init.outputOfStep1ForStep2`` tables collection of the
        partial result
    """
    # Algorithm object that initializes the implicit ALS model with the fastCSR method
    initializer = init.Distributed(step=step1Local, method=init.fastCSR)

    settings = initializer.parameter
    settings.fullNUsers = nUsers
    settings.nFactors = nFactors
    settings.seed += block

    # The partition is handed over as a 1 x 1 numeric table of float64
    partitionArray = np.array(usersPartition, dtype=np.float64).reshape(1, 1)
    settings.partition = HomogenNumericTable(partitionArray)

    # Training data set of this block is the algorithm input
    initializer.input.set(init.data, dataTable[block])

    # Initialize the implicit ALS model
    initResult = initializer.compute()

    itemStep3LocalInput[block] = initResult.getCollection(init.outputOfInitForComputeStep3)
    userOffsets[block] = initResult.getCollection(init.offsets, block)

    # Wrap the initial partial model so it can be consumed like a step-4 result
    wrappedModel = training.DistributedPartialResultStep4()
    wrappedModel.set(training.outputOfStep4ForStep1, initResult.getPartialModel(init.partialModel))
    itemsPartialResultLocal[block] = wrappedModel

    return initResult.getTablesCollection(init.outputOfStep1ForStep2)
131 
def initializeStep2Local(block, initStep2LocalInput):
    """Run the second local step of the implicit ALS model initialization.

    Stores, for the given block, the transposed data table in
    ``transposedDataTable``, the step-3 input collection in
    ``userStep3LocalInput`` and the offsets in ``itemOffsets``.

    :param block: index of the block being initialized
    :param initStep2LocalInput: collection passed to the algorithm as
        ``init.inputOfStep2FromStep1``
    """
    # Algorithm object that initializes the implicit ALS model with the fastCSR method
    initializer = init.Distributed(step=step2Local, method=init.fastCSR)
    initializer.input.set(init.inputOfStep2FromStep1, initStep2LocalInput)

    # Initialize the implicit ALS model
    initResult = initializer.compute()

    # Item assignments below update the module-level lists in place
    transposedDataTable[block] = initResult.getTable(init.transposedData)
    userStep3LocalInput[block] = initResult.getCollection(init.outputOfInitForComputeStep3)
    itemOffsets[block] = initResult.getCollection(init.offsets, block)
147 
def initializeModel():
    """Initialize the implicit ALS model on all blocks.

    Runs the first initialization step on every block, regroups the results so
    that block ``i`` receives element ``i`` of every block's step-1 output, and
    then runs the second initialization step on every block.
    """
    # Phase 1: first initialization step on each block
    step1Outputs = [initializeStep1Local(block) for block in range(nBlocks)]

    # Phase 2: transpose the outputs - entry [dst][src] is step1Outputs[src][dst]
    step2Inputs = []
    for dst in range(nBlocks):
        gathered = KeyValueDataCollection()
        for src in range(nBlocks):
            gathered[src] = step1Outputs[src][dst]
        step2Inputs.append(gathered)

    # Phase 3: second initialization step on each block
    for block, gathered in enumerate(step2Inputs):
        initializeStep2Local(block, gathered)
163 
164 
def computeStep1Local(partialResultLocal):
    """Perform the first step of the implicit ALS training on local-node data.

    :param partialResultLocal: partial result whose
        ``training.outputOfStep4ForStep1`` entry is used as the partial model
    :return: the result of ``compute()`` (DistributedPartialResultStep1 class
        from training)
    """
    step1 = training.Distributed(step=step1Local)
    step1.parameter.nFactors = nFactors

    # The partial model produced by the previous step 4 is the only input
    partialModel = partialResultLocal.get(training.outputOfStep4ForStep1)
    step1.input.set(training.partialModel, partialModel)

    # Compute partial results of the first step on local nodes
    return step1.compute()
177 
178 
def computeStep2Master(step1LocalResult):
    """Perform the second step of the implicit ALS training algorithm.

    :param step1LocalResult: indexable collection holding the step-1 partial
        result of each of the ``nBlocks`` blocks
    :return: the ``training.outputOfStep2ForStep4`` entry of the computed
        partial result (DistributedPartialResultStep2 class from training)
    """
    step2 = training.Distributed(step=step2Master)
    step2.parameter.nFactors = nFactors

    # Feed the partial results of the first local step of every block
    # to the master-node algorithm
    for block in range(nBlocks):
        step2.input.add(training.inputOfStep2FromStep1, step1LocalResult[block])

    # Compute a partial result on the master node from the partial results on local nodes
    masterResult = step2.compute()
    return masterResult.get(training.outputOfStep2ForStep4)
194 
195 
def computeStep3Local(offsets, partialResultLocal, step3LocalInput):
    """Perform the third step of the implicit ALS training on local-node data.

    :param offsets: value passed to the algorithm as ``training.offset``
    :param partialResultLocal: partial result whose
        ``training.outputOfStep4ForStep3`` entry is used as the partial model
    :param step3LocalInput: collection passed as ``training.inputOfStep3FromInit``
    :return: the ``training.outputOfStep3ForStep4`` entry of the computed
        partial result (DistributedPartialResultStep3 class from training)
    """
    step3 = training.Distributed(step=step3Local)
    step3.parameter.nFactors = nFactors

    # Set input objects for the algorithm
    step3Input = step3.input
    step3Input.setModel(training.partialModel, partialResultLocal.get(training.outputOfStep4ForStep3))
    step3Input.setCollection(training.inputOfStep3FromInit, step3LocalInput)
    step3Input.setTable(training.offset, offsets)

    # Compute partial results of the third step on local nodes
    localResult = step3.compute()
    return localResult.get(training.outputOfStep3ForStep4)
211 
212 
def computeStep4Local(dataTable, step2MasterResult, step4LocalInput):
    """Perform the fourth step of the implicit ALS training on local-node data.

    :param dataTable: table passed as ``training.partialData`` (this parameter
        shadows the module-level ``dataTable`` list inside the function)
    :param step2MasterResult: table passed as ``training.inputOfStep4FromStep2``
    :param step4LocalInput: collection passed as ``training.partialModels``
    :return: the result of ``compute()`` (DistributedPartialResultStep4 class
        from training)
    """
    step4 = training.Distributed(step=step4Local)
    step4.parameter.nFactors = nFactors

    # Set input objects for the algorithm
    step4Input = step4.input
    step4Input.setModels(training.partialModels, step4LocalInput)
    step4Input.setTable(training.partialData, dataTable)
    step4Input.setTable(training.inputOfStep4FromStep2, step2MasterResult)

    # Build the implicit ALS partial model on the local node
    return step4.compute()
227 
228 
def trainModel():
    """Train the implicit ALS model for ``maxIterations`` iterations.

    Each iteration runs the four distributed steps twice: first to refresh
    ``usersPartialResultLocal`` from the current items partial results, then
    to refresh ``itemsPartialResultLocal`` from the new users partial results.
    """
    # One collection per block, reused (overwritten) by every half-iteration
    step4LocalInput = [KeyValueDataCollection() for _ in range(nBlocks)]

    def runHalfIteration(fixedPartials, offsets, step3Inputs, tables, updatedPartials):
        # Steps 1 and 2: local partial results, then reduction on the master node
        step1Results = [computeStep1Local(fixedPartials[block]) for block in range(nBlocks)]
        masterResult = computeStep2Master(step1Results)

        # Step 3 on every block
        step3Results = [
            computeStep3Local(offsets[block], fixedPartials[block], step3Inputs[block])
            for block in range(nBlocks)
        ]

        # Prepare input objects for the fourth step of the distributed algorithm:
        # block dst receives element dst of every block's step-3 result
        for dst in range(nBlocks):
            for src in range(nBlocks):
                step4LocalInput[dst][src] = step3Results[src][dst]

        # Step 4 on every block; results are stored in place
        for block in range(nBlocks):
            updatedPartials[block] = computeStep4Local(tables[block], masterResult, step4LocalInput[block])

    for _ in range(maxIterations):
        # Update partial users factors
        runHalfIteration(itemsPartialResultLocal, itemOffsets, itemStep3LocalInput,
                         transposedDataTable, usersPartialResultLocal)

        # Update partial items factors
        runHalfIteration(usersPartialResultLocal, userOffsets, userStep3LocalInput,
                         dataTable, itemsPartialResultLocal)
273 
274 
def testModel(i, j):
    """Predict ratings for users block ``i`` against items block ``j``.

    :param i: index into ``usersPartialResultLocal``
    :param j: index into ``itemsPartialResultLocal``
    :return: the ``ratings.prediction`` entry of the finalized result
        (Result class from ratings)
    """
    # Algorithm object that predicts ratings based on the implicit ALS partial models
    predictor = ratings.Distributed(step=step1Local, method=ratings.defaultDense)
    predictor.parameter.nFactors = nFactors

    # Set input objects for the algorithm
    usersModel = usersPartialResultLocal[i].get(training.outputOfStep4)
    itemsModel = itemsPartialResultLocal[j].get(training.outputOfStep4)
    predictor.input.set(ratings.usersPartialModel, usersModel)
    predictor.input.set(ratings.itemsPartialModel, itemsModel)

    # Predict ratings, then retrieve the algorithm results
    predictor.compute()
    finalResult = predictor.finalizeCompute()
    return finalResult.get(ratings.prediction)
290 
291 
def printResults():
    """Print the predicted ratings for every (users block, items block) pair."""
    for usersBlock in range(nBlocks):
        for itemsBlock in range(nBlocks):
            header = "Ratings for users block {}, items block {} :".format(usersBlock, itemsBlock)
            print(header)
            printALSRatings(
                userOffsets[usersBlock],
                itemOffsets[itemsBlock],
                predictedRatings[usersBlock][itemsBlock],
            )
298 
if __name__ == "__main__":
    # Load the training data of every block
    for block in range(nBlocks):
        readData(block)

    # Initialize, then train, the distributed implicit ALS model
    initializeModel()
    trainModel()

    # Predict ratings for every (users block, items block) combination
    for usersBlock in range(nBlocks):
        for itemsBlock in range(nBlocks):
            predictedRatings[usersBlock][itemsBlock] = testModel(usersBlock, itemsBlock)

    printResults()

For more complete information about compiler optimizations, see our Optimization Notice.