Directional changes
Directional changes
What is DC(Directional changes)?
- An approach to summarize price changes other than time series
How to?
- Market prices are sampled based on the DC greater than by at least
- is the threshold predetermined by the observer
- usually expressec as percentage
- Formally, a DC event is detected when we come across a price that satisfies below:
Terminology
- is the threshold predetermined by the observer
- usually expressec as percentage
Extreme point / DCC point
- If the inequality just above holds,
Term | Description | Example in figure |
---|---|---|
Extreme point | the time at | points , , , , , , |
DCC point | the time at | points , , , , , , |
- Note that whilst an extreme point is the end of the one trend, it is also the start point of the next trend
- An extreme point is only recognized in hindsight; precisely at the DCC point
DC event / OS event
- DC means "Directional Change" and OS does "Overshoot"
Term | Description | Example in figure |
---|---|---|
DC event | -starts with an extreme point -ends with a DCC point | intervals , , , |
DCC point | -starts at the DCC point -ends at the next extreme point | ntervals , , , |
- Note that for a given time series and a predetermined threshold, the DC summary is unique
Implementation in Python
- Original version
THETA_bakhach=0.1
data_points = market_index['close_scaled'].tolist()
pext = data_points[0]
vals = []
breakpoints = []
yHat = []
for idx, pc in enumerate(data_points):
if idx == 0:
continue
val = (pc - pext)/pext
dif = pc - pext
if abs(val) > THETA_bakhach:
pext = pc
vals.append(val)
breakpoints.append(idx)
yHat.append(data_points[idx])
# endpoint
breakpoints.append(len(data_points) - 1)
vals.append((data_points[-1] - yHat[-1])/yHat[-1])
yHat.append(data_points[-1])
THETA_bakhach=0.1
data_points = market_index['close_scaled'].tolist()
pext = data_points[0]
vals = []
breakpoints = []
yHat = []
for idx, pc in enumerate(data_points):
if idx == 0:
continue
val = (pc - pext)/pext
dif = pc - pext
if abs(val) > THETA_bakhach:
pext = pc
vals.append(val)
breakpoints.append(idx)
yHat.append(data_points[idx])
# endpoint
breakpoints.append(len(data_points) - 1)
vals.append((data_points[-1] - yHat[-1])/yHat[-1])
yHat.append(data_points[-1])
The right-end point is so ugly!! So, add some correction using amount of change .
- Merging adjacent trends
THETA_bakhach=0.1
DELTA_bakhach=(max(market_index['close_scaled'])-min(market_index['close_scaled'])) * 0.08 #0.08
data_points = market_index['close_scaled'].tolist()
pext = data_points[0]
vals = []
breakpoints = []
yHat = []
for idx, pc in enumerate(data_points):
if idx == 0:
continue
# val = abs((pc - pext)/pext)
val = (pc - pext)/pext
dif = pc - pext
if abs(val) > THETA_bakhach:
pext = pc
vals.append(val)
breakpoints.append(idx)
yHat.append(data_points[idx])
# endpoint
breakpoints.append(len(data_points) - 1)
vals.append((data_points[-1] - yHat[-1])/yHat[-1])
yHat.append(data_points[-1])
rm_idx = []
for vi in range(len(vals))[1:]:
fs = np.sign(vals[vi - 1])
cs = np.sign(vals[vi])
if fs == cs:
rm_idx.append(vi-1)
vals_new = [j for i, j in enumerate(vals) if i not in rm_idx]
breakpoints_new = [j for i, j in enumerate(breakpoints) if i not in rm_idx]
yHat_new = [j for i, j in enumerate(yHat) if i not in rm_idx]
The problem in right-end part is solved now! However, it seems that a lot of unnecessary DC has been detected. For this, add a correction merging adjacent trends.
- Add for observing change amount
THETA_bakhach=0.1
DELTA_bakhach=(max(market_index['close_scaled'])-min(market_index['close_scaled'])) * 0.08
data_points = market_index['close_scaled'].tolist()
pext = data_points[0]
vals = []
breakpoints = []
yHat = []
for idx, pc in enumerate(data_points):
if idx == 0:
continue
# val = abs((pc - pext)/pext)
val = (pc - pext)/pext
dif = pc - pext
# if abs(dif) > DELTA_bakhach:
if abs(val) > THETA_bakhach or abs(dif) > DELTA_bakhach:
pext = pc
vals.append(val)
breakpoints.append(idx)
yHat.append(data_points[idx])
# endpoint
breakpoints.append(len(data_points) - 1)
vals.append((data_points[-1] - yHat[-1])/yHat[-1])
yHat.append(data_points[-1])
rm_idx = []
for vi in range(len(vals))[1:]:
fs = np.sign(vals[vi - 1])
cs = np.sign(vals[vi])
if fs == cs:
rm_idx.append(vi-1)
vals_new = [j for i, j in enumerate(vals) if i not in rm_idx]
breakpoints_new = [j for i, j in enumerate(breakpoints) if i not in rm_idx]
yHat_new = [j for i, j in enumerate(yHat) if i not in rm_idx]
Finally, the detected trends look great!
[References]
Author And Source
이 문제에 관하여(Directional changes), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://velog.io/@hyangki0119/Directional-changes
저자 귀속: 원작자 정보가 원작자 URL에 포함되어 있으며 저작권은 원작자 소유입니다.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
Author And Source
이 문제에 관하여(Directional changes), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://velog.io/@hyangki0119/Directional-changes저자 귀속: 원작자 정보가 원작자 URL에 포함되어 있으며 저작권은 원작자 소유입니다.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)