OpenSearch¶
運算子¶
在 OpenSearch 中建立索引¶
使用 OpenSearchCreateIndexOperator 在 OpenSearch 域中建立新的索引。
tests/system/opensearch/example_opensearch.py
create_index = OpenSearchCreateIndexOperator(
task_id="create_index",
index_name=INDEX_NAME,
index_body={"settings": {"index": {"number_of_shards": 1}}},
)
在 OpenSearch 的索引中新增文件¶
使用 OpenSearchAddDocumentOperator 向 OpenSearch 索引新增單個文件
tests/system/opensearch/example_opensearch.py
add_document_by_args = OpenSearchAddDocumentOperator(
task_id="add_document_with_args",
index_name=INDEX_NAME,
doc_id=1,
document={"log_group_id": 1, "logger": "python", "message": "hello world"},
)
add_document_by_class = OpenSearchAddDocumentOperator(
task_id="add_document_by_class",
doc_class=LogDocument(log_group_id=2, logger="airflow", message="hello airflow"),
)
對 OpenSearch 索引執行查詢¶
使用 OpenSearchQueryOperator 對 OpenSearch 索引執行查詢。
tests/system/opensearch/example_opensearch.py
search_low_level = OpenSearchQueryOperator(
task_id="low_level_query",
index_name="system_test",
query={"query": {"bool": {"must": {"match": {"message": "hello world"}}}}},
)
search = Search()
search._index = [INDEX_NAME]
search_object = search.filter("term", logger="airflow").query("match", message="hello airflow")
search_high_level = OpenSearchQueryOperator(task_id="high_level_query", search_object=search_object)