Ansible_API.py
7.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
#!/usr/bin/env python3
import json
import shutil
from ansible.module_utils.common.collections import ImmutableDict
from ansible.parsing.dataloader import DataLoader
from ansible.vars.manager import VariableManager
from ansible.inventory.manager import InventoryManager
from ansible.playbook.play import Play
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.plugins.callback import CallbackBase
from ansible import context
import ansible.constants as C
class ResultCallback(CallbackBase):
"""A sample callback plugin used for performing an action as results come in
If you want to collect all results into a single object for processing at
the end of the execution, look into utilizing the ``json`` callback plugin
or writing your own custom callback plugin
"""
def format_print(self, backStatus, result):
host = result._host
print_color='yellow'
if backStatus == 'SUCESS' and result._result['changed'] == True:
status = 'CHANGED'
print_color = 'yellow'
elif backStatus == 'UNREACHABLE!' or backStatus == 'FAIL!':
status = backStatus
print_color = 'red'
else:
status = backStatus
print_color = 'green'
if status == 'UNREACHABLE!':
message = result._result['msg']
else:
message = '\n'.join(result._result['stdout_lines'])
if print_color == 'red':
print("\033[0;31m%s | %s >>\n%s\n\n\033[0m" % (host, status, message))
elif print_color == 'yellow':
print("\033[0;33m%s | %s >>\n%s\n\n\033[0m" % (host, status, message))
elif print_color == 'green':
print("\033[0;32m%s | %s >>\n%s\n\n\033[0m" % (host, status, message))
def v2_runner_on_ok(self, result, **kwargs):
"""Print a json representation of the result
This method could store the result in an instance attribute for retrieval later
"""
host = result._host
# print(json.dumps({host.name: result._result['stdout_lines']}, indent=4))
# print('SUCCESS: ' + host.name + ' ===> ' + json.dumps(result._result['stdout_lines']))
self.format_print('SUCESS', result)
def v2_runner_on_failed(self, result, **kwargs):
host = result._host
# print('FAIL: ' + host.name + ' ===> ' + result._result['stdout_lines'])
self.format_print('FAIL!', result)
def v2_runner_on_unreachable(self, result):
host = result._host
# print(json.dumps({host.name: result._result}, indent=4))
self.format_print('UNREACHABLE!', result)
#class ResultCallback(CallbackBase):
# """
# 重写callbackBase类的部分方法
# """
# def __init__(self, *args, **kwargs):
# super().__init__(*args, **kwargs)
# self.host_ok = {}
# self.host_unreachable = {}
# self.host_failed = {}
# self.task_ok = {}
# def v2_runner_on_unreachable(self, result):
# self.host_unreachable[result._host.get_name()] = result
#
# def v2_runner_on_ok(self, result, **kwargs):
# self.host_ok[result._host.get_name()] = result
#
# def v2_runner_on_failed(self, result, **kwargs):
# self.host_failed[result._host.get_name()] = result
class MyAnsiable():
def __init__(self,
connection='local', # 连接方式 local 本地方式,smart ssh方式
remote_user=None, # 远程用户
ack_pass=None, # 提示输入密码
sudo=None, sudo_user=None, ask_sudo_pass=None,
module_path=None, # 模块路径,可以指定一个自定义模块的路径
forks=10,
become=True, # 是否提权
become_method='sudo', # 提权方式 默认 sudo 可以是 su
become_user='root', # 提权后,要成为的用户,并非登录用户
check=False, diff=False,
listhosts=None, listtasks=None,listtags=None,
verbosity=3,
syntax=None,
start_at_task=None,
inventory=None):
# 函数文档注释
"""
初始化函数,定义的默认的选项值,
在初始化的时候可以传参,以便覆盖默认选项的值
"""
context.CLIARGS = ImmutableDict(
connection=connection,
remote_user=remote_user,
ack_pass=ack_pass,
sudo=sudo,
sudo_user=sudo_user,
ask_sudo_pass=ask_sudo_pass,
module_path=module_path,
become=become,
become_method=become_method,
become_user=become_user,
verbosity=verbosity,
listhosts=listhosts,
listtasks=listtasks,
listtags=listtags,
syntax=syntax,
start_at_task=start_at_task,
)
# 三元表达式,假如没有传递 inventory, 就使用 "localhost,"
# 确定 inventory 文件
self.inventory = inventory if inventory else "localhost,"
# 实例化数据解析器
self.loader = DataLoader()
# 实例化 资产配置对象
self.inv_obj = InventoryManager(loader=self.loader, sources=self.inventory)
# 设置密码,可以为空字典,但必须有此参数
self.passwords = {}
# 实例化回调插件对象
self.results_callback = ResultCallback()
# 变量管理器
self.variable_manager = VariableManager(self.loader, self.inv_obj)
def run(self, hosts='localhost', gether_facts="no", module="ping", args=''):
play_source = dict(
name = "Ad-hoc",
hosts = hosts,
gather_facts = gether_facts,
tasks = [
# 这里每个 task 就是这个列表中的一个元素,格式是嵌套的字典
# 也可以作为参数传递过来,这里就简单化了。
{"action":{"module": module, "args": args}},
])
play = Play().load(play_source, variable_manager=self.variable_manager, loader=self.loader)
tqm = None
try:
tqm = TaskQueueManager(
inventory=self.inv_obj ,
variable_manager=self.variable_manager,
loader=self.loader,
passwords=self.passwords,
stdout_callback=self.results_callback)
result = tqm.run(play)
finally:
if tqm is not None:
tqm.cleanup()
shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)
def playbook(self,playbooks):
from ansible.executor.playbook_executor import PlaybookExecutor
playbook = PlaybookExecutor(playbooks=playbooks, # 注意这里是一个列表
inventory=self.inv_obj,
variable_manager=self.variable_manager,
loader=self.loader,
passwords=self.passwords)
# 使用回调函数
playbook._tqm._stdout_callback = self.results_callback
result = playbook.run()
def get_result(self):
result_raw = {'success':{},'failed':{},'unreachable':{}}
# print(self.results_callback.host_ok)
for host,result in self.results_callback.host_ok.items():
result_raw['success'][host] = result._result["stdout_lines"]
for host,result in self.results_callback.host_failed.items():
result_raw['failed'][host] = result._result
for host,result in self.results_callback.host_unreachable.items():
result_raw['unreachable'][host] = result._result
# 最终打印结果,并且使用 JSON 继续格式化
print(json.dumps(result_raw, indent=4))
if __name__ == '__main__':
ansible2 = MyAnsiable(inventory='tmp_hosts', connection='smart')
ansible2.run(hosts= "192.168.8.8", module="script", args='test_reboot.sh')
# ansible2.run(hosts= "192.168.8.100", module="raw", args='ls')
#ansible2.