Skip to content

Commit

Permalink
统一Program操作
Browse files Browse the repository at this point in the history
  • Loading branch information
cone387 committed Oct 19, 2023
1 parent a6fe53c commit d6112df
Show file tree
Hide file tree
Showing 11 changed files with 305 additions and 183 deletions.
35 changes: 20 additions & 15 deletions django_common_task_system/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ def get_readonly_fields(self, request, obj=None):


class ConsumerAdmin(admin.ModelAdmin):
list_display = ('consumer_id', 'machine',
list_display = ('consumer_id', 'admin_machine',
'program_state',
'admin_consume_url',
'consume_status', 'program_status',
Expand Down Expand Up @@ -428,13 +428,14 @@ class ConsumerAdmin(admin.ModelAdmin):
# list_filter = ('runner_status',)
readonly_fields = ('create_time', )

def machine(self, obj: models.Consumer):
def admin_machine(self, obj: models.Consumer):
attrs = [
'<b>IP</b>: %s' % obj.machine_ip,
'<b>主机名</b>: %s' % obj.machine_name,
'<b>主机名</b>: %s' % obj.machine.hostname,
'<b>内网IP</b>: %s' % obj.machine.intranet_ip,
'<b>外网IP</b>: %s' % obj.machine.internet_ip,
]
return format_html('<span style="line-height: 2">%s</span>' % '<br>'.join(attrs) if attrs else '-')
machine.short_description = '机器'
admin_machine.short_description = '机器'

def program_status(self, obj: models.Consumer):
return obj.program is not None and obj.program.is_running
Expand All @@ -458,11 +459,14 @@ def program_state(self, obj):

def admin_consume_url(self, obj: models.Consumer):
url = urlparse(obj.consume_url)
return format_html(
'<a href="%s" target="_blank">%s</a>' % (
obj.consume_url, url.path
try:
return format_html(
'<a href="%s" target="_blank">%s</a>' % (
obj.consume_url, url.path
)
)
)
except Exception as e:
return str(e)
admin_consume_url.short_description = '消费地址'

def action(self, obj: models.Consumer):
Expand Down Expand Up @@ -731,26 +735,27 @@ def get_queryset(self, request):
consumer_agent.state.pull()
state = consumer_agent.state
consumer_state = {
'进程ID': state.ident,
'进程状态': '运行中' if state.is_running else '已停止',
'程序ID': state.ident,
"程序名称": state.program_name,
'程序状态': '运行中' if state.is_running else '已停止',
'已处理计划数量': (state.succeed_count + state.failed_count),
'成功计划数量': state.succeed_count,
'失败计划数量': state.failed_count,
'最近处理时间': state.last_process_time,
'日志文件': state.log_file.replace(os.getcwd(), '')
}
model.objects['consumer'] = model(
name="系统计划处理进程",
name="系统计划消费线程",
state=consumer_state,
position=1
)

producer_agent.state.pull()
state = producer_agent.state
producer_state = {
"运行ID": state.ident,
"线程名称": state.name,
"线程状态": "运行中" if state.is_running else "已停止",
"程序ID": state.ident,
"程序名称": state.program_name,
"程序状态": "运行中" if state.is_running else "已停止",
"已调度计划数量": state.scheduled_count,
"最近调度时间": state.last_schedule_time,
"日志文件": state.log_file.replace(os.getcwd(), '')
Expand Down
11 changes: 8 additions & 3 deletions django_common_task_system/choices.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
from django.db.models import TextChoices
import queue
import os
from django.db.models import TextChoices, IntegerChoices


class TaskStatus(TextChoices):
Expand Down Expand Up @@ -72,6 +70,13 @@ class ProgramType(TextChoices):
PROCESS = 'Process'


class ProgramSource(IntegerChoices):
REPORT = 1, '主动上报'
DETECT = 2, '被动检测'
ADMIN = 3, '管理员添加'
API = 4, 'API添加'


class ConsumeStatus(TextChoices):
# start status
INIT = 'Init', '初始化'
Expand Down
17 changes: 9 additions & 8 deletions django_common_task_system/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from threading import Thread
from docker.models.containers import Container
from datetime import datetime
from django_common_task_system.program import ContainerProgram, ProgramAgent, ProgramState
from django_common_task_system.program import Program, ProgramAgent, ProgramState, Key, MapKey, ListKey
import traceback
import docker

Expand All @@ -28,17 +28,14 @@ def __init__(self, key):
self.container: Optional[Container] = None


class ConsumerProgram(ContainerProgram):
class ConsumerProgram(Program):
state_class = ConsumerState
state_key = MapKey('consumers')

def __init__(self, model: Consumer, container=None):
self.model = model
self.state_key = 'consumer_%s' % model.consumer_id
super().__init__(container=container)

def init_state(self, **kwargs):
pass

@property
def program_id(self) -> int:
return self.model.consumer_id
Expand All @@ -63,7 +60,11 @@ def load_from_container(cls, container: Container):
@classmethod
def load_consumer_from_cache(cls, cache: dict):
program = cache.pop('program')
machine = Machine(**cache.pop('machine'))
machine_cache = cache.pop('machine')
if machine_cache:
machine = Machine(**machine_cache)
else:
machine = Machine.objects.local
consumer = Consumer(machine=machine, **cache)
if program:
container_cache = program.get('container', {})
Expand Down Expand Up @@ -123,7 +124,7 @@ def run(self):
model.startup_log = traceback.format_exc()
model.save()

def stop(self):
def stop(self, destroy=False):
if isinstance(self.container, Container):
self.container.stop()
# self.container.remove()
Expand Down
42 changes: 6 additions & 36 deletions django_common_task_system/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from django.contrib.admin import widgets
from django.utils.module_loading import import_string
from django_common_task_system.choices import (
ScheduleType, ScheduleTimingType, ScheduleStatus, TaskStatus)
ScheduleType, ScheduleTimingType, ScheduleStatus, TaskStatus, ProgramSource)
from django_common_objects.widgets import JSONWidget
from django_common_task_system.utils import foreign_key
from datetime import datetime, time as datetime_time
Expand All @@ -18,7 +18,6 @@
from django_common_task_system.utils.cache import ttl_cache
from .choices import ProgramType
from . import models
from .builtins import builtins
from . import get_schedule_model, get_task_model
from .fields import (NLPSentenceWidget, PeriodScheduleFiled, OnceScheduleField, MultiWeekdaySelectFiled,
MultiMonthdaySelectFiled, MultiYearDaySelectWidget, MultiDaySelectField, PeriodWidget,
Expand Down Expand Up @@ -398,43 +397,13 @@ def __init__(self, *args, **kwargs):
path = reverse('schedule-get', kwargs={'code': obj.code})
consume_url_choices.append((path, obj.name))
self.fields['consume_url'].choices = consume_url_choices
intranet_ip = ttl_cache()(ip_utils.get_intranet_ip)()
internet_ip = self.get_internet_ip()
ip_choices = (
(intranet_ip, "%s(内网)" % intranet_ip),
(internet_ip, "%s(外网)" % internet_ip),
('127.0.0.1', '127.0.0.1')
)
ip_choices = [(models.Machine.localhost_ip, '127.0.0.1(本机)')]
for machine in models.Machine.objects.all():
ip_choices.append((machine.intranet_ip, "%s(%s)内网" % (machine.intranet_ip, machine.hostname)))
ip_choices.append((machine.internet_ip, "%s(%s)外网" % (machine.internet_ip, machine.hostname)))
self.fields['consume_host'].choices = ip_choices
# self.fields['machine'].choices = self.get_machine_choices()
self.initial['consume_port'] = os.environ['DJANGO_SERVER_ADDRESS'].split(':')[-1]

@staticmethod
def get_machine_choices():
machines = {}
for o in models.Consumer.objects.all():
machines.setdefault(o.machine_ip, []).append(o)
choices = []
for ip, clients in machines.items():
choices.append(("%s-%s" % (ip, clients[0].machine_name),
"%s(%s)%s clients running" % (ip, clients[0].machine_name, len(clients))))
intranet_ip = ttl_cache()(ip_utils.get_intranet_ip)()
internet_ip = ConsumerForm.get_internet_ip()
for ip in (intranet_ip, internet_ip, '127.0.0.1'):
if ip in machines:
break
else:
choices.append(('127.0.0.1-本机', "127.0.0.1(本机)0 clients running"))
return choices

@staticmethod
@ttl_cache()
def get_internet_ip():
try:
return ip_utils.get_internet_ip()
except Exception as e:
return "获取失败: %s" % str(e)[:50]

@staticmethod
def validate_setting(setting_str, setting_dict):
try:
Expand Down Expand Up @@ -464,6 +433,7 @@ def clean(self):
raise forms.ValidationError('command is required for mysql consume')
consumer.program_type = cleaned_data['program_type']
consumer.machine = cleaned_data['machine']
consumer.program_source = ProgramSource.ADMIN
if consumer.program_type == ProgramType.DOCKER:
consumer.program_setting = {
'image': cleaned_data['container_image'],
Expand Down
88 changes: 73 additions & 15 deletions django_common_task_system/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from django_common_task_system.choices import (
TaskStatus, ScheduleStatus, ScheduleCallbackStatus,
ScheduleCallbackEvent, ScheduleQueueModule, ConsumeStatus, ProgramType,
PermissionType, ExecuteStatus, ScheduleExceptionReason
PermissionType, ExecuteStatus, ScheduleExceptionReason, ProgramSource
)
from django_common_objects.models import CommonTag, CommonCategory
from django_common_objects import fields as common_fields
Expand All @@ -20,8 +20,9 @@
from django.utils.functional import cached_property
from django_common_task_system.cache_service import cache_agent
from functools import cmp_to_key
from django_common_task_system.program import ContainerProgram
from django_common_task_system.program import Program, RemoteContainer
from rest_framework import serializers
from django_common_task_system.utils import ip as ip_util
import os
import time
import re
Expand Down Expand Up @@ -258,7 +259,7 @@ class Query:
order_by = []
select_related = False

def using(self, alias):
def using(self, _):
return self

def all(self):
Expand Down Expand Up @@ -392,45 +393,90 @@ def delete(self, consumer: 'Consumer'):
consumer.program.stop()
cache_agent.hdel(self.cache_key, consumer.consumer_id)

@staticmethod
def create(consume_url=None, program=None, program_source=ProgramSource.REPORT, **kwargs):
for field in Consumer._meta.fields:
if field.name not in kwargs and field.default is not models.fields.NOT_PROVIDED:
if callable(field.default):
kwargs[field.name] = field.default()
else:
kwargs[field.name] = field.default
machine = kwargs.pop('machine', None)
if program_source == ProgramSource.REPORT:
assert machine is not None, 'machine is required when program_source is REPORT'
assert consume_url is not None, 'consume_url is required when program_source is REPORT'
machine = Machine(**machine)
else:
if machine is None:
machine = Machine.objects.local
if consume_url is None:
consume_url = 'http://%s' % machine.localhost_ip
consumer = Consumer(consume_url=consume_url, machine=machine, **kwargs)
consumer.program = program
consumer.save()
return consumer


class MachineManager(CustomManager):
_local = None

@property
def local(self):
if self._local is None:
for _ in range(3):
try:
internet_ip = ip_util.get_internet_ip()
break
except Exception:
continue
else:
internet_ip = None
self._local = Machine(hostname='本机', intranet_ip=ip_util.get_intranet_ip(),
internet_ip=internet_ip, group='默认')
return self._local

def all(self):
return QuerySet([
Machine(name='本机', ip='127.0.0.1', group='默认'),
], Machine)
return QuerySet([self.local, *dict.values(self)], Machine)


class Machine(models.Model):
name = models.CharField(max_length=100, verbose_name='机器名', primary_key=True)
ip = models.GenericIPAddressField(max_length=100, verbose_name='机器IP', default='127.0.0.1')
hostname = models.CharField(max_length=100, verbose_name='机器名')
intranet_ip = models.GenericIPAddressField(max_length=100, verbose_name='内网IP')
internet_ip = models.GenericIPAddressField(max_length=100, verbose_name='外网IP', primary_key=True)
group = models.CharField(max_length=100, verbose_name='分组', default='默认')

objects = MachineManager()

@property
def localhost_ip(self):
return "127.0.0.1"

class Meta:
managed = False
verbose_name = verbose_name_plural = '机器管理'

def __str__(self):
return "%s(%s)" % (self.name, self.ip)
return "%s(%s)" % (self.hostname, self.intranet_ip)

def save(
self, force_insert=False, force_update=False, using=None, update_fields=None
):
Machine.objects[self.internet_ip] = self


class Consumer(models.Model):
# 两种运行模式: 容器模式,进程模式
program: ContainerProgram = None
program: Program = None
machine = models.ForeignKey(Machine, on_delete=models.DO_NOTHING, db_constraint=False, verbose_name='机器')
consumer_id = models.IntegerField(verbose_name='客户端ID', primary_key=True)
# machine_name = models.CharField(max_length=100, verbose_name='机器名', default='本机')
# machine_ip = models.GenericIPAddressField(max_length=100, verbose_name='机器IP', default='127.0.0.1')
# group = models.CharField(max_length=100, verbose_name='分组', default='默认')
consume_url = models.CharField(max_length=200, verbose_name='订阅地址')
consume_kwargs = models.JSONField(verbose_name='订阅参数', default=dict)
program_type = models.CharField(max_length=100, verbose_name='运行引擎', choices=ProgramType.choices,
default=ProgramType.DOCKER)
program_setting = models.JSONField(verbose_name='引擎设置', default=dict)
program_env = models.CharField(max_length=500, verbose_name='环境变量', blank=True, null=True)
consume_status = models.CharField(max_length=500, choices=ConsumeStatus.choices,
program_source = models.IntegerField(verbose_name='程序来源', default=ProgramSource.REPORT)
consume_status = models.CharField(max_length=500, choices=ConsumeStatus.choices,
verbose_name='启动结果', default=ConsumeStatus.RUNNING)
startup_log = models.CharField(max_length=2000, null=True, blank=True)
create_time = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
Expand Down Expand Up @@ -627,14 +673,18 @@ class Meta:


class MachineSerializer(serializers.ModelSerializer):
internet_ip = serializers.IPAddressField(read_only=True)

class Meta:
fields = '__all__'
model = Machine


class ConsumerSerializer(serializers.ModelSerializer):
program = serializers.SerializerMethodField()
program = serializers.SerializerMethodField(label='程序')
machine = MachineSerializer()
consumer_id = serializers.IntegerField(read_only=True)
program_source = serializers.IntegerField(required=False, default=ProgramSource.REPORT)

@staticmethod
def get_program(obj: Consumer):
Expand All @@ -657,6 +707,14 @@ def get_program(obj: Consumer):
}
return None

def create(self, validated_data):
if validated_data['program_source'] == ProgramSource.REPORT:
validated_data['machine']['internet_ip'] = self.context['request'].META.get('REMOTE_ADDR')
program = Program(container=RemoteContainer(self.initial_data['program']))
else:
program = None
return Consumer.objects.create(program=program, **validated_data)

class Meta:
fields = '__all__'
model = Consumer
Loading

0 comments on commit d6112df

Please sign in to comment.