-
Notifications
You must be signed in to change notification settings - Fork 11
/
dsm2dtm.py
360 lines (319 loc) · 13.2 KB
/
dsm2dtm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
"""
dsm2dtm - Generate DTM (Digital Terrain Model) from DSM (Digital Surface Model)
Author: Naman Jain
www.namanji.wixsite.com/naman/
"""
import os
import numpy as np
import rasterio
import argparse
try:
import gdal
except:
from osgeo import gdal
def downsample_raster(in_path, out_path, downsampling_factor):
gdal_raster = gdal.Open(in_path)
width, height = gdal_raster.RasterXSize, gdal_raster.RasterYSize
gdal.Translate(
out_path,
in_path,
width=int((width // downsampling_factor)),
height=int((height // downsampling_factor)),
outputType=gdal.GDT_Float32,
)
def upsample_raster(in_path, out_path, target_height, target_width):
gdal.Translate(
out_path,
in_path,
width=target_width,
height=target_height,
resampleAlg="bilinear",
outputType=gdal.GDT_Float32,
)
def generate_slope_raster(in_path, out_path):
"""
Generates a slope raster from the input DEM raster.
Input:
in_path: {string} path to the DEM raster
Output:
out_path: {string} path to the generated slope image
"""
cmd = "gdaldem slope -alg ZevenbergenThorne {} {}".format(in_path, out_path)
os.system(cmd)
def get_mean(raster_path, ignore_value=-9999.0):
np_raster = np.array(gdal.Open(raster_path).ReadAsArray())
return np_raster[np_raster != ignore_value].mean()
def extract_dtm(dsm_path, ground_dem_path, non_ground_dem_path, radius, terrain_slope):
"""
Generates a ground DEM and non-ground DEM raster from the input DSM raster.
Input:
dsm_path: {string} path to the DSM raster
radius: {int} Search radius of kernel in cells.
terrain_slope: {float} average slope of the input terrain
Output:
ground_dem_path: {string} path to the generated ground DEM raster
non_ground_dem_path: {string} path to the generated non-ground DEM raster
"""
cmd = "saga_cmd grid_filter 7 -INPUT {} -RADIUS {} -TERRAINSLOPE {} -GROUND {} -NONGROUND {}".format(
dsm_path, radius, terrain_slope, ground_dem_path, non_ground_dem_path
)
os.system(cmd)
def remove_noise(ground_dem_path, out_path, ignore_value=-99999.0):
"""
Removes noise (high elevation data points like roofs, etc.) from the ground DEM raster.
Replaces values in those pixels with No data Value (-99999.0)
Input:
ground_dem_path: {string} path to the generated ground DEM raster
no_data_value: {float} replacing value in the ground raster (to be treated as No Data Value)
Output:
out_path: {string} path to the filtered ground DEM raster
"""
ground_np = np.array(gdal.Open(ground_dem_path).ReadAsArray())
std = ground_np[ground_np != ignore_value].std()
mean = ground_np[ground_np != ignore_value].mean()
threshold_value = mean + 1.5 * std
ground_np[ground_np >= threshold_value] = -99999.0
save_array_as_geotif(ground_np, ground_dem_path, out_path)
def save_array_as_geotif(array, source_tif_path, out_path):
"""
Generates a geotiff raster from the input numpy array (height * width * depth)
Input:
array: {numpy array} numpy array to be saved as geotiff
source_tif_path: {string} path to the geotiff from which projection and geotransformation information will be extracted.
Output:
out_path: {string} path to the generated Geotiff raster
"""
if len(array.shape) > 2:
height, width, depth = array.shape
else:
height, width = array.shape
depth = 1
source_tif = gdal.Open(source_tif_path)
driver = gdal.GetDriverByName("GTiff")
dataset = driver.Create(out_path, width, height, depth, gdal.GDT_Float32)
if depth != 1:
for i in range(depth):
dataset.GetRasterBand(i + 1).WriteArray(array[:, :, i])
else:
dataset.GetRasterBand(1).WriteArray(array)
geotrans = source_tif.GetGeoTransform()
proj = source_tif.GetProjection()
dataset.SetGeoTransform(geotrans)
dataset.SetProjection(proj)
dataset.FlushCache()
dataset = None
def sdat_to_gtiff(sdat_raster_path, out_gtiff_path):
gdal.Translate(
out_gtiff_path,
sdat_raster_path,
format="GTiff",
)
def close_gaps(in_path, out_path, threshold=0.1):
"""
Interpolates the holes (no data value) in the input raster.
Input:
in_path: {string} path to the input raster with holes
threshold: {float} Tension Threshold
Output:
out_path: {string} path to the generated raster with closed holes.
"""
cmd = "saga_cmd grid_tools 7 -INPUT {} -THRESHOLD {} -RESULT {}".format(
in_path, threshold, out_path
)
os.system(cmd)
def smoothen_raster(in_path, out_path, radius=2):
"""
Applies gaussian filter to the input raster.
Input:
in_path: {string} path to the input raster
radius: {int} kernel radius to be used for smoothing
Output:
out_path: {string} path to the generated smoothened raster
"""
cmd = "saga_cmd grid_filter 1 -INPUT {} -RESULT {} -KERNEL_TYPE 0 -KERNEL_RADIUS {}".format(
in_path, out_path, radius
)
os.system(cmd)
def subtract_rasters(rasterA_path, rasterB_path, out_path, no_data_value=-99999.0):
cmd = 'gdal_calc.py -A {} -B {} --outfile {} --NoDataValue={} --calc="A-B"'.format(
rasterA_path, rasterB_path, out_path, no_data_value
)
os.system(cmd)
def replace_values(
rasterA_path, rasterB_path, out_path, no_data_value=-99999.0, threshold=0.98
):
"""
Replaces values in input rasterA with no_data_value where cell value >= threshold in rasterB
Input:
rasterA_path: {string} path to the input rasterA
rasterB_path: {string} path to the input rasterB
Output:
out_path: {string} path to the generated raster
"""
cmd = 'gdal_calc.py -A {} --NoDataValue={} -B {} --outfile {} --calc="{}*(B>={}) + (A)*(B<{})"'.format(
rasterA_path,
no_data_value,
rasterB_path,
out_path,
no_data_value,
threshold,
threshold,
)
os.system(cmd)
def expand_holes_in_raster(
in_path, search_window=7, no_data_value=-99999.0, threshold=50
):
"""
Expands holes (cells with no_data_value) in the input raster.
Input:
in_path: {string} path to the input raster
search_window: {int} kernel size to be used as window
threshold: {float} threshold on percentage of cells with no_data_value
Output:
np_raster: {numpy array} Returns the modified input raster's array
"""
np_raster = np.array(gdal.Open(in_path).ReadAsArray())
height, width = np_raster.shape[0], np_raster.shape[1]
for i in range(int((search_window - 1) / 2), width, 1):
for j in range(int((search_window - 1) / 2), height, 1):
window = np_raster[
int(i - (search_window - 1) / 2) : int(i - (search_window - 1) / 2)
+ search_window,
int(j - (search_window - 1) / 2) : int(j - (search_window - 1) / 2)
+ search_window,
]
if (
np.count_nonzero(window == no_data_value)
>= (threshold * search_window ** 2) / 100
):
try:
np_raster[i, j] = no_data_value
except:
pass
return np_raster
def get_raster_crs(raster_path):
"""
Returns the CRS (Coordinate Reference System) of the raster
Input:
raster_path: {string} path to the source tif image
"""
raster = rasterio.open(raster_path)
return raster.crs
def get_raster_resolution(raster_path):
raster = gdal.Open(raster_path)
raster_geotrans = raster.GetGeoTransform()
x_res = raster_geotrans[1]
y_res = -raster_geotrans[5]
return x_res, y_res
def get_res_and_downsample(dsm_path, temp_dir):
# check DSM resolution. Downsample if DSM is of very high resolution to save processing time.
x_res, y_res = get_raster_resolution(dsm_path) # resolutions are in meters
dsm_name = dsm_path.split("/")[-1].split(".")[0]
dsm_crs = get_raster_crs(dsm_path)
if dsm_crs != 4326:
if x_res < 0.3 or y_res < 0.3:
target_res = 0.3 # downsample to this resolution (in meters)
downsampling_factor = target_res / gdal.Open(dsm_path).GetGeoTransform()[1]
downsampled_dsm_path = os.path.join(temp_dir, dsm_name + "_ds.tif")
# Dowmsampling DSM
downsample_raster(dsm_path, downsampled_dsm_path, downsampling_factor)
dsm_path = downsampled_dsm_path
else:
if x_res < 2.514e-06 or y_res < 2.514e-06:
target_res = 2.514e-06 # downsample to this resolution (in degrees)
downsampling_factor = target_res / gdal.Open(dsm_path).GetGeoTransform()[1]
downsampled_dsm_path = os.path.join(temp_dir, dsm_name + "_ds.tif")
# Dowmsampling DSM
downsample_raster(dsm_path, downsampled_dsm_path, downsampling_factor)
dsm_path = downsampled_dsm_path
return dsm_path
def get_updated_params(dsm_path, search_radius, smoothen_radius):
# search_radius and smoothen_radius are set wrt to 30cm DSM
# returns updated parameters if DSM is of coarser resolution
x_res, y_res = get_raster_resolution(dsm_path) # resolutions are in meters
dsm_crs = get_raster_crs(dsm_path)
if dsm_crs != 4326:
if x_res > 0.3 or y_res > 0.3:
search_radius = int((min(x_res, y_res) * search_radius) / 0.3)
smoothen_radius = int((min(x_res, y_res) * smoothen_radius) / 0.3)
else:
if x_res > 2.514e-06 or y_res > 2.514e-06:
search_radius = int((min(x_res, y_res) * search_radius) / 2.514e-06)
smoothen_radius = int((min(x_res, y_res) * smoothen_radius) / 2.514e-06)
return search_radius, smoothen_radius
def main(
dsm_path,
out_dir,
search_radius=40,
smoothen_radius=45,
dsm_replace_threshold_val=0.98,
):
# master function that calls all other functions
os.makedirs(out_dir, exist_ok=True)
temp_dir = os.path.join(out_dir, "temp_files")
os.makedirs(temp_dir, exist_ok=True)
dsm_path = get_res_and_downsample(dsm_path, temp_dir)
# get updated params wrt to DSM resolution
search_radius, smoothen_radius = get_updated_params(
dsm_path, search_radius, smoothen_radius
)
# Generate DTM
# STEP 1: Generate slope raster from dsm to get average slope value
dsm_name = dsm_path.split("/")[-1].split(".")[0]
dsm_slp_path = os.path.join(temp_dir, dsm_name + "_slp.tif")
generate_slope_raster(dsm_path, dsm_slp_path)
avg_slp = int(get_mean(dsm_slp_path))
# STEP 2: Split DSM into ground and non-ground surface rasters
ground_dem_path = os.path.join(temp_dir, dsm_name + "_ground.sdat")
non_ground_dem_path = os.path.join(temp_dir, dsm_name + "_non_ground.sdat")
extract_dtm(
dsm_path,
ground_dem_path,
non_ground_dem_path,
search_radius,
avg_slp,
)
# STEP 3: Applying Gaussian Filter on the generated ground raster (parameters: radius = 45, mode = Circle)
smoothened_ground_path = os.path.join(temp_dir, dsm_name + "_ground_smth.sdat")
smoothen_raster(ground_dem_path, smoothened_ground_path, smoothen_radius)
# STEP 4: Generating a difference raster (ground DEM - smoothened ground DEM)
diff_raster_path = os.path.join(temp_dir, dsm_name + "_ground_diff.sdat")
subtract_rasters(ground_dem_path, smoothened_ground_path, diff_raster_path)
# STEP 5: Thresholding on the difference raster to replace values in Ground DEM by no-data values (threshold = 0.98)
thresholded_ground_path = os.path.join(
temp_dir, dsm_name + "_ground_thresholded.sdat"
)
replace_values(
ground_dem_path,
diff_raster_path,
thresholded_ground_path,
threshold=dsm_replace_threshold_val,
)
# STEP 6: Removing noisy spikes from the generated DTM
ground_dem_filtered_path = os.path.join(temp_dir, dsm_name + "_ground_filtered.tif")
remove_noise(thresholded_ground_path, ground_dem_filtered_path)
# STEP 7: Expanding holes in the thresholded ground raster
bigger_holes_ground_path = os.path.join(
temp_dir, dsm_name + "_ground_bigger_holes.sdat"
)
temp = expand_holes_in_raster(ground_dem_filtered_path)
save_array_as_geotif(temp, ground_dem_filtered_path, bigger_holes_ground_path)
# STEP 8: Close gaps in the DTM
dtm_path = os.path.join(temp_dir, dsm_name + "_dtm.sdat")
close_gaps(bigger_holes_ground_path, dtm_path)
# STEP 9: Convert to GeoTiff
dtm_array = gdal.Open(dtm_path).ReadAsArray()
dtm_tif_path = os.path.join(out_dir, dsm_name + "_dtm.tif")
# save_array_as_geotif(dtm_array, dsm_path, dtm_tif_path)
sdat_to_gtiff(dtm_path, dtm_tif_path)
return dtm_tif_path
# -----------------------------------------------------------------------------------------------------
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Generate DTM from DSM")
parser.add_argument("--dsm", help="dsm path string")
args = parser.parse_args()
dsm_path = args.dsm
out_dir = "generated_dtm"
dtm_path = main(dsm_path, out_dir)
print("######### DTM generated at: ", dtm_path)