Python* API Reference for Intel® Data Analytics Acceleration Library 2019 Update 4

compressor.py

1 # file: compressor.py
2 #===============================================================================
3 # Copyright 2014-2019 Intel Corporation.
4 #
5 # This software and the related documents are Intel copyrighted materials, and
6 # your use of them is governed by the express license under which they were
7 # provided to you (License). Unless the License provides otherwise, you may not
8 # use, modify, copy, publish, distribute, disclose or transmit this software or
9 # the related documents without Intel's prior written permission.
10 #
11 # This software and the related documents are provided as is, with no express
12 # or implied warranties, other than those that are expressly stated in the
13 # License.
14 #===============================================================================
15 
16 #
17 # ! Content:
18 # ! Python example of a compressor
19 # !
20 # !*****************************************************************************
21 
22 #
23 
24 
25 #
26 
27 import os
28 import sys
29 
30 if sys.version[0] == '2':
31  import Queue as Queue
32 else:
33  import queue as Queue
34 
35 import numpy as np
36 
37 from daal.data_management import Compressor_Zlib, Decompressor_Zlib
38 
39 utils_folder = os.path.realpath(os.path.abspath(os.path.dirname(os.path.dirname(__file__))))
40 if utils_folder not in sys.path:
41  sys.path.insert(0, utils_folder)
42 from utils import getCRC32, readTextFile
43 
44 datasetFileName = os.path.join('..', 'data', 'batch', 'logitboost_train.csv') # Input file; the path is relative, so it depends on the current working directory
45 
46 # FIFO queue standing in for a transport channel: sendCompressedDataBlock() puts compressed blocks on it and receiveCompressedDataBlock() takes them off
47 sendReceiveQueue = Queue.Queue()
48 
49 maxDataBlockSize = 16384 # Maximum number of elements in one uncompressed data block; also the length of the uint8 buffer that receives compressed output
50 
def getUncompressedDataBlock(sentDataStream, availableDataSize, blockSize=None):
    """Return the next slice of ``sentDataStream`` that has not been handed out yet.

    Args:
        sentDataStream: Array with a ``size`` attribute and slice support
            (a NumPy array in this example) holding the whole input.
        availableDataSize: Number of trailing elements of ``sentDataStream``
            that are still to be handed out.
        blockSize: Upper bound on the length of the returned slice. When
            omitted, the module-level ``maxDataBlockSize`` is used.

    Returns:
        A ``(block, remaining)`` tuple: ``block`` is a slice of at most
        ``blockSize`` elements and ``remaining`` is the amount of data left
        after it. ``(None, None)`` is returned once no data is available.

    Raises:
        ValueError: If the effective block size is not positive.
    """
    if blockSize is None:
        blockSize = maxDataBlockSize
    if blockSize <= 0:
        # A non-positive block size would never advance through the stream
        raise ValueError("blockSize must be positive")

    # Nothing left to hand out (this also covers an empty input stream)
    if availableDataSize <= 0:
        return (None, None)

    # The data still to be sent is always the tail of the stream
    cur_pos = sentDataStream.size - availableDataSize
    chunkSize = min(availableDataSize, blockSize)
    return (sentDataStream[cur_pos:cur_pos + chunkSize], availableDataSize - chunkSize)
60 
61 
def sendCompressedDataBlock(block):
    """Put a private copy of ``block`` on ``sendReceiveQueue``.

    The block is copied first, so later writes to the caller's buffer do not
    change the data that is already queued.
    """
    sendReceiveQueue.put(np.copy(block))
66 
67 
def receiveCompressedDataBlock():
    """Take the next compressed block off ``sendReceiveQueue``.

    Returns:
        A copy of the block at the head of the queue, or ``None`` when the
        queue is empty.
    """
    if not sendReceiveQueue.empty():
        # Hand back a copy of the queued block
        return np.copy(sendReceiveQueue.get())
    # An empty queue marks the end of the transmission
    return None
74 
75 
def printCRC32(sentDataStream, receivedDataStream):
    """Print the checksums of both streams and report whether they agree.

    Args:
        sentDataStream: The full input data; must expose a ``size`` attribute.
        receivedDataStream: The full received data; must expose a ``size``
            attribute.

    A size mismatch is reported in preference to a checksum mismatch.
    """
    # Checksums over the complete sent and received data
    sentCrc = getCRC32(sentDataStream)
    receivedCrc = getCRC32(receivedDataStream)

    print("\nCompression example program results:\n")

    print("Input data checksum: 0x{:02X}".format(sentCrc))
    print("Received data checksum: 0x{:02X}".format(receivedCrc))

    # Pick the verdict first, then print it once
    if sentDataStream.size != receivedDataStream.size:
        verdict = "ERROR: Received data size mismatches with the sent data size"
    elif sentCrc != receivedCrc:
        verdict = "ERROR: Received data CRC mismatches with the sent data CRC"
    else:
        verdict = "OK: Received data CRC matches with the sent data CRC"
    print(verdict)
93 
94 
if __name__ == "__main__":

    # Load the whole input file as the stream to transmit
    sentDataStream = readTextFile(datasetFileName)

    # Scratch buffer for one compressed block, plus the destination buffer
    # that will hold the decompressed data
    compressedDataBlock = np.empty(maxDataBlockSize, dtype=np.uint8)
    receivedDataStream = np.empty(sentDataStream.size, dtype=np.uint8)

    # ---- Compression side ----
    compressor = Compressor_Zlib()

    # Fetch the first block of input data
    uncompressedDataBlock, availableDataSize = getUncompressedDataBlock(sentDataStream, sentDataStream.size)
    while uncompressedDataBlock is not None:
        # Hand the current input block to the compressor
        compressor.setInputDataBlock(uncompressedDataBlock, 0)

        # The output buffer may be too small to take the compressed form of
        # the input block in one go, so keep running until it is not filled up
        outputBlockFull = True
        while outputBlockFull:
            # Compress into compressedDataBlock, starting at offset 0
            compressor.run(compressedDataBlock, 0)

            # Only the part of the buffer that was actually written is sent
            compressedDataView = compressedDataBlock[:compressor.getUsedOutputDataBlockSize()]
            sendCompressedDataBlock(compressedDataView)

            # A full output buffer means another run is needed for this input block
            outputBlockFull = compressor.isOutputDataBlockFull()

        # Fetch the next block of input data
        uncompressedDataBlock, availableDataSize = getUncompressedDataBlock(sentDataStream, availableDataSize)

    # ---- Decompression side ----
    decompressor = Decompressor_Zlib()

    # Write position inside receivedDataStream
    offset = 0

    # Take compressed blocks off the queue until none are left
    compressedDataBlock = receiveCompressedDataBlock()
    while compressedDataBlock is not None:
        # Hand the compressed block to the decompressor
        decompressor.setInputDataBlock(compressedDataBlock, 0)

        # Decompress it into receivedDataStream at the current write position
        decompressor.run(receivedDataStream, offset)

        # Advance the write position by the amount of data just produced
        offset += decompressor.getUsedOutputDataBlockSize()

        compressedDataBlock = receiveCompressedDataBlock()

    # Compare the sent and received streams via their checksums
    printCRC32(sentDataStream, receivedDataStream)

For more complete information about compiler optimizations, see our Optimization Notice.