diff --git a/src/acquire_fund_quarter.py b/src/acquire_fund_quarter.py index 8f51d03..d2bf2ba 100644 --- a/src/acquire_fund_quarter.py +++ b/src/acquire_fund_quarter.py @@ -20,6 +20,7 @@ from utils.index import bootstrap_thread from sql_model.fund_query import FundQuery from sql_model.fund_insert import FundInsert +from models.manager import Manager, ManagerAssoc # 利用api获取同类基金的资产 @@ -56,11 +57,13 @@ def crawlData(start, end): page_start, page_limit) for record in results: sleep(1) + # 0P000179WG + # 001811 中欧明睿新常态混合A each_fund = FundSpider( record[0], record[1], record[2], chrome_driver) - is_normal = each_fund.go_fund_url() + is_error_page = each_fund.go_fund_url() # 是否能正常跳转到基金详情页,没有的话,写入csv,退出当前循环 - if is_normal == False: + if is_error_page == True: # error_funds.append(each_fund.fund_code) fund_infos = [each_fund.fund_code, each_fund.morning_star_code, each_fund.fund_name, record[3], page_start, '页面跳转有问题'] @@ -99,22 +102,33 @@ def crawlData(start, end): # 开始存入数据 fund_insert = FundInsert() # 基金经理 - if each_fund.manager.get('id'): - manager_dict = { - 'id': snow_flake_id, - 'manager_id': each_fund.manager.get('id'), - 'name': each_fund.manager.get('name'), - 'brife': each_fund.manager.get('brife') + first_manager_id = None + first_manager_start_date = None + for manager_item in each_fund.manager_list: + manager = Manager(**manager_item) + manager.upsert() + if first_manager_id == None: + first_manager_id = manager_item['manager_id'] + if first_manager_start_date == None: + first_manager_start_date = manager_item['manager_start_date'] + + manager_assoc_data = { + 'quarter_index': quarter_index, + 'manager_start_date': manager_item['manager_start_date'], + 'manager_id': manager_item['manager_id'], + 'fund_code': each_fund.fund_code } - fund_insert.insert_fund_manger_info(manager_dict) + manager_assoc = ManagerAssoc(**manager_assoc_data) + manager_assoc.upsert() + # fund_insert.insert_fund_manger_info(manager_dict) quarterly_dict = { 'id': snow_flake_id, 'quarter_index': each_fund.quarter_index, 'fund_code': each_fund.fund_code, 'investname_style': each_fund.investname_style, 'total_asset': each_fund.total_asset, - 'manager_id': each_fund.manager.get('id'), - 'manager_start_date': each_fund.manager.get('start_date'), + 'manager_id': first_manager_id, # 暂时存第一个基金经理信息 + 'manager_start_date': first_manager_start_date, # 暂时存第一个基金经理信息 'three_month_retracement': each_fund.three_month_retracement, 'june_month_retracement': each_fund.june_month_retracement, 'risk_statistics_alpha': each_fund.risk_statistics.get('alpha'), @@ -183,8 +197,8 @@ def crawlData(start, end): print(current_thread().getName(), 'page_start', page_start) sleep(3) chrome_driver.close() - - bootstrap_thread(crawlData, record_total, 8) + thread_count = 1 + bootstrap_thread(crawlData, record_total, thread_count) exit() if __name__ == '__main__': diff --git a/src/crud/insert.py b/src/crud/insert.py new file mode 100644 index 0000000..2e14f1c --- /dev/null +++ b/src/crud/insert.py @@ -0,0 +1,71 @@ +''' +Desc: +File: /insert.py +Project: crud +File Created: Sunday, 28th August 2022 11:21:42 am +Author: luxuemin2108@gmail.com +----- +Copyright (c) 2022 Camel Lu +''' + +import sys +sys.path.append('./src') + +from sqlalchemy.orm import Session +from models.manager import Manager, ManagerAssoc +from models.quarter import Quarter +from models.var import prefix, ORM_Base, engine + +session = Session(engine) + +def add_quarter(year): + boundary_date_list = ['03-31', '06-30', '09-30', '12-31'] + date_list_len = len(boundary_date_list) + quarter_list = [] + for i in range(date_list_len): + cur_year = year if i != 0 else str(int(year) - 1) + start_index = i - 1 if i - 1 >= 0 else date_list_len - i - 1 + start_date = boundary_date_list[start_index] + end_date = boundary_date_list[i] + quarter_data = { + 'quarter_index': year + "-Q" + str(i + 1), + 'start_time': cur_year + '-' + start_date, + 'end_time': year + '-' + end_date + } + quarter = Quarter(**quarter_data) + # quarter.save() + # session.add(quarter) + quarter_list.append(quarter) + session.add_all(quarter_list) + session.commit() + + + +if __name__ == '__main__': + year = '2023' + # add_quarter(year) + manager_data = { + # 'id': 12, + 'manager_start_date': '2022-09-03', + 'name': 'sdfsd23fs', + 'brife': 'sdfsdfs', + 'manager_id': '22323343' + } + manager = Manager(**manager_data) + # manager2 = Manager(**manager_data2) + manager_assoc_data = { + # 'id': 12, + 'quarter_index': '2022-Q3', + 'manager_start_date': '2022-09-03', + 'manager_id': manager_data['manager_id'], + 'fund_code': "000001" + } + # manager.save() + manager.upsert(ingore_keys = ['manager_id']) + # manager_assoc = ManagerAssoc(**manager_assoc_data) + # print("manager_assoc", manager_assoc) + # manager_assoc.upsert(ingore_keys = ['manager_id']) + # manager_assoc.upsert() + # manager.upsert(ingore_keys = ['manager_id']) + + diff --git a/src/fund_info/crawler.py b/src/fund_info/crawler.py index be61f39..e83d773 100644 --- a/src/fund_info/crawler.py +++ b/src/fund_info/crawler.py @@ -36,7 +36,8 @@ def __init__(self, code, namecode, name, chrome_driver): # 季度变动信息 self.total_asset = None # 总资产 self.investname_style = None # 投资风格 - self.manager = dict() # 基金经理,name,id,管理时间 + self.manager_list = [] # 基金经理列表 + # self.manager = dict() # 基金经理,name,id,管理时间 self.three_month_retracement = 0.0 # 最差六个月回报 self.june_month_retracement = 0.0 # 最差六个月回报 self.bond_position = dict( @@ -60,11 +61,12 @@ def go_fund_url(self, cookie_str=None): sleep(6) # 判断是否页面出错,重定向,如果是的话跳过 if self._chrome_driver.current_url == 'https://www.morningstar.cn/errors/defaulterror.html': - return False + return True while self._chrome_driver.page_source == None: self._chrome_driver.refresh() print('wait:fund_code', self.fund_code) sleep(9) + return False # self._chrome_driver.execute_script('location.reload()') def get_element_text_by_class_name(self, class_name, parent_id): @@ -134,27 +136,35 @@ def get_fund_base_info(self): # 获取基金经理信息(多位在任基金经理,只需第一位) def get_fund_manager_info(self): - try: - # 基金经理 - manager_ele = self._chrome_driver.find_element_by_id( - 'qt_manager').find_element_by_xpath("ul") - manager_name = manager_ele.find_element_by_xpath( - "li[@class='col1']/a").text - manager_id = re.findall( - r'(?<=managerid=)(\w+)$', manager_ele.find_element_by_xpath("li[@class='col1']/a").get_attribute('href')).pop(0) - manager_start_date = manager_ele.find_element_by_xpath( - "li[@class='col1']/i").text[0:10] - manager_brife = manager_ele.find_element_by_xpath( - "li[@class='col2']").text - self.manager['id'] = manager_id - self.manager['name'] = manager_name - self.manager['start_date'] = manager_start_date - self.manager['brife'] = manager_brife - except NoSuchElementException: - self._is_trigger_catch = True - print('error_fund_info:', self.fund_code, - '-', self.morning_star_code, 'get_fund_manager_info') - file_name = './abnormal/manager-' + self.fund_code + "-no_such_element.png" + manager_ele_list = self._chrome_driver.find_element_by_id( + 'qt_manager').find_elements_by_xpath("ul") + for manager_ele in manager_ele_list: + try: + # 基金经理 + manager_name = manager_ele.find_element_by_xpath( + "li[@class='col1']/a").text + # 仅仅记录目前在职的 + if '[离任]' in manager_name: + continue + manager = dict() + manager['name'] = manager_name + manager_id = re.findall( + r'(?<=managerid=)(\w+)$', manager_ele.find_element_by_xpath("li[@class='col1']/a").get_attribute('href')).pop(0) + + if not manager_id: + continue + manager['manager_id'] = manager_id + manager['manager_start_date'] = manager_ele.find_element_by_xpath( + "li[@class='col1']/i").text[0:10] + + manager['brife'] = manager_ele.find_element_by_xpath( + "li[@class='col2']").text + + self.manager_list.append(manager) + + except NoSuchElementException: + self._is_trigger_catch = True + print('manager_ele', manager_ele) # self._chrome_driver.save_screenshot(file_name) # driver.get_screenshot_as_file(file_name) # raise # 抛出异常,注释后则不抛出异常 diff --git a/src/models/manager.py b/src/models/manager.py index d696cf4..4a03030 100644 --- a/src/models/manager.py +++ b/src/models/manager.py @@ -9,41 +9,68 @@ ''' import sys sys.path.append('./src') - -from sqlalchemy.orm import Session, registry, relationship, aliased -from sqlalchemy import DATE, MetaData, Table, Column, Integer, BigInteger, String, ForeignKey, select +from sqlalchemy.orm import registry, relationship +from sqlalchemy import UniqueConstraint, Table, Column, Integer, BigInteger, String, ForeignKey, text, DateTime, Date, func from db.engine import get_engine -from models.var import prefix, ORM_Base, engine +from models.var import prefix, ORM_Base, engine, Model +from lib.mysnowflake import IdWorker + manager_table_name = prefix + 'manager' manager_table = Table(manager_table_name, ORM_Base.metadata, autoload=True, autoload_with=engine) -class Manager(ORM_Base): +idWorker = IdWorker() +class Manager(ORM_Base, Model): __table__ = manager_table # managerAssoc = relationship("ManagerAssoc", back_populates="manager") + + def __init__(self, **kwargs): + self.id = idWorker.get_id() + column_keys = self.__table__.columns.keys() + udpate_data = dict() + for key in kwargs.keys(): + if key not in column_keys: + continue + else: + udpate_data[key] = kwargs[key] + ORM_Base.__init__(self, **udpate_data) + Model.__init__(self, **kwargs, id = self.id) + def __repr__(self): return f"Manager(id={self.id!r}, name={self.name!r}, manager_id={self.manager_id!r})" -class ManagerAssoc(ORM_Base): +class ManagerAssoc(ORM_Base, Model): __tablename__ = prefix + 'manager_assoc' - manager_key = manager_table_name + '.manager_id' id = Column(BigInteger, primary_key=True) quarter_index = Column(String(12)) - manager_id = Column(String(32), ForeignKey(manager_key)) + manager_key = manager_table_name + '.manager_id' + manager_id = Column(String(32), ForeignKey(manager_key), nullable=False) fund_code_key = prefix + 'base' + '.fund_code' - fund_code= Column(String(10), ForeignKey(fund_code_key)) - # manager = relationship('Manager', backref='manager_assoc') + fund_code= Column(String(10), ForeignKey(fund_code_key), nullable=False) + manager_start_date = Column(Date(), nullable=False) + update_time = Column(DateTime, server_default=text('CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP'), onupdate=func.now()) + create_time = Column(DateTime, server_default=text('CURRENT_TIMESTAMP'), comment='创建时间') + + UniqueConstraint(quarter_index, manager_id, fund_code, name='uix_1') + + manager = relationship('Manager', backref='manager_assoc') # fund_base = relationship("Fund", backref="manager_assoc") + def __init__(self, **kwargs): + self.id = idWorker.get_id() + ORM_Base.__init__(self, **kwargs) + Model.__init__(self, **kwargs, id = self.id) def __repr__(self): - return f"ManagerAssoc(id={self.id!r}, name={self.manager_id!r}, fullname={self.fund_code_id!r})" + return f"ManagerAssoc(id={self.id!r}, name={self.manager_id!r}, fullname={self.fund_code!r})" def create(): ORM_Base.metadata.create_all(engine) # mapper_registry.metadata.create_all(engine) # ManagerAssoc.__table__.drop(engine) +def drop(): + ManagerAssoc.__table__.drop(engine) if __name__ == '__main__': create() - # demo() + # drop() diff --git a/src/models/quarter.py b/src/models/quarter.py index 4a5a98a..09b7661 100644 --- a/src/models/quarter.py +++ b/src/models/quarter.py @@ -10,19 +10,19 @@ import sys sys.path.append('./src') from datetime import datetime -from sqlalchemy import UniqueConstraint, CheckConstraint, MetaData, Table, Column, text, Integer, String, Date, DateTime, Enum, func -from sqlalchemy.orm import validates -from models.var import ORM_Base, engine +from sqlalchemy import UniqueConstraint, Column, text, Integer, String, Date, DateTime, Enum, func +from sqlalchemy.orm import validates, Session +from models.var import ORM_Base, engine, Model -class Quarter(ORM_Base): + +class Quarter(ORM_Base, Model): __tablename__ = 'quarter' id = Column(Integer, primary_key=True) quarter_index = Column(String(12), nullable=False, unique=True) start_time = Column(Date(), nullable=False, unique=True) end_time = Column(Date(), nullable=False, unique=True) - updated_at = Column(DateTime, server_default=text('CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP'), onupdate=func.now()) - created_at = Column(DateTime, server_default=text('CURRENT_TIMESTAMP'), comment='创建时间') - # created_at = Column(DateTime, server_default=func.now(), comment='创建时间') + update_time = Column(DateTime, server_default=text('CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP'), onupdate=func.now()) + create_time = Column(DateTime, server_default=text('CURRENT_TIMESTAMP'), comment='创建时间') UniqueConstraint(quarter_index, name='uix_1') def __init__(self, *args, **kwargs) -> None: @@ -30,7 +30,6 @@ def __init__(self, *args, **kwargs) -> None: @validates('end_time') def validate_start_time(self, key, end_time): - # end_time_stamp = time.mktime(end_time) end_time_stamp = datetime.strptime (end_time, '%Y-%m-%d') start_time_stamp = datetime.strptime (self.start_time, '%Y-%m-%d') if end_time_stamp > start_time_stamp: diff --git a/src/models/var.py b/src/models/var.py index 6b76b51..b247208 100644 --- a/src/models/var.py +++ b/src/models/var.py @@ -7,10 +7,73 @@ ----- Copyright (c) 2022 Camel Lu ''' +import sys +sys.path.append('./src') +from sqlalchemy import Column, text, DateTime, func +from sqlalchemy.dialects.mysql import insert from db.engine import get_engine, get_orm_base +from sqlalchemy.orm import Session + ORM_Base = get_orm_base() prefix = 'fund_morning_' engine = get_engine(echo=True) + +# class ORM_Base(Base): +# def __init__(self, **kwargs) -> None: +# column_keys = self.__table__.columns.keys() +# udpate_data = dict() +# for key in kwargs.keys(): +# if key not in column_keys: +# continue +# else: +# udpate_data[key] = kwargs[key] + # Base.__init__(self, **udpate_data) + + +class Model(): + __input_data__ = dict() + + def __init__(self, **kwargs) -> None: + self.__input_data__ = kwargs + self.session = Session(engine) + + def save(self): + self.session.add(self) + self.session.commit() + + def upsert(self, *, ingore_keys = []): + column_keys = self.__table__.columns.keys() + + udpate_data = dict() + for key in self.__input_data__.keys(): + if key not in column_keys: + continue + else: + udpate_data[key] = self.__input_data__[key] + + insert_stmt = insert(self.__table__).values(**udpate_data) + + all_ignore_keys = ['id'] + if isinstance(ingore_keys, list): + all_ignore_keys =[*all_ignore_keys, *ingore_keys] + else: + all_ignore_keys.append(ingore_keys) + + udpate_columns = dict() + for key in self.__input_data__.keys(): + if key not in column_keys or key in all_ignore_keys: + continue + else: + udpate_columns[key] = insert_stmt.inserted[key] + + on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update( + **udpate_columns + ) + # self.session.add(self) + self.session.execute(on_duplicate_key_stmt) + self.session.commit() + + diff --git a/src/utils/login.py b/src/utils/login.py index be13544..a0d9a9e 100644 --- a/src/utils/login.py +++ b/src/utils/login.py @@ -67,7 +67,8 @@ def mock_login_site(chrome_driver, site_url, redirect_url=None): env_password = os.getenv('morning_star_password') username.send_keys(env_username) password.send_keys(env_password) - + submit = chrome_driver.find_element_by_id('loginGo') + submit.click() # check_code = chrome_driver.find_element_by_id('txtCheckCode') # count = 1 # flag = True