Skip to content

Commit

Permalink
DAEN-5276 - Deprecate CDS reading (#624)
Browse files Browse the repository at this point in the history
  • Loading branch information
brenopapa authored Aug 22, 2024
1 parent 14cc9b7 commit cf74920
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 24 deletions.
15 changes: 0 additions & 15 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -134,21 +134,6 @@ and then
Ingesting data
--------------

From both Staging Tables and Data Models (CDS Layer)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Use this method when you need to read most of the records and columns from the source.

.. code:: python

    from pycarol import Carol, Staging
    staging = Staging(Carol())
    df = staging.fetch_parquet(
        staging_name="execution_history",
        connector_name="model"
    )

From both Staging Tables and Data Models (BQ Layer)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand Down
6 changes: 3 additions & 3 deletions pycarol/cds.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"""
from .connectors import Connectors
from .data_models import DataModel
from .utils.deprecation_msgs import _deprecation_msgs
from .utils.deprecation_msgs import _deprecation_msgs, deprecated
import warnings

_MACHINE_FLAVORS = [
Expand Down Expand Up @@ -41,7 +41,7 @@ def check_worker_type(worker_type):
Warning, stacklevel=3
)


@deprecated('2.55.1', '2.57.0', 'CDS Data reading is deprecated - Use Big Query layer to read data from Carol.')
class CDSStaging:
"""
Class to handle all CDS Staging iterations.
Expand Down Expand Up @@ -331,7 +331,7 @@ def count(self, staging_name, connector_id=None, connector_name=None):
"stagingType": staging_name}
return self.carol.call_api(path='v1/cds/staging/fetchCount', method='POST', params=query_params).get('count')


@deprecated('2.55.1', '2.57.0', 'CDS Data reading is deprecated - Use Big Query layer to read data from Carol.')
class CDSGolden:
"""
Class to handle all CDS Golden iterations.
Expand Down
3 changes: 2 additions & 1 deletion pycarol/data_models/data_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from ..utils.miscellaneous import stream_data
from .. import _CAROL_METADATA_GOLDEN, _NEEDED_FOR_MERGE, _REJECTED_DM_COLS, _CAROL_METADATA_UNTIE_GOLDEN
from ..utils.miscellaneous import drop_duplicated_parquet, drop_duplicated_parquet_dask
from ..utils.deprecation_msgs import _deprecation_msgs
from ..utils.deprecation_msgs import _deprecation_msgs, deprecated
from ..exceptions import CarolApiResponseException

_DATA_MODEL_TYPES_MAPPING = {
Expand Down Expand Up @@ -102,6 +102,7 @@ def _get(self, id, by='id'):
{resp['mdmName']: self._get_name_type_data_models(resp['mdmFields'])})
return resp

@deprecated('2.55.1', '2.57.0', 'CDS Data reading is deprecated - Use Big Query layer to read data from Carol.')
def fetch_parquet(
self, dm_name, merge_records=True, backend='pandas',
return_dask_graph=False,
Expand Down
5 changes: 3 additions & 2 deletions pycarol/functions/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
Carol, ApiKeyAuth, PwdAuth, Tasks, Staging, Connectors, CDSStaging, Subscription, DataModel, Apps, CDSGolden
)
from pycarol.query import delete_golden
from pycarol.utils.deprecation_msgs import deprecated

def track_tasks(carol, task_list, retry_count=3, logger=None, callback=None, polling_delay=5):
"""Track a list of tasks from carol, waiting for errors/completeness.
Expand Down Expand Up @@ -76,7 +77,7 @@ def callback(task_list):
if callable(callback):
callback(task_status)


@deprecated('2.55.1', '2.57.0', 'CDS Data reading is deprecated - Use Big Query layer to read data from Carol.')
def delele_all_golden_data(carol, dm_name):
"""Delete golden files from a datamodel in all storages.
Expand Down Expand Up @@ -117,7 +118,7 @@ def par_delete_golden(carol, dm_list, n_jobs=5):
for i in dm_list)
return list(chain(*tasks))


@deprecated('2.55.1', '2.57.0', 'CDS Data reading is deprecated - Use Big Query layer to read data from Carol.')
def delete_staging_data(carol, staging_name, connector_name):
"""Delete a staging.
Expand Down
3 changes: 2 additions & 1 deletion pycarol/staging.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from .utils.miscellaneous import stream_data
from . import _CAROL_METADATA_STAGING, _NEEDED_FOR_MERGE, _CAROL_METADATA_UNTIE_STAGING
from .utils.miscellaneous import drop_duplicated_parquet, drop_duplicated_parquet_dask
from .utils.deprecation_msgs import _deprecation_msgs
from .utils.deprecation_msgs import _deprecation_msgs, deprecated

_SCHEMA_TYPES_MAPPING = {
"geopoint": str,
Expand Down Expand Up @@ -399,6 +399,7 @@ def _connector_by_name(self, connector_name):
"""
return Connectors(self.carol).get_by_name(connector_name)['mdmId']

@deprecated('2.55.1', '2.57.0', 'CDS Data reading is deprecated - Use Big Query layer to read data from Carol.')
def fetch_parquet(self, staging_name, connector_id=None, connector_name=None, backend='pandas',
merge_records=True, return_dask_graph=False, columns=None, max_hits=None,
return_metadata=False, callback=None, cds=True, max_workers=None, file_pattern=None,
Expand Down
6 changes: 4 additions & 2 deletions pycarol/utils/miscellaneous.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
import gzip, io, zipfile, os
from collections import defaultdict
from pathlib import Path
_FILE_MARKER = '<files>'

from pycarol.utils.deprecation_msgs import deprecated
_FILE_MARKER = '<files>'

@deprecated('2.55.1', '2.57.0', 'CDS Data reading is deprecated - Use Big Query layer to read data from Carol.')
def drop_duplicated_parquet_dask(d, untie_field='mdmCounterForEntity'):
"""
Merge updates and delete records from the parquet files in CDS.
Expand Down Expand Up @@ -33,7 +35,7 @@ def drop_duplicated_parquet_dask(d, untie_field='mdmCounterForEntity'):
d = d.reset_index(drop=True)
return d


@deprecated('2.55.1', '2.57.0', 'CDS Data reading is deprecated - Use Big Query layer to read data from Carol.')
def drop_duplicated_parquet(d, untie_field='mdmCounterForEntity'):
"""
Merge updates and delete records from the parquet files in CDS.
Expand Down

1 comment on commit cf74920

@totvslabsbot
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.