Python* API Reference for Intel® Data Analytics Acceleration Library 2018 Update 1

compression_online.py

1 # file: compression_online.py
2 #===============================================================================
3 # Copyright 2014-2017 Intel Corporation
4 # All Rights Reserved.
5 #
6 # If this software was obtained under the Intel Simplified Software License,
7 # the following terms apply:
8 #
9 # The source code, information and material ("Material") contained herein is
10 # owned by Intel Corporation or its suppliers or licensors, and title to such
11 # Material remains with Intel Corporation or its suppliers or licensors. The
12 # Material contains proprietary information of Intel or its suppliers and
13 # licensors. The Material is protected by worldwide copyright laws and treaty
14 # provisions. No part of the Material may be used, copied, reproduced,
15 # modified, published, uploaded, posted, transmitted, distributed or disclosed
16 # in any way without Intel's prior express written permission. No license under
17 # any patent, copyright or other intellectual property rights in the Material
18 # is granted to or conferred upon you, either expressly, by implication,
19 # inducement, estoppel or otherwise. Any license under such intellectual
20 # property rights must be express and approved by Intel in writing.
21 #
22 # Unless otherwise agreed by Intel in writing, you may not remove or alter this
23 # notice or any other notice embedded in Materials by Intel or Intel's
24 # suppliers or licensors in any way.
25 #
26 #
27 # If this software was obtained under the Apache License, Version 2.0 (the
28 # "License"), the following terms apply:
29 #
30 # You may not use this file except in compliance with the License. You may
31 # obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
32 #
33 #
34 # Unless required by applicable law or agreed to in writing, software
35 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
36 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
37 #
38 # See the License for the specific language governing permissions and
39 # limitations under the License.
40 #===============================================================================
41 
42 #
43 # ! Content:
44 # ! Python example of compression in the online processing mode
45 # !
46 # !*****************************************************************************
47 
48 #
49 ## <a name="DAAL-EXAMPLE-PY-COMPRESSION_ONLINE"></a>
50 ## \example compression_online.py
51 #
52 
import os
import sys

# Select the queue module by interpreter major version; either way it is
# bound locally to the name "Queue". NOTE: the original checked
# sys.version[0] == '2', which indexes the version *string* — fragile and
# non-idiomatic; sys.version_info[0] is the robust integer check.
if sys.version_info[0] == 2:
    import Queue
else:
    import queue as Queue

import numpy as np

from daal.data_management import Compressor_Zlib, Decompressor_Zlib, level9, DecompressionStream, CompressionStream

# Make the examples' shared "utils" module (one directory up) importable.
utils_folder = os.path.realpath(os.path.abspath(os.path.dirname(os.path.dirname(__file__))))
if utils_folder not in sys.path:
    sys.path.insert(0, utils_folder)
from utils import getCRC32, readTextFile

datasetFileName = os.path.join('..', 'data', 'online', 'logitboost_train.csv')

# Queue used to simulate sending and receiving compressed data blocks
sendReceiveQueue = Queue.Queue()

maxDataBlockSize = 16384      # Maximum size of a data block sent for compression
userDefinedBlockSize = 7000   # Size of each read from the decompression stream
78 
def getDataBlock(sentDataStream, availableDataSize):
    """Return the next slice of sentDataStream and the bytes remaining after it.

    At most maxDataBlockSize bytes are returned per call. When no data is
    left (availableDataSize <= 0), returns (None, None).
    """
    if availableDataSize <= 0:
        return (None, None)
    start = sentDataStream.size - availableDataSize
    chunk = min(availableDataSize, maxDataBlockSize)
    return (sentDataStream[start:start + chunk], availableDataSize - chunk)
88 
89 
def sendDataBlock(block):
    """Simulate sending: enqueue a private copy of the compressed block."""
    # Copy so the queued data is independent of the caller's buffer.
    sendReceiveQueue.put(np.copy(block))
94 
95 
def receiveDataBlock():
    """Simulate receiving: dequeue and copy the next compressed block.

    Returns None once the queue has been drained.
    """
    try:
        block = sendReceiveQueue.get_nowait()
    except Queue.Empty:
        # End of the queue — all compressed blocks have been consumed.
        return None
    return np.copy(block)
102 
103 
def printCRC32(sentDataStream, receivedDataStream):
    """Compute CRC32 checksums of the sent and received streams and report
    whether the round-tripped data matches the original."""
    sentCRC = getCRC32(sentDataStream)
    receivedCRC = getCRC32(receivedDataStream)

    print("\nCompression example program results:\n")
    print("Input data checksum: 0x{:02X}".format(sentCRC))
    print("Received data checksum: 0x{:02X}".format(receivedCRC))

    # Size is checked first: a CRC match on truncated data would be meaningless.
    if sentDataStream.size != receivedDataStream.size:
        print("ERROR: Received data size mismatches with the sent data size")
    elif sentCRC != receivedCRC:
        print("ERROR: Received data CRC mismatches with the sent data CRC")
    else:
        print("OK: Received data CRC matches with the sent data CRC")
121 
122 
if __name__ == "__main__":
    # Read data from a file and allocate memory.
    # NOTE(review): readTextFile presumably returns a numpy uint8 array
    # (the code slices it and reads .size) — confirm against utils.
    sentDataStream = readTextFile(datasetFileName)

    # Create a zlib compressor: emit a gzip header, maximum compression level.
    compressor = Compressor_Zlib()
    compressor.parameter.gzHeader = True
    compressor.parameter.level = level9

    # Create a stream for compression
    compressionStream = CompressionStream(compressor)

    # Producer side: feed sentDataStream to the compression stream block by
    # block and "send" (enqueue) every compressed block that becomes available.
    (uncompressedDataBlock, availableDataSize) = getDataBlock(sentDataStream, sentDataStream.size)
    while uncompressedDataBlock is not None:
        # Put a data block to compressionStream and compress if needed
        compressionStream.push_back(uncompressedDataBlock)

        # Get access to compressed blocks stored in compressionStream without an actual compressed data copy
        compressedBlocks = compressionStream.getCompressedBlocksCollection()

        # Send every compressed block currently stored in compressionStream
        for i in range(compressedBlocks.size()):
            # sendDataBlock copies the block, so the stream's buffer is not retained
            sendDataBlock(compressedBlocks[i].getArray())

        # Receive the next data block for compression
        (uncompressedDataBlock, availableDataSize) = getDataBlock(sentDataStream, availableDataSize)

    # Create a decompressor with a gzip-header setting matching the compressor
    decompressor = Decompressor_Zlib()
    decompressor.parameter.gzHeader = True

    # Create a stream for decompression
    decompressionStream = DecompressionStream(decompressor)

    # Actual number of bytes read from decompressionStream on the last call
    readSize = 0

    # receivedDataStream accumulates the reconstructed data; tmp_block is a
    # reusable fixed-size read buffer of userDefinedBlockSize bytes.
    receivedDataStream = np.empty(0, dtype=np.uint8)
    tmp_block = np.empty(userDefinedBlockSize, dtype=np.uint8)

    # Consumer side: receive compressed data by blocks from the queue
    compressedDataBlock = receiveDataBlock()

    while compressedDataBlock is not None:
        # Write a received block to decompressionStream
        decompressionStream.push_back(compressedDataBlock)

        # Drain everything currently decompressible from the stream
        while True:
            # Read up to userDefinedBlockSize bytes from decompressionStream
            readSize = decompressionStream.copyDecompressedArray(tmp_block)
            if readSize == 0:
                break
            # Append only the bytes actually produced this iteration
            receivedDataStream = np.concatenate((receivedDataStream, tmp_block[:readSize]))

        # Receive the next compressed block
        compressedDataBlock = receiveDataBlock()

    # Compute and print checksums for sentDataStream and receivedDataStream
    printCRC32(sentDataStream, receivedDataStream)

For more complete information about compiler optimizations, see our Optimization Notice.