import os
import sys

import numpy as np

import daal.algorithms.kmeans as kmeans
import daal.algorithms.kmeans.init as init
from daal import step1Local, step2Master, step3Master
from daal import step1Local, step2Master, step2Local, step3Master, step4Local, step5Master
from daal.data_management import RowMergedNumericTable
# Make the shared example helpers importable. ``utils`` is looked up in the
# directory two dirname() levels above ``__file__``; it is prepended (only if
# not already present) so it wins over any other module named ``utils``.
utils_folder = os.path.dirname(os.path.dirname(__file__))
utils_folder = os.path.realpath(os.path.abspath(utils_folder))
if utils_folder not in sys.path:
    sys.path.insert(0, utils_folder)
from utils import printNumericTable, createSparseTable
# Floating-point type passed as ``fptype`` to every DAAL algorithm object.
algorithmFPType = np.float32

# Number of clusters requested from every kmeans / kmeans.init algorithm.
nClusters = 20
# Number of distributed Lloyd iterations run by calculateCentroids().
nIterations = 5
# Rows per input block; initStep1() derives the total row count and each
# block's row offset from it. NOTE(review): assumes every file holds exactly
# this many rows - confirm against the data set.
nVectorsInBlock = 8000

DAAL_PREFIX = os.path.join('..', 'data', 'distributed')
# One CSR file per data block.
dataFileNames = [
    os.path.join(DAAL_PREFIX, 'kmeans_csr_{}.csv'.format(k)) for k in range(1, 5)
]
# Number of data blocks. Derived from the file list so the two can never
# disagree: every per-block loop indexes into the tables loaded from it.
nBlocks = len(dataFileNames)
def loadData(files):
    """Load one sparse numeric table per input file.

    :param files: iterable of CSV paths, one per data block.
    :return: list of tables, in the same order as *files*.
    """
    # Use the same floating-point type the algorithms are configured with
    # (algorithmFPType) rather than a second hard-coded one, so changing the
    # precision in one place keeps data and algorithms consistent.
    return [createSparseTable(path, ntype=algorithmFPType) for path in files]
def initStep1(data, method):
    """Run K-means init step 1 on each block and return the first centroids found.

    :param data: list of per-block numeric tables.
    :param method: kmeans.init method constant (e.g. kmeans.init.plusPlusCSR).
    :return: the first truthy ``partialCentroids`` table produced by a block,
        or ``None`` if no block produced one.
    """
    # Presumably the total number of rows across all blocks - assumes every
    # block holds nVectorsInBlock rows (TODO confirm).
    totalRows = nBlocks * nVectorsInBlock
    for i in range(nBlocks):
        # The fourth argument is presumably block i's row offset in the full data set.
        local = kmeans.init.Distributed(step1Local, nClusters, totalRows, i * nVectorsInBlock,
                                        fptype=algorithmFPType, method=method)
        local.input.set(kmeans.init.data, data[i])
        pNewCenters = local.compute().get(kmeans.init.partialCentroids)
        # NOTE(review): step 1 is expected to yield centroids on only one
        # block, the others presumably returning an empty result - confirm.
        # Previously the value was computed but never returned.
        if pNewCenters:
            return pNewCenters
    return None
def initStep23(data, localNodeData, step2Input, step3, bFirstIteration, method):
    """Run init step 2 on every block, feed the outputs to *step3* and compute it.

    :param data: list of per-block numeric tables.
    :param localNodeData: list of per-block internal state. On the first
        iteration one entry per block is appended, in block order; on later
        iterations entry ``i`` is passed back to block ``i``'s step 2.
    :param step2Input: centroids table given to every step-2 object.
    :param step3: step-3 master algorithm object that collects the outputs.
    :param bFirstIteration: True on the first call of an initialisation run.
    :param method: kmeans.init method constant.
    :return: the result of ``step3.compute()``.
    """
    for i, block in enumerate(data):
        step2 = kmeans.init.Distributed(step2Local, nClusters, bFirstIteration,
                                        fptype=algorithmFPType, method=method)
        step2.input.set(kmeans.init.data, block)
        step2.input.setStepInput(kmeans.init.inputOfStep2, step2Input)
        if not bFirstIteration:
            step2.input.setLocal(kmeans.init.internalInput, localNodeData[i])
        res = step2.compute()
        if bFirstIteration:
            # Store each block's internal result exactly once. Only entries
            # 0..len(data)-1 are ever read (above), so appending on every call
            # just grew the list with entries nothing used.
            localNodeData.append(res.getLocal(kmeans.init.internalResult))
        step3.input.add(kmeans.init.inputOfStep3FromStep2, i,
                        res.getOutput(kmeans.init.outputOfStep2ForStep3))
    return step3.compute()
def initStep4(data, localNodeData, step3res, method):
    """Run init step 4 on every block that received step-3 output.

    :param data: list of per-block numeric tables.
    :param localNodeData: per-block internal state collected by initStep23().
    :param step3res: result returned by the step-3 master algorithm.
    :param method: kmeans.init method constant.
    :return: ``None`` if no block produced output, the single table if exactly
        one block did, otherwise a RowMergedNumericTable combining all of them.
    """
    aRes = []
    for i, block in enumerate(data):
        step3Output = step3res.getOutput(kmeans.init.outputOfStep3ForStep4, i)
        # NOTE(review): a block may receive no step-3 output; skip it rather
        # than hand an empty input to step 4. Confirm the "empty" value DAAL
        # returns here is falsy.
        if not step3Output:
            continue
        step4 = kmeans.init.Distributed(step4Local, nClusters, fptype=algorithmFPType, method=method)
        step4.input.setInput(kmeans.init.data, block)
        step4.input.setLocal(kmeans.init.internalInput, localNodeData[i])
        step4.input.setStepInput(kmeans.init.inputOfStep4FromStep3, step3Output)
        aRes.append(step4.compute().get(kmeans.init.outputOfStep4))

    if not aRes:
        return None
    if len(aRes) == 1:
        return aRes[0]
    # Several blocks contributed: combine their tables row-wise into one.
    pMerged = RowMergedNumericTable()
    for r in aRes:
        pMerged.addNumericTable(r)
    return pMerged
def initCentroids_plusPlusCSR(data):
    """Compute initial centroids with the distributed plusPlusCSR method.

    Step 1 supplies the initial centroids table. Each of the following
    ``nClusters - 1`` rounds runs steps 2-3 and then step 4, and appends the
    table step 4 produced.

    :param data: list of per-block numeric tables.
    :return: RowMergedNumericTable with every table collected, in order.
    """
    method = kmeans.init.plusPlusCSR
    # Per-block internal state, filled by the first initStep23() call.
    localNodeData = []
    pCentroids = RowMergedNumericTable()

    pNewCentroids = initStep1(data, method)
    pCentroids.addNumericTable(pNewCentroids)

    # A single step-3 master object is reused for every round.
    step3 = kmeans.init.Distributed(step3Master, nClusters, fptype=algorithmFPType, method=method)
    for iCenter in range(1, nClusters):
        step3res = initStep23(data, localNodeData, pNewCentroids, step3, iCenter == 1, method=method)
        pNewCentroids = initStep4(data, localNodeData, step3res, method=method)
        pCentroids.addNumericTable(pNewCentroids)
    return pCentroids
def initCentroids_parallelPlusCSR(data):
    """Compute initial centroids with the distributed parallelPlusCSR method.

    1. Step 1 seeds the candidate set.
    2. Each of ``step5.parameter.nRounds`` rounds runs steps 2-3 and step 4 and
       adds the new candidates to step 5.
    3. A final step 2 on every block produces the extra output step 5 needs.
    4. The step-5 master produces the final centroids.

    :param data: list of per-block numeric tables.
    :return: the ``kmeans.init.centroids`` table from the step-5 result.
    """
    method = kmeans.init.parallelPlusCSR
    # Per-block internal state, filled by the first initStep23() call.
    localNodeData = []

    pNewCentroids = initStep1(data, method=method)

    step5 = kmeans.init.Distributed(step5Master, nClusters, fptype=algorithmFPType, method=method)
    step5.input.add(kmeans.init.inputCentroids, pNewCentroids)

    step3 = kmeans.init.Distributed(step3Master, nClusters, fptype=algorithmFPType, method=method)
    for iRound in range(step5.parameter.nRounds):
        step3res = initStep23(data, localNodeData, pNewCentroids, step3, iRound == 0, method=method)
        pNewCentroids = initStep4(data, localNodeData, step3res, method=method)
        step5.input.add(kmeans.init.inputCentroids, pNewCentroids)

    # One more step 2 per block, this time requesting the output for step 5.
    for i, block in enumerate(data):
        local = kmeans.init.Distributed(step2Local, nClusters, False,
                                        fptype=algorithmFPType, method=method)
        local.parameter.outputForStep5Required = True
        local.input.setInput(kmeans.init.data, block)
        local.input.setLocal(kmeans.init.internalInput, localNodeData[i])
        local.input.setStepInput(kmeans.init.inputOfStep2, pNewCentroids)
        step5.input.add(kmeans.init.inputOfStep5FromStep2,
                        local.compute().getOutput(kmeans.init.outputOfStep2ForStep5))

    step5.input.setStepInput(kmeans.init.inputOfStep5FromStep3,
                             step3res.getStepOutput(kmeans.init.outputOfStep3ForStep5))
    # Master steps run compute() and then finalizeCompute(), exactly as the
    # Lloyd master does in calculateCentroids().
    step5.compute()
    return step5.finalizeCompute().get(kmeans.init.centroids)
def initCentroids(data, method):
    """Compute initial centroids for *data* with the given kmeans.init method.

    :param data: list of per-block numeric tables.
    :param method: ``kmeans.init.parallelPlusCSR`` or ``kmeans.init.plusPlusCSR``.
    :return: table of initial centroids.
    :raises ValueError: if *method* is not one of the supported constants.
    """
    if method == kmeans.init.parallelPlusCSR:
        return initCentroids_parallelPlusCSR(data)
    if method == kmeans.init.plusPlusCSR:
        return initCentroids_plusPlusCSR(data)
    # Explicit raise instead of ``assert False``: asserts are stripped under
    # ``python -O``, which would make this silently return None.
    raise ValueError("Unknown method for initCentroids")


def calculateCentroids(initialCentroids, data):
    """Refine *initialCentroids* with distributed Lloyd K-means and print results.

    Runs nIterations rounds of step1Local on every block followed by the
    step2Master. Then runs a batch K-means per block with the final centroids
    to obtain assignments. Finally prints the first block's assignments, the
    centroids and the objective function value.

    :param initialCentroids: table of starting centroids.
    :param data: list of per-block numeric tables.
    """
    masterAlgorithm = kmeans.Distributed(step2Master, nClusters,
                                         fptype=algorithmFPType, method=kmeans.lloydCSR)

    centroids = initialCentroids
    objectiveFunction = None

    for _ in range(nIterations):
        for block in data:
            # ``method=`` (the old ``methods=`` was a typo): select the CSR
            # Lloyd kernel, matching the master algorithm above.
            localAlgorithm = kmeans.Distributed(step1Local, nClusters, False,
                                                fptype=algorithmFPType, method=kmeans.lloydCSR)
            localAlgorithm.input.set(kmeans.data, block)
            localAlgorithm.input.set(kmeans.inputCentroids, centroids)
            masterAlgorithm.input.add(kmeans.partialResults, localAlgorithm.compute())

        masterAlgorithm.compute()
        res = masterAlgorithm.finalizeCompute()
        centroids = res.get(kmeans.centroids)
        objectiveFunction = res.get(kmeans.objectiveFunction)

    assignments = []
    for block in data:
        # ``fptype=`` (the old ``fptyep=`` was a typo). NOTE(review): the 0 is
        # presumably the iteration count, so this run only assigns rows to the
        # final centroids - confirm.
        localAlgorithm = kmeans.Batch(nClusters, 0, fptype=algorithmFPType, method=kmeans.lloydCSR)
        localAlgorithm.input.set(kmeans.data, block)
        localAlgorithm.input.set(kmeans.inputCentroids, centroids)
        assignments.append(localAlgorithm.compute().get(kmeans.assignments))

    printNumericTable(assignments[0], "First 10 cluster assignments from 1st node:", 10)
    printNumericTable(centroids, "First 10 dimensions of centroids:", 20, 10)
    printNumericTable(objectiveFunction, "Objective function value:")
def runKMeans(data, method, methodName):
    """Initialise centroids with *method*, then refine them and print the results.

    :param data: list of per-block numeric tables.
    :param method: kmeans.init method constant forwarded to initCentroids().
    :param methodName: label shown in the header line.
    """
    header = "K-means init parameters: method = " + str(methodName)
    print(header)
    initialCentroids = initCentroids(data, method=method)
    calculateCentroids(initialCentroids, data)
if __name__ == "__main__":
    data = loadData(dataFileNames)
    # Same data, two initialisation strategies, run one after the other.
    for initMethod, initMethodName in ((kmeans.init.plusPlusCSR, "plusPlusCSR"),
                                       (kmeans.init.parallelPlusCSR, "parallelPlusCSR")):
        runKMeans(data, initMethod, initMethodName)