
Commit bef7787

fix issue 611; sd2pq parms
1 parent d116327 commit bef7787

5 files changed: +97 additions, −69 deletions

5 files changed

+97
-69
lines changed

saspy/sasbase.py

Lines changed: 31 additions & 23 deletions

@@ -2000,15 +2000,13 @@ def sasdata2dataframe(self, table: str, libref: str = '', dsopts: dict = None,
         return df
 
     def sd2pq(self, parquet_file_path: str, table: str, libref: str ='', dsopts: dict = None,
-              pa_parquet_kwargs = {"compression": 'snappy',
-                                   "flavor":"spark",
-                                   "write_statistics":False},
-              pa_pandas_kwargs = {},
-              partitioned = False,
+              pa_parquet_kwargs = None,
+              pa_pandas_kwargs = None,
+              partitioned = False,
               partition_size_mb = 128,
-              chunk_size_mb = 4,
-              coerce_timestamp_errors=True,
-              static_columns:list = None,
+              chunk_size_mb = 4,
+              coerce_timestamp_errors = True,
+              static_columns:list = None,
               rowsep: str = '\x01', colsep: str = '\x02',
               rowrep: str = ' ', colrep: str = ' ',
               **kwargs) -> None:
@@ -2055,13 +2053,19 @@ def sd2pq(self, parquet_file_path: str, table: str, libref: str ='', dsopts: dic
 
        :return: None
        """
-       dsopts = dsopts if dsopts is not None else {}
+       dsopts         = dsopts if dsopts is not None else {}
+       parquet_kwargs = pa_parquet_kwargs if pa_parquet_kwargs is not None else {"compression": 'snappy',
+                                                                                 "flavor":"spark",
+                                                                                 "write_statistics":False
+                                                                                }
+       pandas_kwargs  = pa_pandas_kwargs if pa_pandas_kwargs is not None else {}
+
        return self.sasdata2parquet(parquet_file_path = parquet_file_path,
                                    table = table,
                                    libref = libref,
                                    dsopts = dsopts,
-                                   pa_parquet_kwargs = pa_parquet_kwargs,
-                                   pa_pandas_kwargs = pa_pandas_kwargs,
+                                   pa_parquet_kwargs = parquet_kwargs,
+                                   pa_pandas_kwargs = pandas_kwargs,
                                    partitioned = partitioned,
                                    partition_size_mb = partition_size_mb,
                                    chunk_size_mb = chunk_size_mb,
@@ -2077,17 +2081,15 @@ def sd2pq(self, parquet_file_path: str, table: str, libref: str ='', dsopts: dic
     def sasdata2parquet(self,
                         parquet_file_path: str,
                         table: str,
-                        libref: str ='',
-                        dsopts: dict = None,
-                        pa_parquet_kwargs = {"compression": 'snappy',
-                                             "flavor":"spark",
-                                             "write_statistics":False},
-                        pa_pandas_kwargs = {},
-                        partitioned = False,
+                        libref: str ='',
+                        dsopts: dict = None,
+                        pa_parquet_kwargs = None,
+                        pa_pandas_kwargs = None,
+                        partitioned = False,
                         partition_size_mb = 128,
-                        chunk_size_mb = 4,
-                        coerce_timestamp_errors=True,
-                        static_columns:list = None,
+                        chunk_size_mb = 4,
+                        coerce_timestamp_errors = True,
+                        static_columns:list = None,
                         rowsep: str = '\x01',
                         colsep: str = '\x02',
                         rowrep: str = ' ',
@@ -2139,6 +2141,12 @@ def sasdata2parquet(self,
        lastlog = len(self._io._log)
 
        dsopts = dsopts if dsopts is not None else {}
+       parquet_kwargs = pa_parquet_kwargs if pa_parquet_kwargs is not None else {"compression": 'snappy',
+                                                                                 "flavor":"spark",
+                                                                                 "write_statistics":False
+                                                                                }
+       pandas_kwargs  = pa_pandas_kwargs if pa_pandas_kwargs is not None else {}
+
        if self.exist(table, libref) == 0:
           logger.error('The SAS Data Set ' + libref + '.' + table + ' does not exist')
        if self.sascfg.bcv < 3007009:
@@ -2154,8 +2162,8 @@ def sasdata2parquet(self,
                                   table = table,
                                   libref = libref,
                                   dsopts = dsopts,
-                                  pa_parquet_kwargs = pa_parquet_kwargs,
-                                  pa_pandas_kwargs = pa_pandas_kwargs,
+                                  pa_parquet_kwargs = parquet_kwargs,
+                                  pa_pandas_kwargs = pandas_kwargs,
                                   partitioned = partitioned,
                                   partition_size_mb = partition_size_mb,
                                   chunk_size_mb = chunk_size_mb,
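
Across all five files the change is the same None-sentinel idiom for mutable default arguments: the dict literals that used to sit in the def line are evaluated once, at definition time, and shared by every call that relies on the default, so any mutation leaks between calls. A minimal runnable sketch of the idiom with hypothetical names (the pattern mirrors the diff above; nothing here is saspy's API):

    # Hypothetical sketch of the None-sentinel idiom this commit applies.

    def export_bad(opts={"compression": "snappy"}):
        # The default dict is created once and shared across calls.
        return opts

    def export_good(opts=None):
        # A fresh dict is built on every call, so callers never share state.
        opts = opts if opts is not None else {"compression": "snappy"}
        return opts

    a = export_bad()
    a["schema"] = "leaked"                 # mutates the shared default...
    assert "schema" in export_bad()        # ...and every later call sees it

    b = export_good()
    b["schema"] = "contained"
    assert "schema" not in export_good()   # later calls get a clean dict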

saspy/sasdata.py

Lines changed: 15 additions & 10 deletions

@@ -1159,15 +1159,13 @@ def score(self, file: str = '', code: str = '', out: 'SASdata' = None) -> 'SASda
         return ll
 
     def to_pq(self, parquet_file_path: str,
-              pa_parquet_kwargs = {"compression": 'snappy',
-                                   "flavor":"spark",
-                                   "write_statistics":False},
-              pa_pandas_kwargs = {},
-              partitioned = False,
+              pa_parquet_kwargs = None,
+              pa_pandas_kwargs = None,
+              partitioned = False,
               partition_size_mb = 128,
-              chunk_size_mb = 4,
-              coerce_timestamp_errors=True,
-              static_columns:list = None,
+              chunk_size_mb = 4,
+              coerce_timestamp_errors = True,
+              static_columns:list = None,
               rowsep: str = '\x01', colsep: str = '\x02',
               rowrep: str = ' ', colrep: str = ' ',
               **kwargs) -> None:
@@ -1207,6 +1205,13 @@ def to_pq(self, parquet_file_path: str,
        :return: None
        """
        lastlog = len(self.sas._io._log)
+
+       parquet_kwargs = pa_parquet_kwargs if pa_parquet_kwargs is not None else {"compression": 'snappy',
+                                                                                 "flavor":"spark",
+                                                                                 "write_statistics":False
+                                                                                }
+       pandas_kwargs  = pa_pandas_kwargs if pa_pandas_kwargs is not None else {}
+
        ll = self._is_valid()
        self.sas._lastlog = self.sas._io._log[lastlog:]
        if ll:
@@ -1217,8 +1222,8 @@ def to_pq(self, parquet_file_path: str,
                                   table = self.table,
                                   libref = self.libref,
                                   dsopts = self.dsopts,
-                                  pa_parquet_kwargs = pa_parquet_kwargs,
-                                  pa_pandas_kwargs = pa_pandas_kwargs,
+                                  pa_parquet_kwargs = parquet_kwargs,
+                                  pa_pandas_kwargs = pandas_kwargs,
                                   partitioned = partitioned,
                                   partition_size_mb = partition_size_mb,
                                   chunk_size_mb = chunk_size_mb,
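
For callers, to_pq behaves as before: omitting the kwargs resolves to the same documented defaults, now built fresh on each call, and explicit dicts pass through unchanged. A hedged usage sketch (session configuration, table, and paths are placeholders, not from this commit):

    import saspy

    sas  = saspy.SASsession()                    # site-specific configuration
    cars = sas.sasdata('cars', libref='sashelp')

    # Omitted kwargs now resolve, per call, to
    # {"compression": 'snappy', "flavor": "spark", "write_statistics": False} and {}.
    cars.to_pq('/tmp/cars.parquet')

    # Explicit dicts are passed through as before; 'compression' must be
    # present, since the IO layer reads that key directly.
    cars.to_pq('/tmp/cars_gzip.parquet',
               pa_parquet_kwargs={"compression": "gzip"},
               pa_pandas_kwargs={})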

saspy/sasiohttp.py

Lines changed: 17 additions & 12 deletions

@@ -2283,10 +2283,8 @@ def sasdata2parquet(self,
                           table: str,
                           libref: str ='',
                           dsopts: dict = None,
-                          pa_parquet_kwargs = {"compression": 'snappy',
-                                               "flavor":"spark",
-                                               "write_statistics":False},
-                          pa_pandas_kwargs = {},
+                          pa_parquet_kwargs = None,
+                          pa_pandas_kwargs = None,
                           partitioned = False,
                           partition_size_mb = 128,
                           chunk_size_mb = 4,
@@ -2324,8 +2322,15 @@ def sasdata2parquet(self,
       if not pa:
          logger.error("pyarrow was not imported. This method can't be used without it.")
          return None
+
+      parquet_kwargs = pa_parquet_kwargs if pa_parquet_kwargs is not None else {"compression": 'snappy',
+                                                                                "flavor":"spark",
+                                                                                "write_statistics":False
+                                                                               }
+      pandas_kwargs  = pa_pandas_kwargs if pa_pandas_kwargs is not None else {}
+
       try:
-         compression = pa_parquet_kwargs["compression"]
+         compression = parquet_kwargs["compression"]
       except KeyError:
          raise KeyError("The pa_parquet_kwargs dict needs to contain at least the parameter 'compression'. Default value is 'snappy'")
 
@@ -2604,12 +2609,12 @@ def dts_to_pyarrow_schema(dtype_dict):
             schema = pa.schema(fields)
             return schema
          # derive parque schema if not defined by user.
-         if "schema" not in pa_parquet_kwargs or pa_parquet_kwargs["schema"] is None:
+         if "schema" not in parquet_kwargs or parquet_kwargs["schema"] is None:
            custom_schema = False
-           pa_parquet_kwargs["schema"] = dts_to_pyarrow_schema(dts)
+           parquet_kwargs["schema"] = dts_to_pyarrow_schema(dts)
          else:
            custom_schema = True
-           pa_pandas_kwargs["schema"] = pa_parquet_kwargs["schema"]
+           pandas_kwargs["schema"] = parquet_kwargs["schema"]
 
         ##### START STERAM #####
         parquet_writer = None
@@ -2685,7 +2690,7 @@ def dts_to_pyarrow_schema(dtype_dict):
                      raise ValueError(f"""The column {dvarlist[i]} contains an unparseable timestamp.
Consider setting a different pd_timestamp_format or set coerce_timestamp_errors = True and they will be cast as Null""")
 
-               pa_table = pa.Table.from_pandas(df,**pa_pandas_kwargs)
+               pa_table = pa.Table.from_pandas(df,**pandas_kwargs)
 
               if not custom_schema:
                  #cast the int64 columns to timestamp
@@ -2727,9 +2732,9 @@ def dts_to_pyarrow_schema(dtype_dict):
                  raise e
 
               if not parquet_writer:
-                 if "schema" not in pa_parquet_kwargs or pa_parquet_kwargs["schema"] is None:
-                    pa_parquet_kwargs["schema"] = pa_table.schema
-                 parquet_writer = pq.ParquetWriter(path,**pa_parquet_kwargs)#use_deprecated_int96_timestamps=True,
+                 if "schema" not in parquet_kwargs or parquet_kwargs["schema"] is None:
+                    parquet_kwargs["schema"] = pa_table.schema
+                 parquet_writer = pq.ParquetWriter(path,**parquet_kwargs)#use_deprecated_int96_timestamps=True,
 
               # Write the table chunk to the Parquet file
               parquet_writer.write_table(pa_table)
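
The hunks above also show why the shared defaults were a live bug here rather than a style nit: sasdata2parquet writes the derived schema back into the kwargs dict (pa_parquet_kwargs["schema"] = dts_to_pyarrow_schema(dts) in the old code), so with a default dict shared across calls, the schema inferred for one table silently carried over to the next. A hypothetical repro of that pre-fix failure mode, with made-up names:

    def to_parquet_with_bug(table, kwargs={"compression": "snappy"}):
        # Pre-fix shape: the derived schema is cached in the shared default.
        if "schema" not in kwargs or kwargs["schema"] is None:
            kwargs["schema"] = f"schema_for_{table}"
        return kwargs["schema"]

    assert to_parquet_with_bug("cars")   == "schema_for_cars"
    # The second table never re-derives: it reuses the first table's schema.
    assert to_parquet_with_bug("stocks") == "schema_for_cars"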

saspy/sasioiom.py

Lines changed: 17 additions & 12 deletions

@@ -2304,10 +2304,8 @@ def sasdata2parquet(self,
                           table: str,
                           libref: str ='',
                           dsopts: dict = None,
-                          pa_parquet_kwargs = {"compression": 'snappy',
-                                               "flavor":"spark",
-                                               "write_statistics":False},
-                          pa_pandas_kwargs = {},
+                          pa_parquet_kwargs = None,
+                          pa_pandas_kwargs = None,
                           partitioned = False,
                           partition_size_mb = 128,
                           chunk_size_mb = 4,
@@ -2345,8 +2343,15 @@ def sasdata2parquet(self,
       if not pa:
          logger.error("pyarrow was not imported. This method can't be used without it.")
          return None
+
+      parquet_kwargs = pa_parquet_kwargs if pa_parquet_kwargs is not None else {"compression": 'snappy',
+                                                                                "flavor":"spark",
+                                                                                "write_statistics":False
+                                                                               }
+      pandas_kwargs  = pa_pandas_kwargs if pa_pandas_kwargs is not None else {}
+
       try:
-         compression = pa_parquet_kwargs["compression"]
+         compression = parquet_kwargs["compression"]
       except KeyError:
          raise KeyError("The pa_parquet_kwargs dict needs to contain at least the parameter 'compression'. Default value is 'snappy'")
 
@@ -2603,12 +2608,12 @@ def dts_to_pyarrow_schema(dtype_dict):
             schema = pa.schema(fields)
             return schema
          # derive parque schema if not defined by user.
-         if "schema" not in pa_parquet_kwargs or pa_parquet_kwargs["schema"] is None:
+         if "schema" not in parquet_kwargs or parquet_kwargs["schema"] is None:
            custom_schema = False
-           pa_parquet_kwargs["schema"] = dts_to_pyarrow_schema(dts)
+           parquet_kwargs["schema"] = dts_to_pyarrow_schema(dts)
          else:
            custom_schema = True
-           pa_pandas_kwargs["schema"] = pa_parquet_kwargs["schema"]
+           pandas_kwargs["schema"] = parquet_kwargs["schema"]
 
         ##### START STERAM #####
         parquet_writer = None
@@ -2685,7 +2690,7 @@ def dts_to_pyarrow_schema(dtype_dict):
                      raise ValueError(f"""The column {dvarlist[i]} contains an unparseable timestamp.
Consider setting a different pd_timestamp_format or set coerce_timestamp_errors = True and they will be cast as Null""")
 
-               pa_table = pa.Table.from_pandas(df,**pa_pandas_kwargs)
+               pa_table = pa.Table.from_pandas(df,**pandas_kwargs)
 
               if not custom_schema:
                  #cast the int64 columns to timestamp
@@ -2726,9 +2731,9 @@ def dts_to_pyarrow_schema(dtype_dict):
                  )
                  raise e
               if not parquet_writer:
-                 if "schema" not in pa_parquet_kwargs or pa_parquet_kwargs["schema"] is None:
-                    pa_parquet_kwargs["schema"] = pa_table.schema
-                 parquet_writer = pq.ParquetWriter(path,**pa_parquet_kwargs)#use_deprecated_int96_timestamps=True,
+                 if "schema" not in parquet_kwargs or parquet_kwargs["schema"] is None:
+                    parquet_kwargs["schema"] = pa_table.schema
+                 parquet_writer = pq.ParquetWriter(path,**parquet_kwargs)#use_deprecated_int96_timestamps=True,
 
               # Write the table chunk to the Parquet file
               parquet_writer.write_table(pa_table)

saspy/sasiostdio.py

Lines changed: 17 additions & 12 deletions

@@ -2705,10 +2705,8 @@ def sasdata2parquet(self,
                           table: str,
                           libref: str ='',
                           dsopts: dict = None,
-                          pa_parquet_kwargs = {"compression": 'snappy',
-                                               "flavor":"spark",
-                                               "write_statistics":False},
-                          pa_pandas_kwargs = {},
+                          pa_parquet_kwargs = None,
+                          pa_pandas_kwargs = None,
                           partitioned = False,
                           partition_size_mb = 128,
                           chunk_size_mb = 4,
@@ -2747,8 +2745,15 @@ def sasdata2parquet(self,
       if not pa:
          logger.error("pyarrow was not imported. This method can't be used without it.")
          return None
+
+      parquet_kwargs = pa_parquet_kwargs if pa_parquet_kwargs is not None else {"compression": 'snappy',
+                                                                                "flavor":"spark",
+                                                                                "write_statistics":False
+                                                                               }
+      pandas_kwargs  = pa_pandas_kwargs if pa_pandas_kwargs is not None else {}
+
       try:
-         compression = pa_parquet_kwargs["compression"]
+         compression = parquet_kwargs["compression"]
       except KeyError:
          raise KeyError("The pa_parquet_kwargs dict needs to contain at least the parameter 'compression'. Default value is 'snappy'")
 
@@ -3023,12 +3028,12 @@ def dts_to_pyarrow_schema(dtype_dict):
             schema = pa.schema(fields)
             return schema
          # derive parque schema if not defined by user.
-         if "schema" not in pa_parquet_kwargs or pa_parquet_kwargs["schema"] is None:
+         if "schema" not in parquet_kwargs or parquet_kwargs["schema"] is None:
            custom_schema = False
-           pa_parquet_kwargs["schema"] = dts_to_pyarrow_schema(dts)
+           parquet_kwargs["schema"] = dts_to_pyarrow_schema(dts)
          else:
            custom_schema = True
-           pa_pandas_kwargs["schema"] = pa_parquet_kwargs["schema"]
+           pandas_kwargs["schema"] = parquet_kwargs["schema"]
 
         ##### START STERAM #####
         parquet_writer = None
@@ -3107,7 +3112,7 @@ def dts_to_pyarrow_schema(dtype_dict):
                      raise ValueError(f"""The column {dvarlist[i]} contains an unparseable timestamp.
Consider setting a different pd_timestamp_format or set coerce_timestamp_errors = True and they will be cast as Null""")
 
-               pa_table = pa.Table.from_pandas(df,**pa_pandas_kwargs)
+               pa_table = pa.Table.from_pandas(df,**pandas_kwargs)
 
               if not custom_schema:
                  #cast the int64 columns to timestamp
@@ -3149,9 +3154,9 @@ def dts_to_pyarrow_schema(dtype_dict):
                  raise e
 
               if not parquet_writer:
-                 if "schema" not in pa_parquet_kwargs or pa_parquet_kwargs["schema"] is None:
-                    pa_parquet_kwargs["schema"] = pa_table.schema
-                 parquet_writer = pq.ParquetWriter(path,**pa_parquet_kwargs)#use_deprecated_int96_timestamps=True,
+                 if "schema" not in parquet_kwargs or parquet_kwargs["schema"] is None:
+                    parquet_kwargs["schema"] = pa_table.schema
+                 parquet_writer = pq.ParquetWriter(path,**parquet_kwargs)#use_deprecated_int96_timestamps=True,
 
               # Write the table chunk to the Parquet file
               parquet_writer.write_table(pa_table)
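
For reference, the writer-side pattern all three IO modules share, and the reason a stale cached schema was so damaging: the schema is fixed once, then every streamed chunk is written through a single pq.ParquetWriter. A self-contained sketch of that pattern against the public pyarrow API (the data and output path are invented):

    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq

    kwargs = {"compression": 'snappy', "flavor": "spark", "write_statistics": False}

    parquet_writer = None
    for start in (0, 25, 50, 75):                   # stand-in for streamed chunks
        df = pd.DataFrame({"x": range(start, start + 25)})
        pa_table = pa.Table.from_pandas(df)
        if not parquet_writer:
            if "schema" not in kwargs or kwargs["schema"] is None:
                kwargs["schema"] = pa_table.schema  # lock schema to the first chunk
            parquet_writer = pq.ParquetWriter("/tmp/example.parquet", **kwargs)
        parquet_writer.write_table(pa_table)        # append each chunk
    parquet_writer.close()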
