본문 바로가기
개발/파이썬

[CREON PLUS API 활용] MACD 매매 시그널 실시간 구하기(5분 차트 이용)

by esstory 2018. 11. 23.

차트와 MACD지표를 실시간으로 계산하는 간단한 예제 코드입니다 

MACD 지표는 단기 지수 이동평균과 장기 지수 이동평균의 차로 구해는 비교적 단순한 기술적 지표입니다.



참고로 MACD 지표를 구하는 계산 방법은 아래와 같습니다. 

  • MACD = 12일 지수이동평균- 26일 지수이동평균
  • Signal = MACD 의 9일 지수이동평균
  • Oscillator : MACD - Signal 


여기서 MACD를 직접 계산하지는 않고, CREON PLUS API를 이용해 MACD 계산 값을 넘겨 받습니다 

아래의 CMACD 클래스는 두가지 플러스 Object 의 도움을 받아 지표를 계산합니다.

  • CpIndexes.CpSeries - 차트의 기본 데이터를 저장하는 PLUS 클래스(시가, 고가, 저가, 종가, 거래량)
  • CpIndexes.CpIndex - 실질적인 지표 계산 PLUS 클래스, 150여개의 지표 계산이 가능

class CMACD:
    def __init__(self):
        self.objSeries = win32com.client.Dispatch("CpIndexes.CpSeries")
        self.objIndex = win32com.client.Dispatch("CpIndexes.CpIndex")
 
    # 차트 데이터 세팅 하기
    def setChartData(self, chartData):
        nLen = len(chartData['T'])
        for i in range(nLen):
            self.objSeries.Add(chartData['C'][i], chartData['O'][i], chartData['H'][i], chartData['L'][i],
                               chartData['V'][i])
 
        return
 
    # 기존 차트 데이터에 새로 들어온 신규 데이터 추가
    def addLastData(self, chartData):
        self.objSeries.Add(chartData['C'][-1], chartData['O'][-1], chartData['H'][-1], chartData['L'][-1],
                           chartData['V'][-1])
 
    # MACD 계산
    def makeMACD(self):
        result = {}
        # 지표 계산 object
        self.objIndex.series = self.objSeries
        self.objIndex.put_IndexKind("MACD")  # 계산할 지표: MACD
        self.objIndex.put_IndexDefault("MACD")  # MACD 지표 기본 변수 자동 세팅
 
        print("MACD 변수", self.objIndex.get_Term1(), self.objIndex.get_Term2(), self.objIndex.get_Signal())
 
        # 지표 데이터 계산 하기
        self.objIndex.Calculate()
 
        cntofIndex = self.objIndex.ItemCount
        print("지표 개수:  ", cntofIndex)
        indexName = ["MACD", "SIGNAL", "OSC"]
 
        result['MACD'] = []
        result['SIGNAL'] = []
        result['OSC'] = []
        for index in range(cntofIndex):
            cnt = self.objIndex.GetCount(index)
            for j in range(cnt):
                value = self.objIndex.GetResult(index, j)
                result[indexName[index]].append(value)
            # for j in range(cnt) :
            #    value = self.objIndex.GetResult(index,j)
            # print(indexName[index], value)  # 지표의 최근 값 표시
 
        print('MACD %.2f SIGNLA %.2f OSC %.2f' % (result['MACD'][-1], result['SIGNAL'][-1], result['OSC'][-1]))
        return (True, result)
 
    # MACD 업데이트(차트 데이터 개수에 변화가 없을 경우에만 사용)
    def updateMACD(self, chartData):
        result = {}
        # 지표 데이터 update
        self.objSeries.update(chartData['C'][-1], chartData['O'][-1], chartData['H'][-1], chartData['L'][-1],
                              chartData['V'][-1])
        self.objIndex.update()
        cntofIndex = self.objIndex.ItemCount
        print("지표 개수:  ", cntofIndex)
 
        indexName = ["MACD", "SIGNAL", "OSC"]
 
        result['MACD'] = []
        result['SIGNAL'] = []
        result['OSC'] = []
 
        for index in range(cntofIndex):
            cnt = self.objIndex.GetCount(index)
            # 마지막 데이터만 업데이트 한다
            value = self.objIndex.GetResult(index, cnt - 1)
            result[indexName[index]].append(value)
 
        print('MACD %.2f SIGNLA %.2f OSC %.2f' % (result['MACD'][-1], result['SIGNAL'][-1], result['OSC'][-1]))
        return (True, result)





별도의 설명보다는 코드에 가능한 주석을 많이 추가했습니다 ^^; 


이번 예제의 핵심 클래스는 'CMinchartData' 입니다 

분차트를 요청 하고, 실시간으로 업데이트하면서 주어진 주기(예제에서는 5분) 분 차트를 만듭니다 

이 클래스의 주요 함수들은 

  • rqChartMinData - 5분 차트를 요청/수신 받고 기본 데이터를 저장합니다. 수신 받은 데이터로 최초 MACD 를 계산합니다.
  • makeMinchart - 새로운 시세가 올 때 마다 호출됩니다.  최근 봉에 업데이트 할 지 아니면 새로운 봉을 만들지 결정하고 MACD 지표도 다시 계산합니다. 예제는 5분차트이지만 10분, 15분 등 주기 변수만 변경하면 다양한 주기의 분차트를 생성하도록 되어 있습니다.
  • checkMACD - MACD Oscillator 지표가 0 보다 크거나 작을 때를 기준으로 매매 신호를 발생 합니다. 이번 예제에는 봉이 완성된 후 판단하도록 예제를 만들었습니다 (시세가 들어 올 때 마다 판단하면 시그널이 생기고 없어지고를 반복 할 수 있어서)
import sys
from PyQt5.QtWidgets import *
import win32com.client
import ctypes
import time
 
################################################
# PLUS 공통 OBJECT
g_objCodeMgr = win32com.client.Dispatch('CpUtil.CpCodeMgr')
g_objCpStatus = win32com.client.Dispatch('CpUtil.CpCybos')
g_objCpTrade = win32com.client.Dispatch('CpTrade.CpTdUtil')
 
 
################################################
# PLUS 실행 기본 체크 함수
def InitPlusCheck():
    # 프로세스가 관리자 권한으로 실행 여부
    if ctypes.windll.shell32.IsUserAnAdmin():
        print('정상: 관리자권한으로 실행된 프로세스입니다.')
    else:
        print('오류: 일반권한으로 실행됨. 관리자 권한으로 실행해 주세요')
        return False
 
    # 연결 여부 체크
    if (g_objCpStatus.IsConnect == 0):
        print("PLUS가 정상적으로 연결되지 않음. ")
        return False
 
    # 주문 관련 초기화
    if (g_objCpTrade.TradeInit(0) != 0):
        print("주문 초기화 실패")
        return False
 
    return True
 
 
################################################
# CpEvent: 실시간 이벤트 수신 클래스
class CpEvent:
    def set_params(self, client, name, caller):
        self.client = client  # CP 실시간 통신 object
        self.name = name  # 서비스가 다른 이벤트를 구분하기 위한 이름
        self.caller = caller  # callback 을 위해 보관
 
    def OnReceived(self):
        # 실시간 처리 - 현재가 체결 데이터
        if self.name == 'stockcur':
            code = self.client.GetHeaderValue(0)  # 초
            name = self.client.GetHeaderValue(1)  # 초
            timess = self.client.GetHeaderValue(18)  # 초
            exFlag = self.client.GetHeaderValue(19)  # 예상체결 플래그
            cprice = self.client.GetHeaderValue(13)  # 현재가
            diff = self.client.GetHeaderValue(2)  # 대비
            cVol = self.client.GetHeaderValue(17)  # 순간체결수량
            vol = self.client.GetHeaderValue(9)  # 거래량
 
            if exFlag != ord('2'):
                return
 
            item = {}
            item['code'] = code
            item['time'] = timess
            item['diff'] = diff
            item['cur'] = cprice
            item['vol'] = cVol
 
            # 현재가 업데이트
            self.caller.updateCurData(item)
 
            return
 
 
################################################
# plus 실시간 수신 base 클래스
class CpPublish:
    def __init__(self, name, serviceID):
        self.name = name
        self.obj = win32com.client.Dispatch(serviceID)
        self.bIsSB = False
 
    def Subscribe(self, var, caller):
        if self.bIsSB:
            self.Unsubscribe()
 
        if (len(var) > 0):
            self.obj.SetInputValue(0, var)
 
        handler = win32com.client.WithEvents(self.obj, CpEvent)
        handler.set_params(self.obj, self.name, caller)
        self.obj.Subscribe()
        self.bIsSB = True
 
    def Unsubscribe(self):
        if self.bIsSB:
            self.obj.Unsubscribe()
        self.bIsSB = False
 
 
################################################
# CpPBStockCur: 실시간 현재가 요청 클래스
class CpPBStockCur(CpPublish):
    def __init__(self):
        super().__init__('stockcur', 'DsCbo1.StockCur')
 
 
# MACD 지표 계산
class CMACD:
    def __init__(self):
        self.objSeries = win32com.client.Dispatch("CpIndexes.CpSeries")
        self.objIndex = win32com.client.Dispatch("CpIndexes.CpIndex")
 
    # 차트 데이터 세팅 하기
    def setChartData(self, chartData):
        nLen = len(chartData['T'])
        for i in range(nLen):
            self.objSeries.Add(chartData['C'][i], chartData['O'][i], chartData['H'][i], chartData['L'][i],
                               chartData['V'][i])
 
        return
 
    # 기존 차트 데이터에 새로 들어온 신규 데이터 추가
    def addLastData(self, chartData):
        self.objSeries.Add(chartData['C'][-1], chartData['O'][-1], chartData['H'][-1], chartData['L'][-1],
                           chartData['V'][-1])
 
    # MACD 계산
    def makeMACD(self):
        result = {}
        # 지표 계산 object
        self.objIndex.series = self.objSeries
        self.objIndex.put_IndexKind("MACD")  # 계산할 지표: MACD
        self.objIndex.put_IndexDefault("MACD")  # MACD 지표 기본 변수 자동 세팅
 
        print("MACD 변수", self.objIndex.get_Term1(), self.objIndex.get_Term2(), self.objIndex.get_Signal())
 
        # 지표 데이터 계산 하기
        self.objIndex.Calculate()
 
        cntofIndex = self.objIndex.ItemCount
        print("지표 개수:  ", cntofIndex)
        indexName = ["MACD", "SIGNAL", "OSC"]
 
        result['MACD'] = []
        result['SIGNAL'] = []
        result['OSC'] = []
        for index in range(cntofIndex):
            cnt = self.objIndex.GetCount(index)
            for j in range(cnt):
                value = self.objIndex.GetResult(index, j)
                result[indexName[index]].append(value)
            # for j in range(cnt) :
            #    value = self.objIndex.GetResult(index,j)
            # print(indexName[index], value)  # 지표의 최근 값 표시
 
        print('MACD %.2f SIGNLA %.2f OSC %.2f' % (result['MACD'][-1], result['SIGNAL'][-1], result['OSC'][-1]))
        return (True, result)
 
    # MACD 업데이트(차트 데이터 개수에 변화가 없을 경우에만 사용)
    def updateMACD(self, chartData):
        result = {}
        # 지표 데이터 update
        self.objSeries.update(chartData['C'][-1], chartData['O'][-1], chartData['H'][-1], chartData['L'][-1],
                              chartData['V'][-1])
        self.objIndex.update()
        cntofIndex = self.objIndex.ItemCount
        print("지표 개수:  ", cntofIndex)
 
        indexName = ["MACD", "SIGNAL", "OSC"]
 
        result['MACD'] = []
        result['SIGNAL'] = []
        result['OSC'] = []
 
        for index in range(cntofIndex):
            cnt = self.objIndex.GetCount(index)
            # 마지막 데이터만 업데이트 한다
            value = self.objIndex.GetResult(index, cnt - 1)
            result[indexName[index]].append(value)
 
        print('MACD %.2f SIGNLA %.2f OSC %.2f' % (result['MACD'][-1], result['SIGNAL'][-1], result['OSC'][-1]))
        return (True, result)
 
 
# 분차트 관리 클래스
#   주어진 주기로 분차트 조회 , 실시간 분차트 데이터 생성, MACD 계산 호출
class CMinchartData:
    def __init__(self, interval):
        # interval : 분차트 주기
        self.interval = interval
        self.objCur = {}
        self.data = {}
        self.code = ''
        self.objMACD = CMACD()
        self.LASTTIME = 1530
 
        # 오늘 날짜
        now = time.localtime()
        self.todayDate = now.tm_year * 10000 + now.tm_mon * 100 + now.tm_mday
        print(self.todayDate)
 
    def MonCode(self, code):
        self.data = {}
        self.code = code
 
        self.data['O'] = []
        self.data['H'] = []
        self.data['L'] = []
        self.data['C'] = []
        self.data['V'] = []
        self.data['D'] = []
        self.data['T'] = []
        self.data['MACD'] = []
        self.data['SIGNAL'] = []
        self.data['OSC'] = []
 
        # 차트 기본 통신
        self.rqChartMinData(code, self.interval)
 
        # MACD 클래스에 수신 받은 차트 데이터 세팅
        self.objMACD.setChartData(self.data)
        # MACD 계산 하기
        ret, result = self.objMACD.makeMACD()
 
        self.data['MACD'] = result['MACD']
        self.data['SIGNAL'] = result['SIGNAL']
        self.data['OSC'] = result['OSC']
 
        # 실시간 시세 요청
        if (code not in self.objCur):
            self.objCur[code] = CpPBStockCur()
            self.objCur[code].Subscribe(code, self)
 
    def stop(self):
        for k, v in self.objCur.items():
            v.Unsubscribe()
        self.objCur = {}
 
    # 분차트 - 코드, 주기, 개수
    def rqChartMinData(self, code, interval):
        objRq = win32com.client.Dispatch("CpSysDib.StockChart")
 
        objRq.SetInputValue(0, code)  # 종목 코드
        objRq.SetInputValue(1, ord('2'))  # 개수로 조회
        objRq.SetInputValue(4, 500)  # 통신 개수 - 500 개로 고정
        objRq.SetInputValue(5, [0, 1, 2, 3, 4, 5, 8])  # 날짜,시간, 시가,고가,저가,종가,거래량
        objRq.SetInputValue(6, ord('m'))  # '차트 주가 - 분 데이터
        objRq.SetInputValue(7, interval)  # 차트 주기
        objRq.SetInputValue(9, ord('1'))  # 9 - 수정주가(char)
 
        totlen = 0
        objRq.BlockRequest()
        rqStatus = objRq.GetDibStatus()
        rqRet = objRq.GetDibMsg1()
        print("통신상태", rqStatus, rqRet)
        if rqStatus != 0:
            exit()
 
        len = objRq.GetHeaderValue(3)
        print(totlen)
        totlen += len
 
        print("날짜", "시가", "고가", "저가", "종가", "거래량")
        print("==============================================-")
 
        for i in range(len):
            day = objRq.GetDataValue(0, i)
            time = objRq.GetDataValue(1, i)
            open = objRq.GetDataValue(2, i)
            high = objRq.GetDataValue(3, i)
            low = objRq.GetDataValue(4, i)
            close = objRq.GetDataValue(5, i)
            vol = objRq.GetDataValue(6, i)
 
            self.data['D'].append(day)
            self.data['T'].append(time)
            self.data['O'].append(open)
            self.data['H'].append(high)
            self.data['L'].append(low)
            self.data['C'].append(close)
            self.data['V'].append(vol)
 
        # 수신된 역순으로 넣는다 -> 최근 날짜가 맨 뒤로 가도록
        self.data['D'].reverse()
        self.data['T'].reverse()
        self.data['O'].reverse()
        self.data['H'].reverse()
        self.data['L'].reverse()
        self.data['C'].reverse()
        self.data['V'].reverse()
 
    # 가격 실시간 변경 시 분차트 데이터 재 계산
    def updateCurData(self, item):
        time = item['time']
        self.cur = cur = item['cur']
        vol = item['vol']
        self.makeMinchart(time, cur, vol)
 
    def getHMTFromTime(self, time):
        hh, mm = divmod(time, 10000)
        mm, tt = divmod(mm, 100)
        return (hh, mm, tt)
 
    def getChartTime(self, time):
        lCurTime = time + self.interval
        hh, mm = divmod(lCurTime, 100)
        if (mm >= 60):
            hh += 1
            mm = 0
        lCurTime = hh * 100 + mm
        if (lCurTime > self.LASTTIME):
            lCurTime = self.LASTTIME
        return lCurTime
 
    # 실시간 데이터를 통해 분차트 업데이트
    def makeMinchart(self, time, cur, vol):
        # time 분해 --> 시, 분, 초
        hh, mm, tt = self.getHMTFromTime(time)
        hhmm = hh * 100 + mm
 
        bFind = False
        nLen = len(self.data['T'])
 
        # 분차트 주기 기준으로 나눠서 시간 차트 시간 계산
        #   1분 봉의 경우 14:10분 봉: 14:09:00~14:09:59
        #   5분 봉의 경우 14:10분 봉 : 14:05:00~ 14:09:59
        a, b = divmod(hhmm, self.interval)
        intervaltime = a * self.interval
        lCurTime = self.getChartTime(intervaltime)
 
        if (nLen > 0):
            lLastTime = self.data['T'][-1]
            if (lLastTime == lCurTime):
                bFind = True
 
                self.data['C'][-1] = cur
                if (self.data['H'][-1] < cur):
                    self.data['H'][-1] = cur
                if (self.data['L'][-1] > cur):
                    self.data['L'][-1] = cur
                self.data['V'][-1] += vol
                print('들어온 시간 %d --> 마지막 분차트 시간 %d 에 업데이트' % (time, lLastTime))
 
                ret, result = self.objMACD.updateMACD(self.data)
                self.data['MACD'][-1] = result['MACD'][0]
                self.data['SIGNAL'][-1] = result['SIGNAL'][0]
                self.data['OSC'][-1] = result['OSC'][0]
 
        # 신규 봉이 추가
        if bFind == False:
            print('들어온 시간 %d --> 새로운 분차트 시간 %d 에 업데이트' % (time, lCurTime))
            self.data['D'].append(self.todayDate)
            self.data['T'].append(lCurTime)
            self.data['O'].append(cur)
            self.data['H'].append(cur)
            self.data['L'].append(cur)
            self.data['C'].append(cur)
            self.data['V'].append(vol)
 
            # 데이터 추가 - MACD 계산 모듈에 차트 데이터 추가
            self.objMACD.addLastData(self.data)
            # MACD 계산
            ret, result = self.objMACD.makeMACD()
 
            self.data['MACD'] = result['MACD']
            self.data['SIGNAL'] = result['SIGNAL']
            self.data['OSC'] = result['OSC']
 
            # MACD 의 신호가 교차됐는지 체크
            self.checkMACD()
 
        return
 
    def checkMACD(self):
        if (len(self.data['OSC']) < 5):
            return
        # 현재 시점에서 이전 봉(-2) 가 매수신호/매도 신호 발생했는 지 체크
        # -1 : 현재 시점 -2: 바로 직전 봉 -3 그 전 봉
        print('osc', self.data['OSC'][-3], self.data['OSC'][-2], self.data['OSC'][-1])
        if self.data['OSC'][-3] < 0:
            if self.data['OSC'][-2] > 0:
                print('MACD 매수, 시간 %d, 가격 %d' % (self.data['T'][-1], self.data['C'][-1]))
        elif self.data['OSC'][-3] > 0:
            if self.data['OSC'][-2] < 0:
                print('MACD 매도, 시간 %d, 가격 %d' % (self.data['T'][-1], self.data['C'][-1]))
 
    def printdata(self):
        nLen = len(self.data['T'])
        for i in range(nLen):
            print(self.data['D'][i],
                  self.data['T'][i],
                  self.data['O'][i],
                  self.data['H'][i],
                  self.data['L'][i],
                  self.data['C'][i],
                  self.data['V'][i],
                  self.data['MACD'][i],
                  self.data['SIGNAL'][i],
                  self.data['OSC'][i])
 
        # print(code, self.minDatas[code])
 
 
################################################
# 테스트를 위한 메인 화면
class MyWindow(QMainWindow):
    def __init__(self):
        super().__init__()
 
        # plus 상태 체크
        if InitPlusCheck() == False:
            exit()
 
        self.minData = CMinchartData(5)
        self.minData.MonCode('A069500')
 
        self.setWindowTitle("주식 분 차트 생성")
        self.setGeometry(300, 300, 300, 180)
 
        nH = 20
 
        btnPrint = QPushButton('print', self)
        btnPrint.move(20, nH)
        btnPrint.clicked.connect(self.btnPrint_clicked)
        nH += 50
 
        btnExit = QPushButton('종료', self)
        btnExit.move(20, nH)
        btnExit.clicked.connect(self.btnExit_clicked)
        nH += 50
 
    def btnPrint_clicked(self):
        self.minData.printdata()
        return
 
    def btnExit_clicked(self):
        self.minData.stop()
        exit()
        return
 
 
if __name__ == "__main__":
    app = QApplication(sys.argv)
    myWindow = MyWindow()
    myWindow.show()
    app.exec_()


프로그램의 전체 코드입니다

 self.minData = CMinchartData(5)
 self.minData.MonCode('A069500')



예제는 069500 종목(KODEX 200), 5분 차트를 기준으로 하도록 했습니다


댓글