-
Notifications
You must be signed in to change notification settings - Fork 2
/
00b_lakehouse_etl.py
95 lines (67 loc) · 2.37 KB
/
00b_lakehouse_etl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# Databricks notebook source
# MAGIC %md
# MAGIC ### Setup
# MAGIC
# MAGIC In this case we'll grab a CSV from the web, but we could also use Python or Spark to read data from databases or cloud storage.
# COMMAND ----------
# MAGIC %sh
# MAGIC wget https://raw.githubusercontent.com/IBM/telco-customer-churn-on-icp4d/master/data/Telco-Customer-Churn.csv
# COMMAND ----------
# MAGIC %md
# MAGIC ### Load into Delta Lake
# COMMAND ----------
# MAGIC %md
# MAGIC #### Path configs
# COMMAND ----------
# MAGIC %run ./00_reset
# COMMAND ----------
# MAGIC %fs ls /home/[email protected]/ibm-telco-churn/
# COMMAND ----------
# Copy the downloaded CSV from the driver's local disk to DBFS so Spark can read it.
# NOTE(review): this step is commented out, but the CSV read below loads from
# driver_to_dbfs_path — uncomment on a fresh workspace or the load will fail.
# dbutils.fs.cp('file:/databricks/driver/Telco-Customer-Churn.csv', driver_to_dbfs_path)
# COMMAND ----------
# MAGIC %md
# MAGIC #### Read and display
# COMMAND ----------
# Explicit schema for the Telco churn CSV. The four numeric columns are read
# as doubles; every other column (including the churn label) stays a string.
_double_cols = {'seniorCitizen', 'tenure', 'monthlyCharges', 'totalCharges'}
_columns = [
    'customerID', 'gender', 'seniorCitizen', 'partner', 'dependents',
    'tenure', 'phoneService', 'multipleLines', 'internetService',
    'onlineSecurity', 'onlineBackup', 'deviceProtection', 'techSupport',
    'streamingTV', 'streamingMovies', 'contract', 'paperlessBilling',
    'paymentMethod', 'monthlyCharges', 'totalCharges', 'churnString',
]
schema = StructType([
    StructField(name, DoubleType() if name in _double_cols else StringType())
    for name in _columns
])
# Ingest the raw CSV with the explicit schema, persist it to the bronze Delta
# location, and render a preview of what was loaded.
csv_reader = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .schema(schema)
)
bronze_df = csv_reader.load(driver_to_dbfs_path)

delta_writer = bronze_df.write.format('delta').mode('overwrite')
delta_writer.save(bronze_tbl_path)

display(bronze_df)
# COMMAND ----------
# List the files at the bronze Delta path to confirm the write above produced output.
dbutils.fs.ls(bronze_tbl_path)
# COMMAND ----------
# MAGIC %md
# MAGIC #### Create bronze
# COMMAND ----------
# Register the bronze Delta files in the metastore so they are queryable by name.
# IF NOT EXISTS makes this cell safe to re-run without first dropping the table
# (a plain CREATE TABLE raises if the table already exists).
_ = spark.sql('''
CREATE TABLE IF NOT EXISTS `{}`.{}
USING DELTA
LOCATION '{}'
'''.format(database_name, bronze_tbl_name, bronze_tbl_path))
# COMMAND ----------