[DE 프로젝트: 코로나 확진자 수 예측 앱 'CO-THER 19'] 1. 개요 및 AWS PostgreSQL

프로젝트 개요

  • 플라스크(Flask) 웹 구현을 목표로 시작하는 데이터 파이프라인 구축 프로젝트이다.
  • 머신 러닝 모델을 서빙할 예정이고 머신 러닝 성능보다 무에서 파이프라인을 만들어 보는데 의의를 두려고 한다.
  • 태블로를 활용한 분석 및 배포까지 해보고 싶다.
  • 주제는 아무래도 코로나 때문에 굉장히 힘들어하고 있는 한 사람으로서, 기상 변인으로 서울시의 확진자 수를 예측하는 머신 러닝 모델을 만들어보려한다.

데이터 수집

  • API 를 이용한 데이터 수집
  • 일정 시간 간격으로 데이터 수집(Cronjob)

데이터 저장(Store)

  • 관계형 데이터베이스 (postgreSQL)
  • 로컬 데이터베이스를 배포(AWS)

API 서비스 개발 (Service)

  • 수집된 데이터베이스의 데이터를 기반으로 머신러닝 모델을 구성
  • 개발한 모델 API로 사용가능
  • API를 다른 개발자들이 사용할 수 있도록 배포
  • 서비스 사용자의 API 사용을 도와주기 위해, GUI 구성(부트스트랩)

데이터 분석용 대시보드 개발

  • 데이터베이스의 데이터를 기반으로 대시보드에 자신의 의견을 피력하기 위한 그래프 구성
  • 데이터베이스의 데이터를 기반으로 EDA
  • API 서비스 개발(Service)의 모델 결과가 EDA에 포함되도록 그래프를 구성
  • Tableau를 이용한 대시보드 개발
  • 대시보드 배포

데이터 수집

구현 목표

  • 각종 기상에 따른 서울시의 코로나 확진자 수 예측할 것이다.

필요 데이터

  • 일별 확진자 수 및 기상데이터

일별 확진자 수

기상

데이터 저장

  • API를 활용하여 클라우드 RDB에 스키마 형태로 저장해본다.

AWS RDS PostgreSQL

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# Weather API
import requests
import psycopg2
import logging
import sys
import json

key = ''
startDate = '20201005'
endDate = '20211005'
location = '108' # 서울

host = "covid.cjwptwa04yyi.ap-northeast-2.rds.amazonaws.com"
port = 5432
username = "sixmini"
database = "postgres"
password = ""


def main():
    # postgreSQL 연결
    try:
        conn = psycopg2.connect(
            host=host,
            database=database,
            user=username,
            password=password)
        cursor = conn.cursor()
    except:
        logging.error("could not connect to rds")
        sys.exit()

    # cursor.execute("""CREATE TABLE weather (
    #     date VARCHAR PRIMARY KEY NOT NULL,
    #     avgTa FLOAT,
    #     minTa FLOAT,
    #     minTaHrmt FLOAT,
    #     maxTa FLOAT,
    #     maxTaHrmt FLOAT,
    #     mi10MaxRn FLOAT,
    #     mi10MaxRnHrmt FLOAT,
    #     hr1MaxRn FLOAT,
    #     hr1MaxRnHrmt FLOAT,
    #     sumRnDur FLOAT,
    #     sumRn FLOAT,
    #     maxInsWs FLOAT,
    #     maxInsWsWd FLOAT,
    #     maxInsWsHrmt FLOAT,
    #     maxWs FLOAT,
    #     maxWsWd FLOAT,
    #     maxWsHrmt FLOAT,
    #     avgWs FLOAT,
    #     hr24SumRws FLOAT,
    #     maxWd FLOAT,
    #     avgTd FLOAT,
    #     minRhm FLOAT,
    #     minRhmHrmt FLOAT,
    #     avgRhm FLOAT,
    #     avgPv FLOAT,
    #     avgPa FLOAT,
    #     maxPs FLOAT,
    #     maxPsHrmt FLOAT,
    #     minPs FLOAT,
    #     minPsHrmt FLOAT,
    #     avgPs FLOAT,
    #     ssDur FLOAT,
    #     sumSsHr FLOAT,
    #     hr1MaxIcsrHrmt FLOAT,
    #     hr1MaxIcsr FLOAT,
    #     sumGsr FLOAT,
    #     ddMefs FLOAT,
    #     ddMefsHrmt FLOAT,
    #     ddMes FLOAT,
    #     ddMesHrmt FLOAT,
    #     sumDpthFhsc FLOAT,
    #     avgTca FLOAT,
    #     avgLmac FLOAT,
    #     avgTs FLOAT,
    #     minTg FLOAT,
    #     avgCm5Te FLOAT,
    #     avgCm10Te FLOAT,
    #     avgCm20Te FLOAT,
    #     avgCm30Te FLOAT,
    #     avgM05Te FLOAT,
    #     avgM10Te FLOAT,
    #     avgM15Te FLOAT,
    #     avgM30Te FLOAT,
    #     avgM50Te FLOAT,
    #     sumLrgEv FLOAT,
    #     sumSmlEv FLOAT,
    #     n99Rn FLOAT,
    #     iscs VARCHAR,
    #     sumFogDur FLOAT);
    # """)
    # conn.commit()
    # sys.exit()


    # 기상 api
    API_URL = 'http://apis.data.go.kr/1360000/AsosDalyInfoService/getWthrDataList?serviceKey={}&pageNo=1&numOfRows=999&dataType=JSON&dataCd=ASOS&dateCd=DAY&startDt={}&endDt={}&stnIds={}'.format(key, startDate, endDate, location)
    
    r = requests.get(API_URL)    
    raw = json.loads(r.text)

    
    weather_raw = raw['response']['body']['items']['item']
    for i in range(len(weather_raw)):
        weather = {}
        try:
            weather.update(
                {
                    'date': weather_raw[i]['tm'],
                    'avgTa': weather_raw[i]['avgTa'],
                    'minTa': weather_raw[i]['minTa'],
                    'minTaHrmt': weather_raw[i]['minTaHrmt'],
                    'maxTa': weather_raw[i]['maxTa'],
                    'maxTaHrmt': weather_raw[i]['maxTaHrmt'],
                    'mi10MaxRn': weather_raw[i]['mi10MaxRn'],
                    'mi10MaxRnHrmt': weather_raw[i]['mi10MaxRnHrmt'],
                    'hr1MaxRn': weather_raw[i]['hr1MaxRn'],
                    'hr1MaxRnHrmt': weather_raw[i]['hr1MaxRnHrmt'],
                    'sumRnDur': weather_raw[i]['sumRnDur'],
                    'sumRn': weather_raw[i]['sumRn'],
                    'maxInsWs': weather_raw[i]['maxInsWs'],
                    'maxInsWsWd': weather_raw[i]['maxInsWsWd'],
                    'maxInsWsHrmt': weather_raw[i]['maxInsWsHrmt'],
                    'maxWs': weather_raw[i]['maxWs'],
                    'maxWsWd': weather_raw[i]['maxWsWd'],
                    'maxWsHrmt': weather_raw[i]['maxWsHrmt'],
                    'avgWs': weather_raw[i]['avgWs'],
                    'hr24SumRws': weather_raw[i]['hr24SumRws'],
                    'maxWd': weather_raw[i]['maxWd'],
                    'avgTd': weather_raw[i]['avgTd'],
                    'minRhm': weather_raw[i]['minRhm'],
                    'minRhmHrmt': weather_raw[i]['minRhmHrmt'],
                    'avgRhm': weather_raw[i]['avgRhm'],
                    'avgPv': weather_raw[i]['avgPv'],
                    'avgPa': weather_raw[i]['avgPa'],
                    'maxPs': weather_raw[i]['maxPs'],
                    'maxPsHrmt': weather_raw[i]['maxPsHrmt'],
                    'minPs': weather_raw[i]['minPs'],
                    'minPsHrmt': weather_raw[i]['minPsHrmt'],
                    'avgPs': weather_raw[i]['avgPs'],
                    'ssDur': weather_raw[i]['ssDur'],
                    'sumSsHr': weather_raw[i]['sumSsHr'],
                    'hr1MaxIcsrHrmt': weather_raw[i]['hr1MaxIcsrHrmt'],
                    'hr1MaxIcsr': weather_raw[i]['hr1MaxIcsr'],
                    'sumGsr': weather_raw[i]['sumGsr'],
                    'ddMefs': weather_raw[i]['ddMefs'],
                    'ddMefsHrmt': weather_raw[i]['ddMefsHrmt'],
                    'ddMes': weather_raw[i]['ddMes'],
                    'ddMesHrmt': weather_raw[i]['ddMesHrmt'],
                    'sumDpthFhsc': weather_raw[i]['sumDpthFhsc'],
                    'avgTca': weather_raw[i]['avgTca'],
                    'avgLmac': weather_raw[i]['avgLmac'],
                    'avgTs': weather_raw[i]['avgTs'],
                    'minTg': weather_raw[i]['minTg'],
                    'avgCm5Te': weather_raw[i]['avgCm5Te'],
                    'avgCm10Te': weather_raw[i]['avgCm10Te'],
                    'avgCm20Te': weather_raw[i]['avgCm20Te'],
                    'avgCm30Te': weather_raw[i]['avgCm30Te'],
                    'avgM05Te': weather_raw[i]['avgM05Te'],
                    'avgM10Te': weather_raw[i]['avgM10Te'],
                    'avgM15Te': weather_raw[i]['avgM15Te'],
                    'avgM30Te': weather_raw[i]['avgM30Te'],
                    'avgM50Te': weather_raw[i]['avgM50Te'],
                    'sumLrgEv': weather_raw[i]['sumLrgEv'],
                    'sumSmlEv': weather_raw[i]['sumSmlEv'],
                    'n99Rn': weather_raw[i]['n99Rn'],
                    'iscs': weather_raw[i]['iscs'],
                    'sumFogDur': weather_raw[i]['sumFogDur']
                }
            )
            for i in weather:
                if weather[i] == '':
                    weather[i] = '0'
            insert_row(cursor, weather, 'weather')
        except:
            print('some error!')
            continue
        
    conn.commit()

    cursor.execute("SELECT date FROM weather")
    print(cursor.fetchall())
    

def insert_row(cursor, data, table):
    placeholders = ', '.join(['%s'] * len(data))
    columns = ', '.join(data.keys())
    key_placeholders = ', '.join(['{0}=%s'.format(k) for k in data.keys()])
    sql = "INSERT INTO %s ( %s ) VALUES ( %s ) ON CONFLICT ( %s ) DO UPDATE SET  %s" % (table, columns, placeholders, list(data.keys())[0] ,key_placeholders)
    cursor.execute(sql, list(data.values())*2)


if __name__=='__main__':
    main()
  • 확진자 수는 csv파일로 제공되어있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# Confirmed
import requests
import psycopg2
import logging
import sys
import json
import csv

key = ''
startDate = '20201005'
endDate = '20211005'
location = '108' # 서울

host = "covid.cjwptwa04yyi.ap-northeast-2.rds.amazonaws.com"
port = 5432
username = "sixmini"
database = "postgres"
password = ""


def main():
    # postgreSQL 연결
    try:
        conn = psycopg2.connect(
            host=host,
            database=database,
            user=username,
            password=password)
        cursor = conn.cursor()
    except:
        logging.error("could not connect to rds")
        sys.exit()

    # cursor.execute("""CREATE TABLE confirmed (
    #     date VARCHAR PRIMARY KEY NOT NULL,
    #     confirmed INT,
    #     FOREIGN KEY(date) REFERENCES weather(date))
    # """)
    # conn.commit()
    # sys.exit()

    with open('covid-confirmed-in-seoul.csv') as f:
        raw = csv.reader(f)
        for row in raw:
            confirmed = {}
            confirmed.update(
                {
                    'date': row[0],
                    'confirmed': row[1]
                }
            )
            insert_row(cursor, confirmed, 'confirmed')
    conn.commit()


def insert_row(cursor, data, table):
    
    placeholders = ', '.join(['%s'] * len(data))
    columns = ', '.join(data.keys())
    key_placeholders = ', '.join(['{0}=%s'.format(k) for k in data.keys()])
    sql = "INSERT INTO %s ( %s ) VALUES ( %s ) ON CONFLICT ( %s ) DO UPDATE SET  %s" % (table, columns, placeholders, list(data.keys())[0] ,key_placeholders)
    cursor.execute(sql, list(data.values())*2)


if __name__=='__main__':
    main()
  • 정상동작 확인 결과 이상 없다.

스크린샷 2021-10-10 06 32 45

0%