diff --git a/django_common_task_system/admin.py b/django_common_task_system/admin.py index 59418e5..6eb8631 100644 --- a/django_common_task_system/admin.py +++ b/django_common_task_system/admin.py @@ -378,7 +378,7 @@ def get_readonly_fields(self, request, obj=None): class ConsumerAdmin(admin.ModelAdmin): - list_display = ('consumer_id', 'machine', + list_display = ('consumer_id', 'admin_machine', 'program_state', 'admin_consume_url', 'consume_status', 'program_status', @@ -428,13 +428,14 @@ class ConsumerAdmin(admin.ModelAdmin): # list_filter = ('runner_status',) readonly_fields = ('create_time', ) - def machine(self, obj: models.Consumer): + def admin_machine(self, obj: models.Consumer): attrs = [ - 'IP: %s' % obj.machine_ip, - '主机名: %s' % obj.machine_name, + '主机名: %s' % obj.machine.hostname, + '内网IP: %s' % obj.machine.intranet_ip, + '外网IP: %s' % obj.machine.internet_ip, ] return format_html('%s' % '
'.join(attrs) if attrs else '-') - machine.short_description = '机器' + admin_machine.short_description = '机器' def program_status(self, obj: models.Consumer): return obj.program is not None and obj.program.is_running @@ -458,11 +459,14 @@ def program_state(self, obj): def admin_consume_url(self, obj: models.Consumer): url = urlparse(obj.consume_url) - return format_html( - '%s' % ( - obj.consume_url, url.path + try: + return format_html( + '%s' % ( + obj.consume_url, url.path + ) ) - ) + except Exception as e: + return str(e) admin_consume_url.short_description = '消费地址' def action(self, obj: models.Consumer): @@ -731,8 +735,9 @@ def get_queryset(self, request): consumer_agent.state.pull() state = consumer_agent.state consumer_state = { - '进程ID': state.ident, - '进程状态': '运行中' if state.is_running else '已停止', + '程序ID': state.ident, + "程序名称": state.program_name, + '程序状态': '运行中' if state.is_running else '已停止', '已处理计划数量': (state.succeed_count + state.failed_count), '成功计划数量': state.succeed_count, '失败计划数量': state.failed_count, @@ -740,7 +745,7 @@ def get_queryset(self, request): '日志文件': state.log_file.replace(os.getcwd(), '') } model.objects['consumer'] = model( - name="系统计划处理进程", + name="系统计划消费线程", state=consumer_state, position=1 ) @@ -748,9 +753,9 @@ def get_queryset(self, request): producer_agent.state.pull() state = producer_agent.state producer_state = { - "运行ID": state.ident, - "线程名称": state.name, - "线程状态": "运行中" if state.is_running else "已停止", + "程序ID": state.ident, + "程序名称": state.program_name, + "程序状态": "运行中" if state.is_running else "已停止", "已调度计划数量": state.scheduled_count, "最近调度时间": state.last_schedule_time, "日志文件": state.log_file.replace(os.getcwd(), '') diff --git a/django_common_task_system/choices.py b/django_common_task_system/choices.py index 8abeceb..89e6a42 100644 --- a/django_common_task_system/choices.py +++ b/django_common_task_system/choices.py @@ -1,6 +1,4 @@ -from django.db.models import TextChoices -import queue -import os +from django.db.models import TextChoices, IntegerChoices class TaskStatus(TextChoices): @@ -72,6 +70,13 @@ class ProgramType(TextChoices): PROCESS = 'Process' +class ProgramSource(IntegerChoices): + REPORT = 1, '主动上报' + DETECT = 2, '被动检测' + ADMIN = 3, '管理员添加' + API = 4, 'API添加' + + class ConsumeStatus(TextChoices): # start status INIT = 'Init', '初始化' diff --git a/django_common_task_system/consumer.py b/django_common_task_system/consumer.py index a0498e2..b74f81a 100644 --- a/django_common_task_system/consumer.py +++ b/django_common_task_system/consumer.py @@ -5,7 +5,7 @@ from threading import Thread from docker.models.containers import Container from datetime import datetime -from django_common_task_system.program import ContainerProgram, ProgramAgent, ProgramState +from django_common_task_system.program import Program, ProgramAgent, ProgramState, Key, MapKey, ListKey import traceback import docker @@ -28,17 +28,14 @@ def __init__(self, key): self.container: Optional[Container] = None -class ConsumerProgram(ContainerProgram): +class ConsumerProgram(Program): state_class = ConsumerState + state_key = MapKey('consumers') def __init__(self, model: Consumer, container=None): self.model = model - self.state_key = 'consumer_%s' % model.consumer_id super().__init__(container=container) - def init_state(self, **kwargs): - pass - @property def program_id(self) -> int: return self.model.consumer_id @@ -63,7 +60,11 @@ def load_from_container(cls, container: Container): @classmethod def load_consumer_from_cache(cls, cache: dict): program = cache.pop('program') - machine = Machine(**cache.pop('machine')) + machine_cache = cache.pop('machine') + if machine_cache: + machine = Machine(**machine_cache) + else: + machine = Machine.objects.local consumer = Consumer(machine=machine, **cache) if program: container_cache = program.get('container', {}) @@ -123,7 +124,7 @@ def run(self): model.startup_log = traceback.format_exc() model.save() - def stop(self): + def stop(self, destroy=False): if isinstance(self.container, Container): self.container.stop() # self.container.remove() diff --git a/django_common_task_system/forms.py b/django_common_task_system/forms.py index f2495d3..efd382b 100644 --- a/django_common_task_system/forms.py +++ b/django_common_task_system/forms.py @@ -7,7 +7,7 @@ from django.contrib.admin import widgets from django.utils.module_loading import import_string from django_common_task_system.choices import ( - ScheduleType, ScheduleTimingType, ScheduleStatus, TaskStatus) + ScheduleType, ScheduleTimingType, ScheduleStatus, TaskStatus, ProgramSource) from django_common_objects.widgets import JSONWidget from django_common_task_system.utils import foreign_key from datetime import datetime, time as datetime_time @@ -18,7 +18,6 @@ from django_common_task_system.utils.cache import ttl_cache from .choices import ProgramType from . import models -from .builtins import builtins from . import get_schedule_model, get_task_model from .fields import (NLPSentenceWidget, PeriodScheduleFiled, OnceScheduleField, MultiWeekdaySelectFiled, MultiMonthdaySelectFiled, MultiYearDaySelectWidget, MultiDaySelectField, PeriodWidget, @@ -398,43 +397,13 @@ def __init__(self, *args, **kwargs): path = reverse('schedule-get', kwargs={'code': obj.code}) consume_url_choices.append((path, obj.name)) self.fields['consume_url'].choices = consume_url_choices - intranet_ip = ttl_cache()(ip_utils.get_intranet_ip)() - internet_ip = self.get_internet_ip() - ip_choices = ( - (intranet_ip, "%s(内网)" % intranet_ip), - (internet_ip, "%s(外网)" % internet_ip), - ('127.0.0.1', '127.0.0.1') - ) + ip_choices = [(models.Machine.localhost_ip, '127.0.0.1(本机)')] + for machine in models.Machine.objects.all(): + ip_choices.append((machine.intranet_ip, "%s(%s)内网" % (machine.intranet_ip, machine.hostname))) + ip_choices.append((machine.internet_ip, "%s(%s)外网" % (machine.internet_ip, machine.hostname))) self.fields['consume_host'].choices = ip_choices - # self.fields['machine'].choices = self.get_machine_choices() self.initial['consume_port'] = os.environ['DJANGO_SERVER_ADDRESS'].split(':')[-1] - @staticmethod - def get_machine_choices(): - machines = {} - for o in models.Consumer.objects.all(): - machines.setdefault(o.machine_ip, []).append(o) - choices = [] - for ip, clients in machines.items(): - choices.append(("%s-%s" % (ip, clients[0].machine_name), - "%s(%s)%s clients running" % (ip, clients[0].machine_name, len(clients)))) - intranet_ip = ttl_cache()(ip_utils.get_intranet_ip)() - internet_ip = ConsumerForm.get_internet_ip() - for ip in (intranet_ip, internet_ip, '127.0.0.1'): - if ip in machines: - break - else: - choices.append(('127.0.0.1-本机', "127.0.0.1(本机)0 clients running")) - return choices - - @staticmethod - @ttl_cache() - def get_internet_ip(): - try: - return ip_utils.get_internet_ip() - except Exception as e: - return "获取失败: %s" % str(e)[:50] - @staticmethod def validate_setting(setting_str, setting_dict): try: @@ -464,6 +433,7 @@ def clean(self): raise forms.ValidationError('command is required for mysql consume') consumer.program_type = cleaned_data['program_type'] consumer.machine = cleaned_data['machine'] + consumer.program_source = ProgramSource.ADMIN if consumer.program_type == ProgramType.DOCKER: consumer.program_setting = { 'image': cleaned_data['container_image'], diff --git a/django_common_task_system/models.py b/django_common_task_system/models.py index d81d1f9..38f4d6e 100644 --- a/django_common_task_system/models.py +++ b/django_common_task_system/models.py @@ -6,7 +6,7 @@ from django_common_task_system.choices import ( TaskStatus, ScheduleStatus, ScheduleCallbackStatus, ScheduleCallbackEvent, ScheduleQueueModule, ConsumeStatus, ProgramType, - PermissionType, ExecuteStatus, ScheduleExceptionReason + PermissionType, ExecuteStatus, ScheduleExceptionReason, ProgramSource ) from django_common_objects.models import CommonTag, CommonCategory from django_common_objects import fields as common_fields @@ -20,8 +20,9 @@ from django.utils.functional import cached_property from django_common_task_system.cache_service import cache_agent from functools import cmp_to_key -from django_common_task_system.program import ContainerProgram +from django_common_task_system.program import Program, RemoteContainer from rest_framework import serializers +from django_common_task_system.utils import ip as ip_util import os import time import re @@ -258,7 +259,7 @@ class Query: order_by = [] select_related = False - def using(self, alias): + def using(self, _): return self def all(self): @@ -392,45 +393,90 @@ def delete(self, consumer: 'Consumer'): consumer.program.stop() cache_agent.hdel(self.cache_key, consumer.consumer_id) + @staticmethod + def create(consume_url=None, program=None, program_source=ProgramSource.REPORT, **kwargs): + for field in Consumer._meta.fields: + if field.name not in kwargs and field.default is not models.fields.NOT_PROVIDED: + if callable(field.default): + kwargs[field.name] = field.default() + else: + kwargs[field.name] = field.default + machine = kwargs.pop('machine', None) + if program_source == ProgramSource.REPORT: + assert machine is not None, 'machine is required when program_source is REPORT' + assert consume_url is not None, 'consume_url is required when program_source is REPORT' + machine = Machine(**machine) + else: + if machine is None: + machine = Machine.objects.local + if consume_url is None: + consume_url = 'http://%s' % machine.localhost_ip + consumer = Consumer(consume_url=consume_url, machine=machine, **kwargs) + consumer.program = program + consumer.save() + return consumer + class MachineManager(CustomManager): + _local = None + + @property + def local(self): + if self._local is None: + for _ in range(3): + try: + internet_ip = ip_util.get_internet_ip() + break + except Exception: + continue + else: + internet_ip = None + self._local = Machine(hostname='本机', intranet_ip=ip_util.get_intranet_ip(), + internet_ip=internet_ip, group='默认') + return self._local def all(self): - return QuerySet([ - Machine(name='本机', ip='127.0.0.1', group='默认'), - ], Machine) + return QuerySet([self.local, *dict.values(self)], Machine) class Machine(models.Model): - name = models.CharField(max_length=100, verbose_name='机器名', primary_key=True) - ip = models.GenericIPAddressField(max_length=100, verbose_name='机器IP', default='127.0.0.1') + hostname = models.CharField(max_length=100, verbose_name='机器名') + intranet_ip = models.GenericIPAddressField(max_length=100, verbose_name='内网IP') + internet_ip = models.GenericIPAddressField(max_length=100, verbose_name='外网IP', primary_key=True) group = models.CharField(max_length=100, verbose_name='分组', default='默认') objects = MachineManager() + @property + def localhost_ip(self): + return "127.0.0.1" + class Meta: managed = False verbose_name = verbose_name_plural = '机器管理' def __str__(self): - return "%s(%s)" % (self.name, self.ip) + return "%s(%s)" % (self.hostname, self.intranet_ip) + + def save( + self, force_insert=False, force_update=False, using=None, update_fields=None + ): + Machine.objects[self.internet_ip] = self class Consumer(models.Model): # 两种运行模式: 容器模式,进程模式 - program: ContainerProgram = None + program: Program = None machine = models.ForeignKey(Machine, on_delete=models.DO_NOTHING, db_constraint=False, verbose_name='机器') consumer_id = models.IntegerField(verbose_name='客户端ID', primary_key=True) - # machine_name = models.CharField(max_length=100, verbose_name='机器名', default='本机') - # machine_ip = models.GenericIPAddressField(max_length=100, verbose_name='机器IP', default='127.0.0.1') - # group = models.CharField(max_length=100, verbose_name='分组', default='默认') consume_url = models.CharField(max_length=200, verbose_name='订阅地址') consume_kwargs = models.JSONField(verbose_name='订阅参数', default=dict) program_type = models.CharField(max_length=100, verbose_name='运行引擎', choices=ProgramType.choices, default=ProgramType.DOCKER) program_setting = models.JSONField(verbose_name='引擎设置', default=dict) program_env = models.CharField(max_length=500, verbose_name='环境变量', blank=True, null=True) - consume_status = models.CharField(max_length=500, choices=ConsumeStatus.choices, + program_source = models.IntegerField(verbose_name='程序来源', default=ProgramSource.REPORT) + consume_status = models.CharField(max_length=500, choices=ConsumeStatus.choices, verbose_name='启动结果', default=ConsumeStatus.RUNNING) startup_log = models.CharField(max_length=2000, null=True, blank=True) create_time = models.DateTimeField(auto_now_add=True, verbose_name='创建时间') @@ -627,14 +673,18 @@ class Meta: class MachineSerializer(serializers.ModelSerializer): + internet_ip = serializers.IPAddressField(read_only=True) + class Meta: fields = '__all__' model = Machine class ConsumerSerializer(serializers.ModelSerializer): - program = serializers.SerializerMethodField() + program = serializers.SerializerMethodField(label='程序') machine = MachineSerializer() + consumer_id = serializers.IntegerField(read_only=True) + program_source = serializers.IntegerField(required=False, default=ProgramSource.REPORT) @staticmethod def get_program(obj: Consumer): @@ -657,6 +707,14 @@ def get_program(obj: Consumer): } return None + def create(self, validated_data): + if validated_data['program_source'] == ProgramSource.REPORT: + validated_data['machine']['internet_ip'] = self.context['request'].META.get('REMOTE_ADDR') + program = Program(container=RemoteContainer(self.initial_data['program'])) + else: + program = None + return Consumer.objects.create(program=program, **validated_data) + class Meta: fields = '__all__' model = Consumer diff --git a/django_common_task_system/producer.py b/django_common_task_system/producer.py index 31d2c28..4d33e96 100644 --- a/django_common_task_system/producer.py +++ b/django_common_task_system/producer.py @@ -3,8 +3,7 @@ from django_common_task_system.builtins import builtins from django_common_task_system import get_schedule_model, get_schedule_serializer from django_common_task_system.models import AbstractSchedule -from django_common_task_system.utils.logger import add_file_handler -from django_common_task_system.program import Program, ProgramAgent, ProgramState +from django_common_task_system.program import LocalProgram, ProgramAgent, ProgramState, Key from django_common_task_system.schedule.config import ScheduleConfig from datetime import datetime import time @@ -23,18 +22,9 @@ def __init__(self, key): self.log_file = '' -class Producer(Program): +class Producer(LocalProgram): state_class = ProducerState - state_key = 'producer' - - def __init__(self): - super(Producer, self).__init__(name='Producer') - self.log_file = add_file_handler(self.logger) - - def init_state(self, **kwargs): - super(Producer, self).init_state( - log_file=self.log_file, - ) + state_key = Key('producer') def produce(self): state = self.state @@ -109,8 +99,8 @@ def __init__(self): def program_id(self) -> int: return self.ident - def stop(self): - super(ProducerThread, self).stop() + def stop(self, destroy=False): + super(ProducerThread, self).stop(destroy=destroy) while self.is_alive(): time.sleep(0.5) return '' diff --git a/django_common_task_system/program.py b/django_common_task_system/program.py index 98dffdb..0de8376 100644 --- a/django_common_task_system/program.py +++ b/django_common_task_system/program.py @@ -1,11 +1,14 @@ +import json import threading import logging import enum import os -from typing import Callable, Optional +from typing import Callable, Optional, Union from docker.models.containers import Container from django_common_task_system.cache_service import cache_agent from django_common_task_system.choices import ContainerStatus +from django_common_task_system.utils.logger import add_file_handler +from django_common_task_system.log import PagedLog class ProgramAction(str, enum.Enum): @@ -23,14 +26,47 @@ class ContainerProgramAction(str, enum.Enum): DESTROY = 'destroy' +class RemoteContainer(Container): + def __init__(self, attrs): + super(RemoteContainer, self).__init__(attrs) + + def start(self, **kwargs): + pass + + def stop(self, **kwargs): + pass + + def restart(self, **kwargs): + pass + + def logs(self, **kwargs): + return '' + + def remove(self, **kwargs): + pass + + +class Key(str): + pass + + +class MapKey(str): + pass + + +class ListKey(str): + pass + + class ProgramState(dict): - def __init__(self, key): + def __init__(self, key: Union[MapKey, Key]): super(ProgramState, self).__init__() self.key = key self.ident = None self.is_running = False self.engine = None self.create_time = None + self.program_name = None def __setattr__(self, key, value): self[key] = value @@ -42,27 +78,68 @@ def commit(self, **kwargs) -> dict: return kwargs or self def push(self, **kwargs): - cache_agent.hset(self.key, mapping=kwargs) + if isinstance(self.key, Key): + cache_agent.hset(self.key, mapping=kwargs) + else: + cache_agent.hset(self.key, self.ident, json.dumps(kwargs)) def commit_and_push(self, **kwargs): return self.push(**self.commit(**kwargs)) def pull(self): - state = cache_agent.hgetall(self.key) - for k, v in state.items(): - setattr(self, k, v) + if isinstance(self.key, Key): + state = cache_agent.hgetall(self.key) + else: + state = cache_agent.hget(self.key, self.ident) + if state: + state = json.loads(state) + if state: + for k, v in state.items(): + setattr(self, k, v) + + def delete(self): + if isinstance(self.key, Key): + cache_agent.delete(self.key) + else: + cache_agent.hdel(self.key, self.ident) class Program: state_class = ProgramState - state_key = '' + state_key: Key = None - def __init__(self, name=None, logger: logging.Logger = None): - assert self.state_key, 'state_key must be set' + def __init__(self, name=None, container=None, logger: logging.Logger = None): + assert isinstance(self.state_key, Key), 'state_key type must be Key' self._event = threading.Event() self.state = self.state_class(self.state_key) - self.program_name = name or self.__class__.__name__ - self.logger = logger or logging.getLogger(self.program_name.lower()) + self.container: Optional[Container, RemoteContainer] = container + self._program_name = name + self._logger = logger + self._log_file = None + + @property + def program_name(self): + if self._program_name: + name = self._program_name + elif self.container is not None: + name = self.container.name + else: + name = self.__class__.__name__ + return name + + @property + def log_file(self): + if self._log_file is None: + _ = self.logger + return self._log_file + + @property + def logger(self): + if self._logger is None: + self._logger = logging.getLogger(self.program_name) + if self._log_file is None: + self._log_file = add_file_handler(self._logger) + return self._logger @property def is_running(self): @@ -72,12 +149,17 @@ def is_running(self): def program_id(self) -> int: return os.getpid() - def init_state(self, **kwargs): + def pre_started(self): self.state.commit_and_push( - name=self.program_name, + program_name=self.program_name, is_running=False, engine=self.__class__.__name__, - **kwargs + ) + + def on_started(self): + self.state.commit_and_push( + is_running=True, + ident=self.program_id ) def run(self) -> None: @@ -91,50 +173,77 @@ def start_if_not_started(self) -> str: if self.is_running: error = '%s already started, pid: %s' % (self, self.program_id) else: - self.init_state() + self.pre_started() start_program() - self.state.commit_and_push(is_running=True, ident=self.program_id) + self.on_started() self.logger.info('%s started, pid: %s' % (self, self.program_id)) error = '' self.logger.info(error) return error - def stop(self): + def stop(self, destroy=False): self._event.clear() + if self.container: + self.container.stop() + if destroy: + self.container.remove() + if destroy: + self.state.delete() + else: + self.state.commit_and_push( + is_running=False, + ) def read_log(self, page=0, page_size=10): - raise NotImplementedError + if self.container: + return self.container.logs(tail=page_size) + else: + return PagedLog(self.log_file, page_size=page_size).read_page(page=page) def __str__(self): return "Program(%s)" % self.__class__.__name__ -class ContainerProgram(Program): - def __init__(self, container=None): - self.container: Optional[Container] = container - super().__init__(name=getattr(container, 'name', None)) +class LocalProgram(Program): def run(self) -> None: raise NotImplementedError - def read_log(self, page=0, page_size=1000): - if self.container: - return self.container.logs(tail=page_size) - - # def start(self): - # self.container.start() - # - def stop(self): - assert self.container, 'container must be set' - self.container.stop() - self.container.remove() - - def restart(self): - self.container.restart() + def pre_started(self): + self.state.commit_and_push( + name=self.program_name, + is_running=False, + engine=self.__class__.__name__, + log_file=self.log_file + ) - @property - def is_running(self): - return self.container.status == ContainerStatus.RUNNING +# +# class ContainerProgram(Program): +# def __init__(self, container=None): +# self.container: Optional[Container, RemoteContainer] = container +# super().__init__(name=getattr(container, 'name', None)) +# +# def run(self) -> None: +# raise NotImplementedError +# +# def read_log(self, page=0, page_size=1000): +# if self.container: +# return self.container.logs(tail=page_size) +# +# # def start(self): +# # self.container.start() +# # +# def stop(self): +# assert self.container, 'container must be set' +# self.container.stop() +# self.container.remove() +# +# def restart(self): +# self.container.restart() +# +# @property +# def is_running(self): +# return self.container.status == ContainerStatus.RUNNING class ProgramAgent: diff --git a/django_common_task_system/system_task_execution/process.py b/django_common_task_system/system_task_execution/process.py index 4f96d33..28cd87e 100644 --- a/django_common_task_system/system_task_execution/process.py +++ b/django_common_task_system/system_task_execution/process.py @@ -17,6 +17,6 @@ def run(self): django.setup() super(ScheduleConsumerProcess, self).run() - def stop(self): - super(ScheduleConsumerProcess, self).stop() + def stop(self, destroy=False): + super(ScheduleConsumerProcess, self).stop(destroy=destroy) self.kill() diff --git a/django_common_task_system/system_task_execution/system_task_execution/consumer.py b/django_common_task_system/system_task_execution/system_task_execution/consumer.py index 7197fa5..2865294 100644 --- a/django_common_task_system/system_task_execution/system_task_execution/consumer.py +++ b/django_common_task_system/system_task_execution/system_task_execution/consumer.py @@ -6,7 +6,7 @@ from django_common_task_system.models import ExceptionReport from django_common_task_system import get_schedule_log_model from django_common_task_system.choices import ExecuteStatus -from django_common_task_system.program import Program, ProgramState +from django_common_task_system.program import LocalProgram, ProgramState, Key from django_common_task_system.utils.logger import add_file_handler from datetime import datetime @@ -175,19 +175,13 @@ def __init__(self, key): self.log_file = '' -class Consumer(Program): +class Consumer(LocalProgram): state_class = ConsumerState - state_key = 'consumer' + state_key = Key('consumer') def __init__(self, queue: Queue): super(Consumer, self).__init__(name='Consumer', logger=logger) self.queue = queue - self.log_file = add_file_handler(self.logger) - - def init_state(self): - super(Consumer, self).init_state( - log_file=self.log_file, - ) def run(self): queue = self.queue diff --git a/django_common_task_system/system_task_execution/thread.py b/django_common_task_system/system_task_execution/thread.py index 9e6290a..29b375b 100644 --- a/django_common_task_system/system_task_execution/thread.py +++ b/django_common_task_system/system_task_execution/thread.py @@ -13,7 +13,7 @@ def __init__(self): def program_id(self) -> int: return self.ident - def stop(self) -> str: + def stop(self, destroy=False) -> str: while self.is_alive(): time.sleep(0.5) return '' diff --git a/django_common_task_system/views.py b/django_common_task_system/views.py index e152468..c1d8b4a 100644 --- a/django_common_task_system/views.py +++ b/django_common_task_system/views.py @@ -316,10 +316,7 @@ class UserConsumerView(APIView): def get(self, request: Request, action: str): if action == ContainerProgramAction.START: - consumer = Consumer.objects.create() - return Response(models.ConsumerSerializer(consumer).data) - if action == 'register': - return UserConsumerView.register_client(request) + return self.post(request, 'start') consumer_id = request.GET.get('consumer_id', '') if not consumer_id.isdigit(): return Response({'error': 'invalid consumer_id: %s' % consumer_id}, status=status.HTTP_400_BAD_REQUEST) @@ -338,42 +335,35 @@ def get(self, request: Request, action: str): return Response({'error': 'invalid action: %s, only support start/stop/log' % action}, status=status.HTTP_400_BAD_REQUEST) - @staticmethod - def register_client(request: Request): - """ - container: Container = None - group = models.CharField(max_length=100, verbose_name='分组') - subscription_url = models.CharField(max_length=200, verbose_name='订阅地址') - subscription_kwargs = models.JSONField(verbose_name='订阅参数', default=dict) - client_id = models.IntegerField(verbose_name='客户端ID', primary_key=True, default=0) - process_id = models.PositiveIntegerField(verbose_name='进程ID', null=True, blank=True) - container_id = models.CharField(max_length=100, verbose_name='容器ID', blank=True, null=True) - container_name = models.CharField(max_length=100, verbose_name='容器名称', blank=True, null=True) - container_image = models.CharField(max_length=100, verbose_name='容器镜像', blank=True, null=True) - container_status = models.CharField(choices=ContainerStatus.choices, default=ContainerStatus.NONE, - max_length=20, verbose_name='容器状态') - run_in_container = models.BooleanField(default=True, verbose_name='是否在容器中运行') - env = models.CharField(max_length=500, verbose_name='环境变量', blank=True, null=True) - startup_status = models.CharField(max_length=500, choices=TaskClientStatus.choices, - verbose_name='启动结果', default=TaskClientStatus.SUCCEED) - settings = models.TextField(verbose_name='配置', blank=True, null=True) - create_time = models.DateTimeField(auto_now_add=True, verbose_name='创建时间') - startup_log = models.CharField(max_length=2000, null=True, blank=True) - """ - data = request.data - settings = data.get('settings') - container_id = data.get('container_id') - subscription_url = data.get('subscription_url') - subscription_kwargs = data.get('subscription_kwargs') - process_id = data.get('process_id') - client = Consumer(container_id=container_id, - subscription_url=subscription_url, - startup_log='启动成功', - settings=settings, - process_id=process_id, - ) - client.save() - return Response('上报成功') + def post(self, request: Request, action: str): + if action == 'start': + data = { + 'consume_url': request.data.get('consume_url'), + } + elif action == 'register': + serializer = models.ConsumerSerializer(data=request.data) + serializer.context['request'] = request + serializer.is_valid(raise_exception=True) + serializer.save() + return Response(serializer.data) + else: + return Response({'error': 'invalid action: %s, only support start/register' % action}, + status=status.HTTP_400_BAD_REQUEST) + # data = request.data + # settings = data.get('settings') + # machine_setting = data.get('machine') + # if not machine_setting: + # pass + # machine = models.Machine(**machine_setting) + # container_setting = data.get('container') + # consume_url = data.get('consume_url') + # consume_kwargs = data.get('consume_kwargs') + # consumer = Consumer(machine=machine, + # consume_url=consume_url, + # consume_kwargs=consume_kwargs, + # ) + # consumer.save() + # return Response(models.ConsumerSerializer(consumer).data) def log_view(request: Request, filename):