From d6112dfa24c1a693c00e5823a710fcf3baf1b274 Mon Sep 17 00:00:00 2001
From: bizhongliang <1183008540@qq.com>
Date: Thu, 19 Oct 2023 18:02:16 +0800
Subject: [PATCH] =?UTF-8?q?=E7=BB=9F=E4=B8=80Program=E6=93=8D=E4=BD=9C?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
django_common_task_system/admin.py | 35 ++--
django_common_task_system/choices.py | 11 +-
django_common_task_system/consumer.py | 17 +-
django_common_task_system/forms.py | 42 +---
django_common_task_system/models.py | 88 +++++++--
django_common_task_system/producer.py | 20 +-
django_common_task_system/program.py | 187 ++++++++++++++----
.../system_task_execution/process.py | 4 +-
.../system_task_execution/consumer.py | 12 +-
.../system_task_execution/thread.py | 2 +-
django_common_task_system/views.py | 70 +++----
11 files changed, 305 insertions(+), 183 deletions(-)
diff --git a/django_common_task_system/admin.py b/django_common_task_system/admin.py
index 59418e5..6eb8631 100644
--- a/django_common_task_system/admin.py
+++ b/django_common_task_system/admin.py
@@ -378,7 +378,7 @@ def get_readonly_fields(self, request, obj=None):
class ConsumerAdmin(admin.ModelAdmin):
- list_display = ('consumer_id', 'machine',
+ list_display = ('consumer_id', 'admin_machine',
'program_state',
'admin_consume_url',
'consume_status', 'program_status',
@@ -428,13 +428,14 @@ class ConsumerAdmin(admin.ModelAdmin):
# list_filter = ('runner_status',)
readonly_fields = ('create_time', )
- def machine(self, obj: models.Consumer):
+ def admin_machine(self, obj: models.Consumer):
attrs = [
- 'IP: %s' % obj.machine_ip,
- '主机名: %s' % obj.machine_name,
+ '主机名: %s' % obj.machine.hostname,
+ '内网IP: %s' % obj.machine.intranet_ip,
+ '外网IP: %s' % obj.machine.internet_ip,
]
return format_html('%s' % '
'.join(attrs) if attrs else '-')
- machine.short_description = '机器'
+ admin_machine.short_description = '机器'
def program_status(self, obj: models.Consumer):
return obj.program is not None and obj.program.is_running
@@ -458,11 +459,14 @@ def program_state(self, obj):
def admin_consume_url(self, obj: models.Consumer):
url = urlparse(obj.consume_url)
- return format_html(
- '%s' % (
- obj.consume_url, url.path
+ try:
+ return format_html(
+ '%s' % (
+ obj.consume_url, url.path
+ )
)
- )
+ except Exception as e:
+ return str(e)
admin_consume_url.short_description = '消费地址'
def action(self, obj: models.Consumer):
@@ -731,8 +735,9 @@ def get_queryset(self, request):
consumer_agent.state.pull()
state = consumer_agent.state
consumer_state = {
- '进程ID': state.ident,
- '进程状态': '运行中' if state.is_running else '已停止',
+ '程序ID': state.ident,
+ "程序名称": state.program_name,
+ '程序状态': '运行中' if state.is_running else '已停止',
'已处理计划数量': (state.succeed_count + state.failed_count),
'成功计划数量': state.succeed_count,
'失败计划数量': state.failed_count,
@@ -740,7 +745,7 @@ def get_queryset(self, request):
'日志文件': state.log_file.replace(os.getcwd(), '')
}
model.objects['consumer'] = model(
- name="系统计划处理进程",
+ name="系统计划消费线程",
state=consumer_state,
position=1
)
@@ -748,9 +753,9 @@ def get_queryset(self, request):
producer_agent.state.pull()
state = producer_agent.state
producer_state = {
- "运行ID": state.ident,
- "线程名称": state.name,
- "线程状态": "运行中" if state.is_running else "已停止",
+ "程序ID": state.ident,
+ "程序名称": state.program_name,
+ "程序状态": "运行中" if state.is_running else "已停止",
"已调度计划数量": state.scheduled_count,
"最近调度时间": state.last_schedule_time,
"日志文件": state.log_file.replace(os.getcwd(), '')
diff --git a/django_common_task_system/choices.py b/django_common_task_system/choices.py
index 8abeceb..89e6a42 100644
--- a/django_common_task_system/choices.py
+++ b/django_common_task_system/choices.py
@@ -1,6 +1,4 @@
-from django.db.models import TextChoices
-import queue
-import os
+from django.db.models import TextChoices, IntegerChoices
class TaskStatus(TextChoices):
@@ -72,6 +70,13 @@ class ProgramType(TextChoices):
PROCESS = 'Process'
+class ProgramSource(IntegerChoices):
+ REPORT = 1, '主动上报'
+ DETECT = 2, '被动检测'
+ ADMIN = 3, '管理员添加'
+ API = 4, 'API添加'
+
+
class ConsumeStatus(TextChoices):
# start status
INIT = 'Init', '初始化'
diff --git a/django_common_task_system/consumer.py b/django_common_task_system/consumer.py
index a0498e2..b74f81a 100644
--- a/django_common_task_system/consumer.py
+++ b/django_common_task_system/consumer.py
@@ -5,7 +5,7 @@
from threading import Thread
from docker.models.containers import Container
from datetime import datetime
-from django_common_task_system.program import ContainerProgram, ProgramAgent, ProgramState
+from django_common_task_system.program import Program, ProgramAgent, ProgramState, Key, MapKey, ListKey
import traceback
import docker
@@ -28,17 +28,14 @@ def __init__(self, key):
self.container: Optional[Container] = None
-class ConsumerProgram(ContainerProgram):
+class ConsumerProgram(Program):
state_class = ConsumerState
+ state_key = MapKey('consumers')
def __init__(self, model: Consumer, container=None):
self.model = model
- self.state_key = 'consumer_%s' % model.consumer_id
super().__init__(container=container)
- def init_state(self, **kwargs):
- pass
-
@property
def program_id(self) -> int:
return self.model.consumer_id
@@ -63,7 +60,11 @@ def load_from_container(cls, container: Container):
@classmethod
def load_consumer_from_cache(cls, cache: dict):
program = cache.pop('program')
- machine = Machine(**cache.pop('machine'))
+ machine_cache = cache.pop('machine')
+ if machine_cache:
+ machine = Machine(**machine_cache)
+ else:
+ machine = Machine.objects.local
consumer = Consumer(machine=machine, **cache)
if program:
container_cache = program.get('container', {})
@@ -123,7 +124,7 @@ def run(self):
model.startup_log = traceback.format_exc()
model.save()
- def stop(self):
+ def stop(self, destroy=False):
if isinstance(self.container, Container):
self.container.stop()
# self.container.remove()
diff --git a/django_common_task_system/forms.py b/django_common_task_system/forms.py
index f2495d3..efd382b 100644
--- a/django_common_task_system/forms.py
+++ b/django_common_task_system/forms.py
@@ -7,7 +7,7 @@
from django.contrib.admin import widgets
from django.utils.module_loading import import_string
from django_common_task_system.choices import (
- ScheduleType, ScheduleTimingType, ScheduleStatus, TaskStatus)
+ ScheduleType, ScheduleTimingType, ScheduleStatus, TaskStatus, ProgramSource)
from django_common_objects.widgets import JSONWidget
from django_common_task_system.utils import foreign_key
from datetime import datetime, time as datetime_time
@@ -18,7 +18,6 @@
from django_common_task_system.utils.cache import ttl_cache
from .choices import ProgramType
from . import models
-from .builtins import builtins
from . import get_schedule_model, get_task_model
from .fields import (NLPSentenceWidget, PeriodScheduleFiled, OnceScheduleField, MultiWeekdaySelectFiled,
MultiMonthdaySelectFiled, MultiYearDaySelectWidget, MultiDaySelectField, PeriodWidget,
@@ -398,43 +397,13 @@ def __init__(self, *args, **kwargs):
path = reverse('schedule-get', kwargs={'code': obj.code})
consume_url_choices.append((path, obj.name))
self.fields['consume_url'].choices = consume_url_choices
- intranet_ip = ttl_cache()(ip_utils.get_intranet_ip)()
- internet_ip = self.get_internet_ip()
- ip_choices = (
- (intranet_ip, "%s(内网)" % intranet_ip),
- (internet_ip, "%s(外网)" % internet_ip),
- ('127.0.0.1', '127.0.0.1')
- )
+ ip_choices = [(models.Machine.localhost_ip, '127.0.0.1(本机)')]
+ for machine in models.Machine.objects.all():
+ ip_choices.append((machine.intranet_ip, "%s(%s)内网" % (machine.intranet_ip, machine.hostname)))
+ ip_choices.append((machine.internet_ip, "%s(%s)外网" % (machine.internet_ip, machine.hostname)))
self.fields['consume_host'].choices = ip_choices
- # self.fields['machine'].choices = self.get_machine_choices()
self.initial['consume_port'] = os.environ['DJANGO_SERVER_ADDRESS'].split(':')[-1]
- @staticmethod
- def get_machine_choices():
- machines = {}
- for o in models.Consumer.objects.all():
- machines.setdefault(o.machine_ip, []).append(o)
- choices = []
- for ip, clients in machines.items():
- choices.append(("%s-%s" % (ip, clients[0].machine_name),
- "%s(%s)%s clients running" % (ip, clients[0].machine_name, len(clients))))
- intranet_ip = ttl_cache()(ip_utils.get_intranet_ip)()
- internet_ip = ConsumerForm.get_internet_ip()
- for ip in (intranet_ip, internet_ip, '127.0.0.1'):
- if ip in machines:
- break
- else:
- choices.append(('127.0.0.1-本机', "127.0.0.1(本机)0 clients running"))
- return choices
-
- @staticmethod
- @ttl_cache()
- def get_internet_ip():
- try:
- return ip_utils.get_internet_ip()
- except Exception as e:
- return "获取失败: %s" % str(e)[:50]
-
@staticmethod
def validate_setting(setting_str, setting_dict):
try:
@@ -464,6 +433,7 @@ def clean(self):
raise forms.ValidationError('command is required for mysql consume')
consumer.program_type = cleaned_data['program_type']
consumer.machine = cleaned_data['machine']
+ consumer.program_source = ProgramSource.ADMIN
if consumer.program_type == ProgramType.DOCKER:
consumer.program_setting = {
'image': cleaned_data['container_image'],
diff --git a/django_common_task_system/models.py b/django_common_task_system/models.py
index d81d1f9..38f4d6e 100644
--- a/django_common_task_system/models.py
+++ b/django_common_task_system/models.py
@@ -6,7 +6,7 @@
from django_common_task_system.choices import (
TaskStatus, ScheduleStatus, ScheduleCallbackStatus,
ScheduleCallbackEvent, ScheduleQueueModule, ConsumeStatus, ProgramType,
- PermissionType, ExecuteStatus, ScheduleExceptionReason
+ PermissionType, ExecuteStatus, ScheduleExceptionReason, ProgramSource
)
from django_common_objects.models import CommonTag, CommonCategory
from django_common_objects import fields as common_fields
@@ -20,8 +20,9 @@
from django.utils.functional import cached_property
from django_common_task_system.cache_service import cache_agent
from functools import cmp_to_key
-from django_common_task_system.program import ContainerProgram
+from django_common_task_system.program import Program, RemoteContainer
from rest_framework import serializers
+from django_common_task_system.utils import ip as ip_util
import os
import time
import re
@@ -258,7 +259,7 @@ class Query:
order_by = []
select_related = False
- def using(self, alias):
+ def using(self, _):
return self
def all(self):
@@ -392,45 +393,90 @@ def delete(self, consumer: 'Consumer'):
consumer.program.stop()
cache_agent.hdel(self.cache_key, consumer.consumer_id)
+ @staticmethod
+ def create(consume_url=None, program=None, program_source=ProgramSource.REPORT, **kwargs):
+ for field in Consumer._meta.fields:
+ if field.name not in kwargs and field.default is not models.fields.NOT_PROVIDED:
+ if callable(field.default):
+ kwargs[field.name] = field.default()
+ else:
+ kwargs[field.name] = field.default
+ machine = kwargs.pop('machine', None)
+ if program_source == ProgramSource.REPORT:
+ assert machine is not None, 'machine is required when program_source is REPORT'
+ assert consume_url is not None, 'consume_url is required when program_source is REPORT'
+ machine = Machine(**machine)
+ else:
+ if machine is None:
+ machine = Machine.objects.local
+ if consume_url is None:
+ consume_url = 'http://%s' % machine.localhost_ip
+ consumer = Consumer(consume_url=consume_url, machine=machine, **kwargs)
+ consumer.program = program
+ consumer.save()
+ return consumer
+
class MachineManager(CustomManager):
+ _local = None
+
+ @property
+ def local(self):
+ if self._local is None:
+ for _ in range(3):
+ try:
+ internet_ip = ip_util.get_internet_ip()
+ break
+ except Exception:
+ continue
+ else:
+ internet_ip = None
+ self._local = Machine(hostname='本机', intranet_ip=ip_util.get_intranet_ip(),
+ internet_ip=internet_ip, group='默认')
+ return self._local
def all(self):
- return QuerySet([
- Machine(name='本机', ip='127.0.0.1', group='默认'),
- ], Machine)
+ return QuerySet([self.local, *dict.values(self)], Machine)
class Machine(models.Model):
- name = models.CharField(max_length=100, verbose_name='机器名', primary_key=True)
- ip = models.GenericIPAddressField(max_length=100, verbose_name='机器IP', default='127.0.0.1')
+ hostname = models.CharField(max_length=100, verbose_name='机器名')
+ intranet_ip = models.GenericIPAddressField(max_length=100, verbose_name='内网IP')
+ internet_ip = models.GenericIPAddressField(max_length=100, verbose_name='外网IP', primary_key=True)
group = models.CharField(max_length=100, verbose_name='分组', default='默认')
objects = MachineManager()
+ @property
+ def localhost_ip(self):
+ return "127.0.0.1"
+
class Meta:
managed = False
verbose_name = verbose_name_plural = '机器管理'
def __str__(self):
- return "%s(%s)" % (self.name, self.ip)
+ return "%s(%s)" % (self.hostname, self.intranet_ip)
+
+ def save(
+ self, force_insert=False, force_update=False, using=None, update_fields=None
+ ):
+ Machine.objects[self.internet_ip] = self
class Consumer(models.Model):
# 两种运行模式: 容器模式,进程模式
- program: ContainerProgram = None
+ program: Program = None
machine = models.ForeignKey(Machine, on_delete=models.DO_NOTHING, db_constraint=False, verbose_name='机器')
consumer_id = models.IntegerField(verbose_name='客户端ID', primary_key=True)
- # machine_name = models.CharField(max_length=100, verbose_name='机器名', default='本机')
- # machine_ip = models.GenericIPAddressField(max_length=100, verbose_name='机器IP', default='127.0.0.1')
- # group = models.CharField(max_length=100, verbose_name='分组', default='默认')
consume_url = models.CharField(max_length=200, verbose_name='订阅地址')
consume_kwargs = models.JSONField(verbose_name='订阅参数', default=dict)
program_type = models.CharField(max_length=100, verbose_name='运行引擎', choices=ProgramType.choices,
default=ProgramType.DOCKER)
program_setting = models.JSONField(verbose_name='引擎设置', default=dict)
program_env = models.CharField(max_length=500, verbose_name='环境变量', blank=True, null=True)
- consume_status = models.CharField(max_length=500, choices=ConsumeStatus.choices,
+ program_source = models.IntegerField(verbose_name='程序来源', default=ProgramSource.REPORT)
+ consume_status = models.CharField(max_length=500, choices=ConsumeStatus.choices,
verbose_name='启动结果', default=ConsumeStatus.RUNNING)
startup_log = models.CharField(max_length=2000, null=True, blank=True)
create_time = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
@@ -627,14 +673,18 @@ class Meta:
class MachineSerializer(serializers.ModelSerializer):
+ internet_ip = serializers.IPAddressField(read_only=True)
+
class Meta:
fields = '__all__'
model = Machine
class ConsumerSerializer(serializers.ModelSerializer):
- program = serializers.SerializerMethodField()
+ program = serializers.SerializerMethodField(label='程序')
machine = MachineSerializer()
+ consumer_id = serializers.IntegerField(read_only=True)
+ program_source = serializers.IntegerField(required=False, default=ProgramSource.REPORT)
@staticmethod
def get_program(obj: Consumer):
@@ -657,6 +707,14 @@ def get_program(obj: Consumer):
}
return None
+ def create(self, validated_data):
+ if validated_data['program_source'] == ProgramSource.REPORT:
+ validated_data['machine']['internet_ip'] = self.context['request'].META.get('REMOTE_ADDR')
+ program = Program(container=RemoteContainer(self.initial_data['program']))
+ else:
+ program = None
+ return Consumer.objects.create(program=program, **validated_data)
+
class Meta:
fields = '__all__'
model = Consumer
diff --git a/django_common_task_system/producer.py b/django_common_task_system/producer.py
index 31d2c28..4d33e96 100644
--- a/django_common_task_system/producer.py
+++ b/django_common_task_system/producer.py
@@ -3,8 +3,7 @@
from django_common_task_system.builtins import builtins
from django_common_task_system import get_schedule_model, get_schedule_serializer
from django_common_task_system.models import AbstractSchedule
-from django_common_task_system.utils.logger import add_file_handler
-from django_common_task_system.program import Program, ProgramAgent, ProgramState
+from django_common_task_system.program import LocalProgram, ProgramAgent, ProgramState, Key
from django_common_task_system.schedule.config import ScheduleConfig
from datetime import datetime
import time
@@ -23,18 +22,9 @@ def __init__(self, key):
self.log_file = ''
-class Producer(Program):
+class Producer(LocalProgram):
state_class = ProducerState
- state_key = 'producer'
-
- def __init__(self):
- super(Producer, self).__init__(name='Producer')
- self.log_file = add_file_handler(self.logger)
-
- def init_state(self, **kwargs):
- super(Producer, self).init_state(
- log_file=self.log_file,
- )
+ state_key = Key('producer')
def produce(self):
state = self.state
@@ -109,8 +99,8 @@ def __init__(self):
def program_id(self) -> int:
return self.ident
- def stop(self):
- super(ProducerThread, self).stop()
+ def stop(self, destroy=False):
+ super(ProducerThread, self).stop(destroy=destroy)
while self.is_alive():
time.sleep(0.5)
return ''
diff --git a/django_common_task_system/program.py b/django_common_task_system/program.py
index 98dffdb..0de8376 100644
--- a/django_common_task_system/program.py
+++ b/django_common_task_system/program.py
@@ -1,11 +1,14 @@
+import json
import threading
import logging
import enum
import os
-from typing import Callable, Optional
+from typing import Callable, Optional, Union
from docker.models.containers import Container
from django_common_task_system.cache_service import cache_agent
from django_common_task_system.choices import ContainerStatus
+from django_common_task_system.utils.logger import add_file_handler
+from django_common_task_system.log import PagedLog
class ProgramAction(str, enum.Enum):
@@ -23,14 +26,47 @@ class ContainerProgramAction(str, enum.Enum):
DESTROY = 'destroy'
+class RemoteContainer(Container):
+ def __init__(self, attrs):
+ super(RemoteContainer, self).__init__(attrs)
+
+ def start(self, **kwargs):
+ pass
+
+ def stop(self, **kwargs):
+ pass
+
+ def restart(self, **kwargs):
+ pass
+
+ def logs(self, **kwargs):
+ return ''
+
+ def remove(self, **kwargs):
+ pass
+
+
+class Key(str):
+ pass
+
+
+class MapKey(str):
+ pass
+
+
+class ListKey(str):
+ pass
+
+
class ProgramState(dict):
- def __init__(self, key):
+ def __init__(self, key: Union[MapKey, Key]):
super(ProgramState, self).__init__()
self.key = key
self.ident = None
self.is_running = False
self.engine = None
self.create_time = None
+ self.program_name = None
def __setattr__(self, key, value):
self[key] = value
@@ -42,27 +78,68 @@ def commit(self, **kwargs) -> dict:
return kwargs or self
def push(self, **kwargs):
- cache_agent.hset(self.key, mapping=kwargs)
+ if isinstance(self.key, Key):
+ cache_agent.hset(self.key, mapping=kwargs)
+ else:
+ cache_agent.hset(self.key, self.ident, json.dumps(kwargs))
def commit_and_push(self, **kwargs):
return self.push(**self.commit(**kwargs))
def pull(self):
- state = cache_agent.hgetall(self.key)
- for k, v in state.items():
- setattr(self, k, v)
+ if isinstance(self.key, Key):
+ state = cache_agent.hgetall(self.key)
+ else:
+ state = cache_agent.hget(self.key, self.ident)
+ if state:
+ state = json.loads(state)
+ if state:
+ for k, v in state.items():
+ setattr(self, k, v)
+
+ def delete(self):
+ if isinstance(self.key, Key):
+ cache_agent.delete(self.key)
+ else:
+ cache_agent.hdel(self.key, self.ident)
class Program:
state_class = ProgramState
- state_key = ''
+ state_key: Key = None
- def __init__(self, name=None, logger: logging.Logger = None):
- assert self.state_key, 'state_key must be set'
+ def __init__(self, name=None, container=None, logger: logging.Logger = None):
+ assert isinstance(self.state_key, Key), 'state_key type must be Key'
self._event = threading.Event()
self.state = self.state_class(self.state_key)
- self.program_name = name or self.__class__.__name__
- self.logger = logger or logging.getLogger(self.program_name.lower())
+ self.container: Optional[Container, RemoteContainer] = container
+ self._program_name = name
+ self._logger = logger
+ self._log_file = None
+
+ @property
+ def program_name(self):
+ if self._program_name:
+ name = self._program_name
+ elif self.container is not None:
+ name = self.container.name
+ else:
+ name = self.__class__.__name__
+ return name
+
+ @property
+ def log_file(self):
+ if self._log_file is None:
+ _ = self.logger
+ return self._log_file
+
+ @property
+ def logger(self):
+ if self._logger is None:
+ self._logger = logging.getLogger(self.program_name)
+ if self._log_file is None:
+ self._log_file = add_file_handler(self._logger)
+ return self._logger
@property
def is_running(self):
@@ -72,12 +149,17 @@ def is_running(self):
def program_id(self) -> int:
return os.getpid()
- def init_state(self, **kwargs):
+ def pre_started(self):
self.state.commit_and_push(
- name=self.program_name,
+ program_name=self.program_name,
is_running=False,
engine=self.__class__.__name__,
- **kwargs
+ )
+
+ def on_started(self):
+ self.state.commit_and_push(
+ is_running=True,
+ ident=self.program_id
)
def run(self) -> None:
@@ -91,50 +173,77 @@ def start_if_not_started(self) -> str:
if self.is_running:
error = '%s already started, pid: %s' % (self, self.program_id)
else:
- self.init_state()
+ self.pre_started()
start_program()
- self.state.commit_and_push(is_running=True, ident=self.program_id)
+ self.on_started()
self.logger.info('%s started, pid: %s' % (self, self.program_id))
error = ''
self.logger.info(error)
return error
- def stop(self):
+ def stop(self, destroy=False):
self._event.clear()
+ if self.container:
+ self.container.stop()
+ if destroy:
+ self.container.remove()
+ if destroy:
+ self.state.delete()
+ else:
+ self.state.commit_and_push(
+ is_running=False,
+ )
def read_log(self, page=0, page_size=10):
- raise NotImplementedError
+ if self.container:
+ return self.container.logs(tail=page_size)
+ else:
+ return PagedLog(self.log_file, page_size=page_size).read_page(page=page)
def __str__(self):
return "Program(%s)" % self.__class__.__name__
-class ContainerProgram(Program):
- def __init__(self, container=None):
- self.container: Optional[Container] = container
- super().__init__(name=getattr(container, 'name', None))
+class LocalProgram(Program):
def run(self) -> None:
raise NotImplementedError
- def read_log(self, page=0, page_size=1000):
- if self.container:
- return self.container.logs(tail=page_size)
-
- # def start(self):
- # self.container.start()
- #
- def stop(self):
- assert self.container, 'container must be set'
- self.container.stop()
- self.container.remove()
-
- def restart(self):
- self.container.restart()
+ def pre_started(self):
+ self.state.commit_and_push(
+ name=self.program_name,
+ is_running=False,
+ engine=self.__class__.__name__,
+ log_file=self.log_file
+ )
- @property
- def is_running(self):
- return self.container.status == ContainerStatus.RUNNING
+#
+# class ContainerProgram(Program):
+# def __init__(self, container=None):
+# self.container: Optional[Container, RemoteContainer] = container
+# super().__init__(name=getattr(container, 'name', None))
+#
+# def run(self) -> None:
+# raise NotImplementedError
+#
+# def read_log(self, page=0, page_size=1000):
+# if self.container:
+# return self.container.logs(tail=page_size)
+#
+# # def start(self):
+# # self.container.start()
+# #
+# def stop(self):
+# assert self.container, 'container must be set'
+# self.container.stop()
+# self.container.remove()
+#
+# def restart(self):
+# self.container.restart()
+#
+# @property
+# def is_running(self):
+# return self.container.status == ContainerStatus.RUNNING
class ProgramAgent:
diff --git a/django_common_task_system/system_task_execution/process.py b/django_common_task_system/system_task_execution/process.py
index 4f96d33..28cd87e 100644
--- a/django_common_task_system/system_task_execution/process.py
+++ b/django_common_task_system/system_task_execution/process.py
@@ -17,6 +17,6 @@ def run(self):
django.setup()
super(ScheduleConsumerProcess, self).run()
- def stop(self):
- super(ScheduleConsumerProcess, self).stop()
+ def stop(self, destroy=False):
+ super(ScheduleConsumerProcess, self).stop(destroy=destroy)
self.kill()
diff --git a/django_common_task_system/system_task_execution/system_task_execution/consumer.py b/django_common_task_system/system_task_execution/system_task_execution/consumer.py
index 7197fa5..2865294 100644
--- a/django_common_task_system/system_task_execution/system_task_execution/consumer.py
+++ b/django_common_task_system/system_task_execution/system_task_execution/consumer.py
@@ -6,7 +6,7 @@
from django_common_task_system.models import ExceptionReport
from django_common_task_system import get_schedule_log_model
from django_common_task_system.choices import ExecuteStatus
-from django_common_task_system.program import Program, ProgramState
+from django_common_task_system.program import LocalProgram, ProgramState, Key
from django_common_task_system.utils.logger import add_file_handler
from datetime import datetime
@@ -175,19 +175,13 @@ def __init__(self, key):
self.log_file = ''
-class Consumer(Program):
+class Consumer(LocalProgram):
state_class = ConsumerState
- state_key = 'consumer'
+ state_key = Key('consumer')
def __init__(self, queue: Queue):
super(Consumer, self).__init__(name='Consumer', logger=logger)
self.queue = queue
- self.log_file = add_file_handler(self.logger)
-
- def init_state(self):
- super(Consumer, self).init_state(
- log_file=self.log_file,
- )
def run(self):
queue = self.queue
diff --git a/django_common_task_system/system_task_execution/thread.py b/django_common_task_system/system_task_execution/thread.py
index 9e6290a..29b375b 100644
--- a/django_common_task_system/system_task_execution/thread.py
+++ b/django_common_task_system/system_task_execution/thread.py
@@ -13,7 +13,7 @@ def __init__(self):
def program_id(self) -> int:
return self.ident
- def stop(self) -> str:
+ def stop(self, destroy=False) -> str:
while self.is_alive():
time.sleep(0.5)
return ''
diff --git a/django_common_task_system/views.py b/django_common_task_system/views.py
index e152468..c1d8b4a 100644
--- a/django_common_task_system/views.py
+++ b/django_common_task_system/views.py
@@ -316,10 +316,7 @@ class UserConsumerView(APIView):
def get(self, request: Request, action: str):
if action == ContainerProgramAction.START:
- consumer = Consumer.objects.create()
- return Response(models.ConsumerSerializer(consumer).data)
- if action == 'register':
- return UserConsumerView.register_client(request)
+ return self.post(request, 'start')
consumer_id = request.GET.get('consumer_id', '')
if not consumer_id.isdigit():
return Response({'error': 'invalid consumer_id: %s' % consumer_id}, status=status.HTTP_400_BAD_REQUEST)
@@ -338,42 +335,35 @@ def get(self, request: Request, action: str):
return Response({'error': 'invalid action: %s, only support start/stop/log' % action},
status=status.HTTP_400_BAD_REQUEST)
- @staticmethod
- def register_client(request: Request):
- """
- container: Container = None
- group = models.CharField(max_length=100, verbose_name='分组')
- subscription_url = models.CharField(max_length=200, verbose_name='订阅地址')
- subscription_kwargs = models.JSONField(verbose_name='订阅参数', default=dict)
- client_id = models.IntegerField(verbose_name='客户端ID', primary_key=True, default=0)
- process_id = models.PositiveIntegerField(verbose_name='进程ID', null=True, blank=True)
- container_id = models.CharField(max_length=100, verbose_name='容器ID', blank=True, null=True)
- container_name = models.CharField(max_length=100, verbose_name='容器名称', blank=True, null=True)
- container_image = models.CharField(max_length=100, verbose_name='容器镜像', blank=True, null=True)
- container_status = models.CharField(choices=ContainerStatus.choices, default=ContainerStatus.NONE,
- max_length=20, verbose_name='容器状态')
- run_in_container = models.BooleanField(default=True, verbose_name='是否在容器中运行')
- env = models.CharField(max_length=500, verbose_name='环境变量', blank=True, null=True)
- startup_status = models.CharField(max_length=500, choices=TaskClientStatus.choices,
- verbose_name='启动结果', default=TaskClientStatus.SUCCEED)
- settings = models.TextField(verbose_name='配置', blank=True, null=True)
- create_time = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
- startup_log = models.CharField(max_length=2000, null=True, blank=True)
- """
- data = request.data
- settings = data.get('settings')
- container_id = data.get('container_id')
- subscription_url = data.get('subscription_url')
- subscription_kwargs = data.get('subscription_kwargs')
- process_id = data.get('process_id')
- client = Consumer(container_id=container_id,
- subscription_url=subscription_url,
- startup_log='启动成功',
- settings=settings,
- process_id=process_id,
- )
- client.save()
- return Response('上报成功')
+ def post(self, request: Request, action: str):
+ if action == 'start':
+ data = {
+ 'consume_url': request.data.get('consume_url'),
+ }
+ elif action == 'register':
+ serializer = models.ConsumerSerializer(data=request.data)
+ serializer.context['request'] = request
+ serializer.is_valid(raise_exception=True)
+ serializer.save()
+ return Response(serializer.data)
+ else:
+ return Response({'error': 'invalid action: %s, only support start/register' % action},
+ status=status.HTTP_400_BAD_REQUEST)
+ # data = request.data
+ # settings = data.get('settings')
+ # machine_setting = data.get('machine')
+ # if not machine_setting:
+ # pass
+ # machine = models.Machine(**machine_setting)
+ # container_setting = data.get('container')
+ # consume_url = data.get('consume_url')
+ # consume_kwargs = data.get('consume_kwargs')
+ # consumer = Consumer(machine=machine,
+ # consume_url=consume_url,
+ # consume_kwargs=consume_kwargs,
+ # )
+ # consumer.save()
+ # return Response(models.ConsumerSerializer(consumer).data)
def log_view(request: Request, filename):