Skip to content

Commit c952156

Browse files
The Postgres DB was altered to include a project table to control the jobs and be used by the aggregator
1 parent 7c31d7d commit c952156

File tree

12 files changed

+55
-32
lines changed

12 files changed

+55
-32
lines changed

.idea/workspace.xml

Lines changed: 13 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

API_REST/DAO/connection.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,24 @@ def __init__(self):
1515
self._conn = psycopg2.connect(user=self._user, password=self._password, host=self._host, port=self._port,
1616
database=self._db)
1717

18-
def insert_jobs(self, type, status, file):
18+
def insert_jobs(self, type, status, file, file_name):
1919

2020
cursor = self._conn.cursor()
21+
query = "INSERT INTO project (video_lesson) VALUES(%s) RETURNING id"
22+
print(file_name, flush=True)
23+
cursor.execute(query, (file_name,))
24+
self._conn.commit()
25+
project_id = cursor.fetchone()[0]
26+
2127
query = "INSERT INTO jobs (type, status, file_id, project_id) VALUES(%s, %s, %s, %s) RETURNING oid"
22-
cursor.execute(query, (type, status, file, 1))
28+
cursor.execute(query, (type, status, file, project_id))
2329

2430
self._conn.commit()
2531

2632
oid = cursor.fetchone()[0]
2733
self._conn.close()
2834

29-
return oid
35+
return oid, project_id
3036

3137
def get_file(self, oid):
3238
cursor = self._conn.cursor()

API_REST/app.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ def asr():
5151
return redirect(request.url)
5252

5353

54-
@app.route('/extract_audio', methods=['POST'])
54+
@app.route('/segmentation', methods=['POST'])
5555
def extract_audio():
5656

5757
if request.method == 'POST':
@@ -66,8 +66,8 @@ def extract_audio():
6666
channel.queue_declare(queue='audio_extractor', durable=True)
6767
db_conn = Connection()
6868
file_oid = db_conn.insert_doc_mongo(file.read())
69-
db_conn.insert_jobs(type='audio_extractor', status='new', file=file_oid)
70-
message = {'type': 'audio_extractor', 'status': 'new', 'oid': file_oid}
69+
oid, project_id = db_conn.insert_jobs(type='audio_extractor', status='new', file=file_oid, file_name=file.filename)
70+
message = {'type': 'audio_extractor', 'status': 'new', 'oid': file_oid, 'project_id': project_id}
7171
channel.basic_publish(exchange='', routing_key='audio_extractor', body=json.dumps(message))
7272
connection.close()
7373
return 'Done'

audio_extractor/DAO/connection.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@ def __init__(self):
1515
self._conn = psycopg2.connect(user=self._user, password=self._password, host=self._host, port=self._port,
1616
database=self._db)
1717

18-
def insert_jobs(self, type, status, file):
18+
def insert_jobs(self, type, status, file, project_id):
1919

2020
cursor = self._conn.cursor()
21+
2122
query = "INSERT INTO jobs (type, status, file_id, project_id) VALUES(%s, %s, %s, %s) RETURNING oid"
22-
cursor.execute(query, (type, status, file, 1))
23+
cursor.execute(query, (type, status, file, project_id))
2324

2425
self._conn.commit()
2526

audio_extractor/worker.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@ def callback(ch, method, properties, body):
1919
try:
2020
print(" [x] Received %r" % body, flush=True)
2121
oid = json.loads(body)['oid']
22+
project_id = json.loads(body)['project_id']
2223
print(str(oid) + '!!!???', flush=True)
24+
print(str(project_id) + '!!!???', flush=True)
25+
2326
conn = Connection()
2427
file = conn.get_doc_mongo(file_oid=oid)
2528

@@ -30,8 +33,8 @@ def callback(ch, method, properties, body):
3033
try:
3134
file_oid = conn.insert_doc_mongo(data)
3235

33-
conn.insert_jobs('audio_extractor', 'done', file_oid)
34-
message = {'type': 'vad', 'status': 'new', 'oid': file_oid}
36+
conn.insert_jobs('audio_extractor', 'done', file_oid, project_id)
37+
message = {'type': 'vad', 'status': 'new', 'oid': file_oid, 'project_id': project_id}
3538
connection = pika.BlockingConnection(pika.ConnectionParameters(host=os.environ['QUEUE_SERVER']))
3639
channel = connection.channel()
3740

docker-compose.yml

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ services:
99
- '25672:25672'
1010
- '15672:15672'
1111
volumes:
12-
- 'rabbitmq_data:/bitnami'
12+
- ~/rabbitmq_data:/bitnami
1313
postgres-compose:
1414
image: postgres
1515
environment:
@@ -167,6 +167,4 @@ services:
167167
- "9999:80"
168168

169169

170-
volumes:
171-
rabbitmq_data:
172-
driver: local
170+

worker_asr/DAO/connection.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@ def __init__(self):
1616
self._conn = psycopg2.connect(user=self._user, password=self._password, host=self._host, port=self._port,
1717
database=self._db)
1818

19-
def insert_jobs(self, type, status, file):
19+
def insert_jobs(self, type, status, file, project_id):
2020

2121
cursor = self._conn.cursor()
2222
query = "INSERT INTO jobs (type, status, file_id, project_id) VALUES(%s, %s, %s, %s) RETURNING oid"
23-
cursor.execute(query, (type, status, file, 1))
23+
cursor.execute(query, (type, status, file, project_id))
2424

2525
self._conn.commit()
2626

worker_asr/worker.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ def do_work(connection, channel, delivery_tag, body):
2828
try:
2929
print(" [x] Received %r" % body, flush=True)
3030
oid = json.loads(body)['oid']
31+
project_id = json.loads(body)['project_id']
3132
conn = Connection()
3233
# file = conn.get_file(oid)
3334
file = conn.get_doc_mongo(file_oid=oid)
@@ -50,9 +51,9 @@ def do_work(connection, channel, delivery_tag, body):
5051

5152
# inserts the result of processing in database
5253
file_oid = conn.insert_doc_mongo(payload)
53-
conn.insert_jobs(type='asr', status='done', file=file_oid)
54+
conn.insert_jobs(type='asr', status='done', file=file_oid, project_id=project_id)
5455

55-
message = {'type': 'segmentation', 'status': 'new', 'oid': file_oid}
56+
message = {'type': 'segmentation', 'status': 'new', 'oid': file_oid, 'project_id': project_id}
5657

5758
# post a message on topic_segmentation queue
5859
connection_out = pika.BlockingConnection(pika.ConnectionParameters(host=os.environ['QUEUE_SERVER']))

worker_low_level_features/DAO/connection.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,18 @@ def __init__(self):
1515
self._conn = psycopg2.connect(user=self._user, password=self._password, host=self._host, port=self._port,
1616
database=self._db)
1717

18-
def insert_jobs(self, type, status, file):
19-
18+
def insert_jobs(self, type, status, file, project_id):
2019
cursor = self._conn.cursor()
20+
2121
query = "INSERT INTO jobs (type, status, file_id, project_id) VALUES(%s, %s, %s, %s) RETURNING oid"
22-
cursor.execute(query, (type, status, file, 1))
22+
cursor.execute(query, (type, status, file, project_id))
2323

2424
self._conn.commit()
2525

2626
oid = cursor.fetchone()[0]
2727
self._conn.close()
2828

29-
return oid
29+
return oid, project_id
3030

3131
def get_file(self, oid):
3232
cursor = self._conn.cursor()

worker_low_level_features/worker.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ def do_work(connection, channel, delivery_tag, body):
2828
try:
2929
print(" [x] Received %r" % body, flush=True)
3030
oid = json.loads(body)['oid']
31+
project_id = json.loads(body)['project_id']
3132
conn = Connection()
3233
# file = conn.get_file(oid)
3334
file = conn.get_doc_mongo(file_oid=oid)
@@ -57,9 +58,9 @@ def do_work(connection, channel, delivery_tag, body):
5758

5859
# inserts the result of processing in database
5960
file_oid = conn.insert_doc_mongo(payload)
60-
conn.insert_jobs(type='low_level_features', status='done', file=file_oid)
61+
conn.insert_jobs(type='low_level_features', status='done', file=file_oid, project_id=project_id)
6162

62-
message = {'type': 'segmentation', 'status': 'new', 'oid': file_oid}
63+
message = {'type': 'segmentation', 'status': 'new', 'oid': file_oid, 'project_id': project_id}
6364

6465
# post a message on topic_segmentation queue
6566
connection_out = pika.BlockingConnection(pika.ConnectionParameters(host=os.environ['QUEUE_SERVER']))
@@ -68,6 +69,7 @@ def do_work(connection, channel, delivery_tag, body):
6869
channel2.queue_declare(queue='segmentation', durable=True)
6970
channel2.basic_publish(exchange='', routing_key='segmentation', body=json.dumps(message))
7071

72+
7173
except Exception as e:
7274
# print(e, flush=True)
7375
print('Connection Error %s' % e, flush=True)

0 commit comments

Comments
 (0)