Protobuf逆向

mikoto2464 于 2025-11-29 发布 浏览量

最新修改于2025-11-29 23:40

def varint2bytes(x: int) -> bytes:
    out = []
    while x > 0x7F:
        out.append((x & 0x7F) | 0x80)
        x >>= 7
    out.append(x & 0x7F)
    return bytes(out)

def bytes2varint(b: bytes) -> int:
    value, shift = 0, 0
    for _b in b:
        value |= (_b & 0x7F) << shift
        if not (_b & 0x80):
            break
        shift += 7
    return value

def tag_from(field: int, wire: int) -> bytes:
    assert 0 <= wire <= 7
    assert field >= 1
    varint = (field << 3) | wire
    return varint2bytes(varint)

def from_tag(tag_bytes: bytes):
    tag_varint = bytes2varint(tag_bytes)
    return tag_varint >> 3, tag_varint & 0x07

def msg_from(field: int, wire: int, msg: bytes) -> bytes:
    t = tag_from(field, wire)
    if wire == 0:
        return t + varint2bytes(msg)
    return t + varint2bytes(len(msg)) + msg

def read_protobuf(data: bytes) -> dict:
    res = {}
    i = 0
    if isinstance(data, str):
        data = bytes.fromhex(data)
    n = len(data)

    while i < n:
        tag_bytes = []
        while i < n:
            b = data[i]
            i += 1
            tag_bytes.append(b)
            if not (b & 0x80):  # MSB = 0 → last byte
                break
        if not tag_bytes:
            break

        tag_val = bytes2varint(bytes(tag_bytes))
        field_num = tag_val >> 3
        wire_type = tag_val & 0x07

        if field_num not in res:
            res[field_num] = {'wire': wire_type, 'values': []}

        value_bytes = b''

        if wire_type == 0:  # Varint (int32, int64, uint32, uint64, sint32, sint64, bool, enum)
            varint_bytes = []
            while i < n:
                b = data[i]
                i += 1
                varint_bytes.append(b)
                if not (b & 0x80):
                    break
            value_bytes = bytes(varint_bytes)

        elif wire_type == 1:  # 64-bit (fixed64, sfixed64, double)
            if i + 8 > n:
                raise ValueError('Truncated 64-bit value')
            value_bytes = data[i:i+8]
            i += 8

        elif wire_type == 2:  # Length-delimited (string, bytes, message, packed repeated)
            len_bytes = []
            while i < n:
                b = data[i]
                i += 1
                len_bytes.append(b)
                if not (b & 0x80):
                    break
            length = bytes2varint(bytes(len_bytes))
            if i + length > n:
                raise ValueError(f'Message truncated: expected {length} bytes, got {n - i}')
            value_bytes = data[i:i+length]
            i += length

        elif wire_type == 5:  # 32-bit (fixed32, sfixed32, float)
            if i + 4 > n:
                raise ValueError('Truncated 32-bit value')
            value_bytes = data[i:i+4]
            i += 4

        else:
            continue

        res[field_num]['values'].append(value_bytes)

    return res