利用哈夫曼树进行数据压缩

发布于 2018-03-25  4,483 次阅读


数据通信中,需要对信息进行二进制编码,最简单方式就是等长编码,如“A”为“000”, “B”为“001”。然而人们希望信息的总长度尽量短,且实际应用中,部分字符出现频次高,另一些则比较低,因此编码时让高频字符使用短码,低频字符使用长码,可以实现数据的无损压缩。

哈夫曼树,即最优二叉树,是带权路径最短的树之一。设二叉树具有n 个带权值的叶结点,那么从根结点到各个叶结点的路径长度与相应结点权值的乘积之和叫做二叉树的带权路径长度,记为:

\[ ∑_{k=1}^{n}W_k*L_k \]

其中​​\( W_k \)​为第k个叶结点的权值,​\( L_k \)​为第k个叶结点的路径长度。而哈夫曼树就是二叉树中WPL最小的那一个。

设权值为各字符出现的频次,则频次越高,字符越靠近树的根部,路径越短。而各字符对应的编码由其路径决定,从根部出发,向左编码“0”,向右则为“1”

具体实现过程:

压缩

第零步,准备工作

创建哈夫曼节点类

class HuffmanNode:
    """
    A node in a Huffman tree.
    Symbols occur only at leaves.
    Each node has a number attribute that can be used for node-numbering.

    Attributes:
    ===========
    @param int symbol: symbol located in this node, if any
    @param HuffmanNode left: left subtree
    @param HuffmanNode right: right subtree
    @param int number: node number
    """

    def __init__(self, symbol=None, left=None, right=None):
        """
        Create a new HuffmanNode with the given parameters.

        @param HuffmanNode self: this HuffmanNode
        @param int|Node symbol: symbol to be stored in this node, if any
        @param HuffmanNode|Node left: a tree rooted at 'left', if any
        @param HuffmanNode|Node right: a tree rooted at 'right', if any
        @rtype: NoneType
        """
        self.symbol = symbol
        self.left, self.right = left, right
        self.number = None

    def __eq__(self, other):
        """
        Return True iff self is equivalent to other.

        @param HuffmanNode self: this HuffmanNode tree
        @param HuffmanNode|Any other: a tree rooted at the HuffmanNode 'other'
        @rtype: bool

        >>> a = HuffmanNode(4)
        >>> b = HuffmanNode(4)
        >>> a == b
        True
        >>> b = HuffmanNode(5)
        >>> a == b
        False
        """
        return (type(self) == type(other) and self.symbol == other.symbol and
                self.left == other.left and self.right == other.right)

    def __repr__(self):
        """
        Return constructor-style string representation.

        @param HuffmanNode self: this HuffmanNode tree
        @rtype: str
        """
        return 'HuffmanNode({}, {}, {})'.format(self.symbol,
                                                self.left, self.right)

    def is_leaf(self):
        """
        Return True iff self is a leaf.

        @param HuffmanNode self: this HuffmanNode tree
        @rtype: bool

        >>> t = HuffmanNode(None)
        >>> t.is_leaf()
        True
        """
        return not self.left and not self.right

创建实际节点类

class ReadNode:
    """
    A node as read from a compressed file.
    Each node consists of type and data information as described in the handout.
    This class offers a clean way to collect this information for each node.

    Attributes:
    ===========
    @param int l_type: 0/1 (if the corresponding HuffmanNode's left is a leaf)
    @param int l_data: a symbol or the node number of a HuffmanNode's left
    @param int r_type: 0/1 (if the corresponding HuffmanNode's right is a leaf)
    @param int r_data: a symbol or the node number of a HuffmanNode's right
    """

    def __init__(self, l_type, l_data, r_type, r_data):
        """
        Create a new ReadNode with the given parameters.

        @param ReadNode self: this ReadNode
        @param int l_type: used to initialize self.l_type
        @param int l_data: used to initialize self.l_data
        @param int r_type: used to initialize self.r_type
        @param int r_data: used to initialize self.r_data
        @rtype: NoneType
        """
        self.l_type, self.l_data = l_type, l_data
        self.r_type, self.r_data = r_type, r_data

    def __repr__(self):
        """
        Return constructor-style string representation.

        @param ReadNode self: this ReadNode
        @rtype: str
        """
        return 'ReadNode({}, {}, {}, {})'.format(
            self.l_type, self.l_data, self.r_type, self.r_data)

第一步,创建频次字典

  1. 从以二进制的方式打开的文件创建列表
  2. 已排序后的该列表的深度拷贝创建集合
  3. 以及合理的元素位键,元素在列表种出现的频次为值,创建频次字典
def make_freq_dict(text: bytes) -> dict:
    """
    Return a dictionary that maps each byte in text to its frequency.

    @param bytes text: a bytes object
    @rtype: dict(int,int)

    >>> d = make_freq_dict(bytes([65, 66, 67, 66]))
    >>> d == {65: 1, 66: 2, 67: 1}
    True
    """
    lst = list(text)
    elements = set(sorted(lst[:]))
    dic = dict()
    for element in elements:
        dic.setdefault(element, lst.count(element))
    return dic

第二步,创建哈夫曼树

  1. 以频次字典中的键创建哈夫曼节点,节点的标识为键,左右皆为None
  2. 将所有的叶节点(左右值为None)和对应的频次分别放入两个列表以创建循环结构
  3. 利用循环结构创建哈夫曼树:
    • 节点列表的长度等于1时:
      1. 返回枝节点,标识为None,左为Node,右为None
    • 节点列表的长度大于1时:
      1. 找到频次列表中最小的两个值,a和b,记录并移除节点列表中对应的节点Node(a)和Node(b)
      2. 创建枝节点,标识为None,左为Node(a),右为Node(b),其频次为a+b
      3. 将枝节点和其频次分别加到节点列表,频次列表的末尾
  4. (循环结束)返回节点列表的首项,该项即目标哈夫曼树
def huffman_tree(freq_dict: dict) -> 'HuffmanNode':
    """
    Return the root HuffmanNode of a Huffman tree corresponding
    to frequency dictionary freq_dict.

    @param dict(int,int) freq_dict: a frequency dictionary
    @rtype: HuffmanNode

    >>> freq = {2: 6, 3: 4}
    >>> t = huffman_tree(freq)
    >>> result1 = HuffmanNode(None, HuffmanNode(3), HuffmanNode(2))
    >>> result2 = HuffmanNode(None, HuffmanNode(2), HuffmanNode(3))
    >>> t == result1 or t == result2
    True
    """

    def minimum(l: list) -> tuple:
        a = min(l)
        in_a = l.index(a)
        if l.count(a) > 1:
            after = l[in_a + 1:]
            b = min(after)
            in_b = after.index(b) + in_a + 1
        else:
            l.remove(a)
            b = min(l)
            l.insert(in_a, a)
            in_b = l.index(b)
        l.remove(a)
        l.remove(b)
        add = a + b
        l.append(add)
        return in_a, in_b

    value = []
    frequency = []

    for key in freq_dict:
        node = HuffmanNode(key)
        value.append(node)
        frequency.append(freq_dict[key])
    if len(value) == 1:
        return HuffmanNode(None, value[0])
    while len(value) >= 2:
        mini = minimum(frequency)
        first = value[mini[0]]
        second = value[mini[1]]
        value.remove(first)
        value.remove(second)
        value.append(HuffmanNode(None, first, second))
    return value[0]

第三步,根据哈夫曼树创建字符的编码字典

  1. 创建空字典
  2. 遍历树,获得叶节点的标识,以列表的形式储存
  3. 根据标识列表的长度设定字典的键和值:
    • 当列表长度为1时:
      1. 返回{标识: “0”}
    • 当列表长度为2时:
      1. 返回{标识0: “0”, 标识2: “1”}
    • 其他:
      1. 以列表长度建立循环:
        1. 设定字典的键为当前的标识
        2. 键所对应的值可以利用辅助函数在树中递归检索获得
  4. 返回字典
def get_codes(tree: 'HuffmanNode') -> dict:
    """
    Return a dict mapping symbols from Huffman tree to codes.

    @param HuffmanNode tree: a Huffman tree rooted at node 'tree'
    @rtype: dict(int,str)

    >>> tree = HuffmanNode(None, HuffmanNode(3), HuffmanNode(2))
    >>> d = get_codes(tree)
    >>> d == {3: "0", 2: "1"}
    True
    """
    def check_point(n: 'HuffmanNode') -> int:
        def _find_right_min(node: 'HuffmanNode') -> int:
            return _find_right_min(node.left) if node.left else node.symbol
        return _find_right_min(n.right) if n.right else n.symbol

    def path_finder(value: int, t: 'HuffmanNode') -> str:
        path = ""
        if all_nodes[value] == t.symbol:
            return path
        check = check_point(t)
        if value < all_nodes.index(check):
            path += "0" + path_finder(value, t.left)
        else:
            path += "1" + path_finder(value, t.right)
        return path

    def in_order(t: 'HuffmanNode') -> list:
        left = in_order(t.left) if t.left is not None else []
        right = in_order(t.right) if t.right is not None else []
        return left + [t.symbol] + right if t.symbol is not None else left + right

    dic = dict()
    all_nodes = in_order(tree)
    if len(all_nodes) == 1:
        dic.setdefault(all_nodes[0], "0")
    elif len(all_nodes) == 2:
        dic.setdefault(all_nodes[0], "0")
        dic.setdefault(all_nodes[1], "1")
    else:
        for i in range(len(all_nodes)):
            dic.setdefault(all_nodes[i], path_finder(i, tree))
    return dic

第四步,以后序遍历标注各枝节点(解压时所需)

  1. 利用递归对哈夫曼树进行后序遍历,底层为连接叶节点的枝节点,获得节点列表
  2. 根据列表长度循环,节点的数字标注即其index
def number_nodes(tree: 'HuffmanNode') -> None:
    """
    Number internal nodes in tree according to post-order traversal;
    start numbering at 0.

    @param HuffmanNode tree:  a Huffman tree rooted at node 'tree'
    @rtype: NoneType

    >>> left = HuffmanNode(None, HuffmanNode(3), HuffmanNode(2))
    >>> right = HuffmanNode(None, HuffmanNode(9), HuffmanNode(10))
    >>> tree = HuffmanNode(None, left, right)
    >>> number_nodes(tree)
    >>> tree.left.number
    0
    >>> tree.right.number
    1
    >>> tree.number
    2
    """
    def post(t: 'HuffmanNode') -> list:
        lst = []
        if t.left.is_leaf() and t.right.is_leaf():
            lst.append(t)
            return lst
        if not t.left.is_leaf():
            lst += post(t.left)
        if not t.right.is_leaf():
            lst += post(t.right)
        lst.append(t)
        return lst

    nodes = post(tree)
    for index in range(len(nodes)):
        nodes[index].number = index

第五步,生成压缩后的编码

  1. 创建空列表,空字符串来辅助循环中的运算
  2. 以列表化的二进制原文本为循环:
    • 当前文本的编码从之前生成的编码字典中获取
    • 记录当前编码
    • 判断当前编码的长度,当长度不小于8时:
      1. 列表加入前8位编码
      2. 当前编码更新为第9位至末位
  3. (循环结束)判断最后一个编码的长度,当小于8时:
    1. 在末尾增加“0”,使其长度为8
  4. 记录当前编码
  5. 利用辅助函数将列表里的bit编码转化为byte,再将列表转化为bytes,最后返回
def generate_compressed(text: bytes, codes: dict) -> bytes:
    """
    Return compressed form of text, using mapping in codes for each symbol.

    @param bytes text: a bytes object
    @param dict(int,str) codes: mapping from symbols to codes
    @rtype: bytes
    """

    def bits_to_byte(bits: str) -> int:
        """
        Return int represented by bits, padded on right.

        @param str bits: a string representation of some bits
        @rtype: int

        >>> bits_to_byte("00000101")
        5
        >>> bits_to_byte("101") == 0b10100000
        True
        """
        return sum([int(bits[pos]) << (7 - pos)
                    for pos in range(len(bits))])

    lst = list(text)
    represents = []
    compressed = ""
    count = 0
    for element in lst:
        count += 1
        code = codes[element]
        compressed += code

        if len(compressed) >= 8:
            represent = compressed[:8]
            compressed = compressed[8:]
            represents.append(represent)

    if len(compressed) < 8:
        compressed += "0" * (8 - len(compressed))
    represent = compressed[:]
    represents.append(represent)

    return bytes([bits_to_byte(byte) for byte in represents])

第六步,将哈夫曼树转化为bytes(解压时所需)

  1. 以后序的方式遍历树,每个节点由四个byte表示,并以列表的形式储存:
    • 第一个byte:左子节点如果是叶节点,为0,否则为1
    • 第二个byte:如果一号位是0,为叶节点的标识(symbol),否则为左子节点的标注(number)
    • 第三个byte:右子节点如果是叶节点,为0,否则为1
    • 第四个byte:如果一号位是0,为叶节点的标识(symbol),否则为右子节点的标注(number)
  2. 将列表转化为bytes并返回
def tree_to_bytes(tree: 'HuffmanNode') -> bytes:
    """
    Return a bytes representation of the Huffman tree rooted at tree.

    @param HuffmanNode tree: a Huffman tree rooted at node 'tree'
    @rtype: bytes

    The representation should be based on the post-order traversal of tree
    internal nodes, starting from 0.
    Precondition: tree has its nodes numbered.

    >>> tree = HuffmanNode(None, HuffmanNode(3), HuffmanNode(2))
    >>> number_nodes(tree)
    >>> list(tree_to_bytes(tree))
    [0, 3, 0, 2]
    >>> left = HuffmanNode(None, HuffmanNode(3), HuffmanNode(2))
    >>> right = HuffmanNode(5)
    >>> tree = HuffmanNode(None, left, right)
    >>> number_nodes(tree)
    >>> list(tree_to_bytes(tree))
    [0, 3, 0, 2, 1, 0, 0, 5]
    """
    def post(t: 'HuffmanNode') -> None:

        if t.left.is_leaf() and t.right.is_leaf():
            nodes.append(0)
            nodes.append(t.left.symbol)
            nodes.append(0)
            nodes.append(t.right.symbol)

        elif not t.left.is_leaf() and t.right.is_leaf():
            post(t.left)
            nodes.append(1)
            nodes.append(t.left.number)
            nodes.append(0)
            nodes.append(t.right.symbol)

        elif t.left.is_leaf() and not t.right.is_leaf():
            post(t.right)
            nodes.append(0)
            nodes.append(t.left.symbol)
            nodes.append(1)
            nodes.append(t.right.number)

        else:
            post(t.left)
            post(t.right)
            nodes.append(1)
            nodes.append(t.left.number)
            nodes.append(1)
            nodes.append(t.right.number)

    nodes = []
    post(tree)
    return bytes(nodes)

第七步,调用函数实现压缩

  1. 以二进制的形式打开文件
  2. 生成频次字典(第一步)
  3. 生成哈夫曼树(第二步)
  4. 生成编码字典(第三步)
  5. 标注哈夫曼树的节点(第四步)
  6. 写入部分的组成:
    • 哈夫曼树的节点数
    • Bytes形式的树(第六步)
    • 源文件的大小
    • 编码后的文件
  7. 以二进制的形式写入新文件
def compress(in_file: str, out_file: str) -> None:
    """
    Compress contents of in_file and store results in out_file.

    @param str in_file: input file to compress
    @param str out_file: output file to store compressed result
    @rtype: NoneType
    """
    def size_to_bytes(size: int) -> bytes:
        """
        Return the size as a bytes object.

        @param int size: a 32-bit integer to convert to bytes
        @rtype: bytes

        >>> list(size_to_bytes(300))
        [44, 1, 0, 0]
        """
        # little-endian representation of 32-bit (4-byte)
        # int size
        return size.to_bytes(4, "little")

    def num_nodes_to_bytes(t: 'HuffmanNode') -> bytes:
        """
        Return number of nodes required to represent tree (the root of a
        numbered Huffman tree).

        @param HuffmanNode t: a Huffman tree rooted at node 'tree'
        @rtype: bytes
        """
        return bytes([t.number + 1])
    
    with open(in_file, "rb") as f1:
        text = f1.read()
    freq = make_freq_dict(text)
    tree = huffman_tree(freq)
    codes = get_codes(tree)
    number_nodes(tree)
    print("Bits per symbol:", avg_length(tree, freq))
    result = (num_nodes_to_bytes(tree) + tree_to_bytes(tree) +
              size_to_bytes(len(text)))
    result += generate_compressed(text, codes)
    with open(out_file, "wb") as f2:
        f2.write(result)

部分非核心辅助函数

def avg_length(tree: 'HuffmanNode', freq_dict: dict) -> float:
    """
    Return the number of bits per symbol required to compress text
    made of the symbols and frequencies in freq_dict, using the Huffman tree.

    @param HuffmanNode tree: a Huffman tree rooted at node 'tree'
    @param dict(int,int) freq_dict: frequency dictionary
    @rtype: float

    >>> freq = {3: 2, 2: 7, 9: 1}
    >>> left = HuffmanNode(None, HuffmanNode(3), HuffmanNode(2))
    >>> right = HuffmanNode(9)
    >>> tree = HuffmanNode(None, left, right)
    >>> avg_length(tree, freq)
    1.9
    """
    represents = get_codes(tree)
    total = sum([freq_dict[key] for key in freq_dict])
    occurrence = sum([freq_dict[key]*len(represents[key]) for key in freq_dict])
    return occurrence/total

解压缩

相比压缩过程,解压过程就轻松许多

第一步,重建哈夫曼树

  1. 利用辅助函数将bytes形式的树转化成实际节点类的列表,每个实际节点的4的参数分别对应之前压缩时生成的byte
  2. 新建空列表,根据之前的实际节点列表建立循环:
    • 判断一号位:
      1. 为0则根据2号位数值创建哈夫曼节点(左)
      2. 为1则读取新列表中2号位数值所代表的节点
    • 判断三号位
      1. 为0则根据4号位数值创建哈夫曼节点(右)
      2. 为1则读取新列表中4号位数值所代表的节点
    • 新建枝节点,标识为None,左右分别为之前创建的节点
    • 将枝节点标注并加入列表
  3. (循环结束)返回列表中的根节点
def generate_tree(buff: bytes, root_index) -> 'HuffmanNode':

    """
    Return the root of the Huffman tree corresponding
    to node_lst[root_index].

    The function assumes nothing about the order of the nodes in node_lst.

    @param bytes buff: a list of ReadNode objects
    @param int root_index: index in 'node_lst'
    @rtype: HuffmanNode

    >>> lst = [ReadNode(0, 5, 0, 7), ReadNode(0, 10, 0, 12), \
    ReadNode(1, 1, 1, 0)]
    >>> generate_tree(lst, 2)
    HuffmanNode(None, HuffmanNode(None, HuffmanNode(10, None, None), HuffmanNode(12, None, None)), \
HuffmanNode(None, HuffmanNode(5, None, None), HuffmanNode(7, None, None)))
    """
    def bytes_to_nodes(buf: bytes) -> list:
        """
        Return a list of ReadNodes corresponding to the bytes in buf.

        @param bytes buf: a bytes object
        @rtype: list[ReadNode]

        >>> bytes_to_nodes(bytes([0, 1, 0, 2]))
        [ReadNode(0, 1, 0, 2)]
        """
        lst = []
        for i in range(0, len(buf), 4):
            l_type = buf[i]
            l_data = buf[i + 1]
            r_type = buf[i + 2]
            r_data = buf[i + 3]
            lst.append(ReadNode(l_type, l_data, r_type, r_data))
        return lst

    branches = []
    node_lst = bytes_to_nodes(buff)
    for node in node_lst:
        if not node.l_type:
            left = HuffmanNode(node.l_data)
        else:
            left = branches[node.l_data]

        if not node.r_type:
            right = HuffmanNode(node.r_data)
        else:
            right = branches[node.r_data]

        current = HuffmanNode(None, left, right)
        current.number = node_lst.index(node)
        branches.append(current)

    return branches[root_index]

第二步,根据重建的树,压缩后的编码,原文件大小,来进行解压

  1. 将压缩后的编码重新转为bit并合并在一个字符串中
  2. 新建空列表记录原文件内容
  3. 新建计数器
  4. 根据列表的长度进行循环,当列表长度小于原文件大小时:
    • 新建指向树根部的“指针”
    • 根据字符串在计数器显示的位数的字符判断指针方向:
      1. 为“0”,指针指向左子节点
      2. 为“1”,指针指向右子节点
    • 根据指针所指节点的属性进行循环, 当节点不是叶节点时:
      1. 计数器增加1
      2. 根据字符串在计数器显示的位数的字符判断指针方向:
        1. 为“0”,指针指向左子节点
        2. 为“1”,指针指向右子节点
    • (内循环结束)列表增加当前指针所指节点的标识
    • 计数器增加1
  5. (外循环结束)返回bytes化的列表
def generate_uncompressed(tree: 'HuffmanNode', text: bytes, size: int) -> bytes:
    """
    Use Huffman tree to decompress size bytes from text.

    @param HuffmanNode tree: a HuffmanNode tree rooted at 'tree'
    @param bytes text: text to decompress
    @param int size: number of bytes to decompress from text.
    @rtype: bytes
    """
    def get_bit(byte: int, bit_num: int) -> int:
        """
        Return bit number bit_num from right in byte.

        @param int byte: a given byte
        @param int bit_num: a specific bit number within the byte
        @rtype: int

        >>> get_bit(0b00000101, 2)
        1
        >>> get_bit(0b00000101, 1)
        0
        """
        return (byte & (1 << bit_num)) >> bit_num

    def byte_to_bits(byte: int) -> str:
        """
        Return the representation of a byte as a string of bits.

        @param int byte: a given byte
        @rtype: str

        >>> byte_to_bits(14)
        '00001110'
        """
        return "".join([str(get_bit(byte, bit_num))
                        for bit_num in range(7, -1, -1)])

    compressed = "".join([byte_to_bits(byte) for byte in text])
    buff = []
    count = 0
    while len(buff) < size:
        current = tree
        if compressed[count] == "0":
            current = current.left
        else:
            current = current.right

        while not current.is_leaf():
            count += 1
            if compressed[count] == "0":
                current = current.left
            else:
                current = current.right

        buff.append(current.symbol)
        count += 1
    return bytes(buff)

第三步,调用函数实现压缩

  1. 以二进制读取的方式打开压缩后的文件
  2. 根据之前写入的数据,分别读取出
    • 哈夫曼树的节点数
    • Bytes形式的树(第六步)
    • 源文件的大小
    • 编码后的文件
  3. 重建树(第一步)
  4. 解压(第二步)
  5. 以二进制写入的方式将解压后的内容写入新文件
def uncompress(in_file: str, out_file: str) -> None:
    """
    Uncompress contents of in_file and store results in out_file.

    @param str in_file: input file to uncompress
    @param str out_file: output file that will hold the uncompressed results
    @rtype: NoneType
    """
    def bytes_to_size(buff: bytes) -> int:
        """ Return the size corresponding to the
        given 4-byte little-endian representation.

        @param bytes buff: a bytes object
        @rtype: int

        >>> bytes_to_size(bytes([44, 1, 0, 0]))
        300
        """
        return int.from_bytes(buff, "little")

    with open(in_file, "rb") as f:
        num_nodes = f.read(1)[0]
        buf = f.read(num_nodes * 4)
        tree = generate_tree(buf, num_nodes - 1)
        size = bytes_to_size(f.read(4))
        with open(out_file, "wb") as g:
            text = f.read()
            g.write(generate_uncompressed(tree, text, size))

后记

哈夫曼算法可以用于任何类型的文件,但仅在“字符”频次分布不均的时候能有较好效果,如文本文件,bmp类型的图像文件等;像mp3等类型的文件几乎没有压缩的效果。其次,压缩过后的文档不经过解压则无法使用,强行打开就是一堆乱码。

这里写出来的哈夫曼算法也还有很多可以改进的地方,比如减少循环和递归次数以减少运算时间。在测试中,压缩一个1.14MB的txt大致需要6.5s,压缩率56.75%;压缩一个4.07MB的bmp大致需要19s,压缩率55.19%,而压缩另一个11.7MB的bmp则大致需要41.5s,压缩率61.76%。在现在针对特定类型文件已经有很多高级压缩算法的情况下,这个成绩也就是能用来自娱自乐,基本派不上实际应用。


终有一日, 仰望星空