supplierTaskAgent.go 15 KB


  1. package main
  2. import (
  3. "bytes"
  4. "crypto/md5"
  5. "encoding/json"
  6. "flag"
  7. "fmt"
  8. "io/ioutil"
  9. "log"
  10. "net"
  11. "net/http"
  12. "os"
  13. "os/exec"
  14. "time"
  15. "github.com/mitchellh/go-homedir"
  16. "golang.org/x/crypto/ssh"
  17. gossh "golang.org/x/crypto/ssh"
  18. "gopkg.in/yaml.v2"
  19. )
  20. //声明变量
  21. var (
  22. err error
  23. f []byte
  24. logFile *os.File
  25. Config_dir string
  26. Conf Config
  27. Loger *log.Logger
  28. TaskCache []Task
  29. resp *http.Response
  30. content []byte
  31. )
  32. type Config struct {
  33. Url string
  34. User string
  35. Password string
  36. Logdir string
  37. Cmdb_url string
  38. DingDing_webHook string
  39. SshKeyPath string
  40. Type_cmd struct {
  41. N1 []string
  42. N2 []string
  43. N8 []string
  44. }
  45. Cmdb_Token string
  46. }
  47. type Hostinfo struct {
  48. Id int //id
  49. Ip string //ip地址
  50. Status int //节点状态
  51. Hostname string //主机名
  52. Node_type float64 //节点类型
  53. Server_room_name string //机房
  54. ServerId int //服务器id
  55. Idc_id int //idc id
  56. Sup_id int //供应商id
  57. Node_id float64 //node类型id
  58. }
  59. type Task struct {
  60. Id int
  61. Hostnumber int
  62. Hostgroup string
  63. Describe string
  64. Status string
  65. Name string
  66. ExecResult string
  67. Starttime int64
  68. Stoptime int64
  69. Createtime int64
  70. }
  71. type TaskResult struct {
  72. Ip string //ip地址
  73. Status int //节点状态
  74. Hostname string //主机名
  75. Node_type float64 //节点类型
  76. Server_room_name string //机房
  77. Result struct {
  78. Cmdb_api_result string //cmdb结果
  79. Cmd_result string //命令结果
  80. }
  81. }
  82. //声明变量 end
  83. //钉钉发消息
  84. func SendDingMsg(msg string) {
  85. //请求地址模板
  86. webHook := Conf.DingDing_webHook
  87. // content := `{'msgtype': 'text',
  88. // 'text': {'content': '` + msg + `'}
  89. // }`
  90. content := make(map[string]interface{})
  91. text := make(map[string]string)
  92. text["content"] = msg
  93. content["msgtype"] = "text"
  94. content["text"] = text
  95. fmt.Println(content)
  96. ddd, err := json.Marshal(content)
  97. fmt.Println("钉钉发消息", string(ddd), err)
  98. //创建一个请求
  99. req, err := http.NewRequest("POST", webHook, bytes.NewReader(ddd))
  100. if err != nil {
  101. Loger.Println("钉钉发消息失败", err)
  102. }
  103. client := &http.Client{}
  104. //设置请求头
  105. req.Header.Set("Content-Type", "application/json; charset=utf-8")
  106. //发送请求
  107. resp, err := client.Do(req)
  108. //关闭请求
  109. defer resp.Body.Close()
  110. if err != nil {
  111. Loger.Println("钉钉发消息失败", err)
  112. }
  113. }
  114. //不实时执行linux命令
  115. func Cmd(command string) string {
  116. cmd := exec.Command("/bin/bash", "-c", command)
  117. bytes, err := cmd.Output()
  118. if err != nil {
  119. return fmt.Sprintln("执行命令:", command+"\n", err)
  120. }
  121. resp := string(bytes)
  122. return "执行命令:" + command + "\n" + resp
  123. }
  124. //检测任务并执行
  125. func TimeCheck() {
  126. fmt.Println("1")
  127. t := TaskCache
  128. d := time.Now().Unix()
  129. for _, v := range t {
  130. if v.Status == "false" {
  131. // 开始任务
  132. if d == v.Starttime {
  133. r, err := TaskStart(v.Hostgroup, v.Describe, v.Name, v.Starttime, v.Stoptime, v.Hostnumber)
  134. fmt.Println("开始任务", err)
  135. SendDingMsg(r)
  136. }
  137. //结束任务
  138. if d == v.Stoptime {
  139. r, err := TaskStop(v.Hostgroup)
  140. fmt.Println("结束任务", err)
  141. SendDingMsg(r)
  142. }
  143. }
  144. }
  145. }
  146. //更新任务缓存
  147. func TaskUpcache() {
  148. for {
  149. content, s := CmdbApi("get_task", nil)
  150. if s == true {
  151. if err = json.Unmarshal(content, &TaskCache); err != nil {
  152. fmt.Println(err, string(content))
  153. }
  154. }
  155. time.Sleep(1 * time.Second)
  156. }
  157. }
  158. type Cli struct {
  159. user string
  160. pwd string
  161. addr string
  162. client *gossh.Client
  163. session *gossh.Session
  164. LastResult string
  165. sshKeyPath string
  166. }
  167. func (c *Cli) Connect() (*Cli, error) {
  168. config := &gossh.ClientConfig{}
  169. config.SetDefaults()
  170. config.User = c.user
  171. // config.Auth = []gossh.AuthMethod{gossh.Password(c.pwd)}
  172. config.Auth = []ssh.AuthMethod{publicKeyAuthFunc(c.sshKeyPath)}
  173. config.HostKeyCallback = func(hostname string, remote net.Addr, key gossh.PublicKey) error { return nil }
  174. client, err := gossh.Dial("tcp", c.addr, config)
  175. if nil != err {
  176. return c, err
  177. }
  178. c.client = client
  179. return c, nil
  180. }
  181. func (c Cli) Run(shell string) (string, error) {
  182. if c.client == nil {
  183. if _, err := c.Connect(); err != nil {
  184. return "", err
  185. }
  186. }
  187. session, err := c.client.NewSession()
  188. if err != nil {
  189. return "", err
  190. }
  191. defer session.Close()
  192. buf, err := session.CombinedOutput(shell)
  193. c.LastResult = string(buf)
  194. return c.LastResult, err
  195. }
  196. //cmdb api
  197. func CmdbApi(action string, data interface{}) ([]byte, bool) {
  198. now := time.Now().Unix()
  199. u := make(map[string]string)
  200. m := md5.Sum([]byte(Conf.User + Conf.Password + fmt.Sprintln(now)))
  201. client := http.Client{Timeout: 5 * time.Second}
  202. u["action"] = "get_task"
  203. u["user"] = Conf.User
  204. u["time"] = fmt.Sprintln(now)
  205. u["sign"] = fmt.Sprintf("%x", m)
  206. d, err1 := json.Marshal(data)
  207. if err1 != nil {
  208. fmt.Println(err1)
  209. }
  210. u["data"] = string(d)
  211. _u, err2 := json.Marshal(u)
  212. if err2 != nil {
  213. fmt.Println(err2)
  214. }
  215. resp, err3 := client.Post(Conf.Url, "application/json;charset=utf-8", bytes.NewBuffer(_u))
  216. if err2 != nil {
  217. fmt.Println("1", err3)
  218. }
  219. defer resp.Body.Close()
  220. content, err4 := ioutil.ReadAll(resp.Body)
  221. if err4 != nil {
  222. fmt.Println("Read failed:", err4)
  223. return []byte(""), false
  224. }
  225. return content, true
  226. }
  227. //http请求
  228. func Httprequest(url, method, data string) ([]byte, error) {
  229. var (
  230. req *http.Request
  231. resp *http.Response
  232. body []byte
  233. err error
  234. )
  235. d := []byte(data)
  236. if data != "" {
  237. if req, err = http.NewRequest(method, url, bytes.NewReader(d)); err != nil {
  238. return body, err
  239. }
  240. } else {
  241. if req, err = http.NewRequest(method, url, nil); err != nil {
  242. return body, err
  243. }
  244. }
  245. req.Header.Set("Content-Type", "application/json;charset=UTF-8")
  246. req.Header.Set("Authorization", "Token "+Conf.Cmdb_Token)
  247. if resp, err = http.DefaultClient.Do(req); err != nil {
  248. return body, err
  249. }
  250. defer resp.Body.Close()
  251. if body, err = ioutil.ReadAll(resp.Body); err != nil {
  252. return body, err
  253. }
  254. return body, err
  255. }
  256. func publicKeyAuthFunc(kPath string) ssh.AuthMethod {
  257. //交叉编译打开文件
  258. keyPath, err := homedir.Expand(kPath)
  259. if err != nil {
  260. log.Fatal("find key's home dir failed", err)
  261. }
  262. //读文件
  263. key, err := ioutil.ReadFile(keyPath)
  264. if err != nil {
  265. log.Fatal("ssh key file read failed", err)
  266. }
  267. //解析为一个对象
  268. // Create the Signer for this private key.
  269. signer, err := ssh.ParsePrivateKey(key)
  270. if err != nil {
  271. log.Fatal("ssh key signer failed", err)
  272. }
  273. //放入key对象 生成一个指针
  274. return ssh.PublicKeys(signer)
  275. }
  276. //开始任务
  277. func TaskStart(hostgroup string, desc string, name string, starttime int64, stoptime int64, hostnumber int) (string, error) {
  278. //关节点
  279. var (
  280. h []Hostinfo
  281. v []byte
  282. taskResult []TaskResult
  283. )
  284. if err = json.Unmarshal([]byte(hostgroup), &h); err != nil {
  285. return "", err
  286. }
  287. for index := range h {
  288. switch h[index].Node_type {
  289. case 16:
  290. t := TaskResult{}
  291. if v, err = Httprequest(Conf.Cmdb_url+"nodes/"+fmt.Sprintln(h[index].Node_id), "PUT", `{"status":1,"node_type":`+fmt.Sprintln(h[index].Node_type)+`}`); err != nil {
  292. continue
  293. }
  294. c := ""
  295. for cmd_index := range Conf.Type_cmd.N8 {
  296. c += Cmd(fmt.Sprintln("ssh root@127.0.0.1 '%s' -p 7721 ", Conf.Type_cmd.N8[cmd_index]))
  297. }
  298. t.Server_room_name = h[index].Server_room_name
  299. t.Node_type = h[index].Node_type
  300. t.Ip = h[index].Ip
  301. t.Hostname = h[index].Hostname
  302. t.Status = h[index].Status
  303. t.Result.Cmd_result = c
  304. t.Result.Cmdb_api_result = string(v)
  305. taskResult = append(taskResult, t)
  306. case 32:
  307. t := TaskResult{}
  308. if v, err = Httprequest(Conf.Cmdb_url+"nodes/"+fmt.Sprintln(h[index].Node_id), "PUT", `{"status":1,"node_type":`+fmt.Sprintln(h[index].Node_type)+`}`); err != nil {
  309. continue
  310. }
  311. c := ""
  312. for cmd_index := range Conf.Type_cmd.N8 {
  313. c += Cmd(Conf.Type_cmd.N8[cmd_index])
  314. }
  315. t.Server_room_name = h[index].Server_room_name
  316. t.Node_type = h[index].Node_type
  317. t.Ip = h[index].Ip
  318. t.Hostname = h[index].Hostname
  319. t.Status = h[index].Status
  320. t.Result.Cmd_result = c
  321. t.Result.Cmdb_api_result = string(v)
  322. taskResult = append(taskResult, t)
  323. case 64:
  324. t := TaskResult{}
  325. if v, err = Httprequest(Conf.Cmdb_url+"nodes/"+fmt.Sprintln(h[index].Node_id), "PUT", `{"status":1,"node_type":`+fmt.Sprintln(h[index].Node_type)+`}`); err != nil {
  326. continue
  327. }
  328. c := ""
  329. for cmd_index := range Conf.Type_cmd.N8 {
  330. c += Cmd(Conf.Type_cmd.N8[cmd_index])
  331. }
  332. t.Server_room_name = h[index].Server_room_name
  333. t.Node_type = h[index].Node_type
  334. t.Ip = h[index].Ip
  335. t.Hostname = h[index].Hostname
  336. t.Status = h[index].Status
  337. t.Result.Cmd_result = c
  338. t.Result.Cmdb_api_result = string(v)
  339. taskResult = append(taskResult, t)
  340. case 8:
  341. t := TaskResult{}
  342. if v, err = Httprequest(Conf.Cmdb_url+"nodes/"+fmt.Sprintln(h[index].Node_id), "PUT", `{"status":1,"node_type":`+fmt.Sprintln(h[index].Node_type)+`}`); err != nil {
  343. continue
  344. }
  345. c := ""
  346. for cmd_index := range Conf.Type_cmd.N8 {
  347. cli := Cli{
  348. user: "root",
  349. pwd: "",
  350. addr: h[index].Ip + ":7721",
  351. sshKeyPath: Conf.SshKeyPath,
  352. }
  353. fmt.Println(cli)
  354. output, err := cli.Run(Conf.Type_cmd.N8[cmd_index])
  355. if err != nil {
  356. c += fmt.Sprintln(err)
  357. }
  358. c += fmt.Sprintln(output)
  359. }
  360. t.Server_room_name = h[index].Server_room_name
  361. t.Node_type = h[index].Node_type
  362. t.Ip = h[index].Ip
  363. t.Hostname = h[index].Hostname
  364. t.Status = h[index].Status
  365. t.Result.Cmd_result = c
  366. t.Result.Cmdb_api_result = string(v)
  367. taskResult = append(taskResult, t)
  368. default:
  369. t := TaskResult{}
  370. t.Server_room_name = h[index].Server_room_name
  371. t.Node_type = h[index].Node_type
  372. t.Ip = h[index].Ip
  373. t.Hostname = h[index].Hostname
  374. t.Status = h[index].Status
  375. taskResult = append(taskResult, t)
  376. }
  377. }
  378. m, err := UpVersion()
  379. t := struct {
  380. Title string
  381. Hostlength int
  382. TaskResult []TaskResult
  383. VersionInfo string
  384. Describe string
  385. Name string
  386. Startime string
  387. Stoptime string
  388. }{
  389. Title: "开始任务",
  390. Hostlength: hostnumber,
  391. TaskResult: taskResult,
  392. VersionInfo: m,
  393. Describe: desc,
  394. Name: name,
  395. Startime: time.Unix(starttime, 0).Format("2006-01-02 15:04:05"),
  396. Stoptime: time.Unix(stoptime, 0).Format("2006-01-02 15:04:05"),
  397. }
  398. d, err := yaml.Marshal(t)
  399. return string(d), err
  400. }
  401. //结束任务
  402. func TaskStop(hostgroup string) (string, error) {
  403. //开节点
  404. var (
  405. title string
  406. msg string
  407. h []Hostinfo
  408. v []byte
  409. )
  410. if err = json.Unmarshal([]byte(hostgroup), &h); err != nil {
  411. return "", err
  412. }
  413. msg += "结束任务" + "\n"
  414. msg += fmt.Sprintln("执行主机长度", len(h))
  415. for index := range h {
  416. switch h[index].Node_type {
  417. case 16:
  418. msg += fmt.Sprintln(h[index].Ip, "执行汇集节点命令")
  419. case 32:
  420. msg += fmt.Sprintln(h[index].Ip, "执行普通几点")
  421. case 64:
  422. msg += fmt.Sprintln(h[index].Ip, "执行普通64")
  423. case 8:
  424. if v, err = Httprequest(Conf.Cmdb_url+"nodes/"+fmt.Sprintln(h[index].Node_id), "PUT", `{"status":1,"node_type":`+fmt.Sprintln(h[index].Node_type)+`}`); err != nil {
  425. return msg, err
  426. }
  427. c := ""
  428. for cmd_index := range Conf.Type_cmd.N8 {
  429. c += Cmd(Conf.Type_cmd.N8[cmd_index])
  430. }
  431. title = fmt.Sprintln("主机", h[index].Ip, "类型", h[index].Node_type, "状态", h[index].Status, "机房", h[index].Server_room_name, "---------主机执行")
  432. msg += fmt.Sprintln(title, "执行cmdb接口"+string(v)+"\n", "------执行shells\n", c, "\n------执行shell结束")
  433. msg += fmt.Sprintln("主机", h[index].Ip, "类型", h[index].Node_type, "状态", h[index].Status, "机房", h[index].Server_room_name, "---------主机执行结束")
  434. case 2:
  435. //关闭节点
  436. msg += fmt.Sprintln(h[index].Ip, "执行普通64")
  437. default:
  438. msg += fmt.Sprintln(index, "无节点类型", h[index], "类型", h[index].Node_type, "节点状态", h[index].Status)
  439. }
  440. }
  441. m, err := UpVersion()
  442. msg += m
  443. return msg, err
  444. }
  445. //更新版本号+1
  446. func UpVersion() (string, error) {
  447. var (
  448. err error
  449. msg string
  450. v []byte
  451. )
  452. version := make(map[string]int)
  453. if v, err = Httprequest(Conf.Cmdb_url+"node-version/1", "GET", ""); err != nil {
  454. return "", err
  455. }
  456. if err = json.Unmarshal(v, &version); err != nil {
  457. return "", err
  458. }
  459. msg += fmt.Sprintln("version执行前版本", version["version"])
  460. version["version"] = version["version"] + 1
  461. if v, err = json.Marshal(version); err != nil {
  462. return "", err
  463. }
  464. if v, err = Httprequest(Conf.Cmdb_url+"node-version/1", "PUT", string(v)); err != nil {
  465. return "", err
  466. }
  467. msg += fmt.Sprintln("当前版本号", string(v))
  468. return msg, err
  469. }
  470. //初始化
  471. func init() {
  472. flag.StringVar(&Config_dir, "taskConfig", "./taskConfig.json", "配置文件默认地址")
  473. flag.Parse()
  474. currentTime := time.Now()
  475. //加载配置文件
  476. if f, err = ioutil.ReadFile(Config_dir); err != nil {
  477. fmt.Println("读取错误", err)
  478. os.Exit(255)
  479. }
  480. if err = json.Unmarshal(f, &Conf); err != nil {
  481. fmt.Println("配置文件加载错误", err)
  482. os.Exit(255)
  483. }
  484. // dddd, _ := json.MarshalIndent(Conf, "", " ")
  485. // fmt.Println("初始化配置文件内容如下\n", string(dddd))
  486. if logFile, err = os.OpenFile(currentTime.Format(Conf.Logdir+"/2006-01-02.log"), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766); err != nil {
  487. if err != nil {
  488. fmt.Println("创建日志失败", err)
  489. os.Exit(255)
  490. }
  491. }
  492. Loger = log.New(logFile, "[LOG]", log.Ldate|log.Lshortfile|log.Ltime)
  493. }
  494. func main() {
  495. go TaskUpcache()
  496. content, s := CmdbApi("get_task", nil)
  497. if s == true {
  498. if err = json.Unmarshal(content, &TaskCache); err != nil {
  499. fmt.Println(err, string(content))
  500. }
  501. }
  502. r, err := TaskStart(TaskCache[0].Hostgroup, TaskCache[0].Describe, TaskCache[0].Name, TaskCache[0].Starttime, TaskCache[0].Stoptime, TaskCache[0].Hostnumber)
  503. fmt.Println("开始任务", err)
  504. fmt.Println(r)
  505. SendDingMsg(r)
  506. // for {
  507. // if len(TaskCache) == 0 {
  508. // fmt.Println(111)
  509. // return
  510. // }
  511. // TimeCheck()
  512. // time.Sleep(1 * time.Second)
  513. // }
  514. }