Python* API Reference for Intel® Data Analytics Acceleration Library 2018 Update 2

kmeans_init_dense_distr.py

1 # file: kmeans_init_dense_distr.py
2 #===============================================================================
3 # Copyright 2014-2018 Intel Corporation
4 # All Rights Reserved.
5 #
6 # If this software was obtained under the Intel Simplified Software License,
7 # the following terms apply:
8 #
9 # The source code, information and material ("Material") contained herein is
10 # owned by Intel Corporation or its suppliers or licensors, and title to such
11 # Material remains with Intel Corporation or its suppliers or licensors. The
12 # Material contains proprietary information of Intel or its suppliers and
13 # licensors. The Material is protected by worldwide copyright laws and treaty
14 # provisions. No part of the Material may be used, copied, reproduced,
15 # modified, published, uploaded, posted, transmitted, distributed or disclosed
16 # in any way without Intel's prior express written permission. No license under
17 # any patent, copyright or other intellectual property rights in the Material
18 # is granted to or conferred upon you, either expressly, by implication,
19 # inducement, estoppel or otherwise. Any license under such intellectual
20 # property rights must be express and approved by Intel in writing.
21 #
22 # Unless otherwise agreed by Intel in writing, you may not remove or alter this
23 # notice or any other notice embedded in Materials by Intel or Intel's
24 # suppliers or licensors in any way.
25 #
26 #
27 # If this software was obtained under the Apache License, Version 2.0 (the
28 # "License"), the following terms apply:
29 #
30 # You may not use this file except in compliance with the License. You may
31 # obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
32 #
33 #
34 # Unless required by applicable law or agreed to in writing, software
35 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
36 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
37 #
38 # See the License for the specific language governing permissions and
39 # limitations under the License.
40 #===============================================================================
41 
42 #
43 # ! Content:
44 # ! Python example of dense K-Means initialization (plusPlusDense and parallelPlusDense) followed by K-Means clustering in the distributed processing mode
45 # !*****************************************************************************
46 
47 #
48 
49 
50 #
51 
52 import sys, os
53 import numpy as np
54 
55 from daal import step1Local, step2Master, step3Master
56 import daal.algorithms.kmeans as kmeans
57 import daal.algorithms.kmeans.init as init
58 from daal import step1Local, step2Master, step2Local, step3Master, step4Local, step5Master
59 from daal.data_management import FileDataSource, DataSource, RowMergedNumericTable
60 
61 utils_folder = os.path.realpath(os.path.abspath(os.path.dirname(os.path.dirname(__file__))))
62 if utils_folder not in sys.path:
63  sys.path.insert(0, utils_folder)
64 from utils import printNumericTable, createSparseTable
65 
# K-Means algorithm parameters
algorithmFPType = np.float32   # passed as fptype= to the DAAL algorithm objects
nClusters = 20                 # number of clusters to build
nIterations = 5                # number of distributed centroid-update iterations
nBlocks = 4                    # number of data blocks, each processed as one local node
nVectorsInBlock = 2500         # rows per block; used to compute step-1 row offsets

# One input file per block: <DAAL_PREFIX>/kmeans_dense_<k>.csv for k = 1..nBlocks
DAAL_PREFIX = os.path.join('..', 'data', 'distributed')
dataFileNames = [os.path.join(DAAL_PREFIX, 'kmeans_dense_%d.csv' % blockNumber)
                 for blockNumber in range(1, nBlocks + 1)]
78 
79 
def loadData(files):
    """Load one numeric table per data block.

    Reads ``files[0]`` .. ``files[nBlocks - 1]`` in order and returns a list
    holding the numeric table loaded from each file.
    """
    def _readTable(path):
        # FileDataSource (CSV input) that allocates the numeric table and
        # builds the data dictionary from the file contents
        source = FileDataSource(path,
                                DataSource.doAllocateNumericTable,
                                DataSource.doDictionaryFromContext)
        source.loadDataBlock()
        return source.getNumericTable()

    return [_readTable(files[blockIndex]) for blockIndex in range(nBlocks)]
91 
92 
def initStep1(data, method):
    """Run K-Means initialization step 1 on the local nodes.

    Blocks are processed in order. Processing stops at the first block whose
    step-1 result contains partial centroids, and that table is returned.
    Returns ``None`` if no block produces any.

    Args:
        data: per-block input numeric tables, indexed 0 .. nBlocks - 1.
        method: K-Means initialization method (e.g. ``kmeans.init.plusPlusDense``).
    """
    # Total vector count over all blocks; assumes each block holds exactly
    # nVectorsInBlock rows.
    totalVectors = nBlocks * nVectorsInBlock
    for blockIndex in range(nBlocks):
        # Index of this block's first vector within the whole data set
        firstVector = blockIndex * nVectorsInBlock
        step1 = kmeans.init.Distributed(step1Local, nClusters, totalVectors, firstVector,
                                        fptype=algorithmFPType, method=method)
        step1.input.set(kmeans.init.data, data[blockIndex])
        partialResult = step1.compute()
        centers = partialResult.get(kmeans.init.partialCentroids)
        if centers:
            return centers
    return None
103 
104 
def initStep23(data, localNodeData, step2Input, step3, bFirstIteration, method):
    """Run initialization step 2 on every local node, then step 3 on the master.

    On the first iteration, each node's step-2 internal result is appended to
    *localNodeData*. On later iterations, the stored entry is passed back to
    that node as internal input. Each node's step-2 output is added to *step3*
    under the node index, and ``step3.compute()`` is returned.

    Args:
        data: per-node input numeric tables.
        localNodeData: list of per-node internal data (mutated on the first iteration).
        step2Input: numeric table given to every node as ``inputOfStep2``.
        step3: master step-3 algorithm object (reused across calls).
        bFirstIteration: whether this is the first step-2/step-3 round.
        method: K-Means initialization method.
    """
    for nodeIndex, nodeData in enumerate(data):
        step2 = kmeans.init.Distributed(step2Local, nClusters, bFirstIteration,
                                        fptype=algorithmFPType, method=method)
        step2.input.set(kmeans.init.data, nodeData)
        step2.input.setStepInput(kmeans.init.inputOfStep2, step2Input)
        if bFirstIteration:
            step2Result = step2.compute()
            # Keep the node's internal state for the following steps
            localNodeData.append(step2Result.getLocal(kmeans.init.internalResult))
        else:
            step2.input.setLocal(kmeans.init.internalInput, localNodeData[nodeIndex])
            step2Result = step2.compute()
        step3.input.add(kmeans.init.inputOfStep3FromStep2, nodeIndex,
                        step2Result.getOutput(kmeans.init.outputOfStep2ForStep3))
    return step3.compute()
118 
119 
def initStep4(data, localNodeData, step3res, method):
    """Run initialization step 4 on the local nodes that received step-3 output.

    Args:
        data: per-node input numeric tables.
        localNodeData: per-node internal data collected during step 2.
        step3res: partial result returned by the master's step 3.
        method: K-Means initialization method.

    Returns:
        ``None`` if no node produced output, the single output table if
        exactly one node did, otherwise a ``RowMergedNumericTable`` stacking
        every node's step-4 output (the parallelPlus case).
    """
    aRes = []
    for i, nodeData in enumerate(data):
        # Step 3 may produce no input for this node; such nodes are skipped
        step3Output = step3res.getOutput(kmeans.init.outputOfStep3ForStep4, i)
        if not step3Output:
            continue

        step4 = kmeans.init.Distributed(step4Local, nClusters, fptype=algorithmFPType, method=method)
        step4.input.setInput(kmeans.init.data, nodeData)
        step4.input.setLocal(kmeans.init.internalInput, localNodeData[i])
        step4.input.setStepInput(kmeans.init.inputOfStep4FromStep3, step3Output)
        # Compute exactly once and read the output from the returned result
        step4Result = step4.compute()
        aRes.append(step4Result.get(kmeans.init.outputOfStep4))

    if not aRes:
        return None
    if len(aRes) == 1:
        return aRes[0]
    # Several nodes contributed (parallelPlus): stack their outputs row-wise
    pMerged = RowMergedNumericTable()
    for table in aRes:
        pMerged.addNumericTable(table)
    return pMerged
148 
149 
def initCentroids_plusPlusDense(data):
    """Compute initial centroids with the distributed plusPlusDense method.

    Step 1 provides the first centroid table. Steps 2-4 then run
    ``nClusters - 1`` more times, each adding the table returned by step 4.
    All tables are stacked into a ``RowMergedNumericTable``, which is returned.
    """
    method = kmeans.init.plusPlusDense
    nodeState = []                      # per-node internal data, filled by the first step 2
    merged = RowMergedNumericTable()    # accumulates every centroid table

    latest = initStep1(data, method)
    merged.addNumericTable(latest)

    master3 = kmeans.init.Distributed(step3Master, nClusters, fptype=algorithmFPType, method=method)
    for centerIndex in range(1, nClusters):
        partial3 = initStep23(data, nodeState, latest, master3, centerIndex == 1, method=method)
        latest = initStep4(data, nodeState, partial3, method=method)
        merged.addNumericTable(latest)
    return merged
168 
169 
def initCentroids_parallelPlusDense(data):
    """Compute initial centroids with the distributed parallelPlusDense method.

    Runs step 1, then ``nRounds`` rounds of steps 2-4 (``nRounds`` is read
    from the step-5 parameters). Every candidate table is fed into the
    master's step 5. A final step 2 on each block, with
    ``outputForStep5Required`` set, supplies step 5 with its per-node inputs.
    Step 5 also receives the last step-3 partial result. It is then computed
    and finalized, and its centroids table is returned.
    """
    method = kmeans.init.parallelPlusDense
    nodeState = []  # per-node internal data, filled by the first step 2

    candidates = initStep1(data, method=method)

    master5 = kmeans.init.Distributed(step5Master, nClusters, fptype=algorithmFPType, method=method)
    master5.input.add(kmeans.init.inputCentroids, candidates)

    master3 = kmeans.init.Distributed(step3Master, nClusters, fptype=algorithmFPType, method=method)
    for roundIndex in range(master5.parameter.nRounds):
        partial3 = initStep23(data, nodeState, candidates, master3, roundIndex == 0, method=method)
        candidates = initStep4(data, nodeState, partial3, method=method)
        master5.input.add(kmeans.init.inputCentroids, candidates)

    # Extra step 2 on every block, this time producing the output for step 5
    for blockIndex in range(nBlocks):
        step2 = kmeans.init.Distributed(step2Local, nClusters, False, fptype=algorithmFPType, method=method)
        step2.parameter.outputForStep5Required = True
        step2.input.setInput(kmeans.init.data, data[blockIndex])
        step2.input.setLocal(kmeans.init.internalInput, nodeState[blockIndex])
        step2.input.setStepInput(kmeans.init.inputOfStep2, candidates)
        step2Result = step2.compute()
        master5.input.add(kmeans.init.inputOfStep5FromStep2,
                          step2Result.getOutput(kmeans.init.outputOfStep2ForStep5))

    master5.input.setStepInput(kmeans.init.inputOfStep5FromStep3,
                               partial3.getStepOutput(kmeans.init.outputOfStep3ForStep5))
    master5.compute()
    finalResult = master5.finalizeCompute()
    return finalResult.get(kmeans.init.centroids)
203 
204 
def initCentroids(data, method):
    """Compute initial centroids for *data* using the given initialization method.

    Args:
        data: per-block input numeric tables.
        method: ``kmeans.init.parallelPlusDense`` or ``kmeans.init.plusPlusDense``.

    Raises:
        ValueError: if *method* is not one of the supported methods.
    """
    if method == kmeans.init.parallelPlusDense:
        return initCentroids_parallelPlusDense(data)
    if method == kmeans.init.plusPlusDense:
        return initCentroids_plusPlusDense(data)
    # Explicit raise rather than assert: asserts are removed under `python -O`,
    # which would make this function silently return None.
    raise ValueError("Unknown method for initCentroids: %r" % (method,))
211 
212 
def calculateCentroids(initialCentroids, data):
    """Refine *initialCentroids* with distributed K-Means, then print the results.

    Runs ``nIterations`` rounds, each with step 1 on every block and step 2 on
    the master. It then computes cluster assignments for every block against
    the final centroids and prints the first node's assignments, the
    centroids and the objective function value.

    Args:
        initialCentroids: numeric table with the starting centroids.
        data: per-block input numeric tables.
    """
    masterAlgorithm = kmeans.Distributed(step2Master, nClusters, fptype=algorithmFPType)

    centroids = initialCentroids
    objectiveFunction = None

    # Calculate centroids
    for _ in range(nIterations):
        for i in range(nBlocks):
            # Step 1 on this block using the current centroids
            localAlgorithm = kmeans.Distributed(step1Local, nClusters, False, fptype=algorithmFPType)
            localAlgorithm.input.set(kmeans.data, data[i])
            localAlgorithm.input.set(kmeans.inputCentroids, centroids)
            masterAlgorithm.input.add(kmeans.partialResults, localAlgorithm.compute())

        masterAlgorithm.compute()
        res = masterAlgorithm.finalizeCompute()

        centroids = res.get(kmeans.centroids)
        objectiveFunction = res.get(kmeans.objectiveFunction)

    # Calculate assignments. The second Batch argument is 0; presumably the
    # iteration count, so only assignments to the final centroids are computed.
    assignments = []
    for i in range(nBlocks):
        # fptype (not the former misspelling "fptyep") so this algorithm uses
        # the same floating-point type as every other algorithm in the file
        localAlgorithm = kmeans.Batch(nClusters, 0, fptype=algorithmFPType)
        localAlgorithm.input.set(kmeans.data, data[i])
        localAlgorithm.input.set(kmeans.inputCentroids, centroids)
        assignments.append(localAlgorithm.compute().get(kmeans.assignments))

    # Print the clusterization results
    printNumericTable(assignments[0], "First 10 cluster assignments from 1st node:", 10)
    printNumericTable(centroids, "First 10 dimensions of centroids:", 20, 10)
    printNumericTable(objectiveFunction, "Objective function value:")
256 
257 
def runKMeans(data, method, methodName):
    """Print the initialization method name, then initialize centroids and run K-Means on *data*."""
    header = "K-means init parameters: method = " + str(methodName)
    print(header)
    calculateCentroids(initCentroids(data, method=method), data)
262 
263 
if __name__ == "__main__":
    data = loadData(dataFileNames)
    # Run the full pipeline once per initialization method, in this order
    for initMethod, initMethodName in ((kmeans.init.plusPlusDense, "plusPlusDense"),
                                       (kmeans.init.parallelPlusDense, "parallelPlusDense")):
        runKMeans(data, initMethod, initMethodName)

For more complete information about compiler optimizations, see our Optimization Notice.