1+ #!/usr/bin/env python3
2+ # Copyright (c) 2025, Salesforce, Inc.
# SPDX-License-Identifier: Apache-2.0
4+
5+ """
6+ Housing Sale Price Prediction with Einstein Regression
7+
This example uses an Einstein regression model to predict housing sale prices
based on property features such as Year_Built__c.
10+
11+ Model: YH_Regression_Python_Predicted_SalePrice_CM_12l_ATC937af934
12+ Type: Regression
13+ Input: Year_Built__c (numeric)
14+ Output: Predicted_SalePrice
15+ """
16+
17+ import logging
18+ from typing import Any , Dict , Optional
19+
20+ from datacustomcode .function import Runtime
21+ from datacustomcode .function .feature_types .chunking import (
22+ ChunkType ,
23+ SearchIndexChunkingV1Output ,
24+ SearchIndexChunkingV1Request ,
25+ SearchIndexChunkingV1Response ,
26+ )
27+
28+ from datacustomcode .einstein_predictions .types import (
29+ PredictionColumBuilder ,
30+ PredictionRequestBuilder ,
31+ PredictionType ,
32+ )
33+
# API name of the Einstein regression model that every prediction request targets.
PREDICTION_MODEL_NAME = "YH_Regression_Python_Predicted_SalePrice_CM_12l_ATC937af934"

# Set up INFO-level logging, then create the logger used throughout this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
39+
40+
def _build_prediction_columns(features: Dict[str, Any]) -> list:
    """Turn a feature mapping into a list of built prediction columns.

    Strings become single-value string columns and ints/floats become
    single-value double columns. Any other value type is skipped with a
    warning so one odd field does not block the whole request.
    """
    columns = []
    for column_name, value in features.items():
        if isinstance(value, str):
            # String values (e.g., Garage_Qual__c)
            builder = (
                PredictionColumBuilder()
                .set_column_name(column_name)
                .set_string_values([value])
            )
        elif isinstance(value, (int, float)):
            # NOTE(review): bool is a subclass of int, so True/False are sent
            # as 1.0/0.0 here — confirm that is what the model expects.
            builder = (
                PredictionColumBuilder()
                .set_column_name(column_name)
                .set_double_values([float(value)])
            )
        else:
            logger.warning(
                f"Skipping field {column_name} with unsupported type {type(value)}"
            )
            continue

        columns.append(builder.build())
    return columns


def predict_sale_price(
    features: Dict[str, Any],
    runtime: Runtime,
) -> Optional[float]:
    """Predict a housing sale price with the Einstein regression model.

    Builds one prediction column per supported feature, sends a REGRESSION
    request for ``PREDICTION_MODEL_NAME`` through
    ``runtime.einstein_predictions.predict`` and reads the value from the
    first entry of the response's ``results`` list.

    Args:
        features: Feature name -> value mapping. ``str`` values are sent as
            string columns, ``int``/``float`` values as double columns;
            other types are skipped with a warning.
        runtime: Runtime exposing ``einstein_predictions.predict``.

    Returns:
        The predicted sale price as a float, or ``None`` when the call is
        unsuccessful, the response has no results, the first result is not
        a ``RegressionPredictionSuccess``, no value is present, or any
        exception is raised along the way (the exception is logged, not
        propagated).
    """
    try:
        prediction_columns = _build_prediction_columns(features)

        # Build regression prediction request
        prediction_request = (
            PredictionRequestBuilder()
            .set_prediction_type(PredictionType.REGRESSION)
            .set_model_api_name(PREDICTION_MODEL_NAME)
            .set_prediction_columns(prediction_columns)
            .build()
        )

        prediction_response = runtime.einstein_predictions.predict(prediction_request)

        if not prediction_response.is_success:
            logger.error(f"Prediction failed: {prediction_response.data}")
            return None

        # `or []` / `or {}` below also cover keys that are present but null,
        # so those cases take the warning paths instead of raising.
        results = prediction_response.data.get("results") or []
        if not results:
            logger.warning("No results in prediction response")
            return None

        first_result = results[0]
        prediction_type = first_result.get("type")
        if prediction_type != "RegressionPredictionSuccess":
            logger.error(f"Unexpected prediction type: {prediction_type}")
            logger.error(f"Full result: {first_result}")
            return None

        prediction_data = first_result.get("prediction") or {}
        raw_value = prediction_data.get("value")
        if raw_value is None:
            logger.warning("No predicted value in response")
            return None

        # Convert before formatting: the ``:,.2f`` spec rejects non-numeric
        # types (e.g. a numeric string), which would otherwise discard a
        # prediction that float() can parse.
        predicted_value = float(raw_value)
        logger.info(f"Predicted sale price: ${predicted_value:,.2f}")

        # Log top contributors (which features influenced the price most)
        top_contributors = prediction_data.get("topContributors") or []
        if top_contributors:
            logger.info(f"Top price contributors: {top_contributors}")

        return predicted_value

    except Exception as e:
        # Deliberate best-effort boundary: callers treat None as a failed
        # prediction, so log the traceback and report failure instead of
        # raising.
        logger.exception(f"Prediction failed with error: {e}")
        return None
130+
131+
def enrich_property_with_price(
    source_dmo_fields: Dict[str, Any],
    runtime: Runtime,
) -> Dict[str, str]:
    """Build a citations dictionary containing the predicted sale price.

    Every source field is copied into the result with its value converted
    via ``str()``. The same fields are then passed to
    ``predict_sale_price`` and the outcome is recorded under the
    ``predicted_sale_price`` / ``prediction_status`` keys. These keys are
    written after the copy, so they overwrite source fields of the same
    name.

    Args:
        source_dmo_fields: Property features from source DMO.
        runtime: Runtime forwarded to ``predict_sale_price``.

    Returns:
        Citations dictionary. On success it holds ``predicted_sale_price``
        (formatted like ``"$350,000.00"``), ``predicted_sale_price_raw``
        (``str`` of the float) and ``prediction_status == "success"``. On
        failure it holds ``predicted_sale_price == "N/A"`` and
        ``prediction_status == "failed"``, with no raw value.
    """
    # Copy original fields to citations; a falsy mapping contributes nothing.
    citations: Dict[str, str] = {
        key: str(value) for key, value in (source_dmo_fields or {}).items()
    }

    # Get price prediction - pass source_dmo_fields directly as features
    predicted_price = predict_sale_price(source_dmo_fields, runtime)

    if predicted_price is None:
        citations["predicted_sale_price"] = "N/A"
        citations["prediction_status"] = "failed"
        return citations

    # No padding after the amount: the documented output is "$350,000.00".
    citations["predicted_sale_price"] = f"${predicted_price:,.2f}"
    citations["predicted_sale_price_raw"] = str(predicted_price)
    citations["prediction_status"] = "success"
    return citations
164+
165+
def function(
    request: SearchIndexChunkingV1Request, runtime: Runtime
) -> SearchIndexChunkingV1Response:
    """Housing price prediction using Einstein regression.

    For each input document, every entry of ``metadata.source_dmo_fields``
    (not only ``Year_Built__c``) is forwarded to
    ``enrich_property_with_price``, and the resulting citations are attached
    to one TEXT chunk per document. Chunks are numbered from 1 in input
    order.

    Input format:
        {
            "input": [
                {
                    "text": "Beautiful 3BR house built in 1990",
                    "metadata": {
                        "source_dmo_fields": {
                            "Year_Built__c": 1990,
                            "address": "123 Main St",
                            "city": "San Francisco"
                        }
                    }
                }
            ]
        }

    Output format:
        {
            "output": [
                {
                    "text": "Beautiful 3BR house built in 1990",
                    "seq_no": 1,
                    "citations": {
                        "Year_Built__c": "1990",
                        "address": "123 Main St",
                        "predicted_sale_price": "$350,000.00",
                        "predicted_sale_price_raw": "350000.0",
                        "prediction_status": "success"
                    }
                }
            ]
        }

    Args:
        request: Input properties to enrich
        runtime: Runtime with prediction API access

    Returns:
        Properties enriched with predicted sale prices
    """
    logger.info(
        f"Processing {len(request.input)} properties for price prediction"
    )

    enriched_properties = []

    # One output chunk is produced per input document, so the 1-based
    # enumerate index serves as both the log label and the chunk seq_no.
    for seq_no, doc in enumerate(request.input, start=1):
        text = doc.text
        metadata = doc.metadata

        logger.info(f"Property {seq_no}: {text[:100]}...")

        # Get source_dmo_fields (empty when metadata or its fields are missing)
        source_dmo_fields = {}
        if metadata and metadata.source_dmo_fields:
            source_dmo_fields = dict(metadata.source_dmo_fields)

        # Enrich with price prediction - pass source_dmo_fields directly
        citations = enrich_property_with_price(source_dmo_fields, runtime)

        # Create output
        enriched_properties.append(
            SearchIndexChunkingV1Output(
                chunk_type=ChunkType.TEXT,
                text=text.strip(),
                seq_no=seq_no,
                citations=citations,
            )
        )

        logger.info(
            f"Property {seq_no}: Predicted price = "
            f"{citations.get('predicted_sale_price', 'N/A')}"
        )

    logger.info(f"Total properties enriched: {len(enriched_properties)}")

    return SearchIndexChunkingV1Response(output=enriched_properties)
0 commit comments