# kmap/main.py

# 196 lines
# 4.2 KiB
# Python

# -*- encoding: utf-8 -*-
# @File : main
# @Docs : Program entry point
import multiprocessing
from typing import Optional
from pydantic import BaseModel
from starlette.staticfiles import StaticFiles
from uvicorn import Config, Server
from api import Base_api, STATIC
from webui import Handler
# Static file mount: serve the STATIC directory under the /static URL prefix.
Base_api.mount("/static", StaticFiles(directory=STATIC), name="static")
class ProcessExecutor:
    """Collect callables and run them side by side in a process pool.

    Tasks are registered with ``add_task`` and launched together by
    ``start``, which blocks until every one of them has returned.
    """

    def __init__(self):
        # Each entry is a ``(func, args, kwargs)`` tuple, in registration order.
        self.tasks = []

    def add_task(self, func, *args, **kwargs):
        """Register ``func(*args, **kwargs)`` to be executed by ``start``.

        The callable and its arguments are handed to a worker process, so
        they have to be picklable.
        """
        self.tasks.append((func, args, kwargs))

    def start(self):
        """Run every registered task in a worker process and wait for all.

        Returns:
            None. Task results are fetched only to wait for completion.

        Raises:
            Exception: whatever a task raised is re-raised here by
                ``AsyncResult.get()``.
        """
        if not self.tasks:
            # Nothing to run; avoid spawning an idle pool.
            return

        # ``Pool()`` defaults to ``os.cpu_count()`` workers. The tasks given to
        # this executor may block for the lifetime of the program, so on a
        # host with fewer cores than tasks the surplus tasks would never be
        # scheduled. Allocate exactly one worker per task instead.
        with multiprocessing.Pool(processes=len(self.tasks)) as pool:
            pending = [
                pool.apply_async(func, args=args, kwds=kwargs)
                for func, args, kwargs in self.tasks
            ]
            for result in pending:
                result.get()
class WebModel(BaseModel):
    """
    Web server settings model.

    The fields are read by ``web_server`` and forwarded to the uvicorn
    ``Config`` it builds.
    """
    # Forwarded as the ``reload`` option of the server config.
    reload: bool = True
    # Forwarded as the ``host`` option of the server config.
    host: Optional[str] = "localhost"
    # Forwarded as the ``port`` option of the server config.
    port: Optional[int] = 8090
def web_server(conf: Optional[WebModel] = None) -> bool:
    """Build a uvicorn ``Server`` for ``Base_api`` and run it.

    Args:
        conf: Server settings. When omitted, a fresh ``WebModel()`` carrying
            its declared defaults is used.

    Returns:
        The server's ``started`` attribute, read after ``run()`` returns.
    """
    # Create the default per call rather than in the signature: a default
    # expression is evaluated once, at definition time, and that single
    # mutable model instance would then be shared by every call.
    if conf is None:
        conf = WebModel()

    srv: Server = Server(
        Config(
            app=Base_api,
            host=conf.host,
            port=conf.port,
            reload=conf.reload,
        )
    )
    srv.run()
    return srv.started
def web_ui():
    """Register the window menus on a new ``Handler`` and start the webview.

    Two menus are added, in order: the service selector (frontend / backend)
    and the home menu (status page / auto-resize). ``start_webview()`` is
    called last.
    """
    # Imported inside the function, not at module level.
    import webview.menu as wm

    handler = Handler()

    # Service selection menu: switch between the frontend and backend views.
    service_items = [
        wm.MenuAction('前端', handler.frontend),
        wm.MenuSeparator(),
        wm.MenuAction("后端", handler.backend),
    ]
    handler.add_menu('服务选择', service_items)

    # Home menu: status page and window auto-resize.
    home_items = [
        wm.MenuAction('运行状况', handler.index),
        wm.MenuSeparator(),
        wm.MenuAction('自适应窗口', handler.auto_resize),
    ]
    handler.add_menu('主页', home_items)

    handler.start_webview()
if __name__ == "__main__":
    import logging

    # Launch the desktop UI and the API server together, each as its own
    # task of the process executor.
    executor = ProcessExecutor()
    executor.add_task(web_ui)
    executor.add_task(web_server)
    try:
        executor.start()
    except Exception:
        # Still best-effort (the launcher itself must not crash), but leave a
        # traceback behind instead of discarding the failure silently.
        # ``RuntimeError`` is a subclass of ``Exception``, so naming both was
        # redundant.
        logging.getLogger(__name__).exception("worker process failed")
# import asyncio
#
# from utils.common import BaseClient
# from utils.config import Config, log
# from utils.model.agent import AgentModel, ChatModel, TaskModel
#
#
# class AsyncQueue:
# """
# Asynchronous queue
# """
#
# def __init__(self):
# self.queue = asyncio.Queue()
#
# async def put(self, item):
# await self.queue.put(item)
#
# async def get(self):
# return await self.queue.get()
#
# def empty(self):
# return self.queue.empty()
#
# def size(self):
# return self.queue.qsize()
#
#
# async def process_queue(queue):
# while not queue.empty():
# item: TaskModel = await queue.get()
# # Process the item
# try:
# reply = await item.execute()
# except (Exception, RuntimeError) as err:
# log.exception(err)
# else:
# log.info(reply)
#
#
# print(Config.get_select_config())
# client = BaseClient(
# ChatModel(**Config.get_select_config())
# )
# researcher = AgentModel(
# role='Researcher',
# backstory="You're a world class researcher working on a major data science company",
# client=client,
# )
# writer = AgentModel(
# role='Writer',
# backstory="You're a famous technical writer, specialized on writing data related content",
# client=client,
# )
#
# task1 = TaskModel(
# agent=researcher,
# description="你好"
# )
#
# task2 = TaskModel(
# agent=writer,
# description="你是谁?"
# )
#
#
# async def task():
# # Put data into the queue
# async_queue = AsyncQueue()
# # Enqueue the tasks
# await async_queue.put(
# task1,
# )
# await async_queue.put(
# task2,
# )
# # Process the queue asynchronously
# await process_queue(async_queue)
#
#
# asyncio.run(task())
# if __name__ == '__main__':
#
# asyncio.run(
# task()
# )
# if __name__ == '__main__':
# import asyncio
#
# client = BaseClient(
# ChatModel(**Config.get_select_config())
# )
# # log.info(client.send_to_chatgpt("你好"))
# asyncio.run(client.send_to_chatgpt(
# [
# {
# "role": "user",
# "content": "你好!"
# }
# ]
# ))