1+ package com .conveyal .datatools .manager ;
2+
3+ import com .conveyal .datatools .common .utils .aws .CheckedAWSException ;
4+ import com .conveyal .datatools .manager .models .FeedVersion ;
5+ import com .conveyal .datatools .manager .persistence .FeedStore ;
6+ import com .conveyal .datatools .manager .persistence .Persistence ;
7+ import com .conveyal .gtfs .GTFS ;
8+ import com .conveyal .gtfs .util .InvalidNamespaceException ;
9+ import com .conveyal .gtfs .util .Util ;
10+ import com .google .common .collect .Lists ;
11+ import com .mongodb .client .model .Projections ;
12+ import org .apache .commons .cli .CommandLine ;
13+ import org .apache .commons .cli .CommandLineParser ;
14+ import org .apache .commons .cli .DefaultParser ;
15+ import org .apache .commons .cli .HelpFormatter ;
16+ import org .apache .commons .cli .Option ;
17+ import org .apache .commons .cli .Options ;
18+ import org .apache .commons .cli .ParseException ;
19+ import org .bson .Document ;
20+ import org .bson .conversions .Bson ;
21+ import org .slf4j .Logger ;
22+ import org .slf4j .LoggerFactory ;
23+
24+ import java .io .IOException ;
25+ import java .sql .Connection ;
26+ import java .sql .PreparedStatement ;
27+ import java .sql .ResultSet ;
28+ import java .sql .SQLException ;
29+ import java .util .ArrayList ;
30+ import java .util .HashSet ;
31+ import java .util .List ;
32+ import java .util .Set ;
33+ import java .util .stream .Collectors ;
34+
35+ import static com .conveyal .datatools .manager .DataManager .GTFS_DATA_SOURCE ;
36+ import static com .conveyal .datatools .manager .DataManager .initializeApplication ;
37+ import static com .mongodb .client .model .Aggregates .project ;
38+ import static com .mongodb .client .model .Filters .nin ;
39+
40+ /**
41+ * The Data sanitizer requires the env.yml and server.yml files for configuration. Data sanitizer specific command-line parameters
42+ * should be provided after these e.g.:
43+ * configurations/test/env.yml.tmp configurations/test/server.yml.tmp --orphaned delete (or -O d)
44+ */
45+ public class DataSanitizer {
46+ private static final Logger LOG = LoggerFactory .getLogger (DataSanitizer .class );
47+
48+ public static void main (String [] args ) throws IOException {
49+ initializeApplication (args , false );
50+ parseArguments (args );
51+ }
52+
53+ /**
54+ * Parse the arguments provided on the command line. The first two arguments must reference the env.yml and
55+ * server.yml files.
56+ */
57+ public static void parseArguments (String [] arguments ) {
58+ Options options = new Options ();
59+ Option orphanedOption = Option .builder ("O" )
60+ .longOpt ("orphaned" )
61+ .desc ("Optional delete command for orphaned items" )
62+ .optionalArg (true )
63+ .argName ("deleteCommand" )
64+ .build ();
65+ options .addOption (orphanedOption );
66+
67+ try {
68+ CommandLineParser parser = new DefaultParser ();
69+ CommandLine cmd = parser .parse (options , arguments );
70+ if (cmd .hasOption ("O" )) {
71+ String deleteCommand = cmd .getOptionValue ("O" );
72+ boolean delete = "delete" .equalsIgnoreCase (deleteCommand ) || "d" .equalsIgnoreCase (deleteCommand );
73+ sanitizeFeedVersions (delete );
74+ sanitizeDBSchemas (delete );
75+ }
76+ } catch (ParseException e ) {
77+ System .out .println (e .getMessage ());
78+ HelpFormatter formatter = new HelpFormatter ();
79+ formatter .printHelp ("utility-name" , options );
80+ System .exit (1 );
81+ }
82+ }
83+
84+ /**
85+ * Group orphaned feed versions and optionally delete.
86+ */
87+ public static int sanitizeFeedVersions (boolean delete ) {
88+ List <FeedVersion > feedVersions = getOrphanedFeedVersions ();
89+ int orphaned = feedVersions .size ();
90+ if (orphaned == 0 ) {
91+ System .out .println ("No orphaned feed versions found!" );
92+ } else {
93+ FeedStore gtfsPlusStore = new FeedStore (DataManager .GTFS_PLUS_SUBDIR );
94+ System .out .printf ("%d orphaned feed versions:%n" , orphaned );
95+ for (FeedVersion feedVersion : feedVersions ) {
96+ boolean hasGTFSPlus = hasGTFSPlus (feedVersion , gtfsPlusStore );
97+ System .out .printf ("%-10s | %-10s | %-10s | %-10s | %-10s%n" , "ID" , "Version" , "Created" , "Updated" , "GTFS+" );
98+ System .out .printf (
99+ "%-10s | %-10s | %-10s | %-10s | %-10s%n" ,
100+ feedVersion .name ,
101+ feedVersion .version ,
102+ feedVersion .dateCreated ,
103+ feedVersion .lastUpdated ,
104+ hasGTFSPlus
105+ );
106+ }
107+ }
108+
109+ if (delete && !feedVersions .isEmpty ()) {
110+ System .out .println ("Total orphaned feed versions deleted: " + deleteOrphanedFeedVersions (feedVersions ));
111+ }
112+ return orphaned ;
113+ }
114+
115+ private static boolean hasGTFSPlus (FeedVersion feedVersion , FeedStore gtfsPlusStore ) {
116+ return DataManager .isModuleEnabled ("gtfsplus" ) && gtfsPlusStore .getFeed (feedVersion .id + ".db" ) != null ;
117+ }
118+
119+ /**
120+ * Group orphaned schemas and optionally delete.
121+ */
122+ public static void sanitizeDBSchemas (boolean delete ) {
123+ Set <String > orphanedSchemas = getOrphanedDBSchemas (getFieldFromDocument ("namespace" , "FeedVersion" ));
124+ if (orphanedSchemas .isEmpty ()) {
125+ System .out .println ("No orphaned DB schemas found!" );
126+
127+ } else {
128+ System .out .println ("Orphaned DB schemas: " + orphanedSchemas .size ());
129+ for (String schema : orphanedSchemas ) {
130+ System .out .println (schema );
131+ }
132+ }
133+
134+ if (delete && !orphanedSchemas .isEmpty ()) {
135+ System .out .println ("Total orphaned DB schemas deleted: " + deleteOrphanedDBSchemas (orphanedSchemas ));
136+ }
137+ }
138+
139+ /**
140+ * Delete orphaned feed versions.
141+ */
142+ private static int deleteOrphanedFeedVersions (List <FeedVersion > feedVersions ) {
143+ int deletedFeedVersions = 0 ;
144+ for (FeedVersion feedVersion : feedVersions ) {
145+ try {
146+ System .out .println ("Deleting orphaned feed version: " + feedVersion .id );
147+ feedVersion .deleteOrphan ();
148+ deletedFeedVersions ++;
149+ } catch (SQLException | CheckedAWSException | InvalidNamespaceException e ) {
150+ System .err .printf ("Failed to delete feed version: %s. %s%n" , feedVersion .id , e .getMessage ());
151+ }
152+ }
153+ return deletedFeedVersions ;
154+ }
155+
156+ /**
157+ * Delete orphaned DB schemas.
158+ */
159+ public static int deleteOrphanedDBSchemas (Set <String > orphanedSchemas ) {
160+ int deletedSchemas = 0 ;
161+ for (String orphanedSchema : orphanedSchemas ) {
162+ try {
163+ GTFS .delete (orphanedSchema , DataManager .GTFS_DATA_SOURCE );
164+ LOG .info ("Dropped orphaned DB schema from Postgres." );
165+ } catch (SQLException | InvalidNamespaceException e ) {
166+ System .err .printf ("Failed to delete DB schema: %s. %s%n" , orphanedSchema , e .getMessage ());
167+ }
168+ deletedSchemas ++;
169+ }
170+ return deletedSchemas ;
171+ }
172+
173+ /**
174+ * Produce a list of feed versions that are not attached to a feed source.
175+ */
176+ private static List <FeedVersion > getOrphanedFeedVersions () {
177+ Set <String > feedSourceIds = getFieldFromDocument ("_id" , "FeedSource" );
178+ return feedSourceIds .isEmpty ()
179+ ? new ArrayList <>()
180+ : Persistence .feedVersions .getFiltered (nin ("feedSourceId" , feedSourceIds ));
181+ }
182+
183+ /**
184+ * Get all qualifying schemas that are not associated with a feed version.
185+ */
186+ public static Set <String > getOrphanedDBSchemas (Set <String > associatedSchemas ) {
187+ String whereClause = associatedSchemas .isEmpty () ? "" : String .format (" WHERE nspname NOT IN (%s)" , associatedSchemas
188+ .stream ()
189+ .map (schema -> "'" + schema + "'" )
190+ .collect (Collectors .joining (", " ))
191+ );
192+ Set <String > orphanedSchemas = new HashSet <>();
193+ try (Connection connection = GTFS_DATA_SOURCE .getConnection ()) {
194+ String sql = String .format ("SELECT nspname FROM pg_namespace %s" , whereClause );
195+ LOG .info (sql );
196+ PreparedStatement preparedStatement = connection .prepareStatement (sql );
197+ ResultSet resultSet = preparedStatement .executeQuery ();
198+ while (resultSet .next ()) {
199+ String schemaName = resultSet .getString (1 );
200+ if (isValidSchema (schemaName )) {
201+ orphanedSchemas .add (schemaName );
202+ }
203+ }
204+ } catch (SQLException e ) {
205+ LOG .error ("Unable to get orphaned DB schemas" , e );
206+ }
207+ return orphanedSchemas ;
208+ }
209+
210+ /**
211+ * Make sure the schema qualifies as datatools-related schema.
212+ */
213+ private static boolean isValidSchema (String schemaName ) {
214+ List <String > criticalSchemas = List .of ("catalog" , "information_schema" , "public" , "temp" , "toast" );
215+ try {
216+ Util .ensureValidNamespace (schemaName );
217+ if (criticalSchemas .stream ().noneMatch (schemaName ::contains )) {
218+ // Belts and braces in case the previous check changes.
219+ return true ;
220+ }
221+ } catch (InvalidNamespaceException e ) {
222+ return false ;
223+ }
224+ return false ;
225+ }
226+
227+ /**
228+ * Extract a list of fields from all documents.
229+ */
230+ public static Set <String > getFieldFromDocument (String field , String document ) {
231+ Set <String > fields = new HashSet <>();
232+
233+ List <Bson > stages = Lists .newArrayList (
234+ project (
235+ Projections .fields (Projections .include (field ))
236+ )
237+ );
238+ for (Document feedVersionDocument : Persistence .getDocuments (document , stages )) {
239+ fields .add (feedVersionDocument .getString (field ));
240+ }
241+ return fields ;
242+ }
243+ }
0 commit comments