30 if sys.version[0] ==
'2':
37 from daal.data_management
import Compressor_Zlib, Decompressor_Zlib, level9, DecompressionStream, CompressionStream
# Put the parent of this script's directory at the front of the module search
# path (only once) so that the ``utils`` module imported below can be found.
utils_folder = os.path.realpath(
    os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
)
if utils_folder not in sys.path:
    sys.path.insert(0, utils_folder)
42 from utils
import getCRC32, readTextFile
# Input data set, addressed relative to the current working directory.
datasetFileName = os.path.join('..', 'data', 'online', 'logitboost_train.csv')

# In-process queue that carries compressed blocks from sendDataBlock()
# to receiveDataBlock().
sendReceiveQueue = Queue.Queue()

# Largest slice of the input handed out by a single getDataBlock() call.
maxDataBlockSize = 16384
# Size of the scratch buffer that decompressed data is read into.
userDefinedBlockSize = 7000
def getDataBlock(sentDataStream, availableDataSize, blockSize=None):
    """Return the next slice of ``sentDataStream`` and the size still unread.

    The stream is consumed front to back; ``availableDataSize`` is the number
    of trailing elements that have not been handed out yet.

    Args:
        sentDataStream: Array-like exposing ``.size`` and slicing (the caller
            passes a NumPy array).
        availableDataSize: Number of elements at the end of the stream that
            are still to be returned.
        blockSize: Maximum number of elements per slice. Defaults to the
            module-level ``maxDataBlockSize``.

    Returns:
        A ``(block, remaining)`` tuple. ``block`` holds at most ``blockSize``
        elements and ``remaining`` is the value to pass as
        ``availableDataSize`` on the next call. When nothing is left the
        result is ``(None, None)`` so that callers can always unpack it.

    Raises:
        ValueError: If ``blockSize`` is not positive.
    """
    if blockSize is None:
        blockSize = maxDataBlockSize
    if blockSize <= 0:
        raise ValueError("blockSize must be positive")

    # Always hand back a 2-tuple: callers unpack the result and stop on a
    # ``None`` block, so a bare ``None`` here would raise TypeError.
    if availableDataSize <= 0:
        return (None, None)

    cur_pos = sentDataStream.size - availableDataSize
    chunkSize = min(availableDataSize, blockSize)
    return (sentDataStream[cur_pos:cur_pos + chunkSize], availableDataSize - chunkSize)
def sendDataBlock(block):
    """Enqueue a copy of ``block`` on the module-level ``sendReceiveQueue``.

    A copy is queued rather than ``block`` itself, so the queued data is
    independent of the array passed in.
    """
    sendReceiveQueue.put(np.copy(block))
def receiveDataBlock():
    """Return a copy of the next queued block, or ``None`` if none is queued.

    Returns:
        A NumPy copy of the oldest item in ``sendReceiveQueue``, or ``None``
        when the queue is empty. The receive loop uses ``None`` as its
        end-of-data sentinel.
    """
    if sendReceiveQueue.empty():
        # Stop at the end of the queue: return the sentinel instead of
        # calling get(), which would block on an empty queue.
        return None

    return np.copy(sendReceiveQueue.get())
def printCRC32(sentDataStream, receivedDataStream):
    """Print the checksums of both streams followed by one verdict line.

    Args:
        sentDataStream: Data as it was before being sent; must expose
            ``.size`` and be accepted by ``getCRC32``.
        receivedDataStream: Data as reassembled on the receiving side; same
            requirements as ``sentDataStream``.
    """
    crcSentDataStream = getCRC32(sentDataStream)
    crcReceivedDataStream = getCRC32(receivedDataStream)

    print("\nCompression example program results:\n")
    print("Input data checksum: 0x{:02X}".format(crcSentDataStream))
    print("Received data checksum: 0x{:02X}".format(crcReceivedDataStream))

    # Exactly one verdict is printed. Sizes are compared first, and the
    # success message is reserved for the case where sizes and checksums
    # both agree.
    if sentDataStream.size != receivedDataStream.size:
        print("ERROR: Received data size mismatches with the sent data size")
    elif crcSentDataStream != crcReceivedDataStream:
        print("ERROR: Received data CRC mismatches with the sent data CRC")
    else:
        print("OK: Received data CRC matches with the sent data CRC")
if __name__ == "__main__":
    # Round trip: read the data set, compress it block by block, pass the
    # compressed blocks through the queue, decompress them, and compare the
    # result with the input.

    # Read the input data as one array.
    sentDataStream = readTextFile(datasetFileName)

    # Configure the zlib compressor: gzip header enabled, compression level 9.
    compressor = Compressor_Zlib()
    compressor.parameter.gzHeader = True
    compressor.parameter.level = level9

    compressionStream = CompressionStream(compressor)

    # Feed the input to the compression stream one block at a time and send
    # whatever compressed blocks the stream reports after each push.
    (uncompressedDataBlock, availableDataSize) = getDataBlock(sentDataStream, sentDataStream.size)
    while uncompressedDataBlock is not None:
        compressionStream.push_back(uncompressedDataBlock)

        compressedBlocks = compressionStream.getCompressedBlocksCollection()

        # Index-based access: the collection exposes size() and indexing.
        for i in range(compressedBlocks.size()):
            sendDataBlock(compressedBlocks[i].getArray())

        (uncompressedDataBlock, availableDataSize) = getDataBlock(sentDataStream, availableDataSize)

    # Configure the matching zlib decompressor (gzip header enabled).
    decompressor = Decompressor_Zlib()
    decompressor.parameter.gzHeader = True

    decompressionStream = DecompressionStream(decompressor)

    # Decompressed pieces are collected here and joined once at the end.
    # The empty uint8 seed keeps the result a uint8 array even if nothing
    # is received.
    receivedChunks = [np.empty(0, dtype=np.uint8)]
    # Scratch buffer that the decompression stream copies into.
    tmp_block = np.empty(userDefinedBlockSize, dtype=np.uint8)

    compressedDataBlock = receiveDataBlock()

    while compressedDataBlock is not None:
        decompressionStream.push_back(compressedDataBlock)

        # tmp_block (userDefinedBlockSize) is smaller than an uncompressed
        # block (up to maxDataBlockSize), so one read cannot empty the
        # stream. Keep reading until nothing more is copied.
        # NOTE(review): relies on copyDecompressedArray returning 0 once the
        # stream is drained - confirm against the DAAL API reference.
        while True:
            readSize = decompressionStream.copyDecompressedArray(tmp_block)
            if readSize == 0:
                break
            # Copy the slice: tmp_block is overwritten by the next read.
            receivedChunks.append(tmp_block[:readSize].copy())

        compressedDataBlock = receiveDataBlock()

    receivedDataStream = np.concatenate(receivedChunks)

    # Compare the received data with the input and print the result.
    printCRC32(sentDataStream, receivedDataStream)