diff --git a/django_common_task_system/admin.py b/django_common_task_system/admin.py index 6eb8631..66a1ea8 100644 --- a/django_common_task_system/admin.py +++ b/django_common_task_system/admin.py @@ -382,12 +382,13 @@ class ConsumerAdmin(admin.ModelAdmin): 'program_state', 'admin_consume_url', 'consume_status', 'program_status', + 'program_source', 'action', 'create_time') form = forms.ConsumerForm fieldsets = ( (None, { 'fields': ( - ('machine', 'program_type') + ('machine', 'program_type', 'program_source') ), }), ("容器配置", { diff --git a/django_common_task_system/consumer.py b/django_common_task_system/consumer.py index b74f81a..3a8ea34 100644 --- a/django_common_task_system/consumer.py +++ b/django_common_task_system/consumer.py @@ -30,7 +30,7 @@ def __init__(self, key): class ConsumerProgram(Program): state_class = ConsumerState - state_key = MapKey('consumers') + state_key = MapKey('consumer-programs') def __init__(self, model: Consumer, container=None): self.model = model @@ -54,7 +54,7 @@ def load_from_container(cls, container: Container): consume_kwargs=kwargs, create_time=datetime.strptime(container.attrs['Created'].split('.')[0], "%Y-%m-%dT%H:%M:%S"), ) - consumer.program = cls(model=consumer, container=container) + # consumer.program = cls(model=consumer, container=container) consumer.save() @classmethod @@ -90,7 +90,7 @@ def _run(self): settings_file = '/mnt/task-system-client-settings.py' command = f'common-task-system-client --subscription-url="{model.consume_url}" --settings="{settings_file}"' try: - container = docker_client.containers.create( + self.container = docker_client.containers.create( setting.image, command=command, name=setting.name, volumes=[f"{model.settings_file}:{settings_file}"], detach=True @@ -106,30 +106,29 @@ def _run(self): pass else: raise RuntimeError('pull image failed: %s' % setting.image) - container = docker_client.containers.create(setting.image, + self.container = docker_client.containers.create(setting.image, command=command, name=setting.name, detach=True) - container.start() - container = docker_client.containers.get(container.short_id) - self.container = container + self.container.start() + self.container = docker_client.containers.get(self.container.short_id) def run(self): model = self.model - model.program = self + # model.program = self try: self._run() model.consume_status = ConsumeStatus.RUNNING except Exception as _: model.consume_status = ConsumeStatus.FAILED model.startup_log = traceback.format_exc() - model.save() + model.save(commit=False) def stop(self, destroy=False): if isinstance(self.container, Container): self.container.stop() # self.container.remove() self.model.consume_status = ConsumeStatus.STOPPED - self.model.save() + self.model.save(commit=False) def read_log(self, page=0, page_size=1000): if self.model.consume_status == ConsumeStatus.RUNNING.value: diff --git a/django_common_task_system/forms.py b/django_common_task_system/forms.py index efd382b..7793d47 100644 --- a/django_common_task_system/forms.py +++ b/django_common_task_system/forms.py @@ -354,6 +354,7 @@ def __init__(self, attrs=None): class ConsumerForm(forms.ModelForm): + program_source = forms.CharField(initial=ProgramSource.ADMIN.value, widget=forms.HiddenInput()) consume_url = forms.ChoiceField(label='订阅地址', required=False) consume_scheme = forms.ChoiceField(label='订阅Scheme', choices={x: x for x in ['http', 'https']}.items()) consume_host = forms.ChoiceField(label='订阅Host') @@ -397,11 +398,13 @@ def __init__(self, *args, **kwargs): path = reverse('schedule-get', kwargs={'code': obj.code}) consume_url_choices.append((path, obj.name)) self.fields['consume_url'].choices = consume_url_choices - ip_choices = [(models.Machine.localhost_ip, '127.0.0.1(本机)')] + local = models.Machine.objects.local + ip_choices = [(local.localhost_ip, '%s(%s)' % (local.localhost_ip, local.hostname))] for machine in models.Machine.objects.all(): ip_choices.append((machine.intranet_ip, "%s(%s)内网" % (machine.intranet_ip, machine.hostname))) ip_choices.append((machine.internet_ip, "%s(%s)外网" % (machine.internet_ip, machine.hostname))) self.fields['consume_host'].choices = ip_choices + self.initial['machine'] = models.Machine.objects.local self.initial['consume_port'] = os.environ['DJANGO_SERVER_ADDRESS'].split(':')[-1] @staticmethod @@ -433,7 +436,7 @@ def clean(self): raise forms.ValidationError('command is required for mysql consume') consumer.program_type = cleaned_data['program_type'] consumer.machine = cleaned_data['machine'] - consumer.program_source = ProgramSource.ADMIN + consumer.program_source = cleaned_data['program_source'] if consumer.program_type == ProgramType.DOCKER: consumer.program_setting = { 'image': cleaned_data['container_image'], diff --git a/django_common_task_system/models.py b/django_common_task_system/models.py index 38f4d6e..038824b 100644 --- a/django_common_task_system/models.py +++ b/django_common_task_system/models.py @@ -20,7 +20,7 @@ from django.utils.functional import cached_property from django_common_task_system.cache_service import cache_agent from functools import cmp_to_key -from django_common_task_system.program import Program, RemoteContainer +# from django_common_task_system.program import Program, RemoteContainer, ProgramManager from rest_framework import serializers from django_common_task_system.utils import ip as ip_util import os @@ -356,29 +356,47 @@ def __get__(self, instance, owner): return self._meta.managers_map[self.manager.name] -class ConsumerManager(CustomManager): - cache_key = 'consumers' +class CacheManager(CustomManager): + cache_key = None + + @property + def serializer_class(self): + raise NotImplementedError def all(self): - from .consumer import ConsumerProgram - consumers = [] + objects = [] consumer_mapping = cache_agent.hgetall(self.cache_key) if consumer_mapping: - for k, v in consumer_mapping.items(): - consumers.append(ConsumerProgram.load_consumer_from_cache(v)) - return QuerySet(consumers, Consumer) + for k, item in consumer_mapping.items(): + if isinstance(item, str): + item = json.loads(item) + serializer = self.serializer_class(data=item) + serializer.is_valid(raise_exception=True) + objects.append(serializer.save(commit=False)) + return QuerySet(objects, self.model) + + def add(self, obj: models.Model): + cache_agent.hset(self.cache_key, mapping={ + obj.pk: self.serializer_class(obj).data, + }) def get(self, pk, default=None): - string = cache_agent.hget(self.cache_key, pk) - if string: - from .consumer import ConsumerProgram - return ConsumerProgram.load_consumer_from_cache(json.loads(string)) + cache = cache_agent.hget(self.cache_key, pk) + if cache: + if isinstance(cache, str): + cache = json.loads(cache) + serializer = self.serializer_class(data=cache) + serializer.is_valid(raise_exception=True) + return serializer.save(commit=False) return default - def add(self, consumer): - cache_agent.hset(self.cache_key, mapping={ - consumer.consumer_id: ConsumerSerializer(consumer).data, - }) + +class ConsumerManager(CacheManager): + cache_key = 'consumers' + + @property + def serializer_class(self): + return ConsumerSerializer def update_or_create(self, consumer_id, **kwargs): super(ConsumerManager, self).update_or_create() @@ -394,26 +412,27 @@ def delete(self, consumer: 'Consumer'): cache_agent.hdel(self.cache_key, consumer.consumer_id) @staticmethod - def create(consume_url=None, program=None, program_source=ProgramSource.REPORT, **kwargs): + def create(program=None, commit=True, **kwargs): for field in Consumer._meta.fields: if field.name not in kwargs and field.default is not models.fields.NOT_PROVIDED: if callable(field.default): kwargs[field.name] = field.default() else: kwargs[field.name] = field.default + program_source = kwargs.pop('program_source', ProgramSource.REPORT) machine = kwargs.pop('machine', None) + if machine is not None: + machine = Machine(**machine) if program_source == ProgramSource.REPORT: assert machine is not None, 'machine is required when program_source is REPORT' - assert consume_url is not None, 'consume_url is required when program_source is REPORT' - machine = Machine(**machine) + assert kwargs.get('consume_url'), 'consume_url is required when program_source is REPORT' else: if machine is None: machine = Machine.objects.local - if consume_url is None: - consume_url = 'http://%s' % machine.localhost_ip - consumer = Consumer(consume_url=consume_url, machine=machine, **kwargs) - consumer.program = program - consumer.save() + if not kwargs.get('consume_url'): + kwargs['consume_url'] = 'http://%s' % str(machine.localhost_ip) + consumer = Consumer(machine=machine, program_source=program_source, **kwargs) + consumer.save(commit=commit) return consumer @@ -464,9 +483,40 @@ def save( Machine.objects[self.internet_ip] = self +class ProgramManager(CacheManager): + cache_key = 'programs' + + @property + def serializer_class(self): + return ProgramSerializer + + +class Program(models.Model): + program_id = models.IntegerField(verbose_name='程序ID', primary_key=True) + container = models.JSONField(verbose_name='容器信息', null=True, blank=True) + program_name = models.CharField(max_length=100, verbose_name='程序名') + create_time = models.DateTimeField(auto_now_add=True, verbose_name='创建时间') + + objects = ProgramManager() + + class Meta: + managed = False + verbose_name = verbose_name_plural = '程序管理' + + def __str__(self): + return str(self.program_id) + + def save( + self, force_insert=False, force_update=False, using=None, update_fields=None + ): + Program.objects[self.program_id] = self + + class Consumer(models.Model): # 两种运行模式: 容器模式,进程模式 - program: Program = None + # program: Program = None + program = models.ForeignKey(Program, on_delete=models.DO_NOTHING, db_constraint=False, + verbose_name='程序', null=True, blank=True) machine = models.ForeignKey(Machine, on_delete=models.DO_NOTHING, db_constraint=False, verbose_name='机器') consumer_id = models.IntegerField(verbose_name='客户端ID', primary_key=True) consume_url = models.CharField(max_length=200, verbose_name='订阅地址') @@ -475,10 +525,11 @@ class Consumer(models.Model): default=ProgramType.DOCKER) program_setting = models.JSONField(verbose_name='引擎设置', default=dict) program_env = models.CharField(max_length=500, verbose_name='环境变量', blank=True, null=True) - program_source = models.IntegerField(verbose_name='程序来源', default=ProgramSource.REPORT) + program_source = models.IntegerField(verbose_name='程序来源', default=ProgramSource.REPORT, + choices=ProgramSource.choices) consume_status = models.CharField(max_length=500, choices=ConsumeStatus.choices, verbose_name='启动结果', default=ConsumeStatus.RUNNING) - startup_log = models.CharField(max_length=2000, null=True, blank=True) + startup_log = models.TextField(null=True, blank=True) create_time = models.DateTimeField(auto_now_add=True, verbose_name='创建时间') setting = models.JSONField(verbose_name='消费端设置', default=dict) @@ -491,6 +542,11 @@ class Meta: def __str__(self): return str(self.consumer_id) + # @property + # def program(self): + # from django_common_task_system.consumer import ConsumerProgram + # return ProgramManager(program_class=ConsumerProgram).get(self.consumer_id) + @cached_property def settings_file(self): tmp_path = os.path.join(os.getcwd(), "tmp") @@ -506,23 +562,25 @@ def log_file(self): return os.path.join(log_path, "log_%s.log" % self.consumer_id) def save( - self, force_insert=False, force_update=False, using=None, update_fields=None + self, commit=True, force_insert=False, force_update=False, using=None, update_fields=None ): if not self.consumer_id: # 使用毫秒级时间戳作为consumer_id self.consumer_id = int(time.time() * 1000) - Consumer.objects.add(self) - # consumer不可更新,只能创建和删除 - if self.program is None: - self.create_time = timezone.now() - post_save.send( - sender=Consumer, - instance=self, - created=True, - update_fields=update_fields, - raw=False, - using=using, - ) + if commit: + Consumer.objects.add(self) + + # consumer不可更新,只能创建和删除 + if self.program is None: + self.create_time = timezone.now() + post_save.send( + sender=Consumer, + instance=self, + created=True, + update_fields=update_fields, + raw=False, + using=using, + ) def delete(self, using=None, keep_parents=False): Consumer.objects.delete(self) @@ -673,17 +731,23 @@ class Meta: class MachineSerializer(serializers.ModelSerializer): - internet_ip = serializers.IPAddressField(read_only=True) + internet_ip = serializers.IPAddressField(required=False, allow_blank=True, allow_null=True) class Meta: fields = '__all__' model = Machine +class ProgramSerializer(serializers.ModelSerializer): + class Meta: + fields = '__all__' + model = Program + + class ConsumerSerializer(serializers.ModelSerializer): program = serializers.SerializerMethodField(label='程序') machine = MachineSerializer() - consumer_id = serializers.IntegerField(read_only=True) + consumer_id = serializers.IntegerField(required=False, allow_null=True) program_source = serializers.IntegerField(required=False, default=ProgramSource.REPORT) @staticmethod @@ -707,13 +771,44 @@ def get_program(obj: Consumer): } return None + def save(self, **kwargs): + assert hasattr(self, '_errors'), ( + 'You must call `.is_valid()` before calling `.save()`.' + ) + + assert not self.errors, ( + 'You cannot call `.save()` on a serializer with invalid data.' + ) + + assert not hasattr(self, '_data'), ( + "You cannot call `.save()` after accessing `serializer.data`." + "If you need to access data before committing to the database then " + "inspect 'serializer.validated_data' instead. " + ) + + validated_data = {**self.validated_data, **kwargs} + + if self.instance is not None: + self.instance = self.update(self.instance, validated_data) + assert self.instance is not None, ( + '`update()` did not return an object instance.' + ) + else: + self.instance = self.create(validated_data) + assert self.instance is not None, ( + '`create()` did not return an object instance.' + ) + + return self.instance + def create(self, validated_data): - if validated_data['program_source'] == ProgramSource.REPORT: + commit = validated_data.pop('commit', True) + if commit and validated_data['program_source'] == ProgramSource.REPORT: validated_data['machine']['internet_ip'] = self.context['request'].META.get('REMOTE_ADDR') - program = Program(container=RemoteContainer(self.initial_data['program'])) - else: - program = None - return Consumer.objects.create(program=program, **validated_data) + # program = Program(container=RemoteContainer(self.initial_data['program'])) + # else: + program = None + return Consumer.objects.create(program=program, commit=commit, **validated_data) class Meta: fields = '__all__' diff --git a/django_common_task_system/program.py b/django_common_task_system/program.py index 0de8376..1403d42 100644 --- a/django_common_task_system/program.py +++ b/django_common_task_system/program.py @@ -3,7 +3,7 @@ import logging import enum import os -from typing import Callable, Optional, Union +from typing import Callable, Optional, Union, List, Dict from docker.models.containers import Container from django_common_task_system.cache_service import cache_agent from django_common_task_system.choices import ContainerStatus @@ -67,6 +67,8 @@ def __init__(self, key: Union[MapKey, Key]): self.engine = None self.create_time = None self.program_name = None + self.program_class = None + self.container = None def __setattr__(self, key, value): self[key] = value @@ -84,7 +86,11 @@ def push(self, **kwargs): cache_agent.hset(self.key, self.ident, json.dumps(kwargs)) def commit_and_push(self, **kwargs): - return self.push(**self.commit(**kwargs)) + if isinstance(self.key, Key): + return self.push(**self.commit(**kwargs)) + else: + self.commit(**kwargs) + return self.push(**self) def pull(self): if isinstance(self.key, Key): @@ -104,12 +110,56 @@ def delete(self): cache_agent.hdel(self.key, self.ident) +class ContainerState(dict): + def __init__(self, container: Container = None, state: Dict = None): + super(ContainerState, self).__init__() + assert container or state, 'container or state must be set' + if container: + config = container.attrs['Config'] + self.name = container.name + self.id = container.id + self.short_id = container.short_id + self.status = container.status + self.environment = config['Env'] + self.command = config['Cmd'] + self.image = config['Image'] + self.volume = config['Volumes'] + self.create_time = container.attrs['Created'] + else: + for k, v in state.items(): + setattr(self, k, v) + + def __setattr__(self, key, value): + self[key] = value + super(ContainerState, self).__setattr__(key, value) + + def to_attrs(self): + return { + 'Id': self.id, + 'Created': self.create_time, + 'State': { + 'Status': self.status, + }, + 'Config': { + 'Image': self.image, + 'Cmd': self.command, + 'Env': self.environment, + 'Volumes': self.volume, + }, + } + + def to_container(self): + return RemoteContainer(self.to_attrs()) + + class Program: state_class = ProgramState state_key: Key = None def __init__(self, name=None, container=None, logger: logging.Logger = None): - assert isinstance(self.state_key, Key), 'state_key type must be Key' + assert isinstance(self.state_key, (Key, MapKey, ListKey)), \ + '%s.state_key type must be (Key, MapKey, ListKey), current is %s' % ( + self.__class__.__name__, self.state_key) self._event = threading.Event() self.state = self.state_class(self.state_key) self.container: Optional[Container, RemoteContainer] = container @@ -117,6 +167,20 @@ def __init__(self, name=None, container=None, logger: logging.Logger = None): self._logger = logger self._log_file = None + @classmethod + def load_from_state(cls, state: dict): + container_state = state.pop('container', None) + if container_state: + container = ContainerState(state=container_state).to_container() + else: + container = None + program_class = state.pop('program_class', None) + package, program_class = program_class.rsplit('.', 1) + program_package = __import__(package, fromlist=[program_class]) + program_class = getattr(program_package, program_class) + program = program_class(container=container) + return program + @property def program_name(self): if self._program_name: @@ -154,13 +218,21 @@ def pre_started(self): program_name=self.program_name, is_running=False, engine=self.__class__.__name__, + program_class=self.__class__.__module__ + '.' + self.__class__.__name__, ) def on_started(self): - self.state.commit_and_push( - is_running=True, - ident=self.program_id - ) + if self.container: + self.state.commit_and_push( + container=ContainerState(self.container), + is_running=self.container.status == ContainerStatus.RUNNING.lower(), + ident=self.program_id + ) + else: + self.state.commit_and_push( + is_running=True, + ident=self.program_id + ) def run(self) -> None: raise NotImplementedError @@ -217,6 +289,35 @@ def pre_started(self): log_file=self.log_file ) + +class ProgramManager: + default_state_key = 'programs' + + def __init__(self, key: Union[MapKey, str] = None, program_class=None): + if key is None: + key = program_class.state_key + if not key: + key = self.default_state_key + if isinstance(key, str): + key = MapKey(key) + assert isinstance(program_class.state_key, MapKey), 'state_key type must be MapKey' + self.state_key = key + + def all(self) -> List[Program]: + programs = cache_agent.hgetall(self.state_key) + if programs: + return [Program.load_from_state(json.loads(program)) for program in programs.values()] + else: + return [] + + def get(self, program_id) -> Optional[Program]: + program = cache_agent.hget(self.state_key, program_id) + if program: + return Program.load_from_state(json.loads(program)) + + def add(self, program: Program) -> None: + program.state.push() + # # class ContainerProgram(Program): # def __init__(self, container=None):