Python* API Reference for Intel® Data Analytics Acceleration Library 2019

compression_online.py

1 # file: compression_online.py
2 #===============================================================================
3 # Copyright 2014-2018 Intel Corporation.
4 #
5 # This software and the related documents are Intel copyrighted materials, and
6 # your use of them is governed by the express license under which they were
7 # provided to you (License). Unless the License provides otherwise, you may not
8 # use, modify, copy, publish, distribute, disclose or transmit this software or
9 # the related documents without Intel's prior written permission.
10 #
11 # This software and the related documents are provided as is, with no express
12 # or implied warranties, other than those that are expressly stated in the
13 # License.
14 #===============================================================================
15 
16 #
17 # ! Content:
18 # ! Python example of compression in the online processing mode
19 # !
20 # !*****************************************************************************
21 
22 #
23 ## <a name="DAAL-EXAMPLE-PY-COMPRESSION_ONLINE"></a>
24 ## \example compression_online.py
25 #
26 
27 import os
28 import sys
29 
30 if sys.version[0] == '2':
31  import Queue as Queue
32 else:
33  import queue as Queue
34 
35 import numpy as np
36 
37 from daal.data_management import Compressor_Zlib, Decompressor_Zlib, level9, DecompressionStream, CompressionStream
38 
# Make the shared example helpers (utils.py) importable: they live in the
# parent of this example's own directory.
# __file__ is made absolute *before* stripping path components. Where
# __file__ is relative (Python 2, or Python < 3.9 when the script is started
# as a bare file name from its own directory), dirname(dirname("x.py")) is ""
# and would wrongly resolve to the current working directory instead of the
# parent directory.
utils_folder = os.path.realpath(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if utils_folder not in sys.path:
    sys.path.insert(0, utils_folder)
42 from utils import getCRC32, readTextFile
43 
# Upper bound on the number of input elements (presumably bytes) handed to
# the compression stream in one push_back call.
maxDataBlockSize = 16384
# Capacity, in bytes, of the buffer filled by each read from the
# decompression stream.
userDefinedBlockSize = 7000

# Input data to compress. NOTE: this is a relative path, so it is resolved
# against the current working directory, not against this script's location.
datasetFileName = os.path.join('..', 'data', 'online', 'logitboost_train.csv')

# FIFO standing in for a transport channel: sendDataBlock() puts compressed
# blocks in, receiveDataBlock() takes them out.
sendReceiveQueue = Queue.Queue()
52 
def getDataBlock(sentDataStream, availableDataSize):
    """Return the next chunk of ``sentDataStream`` to feed to the compressor.

    ``availableDataSize`` counts the trailing elements of ``sentDataStream``
    that have not been handed out yet. The chunk starts right after the part
    already consumed and holds at most ``maxDataBlockSize`` elements.

    Returns a ``(chunk, remaining)`` pair, where ``remaining`` is the value to
    pass on the next call, or ``(None, None)`` once nothing is left.
    """
    if availableDataSize <= 0:
        return (None, None)

    start = sentDataStream.size - availableDataSize
    chunkSize = min(availableDataSize, maxDataBlockSize)
    return (sentDataStream[start:start + chunkSize], availableDataSize - chunkSize)
62 
63 
def sendDataBlock(block):
    """Simulate sending ``block`` by enqueuing a private copy on ``sendReceiveQueue``.

    A copy is queued so that later changes to the caller's array do not
    affect what the receiver gets (presumably ``block`` views memory owned by
    the compression stream — confirm against the DAAL docs).
    """
    sendReceiveQueue.put(np.copy(block))
68 
69 
def receiveDataBlock():
    """Take the next compressed block off ``sendReceiveQueue``.

    Returns a copy of the oldest queued array, or ``None`` when the queue is
    empty; the caller treats ``None`` as the end of the transmission.
    """
    try:
        pendingBlock = sendReceiveQueue.get_nowait()
    except Queue.Empty:
        # Nothing left to receive
        return None
    return np.copy(pendingBlock)
76 
77 
def printCRC32(sentDataStream, receivedDataStream):
    """Print checksums of the original and the round-tripped data, then a verdict.

    Both arrays are checksummed with ``getCRC32`` from the shared ``utils``
    module, sent data first. A size mismatch is reported in preference to a
    checksum mismatch.
    """
    sentCrc, receivedCrc = getCRC32(sentDataStream), getCRC32(receivedDataStream)

    print("\nCompression example program results:\n")
    print("Input data checksum: 0x{:02X}".format(sentCrc))
    print("Received data checksum: 0x{:02X}".format(receivedCrc))

    if sentDataStream.size != receivedDataStream.size:
        verdict = "ERROR: Received data size mismatches with the sent data size"
    elif sentCrc != receivedCrc:
        verdict = "ERROR: Received data CRC mismatches with the sent data CRC"
    else:
        verdict = "OK: Received data CRC matches with the sent data CRC"
    print(verdict)
95 
96 
if __name__ == "__main__":
    # Read data from a file and allocate memory
    sentDataStream = readTextFile(datasetFileName)

    # ---- Sender side: compress the input block by block and "send" it ----

    # Create a compressor
    compressor = Compressor_Zlib()
    compressor.parameter.gzHeader = True
    compressor.parameter.level = level9

    # Create a stream for compression
    compressionStream = CompressionStream(compressor)

    # Receive data by blocks from sentDataStream for further compression and send it
    (uncompressedDataBlock, availableDataSize) = getDataBlock(sentDataStream, sentDataStream.size)
    while uncompressedDataBlock is not None:
        # Put a data block to compressionStream and compress if needed
        compressionStream.push_back(uncompressedDataBlock)

        # Get access to compressed blocks stored in compressionStream without an actual compressed data copy
        compressedBlocks = compressionStream.getCompressedBlocksCollection()

        # Send compressed blocks stored in compressionStream (sendDataBlock
        # queues its own copy of each block)
        for i in range(compressedBlocks.size()):
            sendDataBlock(compressedBlocks[i].getArray())

        # Receive the next data block for compression
        (uncompressedDataBlock, availableDataSize) = getDataBlock(sentDataStream, availableDataSize)

    # ---- Receiver side: take compressed blocks off the queue and decompress ----

    # Create a decompressor; gzHeader mirrors the compressor's setting above
    decompressor = Decompressor_Zlib()
    decompressor.parameter.gzHeader = True

    # Create a stream for decompression
    decompressionStream = DecompressionStream(decompressor)

    # Reusable buffer filled by each read from decompressionStream
    tmp_block = np.empty(userDefinedBlockSize, dtype=np.uint8)

    # Decompressed pieces in arrival order, joined once after the loop.
    # Concatenating onto a growing array after every read would re-copy all
    # data received so far each time (quadratic in the total size). The list
    # is seeded with an empty uint8 array so the final concatenate also works
    # (and yields an empty uint8 array) when nothing was decompressed.
    receivedChunks = [np.empty(0, dtype=np.uint8)]

    # Receive compressed data by blocks
    compressedDataBlock = receiveDataBlock()
    while compressedDataBlock is not None:
        # Write a received block to decompressionStream
        decompressionStream.push_back(compressedDataBlock)

        # Drain the decompressed data currently available from the stream;
        # each read fills at most userDefinedBlockSize bytes of tmp_block
        while True:
            readSize = decompressionStream.copyDecompressedArray(tmp_block)
            if readSize == 0:
                break
            # Copy the slice: tmp_block is overwritten by the next read
            receivedChunks.append(tmp_block[:readSize].copy())

        # Receive next block
        compressedDataBlock = receiveDataBlock()

    # Received uncompressed data stream
    receivedDataStream = np.concatenate(receivedChunks)

    # Compute and print checksums for sentDataStream and receivedDataStream
    printCRC32(sentDataStream, receivedDataStream)

For more complete information about compiler optimizations, see our Optimization Notice.