Skip to content

Commit 12abd7f

Browse files
committed
[card-server] cli command to expose a card server to view realtime updates
- Added a card viewer HTML file - Created a simple HTTP-based card server that helps showcase realtime cards by querying the server
1 parent cd067ec commit 12abd7f

File tree

3 files changed

+428
-1
lines changed

3 files changed

+428
-1
lines changed

metaflow/plugins/cards/card_cli.py

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
from metaflow.client import Task
22
from metaflow import JSONType, namespace
3-
from metaflow.exception import CommandException
3+
from metaflow.util import resolve_identity
4+
from metaflow.exception import (
5+
CommandException,
6+
MetaflowNotFound,
7+
MetaflowNamespaceMismatch,
8+
)
49
import webbrowser
510
import re
611
from metaflow._vendor import click
@@ -945,3 +950,90 @@ def list(
945950
show_list_as_json=as_json,
946951
file=file,
947952
)
953+
954+
955+
@card.command(help="Run local card viewer server")
@click.option(
    "--run-id",
    default=None,
    show_default=True,
    type=str,
    help="Run ID of the flow",
)
@click.option(
    "--port",
    default=8324,
    show_default=True,
    type=int,
    help="Port on which Metaflow card server will run",
)
@click.option(
    "--namespace",
    "user_namespace",
    default=None,
    show_default=True,
    type=str,
    help="Namespace of the flow",
)
@click.option(
    "--max-cards",
    default=30,
    show_default=True,
    type=int,
    help="Maximum number of cards to be shown at any time by the server",
)
@click.pass_context
def server(ctx, run_id, port, user_namespace, max_cards):
    """Resolve the requested run and start the local card viewer server.

    When ``--namespace`` is not given, the namespace falls back to the value
    returned by ``resolve_identity()``. The call does not return until
    ``create_card_server`` returns.
    """
    # Imported lazily so the server module is only loaded for this command.
    from .card_server import create_card_server, CardServerOptions

    if user_namespace is None:
        user_namespace = resolve_identity()

    selected_run = _get_run_object(ctx.obj, run_id, user_namespace)
    create_card_server(
        CardServerOptions(
            run_object=selected_run,
            only_running=False,
            follow_resumed=False,
            flow_datastore=ctx.obj.flow_datastore,
            max_cards=max_cards,
        ),
        port,
        ctx.obj,
    )
999+
1000+
1001+
def _get_run_object(obj, run_id, user_namespace):
    """Return the ``Run`` the card server should display.

    Parameters
    ----------
    obj
        CLI context object; ``obj.flow.name`` and ``obj.echo`` are used.
    run_id
        Bare run id (no ``/``) of the current flow, or ``None`` to pick the
        latest run visible in ``user_namespace``.
    user_namespace
        Namespace used to search for the latest run when ``run_id`` is None.

    Raises
    ------
    CommandException
        If the flow or run cannot be found, is outside the active namespace,
        or ``run_id`` is not a bare run id.
    """
    from metaflow import Flow, Run

    flow_name = obj.flow.name
    try:
        if run_id is not None:
            # An explicit run id is looked up across all namespaces.
            namespace(None)
        else:
            _msg = "Searching for runs in namespace: %s" % user_namespace
            obj.echo(_msg, fg="blue", bold=False)
            namespace(user_namespace)
        flow = Flow(pathspec=flow_name)
    except MetaflowNotFound:
        raise CommandException("No run found for *%s*." % flow_name)
    except MetaflowNamespaceMismatch:
        raise CommandException(
            "No run found for *%s* in namespace *%s*. You can switch the namespace using --namespace"
            % (flow_name, user_namespace)
        )

    if run_id is None:
        latest_run = flow.latest_run
        # `latest_run` is None when the flow has no run in the active
        # namespace; report that instead of failing with AttributeError.
        if latest_run is None:
            raise CommandException(
                "No run found for *%s* in namespace *%s*. You can switch the namespace using --namespace"
                % (flow_name, user_namespace)
            )
        run_id = latest_run.pathspec
    else:
        # Validate user input explicitly; `assert` is stripped under -O.
        if "/" in run_id:
            raise CommandException("run_id should be of the form <runid>")
        run_id = "/".join([flow_name, run_id])

    try:
        run = Run(run_id)
    except MetaflowNotFound:
        raise CommandException("No run found for runid: *%s*." % run_id)
    except MetaflowNamespaceMismatch:
        raise CommandException(
            "No run found for runid: *%s* in namespace *%s*. You can switch the namespace using --namespace"
            % (run_id, user_namespace)
        )
    obj.echo("Using run-id %s" % run_id, fg="blue", bold=False)
    return run
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
import os
2+
import json
3+
from http.server import BaseHTTPRequestHandler
4+
5+
# Prefer the ThreadingHTTPServer provided by http.server; when that import
# fails, build an equivalent class from ThreadingMixIn and HTTPServer.
try:
    from http.server import ThreadingHTTPServer
except ImportError:
    from socketserver import ThreadingMixIn
    from http.server import HTTPServer

    class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
        # Handler threads are daemon threads.
        daemon_threads = True
13+
14+
15+
from .card_client import CardContainer
16+
from .exception import CardNotPresentException
17+
from .card_resolver import resolve_paths_from_task
18+
19+
# Absolute path of the viewer page stored in the `card_viewer` directory that
# sits next to this module.
VIEWER_PATH = os.path.join(
    os.path.dirname(os.path.abspath(__file__)), "card_viewer", "viewer.html"
)

# The page is read once at import time. The context manager closes the file
# handle deterministically, and the explicit encoding matches the UTF-8
# encoding applied when the page is served.
with open(VIEWER_PATH, encoding="utf-8") as _viewer_file:
    CARD_VIEWER_HTML = _viewer_file.read()

# NOTE(review): not referenced by the code visible in this module; presumably
# reserved for caching task lookups - confirm before removing.
TASK_CACHE = {}
26+
27+
28+
class CardServerOptions:
    """Bundle of settings handed to the card server.

    Each constructor argument is stored unchanged under an attribute of the
    same name; ``max_cards`` defaults to 20.
    """

    def __init__(
        self, run_object, only_running, follow_resumed, flow_datastore, max_cards=20
    ):
        self.run_object, self.only_running = run_object, only_running
        self.follow_resumed, self.flow_datastore = follow_resumed, flow_datastore
        self.max_cards = max_cards
37+
38+
39+
def cards_for_task(
    flow_datastore, task_pathspec, card_type=None, card_hash=None, card_id=None
):
    """Yield the cards stored for one task, optionally filtered.

    ``card_type``, ``card_hash`` and ``card_id`` are forwarded to
    ``resolve_paths_from_task`` as its ``type``, ``hash`` and ``card_id``
    arguments. This is a generator function, so callers always receive a
    generator; when ``CardNotPresentException`` is raised during resolution
    the generator simply yields nothing.
    """
    try:
        card_paths, card_datastore = resolve_paths_from_task(
            flow_datastore,
            task_pathspec,
            type=card_type,
            hash=card_hash,
            card_id=card_id,
        )
    except CardNotPresentException:
        # Ends the generator without producing any card.
        return
    yield from CardContainer(card_paths, card_datastore, origin_pathspec=None)
54+
55+
56+
def cards_for_run(
    flow_datastore,
    run_object,
    only_running,
    card_type=None,
    card_hash=None,
    card_id=None,
    max_cards=20,
):
    """Yield ``(task_pathspec, card)`` pairs for the tasks of a run.

    Steps and tasks are visited in the order returned by
    ``run_object.steps()`` and ``step.tasks()``. When ``only_running`` is
    true, tasks whose ``finished`` attribute is truthy are skipped. At most
    ``max_cards`` pairs are produced; a non-positive ``max_cards`` produces
    none.
    """
    if max_cards <= 0:
        return
    emitted = 0
    for step in run_object.steps():
        for task in step.tasks():
            if only_running and task.finished:
                continue
            # cards_for_task is a generator function: it never returns None
            # and yields nothing for a task without cards.
            for card in cards_for_task(
                flow_datastore,
                task.pathspec,
                card_type=card_type,
                card_hash=card_hash,
                card_id=card_id,
            ):
                yield task.pathspec, card
                emitted += 1
                if emitted >= max_cards:
                    # A plain return ends the generator. Raising
                    # StopIteration here would surface as RuntimeError
                    # (PEP 479).
                    return
84+
85+
86+
class CardViewerRoutes(BaseHTTPRequestHandler):
    """HTTP GET handler for the local card viewer.

    The first path segment selects a route from ``ROUTES``:

    * ``/runinfo``: JSON listing of the run's cards
    * ``/card/<flow>/<run>/<step>/<task>/<hash>``: content returned by the
      card's ``get()``
    * ``/data/<flow>/<run>/<step>/<task>/<hash>``: JSON payload returned by
      the card's ``get_data()``

    Any other path is answered with the viewer page.
    """

    # Assigned by create_card_server before the server starts. The annotation
    # is a string so the class can be defined without evaluating the name.
    card_options: "CardServerOptions" = None

    def do_GET(self):
        """Split the request path into ``prefix``/``suffix`` and dispatch."""
        _, slash, path = self.path.partition("/")
        if slash:
            prefix, slash, remainder = path.partition("/")
            # suffix is None when the path has a single segment.
            suffix = remainder if slash else None
        else:
            prefix, suffix = None, None

        handler = self.ROUTES.get(prefix)
        if handler is None:
            self._response(CARD_VIEWER_HTML.encode("utf-8"))
        else:
            # ROUTES stores plain functions, so `self` is passed explicitly.
            handler(self, suffix)

    def get_runinfo(self, suffix):
        """Respond with a JSON description of the cards of the served run.

        ``suffix`` is accepted for route-signature uniformity and is unused.
        """
        options = self.card_options
        task_card_generator = cards_for_run(
            options.flow_datastore,
            options.run_object,
            options.only_running,
            max_cards=options.max_cards,
        )
        flow_name = options.run_object.parent.id
        run_id = options.run_object.id
        cards = []
        for pathspec, card in task_card_generator:
            step, task = pathspec.split("/")[-2:]
            cards.append(
                dict(
                    task=pathspec,
                    label="%s/%s %s" % (step, task, card.hash),
                    card_object=dict(
                        hash=card.hash,
                        type=card.type,
                        path=card.path,
                        id=card.id,
                    ),
                    card="%s/%s" % (pathspec, card.hash),
                )
            )
        resp = {"status": "ok", "flow": flow_name, "run_id": run_id, "cards": cards}
        self._response(resp, is_json=True)

    def _find_card(self, suffix):
        """Return the first card addressed by ``suffix``, or None.

        ``suffix`` must have the form ``<flow>/<run>/<step>/<task>/<hash>``;
        a missing or malformed suffix yields None rather than an exception.
        """
        parts = suffix.split("/") if suffix else []
        if len(parts) != 5 or not all(parts):
            return None
        pathspec = "/".join(parts[:4])
        matches = cards_for_task(
            self.card_options.flow_datastore, pathspec, card_hash=parts[4]
        )
        return next(iter(matches), None)

    def get_card(self, suffix):
        """Respond with the card's content, or 404 when it is not found."""
        selected_card = self._find_card(suffix)
        if selected_card is None:
            self._response("Card not found", code=404)
            return
        self._response(selected_card.get().encode("utf-8"))

    def get_data(self, suffix):
        """Respond with the card's data as JSON, or 404 when the card is not found."""
        selected_card = self._find_card(suffix)
        if selected_card is None:
            self._response("Card not found", code=404)
            return
        card_data = selected_card.get_data()
        if card_data is not None:
            self._response({"status": "ok", "payload": card_data}, is_json=True)
        else:
            self._response({"status": "not found"}, is_json=True)

    def _response(self, body, is_json=False, code=200):
        """Send status line, headers and body.

        ``body`` is JSON-serialised when ``is_json`` is true; otherwise it may
        be ``bytes`` or ``str`` (encoded as UTF-8).
        """
        # Build the payload first so a serialisation error cannot leave a
        # half-written response on the wire.
        if is_json:
            payload = json.dumps(body).encode("utf-8")
        elif isinstance(body, str):
            # wfile only accepts bytes.
            payload = body.encode("utf-8")
        else:
            payload = body
        self.send_response(code)
        mime = "application/json" if is_json else "text/html"
        self.send_header("Content-type", mime)
        self.end_headers()
        self.wfile.write(payload)

    ROUTES = {"runinfo": get_runinfo, "card": get_card, "data": get_data}
176+
177+
178+
def create_card_server(card_options, port, ctx_obj):
    """Announce and run the card viewer HTTP server until it is stopped.

    ``card_options`` is published on ``CardViewerRoutes`` as a class
    attribute, a start-up line is written through ``ctx_obj.echo``, and the
    call then blocks in ``serve_forever()``.
    """
    CardViewerRoutes.card_options = card_options
    # An empty host string is passed as the bind address.
    bind_address = ("", port)
    run_pathspec = str(card_options.run_object.pathspec)
    banner = "Starting card server on port %d for run-id %s" % (port, run_pathspec)
    ctx_obj.echo(banner, fg="green", bold=True)
    httpd = ThreadingHTTPServer(bind_address, CardViewerRoutes)
    httpd.serve_forever()

0 commit comments

Comments
 (0)