[CREON PLUS API 활용] MACD 매매 시그널 실시간 구하기(5분 차트 이용)

2018. 11. 23. 10:17개발/파이썬

차트와 MACD지표를 실시간으로 계산하는 간단한 예제 코드입니다 

MACD 지표는 단기 지수 이동평균과 장기 지수 이동평균의 차로 구해는 비교적 단순한 기술적 지표입니다.



참고로 MACD 지표를 구하는 계산 방법은 아래와 같습니다. 

  • MACD = 12일 지수이동평균- 26일 지수이동평균
  • Signal = MACD 의 9일 지수이동평균
  • Oscillator : MACD - Signal 


여기서 MACD를 직접 계산하지는 않고, CREON PLUS API를 이용해 MACD 계산 값을 넘겨 받습니다 

아래의 CMACD 클래스는 두가지 플러스 Object 의 도움을 받아 지표를 계산합니다.

  • CpIndexes.CpSeries - 차트의 기본 데이터를 저장하는 PLUS 클래스(시가, 고가, 저가, 종가, 거래량)
  • CpIndexes.CpIndex - 실질적인 지표 계산 PLUS 클래스, 150여개의 지표 계산이 가능

class CMACD:
    def __init__(self):
        self.objSeries = win32com.client.Dispatch("CpIndexes.CpSeries")
        self.objIndex = win32com.client.Dispatch("CpIndexes.CpIndex")
 
    # 차트 데이터 세팅 하기
    def setChartData(self, chartData):
        nLen = len(chartData['T'])
        for i in range(nLen):
            self.objSeries.Add(chartData['C'][i], chartData['O'][i], chartData['H'][i], chartData['L'][i],
                               chartData['V'][i])
 
        return
 
    # 기존 차트 데이터에 새로 들어온 신규 데이터 추가
    def addLastData(self, chartData):
        self.objSeries.Add(chartData['C'][-1], chartData['O'][-1], chartData['H'][-1], chartData['L'][-1],
                           chartData['V'][-1])
 
    # MACD 계산
    def makeMACD(self):
        result = {}
        # 지표 계산 object
        self.objIndex.series = self.objSeries
        self.objIndex.put_IndexKind("MACD")  # 계산할 지표: MACD
        self.objIndex.put_IndexDefault("MACD")  # MACD 지표 기본 변수 자동 세팅
 
        print("MACD 변수", self.objIndex.get_Term1(), self.objIndex.get_Term2(), self.objIndex.get_Signal())
 
        # 지표 데이터 계산 하기
        self.objIndex.Calculate()
 
        cntofIndex = self.objIndex.ItemCount
        print("지표 개수:  ", cntofIndex)
        indexName = ["MACD", "SIGNAL", "OSC"]
 
        result['MACD'] = []
        result['SIGNAL'] = []
        result['OSC'] = []
        for index in range(cntofIndex):
            cnt = self.objIndex.GetCount(index)
            for j in range(cnt):
                value = self.objIndex.GetResult(index, j)
                result[indexName[index]].append(value)
            # for j in range(cnt) :
            #    value = self.objIndex.GetResult(index,j)
            # print(indexName[index], value)  # 지표의 최근 값 표시
 
        print('MACD %.2f SIGNLA %.2f OSC %.2f' % (result['MACD'][-1], result['SIGNAL'][-1], result['OSC'][-1]))
        return (True, result)
 
    # MACD 업데이트(차트 데이터 개수에 변화가 없을 경우에만 사용)
    def updateMACD(self, chartData):
        result = {}
        # 지표 데이터 update
        self.objSeries.update(chartData['C'][-1], chartData['O'][-1], chartData['H'][-1], chartData['L'][-1],
                              chartData['V'][-1])
        self.objIndex.update()
        cntofIndex = self.objIndex.ItemCount
        print("지표 개수:  ", cntofIndex)
 
        indexName = ["MACD", "SIGNAL", "OSC"]
 
        result['MACD'] = []
        result['SIGNAL'] = []
        result['OSC'] = []
 
        for index in range(cntofIndex):
            cnt = self.objIndex.GetCount(index)
            # 마지막 데이터만 업데이트 한다
            value = self.objIndex.GetResult(index, cnt - 1)
            result[indexName[index]].append(value)
 
        print('MACD %.2f SIGNLA %.2f OSC %.2f' % (result['MACD'][-1], result['SIGNAL'][-1], result['OSC'][-1]))
        return (True, result)





별도의 설명보다는 코드에 가능한 주석을 많이 추가했습니다 ^^; 


이번 예제의 핵심 클래스는 'CMinchartData' 입니다 

분차트를 요청 하고, 실시간으로 업데이트하면서 주어진 주기(예제에서는 5분) 분 차트를 만듭니다 

이 클래스의 주요 함수들은 

  • rqChartMinData - 5분 차트를 요청/수신 받고 기본 데이터를 저장합니다. 수신 받은 데이터로 최초 MACD 를 계산합니다.
  • makeMinchart - 새로운 시세가 올 때 마다 호출됩니다.  최근 봉에 업데이트 할 지 아니면 새로운 봉을 만들지 결정하고 MACD 지표도 다시 계산합니다. 예제는 5분차트이지만 10분, 15분 등 주기 변수만 변경하면 다양한 주기의 분차트를 생성하도록 되어 있습니다.
  • checkMACD - MACD Oscillator 지표가 0 보다 크거나 작을 때를 기준으로 매매 신호를 발생 합니다. 이번 예제에는 봉이 완성된 후 판단하도록 예제를 만들었습니다 (시세가 들어 올 때 마다 판단하면 시그널이 생기고 없어지고를 반복 할 수 있어서)
import sys
from PyQt5.QtWidgets import *
import win32com.client
import ctypes
import time
 
################################################
# PLUS 공통 OBJECT
g_objCodeMgr = win32com.client.Dispatch('CpUtil.CpCodeMgr')
g_objCpStatus = win32com.client.Dispatch('CpUtil.CpCybos')
g_objCpTrade = win32com.client.Dispatch('CpTrade.CpTdUtil')
 
 
################################################
# PLUS 실행 기본 체크 함수
def InitPlusCheck():
    # 프로세스가 관리자 권한으로 실행 여부
    if ctypes.windll.shell32.IsUserAnAdmin():
        print('정상: 관리자권한으로 실행된 프로세스입니다.')
    else:
        print('오류: 일반권한으로 실행됨. 관리자 권한으로 실행해 주세요')
        return False
 
    # 연결 여부 체크
    if (g_objCpStatus.IsConnect == 0):
        print("PLUS가 정상적으로 연결되지 않음. ")
        return False
 
    # 주문 관련 초기화
    if (g_objCpTrade.TradeInit(0) != 0):
        print("주문 초기화 실패")
        return False
 
    return True
 
 
################################################
# CpEvent: 실시간 이벤트 수신 클래스
class CpEvent:
    def set_params(self, client, name, caller):
        self.client = client  # CP 실시간 통신 object
        self.name = name  # 서비스가 다른 이벤트를 구분하기 위한 이름
        self.caller = caller  # callback 을 위해 보관
 
    def OnReceived(self):
        # 실시간 처리 - 현재가 체결 데이터
        if self.name == 'stockcur':
            code = self.client.GetHeaderValue(0)  # 초
            name = self.client.GetHeaderValue(1)  # 초
            timess = self.client.GetHeaderValue(18)  # 초
            exFlag = self.client.GetHeaderValue(19)  # 예상체결 플래그
            cprice = self.client.GetHeaderValue(13)  # 현재가
            diff = self.client.GetHeaderValue(2)  # 대비
            cVol = self.client.GetHeaderValue(17)  # 순간체결수량
            vol = self.client.GetHeaderValue(9)  # 거래량
 
            if exFlag != ord('2'):
                return
 
            item = {}
            item['code'] = code
            item['time'] = timess
            item['diff'] = diff
            item['cur'] = cprice
            item['vol'] = cVol
 
            # 현재가 업데이트
            self.caller.updateCurData(item)
 
            return
 
 
################################################
# plus 실시간 수신 base 클래스
class CpPublish:
    def __init__(self, name, serviceID):
        self.name = name
        self.obj = win32com.client.Dispatch(serviceID)
        self.bIsSB = False
 
    def Subscribe(self, var, caller):
        if self.bIsSB:
            self.Unsubscribe()
 
        if (len(var) > 0):
            self.obj.SetInputValue(0, var)
 
        handler = win32com.client.WithEvents(self.obj, CpEvent)
        handler.set_params(self.obj, self.name, caller)
        self.obj.Subscribe()
        self.bIsSB = True
 
    def Unsubscribe(self):
        if self.bIsSB:
            self.obj.Unsubscribe()
        self.bIsSB = False
 
 
################################################
# CpPBStockCur: 실시간 현재가 요청 클래스
class CpPBStockCur(CpPublish):
    def __init__(self):
        super().__init__('stockcur', 'DsCbo1.StockCur')
 
 
# MACD 지표 계산
class CMACD:
    def __init__(self):
        self.objSeries = win32com.client.Dispatch("CpIndexes.CpSeries")
        self.objIndex = win32com.client.Dispatch("CpIndexes.CpIndex")
 
    # 차트 데이터 세팅 하기
    def setChartData(self, chartData):
        nLen = len(chartData['T'])
        for i in range(nLen):
            self.objSeries.Add(chartData['C'][i], chartData['O'][i], chartData['H'][i], chartData['L'][i],
                               chartData['V'][i])
 
        return
 
    # 기존 차트 데이터에 새로 들어온 신규 데이터 추가
    def addLastData(self, chartData):
        self.objSeries.Add(chartData['C'][-1], chartData['O'][-1], chartData['H'][-1], chartData['L'][-1],
                           chartData['V'][-1])
 
    # MACD 계산
    def makeMACD(self):
        result = {}
        # 지표 계산 object
        self.objIndex.series = self.objSeries
        self.objIndex.put_IndexKind("MACD")  # 계산할 지표: MACD
        self.objIndex.put_IndexDefault("MACD")  # MACD 지표 기본 변수 자동 세팅
 
        print("MACD 변수", self.objIndex.get_Term1(), self.objIndex.get_Term2(), self.objIndex.get_Signal())
 
        # 지표 데이터 계산 하기
        self.objIndex.Calculate()
 
        cntofIndex = self.objIndex.ItemCount
        print("지표 개수:  ", cntofIndex)
        indexName = ["MACD", "SIGNAL", "OSC"]
 
        result['MACD'] = []
        result['SIGNAL'] = []
        result['OSC'] = []
        for index in range(cntofIndex):
            cnt = self.objIndex.GetCount(index)
            for j in range(cnt):
                value = self.objIndex.GetResult(index, j)
                result[indexName[index]].append(value)
            # for j in range(cnt) :
            #    value = self.objIndex.GetResult(index,j)
            # print(indexName[index], value)  # 지표의 최근 값 표시
 
        print('MACD %.2f SIGNLA %.2f OSC %.2f' % (result['MACD'][-1], result['SIGNAL'][-1], result['OSC'][-1]))
        return (True, result)
 
    # MACD 업데이트(차트 데이터 개수에 변화가 없을 경우에만 사용)
    def updateMACD(self, chartData):
        result = {}
        # 지표 데이터 update
        self.objSeries.update(chartData['C'][-1], chartData['O'][-1], chartData['H'][-1], chartData['L'][-1],
                              chartData['V'][-1])
        self.objIndex.update()
        cntofIndex = self.objIndex.ItemCount
        print("지표 개수:  ", cntofIndex)
 
        indexName = ["MACD", "SIGNAL", "OSC"]
 
        result['MACD'] = []
        result['SIGNAL'] = []
        result['OSC'] = []
 
        for index in range(cntofIndex):
            cnt = self.objIndex.GetCount(index)
            # 마지막 데이터만 업데이트 한다
            value = self.objIndex.GetResult(index, cnt - 1)
            result[indexName[index]].append(value)
 
        print('MACD %.2f SIGNLA %.2f OSC %.2f' % (result['MACD'][-1], result['SIGNAL'][-1], result['OSC'][-1]))
        return (True, result)
 
 
# 분차트 관리 클래스
#   주어진 주기로 분차트 조회 , 실시간 분차트 데이터 생성, MACD 계산 호출
class CMinchartData:
    def __init__(self, interval):
        # interval : 분차트 주기
        self.interval = interval
        self.objCur = {}
        self.data = {}
        self.code = ''
        self.objMACD = CMACD()
        self.LASTTIME = 1530
 
        # 오늘 날짜
        now = time.localtime()
        self.todayDate = now.tm_year * 10000 + now.tm_mon * 100 + now.tm_mday
        print(self.todayDate)
 
    def MonCode(self, code):
        self.data = {}
        self.code = code
 
        self.data['O'] = []
        self.data['H'] = []
        self.data['L'] = []
        self.data['C'] = []
        self.data['V'] = []
        self.data['D'] = []
        self.data['T'] = []
        self.data['MACD'] = []
        self.data['SIGNAL'] = []
        self.data['OSC'] = []
 
        # 차트 기본 통신
        self.rqChartMinData(code, self.interval)
 
        # MACD 클래스에 수신 받은 차트 데이터 세팅
        self.objMACD.setChartData(self.data)
        # MACD 계산 하기
        ret, result = self.objMACD.makeMACD()
 
        self.data['MACD'] = result['MACD']
        self.data['SIGNAL'] = result['SIGNAL']
        self.data['OSC'] = result['OSC']
 
        # 실시간 시세 요청
        if (code not in self.objCur):
            self.objCur[code] = CpPBStockCur()
            self.objCur[code].Subscribe(code, self)
 
    def stop(self):
        for k, v in self.objCur.items():
            v.Unsubscribe()
        self.objCur = {}
 
    # 분차트 - 코드, 주기, 개수
    def rqChartMinData(self, code, interval):
        objRq = win32com.client.Dispatch("CpSysDib.StockChart")
 
        objRq.SetInputValue(0, code)  # 종목 코드
        objRq.SetInputValue(1, ord('2'))  # 개수로 조회
        objRq.SetInputValue(4, 500)  # 통신 개수 - 500 개로 고정
        objRq.SetInputValue(5, [0, 1, 2, 3, 4, 5, 8])  # 날짜,시간, 시가,고가,저가,종가,거래량
        objRq.SetInputValue(6, ord('m'))  # '차트 주가 - 분 데이터
        objRq.SetInputValue(7, interval)  # 차트 주기
        objRq.SetInputValue(9, ord('1'))  # 9 - 수정주가(char)
 
        totlen = 0
        objRq.BlockRequest()
        rqStatus = objRq.GetDibStatus()
        rqRet = objRq.GetDibMsg1()
        print("통신상태", rqStatus, rqRet)
        if rqStatus != 0:
            exit()
 
        len = objRq.GetHeaderValue(3)
        print(totlen)
        totlen += len
 
        print("날짜", "시가", "고가", "저가", "종가", "거래량")
        print("==============================================-")
 
        for i in range(len):
            day = objRq.GetDataValue(0, i)
            time = objRq.GetDataValue(1, i)
            open = objRq.GetDataValue(2, i)
            high = objRq.GetDataValue(3, i)
            low = objRq.GetDataValue(4, i)
            close = objRq.GetDataValue(5, i)
            vol = objRq.GetDataValue(6, i)
 
            self.data['D'].append(day)
            self.data['T'].append(time)
            self.data['O'].append(open)
            self.data['H'].append(high)
            self.data['L'].append(low)
            self.data['C'].append(close)
            self.data['V'].append(vol)
 
        # 수신된 역순으로 넣는다 -> 최근 날짜가 맨 뒤로 가도록
        self.data['D'].reverse()
        self.data['T'].reverse()
        self.data['O'].reverse()
        self.data['H'].reverse()
        self.data['L'].reverse()
        self.data['C'].reverse()
        self.data['V'].reverse()
 
    # 가격 실시간 변경 시 분차트 데이터 재 계산
    def updateCurData(self, item):
        time = item['time']
        self.cur = cur = item['cur']
        vol = item['vol']
        self.makeMinchart(time, cur, vol)
 
    def getHMTFromTime(self, time):
        hh, mm = divmod(time, 10000)
        mm, tt = divmod(mm, 100)
        return (hh, mm, tt)
 
    def getChartTime(self, time):
        lCurTime = time + self.interval
        hh, mm = divmod(lCurTime, 100)
        if (mm >= 60):
            hh += 1
            mm = 0
        lCurTime = hh * 100 + mm
        if (lCurTime > self.LASTTIME):
            lCurTime = self.LASTTIME
        return lCurTime
 
    # 실시간 데이터를 통해 분차트 업데이트
    def makeMinchart(self, time, cur, vol):
        # time 분해 --> 시, 분, 초
        hh, mm, tt = self.getHMTFromTime(time)
        hhmm = hh * 100 + mm
 
        bFind = False
        nLen = len(self.data['T'])
 
        # 분차트 주기 기준으로 나눠서 시간 차트 시간 계산
        #   1분 봉의 경우 14:10분 봉: 14:09:00~14:09:59
        #   5분 봉의 경우 14:10분 봉 : 14:05:00~ 14:09:59
        a, b = divmod(hhmm, self.interval)
        intervaltime = a * self.interval
        lCurTime = self.getChartTime(intervaltime)
 
        if (nLen > 0):
            lLastTime = self.data['T'][-1]
            if (lLastTime == lCurTime):
                bFind = True
 
                self.data['C'][-1] = cur
                if (self.data['H'][-1] < cur):
                    self.data['H'][-1] = cur
                if (self.data['L'][-1] > cur):
                    self.data['L'][-1] = cur
                self.data['V'][-1] += vol
                print('들어온 시간 %d --> 마지막 분차트 시간 %d 에 업데이트' % (time, lLastTime))
 
                ret, result = self.objMACD.updateMACD(self.data)
                self.data['MACD'][-1] = result['MACD'][0]
                self.data['SIGNAL'][-1] = result['SIGNAL'][0]
                self.data['OSC'][-1] = result['OSC'][0]
 
        # 신규 봉이 추가
        if bFind == False:
            print('들어온 시간 %d --> 새로운 분차트 시간 %d 에 업데이트' % (time, lCurTime))
            self.data['D'].append(self.todayDate)
            self.data['T'].append(lCurTime)
            self.data['O'].append(cur)
            self.data['H'].append(cur)
            self.data['L'].append(cur)
            self.data['C'].append(cur)
            self.data['V'].append(vol)
 
            # 데이터 추가 - MACD 계산 모듈에 차트 데이터 추가
            self.objMACD.addLastData(self.data)
            # MACD 계산
            ret, result = self.objMACD.makeMACD()
 
            self.data['MACD'] = result['MACD']
            self.data['SIGNAL'] = result['SIGNAL']
            self.data['OSC'] = result['OSC']
 
            # MACD 의 신호가 교차됐는지 체크
            self.checkMACD()
 
        return
 
    def checkMACD(self):
        if (len(self.data['OSC']) < 5):
            return
        # 현재 시점에서 이전 봉(-2) 가 매수신호/매도 신호 발생했는 지 체크
        # -1 : 현재 시점 -2: 바로 직전 봉 -3 그 전 봉
        print('osc', self.data['OSC'][-3], self.data['OSC'][-2], self.data['OSC'][-1])
        if self.data['OSC'][-3] < 0:
            if self.data['OSC'][-2] > 0:
                print('MACD 매수, 시간 %d, 가격 %d' % (self.data['T'][-1], self.data['C'][-1]))
        elif self.data['OSC'][-3] > 0:
            if self.data['OSC'][-2] < 0:
                print('MACD 매도, 시간 %d, 가격 %d' % (self.data['T'][-1], self.data['C'][-1]))
 
    def printdata(self):
        nLen = len(self.data['T'])
        for i in range(nLen):
            print(self.data['D'][i],
                  self.data['T'][i],
                  self.data['O'][i],
                  self.data['H'][i],
                  self.data['L'][i],
                  self.data['C'][i],
                  self.data['V'][i],
                  self.data['MACD'][i],
                  self.data['SIGNAL'][i],
                  self.data['OSC'][i])
 
        # print(code, self.minDatas[code])
 
 
################################################
# 테스트를 위한 메인 화면
class MyWindow(QMainWindow):
    def __init__(self):
        super().__init__()
 
        # plus 상태 체크
        if InitPlusCheck() == False:
            exit()
 
        self.minData = CMinchartData(5)
        self.minData.MonCode('A069500')
 
        self.setWindowTitle("주식 분 차트 생성")
        self.setGeometry(300, 300, 300, 180)
 
        nH = 20
 
        btnPrint = QPushButton('print', self)
        btnPrint.move(20, nH)
        btnPrint.clicked.connect(self.btnPrint_clicked)
        nH += 50
 
        btnExit = QPushButton('종료', self)
        btnExit.move(20, nH)
        btnExit.clicked.connect(self.btnExit_clicked)
        nH += 50
 
    def btnPrint_clicked(self):
        self.minData.printdata()
        return
 
    def btnExit_clicked(self):
        self.minData.stop()
        exit()
        return
 
 
if __name__ == "__main__":
    app = QApplication(sys.argv)
    myWindow = MyWindow()
    myWindow.show()
    app.exec_()


프로그램의 전체 코드입니다

 self.minData = CMinchartData(5)
 self.minData.MonCode('A069500')



예제는 069500 종목(KODEX 200), 5분 차트를 기준으로 하도록 했습니다