# -*- coding: utf-8 -*-
import re
import math
import warnings
import numpy as np
import pandas as pd
import traceback
from typing import Union, List, Dict, Tuple, Pattern, Optional
from collections import OrderedDict
from pandas import DataFrame
from datetime import datetime
from dateutil.relativedelta import relativedelta
from bs4 import BeautifulSoup
from bs4.element import Tag
from dart_fss.api.filings import get_corp_info
from dart_fss.filings.reports import Report
from dart_fss.filings import search as search_filings
from dart_fss.utils import str_compare, str_unit_to_number_unit, is_notebook
from dart_fss.utils import str_insert_whitespace as ws
from dart_fss.errors.errors import NotFoundConsolidated, NoDataReceived
from dart_fss.utils import str_to_regex, get_currency_str
from dart_fss.fs.fs import FinancialStatement
from dart_fss.filings.search_result import SearchResults
def str_to_float(text: str, unit: float) -> float:
    """Convert a table cell into a float.

    Values written as ``(1,000)`` or ``(-)1000`` are treated as negative.
    When the cell carries its own unit marker (``원``), the value is divided
    by ``unit`` so that the table-wide unit multiplication applied later
    cancels out.

    Parameters
    ----------
    text: str
        raw cell text (``int``/``float`` inputs are passed through)
    unit: float
        unit multiplier of the enclosing table

    Returns
    -------
    float
        converted number (0.0 when the text cannot be parsed)
    """
    negative_re = re.compile(r'\((-*\d+)\)|\(-\)(\d+)')  # negative notations
    letters_re = re.compile(r'[ㄱ-힣]|[a-zA-Z]')  # Korean/Latin characters
    if isinstance(text, (int, float)):
        return float(text)
    if not isinstance(text, str):
        raise ValueError('Invalid Value: {}'.format(text))
    try:
        cleaned = re.sub(r',|\s+', '', text)
        if letters_re.search(cleaned):
            number = float(letters_re.sub('', cleaned))
            # A per-cell unit overrides the table unit: divide it out here so
            # the later table-wide multiplication restores the raw value.
            return number / unit if re.search('원', cleaned) else number
        matched = negative_re.search(cleaned)
        if matched:
            digits = matched.group(1)
            if digits is None:
                digits = matched.group(2)
            return -float(digits)
        return float(cleaned)
    except (ValueError, TypeError):
        return 0.0
# Matches <br>, <br/>, <br /> so they can be turned into newlines
regex_br = re.compile(r'<br\s*?\/?\s*?>')


def text_split_by_br(tag) -> list:
    """Split an HTML tag's text into lines separated by ``<br>`` tags.

    Parameters
    ----------
    tag
        BeautifulSoup Tag (anything whose ``str()`` yields HTML)

    Returns
    -------
    list of str
        non-empty text lines
    """
    # Bug fix: the third positional argument of Pattern.sub() is ``count``,
    # not ``flags`` -- passing re.MULTILINE (int value 8) silently limited
    # the replacement to the first 8 <br> tags in the markup.
    html = regex_br.sub('\n', str(tag))
    text = BeautifulSoup(html, 'html.parser').get_text()
    return [line for line in text.split('\n') if len(line) > 0]
def get_datetime(year, month, day):
    """Build a ``datetime`` while tolerating out-of-range day values.

    A day that overflows its month (e.g. 2020-02-31) is clamped to the last
    day of that month; day 0 is clamped to the 1st. An invalid month is
    reported with a ``RuntimeWarning`` and ``None`` is returned.
    """
    from datetime import timedelta
    try:
        return datetime(year, month, day)
    except ValueError:
        if month not in [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]:
            warnings_text = "Month must be in 1..12"
            warnings.warn(warnings_text, RuntimeWarning)
            return None
        if day != 0:
            if month == 12:
                dt = datetime(year + 1, 1, 1)
            else:
                dt = datetime(year, month + 1, 1)
            # Bug fix: the previous `dt - relativedelta(day=1)` used the
            # *absolute* `day` field, which sets the day to 1 and returns the
            # first day of the NEXT month (and made the warning below report
            # a date that was not actually returned). Subtracting a real
            # one-day delta yields the intended last day of `month`.
            dt = dt - timedelta(days=1)
        else:
            dt = datetime(year, month, 1)
        warnings_text = "Day({0}.{1}.{2}) is out of range for month. dart-fss tries to fix {0}.{1}.{2} to {0}.{1}.{3}".format(year, month, day, dt.day)
        warnings.warn(warnings_text, RuntimeWarning)
        return dt
def extract_date_from_header(header):
    """Extract reporting-period dates from a financial statement table header.

    Scans every ``td`` cell of the header and returns a list of tuples of
    ``datetime`` objects, one tuple per period expression found.
    """
    # Matches the form "YYYY ... MM ... DD" (':' excluded to avoid matching times)
    regex = re.compile(r'(\d{4})[^0-9:]*\s*(\d{1,2})[^0-9]*\s*(\d{1,2})') # Exclude ':'
    # Matches "YYYY ... MM ... DD ... M'M' ... D'D'" (a from/to range within one year)
    regex2 = re.compile(r'(\d{4})[^0-9:]*\s*(\d{1,2})[^0-9]*\s*(\d{1,2})[^0-9]*\s*(\d{1,2})[^0-9]*\s*(\d{1,2})') # Exclude ':'
    date_info = []
    td_list = header.find_all('td')
    re_nbsp = re.compile(r'\xa0')   # non-breaking spaces -> plain spaces
    re_space = re.compile(r'\s+')   # collapse whitespace runs
    for td in td_list:
        # Remove white (invisible) text in tag
        for tag in td.find_all(style=re.compile(r'color:#ffffff', re.IGNORECASE)):
            tag.decompose()
        texts = text_split_by_br(td)
        for text in texts:
            text = re_nbsp.sub(' ', text)
            text = re_space.sub(' ', text)
            searched = regex.findall(text)
            searched2 = regex2.findall(text)
            # Filter out bogus table headers: a genuine date cell is short
            # relative to the number of date patterns matched in it
            searched_length = len(searched)
            if len(text) > searched_length * 80:
                continue
            elif searched_length > 0:
                f = searched[0]
                if len(searched2) == 0:
                    # Insert dummy values so the comparison below cannot match
                    searched2 = [[9999, 99, 99, 99, 99]]
                s = searched2[0]
                # When regex and regex2 agree on their first match, trust regex2:
                # handles the form "제21(당)기 2018년 01월 01일부터 12월 31일 까지"
                # (start and end date sharing the same year)
                if f[1] == s[1] and f[2] == s[2] and int(s[3]) < 13 and int(s[4]) < 32:
                    date = []
                    year = int(s[0])
                    month = int(s[1])
                    day = int(s[2])
                    dt = get_datetime(year, month, day)
                    if dt is not None:
                        date.append(dt)
                    month = int(s[3])
                    day = int(s[4])
                    dt = get_datetime(year, month, day)
                    if dt is not None:
                        date.append(dt)
                    if len(date) > 0:
                        date_info.append(tuple(date))
                else:
                    # Otherwise collect every single-date match independently
                    date = []
                    for d in searched:
                        year = int(d[0])
                        month = int(d[1])
                        day = int(d[2])
                        dt = get_datetime(year, month, day)
                        if dt is not None:
                            date.append(dt)
                    if len(date) > 0:
                        date_info.append(tuple(date))
    return date_info
def extract_unit_from_header(header):
    """Extract the monetary unit (e.g. 천원, 백만원) declared in a table header.

    Falls back to '원' when no "(단위: ...)" marker is present.
    """
    unit_regex = re.compile(r'\(단위\s*?:\s*(.*)\)')
    for cell in header.find_all('td'):
        found = unit_regex.search(cell.text)
        if found:
            return found.group(1)
    return '원'
def convert_thead_into_columns(fs_tp: str, fs_table: dict, separate: bool = False,
                               lang: str = 'ko'):
    """Extract DataFrame column definitions from the table's ``thead``.

    Flattens rowspan/colspan header cells into a matrix, replaces the first
    row with period strings (``YYYYMMDD-YYYYMMDD``), and returns a list of
    column definitions. Returns an empty list when the header cannot be
    parsed reliably (no dates, duplicate columns, ...).
    """
    def column_ko_to_en(ko):
        # Map well-known Korean header titles to internal column names
        ko_to_en = {
            '과목': 'label_ko',
            '주석': 'comment'
        }
        en = ko_to_en.get(ko)
        return en if en else ko
    thead = fs_table['table'].thead
    if thead is None:
        # No real thead: promote the first tbody row into a synthetic thead
        tt = fs_table['table'].tbody.tr.extract()
        thead = BeautifulSoup('<thead></thead>', 'html.parser')
        thead.thead.append(tt)
        for td in thead.tr.find_all('td'):
            td.name = 'th'
    th_colspan_list = [int(th.attrs.get('colspan', 1)) for th in thead.tr.find_all('th')]
    date_info = extract_date_from_header(fs_table['header'])
    # Return an empty array when no dates were found in the header
    if len(date_info) == 0:
        return []
    # Regular Expression for title
    regex = str_to_regex('과목 OR 주석')
    fs_string = {
        'bs': 'Statement of financial position',
        'is': 'Income statement',
        'cis': 'Statement of comprehensive income',
        'cf': 'Statement of cash flows'
    }
    str_unit = extract_unit_from_header(fs_table['header'])
    str_unit = get_currency_str(str_unit)
    if str_unit:
        for key in fs_string:
            fs_string[key] = fs_string[key] + '(Unit: {})'.format(str_unit)
    label = {
        'ko': {
            True: '별도재무제표',
            False: '연결재무제표'
        },
        'en': {
            True: 'Separate',
            False: 'Consolidated'
        }
    }
    # Maximum number of columns
    col_length = sum(th_colspan_list)
    # Maximum number of rows
    row_length = len(thead.find_all('tr'))
    row_length = row_length + 1 if row_length == 1 else row_length
    # Matrix used to flatten row-span / col-span header cells
    columns_matrix = [[None for _y in range(col_length)] for _x in range(row_length)]
    # Bug fix (#141): Return an empty array if a duplicate column exists
    duplicate_check = []
    for idx, tr in enumerate(thead.find_all('tr')):
        start_idx = 0
        # Resume filling at the first empty slot of this matrix row
        for ele_idx, element in enumerate(columns_matrix[idx]):
            if element is None:
                start_idx = ele_idx
                break
        for jdx, th in enumerate(tr.find_all('th')):
            row_span = int(th.attrs.get('rowspan', 1))
            col_span = int(th.attrs.get('colspan', 1))
            text = re.sub(r'\s+', '', th.text)
            # Bug fix (#141): Return an empty array if a duplicate column exists
            if text in duplicate_check:
                return []
            duplicate_check.append(text)
            # Fix bug(#76)
            if len(text) == 0:
                continue
            date_list = [datetime(1900, 1, 1)]
            if idx == 0:
                if jdx == 0:
                    text = '과목'
                elif regex.search(text) is None:
                    # Replace period cells with their extracted date range
                    if len(date_info) > 0:
                        date_list = date_info.pop(0)
                    else:
                        date = '-'.join([date.strftime('%Y%m%d') for date in date_list])
                        warnings_text = "Date data length does not match table header."\
                                        + "So last date was set using last data({}). ".format(date)
                        warnings.warn(warnings_text, RuntimeWarning)
                    text = '-'.join([date.strftime('%Y%m%d') for date in date_list])
                if regex.search(text):
                    # Title cells span both header rows
                    row_span = 2
            for mdx in range(row_span):
                for ndx in range(col_span):
                    new_text = text
                    if mdx == 0 and regex.search(text):
                        new_text = fs_string[fs_tp]
                    columns_matrix[idx + mdx][start_idx + ndx] = new_text
            start_idx = start_idx + ndx + 1
    # Bug fix (#141): Return an empty array if a duplicate '과목' exists
    if len(columns_matrix) < 2 or columns_matrix[1].count('과목') > 1:
        return []
    regex_3month = re.compile(r'3개월')
    regex_total = str_to_regex(r'누적 OR 금액')
    columns = []
    for jdx in range(len(columns_matrix[0])):
        column = []
        sec_item = []
        for idx in range(len(columns_matrix)):
            item = columns_matrix[idx][jdx]
            if idx == 0:
                column.append(item)
                continue
            elif idx == 1 and (item is None or regex.search(item) is None):
                sec_item.append(label[lang][separate])
            else:
                pass
            if item is None:
                pass
            elif str_compare(column[0], item):
                continue
            elif regex_3month.search(item):
                # "3개월" column: narrow the period to the last quarter
                # extract date info
                date_info = [datetime.strptime(date_str, '%Y%m%d') for date_str in column[0].split('-')]
                # calculating start_dt
                delta = relativedelta(months=3)
                start_dt = date_info[1] - delta
                start_dt = start_dt.replace(day=1)
                end_dt = date_info[1]
                column[0] = '-'.join([date.strftime('%Y%m%d') for date in [start_dt, end_dt]])
            elif regex_total.search(item):
                # "누적"/"금액" labels carry no extra information
                pass
            else:
                sec_item.append(column_ko_to_en(item))
        if sec_item[0] in ['label_ko', 'comment']:
            column.append(sec_item[0])
        else:
            column.append(tuple(sec_item))
        columns.append(column)
    return columns
def convert_tbody_to_dataframe(columns: list, fs_table: dict):
    """Convert the table's ``tbody`` into a DataFrame using the parsed columns."""
    # Map each (deduplicated) column key to the td indexes feeding it
    column_matrix = OrderedDict()
    for idx, column in enumerate(columns):
        key = tuple(column)
        if column_matrix.get(key):
            column_matrix[key].append(idx)
        else:
            column_matrix[key] = []
            column_matrix[key].append(idx)
    deduplicated = [key for key in column_matrix]
    df_columns = pd.MultiIndex.from_tuples(deduplicated)
    df = pd.DataFrame(columns=df_columns)
    tbody = fs_table['table'].tbody
    regex = str_to_regex('label_ko OR comment')
    str_unit = extract_unit_from_header(fs_table['header'])
    unit = str_unit_to_number_unit(str_unit)
    # Detects a per-row unit marker such as "(단위: 천)"
    unit_regex = re.compile(r'\(단위\s*?:\s*([a-zA-Zㄱ-힣])\)')
    # Handles cells whose value is split across lines by <br> tags
    def get_text_before_newline(tag):
        br = tag.find('br')
        if br is None:
            # No <br> tag: return the text as-is
            return tag.text
        else:
            text = ''
            for x in br.previous_siblings:
                text += str(x)
            # When <br> splits the cell, return only the first line's text
            return BeautifulSoup(text, 'html.parser').text
    for idx, tr in enumerate(tbody.find_all('tr')):
        extracted = [re.sub(r'\s+|=+', '', get_text_before_newline(td)) for td in tr.find_all('td')]
        row = {key: 0 for key in deduplicated}
        for key, index_list in column_matrix.items():
            # Cells mapped to the same column key are summed together
            for index in index_list:
                if len(extracted) <= index:
                    row[key] = None
                elif isinstance(key[1], str):
                    row[key] = extracted[index]
                elif regex.search(' '.join(key[1])):
                    value = extracted[index]
                    row[key] = value
                else:
                    value = str_to_float(extracted[index], unit)
                    row[key] += value
            if isinstance(row[key], float):
                if abs(row[key]) < 1e-10:
                    # Treat an (accumulated) zero as an empty cell
                    row[key] = ''
                else:
                    # Scale back to raw amounts using the table unit
                    row[key] = row[key] * unit
        ordered_list = []
        for column in df_columns.tolist():
            ordered_list.append(row.get(column, None))
        try:
            if len(ordered_list) > 0 and ordered_list[0] is not None:
                row_unit = unit_regex.search(ordered_list[0])
            else:
                row_unit = False
        except TypeError as ex :
            warnings_text = '{} : {}'.format(repr(ex), ordered_list[0])
            warnings.warn(warnings_text, RuntimeWarning)
            row_unit = False
        if row_unit:
            # The row declares its own unit: rescale every numeric cell
            row_unit = str_unit_to_number_unit(row_unit.group(1))
            for jdx, value in enumerate(ordered_list):
                if isinstance(value, str):
                    pass
                else:
                    ordered_list[jdx] = ordered_list[jdx] / unit * row_unit
        df.loc[idx] = ordered_list
    return df
def seek_table(tables: List, includes: Pattern,
               excludes: Union[Pattern, None] = None) -> Tuple[Union[str, None], Union[str, None], Union[str, None]]:
    """Search candidate tables for a financial statement.

    Walks backwards from each candidate table looking for a short title
    matching ``includes`` (and not ``excludes``), then looks for a preceding
    header table (class 'nb') that contains reporting dates.
    Returns ``(title, header, table)`` or ``(None, None, None)``.
    """
    # Regular expression used to detect dates inside header rows
    regex = re.compile(r'\d{4}(.*?)\d{1,2}(.*?)\d{1,2}')
    # Tags already rejected as headers
    not_headers = []
    # Minimum Row Number
    MIN_ROW_NUMBER = 4
    for table in tables:
        # Tables with fewer than 4 rows are not financial statement tables
        rows = table.find_all('tr')
        if len(rows) < MIN_ROW_NUMBER:
            continue
        for tag in table.previous_siblings:
            # Stop searching when another candidate table is reached
            if tag in tables:
                break
            # Only Tag objects can contain a title
            if isinstance(tag, Tag):
                # Title search
                children = tag.find_all(text=includes)
                if len(children) == 0:  # Fix for reports whose titles are split by <br> (e.g. Bookook Securities, #66)
                    texts = text_split_by_br(tag)
                    children = [text for text in texts if includes.search(text)]
                for child in children:
                    title = child
                    if title:
                        title = re.sub(r'\s+', '', title)
                        # Skip titles containing an excluded word
                        if excludes and excludes.search(title):
                            not_headers.append(tag)
                            continue
                        # Skip overly long titles
                        # A limit of len(title) > 12 broke the consolidated comprehensive
                        # income statement search for some companies (e.g. Meritz Fire)
                        if len(title) > 13:
                            not_headers.append(tag)
                            continue
                        headers = table.find_all_previous('table', class_='nb')
                        for header in headers:
                            # Skip headers that are None or already rejected
                            if header is None or header in not_headers:
                                continue
                            # Skip headers with fewer than 2 rows
                            tr_list = header.find_all('tr')
                            if len(tr_list) < 2:
                                continue
                            # Skip headers that contain no dates at all
                            datetime_cnt = 0
                            for tr in tr_list:
                                if regex.search(tr.text):
                                    datetime_cnt += 1
                            if datetime_cnt == 0:
                                continue
                            return title, header, table
    return None, None, None
def search_fs_table(tables: List, fs_tp: Tuple[str] = ('bs', 'is', 'cis', 'cf'),
                    separate: bool = False) -> Dict[str, dict]:
    """
    Search the page's tables for each requested financial statement type.

    Parameters
    ----------
    tables: list of ResultSet
        every table found in the page
    fs_tp: tuple of str
        'bs' balance sheet, 'is' income statement, 'cis' comprehensive income statement, 'cf' cash flow statement
    separate: bool
        whether to search separate (non-consolidated) statements
    Returns
    -------
    dict of {str : dict }
        {type: {'title': ..., 'header': ..., 'table': ...}} per requested type
    """
    fs_table = OrderedDict()
    # Searched in this order (do not change the order)
    queryset = {
        'bs': ws('재무상태표') + ' OR ' + ws('대차대조표'),
        'is': ws('손익계산서'),
        'cis': ws('포괄손익계산서'),
        'cf': ws('현금흐름표'),
    }
    for tp, base_query in queryset.items():
        if tp not in fs_tp:
            continue
        excludes = None
        if separate:
            # Separate statements must not match consolidated titles
            excludes = ws('연결')
        else:
            # Consolidated statements require the '연결' keyword
            base_query = base_query + ' AND ' + ws('연결')
        if tp == 'is':
            # A plain income statement must not match the comprehensive one
            excludes = excludes + ' OR ' + ws('포괄') if excludes else ws('포괄')
        excludes_regex = str_to_regex(excludes) if excludes else None
        includes_regex = str_to_regex(base_query)
        title, header, tb = seek_table(tables=tables, includes=includes_regex, excludes=excludes_regex)
        fs_table[tp] = {'title': title, 'header': header, 'table': tb}
    return fs_table
def extract_fs_table(fs_table, fs_tp, separate: bool = False, lang: str = 'ko', report = None):
    """Convert searched statement tables into DataFrames.

    Returns an OrderedDict mapping each requested type to a DataFrame, or
    None for types whose table could not be converted.
    """
    results = OrderedDict()
    for tp, table in fs_table.items():
        if tp not in fs_tp:
            continue
        df = None
        if table['table']:
            try:
                columns = convert_thead_into_columns(fs_tp=tp, fs_table=table, separate=separate, lang=lang)
                # Extract the body only when a header was successfully parsed
                if len(columns) > 0:
                    df = convert_tbody_to_dataframe(columns=columns, fs_table=table)
            except Exception:
                traceback.print_exc()
                report_dict = report.to_dict() if report is not None else None
                warnings_text = 'Unable to extract a {} type financial statement. : {}'.format(tp, report_dict)
                warnings.warn(warnings_text, RuntimeWarning)
        results[tp] = df
    return results
def report_find_all(report: Report, query: dict, fs_tp: Tuple[str], separate: bool) -> Tuple[int, Dict[str, Dict]]:
    """
    Search the report pages matching the query and extract statement tables.

    Parameters
    ----------
    report: Report
        Report
    query: dict
        page search conditions passed to ``report.find_all``
    fs_tp: tuple of str
        financial statement types to search for
    separate: bool
        whether to search separate (non-consolidated) statements
    Returns
    -------
    tuple of (int, dict or None)
        number of statement tables found on the first productive page,
        and that page's search result dict (None when no page was scanned)
    """
    count = 0
    fs_table = None
    non_break_space = u'\xa0'
    searched = report.find_all(**query)
    for key in searched:
        for page in searched[key]:
            html = page.html.replace(non_break_space, ' ')
            soup = BeautifulSoup(html, 'html.parser')
            tables = soup.find_all('table', border='1')
            fs_table = search_fs_table(tables=tables, fs_tp=fs_tp, separate=separate)
            count = sum([fs_table[tp]['table'] is not None for tp in fs_table])
            # Stop at the first page that yields any statement table
            if count > 0:
                return count, fs_table
    return count, fs_table
def analyze_html(report: Report, fs_tp: Tuple[str] = ('bs', 'is', 'cis', 'cf'),
                 lang: str = 'ko', separate: bool = False) -> Union[Dict[str, DataFrame], None]:
    """
    Extract financial statements from the report's HTML pages.

    Parameters
    ----------
    report: Report
        the filing report
    fs_tp: tuple of str
        'bs' balance sheet, 'is' income statement, 'cis' comprehensive income statement, 'cf' cash flow statement
    lang: str
        'ko' Korean / 'en' English
    separate: bool
        whether to extract separate (non-consolidated) statements
    Returns
    -------
    dict of {str: DataFrame} or None
        extracted statements, or None when nothing was found
    """
    inc = ['재무제표', '감사보고서']
    exc = ['주석', '결합', '의견', '수정', '검토보고서']
    query = {
        'includes': ' OR '.join(ws(word) for word in inc),
        'excludes': ' OR '.join(ws(word) for word in exc),
        'scope': ['attached_reports', 'pages'],
        'options': {'title': True}  # also match titles of attached/consolidated reports
    }
    if separate:
        query['excludes'] += ' OR ' + ws('연결')
    else:
        query['includes'] += ' AND ' + ws('연결')
    count, fs_table = report_find_all(report, query, fs_tp, separate)
    # Nothing found: retry with a relaxed keyword set
    if count == 0:
        query = {
            'includes': r'재무제표 OR 명세서',
            'excludes': r'주석 OR 결합 OR 의견 OR 수정 OR 검토보고서',
            'scope': ['attached_reports', 'pages']
        }
        count, fs_table = report_find_all(report, query, fs_tp, separate)
        # Still nothing even with the relaxed keywords: give up
        if count == 0:
            return None
    return extract_fs_table(fs_table=fs_table, fs_tp=fs_tp, separate=separate, lang=lang, report=report)
def find_all_columns(df: DataFrame, query: str) -> pd.Index:
    """
    Search a DataFrame's (MultiIndex) columns with a query string.

    Parameters
    ----------
    df: DataFrame
        DataFrame to search (may be None)
    query: str
        query string understood by ``str_to_regex``
    Returns
    -------
    pd.MultiIndex or list
        matching columns (empty list when nothing matches or df is None)
    """
    regex = str_to_regex(query)
    if df is None:
        return []
    matched = []
    for col in df.columns.tolist():
        for element in col:
            if isinstance(element, str) and regex.search(element):
                matched.append(col)
            elif regex.search(' '.join(element)):
                # Nested tuple level (or unmatched string): search joined text
                matched.append(col)
    if len(matched) > 0:
        matched = pd.MultiIndex.from_tuples(matched)
    return matched
def extract_account_title(title):
    """Normalize an account title: drop leading numbering (before the first
    '.'), bracketed notes, whitespace and any character that is not Hangul
    or a Latin letter."""
    parts = title.split('.')
    if len(parts) > 1:
        title = ''.join(parts[1:])
    else:
        title = parts[0]
    title = re.sub(r'\[.*?\]|\(.*?\)|<.*?>|[^가-힣|a-z|A-Z]', '', title)
    return re.sub(r'\s+', '', title)
def compare_df_and_ndf_label_and_concept(column: Tuple[Union[str, Tuple[str]]],
                                         df: DataFrame, ndf: DataFrame, ldf: DataFrame,
                                         ndata: List[Union[float, str, None]],
                                         nlabels: List[str]) -> Tuple[List[Union[float, str]], List[str]]:
    """
    Fill missing entries of ``ndata`` by matching rows via concept_id,
    label_en and known Korean labels.

    Parameters
    ----------
    column: tuple
        name of the column being added
    df: DataFrame
        DataFrame the data will be added to
    ndf: DataFrame
        DataFrame searched for the new data
    ldf: DataFrame
        label DataFrame (row-wise sets of previously seen labels)
    ndata: list of float
        data collected so far for the new column (None where missing)
    nlabels: list of str
        labels collected so far for the new column
    Returns
    -------
    tuple of list
        updated data list and label list for the new column
    """
    label_none_data = []
    df_label_column = find_all_columns(df, 'label_ko')[0]
    ndf_label_column = find_all_columns(ndf, 'label_ko')[0]
    concept_none_data = {}
    df_concept_column = find_all_columns(df, 'concept_id')
    ndf_concept_column = find_all_columns(ndf, 'concept_id')
    # Check whether both frames carry a concept_id column
    concept_exist = len(df_concept_column) * len(ndf_concept_column) != 0
    if concept_exist:
        df_concept_column = df_concept_column[0]
        ndf_concept_column = ndf_concept_column[0]
    en_none_data = {}
    df_en_column = find_all_columns(df, 'label_en')
    ndf_en_column = find_all_columns(ndf, 'label_en')
    # Check whether both frames carry a label_en column
    en_exist = len(df_en_column) * len(ndf_en_column) != 0
    if en_exist:
        df_en_column = df_en_column[0]
        ndf_en_column = ndf_en_column[0]
    for idx, value in enumerate(ndata):
        if isinstance(value, str):
            # Previously resolved to a string: still a search candidate below
            pass
        elif value is None:
            # No data found yet: search candidate
            pass
        elif math.isnan(value):
            # Previously found value is invalid: search candidate
            pass
        else:
            # A valid value already exists: no search needed
            continue
        # Extract and normalize the label of the row lacking data
        label = df[df_label_column].iloc[idx]
        label = re.sub(r'\s+', '', label)
        label = extract_account_title(label)
        label_set = set(ldf.iloc[idx])
        label_set.add(label)
        # Build the (index, label_set) candidate list
        label_none_data.append((idx, label_set))
        # Also index candidates by concept_id when available
        if concept_exist:
            concept = df[df_concept_column].iloc[idx]
            concept_none_data[concept] = idx
        # Also index candidates by label_en when available
        if en_exist:
            en = df[df_en_column].iloc[idx]
            en_none_data[en] = idx
    # Row indexes of `df` already consumed by a previous match
    used = []
    for idx in range(len(ndf)):
        # Value found in ndf for this row
        value_found = None
        # Matching row index in df
        index_found = None
        # Normalized label to match with
        label = extract_account_title(ndf[ndf_label_column].iloc[idx])
        if concept_exist:
            # concept_id of the row being considered
            concept = ndf[ndf_concept_column].iloc[idx]
            index_found = concept_none_data.get(concept)
            if index_found in used:
                continue
            elif index_found is not None:
                value_found = ndf[column].iloc[idx]
        if index_found is None:
            # Fall back to the label_en match
            if en_exist:
                en = ndf[ndf_en_column].iloc[idx]
                index_found = en_none_data.get(en)
                if index_found in used:
                    continue
                elif index_found is not None:
                    value_found = ndf[column].iloc[idx]
        if index_found is None:
            # Final fallback: match against the known Korean label sets
            for index, label_set in label_none_data:
                if index in used:
                    continue
                if label in label_set:
                    value_found = ndf[column].iloc[idx]
                    index_found = index
                    break
        if index_found is None:
            pass
        elif isinstance(index_found, int):
            used.append(index_found)
            ndata[index_found] = value_found
            nlabels[index_found] = label
    return ndata, nlabels
def compare_df_and_ndf_value(column: Tuple[Union[str, Tuple[str]]],
                             df: DataFrame, ndf: DataFrame, ldf: DataFrame,
                             ndata: List[Union[float, str, None]],
                             nlabels: List[str]) -> Tuple[List[Union[float, str]], List[str]]:
    """
    Resolve still-missing entries of ``ndata`` by comparing values of
    overlapping periods row by row (a voting scheme over matching rows).

    Parameters
    ----------
    column: tuple
        name of the column being added
    df: DataFrame
        DataFrame the data will be added to
    ndf: DataFrame
        DataFrame searched for the new data
    ldf: DataFrame
        label DataFrame
    ndata: list of float
        data collected so far for the new column (None where missing)
    nlabels: list of str
        labels collected so far for the new column
    Returns
    -------
    tuple of list
        updated data list and label list for the new column
    """
    _, df_columns = split_columns_concept_data(df.columns)
    _, ndf_columns = split_columns_concept_data(ndf.columns)
    df_columns_set = convert_multiindex_to_set(df_columns)
    ndf_columns_set = convert_multiindex_to_set(ndf_columns)
    # Dated columns shared by both frames
    overlap = df_columns_set.intersection(ndf_columns_set)
    nko_column = find_all_columns(ndf, r'label_ko')
    for idx, value in enumerate(ndata):
        if isinstance(value, str):
            # Previously resolved to a string: still re-evaluated below
            pass
        elif value is None:
            # No data found yet: search candidate
            pass
        elif math.isnan(value):
            # Previously found value is invalid: search candidate
            pass
        else:
            # A valid value already exists: no search needed
            continue
        nvalue = None
        nlabel = ''
        found = {}        # ndf row index -> number of matching overlap columns
        found_sign = {}   # ndf row index -> sign of the match (+1 / -1)
        max_found = 0
        check_all_zeros = pd.to_numeric(df[list(overlap)].iloc[idx], errors='coerce').sum()
        if check_all_zeros == 0.0:
            continue  # Skip rows whose overlapping values are all zero
        for col in overlap:
            value = df[col].iloc[idx]
            if isinstance(value, str):
                pass
            elif value is None:
                pass
            elif math.isnan(value):
                pass
            else:
                sign = 1
                # Find ndf rows carrying exactly this reference value
                w = ndf[ndf[col] == value].dropna(axis=1, how='all').dropna(how='all')
                # Not found: retry with the sign flipped
                if len(w) == 0:
                    sign = -1
                    w = ndf[ndf[col] == -value].dropna(axis=1, how='all').dropna(how='all')
                if len(w) > 0:
                    for index in w.index.values:
                        found[index] = found.get(index, 0) + 1
                        max_found = max(max_found, found[index])
                        found_sign[index] = sign
        # Adopt the value/label of the best-scoring matching row
        for k, v in found.items():
            if v >= max_found:
                nvalue = found_sign[k] * ndf[column].iloc[k]
                nlabel = ndf[nko_column].iloc[k].iloc[0]
                nlabel = extract_account_title(nlabel)
        if nvalue and math.isnan(nvalue):
            nvalue = None
        ndata[idx] = nvalue
        nlabels[idx] = nlabel
    return ndata, nlabels
additional_comparison_function = [compare_df_and_ndf_value]
def init_label(fs_df: Dict[str, DataFrame],
               fs_tp: Tuple[str] = ('bs', 'is', 'cis', 'cf'),
               label_df: Dict[str, DataFrame] = None):
    """Initialize the per-type label DataFrames used during merging.

    Note: statements in ``fs_df`` that lack a ``label_ko`` column are
    considered broken and set to None in place.

    Parameters
    ----------
    fs_df: dict of {str: DataFrame}
        extracted financial statements
    fs_tp: tuple of str, optional
        'bs' balance sheet, 'is' income statement, 'cis' comprehensive income statement, 'cf' cash flow statement
    label_df: dict of {str: DataFrame}
        label_df to initialize (may already be partially filled)
    Returns
    -------
    dict of {str : DataFrame}
        initialized label_df
    """
    if label_df is None:
        label_df = {tp: None for tp in fs_tp}
    for tp in fs_df:
        if tp in fs_tp:
            # DataFrame of the statement being processed
            df = fs_df[tp]
            if df is None:
                continue
            # Initialize label_df when missing for this type
            if label_df.get(tp) is None:
                concept_column = find_all_columns(df, r'concept_id')
                ko_column = find_all_columns(df, r'label_ko')
                # A missing label_ko column means the table is broken: discard it
                if len(ko_column) == 0:
                    fs_df[tp] = None
                    continue
                else:
                    ko_column = ko_column[0]
                date_columns = find_all_columns(df, r'\d{8}')
                label_columns = []
                if len(concept_column) == 1:
                    label_columns.append(('default', 'concept_id',))
                for column in date_columns:
                    label_columns.append(column)
                nlabel_columns = pd.MultiIndex.from_tuples(label_columns)
                label_df[tp] = pd.DataFrame(columns=nlabel_columns)
                # Seed each dated label column with the normalized titles
                if len(concept_column) == 1:
                    label_df[tp][nlabel_columns[0]] = [extract_account_title(x) for x in list(df[concept_column[0]])]
                for column in date_columns:
                    label_df[tp][column] = list(df[ko_column])
    return label_df
def merge_fs(fs_df: Dict[str, DataFrame],
             nfs_df: Dict[str, DataFrame],
             label_df: Dict[str, DataFrame],
             fs_tp: Tuple[str] = ('bs', 'is', 'cis', 'cf')):
    """
    Merge newly extracted report data into the accumulated statements.

    Parameters
    ----------
    fs_df: dict of {str: DataFrame}
        DataFrames the data is merged into (mutated in place)
    nfs_df: dict of {str: DataFrame}
        DataFrames holding the newly extracted data
    label_df: dict of {str: DataFrame}
        labels extracted alongside the statements (mutated in place)
    fs_tp: tuple of str, optional
        'bs' balance sheet, 'is' income statement, 'cis' comprehensive income statement, 'cf' cash flow statement
    Returns
    -------
    tuple of dict of {str: DataFrame}
        merged statements and their label DataFrames
    """
    global additional_comparison_function
    for tp in fs_df:
        if tp in fs_tp:
            # DataFrame being merged into
            df = fs_df[tp]
            # Newly extracted statement
            ndf = nfs_df[tp]
            # Nothing accumulated yet: adopt the new statement wholesale
            if df is None:
                if ndf is None:
                    continue
                else:
                    fs_df[tp] = ndf.copy(deep=True)
                    df = fs_df[tp]
            # Nothing new to merge for this type
            if ndf is None:
                continue
            # Initialize label_df when missing
            if label_df.get(tp) is None:
                label_df = init_label(fs_df=fs_df, fs_tp=fs_tp, label_df=label_df)
            _, df_columns = split_columns_concept_data(df.columns)
            _, ndf_columns = split_columns_concept_data(ndf.columns)
            df_columns = convert_multiindex_to_set(df_columns)
            ndf_columns = convert_multiindex_to_set(ndf_columns)
            overlap = df_columns.intersection(ndf_columns)
            date_regex = re.compile(r'\d{8}')
            # Dated columns present only in the new statement, newest first
            diff = [x for x in (ndf_columns - overlap) if date_regex.search(x[0])]
            diff.sort(key=lambda x: date_regex.findall(x[0])[0], reverse=True)
            # Identical data: nothing to merge
            if len(diff) == 0:
                continue
            diff = pd.MultiIndex.from_tuples(diff)
            for column in diff:
                ndata = [None for _ in range(len(df))]
                nlabels = ['' for _ in range(len(df))]
                # First pass: match rows by label/concept, then run fallbacks
                ndata, nlabels = compare_df_and_ndf_label_and_concept(column, df, ndf, label_df[tp], ndata, nlabels)
                for compare_func in additional_comparison_function:
                    ndata, nlabels = compare_func(column, df, ndf, label_df[tp], ndata, nlabels)
                label_df[tp][column] = nlabels
                fs_df[tp][column] = ndata
    return fs_df, label_df
def convert_multiindex_to_set(multiindex):
    """Return the MultiIndex entries as a set of tuples (empty set for None)."""
    return set() if multiindex is None else set(multiindex.tolist())
def analyze_xbrl(report, fs_tp: Tuple[str] = ('bs', 'is', 'cis', 'cf'), separate: bool = False, lang: str = 'ko',
                 show_abstract: bool = False, show_class: bool = True, show_depth: int = 10,
                 show_concept: bool = True, separator: bool = True) -> Union[Dict[str, DataFrame], None]:
    """
    Extract financial statements by analyzing the report's XBRL file.

    Parameters
    ----------
    report: Report
        Report
    fs_tp: tuple of str, optional
        'bs' balance sheet, 'is' income statement, 'cis' comprehensive income statement, 'cf' cash flow statement
    separate: bool, optional
        whether to extract separate (non-consolidated) statements
    lang: str, optional
        'ko' Korean, 'en' English
    show_abstract: bool, optional
        whether to include abstract rows
    show_class: bool, optional
        whether to show class information
    show_depth: int, optional
        class depth to display
    show_concept: bool, optional
        whether to show concept_id
    separator: bool, optional
        whether to show the thousands separator
    Returns
    -------
    dict of {str : DataFrame} or None
        pandas DataFrame per statement type, or None when no XBRL exists
    """
    xbrl = report.xbrl
    if xbrl is None:
        return None
    # Accessors for each statement type
    def get_fs():
        data = xbrl.get_financial_statement(separate=separate)
        return data[0] if data else None
    def get_is():
        # Returns data[0] only when more than one statement exists
        # (presumably a lone result is the comprehensive income statement
        # rather than a plain income statement -- verify against xbrl API)
        data = xbrl.get_income_statement(separate=separate)
        if data:
            data = data[0] if len(data) > 1 else None
        return data
    def get_ci():
        # Comprehensive income statement: second entry when both exist
        data = xbrl.get_income_statement(separate=separate)
        if data:
            data = data[1] if len(data) > 1 else data[0]
        return data
    def get_cf():
        data = xbrl.get_cash_flows(separate=separate)
        return data[0] if data else None
    func_fs = {
        'bs': get_fs,
        'is': get_is,
        'cis': get_ci,
        'cf': get_cf,
    }
    # DataFrame rendering options
    option = {
        'label': 'Separate' if separate else 'Consolidated',
        'lang': lang,
        'show_abstract': show_abstract,
        'show_class': show_class,
        'show_depth': show_depth,
        'show_concept': show_concept,
        'separator': separator,
        'ignore_subclass': True,# ignore sub classes
    }
    statements = OrderedDict()
    for tp in fs_tp:
        statements[tp] = func_fs[tp]()
        if statements[tp]:
            statements[tp] = statements[tp].to_DataFrame(**option)
    return statements
def split_columns_concept_data(columns: pd.Index) -> Tuple[Optional[pd.Index], Optional[pd.Index]]:
    """Split columns into concept columns (no date) and data columns (dated).

    A column counts as a data column when its first level contains an
    8-digit date. Returns ``(concept_columns, data_columns)``; either side
    is None when empty.
    """
    date_regex = re.compile(r'\d{8}')
    concepts = []
    data = []
    for col in columns:
        target = data if date_regex.findall(col[0]) else concepts
        target.append(col)
    concept_index = pd.MultiIndex.from_tuples(concepts) if concepts else None
    data_index = pd.MultiIndex.from_tuples(data) if data else None
    return concept_index, data_index
def sorting_data_columns(columns: pd.Index) -> pd.Index:
    """Sort dated data columns so the most recent period comes first."""
    date_regex = re.compile(r'\d{8}')

    def as_sort_key(dates):
        # findall yields a list of date strings; compare them as a tuple
        return dates if isinstance(dates, str) else tuple(dates)

    paired = [(col, date_regex.findall(col[0])) for col in columns]
    paired.sort(key=lambda pair: as_sort_key(pair[1]), reverse=True)
    return pd.MultiIndex.from_tuples([col for col, _dates in paired])
def sorting_columns(statements: Dict[str, DataFrame]) -> Dict[str, DataFrame]:
    """Reorder each statement's columns: concept columns first, then data
    columns sorted newest-first. Statements set to None are skipped."""
    for tp, frame in statements.items():
        if frame is None:
            continue
        concept_cols, data_cols = split_columns_concept_data(frame.columns)
        if data_cols is not None:
            data_cols = sorting_data_columns(data_cols)
        if concept_cols is not None and data_cols is not None:
            new_columns = pd.MultiIndex.from_tuples(concept_cols.tolist() + data_cols.tolist())
        else:
            new_columns = frame.columns
        statements[tp] = frame[new_columns]
    return statements
def select_cumulative(corp_code, statements: Dict[str, DataFrame]) -> Dict[str, DataFrame]:
    """Keep only cumulative-period data columns — those whose period starts
    on the first day of the company's fiscal year. The balance sheet ('bs')
    is left untouched."""
    # acc_mt is the fiscal year-end month, so the fiscal year starts the month after
    fy_start_month = int(get_corp_info(corp_code)['acc_mt']) % 12 + 1
    start_regex = re.compile(r'\d{4}' + '{:02}01'.format(fy_start_month))
    for tp, frame in statements.items():
        if tp == 'bs' or frame is None:
            continue
        concept_cols, data_cols = split_columns_concept_data(frame.columns)
        if concept_cols is not None and data_cols is not None:
            cumulative = [col for col in data_cols if start_regex.search(col[0])]
            new_columns = pd.MultiIndex.from_tuples(concept_cols.tolist() + cumulative)
        else:
            new_columns = frame.columns
        statements[tp] = frame[new_columns]
    return statements
def drop_empty_columns(df: Dict[str, DataFrame], label_df: bool = False) -> Dict[str, DataFrame]:
    """Drop columns that contain no data at all from every DataFrame.

    When ``label_df`` is True, empty strings count as missing values too.
    """
    for tp, frame in df.items():
        if frame is None:
            continue
        if label_df:
            # Mask out '' before the null check for label DataFrames
            empty_mask = frame[frame != u''].isnull().all()
        else:
            empty_mask = frame.isnull().all()
        keep = [col for col, is_empty in empty_mask.items() if is_empty is not True]
        # convert list to numpy array so pandas keeps tuple columns intact
        df[tp] = frame[np.array(keep, dtype=object)]
    return df
def remove_almost_empty_columns(df: Dict[str, DataFrame], min_data_number: int = 1) -> Dict[str, DataFrame]:
    """
    Remove dated columns that carry almost no data.

    Args:
        df : Dict[str, DataFrame]
            The original dictionary containing dataframes.
        min_data_number : int
            The minimum number of data in a column to not be considered almost empty.
    Returns:
        df : Dict[str, DataFrame]
            The modified dictionary with almost empty columns removed.
    """
    date_regex = re.compile(r'\d{8}')
    for tp, frame in df.items():
        if frame is None:
            continue
        kept = []
        for col in frame.columns:
            # Non-dated (concept) columns are always kept; dated columns must
            # carry more than `min_data_number` non-null values.
            if date_regex.search(col[0]) is None or frame[col].count() > min_data_number:
                kept.append(col)
        df[tp] = frame[kept]
    return df
def analyze_report(report: Report,
                   fs_tp: Tuple[str] = ('bs', 'is', 'cis', 'cf'),
                   separate: bool = False,
                   lang: str = 'ko',
                   separator: bool = True,
                   dataset: str = 'xbrl') -> Union[Dict[str, Optional[DataFrame]], None]:
    """Extract financial statements from a report via XBRL (preferred) or HTML.

    Parameters
    ----------
    report: Report
        filing report to analyze
    fs_tp: tuple of str
        'bs' balance sheet, 'is' income statement, 'cis' comprehensive income statement, 'cf' cash flow statement
    separate: bool
        whether to extract separate (non-consolidated) statements
    lang: str
        'ko' Korean / 'en' English
    separator: bool
        whether to show the thousands separator
    dataset: str
        'xbrl' to prefer XBRL data; anything else forces HTML parsing

    Returns
    -------
    dict of {str: DataFrame or None} or None
        extracted statements, or None when nothing could be extracted

    Raises
    ------
    NotFoundConsolidated
        when consolidated statements are requested but do not exist
    """
    # XBRL data is only extracted for filings from 2012 onwards
    year = int(report.rcept_dt[:4])
    if year > 2011 and dataset == 'xbrl':
        xbrl = report.xbrl
    else:
        xbrl = None
    # XBRL File check
    if xbrl is not None:
        if separate is False and not xbrl.exist_consolidated():
            raise NotFoundConsolidated('Could not find consolidated financial statements')
        fs_df = analyze_xbrl(report, fs_tp=fs_tp, separate=separate, lang=lang,
                             show_abstract=False, show_class=True, show_depth=10,
                             show_concept=True, separator=separator)
    else:
        fs_df = analyze_html(report, fs_tp=fs_tp, separate=separate, lang=lang)
    if fs_df is None:
        # If consolidated financial statements are not found, raise a NotFoundConsolidated exception
        if separate is False:
            raise NotFoundConsolidated('Could not find consolidated financial statements')
        # Bug fix: with separate=True a None result previously fell through to
        # remove_almost_empty_columns(None), which crashed with AttributeError.
        return None
    fs_df = remove_almost_empty_columns(fs_df, min_data_number=1)
    return fs_df
def search_annual_report(corp_code: str,
                         bgn_de: str,
                         end_de: str = None,
                         separate: bool = False,
                         last_reprt_at: str = 'Y'):
    """Search annual reports; fall back to audit reports when none exist.

    Parameters
    ----------
    corp_code: str
        DART corporation code
    bgn_de: str
        search start date (YYYYMMDD)
    end_de: str, optional
        search end date (YYYYMMDD)
    separate: bool
        True searches separate audit reports (F001), False consolidated (F002)
    last_reprt_at: str
        'Y' to search final reports only

    Raises
    ------
    RuntimeError
        when neither an annual report nor an audit report can be found
    """
    reports = []
    try:
        # Search annual reports (final reports)
        reports = search_filings(corp_code=corp_code, bgn_de=bgn_de, end_de=end_de,
                                 pblntf_detail_ty='A001', page_count=100, last_reprt_at=last_reprt_at)
    except NoDataReceived:
        # Fall back to audit reports
        pblntf_detail_ty = 'F001' if separate else 'F002'
        try:
            reports = search_filings(corp_code=corp_code, bgn_de=bgn_de, end_de=end_de,
                                     pblntf_detail_ty=pblntf_detail_ty, page_count=100, last_reprt_at=last_reprt_at)
        except NoDataReceived:
            pass
    # Bug fix: the empty-result check and the `return` used to live inside a
    # `finally` block, where they silently replaced/swallowed any in-flight
    # exception (ruff B012). They now run on the normal path only.
    if len(reports) == 0:
        raise RuntimeError('Could not find an annual report')
    return reports
# Handles the error raised when a half-year report does not exist
# (observed with INCAR Financial Service)
def search_other_report(corp_code, bgn_de, end_de, pblntf_detail_ty):
    """Search filings of the given type, returning an empty ``SearchResults``
    instead of raising when DART reports no data."""
    try:
        reports = search_filings(corp_code=corp_code, bgn_de=bgn_de, end_de=end_de,
                                 pblntf_detail_ty=pblntf_detail_ty, page_count=100, last_reprt_at='Y')
    except NoDataReceived:
        # Synthesize an empty search result so callers can iterate safely
        resp = {
            'page_no': 1,
            'page_count': 100,
            'total_count': 0,
            'total_page': 0,
            'list': [],
        }
        reports = SearchResults(resp)
    # Bug fix: `return reports` previously sat inside a `finally` block, which
    # suppressed any unexpected exception (and raised UnboundLocalError when
    # `reports` was never assigned). The return now runs on the normal path.
    return reports