Python* API Reference for Intel® Data Analytics Acceleration Library 2018 Update 2

kmeans_init_csr_distr.py

1 # file: kmeans_init_csr_distr.py
2 #===============================================================================
3 # Copyright 2014-2018 Intel Corporation
4 # All Rights Reserved.
5 #
6 # If this software was obtained under the Intel Simplified Software License,
7 # the following terms apply:
8 #
9 # The source code, information and material ("Material") contained herein is
10 # owned by Intel Corporation or its suppliers or licensors, and title to such
11 # Material remains with Intel Corporation or its suppliers or licensors. The
12 # Material contains proprietary information of Intel or its suppliers and
13 # licensors. The Material is protected by worldwide copyright laws and treaty
14 # provisions. No part of the Material may be used, copied, reproduced,
15 # modified, published, uploaded, posted, transmitted, distributed or disclosed
16 # in any way without Intel's prior express written permission. No license under
17 # any patent, copyright or other intellectual property rights in the Material
18 # is granted to or conferred upon you, either expressly, by implication,
19 # inducement, estoppel or otherwise. Any license under such intellectual
20 # property rights must be express and approved by Intel in writing.
21 #
22 # Unless otherwise agreed by Intel in writing, you may not remove or alter this
23 # notice or any other notice embedded in Materials by Intel or Intel's
24 # suppliers or licensors in any way.
25 #
26 #
27 # If this software was obtained under the Apache License, Version 2.0 (the
28 # "License"), the following terms apply:
29 #
30 # You may not use this file except in compliance with the License. You may
31 # obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
32 #
33 #
34 # Unless required by applicable law or agreed to in writing, software
35 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
36 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
37 #
38 # See the License for the specific language governing permissions and
39 # limitations under the License.
40 #===============================================================================
41 
42 #
43 # ! Content:
44 # ! Python example of CSR K-Means clustering in the distributed processing mode
45 # !*****************************************************************************
46 
47 #
48 
49 
50 #
51 
52 import sys, os
53 import numpy as np
54 
55 from daal import step1Local, step2Master, step3Master
56 import daal.algorithms.kmeans as kmeans
57 import daal.algorithms.kmeans.init as init
58 from daal import step1Local, step2Master, step2Local, step3Master, step4Local, step5Master
59 from daal.data_management import RowMergedNumericTable
60 
61 utils_folder = os.path.realpath(os.path.abspath(os.path.dirname(os.path.dirname(__file__))))
62 if utils_folder not in sys.path:
63  sys.path.insert(0, utils_folder)
64 from utils import printNumericTable, createSparseTable
65 
# Parameters shared by every step of the example
algorithmFPType = np.float32  # floating-point type passed to the algorithm objects
nClusters = 20
nIterations = 5
nBlocks = 4
nVectorsInBlock = 8000

# Input files: one CSV file per data block, all under DAAL_PREFIX
DAAL_PREFIX = os.path.join('..', 'data', 'distributed')
dataFileNames = [
    os.path.join(DAAL_PREFIX, 'kmeans_csr_%d.csv' % fileNo)
    for fileNo in (1, 2, 3, 4)
]
78 
79 
def loadData(files):
    """Load every file in *files* with createSparseTable.

    Returns a list holding one table per input file, in the same order as
    *files*. Each table is created with ``ntype=np.float32``.
    """
    tables = []
    for path in files:
        tables.append(createSparseTable(path, ntype=np.float32))
    return tables
82 
83 
def initStep1(data, method):
    """Run step 1 of the distributed initialization on each data block.

    Blocks are processed in order; the first truthy ``partialCentroids``
    result is returned immediately. Returns None when no block yields one.

    data   -- indexable collection with one table per block (nBlocks entries used)
    method -- initialization method forwarded to kmeans.init.Distributed
    """
    totalVectors = nBlocks * nVectorsInBlock
    for blockNo in range(nBlocks):
        # Offset of this block inside the whole data set
        firstVector = blockNo * nVectorsInBlock
        step1 = kmeans.init.Distributed(step1Local, nClusters, totalVectors, firstVector,
                                        fptype=algorithmFPType, method=method)
        step1.input.set(kmeans.init.data, data[blockNo])
        partialResult = step1.compute()
        centers = partialResult.get(kmeans.init.partialCentroids)
        if not centers:
            continue
        return centers
    return None
94 
95 
def initStep23(data, localNodeData, step2Input, step3, bFirstIteration, method):
    """Run step 2 on every data block, feed step 3 and return its result.

    data            -- sequence of tables, one per block
    localNodeData   -- list of per-block internal results; filled (appended to)
                       on the first iteration and read on later ones
    step2Input      -- step input passed to every step-2 algorithm
    step3           -- step-3 algorithm object that collects the step-2 outputs
    bFirstIteration -- True on the first call of a run
    method          -- initialization method forwarded to kmeans.init.Distributed

    Returns whatever ``step3.compute()`` returns.
    """
    for blockNo, block in enumerate(data):
        step2 = kmeans.init.Distributed(step2Local, nClusters, bFirstIteration,
                                        fptype=algorithmFPType, method=method)
        step2.input.set(kmeans.init.data, block)
        step2.input.setStepInput(kmeans.init.inputOfStep2, step2Input)
        if not bFirstIteration:
            # Reuse the internal data saved for this block on the first iteration
            step2.input.setLocal(kmeans.init.internalInput, localNodeData[blockNo])

        step2Result = step2.compute()

        if bFirstIteration:
            # Keep the internal result so that later iterations can pass it back in
            localNodeData.append(step2Result.getLocal(kmeans.init.internalResult))
        outputForStep3 = step2Result.getOutput(kmeans.init.outputOfStep2ForStep3)
        step3.input.add(kmeans.init.inputOfStep3FromStep2, blockNo, outputForStep3)
    return step3.compute()
109 
110 
def initStep4(data, localNodeData, step3res, method):
    """Run step 4 on every block for which step 3 produced an input.

    data          -- sequence of tables, one per block
    localNodeData -- per-block internal data, indexed like *data*
    step3res      -- result of step 3; queried per block for the step-4 input
    method        -- initialization method forwarded to kmeans.init.Distributed

    Returns None when no block had a step-4 input, the single output table
    when exactly one block had one, and otherwise a RowMergedNumericTable
    that contains every output table in block order.
    """
    aRes = []
    for i, block in enumerate(data):
        # Get an input for step 4 on this node if any
        step3Output = step3res.getOutput(kmeans.init.outputOfStep3ForStep4, i)
        if not step3Output:
            continue

        # Create an algorithm object for the step 4
        step4 = kmeans.init.Distributed(step4Local, nClusters, fptype=algorithmFPType, method=method)
        # Set the input data to the algorithm
        step4.input.setInput(kmeans.init.data, block)
        step4.input.setLocal(kmeans.init.internalInput, localNodeData[i])
        step4.input.setStepInput(kmeans.init.inputOfStep4FromStep3, step3Output)
        # Compute exactly once and take the result; a second compute() call
        # would only repeat the same work and discard the first result.
        aRes.append(step4.compute().get(kmeans.init.outputOfStep4))

    if not aRes:
        return None
    if len(aRes) == 1:
        return aRes[0]
    # For parallelPlus algorithm
    pMerged = RowMergedNumericTable()
    for r in aRes:
        pMerged.addNumericTable(r)
    return pMerged
139 
140 
def initCentroids_plusPlusCSR(data):
    """Build the initial centroids with the plusPlusCSR method.

    Step 1 supplies the first table of centroids; then, for each remaining
    center (nClusters - 1 rounds), steps 2-3 and step 4 are run and the table
    returned by step 4 is added to the merged result.

    Returns a RowMergedNumericTable holding every table that was added.
    """
    method = kmeans.init.plusPlusCSR

    # Internal data to be stored on the local nodes
    localNodeData = []
    # Numeric table to collect the results
    pCentroids = RowMergedNumericTable()

    # First step on the local nodes
    pNewCentroids = initStep1(data, method)
    pCentroids.addNumericTable(pNewCentroids)

    # One step-3 algorithm object is shared by all rounds
    step3 = kmeans.init.Distributed(step3Master, nClusters, fptype=algorithmFPType, method=method)
    for iCenter in range(1, nClusters):
        isFirstRound = (iCenter == 1)
        # Steps 2 and 3
        step3res = initStep23(data, localNodeData, pNewCentroids, step3, isFirstRound, method=method)
        # Step 4
        pNewCentroids = initStep4(data, localNodeData, step3res, method=method)
        pCentroids.addNumericTable(pNewCentroids)
    return pCentroids
159 
160 
def initCentroids_parallelPlusCSR(data):
    """Build the initial centroids with the parallelPlusCSR method.

    Step 1 supplies the first candidates; then ``step5.parameter.nRounds``
    rounds of steps 2-3 and step 4 each add a table of candidates to the
    step-5 input. A final pass of step 2 over all blocks (with
    ``outputForStep5Required`` enabled) and the last step-3 result complete
    the step-5 input.

    Returns the ``centroids`` table from the finalized step-5 result.
    """
    method = kmeans.init.parallelPlusCSR

    # Internal data to be stored on the local nodes
    localNodeData = []
    # First step on the local nodes
    pNewCentroids = initStep1(data, method=method)

    # Step 5 collects every table of candidate centroids
    step5 = kmeans.init.Distributed(step5Master, nClusters, fptype=algorithmFPType, method=method)
    step5.input.add(kmeans.init.inputCentroids, pNewCentroids)
    # One step-3 algorithm object is shared by all rounds
    step3 = kmeans.init.Distributed(step3Master, nClusters, fptype=algorithmFPType, method=method)
    for iRound in range(step5.parameter.nRounds):
        isFirstRound = (iRound == 0)
        # Steps 2 and 3
        step3res = initStep23(data, localNodeData, pNewCentroids, step3, isFirstRound, method=method)
        # Step 4
        pNewCentroids = initStep4(data, localNodeData, step3res, method=method)
        step5.input.add(kmeans.init.inputCentroids, pNewCentroids)

    # One more step 2, this time producing the output that step 5 consumes
    for blockNo in range(nBlocks):
        step2 = kmeans.init.Distributed(step2Local, nClusters, False, fptype=algorithmFPType, method=method)
        step2.parameter.outputForStep5Required = True
        # Set the input data to the algorithm
        step2.input.setInput(kmeans.init.data, data[blockNo])
        step2.input.setLocal(kmeans.init.internalInput, localNodeData[blockNo])
        step2.input.setStepInput(kmeans.init.inputOfStep2, pNewCentroids)
        # Compute, then hand the step-2 output over to step 5
        step2Result = step2.compute()
        outputForStep5 = step2Result.getOutput(kmeans.init.outputOfStep2ForStep5)
        step5.input.add(kmeans.init.inputOfStep5FromStep2, outputForStep5)

    step3OutputForStep5 = step3res.getStepOutput(kmeans.init.outputOfStep3ForStep5)
    step5.input.setStepInput(kmeans.init.inputOfStep5FromStep3, step3OutputForStep5)
    step5.compute()
    finalResult = step5.finalizeCompute()
    return finalResult.get(kmeans.init.centroids)
194 
195 
def initCentroids(data, method):
    """Dispatch to the centroid-initialization routine for *method*.

    data   -- sequence of tables, one per block
    method -- kmeans.init.parallelPlusCSR or kmeans.init.plusPlusCSR

    Returns the table of initial centroids produced by the selected routine.

    Raises AssertionError for any other method. The exception is raised
    explicitly rather than through an ``assert`` statement so that the check
    still fires when Python runs with -O; an ``assert`` would be stripped and
    the function would silently return None.
    """
    if method == kmeans.init.parallelPlusCSR:
        return initCentroids_parallelPlusCSR(data)
    if method == kmeans.init.plusPlusCSR:
        return initCentroids_plusPlusCSR(data)
    raise AssertionError("Unknown method for initCentroids")
202 
203 
def calculateCentroids(initialCentroids, data):
    """Run distributed K-Means from *initialCentroids* and print the results.

    initialCentroids -- table of starting centroids
    data             -- indexable collection with one table per block
                        (nBlocks entries used)

    For nIterations iterations, a step-1 local algorithm runs on each block
    and its partial result is added to the step-2 master algorithm, whose
    finalized result provides the centroids for the next iteration. The
    assignments are then computed per block with a batch algorithm, and the
    first block's assignments, the centroids and the objective function value
    are printed. Nothing is returned.
    """
    masterAlgorithm = kmeans.Distributed(step2Master, nClusters, fptype=algorithmFPType, method=kmeans.lloydCSR)

    assignments = []
    centroids = initialCentroids
    objectiveFunction = None

    # Calculate centroids
    for _ in range(nIterations):
        for i in range(nBlocks):
            # Create an algorithm object for the K-Means algorithm.
            # The keyword must be spelled `method`; a misspelled keyword does
            # not select the CSR method for this sparse input.
            localAlgorithm = kmeans.Distributed(step1Local, nClusters, False,
                                                fptype=algorithmFPType, method=kmeans.lloydCSR)

            # Set the input data to the algorithm
            localAlgorithm.input.set(kmeans.data, data[i])
            localAlgorithm.input.set(kmeans.inputCentroids, centroids)

            masterAlgorithm.input.add(kmeans.partialResults, localAlgorithm.compute())

        masterAlgorithm.compute()
        res = masterAlgorithm.finalizeCompute()

        centroids = res.get(kmeans.centroids)
        objectiveFunction = res.get(kmeans.objectiveFunction)

    # Calculate assignments
    for i in range(nBlocks):
        # Create an algorithm object for the K-Means algorithm.
        # The second argument is presumably the iteration count (0 = assign
        # only) -- TODO confirm against the kmeans.Batch documentation.
        # The keyword must be spelled `fptype` so that the same floating-point
        # type is used as for the distributed steps.
        localAlgorithm = kmeans.Batch(nClusters, 0, fptype=algorithmFPType, method=kmeans.lloydCSR)

        # Set the input data to the algorithm
        localAlgorithm.input.set(kmeans.data, data[i])
        localAlgorithm.input.set(kmeans.inputCentroids, centroids)

        assignments.append(localAlgorithm.compute().get(kmeans.assignments))

    # Print the clusterization results
    printNumericTable(assignments[0], "First 10 cluster assignments from 1st node:", 10)
    printNumericTable(centroids, "First 10 dimensions of centroids:", 20, 10)
    printNumericTable(objectiveFunction, "Objective function value:")
247 
248 
def runKMeans(data, method, methodName):
    """Print the chosen initialization method, then initialize and cluster.

    data       -- sequence of tables, one per block
    method     -- initialization method passed on to initCentroids
    methodName -- label for *method* shown in the printed header
    """
    header = "".join(["K-means init parameters: method = ", str(methodName)])
    print(header)
    initialCentroids = initCentroids(data, method=method)
    calculateCentroids(initialCentroids, data)
253 
254 
if __name__ == "__main__":
    # Load the blocks once and run the example for both initialization methods
    data = loadData(dataFileNames)
    for method, methodName in ((kmeans.init.plusPlusCSR, "plusPlusCSR"),
                               (kmeans.init.parallelPlusCSR, "parallelPlusCSR")):
        runKMeans(data, method, methodName)

For more complete information about compiler optimizations, see our Optimization Notice.