mirror of
				https://github.com/house-of-vanity/OutFleet.git
				synced 2025-10-25 09:49:08 +00:00 
			
		
		
		
	
		
			
	
	
		
			1 line
		
	
	
		
			3.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			1 line
		
	
	
		
			3.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""Management command that fills the user statistics cache."""

import traceback

from django.core.management.base import BaseCommand
from django.utils import timezone
from vpn.models import User, ACLLink, UserStatistics
from vpn.tasks import update_user_statistics


class Command(BaseCommand):
    """Populate the ``UserStatistics`` cache via ``update_user_statistics``.

    The update runs in-process by default; ``--async`` queues it as a Celery
    task instead. When the cache already has rows the command does nothing
    unless ``--force`` is given.
    """

    help = 'Initialize user statistics cache by running the update task'

    def add_arguments(self, parser):
        """Register the ``--async`` and ``--force`` boolean flags."""
        parser.add_argument(
            '--async',
            action='store_true',
            help='Run statistics update as async Celery task (default: sync)',
        )
        parser.add_argument(
            '--force',
            action='store_true',
            help='Force update even if statistics already exist',
        )

    def handle(self, *args, **options):
        """Check preconditions, then run the update sync or async.

        Returns early (after printing a warning) when the cache is already
        populated and ``--force`` was not passed, or when there are no ACL
        links to process.
        """
        # Check if statistics already exist
        existing_stats = UserStatistics.objects.count()

        if existing_stats > 0 and not options['force']:
            self.stdout.write(
                self.style.WARNING(
                    f'Statistics cache already contains {existing_stats} entries. '
                    'Use --force to update anyway.'
                )
            )
            return

        # Check if there are users with ACL links
        users_with_links = User.objects.filter(acl__isnull=False).distinct().count()
        total_links = ACLLink.objects.count()

        self.stdout.write(
            f'Found {users_with_links} users with {total_links} ACL links total'
        )

        if total_links == 0:
            self.stdout.write(
                self.style.WARNING('No ACL links found. Nothing to process.')
            )
            return

        if options['async']:
            self._run_async()
        else:
            self._run_sync()

    def _run_async(self):
        """Queue the update as a Celery task and print its task id."""
        # The module-level import is used here. A function-scope re-import of
        # the same name elsewhere in this method's scope would make the name
        # local and turn this call into an UnboundLocalError.
        try:
            task = update_user_statistics.delay()
            self.stdout.write(
                self.style.SUCCESS(
                    f'Statistics update task started. Task ID: {task.id}'
                )
            )
            self.stdout.write(
                'Check admin panel Task Execution Logs for progress.'
            )
        except Exception as e:
            # Best-effort: report the failure instead of crashing the command.
            self.stdout.write(
                self.style.ERROR(f'Failed to start async task: {e}')
            )

    def _run_sync(self):
        """Run the update in-process and print the resulting cache size."""
        self.stdout.write('Starting synchronous statistics update...')

        try:
            # Create a mock Celery request object for the task
            class MockRequest:
                id = f'manual-{timezone.now().isoformat()}'
                retries = 0

            # Create mock task instance
            task_instance = type('MockTask', (), {
                'request': MockRequest(),
            })()

            # NOTE(review): this passes a stand-in object as the task's first
            # argument. Whether that matches the signature declared in
            # vpn.tasks (e.g. a bound task) cannot be seen from here -- confirm.
            result = update_user_statistics(task_instance)

            self.stdout.write(
                self.style.SUCCESS(f'Statistics update completed: {result}')
            )

            # Show summary
            final_stats = UserStatistics.objects.count()
            self.stdout.write(
                self.style.SUCCESS(
                    f'Statistics cache now contains {final_stats} entries'
                )
            )

        except Exception as e:
            # Best-effort: print the error and full traceback for diagnosis.
            self.stdout.write(
                self.style.ERROR(f'Statistics update failed: {e}')
            )
            self.stdout.write(traceback.format_exc())