56 if sys.version[0] ==
'2':
63 from daal.data_management
import Compressor_Zlib, Decompressor_Zlib, level9, DecompressionStream, CompressionStream
65 utils_folder = os.path.realpath(os.path.abspath(os.path.dirname(os.path.dirname(__file__))))
66 if utils_folder
not in sys.path:
67 sys.path.insert(0, utils_folder)
68 from utils
import getCRC32, readTextFile
# Relative location of the CSV file used as the payload to compress.
datasetFileName = os.path.join('..', 'data', 'online', 'logitboost_train.csv')

# FIFO that carries compressed blocks from sendDataBlock() to
# receiveDataBlock().
sendReceiveQueue = Queue.Queue()

# Upper bound on the size of one uncompressed slice handed out by
# getDataBlock().
maxDataBlockSize = 16 * 1024

# Size of the scratch buffer that decompressed data is copied into.
userDefinedBlockSize = 7000
def getDataBlock(sentDataStream, availableDataSize, blockSize=None):
    """Return the next slice of the stream that has not been handed out yet.

    Args:
        sentDataStream: One-dimensional array exposing ``.size`` and slicing
            (a NumPy array in this example).
        availableDataSize: Number of trailing elements of ``sentDataStream``
            that are still waiting to be returned.
        blockSize: Largest slice to return. When omitted, the module-level
            ``maxDataBlockSize`` is used.

    Returns:
        A ``(block, remaining)`` tuple where ``block`` is the next slice and
        ``remaining`` is the number of elements left after it, or
        ``(None, None)`` once nothing is left.

    Raises:
        ValueError: If ``blockSize`` is not positive.
    """
    if blockSize is None:
        blockSize = maxDataBlockSize
    if blockSize <= 0:
        raise ValueError("blockSize must be positive")

    # Always hand back a 2-tuple: the caller unpacks the result into two
    # names, so a bare None at the end of the stream would raise TypeError.
    if availableDataSize <= 0:
        return (None, None)

    cur_pos = sentDataStream.size - availableDataSize
    blockLength = min(availableDataSize, blockSize)
    return (sentDataStream[cur_pos:cur_pos + blockLength], availableDataSize - blockLength)
def sendDataBlock(block):
    """Enqueue a private copy of ``block`` on ``sendReceiveQueue``."""
    # The queue stores a copy rather than the array that was passed in.
    sendReceiveQueue.put(np.copy(block))
def receiveDataBlock():
    """Take the next block off ``sendReceiveQueue``.

    Returns:
        A copy of the oldest queued block, or ``None`` when the queue is
        empty (the caller treats ``None`` as the end of the transmission).
    """
    # Stop at the end of the queue: a blocking get() on an empty queue would
    # never return.
    if sendReceiveQueue.empty():
        return None

    return np.copy(sendReceiveQueue.get())
def printCRC32(sentDataStream, receivedDataStream):
    """Print the checksums of both streams and a single verdict line.

    Args:
        sentDataStream: Array that was sent; must expose ``.size``.
        receivedDataStream: Array reassembled on the receiving side; must
            expose ``.size``.

    Exactly one of three messages is printed after the checksums: a size
    mismatch error, a CRC mismatch error, or the OK confirmation.
    """
    crcSentDataStream = getCRC32(sentDataStream)
    crcReceivedDataStream = getCRC32(receivedDataStream)

    print("\nCompression example program results:\n")

    print("Input data checksum: 0x{:02X}".format(crcSentDataStream))
    print("Received data checksum: 0x{:02X}".format(crcReceivedDataStream))

    if sentDataStream.size != receivedDataStream.size:
        print("ERROR: Received data size mismatches with the sent data size")
    elif crcSentDataStream != crcReceivedDataStream:
        print("ERROR: Received data CRC mismatches with the sent data CRC")
    else:
        # Only report success when neither check above failed.
        print("OK: Received data CRC matches with the sent data CRC")
if __name__ == "__main__":
    # Load the dataset that acts as the data to transmit.
    sentDataStream = readTextFile(datasetFileName)

    # Zlib compressor writing a gzip header, using the level9 setting.
    compressor = Compressor_Zlib()
    compressor.parameter.gzHeader = True
    compressor.parameter.level = level9

    compressionStream = CompressionStream(compressor)

    # Sender side: feed the input slice by slice into the compression stream
    # and push every compressed block it currently exposes onto the queue.
    (uncompressedDataBlock, availableDataSize) = getDataBlock(sentDataStream, sentDataStream.size)
    while uncompressedDataBlock is not None:
        compressionStream.push_back(uncompressedDataBlock)

        compressedBlocks = compressionStream.getCompressedBlocksCollection()

        for i in range(compressedBlocks.size()):
            sendDataBlock(compressedBlocks[i].getArray())

        (uncompressedDataBlock, availableDataSize) = getDataBlock(sentDataStream, availableDataSize)

    # Receiver side: matching Zlib decompressor expecting a gzip header.
    decompressor = Decompressor_Zlib()
    decompressor.parameter.gzHeader = True

    decompressionStream = DecompressionStream(decompressor)

    # Decompressed pieces are gathered here and joined once at the end; the
    # leading empty array keeps the result a uint8 array even if nothing
    # was received.
    receivedChunks = [np.empty(0, dtype=np.uint8)]
    tmp_block = np.empty(userDefinedBlockSize, dtype=np.uint8)

    compressedDataBlock = receiveDataBlock()

    while compressedDataBlock is not None:
        decompressionStream.push_back(compressedDataBlock)

        # Drain the stream: one call copies at most len(tmp_block) bytes, and
        # a compressed block may expand to more than that, so keep reading
        # until copyDecompressedArray reports that nothing was copied.
        while True:
            readSize = decompressionStream.copyDecompressedArray(tmp_block)
            if readSize == 0:
                break
            # tmp_block is reused on the next call, so keep a copy.
            receivedChunks.append(tmp_block[:readSize].copy())

        compressedDataBlock = receiveDataBlock()

    receivedDataStream = np.concatenate(receivedChunks)

    printCRC32(sentDataStream, receivedDataStream)