차트와 MACD지표를 실시간으로 계산하는 간단한 예제 코드입니다
MACD 지표는 단기 지수 이동평균과 장기 지수 이동평균의 차로 구해는 비교적 단순한 기술적 지표입니다.
참고로 MACD 지표를 구하는 계산 방법은 아래와 같습니다.
- MACD = 12일 지수이동평균- 26일 지수이동평균
- Signal = MACD 의 9일 지수이동평균
- Oscillator : MACD - Signal
여기서 MACD를 직접 계산하지는 않고, CREON PLUS API를 이용해 MACD 계산 값을 넘겨 받습니다
아래의 CMACD 클래스는 두가지 플러스 Object 의 도움을 받아 지표를 계산합니다.
- CpIndexes.CpSeries - 차트의 기본 데이터를 저장하는 PLUS 클래스(시가, 고가, 저가, 종가, 거래량)
- CpIndexes.CpIndex - 실질적인 지표 계산 PLUS 클래스, 150여개의 지표 계산이 가능
class CMACD: def __init__(self): self.objSeries = win32com.client.Dispatch("CpIndexes.CpSeries") self.objIndex = win32com.client.Dispatch("CpIndexes.CpIndex") # 차트 데이터 세팅 하기 def setChartData(self, chartData): nLen = len(chartData['T']) for i in range(nLen): self.objSeries.Add(chartData['C'][i], chartData['O'][i], chartData['H'][i], chartData['L'][i], chartData['V'][i]) return # 기존 차트 데이터에 새로 들어온 신규 데이터 추가 def addLastData(self, chartData): self.objSeries.Add(chartData['C'][-1], chartData['O'][-1], chartData['H'][-1], chartData['L'][-1], chartData['V'][-1]) # MACD 계산 def makeMACD(self): result = {} # 지표 계산 object self.objIndex.series = self.objSeries self.objIndex.put_IndexKind("MACD") # 계산할 지표: MACD self.objIndex.put_IndexDefault("MACD") # MACD 지표 기본 변수 자동 세팅 print("MACD 변수", self.objIndex.get_Term1(), self.objIndex.get_Term2(), self.objIndex.get_Signal()) # 지표 데이터 계산 하기 self.objIndex.Calculate() cntofIndex = self.objIndex.ItemCount print("지표 개수: ", cntofIndex) indexName = ["MACD", "SIGNAL", "OSC"] result['MACD'] = [] result['SIGNAL'] = [] result['OSC'] = [] for index in range(cntofIndex): cnt = self.objIndex.GetCount(index) for j in range(cnt): value = self.objIndex.GetResult(index, j) result[indexName[index]].append(value) # for j in range(cnt) : # value = self.objIndex.GetResult(index,j) # print(indexName[index], value) # 지표의 최근 값 표시 print('MACD %.2f SIGNLA %.2f OSC %.2f' % (result['MACD'][-1], result['SIGNAL'][-1], result['OSC'][-1])) return (True, result) # MACD 업데이트(차트 데이터 개수에 변화가 없을 경우에만 사용) def updateMACD(self, chartData): result = {} # 지표 데이터 update self.objSeries.update(chartData['C'][-1], chartData['O'][-1], chartData['H'][-1], chartData['L'][-1], chartData['V'][-1]) self.objIndex.update() cntofIndex = self.objIndex.ItemCount print("지표 개수: ", cntofIndex) indexName = ["MACD", "SIGNAL", "OSC"] result['MACD'] = [] result['SIGNAL'] = [] result['OSC'] = [] for index in range(cntofIndex): cnt = self.objIndex.GetCount(index) # 마지막 데이터만 업데이트 한다 value = self.objIndex.GetResult(index, cnt - 1) result[indexName[index]].append(value) print('MACD %.2f SIGNLA %.2f OSC %.2f' % (result['MACD'][-1], result['SIGNAL'][-1], result['OSC'][-1])) return (True, result)
별도의 설명보다는 코드에 가능한 주석을 많이 추가했습니다 ^^;
이번 예제의 핵심 클래스는 'CMinchartData' 입니다
분차트를 요청 하고, 실시간으로 업데이트하면서 주어진 주기(예제에서는 5분) 분 차트를 만듭니다
이 클래스의 주요 함수들은
- rqChartMinData - 5분 차트를 요청/수신 받고 기본 데이터를 저장합니다. 수신 받은 데이터로 최초 MACD 를 계산합니다.
- makeMinchart - 새로운 시세가 올 때 마다 호출됩니다. 최근 봉에 업데이트 할 지 아니면 새로운 봉을 만들지 결정하고 MACD 지표도 다시 계산합니다. 예제는 5분차트이지만 10분, 15분 등 주기 변수만 변경하면 다양한 주기의 분차트를 생성하도록 되어 있습니다.
- checkMACD - MACD Oscillator 지표가 0 보다 크거나 작을 때를 기준으로 매매 신호를 발생 합니다. 이번 예제에는 봉이 완성된 후 판단하도록 예제를 만들었습니다 (시세가 들어 올 때 마다 판단하면 시그널이 생기고 없어지고를 반복 할 수 있어서)
import sys from PyQt5.QtWidgets import * import win32com.client import ctypes import time ################################################ # PLUS 공통 OBJECT g_objCodeMgr = win32com.client.Dispatch('CpUtil.CpCodeMgr') g_objCpStatus = win32com.client.Dispatch('CpUtil.CpCybos') g_objCpTrade = win32com.client.Dispatch('CpTrade.CpTdUtil') ################################################ # PLUS 실행 기본 체크 함수 def InitPlusCheck(): # 프로세스가 관리자 권한으로 실행 여부 if ctypes.windll.shell32.IsUserAnAdmin(): print('정상: 관리자권한으로 실행된 프로세스입니다.') else: print('오류: 일반권한으로 실행됨. 관리자 권한으로 실행해 주세요') return False # 연결 여부 체크 if (g_objCpStatus.IsConnect == 0): print("PLUS가 정상적으로 연결되지 않음. ") return False # 주문 관련 초기화 if (g_objCpTrade.TradeInit(0) != 0): print("주문 초기화 실패") return False return True ################################################ # CpEvent: 실시간 이벤트 수신 클래스 class CpEvent: def set_params(self, client, name, caller): self.client = client # CP 실시간 통신 object self.name = name # 서비스가 다른 이벤트를 구분하기 위한 이름 self.caller = caller # callback 을 위해 보관 def OnReceived(self): # 실시간 처리 - 현재가 체결 데이터 if self.name == 'stockcur': code = self.client.GetHeaderValue(0) # 초 name = self.client.GetHeaderValue(1) # 초 timess = self.client.GetHeaderValue(18) # 초 exFlag = self.client.GetHeaderValue(19) # 예상체결 플래그 cprice = self.client.GetHeaderValue(13) # 현재가 diff = self.client.GetHeaderValue(2) # 대비 cVol = self.client.GetHeaderValue(17) # 순간체결수량 vol = self.client.GetHeaderValue(9) # 거래량 if exFlag != ord('2'): return item = {} item['code'] = code item['time'] = timess item['diff'] = diff item['cur'] = cprice item['vol'] = cVol # 현재가 업데이트 self.caller.updateCurData(item) return ################################################ # plus 실시간 수신 base 클래스 class CpPublish: def __init__(self, name, serviceID): self.name = name self.obj = win32com.client.Dispatch(serviceID) self.bIsSB = False def Subscribe(self, var, caller): if self.bIsSB: self.Unsubscribe() if (len(var) > 0): self.obj.SetInputValue(0, var) handler = win32com.client.WithEvents(self.obj, CpEvent) handler.set_params(self.obj, self.name, caller) self.obj.Subscribe() self.bIsSB = True def Unsubscribe(self): if self.bIsSB: self.obj.Unsubscribe() self.bIsSB = False ################################################ # CpPBStockCur: 실시간 현재가 요청 클래스 class CpPBStockCur(CpPublish): def __init__(self): super().__init__('stockcur', 'DsCbo1.StockCur') # MACD 지표 계산 class CMACD: def __init__(self): self.objSeries = win32com.client.Dispatch("CpIndexes.CpSeries") self.objIndex = win32com.client.Dispatch("CpIndexes.CpIndex") # 차트 데이터 세팅 하기 def setChartData(self, chartData): nLen = len(chartData['T']) for i in range(nLen): self.objSeries.Add(chartData['C'][i], chartData['O'][i], chartData['H'][i], chartData['L'][i], chartData['V'][i]) return # 기존 차트 데이터에 새로 들어온 신규 데이터 추가 def addLastData(self, chartData): self.objSeries.Add(chartData['C'][-1], chartData['O'][-1], chartData['H'][-1], chartData['L'][-1], chartData['V'][-1]) # MACD 계산 def makeMACD(self): result = {} # 지표 계산 object self.objIndex.series = self.objSeries self.objIndex.put_IndexKind("MACD") # 계산할 지표: MACD self.objIndex.put_IndexDefault("MACD") # MACD 지표 기본 변수 자동 세팅 print("MACD 변수", self.objIndex.get_Term1(), self.objIndex.get_Term2(), self.objIndex.get_Signal()) # 지표 데이터 계산 하기 self.objIndex.Calculate() cntofIndex = self.objIndex.ItemCount print("지표 개수: ", cntofIndex) indexName = ["MACD", "SIGNAL", "OSC"] result['MACD'] = [] result['SIGNAL'] = [] result['OSC'] = [] for index in range(cntofIndex): cnt = self.objIndex.GetCount(index) for j in range(cnt): value = self.objIndex.GetResult(index, j) result[indexName[index]].append(value) # for j in range(cnt) : # value = self.objIndex.GetResult(index,j) # print(indexName[index], value) # 지표의 최근 값 표시 print('MACD %.2f SIGNLA %.2f OSC %.2f' % (result['MACD'][-1], result['SIGNAL'][-1], result['OSC'][-1])) return (True, result) # MACD 업데이트(차트 데이터 개수에 변화가 없을 경우에만 사용) def updateMACD(self, chartData): result = {} # 지표 데이터 update self.objSeries.update(chartData['C'][-1], chartData['O'][-1], chartData['H'][-1], chartData['L'][-1], chartData['V'][-1]) self.objIndex.update() cntofIndex = self.objIndex.ItemCount print("지표 개수: ", cntofIndex) indexName = ["MACD", "SIGNAL", "OSC"] result['MACD'] = [] result['SIGNAL'] = [] result['OSC'] = [] for index in range(cntofIndex): cnt = self.objIndex.GetCount(index) # 마지막 데이터만 업데이트 한다 value = self.objIndex.GetResult(index, cnt - 1) result[indexName[index]].append(value) print('MACD %.2f SIGNLA %.2f OSC %.2f' % (result['MACD'][-1], result['SIGNAL'][-1], result['OSC'][-1])) return (True, result) # 분차트 관리 클래스 # 주어진 주기로 분차트 조회 , 실시간 분차트 데이터 생성, MACD 계산 호출 class CMinchartData: def __init__(self, interval): # interval : 분차트 주기 self.interval = interval self.objCur = {} self.data = {} self.code = '' self.objMACD = CMACD() self.LASTTIME = 1530 # 오늘 날짜 now = time.localtime() self.todayDate = now.tm_year * 10000 + now.tm_mon * 100 + now.tm_mday print(self.todayDate) def MonCode(self, code): self.data = {} self.code = code self.data['O'] = [] self.data['H'] = [] self.data['L'] = [] self.data['C'] = [] self.data['V'] = [] self.data['D'] = [] self.data['T'] = [] self.data['MACD'] = [] self.data['SIGNAL'] = [] self.data['OSC'] = [] # 차트 기본 통신 self.rqChartMinData(code, self.interval) # MACD 클래스에 수신 받은 차트 데이터 세팅 self.objMACD.setChartData(self.data) # MACD 계산 하기 ret, result = self.objMACD.makeMACD() self.data['MACD'] = result['MACD'] self.data['SIGNAL'] = result['SIGNAL'] self.data['OSC'] = result['OSC'] # 실시간 시세 요청 if (code not in self.objCur): self.objCur[code] = CpPBStockCur() self.objCur[code].Subscribe(code, self) def stop(self): for k, v in self.objCur.items(): v.Unsubscribe() self.objCur = {} # 분차트 - 코드, 주기, 개수 def rqChartMinData(self, code, interval): objRq = win32com.client.Dispatch("CpSysDib.StockChart") objRq.SetInputValue(0, code) # 종목 코드 objRq.SetInputValue(1, ord('2')) # 개수로 조회 objRq.SetInputValue(4, 500) # 통신 개수 - 500 개로 고정 objRq.SetInputValue(5, [0, 1, 2, 3, 4, 5, 8]) # 날짜,시간, 시가,고가,저가,종가,거래량 objRq.SetInputValue(6, ord('m')) # '차트 주가 - 분 데이터 objRq.SetInputValue(7, interval) # 차트 주기 objRq.SetInputValue(9, ord('1')) # 9 - 수정주가(char) totlen = 0 objRq.BlockRequest() rqStatus = objRq.GetDibStatus() rqRet = objRq.GetDibMsg1() print("통신상태", rqStatus, rqRet) if rqStatus != 0: exit() len = objRq.GetHeaderValue(3) print(totlen) totlen += len print("날짜", "시가", "고가", "저가", "종가", "거래량") print("==============================================-") for i in range(len): day = objRq.GetDataValue(0, i) time = objRq.GetDataValue(1, i) open = objRq.GetDataValue(2, i) high = objRq.GetDataValue(3, i) low = objRq.GetDataValue(4, i) close = objRq.GetDataValue(5, i) vol = objRq.GetDataValue(6, i) self.data['D'].append(day) self.data['T'].append(time) self.data['O'].append(open) self.data['H'].append(high) self.data['L'].append(low) self.data['C'].append(close) self.data['V'].append(vol) # 수신된 역순으로 넣는다 -> 최근 날짜가 맨 뒤로 가도록 self.data['D'].reverse() self.data['T'].reverse() self.data['O'].reverse() self.data['H'].reverse() self.data['L'].reverse() self.data['C'].reverse() self.data['V'].reverse() # 가격 실시간 변경 시 분차트 데이터 재 계산 def updateCurData(self, item): time = item['time'] self.cur = cur = item['cur'] vol = item['vol'] self.makeMinchart(time, cur, vol) def getHMTFromTime(self, time): hh, mm = divmod(time, 10000) mm, tt = divmod(mm, 100) return (hh, mm, tt) def getChartTime(self, time): lCurTime = time + self.interval hh, mm = divmod(lCurTime, 100) if (mm >= 60): hh += 1 mm = 0 lCurTime = hh * 100 + mm if (lCurTime > self.LASTTIME): lCurTime = self.LASTTIME return lCurTime # 실시간 데이터를 통해 분차트 업데이트 def makeMinchart(self, time, cur, vol): # time 분해 --> 시, 분, 초 hh, mm, tt = self.getHMTFromTime(time) hhmm = hh * 100 + mm bFind = False nLen = len(self.data['T']) # 분차트 주기 기준으로 나눠서 시간 차트 시간 계산 # 1분 봉의 경우 14:10분 봉: 14:09:00~14:09:59 # 5분 봉의 경우 14:10분 봉 : 14:05:00~ 14:09:59 a, b = divmod(hhmm, self.interval) intervaltime = a * self.interval lCurTime = self.getChartTime(intervaltime) if (nLen > 0): lLastTime = self.data['T'][-1] if (lLastTime == lCurTime): bFind = True self.data['C'][-1] = cur if (self.data['H'][-1] < cur): self.data['H'][-1] = cur if (self.data['L'][-1] > cur): self.data['L'][-1] = cur self.data['V'][-1] += vol print('들어온 시간 %d --> 마지막 분차트 시간 %d 에 업데이트' % (time, lLastTime)) ret, result = self.objMACD.updateMACD(self.data) self.data['MACD'][-1] = result['MACD'][0] self.data['SIGNAL'][-1] = result['SIGNAL'][0] self.data['OSC'][-1] = result['OSC'][0] # 신규 봉이 추가 if bFind == False: print('들어온 시간 %d --> 새로운 분차트 시간 %d 에 업데이트' % (time, lCurTime)) self.data['D'].append(self.todayDate) self.data['T'].append(lCurTime) self.data['O'].append(cur) self.data['H'].append(cur) self.data['L'].append(cur) self.data['C'].append(cur) self.data['V'].append(vol) # 데이터 추가 - MACD 계산 모듈에 차트 데이터 추가 self.objMACD.addLastData(self.data) # MACD 계산 ret, result = self.objMACD.makeMACD() self.data['MACD'] = result['MACD'] self.data['SIGNAL'] = result['SIGNAL'] self.data['OSC'] = result['OSC'] # MACD 의 신호가 교차됐는지 체크 self.checkMACD() return def checkMACD(self): if (len(self.data['OSC']) < 5): return # 현재 시점에서 이전 봉(-2) 가 매수신호/매도 신호 발생했는 지 체크 # -1 : 현재 시점 -2: 바로 직전 봉 -3 그 전 봉 print('osc', self.data['OSC'][-3], self.data['OSC'][-2], self.data['OSC'][-1]) if self.data['OSC'][-3] < 0: if self.data['OSC'][-2] > 0: print('MACD 매수, 시간 %d, 가격 %d' % (self.data['T'][-1], self.data['C'][-1])) elif self.data['OSC'][-3] > 0: if self.data['OSC'][-2] < 0: print('MACD 매도, 시간 %d, 가격 %d' % (self.data['T'][-1], self.data['C'][-1])) def printdata(self): nLen = len(self.data['T']) for i in range(nLen): print(self.data['D'][i], self.data['T'][i], self.data['O'][i], self.data['H'][i], self.data['L'][i], self.data['C'][i], self.data['V'][i], self.data['MACD'][i], self.data['SIGNAL'][i], self.data['OSC'][i]) # print(code, self.minDatas[code]) ################################################ # 테스트를 위한 메인 화면 class MyWindow(QMainWindow): def __init__(self): super().__init__() # plus 상태 체크 if InitPlusCheck() == False: exit() self.minData = CMinchartData(5) self.minData.MonCode('A069500') self.setWindowTitle("주식 분 차트 생성") self.setGeometry(300, 300, 300, 180) nH = 20 btnPrint = QPushButton('print', self) btnPrint.move(20, nH) btnPrint.clicked.connect(self.btnPrint_clicked) nH += 50 btnExit = QPushButton('종료', self) btnExit.move(20, nH) btnExit.clicked.connect(self.btnExit_clicked) nH += 50 def btnPrint_clicked(self): self.minData.printdata() return def btnExit_clicked(self): self.minData.stop() exit() return if __name__ == "__main__": app = QApplication(sys.argv) myWindow = MyWindow() myWindow.show() app.exec_()
프로그램의 전체 코드입니다
self.minData = CMinchartData(5)
self.minData.MonCode('A069500')
예제는 069500 종목(KODEX 200), 5분 차트를 기준으로 하도록 했습니다
'개발 > 파이썬' 카테고리의 다른 글
파이썬의 for 루프 (1) | 2019.06.16 |
---|---|
[CREON PLUS API 활용] BlockRequest 와 Request (362) | 2018.12.11 |
파이썬 윈도우 메시지 펌프 처리 예제 (0) | 2018.12.08 |
파이썬 리턴값 편리 기능 (0) | 2018.11.18 |
[파이썬] 파일 다루기(유니코드, 파일 읽기 등) (0) | 2018.10.21 |
[파이썬] 문자열 다루기 (0) | 2018.10.17 |
댓글