1
+ /*
2
+ * Licensed to the Apache Software Foundation (ASF) under one
3
+ * or more contributor license agreements. See the NOTICE file
4
+ * distributed with this work for additional information
5
+ * regarding copyright ownership. The ASF licenses this file
6
+ * to you under the Apache License, Version 2.0 (the
7
+ * "License"); you may not use this file except in compliance
8
+ * with the License. You may obtain a copy of the License at
9
+ *
10
+ * http://www.apache.org/licenses/LICENSE-2.0
11
+ *
12
+ * Unless required by applicable law or agreed to in writing, software
13
+ * distributed under the License is distributed on an "AS IS" BASIS,
14
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
+ * See the License for the specific language governing permissions and
16
+ * limitations under the License.
17
+ */
18
+ package org .apache .drill .exec .server .profile ;
19
+
20
+ import static org .apache .drill .exec .ExecConstants .DRILL_SYS_FILE_SUFFIX ;
21
+
22
+ import java .io .IOException ;
23
+ import java .io .InputStream ;
24
+ import java .text .SimpleDateFormat ;
25
+ import java .util .Date ;
26
+ import java .util .HashMap ;
27
+ import java .util .List ;
28
+ import java .util .Map ;
29
+ import java .util .concurrent .TimeUnit ;
30
+
31
+ import org .apache .commons .io .IOUtils ;
32
+ import org .apache .commons .lang3 .exception .ExceptionUtils ;
33
+ import org .apache .drill .common .config .DrillConfig ;
34
+ import org .apache .drill .exec .ExecConstants ;
35
+ import org .apache .drill .exec .coord .ClusterCoordinator ;
36
+ import org .apache .drill .exec .coord .DistributedSemaphore ;
37
+ import org .apache .drill .exec .coord .DistributedSemaphore .DistributedLease ;
38
+ import org .apache .drill .exec .coord .zk .ZKClusterCoordinator ;
39
+ import org .apache .drill .exec .coord .zk .ZkDistributedSemaphore ;
40
+ import org .apache .drill .exec .exception .StoreException ;
41
+ import org .apache .drill .exec .proto .UserBitShared .QueryProfile ;
42
+ import org .apache .drill .exec .server .DrillbitContext ;
43
+ import org .apache .drill .exec .server .QueryProfileStoreContext ;
44
+ import org .apache .drill .exec .store .dfs .DrillFileSystem ;
45
+ import org .apache .drill .exec .store .sys .PersistentStoreConfig ;
46
+ import org .apache .drill .exec .store .sys .store .DrillSysFilePathFilter ;
47
+ import org .apache .drill .exec .store .sys .store .LocalPersistentStore ;
48
+ import org .apache .drill .exec .store .sys .store .ProfileSet ;
49
+ import org .apache .drill .exec .store .sys .store .provider .ZookeeperPersistentStoreProvider ;
50
+ import org .apache .drill .exec .util .DrillFileSystemUtil ;
51
+ import org .apache .drill .shaded .guava .com .google .common .base .Stopwatch ;
52
+ import org .apache .hadoop .fs .FileStatus ;
53
+ import org .apache .hadoop .fs .Path ;
54
+ import org .apache .hadoop .fs .PathFilter ;
55
+ import org .slf4j .Logger ;
56
+ import org .slf4j .LoggerFactory ;
57
+
58
+ /**
59
+ * Manage profiles by archiving
60
+ */
61
+ public class ProfileIndexer {
62
+ private static final Logger logger = LoggerFactory .getLogger (ProfileIndexer .class );
63
+ private static final String lockPathString = "/profileIndexer" ;
64
+ private static final int DRILL_SYS_FILE_EXT_SIZE = DRILL_SYS_FILE_SUFFIX .length ();
65
+
66
+ private final ZKClusterCoordinator zkCoord ;
67
+ private final DrillFileSystem fs ;
68
+ private final Path basePath ;
69
+ private final ProfileSet profiles ;
70
+ private final int indexingRate ;
71
+ private final PathFilter sysFileSuffixFilter ;
72
+ private SimpleDateFormat indexedPathFormat ;
73
+ private final boolean useZkCoordinatedManagement ;
74
+ private DrillConfig drillConfig ;
75
+
76
+ private PersistentStoreConfig <QueryProfile > pStoreConfig ;
77
+ private LocalPersistentStore <QueryProfile > completedProfileStore ;
78
+ private Stopwatch indexWatch ;
79
+ private int indexedCount ;
80
+ private int currentProfileCount ;
81
+
82
+
83
+ /**
84
+ * ProfileIndexer
85
+ */
86
+ public ProfileIndexer (ClusterCoordinator coord , DrillbitContext context ) throws StoreException , IOException {
87
+ drillConfig = context .getConfig ();
88
+
89
+ // FileSystem
90
+ try {
91
+ this .fs = inferFileSystem (drillConfig );
92
+ } catch (IOException ex ) {
93
+ throw new StoreException ("Unable to get filesystem" , ex );
94
+ }
95
+
96
+ //Use Zookeeper for coordinated management
97
+ final List <String > supportedFS = drillConfig .getStringList (ExecConstants .PROFILES_STORE_INDEX_SUPPORTED_FS );
98
+ if (this .useZkCoordinatedManagement = supportedFS .contains (fs .getScheme ())) {
99
+ this .zkCoord = (ZKClusterCoordinator ) coord ;
100
+ } else {
101
+ this .zkCoord = null ;
102
+ }
103
+
104
+ // Query Profile Store
105
+ QueryProfileStoreContext pStoreContext = context .getProfileStoreContext ();
106
+ this .completedProfileStore = (LocalPersistentStore <QueryProfile >) pStoreContext .getCompletedProfileStore ();
107
+ this .pStoreConfig = pStoreContext .getProfileStoreConfig ();
108
+ this .basePath = completedProfileStore .getBasePath ();
109
+
110
+ this .indexingRate = drillConfig .getInt (ExecConstants .PROFILES_STORE_INDEX_MAX );
111
+ this .profiles = new ProfileSet (indexingRate );
112
+ this .indexWatch = Stopwatch .createUnstarted ();
113
+ this .sysFileSuffixFilter = new DrillSysFilePathFilter ();
114
+ String indexPathPattern = drillConfig .getString (ExecConstants .PROFILES_STORE_INDEX_FORMAT );
115
+ this .indexedPathFormat = new SimpleDateFormat (indexPathPattern );
116
+ logger .info ("Organizing any existing unindexed profiles" );
117
+ }
118
+
119
+
120
+ /**
121
+ * Index profiles
122
+ */
123
+ public void indexProfiles () {
124
+ this .indexWatch .start ();
125
+
126
+ // Acquire lock IFF required
127
+ if (useZkCoordinatedManagement ) {
128
+ DistributedSemaphore indexerMutex = new ZkDistributedSemaphore (zkCoord .getCurator (), lockPathString , 1 );
129
+ try (DistributedLease lease = indexerMutex .acquire (0 , TimeUnit .SECONDS )) {
130
+ if (lease != null ) {
131
+ listAndIndex ();
132
+ } else {
133
+ logger .debug ("Couldn't get a lease acquisition" );
134
+ }
135
+ } catch (Exception e ) {
136
+ //DoNothing since lease acquisition failed
137
+ logger .error ("Exception during lease-acquisition:: {}" , e );
138
+ }
139
+ } else {
140
+ try {
141
+ listAndIndex ();
142
+ } catch (IOException e ) {
143
+ logger .error ("Failed to index: {}" , e );
144
+ }
145
+ }
146
+ logger .info ("Successfully indexed {} of {} profiles during startup in {} seconds" , indexedCount , currentProfileCount , this .indexWatch .stop ().elapsed (TimeUnit .SECONDS ));
147
+ }
148
+
149
+
150
+ //Lists and Indexes the latest profiles
151
+ private void listAndIndex () throws IOException {
152
+ currentProfileCount = listForArchiving ();
153
+ indexedCount = 0 ;
154
+ logger .info ("Found {} profiles that need to be indexed. Will attempt to index {} profiles" , currentProfileCount ,
155
+ (currentProfileCount > this .indexingRate ) ? this .indexingRate : currentProfileCount );
156
+
157
+ // Track MRU index paths
158
+ Map <String , Path > mruIndexPath = new HashMap <>();
159
+ if (currentProfileCount > 0 ) {
160
+ while (!this .profiles .isEmpty ()) {
161
+ String profileToIndex = profiles .removeYoungest () + DRILL_SYS_FILE_SUFFIX ;
162
+ Path srcPath = new Path (basePath , profileToIndex );
163
+ long profileStartTime = getProfileStart (srcPath );
164
+ if (profileStartTime < 0 ) {
165
+ logger .debug ("Will skip indexing {}" , srcPath );
166
+ continue ;
167
+ }
168
+ String indexPath = indexedPathFormat .format (new Date (profileStartTime ));
169
+ //Check if dest dir exists
170
+ Path indexDestPath = null ;
171
+ if (!mruIndexPath .containsKey (indexPath )) {
172
+ indexDestPath = new Path (basePath , indexPath );
173
+ if (!fs .isDirectory (indexDestPath )) {
174
+ // Build dir
175
+ if (fs .mkdirs (indexDestPath )) {
176
+ mruIndexPath .put (indexPath , indexDestPath );
177
+ } else {
178
+ //Creation failed. Did someone else create?
179
+ if (fs .isDirectory (indexDestPath )) {
180
+ mruIndexPath .put (indexPath , indexDestPath );
181
+ }
182
+ }
183
+ } else {
184
+ mruIndexPath .put (indexPath , indexDestPath );
185
+ }
186
+ } else {
187
+ indexDestPath = mruIndexPath .get (indexPath );
188
+ }
189
+
190
+ //Attempt Move
191
+ boolean renameStatus = false ;
192
+ if (indexDestPath != null ) {
193
+ Path destPath = new Path (indexDestPath , profileToIndex );
194
+ renameStatus = DrillFileSystemUtil .rename (fs , srcPath , destPath );
195
+ if (renameStatus ) {
196
+ indexedCount ++;
197
+ }
198
+ }
199
+ if (indexDestPath == null || !renameStatus ) {
200
+ // Stop attempting any more archiving since other StoreProviders might be archiving
201
+ logger .error ("Move failed for {} [{} | {}]" , srcPath , indexDestPath == null , renameStatus );
202
+ continue ;
203
+ }
204
+ }
205
+ }
206
+ }
207
+
208
+ // Deserialized and extract the profile's start time
209
+ private long getProfileStart (Path srcPath ) {
210
+ try (InputStream is = fs .open (srcPath )) {
211
+ QueryProfile profile = pStoreConfig .getSerializer ().deserialize (IOUtils .toByteArray (is ));
212
+ return profile .getStart ();
213
+ } catch (IOException e ) {
214
+ logger .info ("Unable to deserialize {}\n ---{}====" , srcPath , e .getMessage ()); //Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens at [Source: [B@f76ca5b; line: 1, column: 65538]
215
+ logger .info ("deserialization RCA==> \n {}" , ExceptionUtils .getRootCause (e ));
216
+ }
217
+ return Long .MIN_VALUE ;
218
+ }
219
+
220
+ // List all profiles in store's root and identify potential candidates for archiving
221
+ private int listForArchiving () throws IOException {
222
+ // Not performing recursive search of profiles
223
+ List <FileStatus > fileStatuses = DrillFileSystemUtil .listFiles (fs , basePath , false , sysFileSuffixFilter );
224
+
225
+ int numProfilesInStore = 0 ;
226
+ for (FileStatus stat : fileStatuses ) {
227
+ String profileName = stat .getPath ().getName ();
228
+ //Strip extension and store only query ID
229
+ profiles .add (profileName .substring (0 , profileName .length () - DRILL_SYS_FILE_EXT_SIZE ), false );
230
+ numProfilesInStore ++;
231
+ }
232
+
233
+ return numProfilesInStore ;
234
+ }
235
+
236
+ // Infers File System of Local Store
237
+ private DrillFileSystem inferFileSystem (DrillConfig drillConfig ) throws IOException {
238
+ boolean hasZkBlobRoot = drillConfig .hasPath (ZookeeperPersistentStoreProvider .DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT );
239
+ final Path blobRoot = hasZkBlobRoot ?
240
+ new org .apache .hadoop .fs .Path (drillConfig .getString (ZookeeperPersistentStoreProvider .DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT )) :
241
+ LocalPersistentStore .getLogDir ();
242
+
243
+ return LocalPersistentStore .getFileSystem (drillConfig , blobRoot );
244
+ }
245
+
246
+ }
0 commit comments