11from __future__ import annotations
22
33import re
4+ from typing import List
45
56from typing_extensions import TypedDict , override
67
78from pyinfra .api import FactBase
89
910from .gpg import GpgFactBase
10- from .util import make_cat_files_command
1111
1212
1313def noninteractive_apt (command : str , force = False ):
@@ -60,44 +60,176 @@ def parse_apt_repo(name):
6060 }
6161
6262
63- class AptSources (FactBase ):
63+ def parse_deb822_stanza (lines : list [str ]):
64+ """Parse a deb822 style repository stanza.
65+
66+ deb822 sources are key/value pairs separated by blank lines, eg::
67+
68+ Types: deb
69+ URIs: http://deb.debian.org/debian
70+ Suites: bookworm
71+ Components: main contrib
72+ Architectures: amd64
73+ Signed-By: /usr/share/keyrings/debian-archive-keyring.gpg
74+
75+ Returns a list of dicts matching the legacy ``parse_apt_repo`` output so the
76+ rest of pyinfra can remain backwards compatible. A stanza may define
77+ multiple types/URIs/suites which we expand into individual repo dicts.
6478 """
65- Returns a list of installed apt sources:
6679
67- .. code:: python
80+ if not lines :
81+ return []
82+
83+ data : dict [str , str ] = {}
84+ for line in lines :
85+ if not line or line .startswith ("#" ):
86+ continue
87+ # Field-Name: value
88+ try :
89+ key , value = line .split (":" , 1 )
90+ except ValueError : # malformed line
91+ continue
92+ data [key .strip ()] = value .strip ()
93+
94+ required = ("Types" , "URIs" , "Suites" )
95+ if not all (field in data for field in required ): # not a valid stanza
96+ return []
97+
98+ types = data .get ("Types" , "" ).split ()
99+ uris = data .get ("URIs" , "" ).split ()
100+ suites = data .get ("Suites" , "" ).split ()
101+ components = data .get ("Components" , "" ).split ()
102+
103+ # Map deb822 specific fields to legacy option names
104+ options : dict [str , object ] = {}
105+ if architectures := data .get ("Architectures" ):
106+ archs = architectures .split ()
107+ if archs :
108+ options ["arch" ] = archs if len (archs ) > 1 else archs [0 ]
109+ if signed_by := data .get ("Signed-By" ):
110+ signed = signed_by .split ()
111+ options ["signed-by" ] = signed if len (signed ) > 1 else signed [0 ]
112+ if trusted := data .get ("Trusted" ):
113+ options ["trusted" ] = trusted .lower ()
114+
115+ repos = []
116+ # Produce combinations – in most real-world cases these will each be one.
117+ for _type in types or ["deb" ]:
118+ for uri in uris :
119+ for suite in suites :
120+ repos .append (
121+ {
122+ "options" : dict (options ), # copy per entry
123+ "type" : _type ,
124+ "url" : uri ,
125+ "distribution" : suite ,
126+ "components" : components ,
127+ }
128+ )
129+ return repos
130+
131+
132+ def parse_apt_list_file (lines : List [str ]):
133+ """Parse legacy .list style apt source file.
134+
135+ Each non-comment, non-empty line is a discrete repository definition in the
136+ traditional ``deb http://... suite components`` syntax.
137+ Returns a list of repo dicts (may be empty).
138+ """
139+ repos = []
140+ for raw in lines :
141+ line = raw .strip ()
142+ if not line or line .startswith ("#" ):
143+ continue
144+ repo = parse_apt_repo (line )
145+ if repo :
146+ repos .append (repo )
147+ return repos
148+
149+
150+ def parse_deb822_sources_file (lines : List [str ]):
151+ """Parse a full deb822 ``.sources`` file.
152+
153+ Splits on blank lines into stanzas and uses ``parse_deb822_stanza`` for each
154+ stanza. Returns a combined list of repo dicts for all stanzas.
155+ """
156+ repos = []
157+ stanza : List [str ] = []
158+ for raw in lines + ["" ]: # sentinel blank line to flush last stanza
159+ line = raw .rstrip ("\n " )
160+ if line .strip () == "" :
161+ if stanza :
162+ repos .extend (parse_deb822_stanza (stanza ))
163+ stanza = []
164+ continue
165+ stanza .append (line )
166+ return repos
68167
69- [
70- {
71- "type": "deb",
72- "url": "http://archive.ubuntu.org",
73- "distribution": "trusty",
74- "components", ["main", "multiverse"],
75- },
76- ]
168+
169+ class AptSources (FactBase ):
170+ """Returns a list of installed apt sources (legacy .list + deb822 .sources).
171+
172+ Backwards compatible with historical output: a flat list of dicts:
173+
174+ {
175+ "type": "deb",
176+ "url": "http://archive.ubuntu.org",
177+ "distribution": "bookworm",
178+ "components": ["main", "contrib"],
179+ "options": { ... },
180+ }
77181 """
78182
79183 @override
80184 def command (self ) -> str :
81- return make_cat_files_command (
82- "/etc/apt/sources.list" ,
83- "/etc/apt/sources.list.d/*.list" ,
185+ # We emit file boundary markers so the parser can select the correct
186+ # parsing function based on filename extension.
187+ return (
188+ "sh -c '"
189+ "for f in "
190+ "/etc/apt/sources.list "
191+ "/etc/apt/sources.list.d/*.list "
192+ "/etc/apt/sources.list.d/*.sources; do "
193+ '[ -e "$f" ] || continue; '
194+ 'echo "##FILE $f"; '
195+ 'cat "$f"; '
196+ "echo; "
197+ "done'"
84198 )
85199
86200 @override
87201 def requires_command (self ) -> str :
88- return "apt" # if apt installed, above should exist
202+ return "apt"
89203
90204 default = list
91205
92206 @override
93- def process (self , output ):
94- repos = []
95-
96- for line in output :
97- repo = parse_apt_repo (line )
98- if repo :
99- repos .append (repo )
100-
207+ def process (self , output ): # type: ignore[override]
208+ repos : list = []
209+ current_file : str | None = None
210+ buffer : list [str ] = []
211+
212+ def flush ():
213+ nonlocal buffer , current_file , repos
214+ if current_file is None or not buffer :
215+ buffer = []
216+ return
217+ if current_file .endswith (".sources" ):
218+ repos .extend (parse_deb822_sources_file (buffer ))
219+ else : # treat anything else as legacy list syntax
220+ repos .extend (parse_apt_list_file (buffer ))
221+ buffer = []
222+
223+ for raw_line in output :
224+ if raw_line .startswith ("##FILE " ):
225+ # New file marker
226+ flush ()
227+ current_file = raw_line .split (" " , 1 )[1 ].strip ()
228+ continue
229+ buffer .append (raw_line )
230+
231+ # Flush last file
232+ flush ()
101233 return repos
102234
103235
@@ -115,14 +247,30 @@ class AptKeys(GpgFactBase):
115247 }
116248 """
117249
118- # This requires both apt-key *and* apt-key itself requires gpg
119250 @override
120251 def command (self ) -> str :
121- return "! command -v gpg || apt-key list --with-colons"
122-
123- @override
124- def requires_command (self ) -> str :
125- return "apt-key"
252+ # Prefer not to use deprecated apt-key even if present. Iterate over keyrings
253+ # directly. This maintains backwards compatibility of output with the
254+ # previous implementation which fell back to this method.
255+ return (
256+ "for f in "
257+ " /etc/apt/trusted.gpg "
258+ " /etc/apt/trusted.gpg.d/*.gpg /etc/apt/trusted.gpg.d/*.asc "
259+ " /etc/apt/keyrings/*.gpg /etc/apt/keyrings/*.asc "
260+ " /usr/share/keyrings/*.gpg /usr/share/keyrings/*.asc "
261+ "; do "
262+ ' [ -e "$f" ] || continue; '
263+ ' case "$f" in '
264+ " *.asc) "
265+ ' gpg --batch --show-keys --with-colons --keyid-format LONG "$f" '
266+ " ;; "
267+ " *) "
268+ ' gpg --batch --no-default-keyring --keyring "$f" '
269+ " --list-keys --with-colons --keyid-format LONG "
270+ " ;; "
271+ " esac; "
272+ "done"
273+ )
126274
127275
128276class AptSimulationDict (TypedDict ):
0 commit comments