-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconnection.py
133 lines (116 loc) · 3.94 KB
/
connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
from os import environ
from time import sleep
from typing import Union
import json
import memento_settings as MS
from elasticsearch import helpers
from elasticsearch import Elasticsearch
import requests
import pandas as pd
# Turn off pandas' chained-assignment (SettingWithCopy) warning for this process.
pd.options.mode.chained_assignment = None # default='warn'
# Module-level Elasticsearch client shared by every helper below; the
# connection keyword arguments come from memento_settings.SERVER_ES_INFO.
ES = Elasticsearch(**MS.SERVER_ES_INFO)
def make_clear(results: list,
               key_lambda = lambda x: x['_id'],
               value_lambda = lambda x: x,
               filter_lambda = lambda x: x) -> dict:
    """Flatten raw search hits into a dictionary.

    Each hit is reduced to its ``_source`` fields plus an ``_id`` entry taken
    from the hit itself. The flattened documents are then filtered and
    arranged into a mapping.

    Args:
        results: Hits, each a mapping with ``'_id'`` and ``'_source'`` keys.
        key_lambda: Computes the dictionary key from a flattened document
            (defaults to its ``_id``).
        value_lambda: Computes the dictionary value from a flattened document
            (defaults to the document itself).
        filter_lambda: Documents for which this returns a falsy value are
            dropped.

    Returns:
        dict: ``key_lambda(doc) -> value_lambda(doc)`` for every kept
        document. When two documents produce the same key, the later one wins.
    """
    def flatten(hit) -> dict:
        # Build a new dict rather than writing '_id' into hit['_source'], so
        # the caller's hit objects are left untouched.
        return {**hit['_source'], '_id': hit['_id']}

    return {
        key_lambda(source): value_lambda(source)
        for source in map(flatten, results)
        if filter_lambda(source)
    }
def get_scroll(query=None, doc_type='', index='memento'):
    """Collect every hit for ``query`` by paging through the scroll API.

    Args:
        query: Search body sent to Elasticsearch. ``None`` (the default) is
            replaced by an empty body.
        doc_type: Document type to search.
        index: Index to search.

    Returns:
        dict: All collected hits, arranged by ``make_clear`` with its default
        key/value/filter functions.
    """
    if query is None:
        # A fresh dict per call; a ``{}`` default would be one object shared
        # by every invocation.
        query = {}

    hits = []
    # Pages of up to 1000 hits; each request keeps the scroll context alive
    # for another minute.
    page = ES.search(index=index, doc_type=doc_type, body=query, scroll='1m', size=1000)
    while page['hits']['hits']:
        hits.extend(page['hits']['hits'])
        # Continue from the id carried by the most recent response instead of
        # reusing the id of the initial search for every page.
        page = ES.scroll(scroll_id=page['_scroll_id'], scroll='1m')
    return make_clear(hits)
def get_entities() -> dict:
    """Return every document of type ``'namugrim'`` from the default index.

    The documents are fetched with an empty query body through ``get_scroll``
    and returned in the mapping form that function produces.
    """
    return get_scroll(query={}, doc_type='namugrim')
def get_exist(idx: str, doc_type: str, index='memento'):
    """Report how many documents match the given ``_id``.

    Args:
        idx: Value matched against the ``_id`` field.
        doc_type: Document type to search.
        index: Index to search.

    Returns:
        The ``hits.total`` value of the search response.
    """
    id_query = {'query': {'match': {'_id': idx}}}
    response = ES.search(index=index, doc_type=doc_type, body=id_query)
    return response['hits']['total']
def get_item(idx: str, doc_type: str, index='memento'):
    """Fetch the ``_source`` of the document whose ``_id`` matches ``idx``.

    Args:
        idx: Value matched against the ``_id`` field.
        doc_type: Document type to search.
        index: Index to search.

    Returns:
        The ``_source`` of the first hit, or ``None`` when the response
        reports a falsy ``hits.total``.
    """
    id_query = {'query': {'match': {'_id': idx}}}
    found = ES.search(index=index, doc_type=doc_type, body=id_query)['hits']
    if not found['total']:
        return None
    return found['hits'][0]['_source']
def get_navernews(keyword, date_start, date_end):
    """Load ``News_Naver`` documents for a keyword and date range as a DataFrame.

    Args:
        keyword: Value the ``entities`` field must contain (term query).
        date_start: Lower bound (inclusive) for ``published_time``, in
            ``yyyy.MM.dd`` format.
        date_end: Upper bound (inclusive) for ``published_time``, in
            ``yyyy.MM.dd`` format.

    Returns:
        pandas.DataFrame: One row per matching document, without the
        ``MODULE`` and ``_id`` fields.

    Raises:
        KeyError: If a matching document has no ``MODULE`` field.
    """
    published_range = {
        'range': {
            'published_time': {
                "gte": date_start,
                "lte": date_end,
                "format": "yyyy.MM.dd",
            }
        }
    }
    entity_term = {'term': {'entities': keyword}}
    search_body = {'query': {'bool': {'must': [published_range, entity_term]}}}

    articles = get_scroll(search_body, doc_type='News_Naver')

    rows = []
    for article in articles.values():
        # Drop the bookkeeping fields in place before the document becomes a row.
        del article['MODULE']
        del article['_id']
        rows.append(article)
    return pd.DataFrame(rows)
def update_item(update, idx, doc_type: str, index: str = 'memento'):
    """Send an update request for one document and return its id.

    Args:
        update: Request body passed to ``ES.update``.
        idx: Id of the document to update.
        doc_type: Document type of the target document.
        index: Index holding the document.

    Returns:
        The ``_id`` field of the update response.
    """
    response = ES.update(body=update, id=idx, doc_type=doc_type, index=index)
    return response['_id']
def put_bulk(actions: list):
    """Submit ``actions`` with ``helpers.bulk``, retrying until a call succeeds.

    Any failure is reported on stdout and the whole batch is retried after a
    two-second pause; the function only returns once a bulk call completes
    without raising.

    Args:
        actions: Bulk actions handed to ``helpers.bulk`` unchanged.
    """
    while True:
        try:
            helpers.bulk(ES, actions)
        except Exception:
            # ``Exception`` rather than a bare ``except`` so KeyboardInterrupt
            # and SystemExit can still break out of the retry loop.
            print ('Connection Error wait for 2s')
            sleep(2)
        else:
            return
def put_item(item: dict, doc_type: str, idx: str = '', index='memento'):
    """Index a single document, retrying until the request succeeds.

    Any failure of the index call is reported on stdout and retried after a
    two-second pause.

    Args:
        item: Document body to index.
        doc_type: Document type to index under.
        idx: Id passed to ``ES.index``; defaults to an empty string.
        index: Target index.

    Returns:
        The ``_id`` field of the index response.
    """
    while True:
        try:
            result = ES.index(
                index=index,
                doc_type=doc_type,
                body=item,
                id=idx,
            )
        except Exception:
            # ``Exception`` rather than a bare ``except`` so KeyboardInterrupt
            # and SystemExit can still break out of the retry loop.
            print ('Connection Error wait for 2s')
            sleep(2)
        else:
            # Read the response outside the guarded call so an unexpected
            # response shape surfaces instead of being retried.
            return result['_id']
# Default Connection Function
def put_cluster(cluster, keyword, date_start, date_end):
    """Store ``cluster`` as a document of type ``'cluster'`` in the default index.

    Args:
        cluster: Document body handed to ``put_item``.
        keyword: Accepted but not used by this function.
        date_start: Accepted but not used by this function.
        date_end: Accepted but not used by this function.

    Returns:
        None. The id returned by ``put_item`` is discarded.
    """
    put_item(item=cluster, doc_type='cluster')