Python* API Reference for Intel® Data Analytics Acceleration Library 2019 Update 4

kmeans_init_dense_distr.py

1 # file: kmeans_init_dense_distr.py
2 #===============================================================================
3 # Copyright 2014-2019 Intel Corporation.
4 #
5 # This software and the related documents are Intel copyrighted materials, and
6 # your use of them is governed by the express license under which they were
7 # provided to you (License). Unless the License provides otherwise, you may not
8 # use, modify, copy, publish, distribute, disclose or transmit this software or
9 # the related documents without Intel's prior written permission.
10 #
11 # This software and the related documents are provided as is, with no express
12 # or implied warranties, other than those that are expressly stated in the
13 # License.
14 #===============================================================================
15 
16 #
17 # ! Content:
18 # ! Python example of dense K-Means clustering in the distributed processing mode
19 # !*****************************************************************************
20 
21 #
22 
23 
24 #
25 
26 import sys, os
27 import numpy as np
28 
29 from daal import step1Local, step2Master, step3Master
30 import daal.algorithms.kmeans as kmeans
31 import daal.algorithms.kmeans.init as init
32 from daal import step1Local, step2Master, step2Local, step3Master, step4Local, step5Master
33 from daal.data_management import FileDataSource, DataSource, RowMergedNumericTable
34 
35 utils_folder = os.path.realpath(os.path.abspath(os.path.dirname(os.path.dirname(__file__))))
36 if utils_folder not in sys.path:
37  sys.path.insert(0, utils_folder)
38 from utils import printNumericTable, createSparseTable
39 
# K-Means algorithm parameters
algorithmFPType = np.float32  # floating-point type passed as fptype to every DAAL algorithm object
nClusters = 20                # number of clusters (centroids) to find
nIterations = 5               # number of distributed K-Means rounds run by calculateCentroids
nBlocks = 4                   # number of data blocks (simulated local nodes), one per input file
nVectorsInBlock = 2500        # rows per block used for step-1 offsets; assumed to match the files — TODO confirm

# Input files, one per block. The prefix is a relative path, so it is not anchored
# to this script's location.
DAAL_PREFIX = os.path.join('..', 'data', 'distributed')
dataFileNames = [
    os.path.join(DAAL_PREFIX, fileName)
    for fileName in ('kmeans_dense_1.csv', 'kmeans_dense_2.csv',
                     'kmeans_dense_3.csv', 'kmeans_dense_4.csv')
]
52 
53 
def loadData(files):
    """Read the first nBlocks CSV files into DAAL numeric tables.

    Each file is opened with a FileDataSource created with
    DataSource.doAllocateNumericTable and DataSource.doDictionaryFromContext.
    The whole data block is then loaded.

    :param files: indexable sequence of CSV paths; entries 0..nBlocks-1 are read.
    :return: list of nBlocks numeric tables, in file order.
    """
    def readBlock(path):
        # FileDataSource<CSVFeatureManager> pulls the input data out of a .csv file
        source = FileDataSource(path,
                                DataSource.doAllocateNumericTable,
                                DataSource.doDictionaryFromContext)
        source.loadDataBlock()
        return source.getNumericTable()

    return [readBlock(files[blockIndex]) for blockIndex in range(nBlocks)]
65 
66 
def initStep1(data, method):
    """Run step 1 (step1Local) of distributed K-Means initialization block by block.

    Each block gets its own step1Local object. The object is configured with the
    total row count (nBlocks * nVectorsInBlock) and that block's row offset. The
    loop stops at the first block whose partialCentroids result is truthy.

    Note: this assumes every block holds exactly nVectorsInBlock rows — TODO confirm
    against the input files.

    :param data: list of per-block input numeric tables.
    :param method: K-Means initialization method (e.g. kmeans.init.plusPlusDense).
    :return: the first non-empty partialCentroids table, or None if none was produced.
    """
    totalRows = nBlocks * nVectorsInBlock
    for blockIndex in range(nBlocks):
        step1 = kmeans.init.Distributed(step1Local, nClusters, totalRows, blockIndex * nVectorsInBlock,
                                        fptype=algorithmFPType, method=method)
        step1.input.set(kmeans.init.data, data[blockIndex])
        partialCentroids = step1.compute().get(kmeans.init.partialCentroids)
        if partialCentroids:
            return partialCentroids
    return None
77 
78 
def initStep23(data, localNodeData, step2Input, step3, bFirstIteration, method):
    """Run step 2 on every block and feed each output into the master's step 3.

    On the first iteration, each block's internal result is appended to
    *localNodeData*. On later iterations, the stored entry for that block is
    passed back in as step-2 internal input.

    :param data: list of per-block input numeric tables.
    :param localNodeData: per-block internal results; appended to when bFirstIteration is true.
    :param step2Input: numeric table given to every block's step 2 as inputOfStep2.
    :param step3: step3Master algorithm object that collects the step-2 outputs.
    :param bFirstIteration: whether this is the first pass of steps 2-3.
    :param method: K-Means initialization method.
    :return: the value returned by step3.compute().
    """
    for nodeIndex, nodeData in enumerate(data):
        step2 = kmeans.init.Distributed(step2Local, nClusters, bFirstIteration,
                                        fptype=algorithmFPType, method=method)
        step2.input.set(kmeans.init.data, nodeData)
        step2.input.setStepInput(kmeans.init.inputOfStep2, step2Input)
        if bFirstIteration:
            step2Result = step2.compute()
            # Keep this block's internal state for later iterations and for step 4
            localNodeData.append(step2Result.getLocal(kmeans.init.internalResult))
        else:
            step2.input.setLocal(kmeans.init.internalInput, localNodeData[nodeIndex])
            step2Result = step2.compute()
        step3.input.add(kmeans.init.inputOfStep3FromStep2, nodeIndex,
                        step2Result.getOutput(kmeans.init.outputOfStep2ForStep3))
    return step3.compute()
92 
93 
def initStep4(data, localNodeData, step3res, method):
    """Run step 4 (step4Local) on every block that received input from step 3.

    :param data: list of per-block input numeric tables.
    :param localNodeData: per-block internal results stored during step 2.
    :param step3res: result object returned by the master's step 3.
    :param method: K-Means initialization method.
    :return: None if no block produced step-4 output, the single output table if
             exactly one did, otherwise a RowMergedNumericTable stacking all outputs.
    """
    aRes = []
    for i, block in enumerate(data):
        # Get an input for step 4 on this node, if step 3 produced one
        step3Output = step3res.getOutput(kmeans.init.outputOfStep3ForStep4, i)
        if not step3Output:
            continue

        # Create an algorithm object for step 4
        step4 = kmeans.init.Distributed(step4Local, nClusters, fptype=algorithmFPType, method=method)
        # Set the input data to the algorithm
        step4.input.setInput(kmeans.init.data, block)
        step4.input.setLocal(kmeans.init.internalInput, localNodeData[i])
        step4.input.setStepInput(kmeans.init.inputOfStep4FromStep3, step3Output)
        # Compute once and keep the step-4 output (a second compute() would redo the same work)
        aRes.append(step4.compute().get(kmeans.init.outputOfStep4))

    if not aRes:
        return None
    if len(aRes) == 1:
        return aRes[0]
    # Several blocks produced output (the parallelPlus case): stack them row-wise
    pMerged = RowMergedNumericTable()
    for r in aRes:
        pMerged.addNumericTable(r)
    return pMerged
122 
123 
def initCentroids_plusPlusDense(data):
    """Compute initial centroids with the distributed plusPlusDense method.

    Step 1 yields the initial centroid table. The loop then runs nClusters - 1
    rounds of steps 2-3 followed by step 4, appending each step-4 output to the
    collected centroids.

    :param data: list of per-block input numeric tables.
    :return: RowMergedNumericTable holding every centroid table, in the order produced.
    """
    plusPlus = kmeans.init.plusPlusDense
    # Internal per-block results, filled by the first step 2 and reused afterwards
    localNodeData = []
    # Accumulates every centroid table produced so far
    pCentroids = RowMergedNumericTable()

    pNewCentroids = initStep1(data, plusPlus)
    pCentroids.addNumericTable(pNewCentroids)

    # One step3Master object is reused across all rounds
    step3 = kmeans.init.Distributed(step3Master, nClusters, fptype=algorithmFPType, method=plusPlus)
    for iCenter in range(1, nClusters):
        isFirstPass = iCenter == 1
        step3res = initStep23(data, localNodeData, pNewCentroids, step3, isFirstPass, method=plusPlus)
        pNewCentroids = initStep4(data, localNodeData, step3res, method=plusPlus)
        pCentroids.addNumericTable(pNewCentroids)
    return pCentroids
142 
143 
def initCentroids_parallelPlusDense(data):
    """Compute initial centroids with the distributed parallelPlusDense method.

    Candidate centroids from step 1 and from every round of steps 2-4 are fed to
    the master's step 5. After the rounds, one extra step 2 per block produces
    output for step 5. Step 5 then combines everything into the final centroids.

    NOTE(review): relies on step5.parameter.nRounds being at least 1. With zero
    rounds, step3res is never assigned and localNodeData stays empty.

    :param data: list of per-block input numeric tables.
    :return: the centroids table from step 5's finalizeCompute() result.
    """
    parallelPlus = kmeans.init.parallelPlusDense
    # Internal per-block results, filled by the first step 2 and reused afterwards
    localNodeData = []
    pNewCentroids = initStep1(data, method=parallelPlus)

    # Master-side step 5 collects the candidate centroids from every round
    step5 = kmeans.init.Distributed(step5Master, nClusters, fptype=algorithmFPType, method=parallelPlus)
    step5.input.add(kmeans.init.inputCentroids, pNewCentroids)
    # One step3Master object is reused across all rounds
    step3 = kmeans.init.Distributed(step3Master, nClusters, fptype=algorithmFPType, method=parallelPlus)

    roundCount = step5.parameter.nRounds
    for roundIndex in range(roundCount):
        isFirstPass = roundIndex == 0
        step3res = initStep23(data, localNodeData, pNewCentroids, step3, isFirstPass, method=parallelPlus)
        pNewCentroids = initStep4(data, localNodeData, step3res, method=parallelPlus)
        step5.input.add(kmeans.init.inputCentroids, pNewCentroids)

    # One more step 2 per block, this time requesting output for step 5
    for blockIndex in range(nBlocks):
        finalStep2 = kmeans.init.Distributed(step2Local, nClusters, False,
                                             fptype=algorithmFPType, method=parallelPlus)
        finalStep2.parameter.outputForStep5Required = True
        finalStep2.input.setInput(kmeans.init.data, data[blockIndex])
        finalStep2.input.setLocal(kmeans.init.internalInput, localNodeData[blockIndex])
        finalStep2.input.setStepInput(kmeans.init.inputOfStep2, pNewCentroids)
        step2Output = finalStep2.compute().getOutput(kmeans.init.outputOfStep2ForStep5)
        step5.input.add(kmeans.init.inputOfStep5FromStep2, step2Output)

    step3Output = step3res.getStepOutput(kmeans.init.outputOfStep3ForStep5)
    step5.input.setStepInput(kmeans.init.inputOfStep5FromStep3, step3Output)
    step5.compute()
    finalResult = step5.finalizeCompute()
    return finalResult.get(kmeans.init.centroids)
177 
178 
def initCentroids(data, method):
    """Dispatch to the distributed initialization routine matching *method*.

    :param data: list of per-block input numeric tables.
    :param method: kmeans.init.parallelPlusDense or kmeans.init.plusPlusDense.
    :return: numeric table of initial centroids.
    :raises ValueError: if *method* is neither supported method.
    """
    if method == kmeans.init.parallelPlusDense:
        return initCentroids_parallelPlusDense(data)
    if method == kmeans.init.plusPlusDense:
        return initCentroids_plusPlusDense(data)
    # Use an explicit raise: an `assert` is stripped under `python -O`, which would
    # make this function silently return None for an unknown method.
    raise ValueError("Unknown method for initCentroids: {!r}".format(method))
185 
186 
def calculateCentroids(initialCentroids, data):
    """Refine *initialCentroids* with distributed K-Means, assign clusters and print results.

    The function runs nIterations rounds. In each round, every block computes a
    step1Local partial result against the current centroids, and one step2Master
    object combines those partial results into new centroids and an
    objective-function value. Each block is then passed with the final centroids to
    kmeans.Batch(nClusters, 0, ...) to obtain its assignments. Finally, the first
    block's assignments, the centroids and the objective function are printed.

    :param initialCentroids: numeric table of starting centroids.
    :param data: list of per-block input numeric tables (at least nBlocks of them).
    """
    # A single master object is reused for every round
    masterAlgorithm = kmeans.Distributed(step2Master, nClusters, fptype=algorithmFPType)

    centroids = initialCentroids
    objectiveFunction = None

    # Calculate centroids
    for _ in range(nIterations):
        for i in range(nBlocks):
            # Create an algorithm object for the local step of K-Means
            localAlgorithm = kmeans.Distributed(step1Local, nClusters, False, fptype=algorithmFPType)

            # Set the input data to the algorithm
            localAlgorithm.input.set(kmeans.data, data[i])
            localAlgorithm.input.set(kmeans.inputCentroids, centroids)

            masterAlgorithm.input.add(kmeans.partialResults, localAlgorithm.compute())

        masterAlgorithm.compute()
        res = masterAlgorithm.finalizeCompute()

        centroids = res.get(kmeans.centroids)
        objectiveFunction = res.get(kmeans.objectiveFunction)

    # Calculate assignments
    assignments = []
    for i in range(nBlocks):
        # The second argument (0) is presumably maxIterations, so only assignments to
        # the given centroids are computed — confirm against the DAAL docs.
        # `fptype` was previously misspelled `fptyep`, so algorithmFPType never reached DAAL.
        localAlgorithm = kmeans.Batch(nClusters, 0, fptype=algorithmFPType)

        # Set the input data to the algorithm
        localAlgorithm.input.set(kmeans.data, data[i])
        localAlgorithm.input.set(kmeans.inputCentroids, centroids)

        assignments.append(localAlgorithm.compute().get(kmeans.assignments))

    # Print the clustering results
    printNumericTable(assignments[0], "First 10 cluster assignments from 1st node:", 10)
    printNumericTable(centroids, "First 10 dimensions of centroids:", 20, 10)
    printNumericTable(objectiveFunction, "Objective function value:")
230 
231 
def runKMeans(data, method, methodName):
    """Print the chosen init method, build initial centroids with it, then run K-Means.

    :param data: list of per-block input numeric tables.
    :param method: K-Means initialization method forwarded to initCentroids.
    :param methodName: human-readable name of *method*, used only in the printed header.
    """
    label = str(methodName)
    print("K-means init parameters: method = " + label)
    calculateCentroids(initCentroids(data, method=method), data)
236 
237 
if __name__ == "__main__":
    # Load the blocks once and run the full pipeline with each initialization method in turn
    data = loadData(dataFileNames)
    for initMethod, initMethodName in ((kmeans.init.plusPlusDense, "plusPlusDense"),
                                       (kmeans.init.parallelPlusDense, "parallelPlusDense")):
        runKMeans(data, initMethod, initMethodName)

For more complete information about compiler optimizations, see our Optimization Notice.