57 from daal
import step1Local, step2Master
58 from daal.algorithms.neural_networks
import initializers
59 from daal.algorithms.neural_networks
import layers
60 from daal.algorithms
import optimization_solver
61 from daal.algorithms.neural_networks
import prediction, training
62 from daal.data_management
import NumericTable, HomogenNumericTable, readOnly, SubtensorDescriptor, HomogenTensor
# Put the parent of this script's directory at the front of sys.path so the
# shared ``utils`` helper module can be imported from there no matter what the
# current working directory is.
utils_folder = os.path.realpath(
    os.path.abspath(os.path.dirname(os.path.dirname(__file__))))
if utils_folder not in sys.path:
    sys.path.insert(0, utils_folder)
from utils import printTensors, readTensorFromCSV
# Training feature files; initializeNetwork() indexes this list by node number.
trainDatasetFileNames = [
    os.path.join("..", "data", "distributed", "neural_network_train_dense_1.csv"),
    os.path.join("..", "data", "distributed", "neural_network_train_dense_2.csv"),
    os.path.join("..", "data", "distributed", "neural_network_train_dense_3.csv"),
    os.path.join("..", "data", "distributed", "neural_network_train_dense_4.csv"),
]
# Training ground-truth files; initializeNetwork() indexes this list by node
# number, in step with trainDatasetFileNames.
trainGroundTruthFileNames = [
    os.path.join("..", "data", "distributed", "neural_network_train_ground_truth_1.csv"),
    os.path.join("..", "data", "distributed", "neural_network_train_ground_truth_2.csv"),
    os.path.join("..", "data", "distributed", "neural_network_train_ground_truth_3.csv"),
    os.path.join("..", "data", "distributed", "neural_network_train_ground_truth_4.csv"),
]
# Files read by testModel() (features) and printResults() (expected labels).
testDatasetFile = os.path.join("..", "data", "batch", "neural_network_test.csv")
testGroundTruthFile = os.path.join(
    "..", "data", "batch", "neural_network_test_ground_truth.csv")
# Batch size used by each node: the overall batch size divided by the number of
# nodes, truncated to an integer.
# NOTE(review): ``batchSize`` and ``nNodes`` are not defined in the visible part
# of this file - confirm they are assigned before this line.
batchSizeLocal = int(batchSize / nNodes)
def configureNet():
    """Build a new network topology: three fully-connected layers plus a loss layer.

    initializeNetwork() calls this once for the master network and once for
    every local node, so each call constructs and returns its own topology
    object instead of sharing a module-level one.

    Returns:
        The ``training.Topology`` with the layers chained as
        fully-connected 1 -> fully-connected 2 -> fully-connected 3 ->
        softmax cross-entropy loss.
    """
    # First fully-connected layer with uniformly initialized weights and biases.
    fullyConnectedLayer1 = layers.fullyconnected.Batch(20)
    fullyConnectedLayer1.parameter.weightsInitializer = initializers.uniform.Batch(-0.001, 0.001)
    fullyConnectedLayer1.parameter.biasesInitializer = initializers.uniform.Batch(0, 0.5)

    # Second fully-connected layer.
    # NOTE(review): ``m2`` is not defined in the visible part of this file -
    # confirm it is assigned at module level before this function is called.
    fullyConnectedLayer2 = layers.fullyconnected.Batch(m2)
    fullyConnectedLayer2.parameter.weightsInitializer = initializers.uniform.Batch(0.5, 1)
    fullyConnectedLayer2.parameter.biasesInitializer = initializers.uniform.Batch(0.5, 1)

    # Third fully-connected layer.
    fullyConnectedLayer3 = layers.fullyconnected.Batch(2)
    fullyConnectedLayer3.parameter.weightsInitializer = initializers.uniform.Batch(-0.005, 0.005)
    fullyConnectedLayer3.parameter.biasesInitializer = initializers.uniform.Batch(0, 1)

    # Loss layer placed at the end of the chain.
    softmaxCrossEntropyLayer = layers.loss.softmax_cross.Batch()

    topology = training.Topology()

    # add() hands back the identifier later used with get() to wire the layers.
    fc1 = topology.add(fullyConnectedLayer1)
    fc2 = topology.add(fullyConnectedLayer2)
    fc3 = topology.add(fullyConnectedLayer3)
    sm = topology.add(softmaxCrossEntropyLayer)
    topology.get(fc1).addNext(fc2)
    topology.get(fc2).addNext(fc3)
    topology.get(fc3).addNext(sm)

    return topology
def getNextSubtensor(inputTensor, startPos, nElements):
    """Copy a slice of ``inputTensor`` into a new float32 tensor.

    Args:
        inputTensor: Tensor providing ``getSubtensor``/``releaseSubtensor``.
        startPos: Start position of the slice, forwarded to ``getSubtensor``
            (presumably an index along the first dimension, since no fixed
            dimensions are passed - confirm against the tensor API).
        nElements: Number of elements requested from ``startPos`` onwards.

    Returns:
        A ``HomogenTensor`` of float32 built from a NumPy copy of the slice.
    """
    subtensorBlock = SubtensorDescriptor(ntype=np.float32)
    inputTensor.getSubtensor([], startPos, nElements, readOnly, subtensorBlock)
    try:
        # np.array() copies, so the data stays valid after the block is released.
        subtensorData = np.array(subtensorBlock.getArray(), dtype=np.float32)
    finally:
        # Always hand the block back, even if the copy above fails.
        inputTensor.releaseSubtensor(subtensorBlock)

    return HomogenTensor(subtensorData, ntype=np.float32)
def initializeNetwork():
    """Load the per-node training data and set up the master and local networks.

    Returns:
        tuple: ``(net, netLocal, trainingData, trainingGroundTruth)`` where
        ``net`` is the master-step training algorithm, ``netLocal`` is the list
        of local-step training algorithms (one per node), and the last two
        items are the per-node lists of tensors read from the training files.
    """
    # Read every node's feature and ground-truth tensors.
    trainingData = []
    trainingGroundTruth = []
    for node in range(nNodes):
        trainingData.append(readTensorFromCSV(trainDatasetFileNames[node]))
        trainingGroundTruth.append(
            readTensorFromCSV(trainGroundTruthFileNames[node], True))

    # Dimensions of one local batch: the data dimensions with the first entry
    # replaced by the per-node batch size.
    sampleSize = trainingData[0].getDimensions()
    sampleSize[0] = batchSizeLocal

    # Solver handed to the master algorithm at construction time.
    sgdAlgorithm = optimization_solver.sgd.Batch(fptype=np.float32)
    sgdAlgorithm.parameter.batchSize = batchSizeLocal

    # Master-step algorithm, initialized with its own topology.
    topologyMaster = configureNet()
    net = training.Distributed(step2Master, sgdAlgorithm)
    net.parameter.batchSize = batchSizeLocal
    net.initialize(sampleSize, topologyMaster)

    # Local-step algorithms: each node gets its own topology and training model.
    # The topologies are collected in a list so they all stay referenced until
    # this function returns.
    nodeTopologies = []
    netLocal = []
    for node in range(nNodes):
        nodeTopology = configureNet()
        nodeTopologies.append(nodeTopology)

        trainingModel = training.Model()
        trainingModel.initialize_Float32(sampleSize, nodeTopology)

        localNet = training.Distributed(step1Local)
        localNet.input.setStep1LocalInput(training.inputModel, trainingModel)
        localNet.parameter.batchSize = batchSizeLocal
        netLocal.append(localNet)

    return (net, netLocal, trainingData, trainingGroundTruth)
def trainModel(net, netLocal, trainingData, trainingGroundTruth):
    """Train the network batch by batch across all nodes.

    For every batch position, each local algorithm computes a partial result
    on its own slice of data, the master algorithm combines those partial
    results, and the resulting weights and biases are pushed back into every
    local model before the next batch.

    Args:
        net: Master-step training algorithm.
        netLocal: Local-step training algorithms, indexed by node.
        trainingData: Per-node training tensors, indexed by node.
        trainingGroundTruth: Per-node ground-truth tensors, indexed by node.

    Returns:
        The prediction model obtained from the finalized training result.
    """
    # Solver with an explicit learning rate, installed on the master algorithm.
    # NOTE(review): ``learningRate`` is not defined in the visible part of this
    # file - confirm it is assigned before this function runs.
    sgdAlgorithm = optimization_solver.sgd.Batch(fptype=np.float32)
    sgdAlgorithm.parameter.learningRateSequence = HomogenNumericTable(
        1, 1, NumericTable.doAllocate, learningRate)
    net.parameter.optimizationSolver = sgdAlgorithm

    # Only full batches are processed; a trailing partial batch is skipped.
    nSamples = trainingData[0].getDimensions()[0]
    for i in range(0, nSamples - batchSizeLocal + 1, batchSizeLocal):
        # Local step: every node processes its slice starting at position i.
        for node in range(nNodes):
            localNet = netLocal[node]
            localNet.input.setInput(
                training.data,
                getNextSubtensor(trainingData[node], i, batchSizeLocal))
            localNet.input.setInput(
                training.groundTruth,
                getNextSubtensor(trainingGroundTruth[node], i, batchSizeLocal))

            pres = localNet.compute()

            # Queue this node's partial result for the master step.
            net.input.add(training.partialResults, node, pres)

        # Master step: process the queued partial results so the partial result
        # read below reflects this batch.
        net.compute()
        wb = net.getPartialResult().get(training.resultFromMaster).get(training.model).getWeightsAndBiases()

        # Push the master's weights and biases into every local model.
        for node in range(nNodes):
            netLocal[node].input.getStep1LocalInput(training.inputModel).setWeightsAndBiases(wb)

    res = net.finalizeCompute()

    return res.get(training.model).getPredictionModel_Float32()
def testModel(predictionModel):
    """Run the trained model on the test data set.

    Args:
        predictionModel: Prediction model returned by trainModel().

    Returns:
        The result of the prediction algorithm's ``compute()`` call; callers
        read the predictions from it with ``getResult(prediction.prediction)``.
    """
    predictionData = readTensorFromCSV(testDatasetFile)

    net = prediction.Batch()

    # Process the whole test set in a single batch.
    net.parameter.batchSize = predictionData.getDimensionSize(0)

    net.input.setModelInput(prediction.model, predictionModel)
    net.input.setTensorInput(prediction.data, predictionData)

    # Without this return the caller receives None and printResults() fails.
    return net.compute()
def printResults(testGroundTruthFile, predictionResult):
    """Print the expected labels next to the network's predictions.

    Args:
        testGroundTruthFile: Path of the CSV file holding the expected labels.
        predictionResult: Prediction result object; the predictions are read
            from it with ``getResult(prediction.prediction)``.
    """
    predictionGroundTruth = readTensorFromCSV(testGroundTruthFile)

    printTensors(
        predictionGroundTruth,
        predictionResult.getResult(prediction.prediction),
        "Ground truth",
        "Neural network predictions: each class probability",
        "Neural network classification results (first 20 observations):",
        20)
def main():
    """Run the example end to end: initialize, train, predict, print."""
    init = initializeNetwork()
    predictionModel = trainModel(*init)
    predictionResult = testModel(predictionModel)
    printResults(testGroundTruthFile, predictionResult)


# Only run the pipeline when executed as a script, not when imported.
if __name__ == "__main__":
    main()