|
1 |
| -import numpy as np |
2 | 1 | import pandas as pd
|
3 |
| -import talib |
| 2 | +import numpy as np |
| 3 | +import talib.abstract as ta |
| 4 | + |
| 5 | +def QQE(dataframe, rsi_period=14, sF=5, wF=4.236): |
| 6 | + """ |
| 7 | + Calculate QQE (Quantitative Qualitative Estimation) indicator components for a given pandas DataFrame. |
4 | 8 |
|
5 |
| -def QQE(dataframe, rsi_period=14, sf=5, wf=4.236): |
| 9 | + Args: |
| 10 | + dataframe (pd.DataFrame): DataFrame with columns ['high', 'low', 'open', 'close', 'volume']. |
| 11 | + rsi_period (int): RSIPeriod parameter for QQE (default is 14). |
| 12 | + sF (int): sF parameter for QQE (default is 5). |
| 13 | + wF (float): wF parameter for QQE (default is 4.236). |
| 14 | +
|
| 15 | + Returns: |
| 16 | + pd.Series: qqe_value1 |
| 17 | + pd.Series: qqe_value2 |
| 18 | + """ |
6 | 19 | df = dataframe.copy()
|
7 | 20 |
|
8 |
| - # Calculate the Wilders Period |
9 | 21 | wilders_period = rsi_period * 2 - 1
|
10 |
| - start_bar = sf if wilders_period < sf else wilders_period |
11 |
| - |
12 |
| - rsi_handle = talib.RSI(df["close"].values, timeperiod=rsi_period) |
13 |
| - |
14 |
| - # Initialize indicator buffers |
15 |
| - rsi_ma = np.zeros(len(df)) |
16 |
| - tr_level_slow = np.zeros(len(df)) |
17 |
| - rsi_values = np.zeros(len(df)) |
18 |
| - atr_rsi = np.zeros(len(df)) |
19 |
| - ma_atr_rsi = np.zeros(len(df)) |
20 |
| - ma_atr_rsi_wp = np.zeros(len(df)) |
21 | 22 |
|
22 |
| - for i in range(1, len(df)): |
23 |
| - rsi_values[i] = rsi_handle[i] |
24 |
| - rsi_ma[i] = rsi_values[i] |
| 23 | + rsi_values = ta.RSI(df, timeperiod=rsi_period)["RSI"] |
| 24 | + rsi_ema_values = ta.EMA(rsi_values, timeperiod=sF) |
25 | 25 |
|
26 |
| - for i in range(1, len(df)): |
27 |
| - rsi_ma[i] = rsi_ma[i] * (2.0 / (1 + sf)) + (1 - (2.0 / (1 + sf))) * rsi_ma[i - 1] |
28 |
| - atr_rsi[i] = abs(rsi_ma[i - 1] - rsi_ma[i]) |
29 |
| - ma_atr_rsi[i] = atr_rsi[i] |
| 26 | + atr_rsi_values = pd.Series(index=df.index) |
| 27 | + ma_atr_rsi_values = pd.Series(index=df.index) |
30 | 28 |
|
31 |
| - for i in range(1, len(df)): |
32 |
| - ma_atr_rsi[i] = ma_atr_rsi[i] * (2.0 / (1 + wilders_period)) + (1 - (2.0 / (1 + wilders_period))) * ma_atr_rsi[i - 1] |
33 |
| - ma_atr_rsi_wp[i] = ma_atr_rsi[i] |
| 29 | + for i in range(len(df)): |
| 30 | + if i == 0: |
| 31 | + atr_rsi_values.iloc[i] = 0 |
| 32 | + ma_atr_rsi_values.iloc[i] = 0 |
| 33 | + else: |
| 34 | + atr_rsi_values.iloc[i] = abs(rsi_values.iloc[i-1] - rsi_ema_values.iloc[i]) |
| 35 | + ma_atr_rsi_values.iloc[i] = atr_rsi_values.iloc[i:].rolling(window=wilders_period).mean().iloc[0] |
34 | 36 |
|
35 |
| - i = len(df) - 1 |
36 |
| - tr = tr_level_slow[i - 1] |
37 |
| - rsi1 = rsi_ma[i - 1] |
| 37 | + qqe_value1 = rsi_ema_values.copy() |
| 38 | + qqe_value2 = rsi_ema_values.copy() |
38 | 39 |
|
39 |
| - while i >= 1: |
40 |
| - rsi0 = rsi_ma[i] |
41 |
| - ma_atr_rsi_wp[i] = ma_atr_rsi_wp[i] * (2.0 / (1 + wilders_period)) + (1 - (2.0 / (1 + wilders_period))) * ma_atr_rsi_wp[i - 1] |
42 |
| - dar = ma_atr_rsi_wp[i] * wf |
| 40 | + for i in range(len(df)): |
| 41 | + rsi0 = rsi_ema_values.iloc[i] |
| 42 | + rsi1 = rsi_ema_values.iloc[i-1] if i > 0 else rsi0 |
| 43 | + dv = rsi_values.iloc[i-1] if i > 0 else rsi0 |
| 44 | + |
| 45 | + atr_rsi = atr_rsi_values.iloc[i] |
| 46 | + ma_atr_rsi = ma_atr_rsi_values.iloc[i] |
| 47 | + |
| 48 | + dar = ma_atr_rsi * wF |
| 49 | + tr = qqe_value2.iloc[i-1] if i > 0 else qqe_value2.iloc[i] |
43 | 50 |
|
44 |
| - dv = tr |
45 | 51 | if rsi0 < tr:
|
46 | 52 | tr = rsi0 + dar
|
47 |
| - if rsi1 < dv: |
48 |
| - if tr > dv: |
49 |
| - tr = dv |
| 53 | + if rsi1 < dv and tr > dv: |
| 54 | + tr = dv |
50 | 55 | elif rsi0 > tr:
|
51 | 56 | tr = rsi0 - dar
|
52 |
| - if rsi1 > dv: |
53 |
| - if tr < dv: |
54 |
| - tr = dv |
55 |
| - tr_level_slow[i] = tr |
56 |
| - rsi1 = rsi0 |
57 |
| - i -= 1 |
58 |
| - |
59 |
| - rsi_ma_series = pd.Series(rsi_ma, name="rsi_ma") |
60 |
| - tr_level_slow_series = pd.Series(tr_level_slow, name="tr_level_slow") |
61 |
| - rsi_values_series = pd.Series(rsi_values, name="rsi") |
62 |
| - atr_rsi_series = pd.Series(atr_rsi, name="atr_rsi") |
63 |
| - ma_atr_rsi_series = pd.Series(ma_atr_rsi, name="ma_atr_rsi") |
64 |
| - ma_atr_rsi_wp_series = pd.Series(ma_atr_rsi_wp, name="ma_atr_rsi_wp") |
| 57 | + if rsi1 > dv and tr < dv: |
| 58 | + tr = dv |
| 59 | + |
| 60 | + qqe_value1.iloc[i] = rsi0 |
| 61 | + qqe_value2.iloc[i] = tr |
| 62 | + |
| 63 | + qqe_value1.name = 'qqe_value1' |
| 64 | + qqe_value2.name = 'qqe_value2' |
65 | 65 |
|
66 |
| - return rsi_ma_series, tr_level_slow_series, rsi_values_series, atr_rsi_series, ma_atr_rsi_series, ma_atr_rsi_wp_series |
| 66 | + return qqe_value1, qqe_value2 |
67 | 67 |
|
68 | 68 | # Example usage:
|
69 |
| -# Replace df with your pandas DataFrame containing columns ['close'] |
70 |
| -# Call the function with your desired parameters |
71 |
| -#rsi_ma, tr_level_slow, rsi_values, atr_rsi, ma_atr_rsi, ma_atr_rsi_wp = QQE(df) |
| 69 | +# Replace df with your pandas DataFrame containing columns ['open', 'high', 'low', 'close', 'volume'] |
| 70 | +# Call the function as follows: |
| 71 | +# qqe_value1, qqe_value2 = QQE(df, rsi_period=14, sF=5, wF=4.236) |
0 commit comments