Python* API Reference for Intel® Data Analytics Acceleration Library 2019

kmeans_init_csr_distr.py

1 # file: kmeans_init_csr_distr.py
2 #===============================================================================
3 # Copyright 2014-2018 Intel Corporation.
4 #
5 # This software and the related documents are Intel copyrighted materials, and
6 # your use of them is governed by the express license under which they were
7 # provided to you (License). Unless the License provides otherwise, you may not
8 # use, modify, copy, publish, distribute, disclose or transmit this software or
9 # the related documents without Intel's prior written permission.
10 #
11 # This software and the related documents are provided as is, with no express
12 # or implied warranties, other than those that are expressly stated in the
13 # License.
14 #===============================================================================
15 
16 #
17 # ! Content:
18 # ! Python example of CSR K-Means clustering in the distributed processing mode
19 # !*****************************************************************************
20 
21 #
22 ## <a name="DAAL-EXAMPLE-PY-KMEANS_INIT_CSR_DISTRIBUTED"></a>
23 ## \example kmeans_init_csr_distr.py
24 #
25 
26 import sys, os
27 import numpy as np
28 
29 from daal import step1Local, step2Master, step3Master
30 import daal.algorithms.kmeans as kmeans
31 import daal.algorithms.kmeans.init as init
32 from daal import step1Local, step2Master, step2Local, step3Master, step4Local, step5Master
33 from daal.data_management import RowMergedNumericTable
34 
# Make the shared example helpers (the ``utils`` module imported right after
# this) importable by putting the parent of this script's directory on
# sys.path.  ``__file__`` is made absolute *before* taking ``dirname`` twice.
# When the script is launched from its own directory, ``__file__`` can be a
# bare file name (Python < 3.9). Then ``dirname(dirname(...))`` is ``''``,
# which ``abspath`` turns into the current directory instead of its parent.
utils_folder = os.path.realpath(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if utils_folder not in sys.path:
    sys.path.insert(0, utils_folder)
38 from utils import printNumericTable, createSparseTable
39 
# K-Means algorithm parameters
algorithmFPType = np.float32  # floating-point type passed as ``fptype`` to the DAAL algorithms
nClusters = 20
nIterations = 5
nBlocks = 4  # number of data blocks (simulated local nodes), one input file each
# Used in step 1 to compute the total row count (nBlocks * nVectorsInBlock)
# and each block's starting offset (i * nVectorsInBlock); presumably the
# number of rows in every input file -- confirm against the data files.
nVectorsInBlock = 8000

DAAL_PREFIX = os.path.join('..', 'data', 'distributed')
# One input file per block: kmeans_csr_1.csv ... kmeans_csr_<nBlocks>.csv.
# The list is derived from nBlocks so the two cannot drift apart. Several
# per-block loops below iterate range(nBlocks) and index the loaded data.
dataFileNames = [os.path.join(DAAL_PREFIX, 'kmeans_csr_{}.csv'.format(i))
                 for i in range(1, nBlocks + 1)]
52 
53 
def loadData(files):
    """Load each input file in *files* with ``createSparseTable``.

    Tables are created with ``algorithmFPType``, the same floating-point type
    the algorithms in this example are configured with. Changing that one
    constant therefore keeps the data and the algorithms in agreement.

    :param files: iterable of CSV file paths, one per block
    :return: list of tables, in the same order as *files*
    """
    return [createSparseTable(fileName, ntype=algorithmFPType) for fileName in files]
56 
57 
def initStep1(data, method):
    """Run step 1 (local) of distributed K-Means initialization.

    Blocks are processed in order. The first block whose result contains
    partial centroids (a truthy ``partialCentroids`` value) wins: those
    centroids are returned at once and later blocks are not processed.

    :param data: list of per-block input tables, indexed ``0 .. nBlocks-1``
    :param method: K-Means initialization method, e.g. ``kmeans.init.plusPlusCSR``
    :return: the first truthy partial-centroids table, or None if no block produced one
    """
    totalRows = nBlocks * nVectorsInBlock
    for blockIndex in range(nBlocks):
        # After nClusters come the total row count and this block's first-row
        # offset (presumably the global position of the block's rows).
        firstRow = blockIndex * nVectorsInBlock
        step1 = kmeans.init.Distributed(step1Local, nClusters, totalRows, firstRow,
                                        fptype=algorithmFPType, method=method)
        step1.input.set(kmeans.init.data, data[blockIndex])
        partialCentroids = step1.compute().get(kmeans.init.partialCentroids)
        if partialCentroids:
            return partialCentroids
    return None
68 
69 
def initStep23(data, localNodeData, step2Input, step3, bFirstIteration, method):
    """Run step 2 (local) on every block, feed the outputs to *step3*, compute it.

    :param data: list of per-block input tables
    :param localNodeData: list of per-block internal results. When
        *bFirstIteration* is true it is filled here (one append per block);
        otherwise entry ``i`` is passed back to block ``i`` as internal input
    :param step2Input: table given to every step-2 algorithm as ``inputOfStep2``
    :param step3: step-3 master algorithm object; each block's step-2 output is
        added to it under that block's index
    :param bFirstIteration: whether this is the first step-2 pass
    :param method: K-Means initialization method
    :return: the value returned by ``step3.compute()``
    """
    for blockIndex, blockData in enumerate(data):
        step2 = kmeans.init.Distributed(step2Local, nClusters, bFirstIteration,
                                        fptype=algorithmFPType, method=method)
        step2.input.set(kmeans.init.data, blockData)
        step2.input.setStepInput(kmeans.init.inputOfStep2, step2Input)
        # Saved internal state only exists after the first pass has stored it
        if not bFirstIteration:
            step2.input.setLocal(kmeans.init.internalInput, localNodeData[blockIndex])
        step2Result = step2.compute()
        if bFirstIteration:
            # Remember this block's internal result for later step-2/step-4 calls
            localNodeData.append(step2Result.getLocal(kmeans.init.internalResult))
        step3.input.add(kmeans.init.inputOfStep3FromStep2, blockIndex,
                        step2Result.getOutput(kmeans.init.outputOfStep2ForStep3))
    return step3.compute()
83 
84 
def initStep4(data, localNodeData, step3res, method):
    """Run step 4 (local) on every block that received input from step 3.

    :param data: list of per-block input tables
    :param localNodeData: per-block internal results saved during step 2
    :param step3res: result of the step-3 master computation
    :param method: K-Means initialization method
    :return: None when no block produced output; the single output table when
        exactly one block did; otherwise a ``RowMergedNumericTable`` that stacks
        all outputs row-wise (the multi-output case arises with parallelPlus)
    """
    stepOutputs = []
    for i, blockData in enumerate(data):
        # Step 3 may address only some blocks; skip those with no input
        step3Output = step3res.getOutput(kmeans.init.outputOfStep3ForStep4, i)
        if not step3Output:
            continue

        step4 = kmeans.init.Distributed(step4Local, nClusters, fptype=algorithmFPType, method=method)
        step4.input.setInput(kmeans.init.data, blockData)
        step4.input.setLocal(kmeans.init.internalInput, localNodeData[i])
        step4.input.setStepInput(kmeans.init.inputOfStep4FromStep3, step3Output)
        # Compute exactly once and take the result from the returned object;
        # an extra compute() call would redo the whole step-4 work for nothing.
        stepOutputs.append(step4.compute().get(kmeans.init.outputOfStep4))

    if not stepOutputs:
        return None
    if len(stepOutputs) == 1:
        return stepOutputs[0]
    merged = RowMergedNumericTable()
    for table in stepOutputs:
        merged.addNumericTable(table)
    return merged
113 
114 
def initCentroids_plusPlusCSR(data):
    """Compute initial centroids with the distributed ``plusPlusCSR`` method.

    Step 1 supplies the first table of centroids. Each of the following
    ``nClusters - 1`` iterations runs steps 2-3 and then step 4, and appends
    the step-4 output to the result.

    :param data: list of per-block CSR input tables
    :return: ``RowMergedNumericTable`` collecting every chosen centroid table
    """
    plusPlus = kmeans.init.plusPlusCSR
    nodeState = []  # per-block internal data carried between steps
    centroids = RowMergedNumericTable()

    latest = initStep1(data, plusPlus)
    centroids.addNumericTable(latest)

    # A single step-3 master object is reused for every iteration
    step3 = kmeans.init.Distributed(step3Master, nClusters, fptype=algorithmFPType, method=plusPlus)
    for iCenter in range(1, nClusters):
        isFirst = iCenter == 1
        step3res = initStep23(data, nodeState, latest, step3, isFirst, method=plusPlus)
        latest = initStep4(data, nodeState, step3res, method=plusPlus)
        centroids.addNumericTable(latest)
    return centroids
133 
134 
def initCentroids_parallelPlusCSR(data):
    """Compute initial centroids with the distributed ``parallelPlusCSR`` method.

    Candidate centroids from step 1 and from each of ``nRounds`` rounds of
    steps 2-4 are added to a step-5 master. Each block then runs step 2 once
    more with ``outputForStep5Required`` set. Its output goes to step 5 together
    with the last step-3 result, and step 5 produces the final centroids.

    :param data: list of per-block CSR input tables (``nBlocks`` entries)
    :return: the ``kmeans.init.centroids`` table from step 5
    """
    parallelPlus = kmeans.init.parallelPlusCSR
    nodeState = []  # per-block internal data carried between steps

    candidates = initStep1(data, method=parallelPlus)

    step5 = kmeans.init.Distributed(step5Master, nClusters, fptype=algorithmFPType, method=parallelPlus)
    step5.input.add(kmeans.init.inputCentroids, candidates)

    step3 = kmeans.init.Distributed(step3Master, nClusters, fptype=algorithmFPType, method=parallelPlus)
    for iRound in range(step5.parameter.nRounds):
        step3res = initStep23(data, nodeState, candidates, step3, iRound == 0, method=parallelPlus)
        candidates = initStep4(data, nodeState, step3res, method=parallelPlus)
        step5.input.add(kmeans.init.inputCentroids, candidates)

    # Extra step-2 pass whose outputs are meant for step 5
    for blockIndex in range(nBlocks):
        finalStep2 = kmeans.init.Distributed(step2Local, nClusters, False,
                                             fptype=algorithmFPType, method=parallelPlus)
        finalStep2.parameter.outputForStep5Required = True
        finalStep2.input.setInput(kmeans.init.data, data[blockIndex])
        finalStep2.input.setLocal(kmeans.init.internalInput, nodeState[blockIndex])
        finalStep2.input.setStepInput(kmeans.init.inputOfStep2, candidates)
        step2Output = finalStep2.compute().getOutput(kmeans.init.outputOfStep2ForStep5)
        step5.input.add(kmeans.init.inputOfStep5FromStep2, step2Output)

    # step3res is the result of the last round of the loop above.
    # NOTE(review): if nRounds were 0, step3res would never be assigned and
    # this line would raise NameError.
    step5.input.setStepInput(kmeans.init.inputOfStep5FromStep3,
                             step3res.getStepOutput(kmeans.init.outputOfStep3ForStep5))
    step5.compute()
    return step5.finalizeCompute().get(kmeans.init.centroids)
168 
169 
def initCentroids(data, method):
    """Compute initial centroids for *data* with the given init *method*.

    :param data: list of per-block CSR input tables
    :param method: ``kmeans.init.parallelPlusCSR`` or ``kmeans.init.plusPlusCSR``
    :return: table of initial centroids
    :raises ValueError: if *method* is not one of the two supported methods
    """
    if method == kmeans.init.parallelPlusCSR:
        return initCentroids_parallelPlusCSR(data)
    if method == kmeans.init.plusPlusCSR:
        return initCentroids_plusPlusCSR(data)
    # Explicit raise rather than ``assert False``: asserts are stripped under
    # ``python -O``, which would make this silently return None.
    raise ValueError("Unknown method for initCentroids: {!r}".format(method))
176 
177 
def calculateCentroids(initialCentroids, data):
    """Refine *initialCentroids* with distributed Lloyd CSR K-Means and print results.

    Runs ``nIterations`` rounds of the step-1 local algorithm (one per block)
    followed by the step-2 master. The step-2 master object is created once and
    reused for every round. Per-block cluster assignments are then computed with
    a batch algorithm (created with 0 iterations) against the final centroids.
    Finally it prints the first block's assignments, the centroids and the
    objective function value.

    :param initialCentroids: table of starting centroids
    :param data: list of per-block CSR input tables (``nBlocks`` entries)
    """
    # Keywords must be spelled exactly ``method`` / ``fptype`` for the CSR
    # method and the floating-point type to actually be selected.
    masterAlgorithm = kmeans.Distributed(step2Master, nClusters, fptype=algorithmFPType, method=kmeans.lloydCSR)

    assignments = []
    centroids = initialCentroids
    objectiveFunction = None

    # Calculate centroids
    for _ in range(nIterations):
        for i in range(nBlocks):
            # Step-1 local algorithm for this block; the third argument is
            # False here (presumably the assignment-output flag -- confirm).
            localAlgorithm = kmeans.Distributed(step1Local, nClusters, False,
                                                fptype=algorithmFPType, method=kmeans.lloydCSR)

            # Set the input data to the algorithm
            localAlgorithm.input.set(kmeans.data, data[i])
            localAlgorithm.input.set(kmeans.inputCentroids, centroids)

            masterAlgorithm.input.add(kmeans.partialResults, localAlgorithm.compute())

        masterAlgorithm.compute()
        res = masterAlgorithm.finalizeCompute()

        centroids = res.get(kmeans.centroids)
        objectiveFunction = res.get(kmeans.objectiveFunction)

    # Calculate assignments
    for i in range(nBlocks):
        # Batch algorithm with 0 iterations, used here only to obtain assignments
        localAlgorithm = kmeans.Batch(nClusters, 0, fptype=algorithmFPType, method=kmeans.lloydCSR)

        # Set the input data to the algorithm
        localAlgorithm.input.set(kmeans.data, data[i])
        localAlgorithm.input.set(kmeans.inputCentroids, centroids)

        assignments.append(localAlgorithm.compute().get(kmeans.assignments))

    # Print the clusterization results
    printNumericTable(assignments[0], "First 10 cluster assignments from 1st node:", 10)
    printNumericTable(centroids, "First 10 dimensions of centroids:", 20, 10)
    printNumericTable(objectiveFunction, "Objective function value:")
221 
222 
def runKMeans(data, method, methodName):
    """Print a header, initialize centroids with *method*, then run K-Means.

    :param data: list of per-block CSR input tables
    :param method: initialization method handed to ``initCentroids``
    :param methodName: label shown in the printed header line
    """
    header = "K-means init parameters: method = " + str(methodName)
    print(header)
    calculateCentroids(initCentroids(data, method=method), data)
227 
228 
if __name__ == "__main__":
    # Load the per-block inputs once, then run K-Means with each init method
    data = loadData(dataFileNames)
    for initMethod, initMethodName in ((kmeans.init.plusPlusCSR, "plusPlusCSR"),
                                       (kmeans.init.parallelPlusCSR, "parallelPlusCSR")):
        runKMeans(data, initMethod, initMethodName)

For more complete information about compiler optimizations, see our Optimization Notice.