simple_compression2
The sequel to my first program “simple_compression.” Even more experimenting. Coded July 2010.
import struct
from PIL import Image
import math
def writeUnsignedInt(stream,v):
stream.write(struct.pack('<I', v))
def readUnsignedInt(stream):
s = stream.read(4)
p, = struct.unpack('<I', s)
return p
def writeUnsignedShort(stream,v):
stream.write(struct.pack('<H', v))
def readUnsignedShort(stream):
s = stream.read(2)
p, = struct.unpack('<H', s)
return p
def writeDouble(stream,v):
stream.write(struct.pack('d', v))
def readDouble(stream):
s = stream.read(8)
p, = struct.unpack('d', s)
return p
def writeUnsignedChar(stream,v):
stream.write(struct.pack('c', chr(v)))
def readUnsignedChar(stream):
s = stream.read(1)
p, = struct.unpack('c', s)
return ord(p)
def testOut():
s = open("byte_test","wb")
writeUnsignedShort(s,10)
s.close()
s = open("byte_test","rb")
#print readUnsignedShort(s)
s.close()
class PixelBlock:
def __init__(self):
self.m = 0.0
self.b = 0.0
self.inds = []
def read(self, s):
self.m = readDouble(s)
self.b = readDouble(s)
for ind in range(256):
self.inds.append(readUnsignedChar(s))
def write(self, s):
#print self.inds
writeDouble(s, self.m)
writeDouble(s, self.b)
for ind in self.inds:
writeUnsignedChar(s, ind)
def toImage(self):
image = Image.new("RGB",(16,16))
pixels = image.load()
for i in range(256): #always 16x16
x = i % 16
y = math.ceil(i/16)
val = int(round(self.inds[i]*self.m+self.b)) #inds[i] amplitude or x value
pixels[x, y] = (val,val,val)
return image
def fromImage(self,image):
pixels = image.getdata()
#print len(pixels),"PIXELS",image.size
self.m, self.b, self.inds = processSector(pixels)
class CompressedFile:
def __init__(self):
self.size = [0,0]
self.blocks = []
def testSize(self):
if (self.size[0] % 16) or (self.size[1] %16):
raise Exception("Image size not evenly divisible by 16")
def writeFile(self, s):
self.testSize()
writeUnsignedInt(s, self.size[0])
writeUnsignedInt(s, self.size[1])
for block in self.blocks:
block.write(s)
def readFile(self, s):
self.size[0] = readUnsignedInt(s)
self.size[1] = readUnsignedInt(s)
self.testSize()
num_blocks = (self.size[0]/16)*(self.size[1]/16)
for i in range(num_blocks):
b = PixelBlock()
b.read(s)
self.blocks.append(b)
def toImage(self):
image = Image.new("RGB",tuple(self.size))
num_blocks_x = (self.size[0]/16)
num_blocks_y = (self.size[1]/16)
ind = 0
for i in range(num_blocks_y):
for j in range(num_blocks_x):
b = self.blocks[ind]
crop = b.toImage()
image.paste(crop,(j*16,i*16,(j*16)+16,(i*16)+16))
ind+=1
#print ind, "of", i*j
return image
def fromImage(self, image):
self.size = list(image.size)
self.testSize()
num_blocks_x = (self.size[0]/16)
num_blocks_y = (self.size[1]/16)
ind = 0
for i in range(num_blocks_y):
for j in range(num_blocks_x):
crop = image.crop((j*16,i*16,j*16+16,i*16+16))
#crop.save("crop%i.jpg"%ind)
b = PixelBlock()
b.fromImage(crop)
self.blocks.append(b)
ind+=1
#print ind, "of", num_blocks_x*num_blocks_y
def linearRegression(points): #points are stored in this form [(x,y), ...]
sum_xy = 0
sum_x = 0
sum_y = 0
sum_x2 = 0
for point in points:
sum_xy+=point[0]*point[1]
sum_x+=point[0]
sum_y+=point[1]
sum_x2+=point[0]**2
n = float(len(points))
m = (n*sum_xy - sum_x * sum_y)/(n*sum_x2-sum_x**2) #slope
b = (sum_y-m*sum_x)/n #intercept
return m, b
def processSector(data): #data is [(R,G,B), ...]
#let's just compress red to start with
buffer = [] #[(value,old_index), ...]
for i in range(len(data)):
buffer.append((data[i][0],i))
buffer.sort() #sort from lowest to highest by first element
#print buffer
linreg_buffer = []
for i in range(len(buffer)):
linreg_buffer.append((i,buffer[i][0]))
m,b = linearRegression(linreg_buffer)
#print "slope", m, "intercept", b
return_vals = [] #in original order with form [amplitude_index, ...]
#create array to insert stuff into
for i in range(len(buffer)):
return_vals.append(0)
for amplitude in range(len(buffer)):
return_vals[buffer[amplitude][1]] = amplitude #buffer[amplitude][1] is the original ind
return m,b,return_vals
#just deal with one sector (255x255) for now
def compressImage():
image = Image.open("test_image_desert_desat.jpg")
f = CompressedFile()
f.fromImage(image)
s = open("output.compressed", "wb")
f.writeFile(s)
s.close()
####now write it back and display the image
def decompressImage():
s = open("output.compressed", "rb")
f = CompressedFile()
f.readFile(s)
s.close()
image = f.toImage()
image.save("output.bmp")
#image must be evenly divisible by 16 in both dimentions
compressImage()
decompressImage()
#print linearRegression([(0,0),(2,2)])