29 from daal
import step1Local, step2Master, step3Master
30 import daal.algorithms.kmeans
as kmeans
31 import daal.algorithms.kmeans.init
as init
32 from daal
import step1Local, step2Master, step2Local, step3Master, step4Local, step5Master
33 from daal.data_management
import RowMergedNumericTable
# Make the directory one level above this script's own directory importable,
# so that the `utils` package imported right after this can be found.
# The script path is made absolute *before* walking up two levels: with a bare
# relative `__file__` (e.g. "script.py") two dirname() calls would otherwise
# collapse to "" and resolve to the current directory instead of its parent.
utils_folder = os.path.realpath(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
if utils_folder not in sys.path:
    sys.path.insert(0, utils_folder)
38 from utils
import printNumericTable, createSparseTable
# Floating-point type handed to every algorithm constructor via `fptype=`.
algorithmFPType = np.float32

# Per-block vector count; initStep1 combines it with the block index and the
# block count when constructing the step-1 initializer.
nVectorsInBlock = 8000

# Input data location (relative path) and the four CSR input files, in order.
DAAL_PREFIX = os.path.join('..', 'data', 'distributed')
dataFileNames = [
    os.path.join(DAAL_PREFIX, csvName)
    for csvName in (
        'kmeans_csr_1.csv',
        'kmeans_csr_2.csv',
        'kmeans_csr_3.csv',
        'kmeans_csr_4.csv',
    )
]
# Builds a list with one createSparseTable(...) result per entry of `files`,
# always requesting ntype=np.float32.
# NOTE(review): the enclosing `def` line is not visible here; this is presumably
# the body of loadData(files), which the __main__ section calls - confirm.
55 return [createSparseTable(f, ntype=np.float32)
for f
in files]
# Step 1 of the distributed K-Means initialization, run once per data block.
#   data   - indexable collection of per-block input tables (indexed 0..nBlocks-1)
#   method - initialization method, forwarded unchanged to the algorithm
58 def initStep1(data, method):
59 for i
in range(nBlocks):
# A fresh step1Local initializer is built for each block. The two positional
# arguments after nClusters are nBlocks*nVectorsInBlock and i*nVectorsInBlock -
# presumably the total vector count and this block's starting offset; confirm
# against the kmeans.init.Distributed documentation.
61 local = kmeans.init.Distributed(step1Local, nClusters, nBlocks*nVectorsInBlock, i*nVectorsInBlock,
62 fptype=algorithmFPType, method=method)
63 local.input.set(kmeans.init.data, data[i])
64 pNewCenters = local.compute().get(kmeans.init.partialCentroids)
# NOTE(review): no return statement is visible in this excerpt (the lines after
# the one above are missing), yet both callers use this function's result.
# Confirm how pNewCenters is returned in the complete file.
# Runs initialization step 2 on every local block, feeds each block's step-2
# output into the shared step-3 algorithm object, and returns step3.compute().
#   data            - indexable collection of per-block input tables
#   localNodeData   - list of per-block internal data: read when bFirstIteration
#                     is false, appended to in the lines visible here
#   step2Input      - value passed as the step input (inputOfStep2) of every block
#   step3           - step-3 algorithm object, created and owned by the caller
#   bFirstIteration - forwarded to the step-2 constructor; also decides whether
#                     previously stored internal data is supplied to step 2
#   method          - initialization method, forwarded to the step-2 constructor
70 def initStep23(data, localNodeData, step2Input, step3, bFirstIteration, method):
72 for i
in range(len(data)):
73 step2 = kmeans.init.Distributed(step2Local, nClusters, bFirstIteration, fptype=algorithmFPType, method=method)
74 step2.input.set(kmeans.init.data, data[i])
75 step2.input.setStepInput(kmeans.init.inputOfStep2, step2Input)
# Internal data saved for this block is only supplied after the first iteration.
76 if not bFirstIteration:
77 step2.input.setLocal(kmeans.init.internalInput, localNodeData[i])
# NOTE(review): `res` is used below but never assigned in the visible lines; it
# is presumably the result of step2.compute() on a line missing from this
# excerpt. Whether the append below is guarded by a condition also cannot be
# seen here - confirm against the complete file.
80 localNodeData.append(res.getLocal(kmeans.init.internalResult))
81 step3.input.add(kmeans.init.inputOfStep3FromStep2, i, res.getOutput(kmeans.init.outputOfStep2ForStep3))
82 return step3.compute()
# Runs initialization step 4 on the local blocks, using the per-block output of
# step 3, and collects each step-4 result (outputOfStep4).
#   data          - indexable collection of per-block input tables
#   localNodeData - per-block internal data, passed to step 4 as internalInput
#   step3res      - step-3 result; queried per block for outputOfStep3ForStep4
#   method        - initialization method, forwarded to the step-4 constructor
85 def initStep4(data, localNodeData, step3res, method):
87 for i
in range(0, len(data)):
89 step3Output = step3res.getOutput(kmeans.init.outputOfStep3ForStep4, i)
# NOTE(review): lines are missing between the lookup above and the constructor
# below; any check on step3Output (e.g. skipping blocks with no output) is not
# visible in this excerpt.
94 step4 = kmeans.init.Distributed(step4Local, nClusters, fptype=algorithmFPType, method=method)
96 step4.input.setInput(kmeans.init.data, data[i])
97 step4.input.setLocal(kmeans.init.internalInput, localNodeData[i])
98 step4.input.setStepInput(kmeans.init.inputOfStep4FromStep3, step3Output)
101 aRes.append(step4.compute().get(kmeans.init.outputOfStep4))
# The collected tables are combined into one RowMergedNumericTable.
# NOTE(review): the initialisation of `aRes`, the loop that binds `r`, and the
# return statement are not visible in this excerpt - confirm them in the
# complete file.
108 pMerged = RowMergedNumericTable()
110 pMerged.addNumericTable(r)
# Centroid initialization driver for the plusPlusCSR method.
# Accumulates centroids in a RowMergedNumericTable: first the table returned by
# initStep1, then one table from initStep4 per pass of the loop below.
#   data - per-block input tables, forwarded to the step helpers
115 def initCentroids_plusPlusCSR(data):
119 pCentroids = RowMergedNumericTable()
121 pNewCentroids = initStep1(data, kmeans.init.plusPlusCSR)
122 pCentroids.addNumericTable(pNewCentroids)
# A single step-3 (master) algorithm object is created once and reused for
# every pass of the loop.
125 step3 = kmeans.init.Distributed(step3Master, nClusters, fptype=algorithmFPType, method=kmeans.init.plusPlusCSR)
# The loop starts at 1, so it runs nClusters-1 times; only the first pass
# (iCenter == 1) is flagged to initStep23 as the first iteration.
126 for iCenter
in range(1, nClusters):
128 step3res = initStep23(data, localNodeData, pNewCentroids, step3, iCenter == 1, method=kmeans.init.plusPlusCSR)
130 pNewCentroids = initStep4(data, localNodeData, step3res, method=kmeans.init.plusPlusCSR)
131 pCentroids.addNumericTable(pNewCentroids)
# NOTE(review): `localNodeData` is not initialised in the visible lines and no
# return statement is visible, although initCentroids returns this function's
# result - confirm both in the complete file.
# Centroid initialization driver for the parallelPlusCSR method.
# A step-5 (master) algorithm collects candidate centroids from step 1 and from
# every round of steps 2-4, then receives extra step-2 and step-3 outputs, and
# the final centroids are read from step5.finalizeCompute().
#   data - per-block input tables, forwarded to the step helpers
135 def initCentroids_parallelPlusCSR(data):
139 pNewCentroids = initStep1(data, method=kmeans.init.parallelPlusCSR)
142 step5 = kmeans.init.Distributed(step5Master, nClusters, fptype=algorithmFPType, method=kmeans.init.parallelPlusCSR)
143 step5.input.add(kmeans.init.inputCentroids, pNewCentroids)
# A single step-3 (master) algorithm object is reused for all rounds; the number
# of rounds is read from step5.parameter.nRounds.
145 step3 = kmeans.init.Distributed(step3Master, nClusters, fptype=algorithmFPType, method=kmeans.init.parallelPlusCSR)
146 for iRound
in range(step5.parameter.nRounds):
# Only round 0 is flagged to initStep23 as the first iteration.
148 step3res = initStep23(data, localNodeData, pNewCentroids, step3, iRound == 0, method=kmeans.init.parallelPlusCSR)
150 pNewCentroids = initStep4(data, localNodeData, step3res, method=kmeans.init.parallelPlusCSR)
151 step5.input.add(kmeans.init.inputCentroids, pNewCentroids)
# One more step-2 pass per block, created with the first-iteration flag False
# and with outputForStep5Required switched on, so that each block's
# outputOfStep2ForStep5 can be added to step 5.
154 for i
in range(nBlocks):
156 local = kmeans.init.Distributed(step2Local, nClusters,
False, fptype=algorithmFPType, method=kmeans.init.parallelPlusCSR)
157 local.parameter.outputForStep5Required =
True
159 local.input.setInput(kmeans.init.data, data[i])
160 local.input.setLocal(kmeans.init.internalInput, localNodeData[i])
161 local.input.setStepInput(kmeans.init.inputOfStep2, pNewCentroids)
163 step5.input.add(kmeans.init.inputOfStep5FromStep2, local.compute().getOutput(kmeans.init.outputOfStep2ForStep5))
# step3res here is the result of the last round of the loop above.
165 step5.input.setStepInput(kmeans.init.inputOfStep5FromStep3, step3res.getStepOutput(kmeans.init.outputOfStep3ForStep5))
# NOTE(review): no step5.compute() call is visible before finalizeCompute(), and
# `localNodeData` is not initialised in the visible lines; both are presumably
# on lines missing from this excerpt - confirm in the complete file.
167 return step5.finalizeCompute().get(kmeans.init.centroids)
def initCentroids(data, method):
    """Compute initial centroids with the requested initialization method.

    Args:
        data: Per-block input tables, forwarded to the selected routine.
        method: Either ``kmeans.init.parallelPlusCSR`` or
            ``kmeans.init.plusPlusCSR``.

    Returns:
        Whatever the selected ``initCentroids_*`` routine returns.

    Raises:
        AssertionError: If ``method`` is neither of the supported values.
    """
    if method == kmeans.init.parallelPlusCSR:
        return initCentroids_parallelPlusCSR(data)
    if method == kmeans.init.plusPlusCSR:
        return initCentroids_plusPlusCSR(data)
    # Raised explicitly rather than via `assert False`: assert statements are
    # removed under `python -O`, which would let an unknown method fall through
    # and silently return None. The exception type is unchanged.
    raise AssertionError("Unknown method for initCentroids")
def calculateCentroids(initialCentroids, data):
    """Run distributed K-Means (lloydCSR) over the data blocks and print results.

    For each of ``nIterations`` iterations, a step-1 local algorithm is run on
    every block with the current centroids, its partial result is added to the
    step-2 master algorithm, and the master's finalized result supplies the
    centroids and objective function value for the next iteration. Afterwards a
    batch algorithm is run per block with the final centroids to obtain the
    cluster assignments.

    Args:
        initialCentroids: Centroids used for the first iteration.
        data: Indexable collection of per-block input tables; blocks
            ``0..nBlocks-1`` are used.

    Returns:
        None. The first block's assignments, the centroids and the objective
        function value are printed with ``printNumericTable``.
    """
    masterAlgorithm = kmeans.Distributed(step2Master, nClusters,
                                         fptype=algorithmFPType, method=kmeans.lloydCSR)

    centroids = initialCentroids
    objectiveFunction = None

    # Calculate centroids
    for _ in range(nIterations):
        for i in range(nBlocks):
            localAlgorithm = kmeans.Distributed(step1Local, nClusters, False,
                                                fptype=algorithmFPType, method=kmeans.lloydCSR)

            localAlgorithm.input.set(kmeans.data, data[i])
            localAlgorithm.input.set(kmeans.inputCentroids, centroids)

            masterAlgorithm.input.add(kmeans.partialResults, localAlgorithm.compute())

        masterAlgorithm.compute()
        res = masterAlgorithm.finalizeCompute()

        centroids = res.get(kmeans.centroids)
        objectiveFunction = res.get(kmeans.objectiveFunction)

    # Calculate assignments: one table per block, in block order.
    assignments = []
    for i in range(nBlocks):
        localAlgorithm = kmeans.Batch(nClusters, 0,
                                      fptype=algorithmFPType, method=kmeans.lloydCSR)

        localAlgorithm.input.set(kmeans.data, data[i])
        localAlgorithm.input.set(kmeans.inputCentroids, centroids)

        assignments.append(localAlgorithm.compute().get(kmeans.assignments))

    # Print the clusterization results
    printNumericTable(assignments[0], "First 10 cluster assignments from 1st node:", 10)
    printNumericTable(centroids, "First 10 dimensions of centroids:", 20, 10)
    printNumericTable(objectiveFunction, "Objective function value:")
def runKMeans(data, method, methodName):
    """Announce the init method, initialize centroids, then run the clustering.

    Args:
        data: Per-block input tables, used for both initialization and clustering.
        method: Initialization method, forwarded to ``initCentroids``.
        methodName: Label for ``method``; only used in the printed banner.
    """
    banner = "K-means init parameters: method = " + str(methodName)
    print(banner)
    calculateCentroids(initCentroids(data, method=method), data)
if __name__ == "__main__":
    # Load the blocks once, then run the full pipeline for each init method in turn.
    data = loadData(dataFileNames)
    for initMethod, label in (
        (kmeans.init.plusPlusCSR, "plusPlusCSR"),
        (kmeans.init.parallelPlusCSR, "parallelPlusCSR"),
    ):
        runKMeans(data, initMethod, label)