Ansible API 2.0的测试

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Ansible API 2.0的测试相关的知识,希望对你有一定的参考价值。

    因项目需要使用到ansible api,根据修改官方文档提供的使用范例,经过多次测试,现将能用的代码分享给大家,大家只需根据自己的实际环境修改该代码即可。

    官方文档:http://docs.ansible.com/ansible/latest/dev_guide/developing_api.html#python-api-2-0

    

#coding:utf-8
import json
from collections import namedtuple
from ansible.parsing.dataloader import DataLoader
from ansible.vars.manager import VariableManager
from ansible.inventory.manager import InventoryManager
from ansible.playbook.play import Play
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.plugins.callback import CallbackBase

class ResultCallback(CallbackBase):
    def v2_runner_on_ok(self, result, **kwargs):
        host = result._host
        print(json.dumps({host.name: result._result}, indent=4))


# 初始化需要的对象
Options = namedtuple('Options', ['connection', 'module_path', 'forks', 'become', 'become_method', 'become_user', 'check', 'diff'])

# module_path参数指定本地ansible模块包的路径
loader = DataLoader()
options = Options(connection='smart', module_path='/usr/lib/python2.7/dist-packages/ansible/modules', forks=5, become=None, become_method=None, become_user="root", check=False, diff=False)
passwords = dict(vault_pass='secret')

# 实例化ResultCallback来处理结果
results_callback = ResultCallback()

# 创建库存(inventory)并传递给VariableManager
inventory = InventoryManager(loader=loader, sources=['../conf/hosts']) #../conf/hosts是定义hosts
variable_manager = VariableManager(loader=loader, inventory=inventory)

# 创建任务
play_source =  dict(
        name = "Ansible Play",
        hosts = "cephnode",
        gather_facts = 'no',
        tasks = [
            dict(action=dict(module='shell', args='touch /tmp/7.txt'), register='shell_out'), #定义一条任务,如有多条任务也应按照这样的方式定义
         ]
    )
play = Play().load(play_source, variable_manager=variable_manager, loader=loader)

# 开始执行
tqm = None
try:
    tqm = TaskQueueManager(
              inventory=inventory,
              variable_manager=variable_manager,
              loader=loader,
              options=options,
              passwords=passwords,
              stdout_callback=results_callback,  # 使用自定义回调代替“default”回调插件(如不需要stdout_callback参数则按照默认的方式输出)
          )
    result = tqm.run(play)
finally:
    if tqm is not None:
        tqm.cleanup()


我的../conf/hosts文件内容如下:

[cephnode]
192.168.89.136


注意:

    如没有明确指定inventory(如下的参数),那么会默认从/etc/ansible/hosts中读取hosts

sources=['../conf/hosts']

   

补充一下,刚说了定义多条任务的方式,举个例子:

tasks = [
            dict(action=dict(module='shell', args='mkdir /tmp/toby'), register='shell_out'), #首先创建目录
            dict(action=dict(module='copy', args='src=/tmp/abc123.txt dest=/tmp/toby'), register='shell_out') #然后将本地的abc123.txt通过copy模块下发到目标主机的/tmp/toby/目录下
         ]



以上是关于Ansible API 2.0的测试的主要内容,如果未能解决你的问题,请参考以下文章

ansible python api 2.0使用

ansible-playbook api 2.0

Ansible API 2.0解析

Ansible AdHoc & playbook API + 动态生成Inventory +结果关注

python学习之ansible api

红帽发布Ansible 2.0,社区贡献最大