import daal.algorithms.kmeans as kmeans
import daal.algorithms.kmeans.init as init
from daal import step1Local, step2Local, step2Master, step3Master, step4Local, step5Master
from daal.data_management import FileDataSource, DataSource, RowMergedNumericTable
# The shared example helpers (``utils``) live in the parent of the directory
# that contains this script. Put that directory at the front of the import
# path (only once) before importing from it.
utils_folder = os.path.realpath(
    os.path.abspath(
        os.path.dirname(os.path.dirname(__file__))
    )
)
if utils_folder not in sys.path:
    sys.path[0:0] = [utils_folder]
from utils import printNumericTable, createSparseTable  # must follow the sys.path update above
# Floating-point type passed as ``fptype`` to every DAAL algorithm below.
algorithmFPType = np.float32

# Row count per data block; initStep1 derives the total row count and each
# block's offset from it (presumably every input file holds this many rows).
nVectorsInBlock = 2500

# NOTE(review): nBlocks, nClusters and nIterations are used throughout this
# script but are not defined in the visible code -- confirm they are defined.

# One CSV input file per block, resolved relative to the working directory.
DAAL_PREFIX = os.path.join('..', 'data', 'distributed')
dataFileNames = [
    os.path.join(DAAL_PREFIX, fileName)
    for fileName in ('kmeans_dense_1.csv',
                     'kmeans_dense_2.csv',
                     'kmeans_dense_3.csv',
                     'kmeans_dense_4.csv')
]
def loadData(files):
    """Load one DAAL numeric table per data block.

    :param files: paths of the per-block input files; the first ``nBlocks``
        entries are read.
    :returns: list of numeric tables, ``result[i]`` loaded from ``files[i]``.
    """
    data = []
    for i in range(nBlocks):
        # Let the data source allocate the table and build its dictionary
        # from the file itself.
        dataSource = FileDataSource(files[i],
                                    DataSource.doAllocateNumericTable,
                                    DataSource.doDictionaryFromContext)
        dataSource.loadDataBlock()
        data.append(dataSource.getNumericTable())
    return data
def initStep1(data, method):
    """Run step 1 of distributed centroid initialization on the local nodes.

    Node ``i`` is given ``data[i]`` together with ``nBlocks * nVectorsInBlock``
    and ``i * nVectorsInBlock`` (presumably the total row count and this
    block's row offset -- confirm against the DAAL step1Local signature).

    :param data: per-node input tables.
    :param method: initialization method (e.g. ``kmeans.init.plusPlusDense``).
    :returns: the first non-empty ``partialCentroids`` table produced by a
        node, or ``None`` if no node produced one.
    """
    for i in range(nBlocks):
        local = kmeans.init.Distributed(step1Local, nClusters, nBlocks * nVectorsInBlock, i * nVectorsInBlock,
                                        fptype=algorithmFPType, method=method)
        local.input.set(kmeans.init.data, data[i])
        pNewCenters = local.compute().get(kmeans.init.partialCentroids)
        # NOTE(review): presumably only one node yields partial centroids in
        # step 1; return the first one found instead of discarding it.
        if pNewCenters:
            return pNewCenters
    return None
def initStep23(data, localNodeData, step2Input, step3, bFirstIteration, method):
    """Run step 2 on every local node and feed the outputs into step 3.

    :param data: per-node input tables.
    :param localNodeData: list of per-node internal results. It is filled here
        on the first iteration and passed back to the nodes on later ones.
    :param step2Input: table given to every node as ``inputOfStep2``.
    :param step3: step-3 master algorithm that collects the node outputs.
    :param bFirstIteration: True on the first call (nodes have no internal
        input yet).
    :param method: initialization method.
    :returns: the result of ``step3.compute()``.
    """
    for i, block in enumerate(data):
        step2 = kmeans.init.Distributed(step2Local, nClusters, bFirstIteration, fptype=algorithmFPType, method=method)
        step2.input.set(kmeans.init.data, block)
        step2.input.setStepInput(kmeans.init.inputOfStep2, step2Input)
        if not bFirstIteration:
            step2.input.setLocal(kmeans.init.internalInput, localNodeData[i])
        res = step2.compute()
        # Only the first iteration creates per-node state. Later iterations
        # read localNodeData[i] for i < len(data), so appending again would
        # only grow the list with entries that are never used.
        if bFirstIteration:
            localNodeData.append(res.getLocal(kmeans.init.internalResult))
        step3.input.add(kmeans.init.inputOfStep3FromStep2, i, res.getOutput(kmeans.init.outputOfStep2ForStep3))
    return step3.compute()
def initStep4(data, localNodeData, step3res, method):
    """Run step 4 on the nodes that received step-3 output and gather results.

    :param data: per-node input tables.
    :param localNodeData: per-node internal results from step 2.
    :param step3res: result of the step-3 master computation.
    :param method: initialization method.
    :returns: ``None`` if no node produced output; the single table if exactly
        one node did; otherwise a ``RowMergedNumericTable`` stacking all
        tables in node order.
    """
    aRes = []
    for i, block in enumerate(data):
        step3Output = step3res.getOutput(kmeans.init.outputOfStep3ForStep4, i)
        # NOTE(review): presumably step 3 only yields step-4 input for nodes
        # that contribute new centroids; skip nodes without any.
        if not step3Output:
            continue
        step4 = kmeans.init.Distributed(step4Local, nClusters, fptype=algorithmFPType, method=method)
        step4.input.setInput(kmeans.init.data, block)
        step4.input.setLocal(kmeans.init.internalInput, localNodeData[i])
        step4.input.setStepInput(kmeans.init.inputOfStep4FromStep3, step3Output)
        aRes.append(step4.compute().get(kmeans.init.outputOfStep4))

    if not aRes:
        return None
    if len(aRes) == 1:
        return aRes[0]
    pMerged = RowMergedNumericTable()
    for r in aRes:
        pMerged.addNumericTable(r)
    return pMerged
def initCentroids_plusPlusDense(data):
    """Compute initial centroids with the distributed ``plusPlusDense`` method.

    Runs initStep1 once, then ``nClusters - 1`` rounds of initStep23 followed
    by initStep4, stacking each round's new centroids into one table.

    :param data: per-node input tables.
    :returns: ``RowMergedNumericTable`` with the centroids in selection order.
    """
    # Per-node internal state: filled by the first initStep23 call and reused
    # by every later step on the same node.
    localNodeData = []

    pCentroids = RowMergedNumericTable()
    pNewCentroids = initStep1(data, kmeans.init.plusPlusDense)
    pCentroids.addNumericTable(pNewCentroids)

    step3 = kmeans.init.Distributed(step3Master, nClusters, fptype=algorithmFPType, method=kmeans.init.plusPlusDense)
    for iCenter in range(1, nClusters):
        step3res = initStep23(data, localNodeData, pNewCentroids, step3, iCenter == 1, method=kmeans.init.plusPlusDense)
        pNewCentroids = initStep4(data, localNodeData, step3res, method=kmeans.init.plusPlusDense)
        pCentroids.addNumericTable(pNewCentroids)
    return pCentroids
def initCentroids_parallelPlusDense(data):
    """Compute initial centroids with the distributed ``parallelPlusDense`` method.

    The procedure is:

    1. Run initStep1 once.
    2. Run ``step5.parameter.nRounds`` rounds of initStep23 + initStep4,
       feeding every round's candidates to the step-5 master.
    3. Run one more step 2 on each node (with ``outputForStep5Required`` set)
       and pass its output, plus the last step-3 result, to step 5.
    4. Return the ``centroids`` table from step 5's finalized result.

    :param data: per-node input tables.
    """
    # Per-node internal state: filled by the first initStep23 call and reused
    # by every later step on the same node.
    localNodeData = []

    pNewCentroids = initStep1(data, method=kmeans.init.parallelPlusDense)

    step5 = kmeans.init.Distributed(step5Master, nClusters, fptype=algorithmFPType, method=kmeans.init.parallelPlusDense)
    step5.input.add(kmeans.init.inputCentroids, pNewCentroids)

    step3 = kmeans.init.Distributed(step3Master, nClusters, fptype=algorithmFPType, method=kmeans.init.parallelPlusDense)
    # NOTE(review): assumes nRounds >= 1; otherwise step3res below is unbound.
    for iRound in range(step5.parameter.nRounds):
        step3res = initStep23(data, localNodeData, pNewCentroids, step3, iRound == 0, method=kmeans.init.parallelPlusDense)
        pNewCentroids = initStep4(data, localNodeData, step3res, method=kmeans.init.parallelPlusDense)
        step5.input.add(kmeans.init.inputCentroids, pNewCentroids)

    # One more step 2 on every node, this time producing input for step 5.
    # Iterating over ``data`` matches how initStep23 sizes localNodeData.
    for i, block in enumerate(data):
        local = kmeans.init.Distributed(step2Local, nClusters, False, fptype=algorithmFPType, method=kmeans.init.parallelPlusDense)
        local.parameter.outputForStep5Required = True
        local.input.setInput(kmeans.init.data, block)
        local.input.setLocal(kmeans.init.internalInput, localNodeData[i])
        local.input.setStepInput(kmeans.init.inputOfStep2, pNewCentroids)
        step5.input.add(kmeans.init.inputOfStep5FromStep2, local.compute().getOutput(kmeans.init.outputOfStep2ForStep5))

    step5.input.setStepInput(kmeans.init.inputOfStep5FromStep3, step3res.getStepOutput(kmeans.init.outputOfStep3ForStep5))
    # Master algorithms are run as compute() then finalizeCompute(), the same
    # pattern calculateCentroids uses for its step-2 master.
    step5.compute()
    return step5.finalizeCompute().get(kmeans.init.centroids)
def initCentroids(data, method):
    """Compute initial centroids for ``data`` with the given init ``method``.

    :param data: per-node input tables.
    :param method: ``kmeans.init.parallelPlusDense`` or
        ``kmeans.init.plusPlusDense``.
    :raises ValueError: for any other method.
    """
    if method == kmeans.init.parallelPlusDense:
        return initCentroids_parallelPlusDense(data)
    if method == kmeans.init.plusPlusDense:
        return initCentroids_plusPlusDense(data)
    # Use an explicit raise rather than ``assert`` so the check survives
    # ``python -O``.
    raise ValueError("Unknown method for initCentroids")


def calculateCentroids(initialCentroids, data):
    """Refine centroids with distributed K-means and print the results.

    Runs ``nIterations`` rounds of local step 1 on each of the ``nBlocks``
    nodes, merged by one step-2 master. It then computes per-node cluster
    assignments against the final centroids, and prints the first node's
    assignments, the centroids and the objective function value.

    :param initialCentroids: numeric table of starting centroids.
    :param data: per-node input tables.
    """
    masterAlgorithm = kmeans.Distributed(step2Master, nClusters, fptype=algorithmFPType)

    centroids = initialCentroids
    objectiveFunction = None

    for it in range(nIterations):
        for i in range(nBlocks):
            localAlgorithm = kmeans.Distributed(step1Local, nClusters, False, fptype=algorithmFPType)
            localAlgorithm.input.set(kmeans.data, data[i])
            localAlgorithm.input.set(kmeans.inputCentroids, centroids)
            masterAlgorithm.input.add(kmeans.partialResults, localAlgorithm.compute())

        masterAlgorithm.compute()
        res = masterAlgorithm.finalizeCompute()

        centroids = res.get(kmeans.centroids)
        objectiveFunction = res.get(kmeans.objectiveFunction)

    assignments = []
    for i in range(nBlocks):
        # NOTE(review): the second argument is presumably the iteration count;
        # 0 means "only assign rows to the given centroids" -- confirm.
        localAlgorithm = kmeans.Batch(nClusters, 0, fptype=algorithmFPType)
        localAlgorithm.input.set(kmeans.data, data[i])
        localAlgorithm.input.set(kmeans.inputCentroids, centroids)
        assignments.append(localAlgorithm.compute().get(kmeans.assignments))

    printNumericTable(assignments[0], "First 10 cluster assignments from 1st node:", 10)
    printNumericTable(centroids, "First 10 dimensions of centroids:", 20, 10)
    printNumericTable(objectiveFunction, "Objective function value:")
def runKMeans(data, method, methodName):
    """Announce the init method, then initialize and refine centroids.

    :param data: per-node input tables.
    :param method: initialization method passed to initCentroids.
    :param methodName: human-readable method name, used only for printing.
    """
    banner = "K-means init parameters: method = " + str(methodName)
    print(banner)
    startingCentroids = initCentroids(data, method=method)
    calculateCentroids(startingCentroids, data)
if __name__ == "__main__":
    data = loadData(dataFileNames)
    # Run the whole pipeline once per initialization method, in this order.
    for initMethod, initMethodName in (
            (kmeans.init.plusPlusDense, "plusPlusDense"),
            (kmeans.init.parallelPlusDense, "parallelPlusDense"),
    ):
        runKMeans(data, initMethod, initMethodName)