Directional changes

What is DC (Directional Changes)?

  • An event-based approach to summarizing price changes, as an alternative to a fixed-interval time series

How to?

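  • Market prices are sampled whenever the price changes by more than a threshold $\theta$ relative to the last extreme price $P_{EXT}$
  • Formally, a DC event is detected when we come across a price $P_c$ that satisfies $\left| \dfrac{P_c - P_{EXT}}{P_{EXT}} \right| > \theta$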
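
A minimal sketch of the detection check, assuming the strict relative-change test used by the implementation later in this post. The function name `is_dc_event` and the sample prices are hypothetical and only serve as an illustration.

```python
def is_dc_event(pc: float, pext: float, theta: float) -> bool:
    """Return True when the relative change from pext to pc exceeds theta."""
    return abs((pc - pext) / pext) > theta


# Hypothetical prices: last extreme price at 100, threshold of 10%.
print(is_dc_event(111.0, 100.0, 0.1))  # 11% rise from the extreme
print(is_dc_event(105.0, 100.0, 0.1))  # 5% rise from the extreme
print(is_dc_event(88.0, 100.0, 0.1))   # 12% fall from the extreme
```

Only the first and third calls report a DC event, since the 5% move stays below the threshold.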

Terminology

Extreme point / DCC point

  • If the inequality just above holds:

| Term | Description | Example in figure |
| --- | --- | --- |
| Extreme point | the time at $P_{EXT}$ | points $A$, $B$, $C$, $D$, $E$, $F$, $G$ |
| DCC point | the time at $P_c$ | points $A^{0.1}$, $B^{0.1}$, $C^{0.1}$, $D^{0.1}$, $E^{0.1}$, $F^{0.1}$, $G^{0.1}$ |

  • Note that an extreme point is both the end of one trend and the start of the next
  • An extreme point is recognized only in hindsight, precisely at the DCC point
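
The hindsight property can be seen in a small sketch. It follows a common formulation of DC detection that tracks a running high and a running low; this is an assumption made for illustration and not the implementation used later in this post, which resets the reference price at every detection. The function name `detect_dc` and the sample prices are hypothetical.

```python
def detect_dc(prices, theta):
    """Return a list of (extreme_idx, dcc_idx, direction) tuples.

    direction is +1 for an upturn and -1 for a downturn, each confirmed
    at dcc_idx.
    """
    events = []
    high_idx = low_idx = 0  # running candidates for the extreme point
    trend = 0               # 0 = unknown, +1 = uptrend, -1 = downtrend
    for i, p in enumerate(prices):
        # update the running extremes that are relevant for the current trend
        if trend >= 0 and p > prices[high_idx]:
            high_idx = i
        if trend <= 0 and p < prices[low_idx]:
            low_idx = i
        # downturn: the price fell by more than theta from the running high
        if trend >= 0 and (prices[high_idx] - p) / prices[high_idx] > theta:
            events.append((high_idx, i, -1))
            trend = -1
            low_idx = i
        # upturn: the price rose by more than theta from the running low
        elif trend <= 0 and (p - prices[low_idx]) / prices[low_idx] > theta:
            events.append((low_idx, i, +1))
            trend = +1
            high_idx = i
    return events


prices = [100, 105, 112, 108, 100, 95, 99, 106, 103]  # hypothetical series
for ext, dcc, direction in detect_dc(prices, theta=0.1):
    print(f"extreme at index {ext}, confirmed at index {dcc}, direction {direction:+d}")
```

In this toy series the peak at index 2 is identified as an extreme point only once the price has fallen far enough at index 4.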

DC event / OS event

  • DC stands for "Directional Change" and OS for "Overshoot"

| Term | Description | Example in figure |
| --- | --- | --- |
| DC event | starts with an extreme point; ends with a DCC point | intervals $[A, A^{0.1}]$ |
| OS event | starts at the DCC point; ends at the next extreme point | intervals $[A^{0.1}, B]$ |

  • Note that for a given time series and a predetermined threshold, the DC summary is unique
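
The split into DC and OS events can be sketched as follows, assuming the extreme and DCC points are already known as index pairs. The function name `split_dc_os` and the sample indices are hypothetical.

```python
def split_dc_os(events):
    """Split (extreme_idx, dcc_idx) pairs, given in time order, into intervals.

    A DC event runs from an extreme point to its DCC point. An OS event runs
    from that DCC point to the next extreme point. The last OS event is still
    open, so its end is reported as None.
    """
    dc_intervals = [(ext, dcc) for ext, dcc in events]
    os_intervals = []
    for k, (_, dcc) in enumerate(events):
        next_ext = events[k + 1][0] if k + 1 < len(events) else None
        os_intervals.append((dcc, next_ext))
    return dc_intervals, os_intervals


events = [(0, 2), (3, 5), (8, 9)]  # hypothetical (extreme, DCC) index pairs
dc_intervals, os_intervals = split_dc_os(events)
print(dc_intervals)
print(os_intervals)
```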

Implementation in Python

  • Original version
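THETA_bakhach=0.1  # DC threshold on the relative price change
# market_index is assumed to be a pandas DataFrame with a 'close_scaled' column
data_points = market_index['close_scaled'].tolist()
pext = data_points[0]  # reference price for the next comparison
vals = []         # relative change at each breakpoint
breakpoints = []  # indices of the detected breakpoints
yHat = []         # prices at the detected breakpoints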

for idx, pc in enumerate(data_points):
    if idx == 0:
        continue

    # relative change of the current price from the reference price
    val = (pc - pext)/pext

    if abs(val) > THETA_bakhach:
        pext = pc
        vals.append(val)
        breakpoints.append(idx)
        yHat.append(data_points[idx])

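# endpoint: close the last segment at the final data point
breakpoints.append(len(data_points) - 1)
# pext equals yHat[-1] here, and still works when no breakpoint was detected
vals.append((data_points[-1] - pext)/pext)
yHat.append(data_points[-1])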


The right end point looks ugly! So, add a correction using the amount of change.
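
A small sketch of what a check on the amount of change adds to the relative test. It mirrors the `DELTA_bakhach` definition used in the code further down (8% of the price range); the function name `exceeds` and the sample prices are hypothetical.

```python
THETA = 0.1                        # relative threshold, as in the code above
prices = [0.20, 0.95, 1.00, 0.91]  # hypothetical scaled prices
DELTA = (max(prices) - min(prices)) * 0.08  # absolute threshold: 8% of the range


def exceeds(pc, pext, theta, delta):
    """Return (relative test passed, absolute test passed) for one price pair."""
    rel = abs((pc - pext) / pext)
    dif = abs(pc - pext)
    return rel > theta, dif > delta


# A move from 1.00 to 0.91 is below 10% in relative terms,
# but larger than 8% of the observed price range.
print(exceeds(0.91, 1.00, THETA, DELTA))
```

Near the top of the price range a move can stay under the relative threshold while still being large compared with the range, which is the case the absolute test picks up.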


  • Merging adjacent trends
import numpy as np

THETA_bakhach=0.1
# 8% of the price range (defined here, but not used in this version)
DELTA_bakhach=(max(market_index['close_scaled'])-min(market_index['close_scaled'])) * 0.08
data_points = market_index['close_scaled'].tolist()
pext = data_points[0]
vals = []
breakpoints = []
yHat = []

 
for idx, pc in enumerate(data_points):
    if idx == 0:
        continue

    # relative change of the current price from the reference price
    val = (pc - pext)/pext

    if abs(val) > THETA_bakhach:
        pext = pc
        vals.append(val)
        breakpoints.append(idx)
        yHat.append(data_points[idx])

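# endpoint: close the last segment at the final data point
breakpoints.append(len(data_points) - 1)
# pext equals yHat[-1] here, and still works when no breakpoint was detected
vals.append((data_points[-1] - pext)/pext)
yHat.append(data_points[-1])

# merge adjacent trends: when two consecutive segments move in the same
# direction, drop the breakpoint between them and keep the later one
rm_idx = set()
for vi in range(1, len(vals)):
    fs = np.sign(vals[vi - 1])
    cs = np.sign(vals[vi])
    if fs == cs:
        rm_idx.add(vi - 1)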
        
vals_new = [j for i, j in enumerate(vals) if i not in rm_idx]
breakpoints_new = [j for i, j in enumerate(breakpoints) if i not in rm_idx]
yHat_new = [j for i, j in enumerate(yHat) if i not in rm_idx]

The problem at the right end is solved now! However, it seems that many unnecessary DCs have been detected. To fix this, add a correction that merges adjacent trends.
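
The merging rule can be tried on a toy input. This is a sketch in plain Python that follows the same rule as the `rm_idx` loop in this post; the function names `sign` and `merge_adjacent` and the sample values are hypothetical.

```python
def sign(x):
    """Return -1, 0 or 1 depending on the sign of x."""
    return (x > 0) - (x < 0)


def merge_adjacent(vals, breakpoints):
    """Drop every breakpoint whose successor moves in the same direction."""
    rm_idx = {vi - 1 for vi in range(1, len(vals))
              if sign(vals[vi - 1]) == sign(vals[vi])}
    keep = [i for i in range(len(vals)) if i not in rm_idx]
    return [vals[i] for i in keep], [breakpoints[i] for i in keep]


vals = [0.12, 0.15, -0.11, -0.13, 0.20]  # hypothetical relative changes
breakpoints = [3, 7, 10, 15, 20]         # hypothetical breakpoint indices
vals_new, breakpoints_new = merge_adjacent(vals, breakpoints)
print(vals_new)
print(breakpoints_new)
```

Note that each kept value is still the change over the last leg only; it is not recomputed over the merged trend.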


  • Add $\delta$ to observe the amount of change
import numpy as np

THETA_bakhach=0.1
# absolute threshold: 8% of the price range
DELTA_bakhach=(max(market_index['close_scaled'])-min(market_index['close_scaled'])) * 0.08
data_points = market_index['close_scaled'].tolist()
pext = data_points[0]
vals = []
breakpoints = []
yHat = []

 
for idx, pc in enumerate(data_points):
    if idx == 0:
        continue

    val = (pc - pext)/pext  # relative change from the reference price
    dif = pc - pext         # absolute change from the reference price

    # a breakpoint is detected when either change is large enough
    if abs(val) > THETA_bakhach or abs(dif) > DELTA_bakhach:
        pext = pc
        vals.append(val)
        breakpoints.append(idx)
        yHat.append(data_points[idx])

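# endpoint: close the last segment at the final data point
breakpoints.append(len(data_points) - 1)
# pext equals yHat[-1] here, and still works when no breakpoint was detected
vals.append((data_points[-1] - pext)/pext)
yHat.append(data_points[-1])

# merge adjacent trends: when two consecutive segments move in the same
# direction, drop the breakpoint between them and keep the later one
rm_idx = set()
for vi in range(1, len(vals)):
    fs = np.sign(vals[vi - 1])
    cs = np.sign(vals[vi])
    if fs == cs:
        rm_idx.add(vi - 1)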
        
vals_new = [j for i, j in enumerate(vals) if i not in rm_idx]
breakpoints_new = [j for i, j in enumerate(breakpoints) if i not in rm_idx]
yHat_new = [j for i, j in enumerate(yHat) if i not in rm_idx]

Finally, the detected trends look great!
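
For reuse, the final version can be wrapped in a function that works on a plain list. This is a sketch under assumptions: the function name `dc_breakpoints` and its parameters are hypothetical, and unlike the code above it skips the endpoint when the last data point is already a breakpoint.

```python
def dc_breakpoints(data_points, theta=0.1, delta_ratio=0.08):
    """Return (breakpoints, vals) after detection and merging."""
    delta = (max(data_points) - min(data_points)) * delta_ratio
    pext = data_points[0]
    vals, breakpoints = [], []

    for idx, pc in enumerate(data_points[1:], start=1):
        val = (pc - pext) / pext  # relative change
        dif = pc - pext           # absolute change
        if abs(val) > theta or abs(dif) > delta:
            pext = pc
            vals.append(val)
            breakpoints.append(idx)

    # endpoint (assumption: do not duplicate an existing last breakpoint)
    last = len(data_points) - 1
    if not breakpoints or breakpoints[-1] != last:
        vals.append((data_points[-1] - pext) / pext)
        breakpoints.append(last)

    # merge adjacent trends that move in the same direction
    def sign(x):
        return (x > 0) - (x < 0)

    rm_idx = {vi - 1 for vi in range(1, len(vals))
              if sign(vals[vi - 1]) == sign(vals[vi])}
    keep = [i for i in range(len(vals)) if i not in rm_idx]
    return [breakpoints[i] for i in keep], [vals[i] for i in keep]


# Hypothetical scaled prices: a rise, a small wiggle, a fall and a rebound.
prices = [1.0, 1.05, 1.2, 1.4, 1.41, 1.3, 1.0, 0.9, 1.1]
bps, changes = dc_breakpoints(prices)
print(bps)
print(changes)
```

On this toy series the small wiggle at index 4 is ignored and the merged result keeps one breakpoint per trend.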
