Python* API Reference for Intel® Data Analytics Acceleration Library 2019 Update 5

compression_online.py

Deprecation Notice: With the introduction of daal4py, a package that supersedes PyDAAL, Intel is deprecating PyDAAL and will discontinue support starting with Intel® DAAL 2021 and Intel® Distribution for Python 2021. Until then, Intel will continue to provide compatible PyDAAL pip and conda packages for newer releases of Intel DAAL and make them available in open source. However, Intel will not add new Intel DAAL features to PyDAAL. Intel recommends that developers switch to daal4py.

Note: To find daal4py examples, refer to the daal4py documentation or browse the daal4py GitHub repository.

1 # file: compression_online.py
2 #===============================================================================
3 # Copyright 2014-2019 Intel Corporation.
4 #
5 # This software and the related documents are Intel copyrighted materials, and
6 # your use of them is governed by the express license under which they were
7 # provided to you (License). Unless the License provides otherwise, you may not
8 # use, modify, copy, publish, distribute, disclose or transmit this software or
9 # the related documents without Intel's prior written permission.
10 #
11 # This software and the related documents are provided as is, with no express
12 # or implied warranties, other than those that are expressly stated in the
13 # License.
14 #===============================================================================
15 
16 #
17 # ! Content:
18 # ! Python example of compression in the online processing mode
19 # !
20 # !*****************************************************************************
21 
22 #
23 ## <a name="DAAL-EXAMPLE-PY-COMPRESSION_ONLINE"></a>
24 ## \example compression_online.py
25 #
26 
27 import os
28 import sys
29 
30 if sys.version[0] == '2':
31  import Queue as Queue
32 else:
33  import queue as Queue
34 
35 import numpy as np
36 
37 from daal.data_management import Compressor_Zlib, Decompressor_Zlib, level9, DecompressionStream, CompressionStream
38 
39 utils_folder = os.path.realpath(os.path.abspath(os.path.dirname(os.path.dirname(__file__))))
40 if utils_folder not in sys.path:
41  sys.path.insert(0, utils_folder)
42 from utils import getCRC32, readTextFile
43 
# Upper bound on the size of one uncompressed block handed to the
# compression stream (counted in elements of the input data array)
maxDataBlockSize = 16 * 1024
# Capacity of the scratch buffer used for each read from the decompression stream
userDefinedBlockSize = 7000

# Input dataset; note the path is resolved relative to the current working directory
datasetFileName = os.path.join('..', 'data', 'online', 'logitboost_train.csv')

# FIFO standing in for a transport channel: sendDataBlock() puts compressed
# blocks into it and receiveDataBlock() takes them back out in the same order
sendReceiveQueue = Queue.Queue()
52 
def getDataBlock(sentDataStream, availableDataSize, blockSize=None):
    """Return the next block of ``sentDataStream`` and the size still left after it.

    The stream is consumed from front to back. ``availableDataSize`` counts the
    elements not yet handed out, so the next block starts at
    ``sentDataStream.size - availableDataSize``.

    :param sentDataStream: 1-D array-like exposing ``.size`` and slicing
        (a NumPy array in this example).
    :param availableDataSize: number of trailing elements not yet consumed.
    :param blockSize: maximum block length; defaults to ``maxDataBlockSize``.
    :return: ``(block, remainingSize)``, or ``(None, None)`` once nothing is left.
    :raises ValueError: if ``blockSize`` is not positive (a zero-size block
        would make the caller's read loop never terminate).
    """
    if blockSize is None:
        blockSize = maxDataBlockSize
    if blockSize <= 0:
        raise ValueError("blockSize must be positive")

    # Nothing left to hand out: signal end of stream to the caller.
    if availableDataSize <= 0:
        return (None, None)

    curPos = sentDataStream.size - availableDataSize
    # A full block if enough data remains, otherwise the final partial block.
    takeSize = min(availableDataSize, blockSize)
    return (sentDataStream[curPos:curPos + takeSize], availableDataSize - takeSize)
62 
63 
def sendDataBlock(block):
    """Simulate sending ``block``: enqueue an independent copy of it.

    ``np.copy`` gives the queue its own array, so later changes to the
    caller's buffer cannot affect what is eventually received.

    :param block: array-like holding one compressed data block.
    """
    sendReceiveQueue.put(np.copy(block))
68 
69 
def receiveDataBlock():
    """Simulate receiving: pop the oldest queued block and return a copy of it.

    :return: a copy of the next compressed block, or ``None`` when the queue
        is empty (which the caller treats as the end of the transmission).
    """
    return None if sendReceiveQueue.empty() else np.copy(sendReceiveQueue.get())
76 
77 
def printCRC32(sentDataStream, receivedDataStream):
    """Print CRC32 checksums of the sent and received data and a verdict.

    The verdict is a size mismatch first, then a CRC mismatch, and finally OK.

    :param sentDataStream: the original (uncompressed) input data.
    :param receivedDataStream: the data reassembled after decompression.
    """
    # getCRC32 comes from the examples' utils module; presumably it returns an
    # integer CRC32 value (it is formatted with {:X} below) - confirm in utils.
    sentCrc = getCRC32(sentDataStream)
    receivedCrc = getCRC32(receivedDataStream)

    print("\nCompression example program results:\n")
    print("Input data checksum: 0x{:02X}".format(sentCrc))
    print("Received data checksum: 0x{:02X}".format(receivedCrc))

    sameSize = sentDataStream.size == receivedDataStream.size
    if not sameSize:
        print("ERROR: Received data size mismatches with the sent data size")
    elif sentCrc == receivedCrc:
        print("OK: Received data CRC matches with the sent data CRC")
    else:
        print("ERROR: Received data CRC mismatches with the sent data CRC")
95 
96 
if __name__ == "__main__":
    # Read the whole input file into memory. readTextFile comes from the
    # examples' utils module; the code below relies on its result exposing
    # .size and slicing (presumably a 1-D uint8 NumPy array - confirm in utils).
    sentDataStream = readTextFile(datasetFileName)

    # Create a zlib compressor that writes a gzip header, at compression level 9
    compressor = Compressor_Zlib()
    compressor.parameter.gzHeader = True
    compressor.parameter.level = level9

    # Create a stream for compression
    compressionStream = CompressionStream(compressor)

    # Feed sentDataStream to the compression stream block by block and send
    # every compressed block it has produced so far
    (uncompressedDataBlock, availableDataSize) = getDataBlock(sentDataStream, sentDataStream.size)
    while uncompressedDataBlock is not None:
        # Put a data block to compressionStream and compress if needed
        compressionStream.push_back(uncompressedDataBlock)

        # Get access to compressed blocks stored in compressionStream without an actual compressed data copy
        compressedBlocks = compressionStream.getCompressedBlocksCollection()

        # The DAAL collection is indexed via size()/[] rather than iterated
        for i in range(compressedBlocks.size()):
            sendDataBlock(compressedBlocks[i].getArray())

        # Take the next uncompressed block
        (uncompressedDataBlock, availableDataSize) = getDataBlock(sentDataStream, availableDataSize)

    # Create a decompressor that expects the gzip header written above
    decompressor = Decompressor_Zlib()
    decompressor.parameter.gzHeader = True

    # Create a stream for decompression
    decompressionStream = DecompressionStream(decompressor)

    # Decompressed pieces are collected here and joined once at the end;
    # concatenating onto a growing array on every read would re-copy all the
    # data received so far each time (quadratic in the total size).
    decompressedChunks = []
    # Scratch buffer reused for every read from decompressionStream
    tmp_block = np.empty(userDefinedBlockSize, dtype=np.uint8)

    # Receive compressed data by blocks until the channel is empty
    compressedDataBlock = receiveDataBlock()
    while compressedDataBlock is not None:
        # Write a received block to decompressionStream
        decompressionStream.push_back(compressedDataBlock)

        # Drain whatever decompressed data is currently available
        while True:
            # Read up to userDefinedBlockSize bytes into tmp_block; 0 means nothing more right now
            readSize = decompressionStream.copyDecompressedArray(tmp_block)
            if readSize == 0:
                break
            # tmp_block is overwritten by the next read, so keep a copy of its valid prefix
            decompressedChunks.append(tmp_block[:readSize].copy())

        # Receive next block
        compressedDataBlock = receiveDataBlock()

    # Assemble the received uncompressed data stream in a single copy
    if decompressedChunks:
        receivedDataStream = np.concatenate(decompressedChunks)
    else:
        receivedDataStream = np.empty(0, dtype=np.uint8)

    # Compute and print checksums for sentDataStream and receivedDataStream
    printCRC32(sentDataStream, receivedDataStream)

For more complete information about compiler optimizations, see our Optimization Notice.