Python* API Reference for Intel® Data Analytics Acceleration Library 2019 Update 5

compressor.py

Deprecation Notice: With the introduction of daal4py, a package that supersedes PyDAAL, Intel is deprecating PyDAAL and will discontinue support starting with Intel® DAAL 2021 and Intel® Distribution for Python 2021. Until then, Intel will continue to provide compatible PyDAAL pip and conda packages for newer releases of Intel DAAL and make them available in open source. However, Intel will not add the new features of Intel DAAL to PyDAAL. Intel recommends that developers switch to and use daal4py.

Note: To find daal4py examples, refer to the daal4py documentation or browse the GitHub repository.

1 # file: compressor.py
2 #===============================================================================
3 # Copyright 2014-2019 Intel Corporation.
4 #
5 # This software and the related documents are Intel copyrighted materials, and
6 # your use of them is governed by the express license under which they were
7 # provided to you (License). Unless the License provides otherwise, you may not
8 # use, modify, copy, publish, distribute, disclose or transmit this software or
9 # the related documents without Intel's prior written permission.
10 #
11 # This software and the related documents are provided as is, with no express
12 # or implied warranties, other than those that are expressly stated in the
13 # License.
14 #===============================================================================
15 
16 #
17 # ! Content:
18 # ! Python example of a compressor
19 # !
20 # !*****************************************************************************
21 
22 #
23 ## <a name="DAAL-EXAMPLE-PY-COMPRESSOR"></a>
24 ## \example compressor.py
25 #
26 
27 import os
28 import sys
29 
# The standard-library queue module is named `Queue` on Python 2 and `queue`
# on Python 3; bind whichever exists to the single name `Queue`.
# The major version is read from sys.version_info (a structured tuple)
# instead of slicing the human-readable sys.version string.
if sys.version_info[0] == 2:
    import Queue as Queue
else:
    import queue as Queue
34 
35 import numpy as np
36 
37 from daal.data_management import Compressor_Zlib, Decompressor_Zlib
38 
39 utils_folder = os.path.realpath(os.path.abspath(os.path.dirname(os.path.dirname(__file__))))
40 if utils_folder not in sys.path:
41  sys.path.insert(0, utils_folder)
42 from utils import getCRC32, readTextFile
43 
# Relative path of the CSV file used as the data to compress
datasetFileName = os.path.join(os.pardir, 'data', 'batch', 'logitboost_train.csv')

# FIFO channel through which compressed blocks travel from the
# compression side to the decompression side
sendReceiveQueue = Queue.Queue()

# Upper bound on the size of one data block (16 * 1024 == 16384)
maxDataBlockSize = 16 * 1024
50 
def getUncompressedDataBlock(sentDataStream, availableDataSize):
    """Return the next slice of the input stream and the amount left after it.

    Args:
        sentDataStream: array-like exposing ``.size`` and slicing; the full
            input data.
        availableDataSize: number of trailing elements of ``sentDataStream``
            that have not been handed out yet.

    Returns:
        A tuple ``(block, remaining)``. ``block`` is a slice of at most
        ``maxDataBlockSize`` elements starting right after the data already
        handed out, and ``remaining`` is how much data is left afterwards.
        When no data is available the result is ``(None, None)``.
    """
    # Start of the part of the stream that has not been consumed yet
    start = sentDataStream.size - availableDataSize

    # Nothing left to hand out (maxDataBlockSize is a positive constant,
    # so a non-positive size can never fill a block)
    if availableDataSize <= 0:
        return (None, None)

    # Final, partial block: take everything that remains
    if availableDataSize < maxDataBlockSize:
        return (sentDataStream[start:start + availableDataSize], 0)

    # Full-sized block
    return (sentDataStream[start:start + maxDataBlockSize],
            availableDataSize - maxDataBlockSize)
60 
61 
def sendCompressedDataBlock(block):
    """Put a copy of ``block`` onto ``sendReceiveQueue``.

    The block is copied before queuing, so the queued data is independent
    of the array passed in (the main script passes a view into a buffer
    that it overwrites on the next compression step).
    """
    sendReceiveQueue.put(np.copy(block))
66 
67 
def receiveCompressedDataBlock():
    """Take the next compressed block from ``sendReceiveQueue``.

    Returns:
        A copy of the block at the head of the queue, or ``None`` when the
        queue is empty.
    """
    if not sendReceiveQueue.empty():
        # Hand back a copy of the queued block
        return np.copy(sendReceiveQueue.get())
    # Queue drained: signal the end of the data
    return None
74 
75 
def printCRC32(sentDataStream, receivedDataStream):
    """Print the checksums of both streams and whether they agree.

    Args:
        sentDataStream: the original input data; must expose ``.size``.
        receivedDataStream: the data after the compress/decompress round
            trip; must expose ``.size``.

    The checksums are obtained from ``getCRC32``. A size mismatch is
    reported in preference to a checksum mismatch.
    """
    sentCrc = getCRC32(sentDataStream)
    receivedCrc = getCRC32(receivedDataStream)

    print("\nCompression example program results:\n")

    print("Input data checksum: 0x{:02X}".format(sentCrc))
    print("Received data checksum: 0x{:02X}".format(receivedCrc))

    # Choose the verdict first, then print it once
    if sentDataStream.size != receivedDataStream.size:
        verdict = "ERROR: Received data size mismatches with the sent data size"
    elif sentCrc != receivedCrc:
        verdict = "ERROR: Received data CRC mismatches with the sent data CRC"
    else:
        verdict = "OK: Received data CRC matches with the sent data CRC"
    print(verdict)
93 
94 
if __name__ == "__main__":
    # Round trip: compress the dataset block by block, pass the compressed
    # blocks through the queue, decompress them and compare checksums.

    # Load the input data
    sentDataStream = readTextFile(datasetFileName)

    # compressedDataBlock is a scratch buffer reused for every compression
    # step; receivedDataStream collects the decompressed output
    compressedDataBlock = np.empty(maxDataBlockSize, dtype=np.uint8)
    receivedDataStream = np.empty(sentDataStream.size, dtype=np.uint8)

    # ---- Compression side ----
    compressor = Compressor_Zlib()

    # Fetch the first input block; initially the whole stream is available
    uncompressedDataBlock, availableDataSize = getUncompressedDataBlock(
        sentDataStream, sentDataStream.size)
    while uncompressedDataBlock is not None:
        # Hand the current input block to the compressor
        compressor.setInputDataBlock(uncompressedDataBlock, 0)

        # One output buffer may be too small for the whole input block, so
        # keep running (at least once) while the compressor reports that
        # the output buffer was filled
        outputBlockFull = True
        while outputBlockFull:
            # Compress into the scratch buffer
            compressor.run(compressedDataBlock, 0)

            # Only the used part of the buffer holds compressed data
            usedSize = compressor.getUsedOutputDataBlockSize()

            # Queue that part (the send helper copies it)
            sendCompressedDataBlock(compressedDataBlock[:usedSize])

            # A full output buffer means another pass is needed
            outputBlockFull = compressor.isOutputDataBlockFull()

        # Fetch the next input block
        uncompressedDataBlock, availableDataSize = getUncompressedDataBlock(
            sentDataStream, availableDataSize)

    # ---- Decompression side ----
    decompressor = Decompressor_Zlib()

    # Write position inside receivedDataStream
    offset = 0

    # Drain the queue one compressed block at a time
    compressedDataBlock = receiveCompressedDataBlock()
    while compressedDataBlock is not None:
        # Hand the compressed block to the decompressor
        decompressor.setInputDataBlock(compressedDataBlock, 0)

        # Decompress into receivedDataStream at the current write position
        decompressor.run(receivedDataStream, offset)

        # Advance by the amount of data just produced
        offset += decompressor.getUsedOutputDataBlockSize()

        # Fetch the next compressed block
        compressedDataBlock = receiveCompressedDataBlock()

    # Report checksums of the original and the round-tripped data
    printCRC32(sentDataStream, receivedDataStream)

For more complete information about compiler optimizations, see our Optimization Notice.