Source code for dart_fss.xbrl.table

# -*- coding: utf-8 -*-
import re

import numpy as np
import pandas as pd
from pandas import DataFrame

from dateutil.relativedelta import relativedelta

from arelle import XbrlConst

from dart_fss.utils import str_to_regex
from dart_fss.xbrl.helper import (cls_label_check, get_label_list,
                                  cls_merge_type, cls_datetime_check,
                                  get_max_depth, get_value_from_dataset,
                                  generate_df_columns, generate_df_rows,
                                  flatten, get_title, prefered_sign)


class Table(object):
    """XBRL table.

    Holds the financial-statement information extracted from an XBRL file
    for a single table (role URI).

    Attributes
    ----------
    parent: object
        Owner of this table. ``to_DataFrame`` calls
        ``parent.get_table_by_code`` on it, so it has to be an object that
        provides that method (not a plain file-name string).
    code: str
        Table code.
    definition: str
        Table definition.
    uri: str
        Table (role) URI.
    """

    def __init__(self, parent, xbrl, code, definition, uri):
        self.parent = parent
        self.code = code
        self.definition = definition
        self.uri = uri
        self._xbrl = xbrl
        # Lazily computed caches, filled on first access of the matching
        # property.
        self._facts = None
        self._dataset = None
        self._cls = None
        self._labels = None
        self._calculations = None

    @property
    def facts(self):
        """list of modelFact: facts whose concept appears in this table's
        parent-child relationship set (as source or as target)."""
        if self._facts is None:
            relation = self._xbrl.relationshipSet(XbrlConst.parentChild,
                                                  self.uri)
            self._facts = [
                fact for fact in self._xbrl.facts
                if relation.fromModelObject(fact.concept)
                or relation.toModelObject(fact.concept)
            ]
        return self._facts

    @property
    def dataset(self):
        """dict of modelFact: facts grouped by the object id of their
        context ({context object id: [fact, ...]})."""
        if self._dataset is None:
            grouped = {}
            for fact in self.facts:
                grouped.setdefault(fact.context.objectId(), []).append(fact)
            self._dataset = grouped
        return self._dataset

    @property
    def cls(self):
        """Return the classifications (one entry per context), computing
        them on first access."""
        if self._cls is None:
            self._get_cls()
        return self._cls

    def cls_filter(self, start_dt=None, end_dt=None, label=None):
        """Filter the classifications.

        Parameters
        ----------
        start_dt: str
            Search start date.
        end_dt: str
            Search end date.
        label: str
            Label text that must be included.

        Returns
        -------
        list of cls
            Classifications accepted by both ``cls_datetime_check`` and
            ``cls_label_check``.
        """
        return [item for item in self.cls
                if cls_datetime_check(item, start_dt, end_dt)
                and cls_label_check(item, label)]

    def _get_cls(self):
        """Extract the classification information from the fact contexts.

        Builds one dict per distinct context with the keys ``cls_id``,
        ``instant_datetime``, ``start_datetime``, ``end_datetime`` and
        ``label``, stores the list in ``self._cls`` sorted by date
        (newest first) and returns it.
        """
        # dict.fromkeys de-duplicates exactly like a set but keeps the
        # first-seen order, so entries that tie on date keep a reproducible
        # order after the stable sort below.
        contexts = dict.fromkeys(fact.context for fact in self.facts)

        cls = []
        for context in contexts:
            object_id = context.objectId()
            # Ignore contexts that carry no data.
            if not self.dataset[object_id]:
                continue

            instant_datetime = None
            start_datetime = None
            end_datetime = None
            # Instant and end datetimes are moved back by one day.
            # NOTE(review): presumably because the XBRL processor reports
            # them as midnight of the following day -- confirm.
            if context.isInstantPeriod is True:
                instant_datetime = (context.instantDatetime
                                    - relativedelta(days=1))
            else:
                start_datetime = context.startDatetime
                end_datetime = context.endDatetime - relativedelta(days=1)

            label = {}
            dims = context.qnameDims
            for dim_qname in sorted(dims.keys(), key=str, reverse=True):
                member = dims[dim_qname].member
                # Strip bracketed suffixes such as "[member]" from labels.
                label[dim_qname] = {
                    'ko': re.sub(r'\[.*?\]', '', member.label(lang='ko')),
                    'en': re.sub(r'\[.*?\]', '', member.label(lang='en')),
                }

            cls.append({
                'cls_id': object_id,
                'instant_datetime': instant_datetime,
                'start_datetime': start_datetime,
                'end_datetime': end_datetime,
                'label': label,
            })

        cls.sort(key=lambda x: (x.get('instant_datetime')
                                or x.get('start_datetime')),
                 reverse=True)
        self._cls = cls
        return self._cls

    @property
    def calculations(self):
        """Return the calculation weights.

        Maps the target concept's qname (with ``:`` replaced by ``_``) to
        the weight of its summation-item relationship.
        """
        if self._calculations is None:
            relationship_set = self._xbrl.relationshipSet(
                XbrlConst.summationItem, self.uri)
            # Assign only the finished dict so that a failure while reading
            # a relationship does not leave a partial result cached.
            self._calculations = {
                str(rel.toModelObject.qname).replace(':', '_'): rel.weight
                for rel in relationship_set.modelRelationships
            }
        return self._calculations

    @property
    def labels(self):
        """Return the label trees, one per root concept of the table's
        parent-child relationship set."""
        if self._labels is None:
            relationship_set = self._xbrl.relationshipSet(
                XbrlConst.parentChild, self.uri)
            # Assign only the finished list so that a failure part-way
            # through does not leave a partial result cached.
            self._labels = [
                get_label_list(relationship_set, root_concept,
                               relationship_set.modelRelationships[idx])
                for idx, root_concept
                in enumerate(relationship_set.rootConcepts)
            ]
        return self._labels

    def to_DataFrame(self, cls=None, lang='ko', start_dt=None, end_dt=None,
                     label=None, show_abstract=False, show_class=True,
                     show_depth=10, show_concept=True, separator=True,
                     ignore_subclass=True):
        """Convert the table to a pandas DataFrame.

        Note that this sets the global ``pd.options.display.float_format``
        according to ``separator``.

        Parameters
        ----------
        cls: dict, optional
            Classifications to use; when omitted they are selected with
            ``cls_filter(start_dt, end_dt, label)``.
        lang: str, optional
            'ko' for Korean or 'en' for English.
        start_dt: str, optional
            Search start date.
        end_dt: str, optional
            Search end date.
        label: str, optional
            Word that the column label must include.
        show_abstract: bool, optional
            Whether to show abstract items.
        show_class: bool, optional
            Whether to show class columns.
        show_depth: int, optional
            Maximum class depth to show.
        show_concept: bool, optional
            Whether to show the concept_id column.
        separator: bool, optional
            Whether floats are displayed with a thousands separator.
        ignore_subclass: bool, optional
            When True, keep only the columns whose second level is not a
            tuple or is a tuple of length 1 (i.e. drop sub-class columns
            such as ('연결재무제표', '자본금')).

        Returns
        -------
        DataFrame
            Financial-statement DataFrame.
        """
        if cls is None:
            cls = self.cls_filter(start_dt, end_dt, label)
        cls = cls_merge_type(cls)

        depth = -1
        for label_tree in self.labels:
            depth = max(depth,
                        get_max_depth(label_tree,
                                      show_abstract=show_abstract))
        depth = min(depth, show_depth)

        # The reporting currency is read from table 'd999004'.
        # NOTE(review): presumably the document/entity information table --
        # confirm against the taxonomy.
        table = self.parent.get_table_by_code('d999004')
        unit = get_value_from_dataset(
            table.cls, table.dataset,
            'dart-gcd_EntityReportingCurrencyISOCode', ignore_case=True)
        definition = self.definition + ' (Unit: {})'.format(unit[0])
        columns = generate_df_columns(definition, cls, depth, lang,
                                      show_concept=show_concept,
                                      show_class=show_class)

        if separator:
            pd.options.display.float_format = '{:,}'.format
        else:
            pd.options.display.float_format = '{:}'.format
        df = pd.DataFrame(columns=columns)

        rows = [
            generate_df_rows(label_tree, cls, self.dataset, depth, lang=lang,
                             show_abstract=show_abstract,
                             show_concept=show_concept,
                             show_class=show_class)
            for label_tree in self.labels
        ]
        rows = flatten(rows)
        data = flatten(rows)
        for idx, row in enumerate(data):
            df.loc[idx] = row

        # Drop value columns that contain no data at all; the descriptive
        # columns (concept id, labels, class) are always kept.
        regex_pass = str_to_regex('concept_id OR label_ko OR label_en OR class')
        drop_columns = [
            key for key, count in df.count().items()
            if not regex_pass.search(' '.join(key[1])) and count < 1
        ]
        df = df.drop(drop_columns, axis=1)

        if ignore_subclass:
            columns = np.array(
                [x for x in df.columns
                 if not isinstance(x[1], tuple) or len(x[1]) == 1],
                dtype=object)
            return df[columns]
        return df

    def get_value_by_concept_id(self, concept_id, start_dt=None, end_dt=None,
                                label=None, lang='en'):
        """Look up values by concept_id.

        Parameters
        ----------
        concept_id: str
            concept_id of the financial-statement account.
        start_dt: str
            Search start date.
        end_dt: str
            Search end date.
        label: str
            Label text that must be included.
        lang: str
            'ko' for Korean / 'en' for English.

        Returns
        -------
        dict of (str or float)
            { column title : value }
        """
        cls = self.cls_filter(start_dt, end_dt, label)

        def search_concept_id(labels, concept):
            """Depth-first search of the label trees for ``concept``.

            Returns ``(True, preferred)`` for the first match, otherwise
            ``(False, None)``.
            """
            for node in labels:
                if node['concept_id'] == concept:
                    return True, node['preferred']
                elif node['children']:
                    result = search_concept_id(node['children'], concept)
                    if result[0]:
                        return result
            return False, None

        # Default sign; replaced by the one derived from the preferred
        # label when the concept is part of this table's label trees.
        sign = 1.0
        retcode, preferred = search_concept_id(self.labels, concept_id)
        if retcode:
            sign = prefered_sign(preferred)

        data = get_value_from_dataset(classification=cls,
                                      dataset=self.dataset,
                                      concept_id=concept_id,
                                      sign=sign)
        return {get_title(c, lang=lang): d for c, d in zip(cls, data)}

    def __repr__(self):
        info = {
            'code': self.code,
            'definition': self.definition,
        }
        return str(info)