Skip to content

Commit 586c0e4

Browse files
berrangejoholl
authored andcommitted
tpmstream.io: add support for swtpm log file format
The swtpm program that is used to provide QEMU/KVM virtual machines with a virtual TPM can save a log of all TPM command packets. The file format looks like this: Ctrl Cmd: length 4 00 00 00 10 Ctrl Rsp: length 4 00 00 00 00 SWTPM_IO_Read: length 10 80 01 00 00 00 0A 00 00 01 81 SWTPM_IO_Write: length 10 80 01 00 00 00 0A 00 00 01 01 Ctrl Cmd: length 4 00 00 00 01 Ctrl Rsp: length 8 00 00 00 00 00 01 FF FF SWTPM_IO_Read: length 12 80 01 00 00 00 0C 00 00 01 44 00 00 SWTPM_IO_Write: length 10 80 01 00 00 00 0A 00 00 00 00 SWTPM_IO_Read: length 22 80 01 00 00 00 16 00 00 01 7A 00 00 00 05 00 00 00 00 00 00 00 01 SWTPM_IO_Write: length 43 80 01 00 00 00 2B 00 00 00 00 00 00 00 00 05 00 .... "Ctrl Cmd" and "Ctrl Rsp" are markers for messages on SWTPM's control channel, followed by data, which should be ignored. "SWTPM_IO_Read" and "SWTPM_IO_WRITE" are markers for TPM commands and responses respectively, where we can capture the following data and convert to binary. This allows viewing the swtpm command stream using # tpmstream convert swtpm.log the auto format detection looks for " Ctrl Cmd:" in the leading bytes. The format can also be requested explicitly # tpmstream convert --in swtpm-log swtpm.log Signed-off-by: Daniel P. Berrangé <[email protected]>
1 parent b244c0a commit 586c0e4

File tree

3 files changed

+140
-1
lines changed

3 files changed

+140
-1
lines changed

src/tpmstream/__main__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from .io.hex import Hex
2222
from .io.pcapng import Pcapng
2323
from .io.pretty import Pretty
24+
from .io.swtpm_log import SWTPMLog
2425
from .spec import all_types
2526
from .spec.commands import CommandResponseStream, Response
2627

@@ -65,6 +66,7 @@ def convert(args):
6566
"binary": Binary,
6667
"hex": Hex,
6768
"pcapng": Pcapng,
69+
"swtpm-log": SWTPMLog,
6870
}[args.format_in]
6971

7072
format_out = {
@@ -208,6 +210,7 @@ def find_type(args):
208210
"binary": Binary,
209211
"hex": Hex,
210212
"pcapng": Pcapng,
213+
"swtpm-log": SWTPMLog,
211214
}[args.format_in]
212215

213216
buffer = bytes(bytes_from_files(args.file))
@@ -264,7 +267,7 @@ def parse_all_types(format_in, buffer):
264267
format_in_arg = {
265268
"dest": "format_in",
266269
"type": str,
267-
"choices": ["binary", "hex", "pcapng", "auto"],
270+
"choices": ["binary", "hex", "pcapng", "swtpm-log" ,"auto"],
268271
"default": "auto",
269272
"help": "input stream format, default is auto (--in=auto only works with --type=CommandResponseStream)",
270273
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
from ...common.event import MarshalEvent
2+
from .marshal import marshal
3+
4+
5+
class SWTPMLog:
6+
@staticmethod
7+
def marshal(tpm_type, buffer, root_path=None, command_code=None, **kwargs):
8+
return marshal(
9+
tpm_type, buffer, root_path=root_path, command_code=command_code, **kwargs
10+
)
11+
12+
@staticmethod
13+
def unmarshal(events: list[MarshalEvent]):
14+
raise NotImplementedError()
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
from ..binary import Binary
2+
3+
CMD_MARKER = b"SWTPM_IO"
4+
CTRL_MARKER = b"Ctrl"
5+
6+
VALID_HEX = b"0123456789ABCDEF"
7+
VALID_WS = b" \r\n"
8+
9+
STATE_WANT_CMD_MARKER = 0
10+
STATE_WANT_CMD_START = 1
11+
STATE_WANT_HIGH_NIBBLE = 2
12+
STATE_WANT_LOW_NIBBLE = 3
13+
14+
# File format we're parsing looks like this:
15+
#
16+
# Ctrl Cmd: length 4
17+
# 00 00 00 10
18+
# Ctrl Rsp: length 4
19+
# 00 00 00 00
20+
# SWTPM_IO_Read: length 10
21+
# 80 01 00 00 00 0A 00 00 01 81
22+
# SWTPM_IO_Write: length 10
23+
# 80 01 00 00 00 0A 00 00 01 01
24+
# Ctrl Cmd: length 4
25+
# 00 00 00 01
26+
# Ctrl Rsp: length 8
27+
# 00 00 00 00 00 01 FF FF
28+
# SWTPM_IO_Read: length 12
29+
# 80 01 00 00 00 0C 00 00 01 44 00 00
30+
# SWTPM_IO_Write: length 10
31+
# 80 01 00 00 00 0A 00 00 00 00
32+
# SWTPM_IO_Read: length 22
33+
# 80 01 00 00 00 16 00 00 01 7A 00 00 00 05 00 00
34+
# 00 00 00 00 00 01
35+
# SWTPM_IO_Write: length 43
36+
# 80 01 00 00 00 2B 00 00 00 00 00 00 00 00 05 00
37+
# ....
38+
#
39+
# "Ctrl Cmd" and "Ctrl Rsp" are markers for messages
40+
# on SWTPM's control channel, followed by data, which
41+
# we ignore.
42+
#
43+
# "SWTPM_IO_Read" and "SWTPM_IO_WRITE" are markers for
44+
# TPM commands and responses respectively, where we
45+
# capture the following data and convert to binary.
46+
47+
def parse_hex_string(buffer):
48+
"""Generator: hex string to bytes."""
49+
buffer = iter(buffer)
50+
51+
value = bytes()
52+
marker = bytes()
53+
state = STATE_WANT_CMD_MARKER
54+
55+
while True:
56+
try:
57+
b = bytes([next(buffer)])
58+
except StopIteration:
59+
b = None
60+
61+
if state == STATE_WANT_CMD_MARKER:
62+
if b is None:
63+
if len(marker) != 0:
64+
raise ValueError("Incomplete command marker '%s'" % str(marker))
65+
return
66+
elif b == CMD_MARKER[len(marker):len(marker)+1]:
67+
marker += b
68+
69+
if marker == CMD_MARKER:
70+
state = STATE_WANT_CMD_START
71+
marker = bytes()
72+
else:
73+
marker = bytes()
74+
continue
75+
elif state == STATE_WANT_CMD_START:
76+
if b is None:
77+
raise ValueError("Missing command payload")
78+
elif b == b'\n':
79+
state = STATE_WANT_HIGH_NIBBLE
80+
else:
81+
continue
82+
elif state == STATE_WANT_HIGH_NIBBLE:
83+
if b is None:
84+
return
85+
elif b in VALID_WS:
86+
continue
87+
elif b == CMD_MARKER[0:1]:
88+
state = STATE_WANT_CMD_MARKER
89+
marker += b
90+
elif b not in VALID_HEX:
91+
raise ValueError("Invalid hex digit '%s'" % str(b))
92+
else:
93+
value += b
94+
state = STATE_WANT_LOW_NIBBLE
95+
elif state == STATE_WANT_LOW_NIBBLE:
96+
if b is None:
97+
raise ValueError("Incomplete command byte")
98+
elif value == CTRL_MARKER[0:1] and b == CTRL_MARKER[1:2]:
99+
state = STATE_WANT_CMD_MARKER
100+
value = bytes()
101+
elif b not in VALID_HEX:
102+
raise ValueError("Invalid hex digit '%s'" % str(b))
103+
else:
104+
value += b
105+
i = int(value, 16)
106+
value = bytes()
107+
state = STATE_WANT_HIGH_NIBBLE
108+
yield i
109+
110+
111+
def marshal(tpm_type, buffer, root_path=None, command_code=None, **kwargs):
112+
"""Generator. Take iterable which yields single bytes. Yield MarshalEvents."""
113+
parsed_bytes = parse_hex_string(buffer)
114+
115+
result = yield from Binary.marshal(
116+
tpm_type=tpm_type,
117+
buffer=parsed_bytes,
118+
root_path=root_path,
119+
command_code=command_code,
120+
**kwargs,
121+
)
122+
return result

0 commit comments

Comments
 (0)