29 from daal
import step1Local, step2Master, step3Master
30 import daal.algorithms.kmeans
as kmeans
31 import daal.algorithms.kmeans.init
as init
32 from daal
import step1Local, step2Master, step2Local, step3Master, step4Local, step5Master
33 from daal.data_management
import FileDataSource, DataSource, RowMergedNumericTable
# Put the parent of this script's directory on sys.path so the shared helper
# module `utils` can be imported.
utils_folder = os.path.realpath(
    os.path.abspath(os.path.dirname(os.path.dirname(__file__))))
if utils_folder not in sys.path:
    sys.path.insert(0, utils_folder)
from utils import printNumericTable, createSparseTable

# Floating-point type passed as `fptype` to the DAAL algorithms in this file.
algorithmFPType = np.float32

# NOTE(review): nClusters, nIterations and nBlocks are used throughout this
# file but are not defined in the visible source -- confirm they are defined
# next to these settings.

# Rows per data block. Init step 1 derives each block's row offset from this
# value, so every input file presumably holds exactly this many rows --
# confirm against the data set.
nVectorsInBlock = 2500

# One CSV file per block. The directory is resolved against the current
# working directory, not against this script's location.
DAAL_PREFIX = os.path.join('..', 'data', 'distributed')
dataFileNames = [
    os.path.join(DAAL_PREFIX, fileName)
    for fileName in ('kmeans_dense_1.csv',
                     'kmeans_dense_2.csv',
                     'kmeans_dense_3.csv',
                     'kmeans_dense_4.csv')
]
def loadData(files):
    """Load one DAAL numeric table per data block from CSV files.

    Args:
        files: sequence of CSV paths. Only the first ``nBlocks`` entries are
            read, so it must contain at least ``nBlocks`` paths.

    Returns:
        list of numeric tables, one per block, in the order of ``files``.
    """
    data = []
    for i in range(nBlocks):
        # The flags ask the source to allocate the numeric table itself and to
        # build the data dictionary from the file contents.
        dataSource = FileDataSource(files[i],
                                    DataSource.doAllocateNumericTable,
                                    DataSource.doDictionaryFromContext)
        # Read the file into the table owned by the data source.
        dataSource.loadDataBlock()
        data.append(dataSource.getNumericTable())
    return data
def initStep1(data, method):
    """Run K-Means init step 1 on the local blocks and return its centroids.

    Block ``i`` is told the total row count (``nBlocks * nVectorsInBlock``)
    and its own starting row (``i * nVectorsInBlock``). This assumes every
    block holds exactly ``nVectorsInBlock`` rows (TODO confirm).

    Args:
        data: per-block numeric tables (at least ``nBlocks`` of them).
        method: K-Means init method constant (e.g. kmeans.init.plusPlusDense).

    Returns:
        The partial-centroids table of the first block that produced a
        non-empty one, or None if no block did.
    """
    for i in range(nBlocks):
        local = kmeans.init.Distributed(step1Local, nClusters, nBlocks*nVectorsInBlock, i*nVectorsInBlock,
                                        fptype=algorithmFPType, method=method)
        local.input.set(kmeans.init.data, data[i])
        pNewCenters = local.compute().get(kmeans.init.partialCentroids)
        # NOTE(review): presumably only the block whose row range contains the
        # selected starting centroid returns a table, and the others return a
        # null/falsy result -- confirm against the DAAL documentation.
        if pNewCenters:
            return pNewCenters
    return None
def initStep23(data, localNodeData, step2Input, step3, bFirstIteration, method):
    """Run init step 2 on every block and feed the outputs to master step 3.

    Args:
        data: per-block numeric tables.
        localNodeData: list of per-block internal results. One entry per block
            is appended on the first iteration; later iterations read
            ``localNodeData[i]`` back as the block's internal input.
        step2Input: centroids from the previous step, given to every block.
        step3: master step-3 algorithm collecting the step-2 outputs.
        bFirstIteration: True on the first call of an initialization run.
        method: K-Means init method constant.

    Returns:
        The result of ``step3.compute()``.
    """
    for i, block in enumerate(data):
        step2 = kmeans.init.Distributed(step2Local, nClusters, bFirstIteration,
                                        fptype=algorithmFPType, method=method)
        step2.input.set(kmeans.init.data, block)
        step2.input.setStepInput(kmeans.init.inputOfStep2, step2Input)
        if not bFirstIteration:
            # Later iterations continue from the block's stored internal state.
            step2.input.setLocal(kmeans.init.internalInput, localNodeData[i])
        res = step2.compute()
        if bFirstIteration:
            # Record each block's internal state once. Later calls only read
            # localNodeData[i], so appending again would just grow the list.
            localNodeData.append(res.getLocal(kmeans.init.internalResult))
        step3.input.add(kmeans.init.inputOfStep3FromStep2, i,
                        res.getOutput(kmeans.init.outputOfStep2ForStep3))
    return step3.compute()
def initStep4(data, localNodeData, step3res, method):
    """Run init step 4 on every block that received step-3 output.

    Args:
        data: per-block numeric tables.
        localNodeData: per-block internal results filled by initStep23.
        step3res: result of the master step 3.
        method: K-Means init method constant.

    Returns:
        None if no block produced output, the single step-4 table if exactly
        one block did, otherwise a RowMergedNumericTable stacking all of them
        in block order.
    """
    aRes = []
    for i, block in enumerate(data):
        step3Output = step3res.getOutput(kmeans.init.outputOfStep3ForStep4, i)
        if not step3Output:
            # NOTE(review): presumably step 3 leaves this empty for blocks it
            # selected nothing from; such blocks have no step-4 work -- confirm.
            continue
        step4 = kmeans.init.Distributed(step4Local, nClusters, fptype=algorithmFPType, method=method)
        step4.input.setInput(kmeans.init.data, block)
        step4.input.setLocal(kmeans.init.internalInput, localNodeData[i])
        step4.input.setStepInput(kmeans.init.inputOfStep4FromStep3, step3Output)
        aRes.append(step4.compute().get(kmeans.init.outputOfStep4))

    if not aRes:
        return None
    if len(aRes) == 1:
        return aRes[0]
    # Several blocks contributed: stack their tables row-wise.
    pMerged = RowMergedNumericTable()
    for r in aRes:
        pMerged.addNumericTable(r)
    return pMerged
def initCentroids_plusPlusDense(data):
    """Compute initial centroids with the distributed plusPlusDense method.

    Step 1 yields the first centroid(s). Each of the ``nClusters - 1``
    following rounds runs steps 2-3 (initStep23) and step 4 (initStep4), and
    the table step 4 returns is appended to the result.

    Args:
        data: per-block numeric tables.

    Returns:
        RowMergedNumericTable holding all selected centroids row-wise.
    """
    method = kmeans.init.plusPlusDense

    # Collects the step-1 table and every step-4 table, in order.
    pCentroids = RowMergedNumericTable()
    pNewCentroids = initStep1(data, method)
    pCentroids.addNumericTable(pNewCentroids)

    # Per-block internal state, filled by the first initStep23 call and reused
    # by every later step 2 and step 4.
    localNodeData = []
    step3 = kmeans.init.Distributed(step3Master, nClusters, fptype=algorithmFPType, method=method)
    for iCenter in range(1, nClusters):
        step3res = initStep23(data, localNodeData, pNewCentroids, step3, iCenter == 1, method=method)
        pNewCentroids = initStep4(data, localNodeData, step3res, method=method)
        pCentroids.addNumericTable(pNewCentroids)
    return pCentroids
def initCentroids_parallelPlusDense(data):
    """Compute initial centroids with the distributed parallelPlusDense method.

    Step 1 supplies the starting centroids. For ``step5.parameter.nRounds``
    rounds, steps 2-3 (initStep23) and step 4 (initStep4) produce new
    candidates. Every candidate table is added to the master step 5. A final
    step-2 pass on each block (with outputForStep5Required) and step 3's
    output for step 5 complete step 5's input, and step 5 then computes the
    final centroids.

    Args:
        data: per-block numeric tables.

    Returns:
        The ``kmeans.init.centroids`` table from step 5.

    Raises:
        ValueError: if ``step5.parameter.nRounds`` is below 1. The final pass
            needs the per-block state and the step-3 result of at least one
            round.
    """
    method = kmeans.init.parallelPlusDense

    pNewCentroids = initStep1(data, method=method)

    # Master step 5 accumulates all candidate centroid tables.
    step5 = kmeans.init.Distributed(step5Master, nClusters, fptype=algorithmFPType, method=method)
    step5.input.add(kmeans.init.inputCentroids, pNewCentroids)

    # Per-block internal state, filled by the first initStep23 call.
    localNodeData = []
    step3 = kmeans.init.Distributed(step3Master, nClusters, fptype=algorithmFPType, method=method)
    step3res = None
    for iRound in range(step5.parameter.nRounds):
        step3res = initStep23(data, localNodeData, pNewCentroids, step3, iRound == 0, method=method)
        pNewCentroids = initStep4(data, localNodeData, step3res, method=method)
        step5.input.add(kmeans.init.inputCentroids, pNewCentroids)

    if step3res is None:
        raise ValueError("parallelPlusDense initialization needs at least one round")

    # Final step 2 on every block, this time requesting the output for step 5.
    # Iterate over `data` so the indices match localNodeData, which
    # initStep23 sized by len(data).
    for i, block in enumerate(data):
        local = kmeans.init.Distributed(step2Local, nClusters, False,
                                        fptype=algorithmFPType, method=method)
        local.parameter.outputForStep5Required = True
        local.input.setInput(kmeans.init.data, block)
        local.input.setLocal(kmeans.init.internalInput, localNodeData[i])
        local.input.setStepInput(kmeans.init.inputOfStep2, pNewCentroids)
        step5.input.add(kmeans.init.inputOfStep5FromStep2,
                        local.compute().getOutput(kmeans.init.outputOfStep2ForStep5))

    step5.input.setStepInput(kmeans.init.inputOfStep5FromStep3,
                             step3res.getStepOutput(kmeans.init.outputOfStep3ForStep5))

    # Same master-algorithm sequence as in calculateCentroids: compute(), then
    # finalizeCompute().
    step5.compute()
    return step5.finalizeCompute().get(kmeans.init.centroids)
def initCentroids(data, method):
    """Compute initial centroids for ``data`` with the given init method.

    Args:
        data: per-block numeric tables.
        method: kmeans.init.parallelPlusDense or kmeans.init.plusPlusDense.

    Returns:
        Numeric table of initial centroids produced by the chosen method.

    Raises:
        ValueError: if ``method`` is not one of the two supported methods.
    """
    if method == kmeans.init.parallelPlusDense:
        return initCentroids_parallelPlusDense(data)
    if method == kmeans.init.plusPlusDense:
        return initCentroids_plusPlusDense(data)
    # An explicit raise, unlike `assert`, survives `python -O`, so an unknown
    # method can never fall through and silently return None.
    raise ValueError("Unknown method for initCentroids")
def calculateCentroids(initialCentroids, data):
    """Refine ``initialCentroids`` with distributed K-Means and print results.

    Runs ``nIterations`` rounds. In each round, every block's step1Local
    partial result is added to a single step2Master algorithm, whose
    finalized result provides the centroids and objective function for the
    next round. Each block is then clustered by a batch K-Means run against
    the final centroids to obtain its assignments. Finally, the first block's
    assignments, the centroids and (when available) the objective function
    are printed.

    Args:
        initialCentroids: numeric table with the starting centroids.
        data: per-block numeric tables (at least ``nBlocks`` of them).
    """
    masterAlgorithm = kmeans.Distributed(step2Master, nClusters, fptype=algorithmFPType)

    centroids = initialCentroids
    objectiveFunction = None

    for _ in range(nIterations):
        for i in range(nBlocks):
            # Local step: partial results for block i against the current centroids.
            localAlgorithm = kmeans.Distributed(step1Local, nClusters, False, fptype=algorithmFPType)
            localAlgorithm.input.set(kmeans.data, data[i])
            localAlgorithm.input.set(kmeans.inputCentroids, centroids)
            masterAlgorithm.input.add(kmeans.partialResults, localAlgorithm.compute())

        # Master step: combine the partial results into new centroids.
        masterAlgorithm.compute()
        res = masterAlgorithm.finalizeCompute()

        centroids = res.get(kmeans.centroids)
        objectiveFunction = res.get(kmeans.objectiveFunction)

    # Cluster assignments per block. The second Batch argument (0) is
    # presumably the iteration count, so this run only assigns rows to the
    # given centroids -- confirm.
    assignments = []
    for i in range(nBlocks):
        # `fptype` must be spelled exactly so this run uses algorithmFPType
        # like every other algorithm in the pipeline.
        localAlgorithm = kmeans.Batch(nClusters, 0, fptype=algorithmFPType)
        localAlgorithm.input.set(kmeans.data, data[i])
        localAlgorithm.input.set(kmeans.inputCentroids, centroids)
        assignments.append(localAlgorithm.compute().get(kmeans.assignments))

    printNumericTable(assignments[0], "First 10 cluster assignments from 1st node:", 10)
    printNumericTable(centroids, "First 10 dimensions of centroids:", 20, 10)
    if objectiveFunction is not None:
        # Only produced when at least one distributed iteration ran.
        printNumericTable(objectiveFunction, "Objective function value:")
def runKMeans(data, method, methodName):
    """Initialize centroids with ``method``, then run and report K-Means.

    Args:
        data: per-block numeric tables.
        method: init method constant forwarded to initCentroids.
        methodName: label printed before the run.
    """
    banner = "K-means init parameters: method = " + str(methodName)
    print(banner)
    initialCentroids = initCentroids(data, method=method)
    calculateCentroids(initialCentroids, data)
if __name__ == "__main__":
    data = loadData(dataFileNames)
    # Run the full pipeline once per initialization method on the same blocks.
    for initMethod, initMethodName in (
            (kmeans.init.plusPlusDense, "plusPlusDense"),
            (kmeans.init.parallelPlusDense, "parallelPlusDense")):
        runKMeans(data, initMethod, initMethodName)