最新修改于2025-11-29 23:40
def varint2bytes(x: int) -> bytes:
out = []
while x > 0x7F:
out.append((x & 0x7F) | 0x80)
x >>= 7
out.append(x & 0x7F)
return bytes(out)
def bytes2varint(b: bytes) -> int:
value, shift = 0, 0
for _b in b:
value |= (_b & 0x7F) << shift
if not (_b & 0x80):
break
shift += 7
return value
def tag_from(field: int, wire: int) -> bytes:
assert 0 <= wire <= 7
assert field >= 1
varint = (field << 3) | wire
return varint2bytes(varint)
def from_tag(tag_bytes: bytes):
tag_varint = bytes2varint(tag_bytes)
return tag_varint >> 3, tag_varint & 0x07
def msg_from(field: int, wire: int, msg: bytes) -> bytes:
t = tag_from(field, wire)
if wire == 0:
return t + varint2bytes(msg)
return t + varint2bytes(len(msg)) + msg
def read_protobuf(data: bytes) -> dict:
res = {}
i = 0
if isinstance(data, str):
data = bytes.fromhex(data)
n = len(data)
while i < n:
tag_bytes = []
while i < n:
b = data[i]
i += 1
tag_bytes.append(b)
if not (b & 0x80): # MSB = 0 → last byte
break
if not tag_bytes:
break
tag_val = bytes2varint(bytes(tag_bytes))
field_num = tag_val >> 3
wire_type = tag_val & 0x07
if field_num not in res:
res[field_num] = {'wire': wire_type, 'values': []}
value_bytes = b''
if wire_type == 0: # Varint (int32, int64, uint32, uint64, sint32, sint64, bool, enum)
varint_bytes = []
while i < n:
b = data[i]
i += 1
varint_bytes.append(b)
if not (b & 0x80):
break
value_bytes = bytes(varint_bytes)
elif wire_type == 1: # 64-bit (fixed64, sfixed64, double)
if i + 8 > n:
raise ValueError('Truncated 64-bit value')
value_bytes = data[i:i+8]
i += 8
elif wire_type == 2: # Length-delimited (string, bytes, message, packed repeated)
len_bytes = []
while i < n:
b = data[i]
i += 1
len_bytes.append(b)
if not (b & 0x80):
break
length = bytes2varint(bytes(len_bytes))
if i + length > n:
raise ValueError(f'Message truncated: expected {length} bytes, got {n - i}')
value_bytes = data[i:i+length]
i += length
elif wire_type == 5: # 32-bit (fixed32, sfixed32, float)
if i + 4 > n:
raise ValueError('Truncated 32-bit value')
value_bytes = data[i:i+4]
i += 4
else:
continue
res[field_num]['values'].append(value_bytes)
return res