重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
这篇文章主要讲解了“怎么用python + Element实现作业任务Job操作”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“怎么用python + Element实现作业任务Job操作”吧!
创新互联是专业的兰山网站建设公司,兰山接单;提供成都网站设计、做网站,网页设计,网站设计,建网站,PHP网站建设等专业做网站服务;采用PHP框架,可快速的进行兰山网站开发网页制作和功能扩展;专业做搜索引擎喜爱的网站,专业的做网站团队,希望更多企业前来合作!
修改 删除 立即 周期-----每隔 分钟执行一次 服务器"> 立即创建 取消 立即 周期-----每隔 分钟执行一次 立即创建 取消
urls.py
文件内容
from django.conf.urls import patterns from home_application.temp import views as temp_view from home_application.job import views as job_view urlpatterns = patterns( 'home_application.views', (r'^$', job_view.job), (r'^get_biz_list/$', job_view.get_biz_list), (r'^get_bk_hosts/$', job_view.get_bk_hosts), (r'^temp_view/$', temp_view.TemplateView.as_view()), (r'^job_view/$', job_view.JobView.as_view()), ... )
job\views.py
文件内容
import json from django.views.generic import View from django.views.decorators.csrf import csrf_exempt from django.utils.decorators import method_decorator from django.http import JsonResponse from common.mymako import render_mako_context from home_application.models import Job def cc_search_host(): """ 根据条件查询主机 :return: """ url = "{0}/api/c/compapi/v2/cc/search_host/".format(BK_PAAS_HOST) kwargs = { "bk_app_code": APP_ID, "bk_app_secret": APP_TOKEN, "bk_username": "admin", "condition": [ { "bk_obj_id": "host", "fields": ["bk_host_name", "bk_host_innerip", "bk_host_id", "bk_cloud_id", ], }, { "bk_obj_id": "biz", "fields": ["bk_biz_id", "bk_biz_name"], } ], # "page": {"start": 0, "limit": 10} } response = requests.post(url=url, data=json.dumps(kwargs), verify=False) if response.status_code == 200: return json.loads(response.content).get("data") # 获取所有业务,测试OK def cc_search_business(fields=[]): """ 获取业务列表 :param request: :return: """ client = get_client_by_user("admin") params = {'fields': fields} res = client.cc.search_business(**params) if res.get('code') == 0: return res.get('data', {}).get('info', []) def job(request): return render_mako_context(request, '/home_application/job_center.html') def get_biz_list(request): fields = ["bk_biz_id", "bk_biz_name"] return JsonResponse({"result": True, "data": data}) def get_bk_hosts(request): from home_application.utils.cc_by_request import cc_search_host res_data = cc_search_host() return JsonResponse({"result": True, "data": res_data}) class CsrfExemptView(View): @method_decorator(csrf_exempt) def dispatch(self, request, *args, **kwargs): return super(CsrfExemptView, self).dispatch(request, *args, **kwargs) class JobView(CsrfExemptView): def get(self, request, *args, **kwargs): search_type = request.GET.get("search_type") query_str = request.GET.get("query_str") try: job_query = Job.objects.all() except Exception: return JsonResponse({"result": False}) if search_type: job_query = job_query.filter(type=search_type) if query_str: job_query = job_query.filter(job_name__icontains=query_str) res_data = [i.to_dict() for i in job_query] return JsonResponse({"result": True, "data": res_data}) def post(self, request, *args, **kwargs): data = json.loads(request.body) job_obj = { "bk_biz_id": data.get("add_biz").split(":")[0], "bk_biz_name": data.get("add_biz").split(":")[1], "temp_id": data.get("add_temp").split(":")[0], "temp_name": data.get("add_temp").split(":")[1], "job_name": data.get("add_job_name"), "type": data.get("add_type"), "interval": data.get("add_interval"), "host_ip": data.get("add_host_ip"), } try: Job.objects.create(**job_obj) return JsonResponse({"result": True}) except Exception as e: print(e) return JsonResponse({"result": False}) def put(self, request, *args, **kwargs): data = json.loads(request.body) pk = data.get("pk") bk_biz_id = data.get("edit_bk_biz").split(":")[0] temp_id = data.get("edit_temp_name").split(":")[0] temp_name = data.get("edit_temp_name").split(":")[1] bk_biz_name = data.get("edit_bk_biz").split(":")[1] job_name = data.get("edit_job_name") type = data.get("edit_type") interval = data.get("edit_interval") host_ip = data.get("edit_host_ip") job_obj = { "bk_biz_id": bk_biz_id, "bk_biz_name": bk_biz_name, "temp_id": temp_id, "temp_name": temp_name, "job_name": job_name, "type": type, "interval": interval, "host_ip": host_ip, } try: Job.objects.filter(pk=pk).update(**job_obj) return JsonResponse({"result": True}) except Exception as e: return JsonResponse({"result": False}) def delete(self, request, *args, **kwargs): data = json.loads(request.body) pk = data.get("id") try: Job.objects.filter(pk=pk).delete() return JsonResponse({"result": True}) except Exception: return JsonResponse({"result": False})
models.py
文件内容
class Job(models.Model): bk_biz_id = models.CharField(u"业务ID", max_length=8, blank=True, null=True) bk_biz_name = models.CharField(u"业务名称", max_length=32, blank=True, null=True) temp_id = models.CharField(u"模板ID", max_length=8, blank=True, null=True) temp_name = models.CharField(u"模板名称", max_length=32, blank=True, null=True) job_name = models.CharField(u"作业名称", max_length=16, blank=True, null=True) type = models.CharField(u"类型", max_length=8, blank=True, null=True) interval = models.CharField(u"间隔时间", max_length=4, blank=True, null=True) host_ip = models.CharField(u"主机IP", max_length=32, blank=True, null=True) create_time = models.DateTimeField(u"创建时间", auto_now_add=True) def to_dict(self): return { "pk": self.pk, "bk_biz_id": self.bk_biz_id, "bk_biz_name": self.bk_biz_name, "temp_id": self.temp_id, "temp_name": self.temp_name, "job_name": self.job_name, "type": self.type, "interval": self.interval, "host_ip": self.host_ip, "create_time": parse_datetime_to_timestr(self.create_time), }
实现效果
感谢各位的阅读,以上就是“怎么用python + Element实现作业任务Job操作”的内容了,经过本文的学习后,相信大家对怎么用python + Element实现作业任务Job操作这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!