simple_compression2


The sequel to my first program, “simple_compression,” with even more experimenting. Coded July 2010.

import struct
from PIL import Image
import math

def writeUnsignedInt(stream,v):
    """Write v to stream as a 4-byte little-endian unsigned integer."""
    packed = struct.pack('<I', v)
    stream.write(packed)

def readUnsignedInt(stream):
    """Read 4 bytes from stream and return them as a little-endian unsigned integer."""
    raw = stream.read(4)
    return struct.unpack('<I', raw)[0]

def writeUnsignedShort(stream,v):
    """Write v to stream as a 2-byte little-endian unsigned integer."""
    packed = struct.pack('<H', v)
    stream.write(packed)

def readUnsignedShort(stream):
    """Read 2 bytes from stream and return them as a little-endian unsigned integer."""
    raw = stream.read(2)
    return struct.unpack('<H', raw)[0]

def writeDouble(stream,v):
    """Write v to stream as an 8-byte double.

    The 'd' format carries no byte-order prefix, so struct uses the
    machine's native byte order (unlike the '<' integer helpers).
    """
    packed = struct.pack('d', v)
    stream.write(packed)

def readDouble(stream):
    """Read 8 bytes from stream and return them as a double.

    The 'd' format carries no byte-order prefix, so struct uses the
    machine's native byte order.
    """
    raw = stream.read(8)
    return struct.unpack('d', raw)[0]

def writeUnsignedChar(stream,v):
    """Write v (an integer in 0..255) to stream as a single byte.

    Uses the 'B' (unsigned char) format, which takes the integer directly.
    The previous 'c' + chr(v) form only works on Python 2: on Python 3,
    chr() returns text and 'c' requires a one-byte bytes object. The byte
    written is the same, so existing files remain readable.

    Raises struct.error if v is outside 0..255.
    """
    stream.write(struct.pack('B', v))

def readUnsignedChar(stream):
    """Read one byte from stream and return its value as an integer (0..255)."""
    raw = stream.read(1)
    # 'B' decodes straight to an int, the same number ord() gives for the byte.
    return struct.unpack('B', raw)[0]

def testOut():
    """Scratch check: write the unsigned short 10 to "byte_test" and reopen it.

    The file handles are managed with ``with`` so they are closed even if
    the write raises. The read-back stays disabled, as in the original.
    """
    with open("byte_test","wb") as s:
        writeUnsignedShort(s,10)
    with open("byte_test","rb") as s:
        #print readUnsignedShort(s)
        pass

class PixelBlock:
    """One 16x16 tile, stored as a fitted line plus one index per pixel.

    m and b are the slope and intercept produced by processSector; inds
    holds 256 per-pixel indices in row-major order. A pixel's value is
    reconstructed as ``inds[i] * m + b``.

    Serialized layout (see read/write): two doubles (m, b) followed by
    256 unsigned bytes.
    """

    def __init__(self):
        self.m = 0.0    # slope of the fitted line
        self.b = 0.0    # intercept of the fitted line
        self.inds = []  # per-pixel index into the fitted line, row-major

    def read(self, s):
        """Load m, b and the 256 indices from the binary stream s."""
        self.m = readDouble(s)
        self.b = readDouble(s)
        # Rebuild the list instead of appending, so calling read() again on
        # the same object does not pile up more than 256 entries.
        self.inds = [readUnsignedChar(s) for _ in range(256)]

    def write(self, s):
        """Write m, b and every index to the binary stream s."""
        writeDouble(s, self.m)
        writeDouble(s, self.b)
        for ind in self.inds:
            writeUnsignedChar(s, ind)

    def toImage(self):
        """Return a 16x16 "RGB" image with every pixel rebuilt as a grey value."""
        image = Image.new("RGB",(16,16))
        pixels = image.load()
        for i in range(256): #always 16x16
            # Row-major layout: row = i // 16, column = i % 16. Floor
            # division is required; ceil() of a true division picks the
            # wrong row and reaches row 16 for the last pixel.
            y, x = divmod(i, 16)
            val = int(round(self.inds[i]*self.m+self.b)) #inds[i] amplitude or x value
            # The fitted line can overshoot the valid channel range.
            val = max(0, min(255, val))
            pixels[x, y] = (val,val,val)
        return image

    def fromImage(self,image):
        """Fit this block to image by passing its pixel data to processSector."""
        pixels = image.getdata()
        self.m, self.b, self.inds = processSector(pixels)


class CompressedFile:
    """A whole image stored as a grid of 16x16 PixelBlock tiles.

    size is [width, height] in pixels; blocks lists the tiles row by row
    (left to right, then top to bottom).

    Serialized layout (see readFile/writeFile): width and height as
    unsigned ints, followed by each block in order.
    """

    def __init__(self):
        self.size = [0,0]
        self.blocks = []

    def testSize(self):
        """Raise Exception unless both dimensions are multiples of 16."""
        if (self.size[0] % 16) or (self.size[1] % 16):
            raise Exception("Image size not evenly divisible by 16")

    def writeFile(self, s):
        """Write the size header and every block to the binary stream s."""
        self.testSize()

        writeUnsignedInt(s, self.size[0])
        writeUnsignedInt(s, self.size[1])

        for block in self.blocks:
            block.write(s)

    def readFile(self, s):
        """Load the size header and all blocks from the binary stream s."""
        self.size[0] = readUnsignedInt(s)
        self.size[1] = readUnsignedInt(s)

        self.testSize()

        # Floor division keeps the count an int; "/" yields a float on
        # Python 3 and range() rejects it.
        num_blocks = (self.size[0] // 16) * (self.size[1] // 16)
        # Start from an empty list so reusing this object does not mix
        # blocks from two files.
        self.blocks = []
        for _ in range(num_blocks):
            b = PixelBlock()
            b.read(s)
            self.blocks.append(b)

    def toImage(self):
        """Return an "RGB" image assembled from the stored blocks."""
        image = Image.new("RGB",tuple(self.size))

        num_blocks_x = self.size[0] // 16
        num_blocks_y = self.size[1] // 16
        ind = 0
        for i in range(num_blocks_y):
            for j in range(num_blocks_x):
                crop = self.blocks[ind].toImage()
                left = j * 16
                top = i * 16
                image.paste(crop,(left, top, left + 16, top + 16))
                ind += 1
        return image

    def fromImage(self, image):
        """Split image into 16x16 tiles and fit a PixelBlock to each one.

        Raises Exception (via testSize) if the image size is not a
        multiple of 16 in both dimensions.
        """
        self.size = list(image.size)

        self.testSize()

        num_blocks_x = self.size[0] // 16
        num_blocks_y = self.size[1] // 16
        # Replace any blocks left over from an earlier image.
        self.blocks = []
        for i in range(num_blocks_y):
            for j in range(num_blocks_x):
                crop = image.crop((j*16, i*16, j*16+16, i*16+16))
                b = PixelBlock()
                b.fromImage(crop)
                self.blocks.append(b)

def linearRegression(points): #points are stored in this form [(x,y), ...]
    """Return (m, b), the least-squares line y = m*x + b through points.

    points is a sequence of (x, y) pairs.

    Raises ValueError if points is empty. Raises ZeroDivisionError if
    every x is the same (including a single point), because the slope is
    then undefined.
    """
    count = len(points)
    if count == 0:
        raise ValueError("linearRegression requires at least one point")
    # Computed once, up front; as a float so the divisions below are true
    # divisions on Python 2 as well.
    n = float(count)

    sum_xy = 0
    sum_x = 0
    sum_y = 0
    sum_x2 = 0
    for point in points:
        x, y = point[0], point[1]
        sum_xy += x * y
        sum_x += x
        sum_y += y
        sum_x2 += x ** 2

    m = (n*sum_xy - sum_x * sum_y)/(n*sum_x2-sum_x**2) #slope
    b = (sum_y-m*sum_x)/n #intercept
    return m, b

def processSector(data): #data is [(R,G,B), ...]
    """Fit a line to the sorted red values of data and rank every pixel.

    Returns (m, b, ranks): m and b come from linearRegression over
    (rank, red value) pairs, and ranks[i] is the position of pixel i in
    the ascending sort of red values (ties are ordered by original index).
    """
    #only the red channel is used
    ranked = sorted((pixel[0], position) for position, pixel in enumerate(data))

    #x is the rank in the sorted order, y is the red value at that rank
    fit_points = [(rank, entry[0]) for rank, entry in enumerate(ranked)]
    slope, intercept = linearRegression(fit_points)

    #map each original pixel position back to its rank
    ranks = [0] * len(ranked)
    for rank, entry in enumerate(ranked):
        ranks[entry[1]] = rank
    return slope, intercept, ranks

#compress a whole image; it is split into 16x16 blocks by CompressedFile
def compressImage(input_path="test_image_desert_desat.jpg", output_path="output.compressed"):
    """Compress the image at input_path and write the result to output_path.

    The defaults reproduce the original hard-coded file names. The image
    is converted to "RGB" first because processSector indexes each pixel
    as a tuple, which a single-band (grayscale) image would not provide.

    Raises Exception (from CompressedFile.testSize) if the image size is
    not a multiple of 16 in both dimensions.
    """
    image = Image.open(input_path).convert("RGB")
    f = CompressedFile()
    f.fromImage(image)
    # "with" closes the output file even if writing fails part-way.
    with open(output_path, "wb") as s:
        f.writeFile(s)
    
####now read the compressed file back and save the reconstructed image
def decompressImage(input_path="output.compressed", output_path="output.bmp"):
    """Read the compressed file at input_path and save the image to output_path.

    The defaults reproduce the original hard-coded file names.
    """
    f = CompressedFile()
    # "with" closes the input file even if parsing fails part-way.
    with open(input_path, "rb") as s:
        f.readFile(s)

    image = f.toImage()
    image.save(output_path)

#the image must be evenly divisible by 16 in both dimensions
#(CompressedFile.testSize raises otherwise); both steps run at import time
compressImage()
decompressImage()


#print linearRegression([(0,0),(2,2)])

Posted in: Code by nsundin
Copyright © 2011-2025 Programmatic Verse · RSS Feed
Built on Skeleton
Powered by WordPress