Elasticsearch実践ガイド:RestTemplateによるデータ操作と分散ログ収集基盤の構築

環境準備と基本設定

Elasticsearchのアーカイブを解凍後、bin/elasticsearchを実行することで9200ポートでサービスが起動します。操作インターフェースとしてKibanaを併設する場合、バージョンをElasticsearchと一致させる必要があります。kibana.ymlではserver.portserver.hostelasticsearch.hostsの3項目を修正し、専用ユーザー権限で起動プロセスを実行することで権限エラーを回避できます。

インデックス設計と分析器の選定

Elasticsearchではデータが主シャードとリプライカシャードに分散配置されます。主シャード数はインデックス作成時に固定されるため、データスケールに合わせて慎重に設計する必要があります。一方、リプライカ数はスループット向上やフェイルオーバーに対応するため、運用中に動的に変更可能です。

日本語テキストの検索では、標準分析器では単文字単位で分割されるため検索精度が低下します。代わりにik_max_wordik_smartを組み合わせたIK分析器を導入し、インデックス作成時は最大粒度、検索時は最小粒度でトークン化するのが推奨されます。完全一致検索にはtermクエリを使用し、同義語の処理には分析器に辞書ファイルを紐付ける構成が有効です。

Spring Data Elasticsearchにおけるクエリ実装

Java環境ではElasticsearchRestTemplateを通じて検索、更新、一括処理を実装できます。以下に主要な操作パターンを示します。

条件検索とフィールド抽出

String[] targetFields = {"recordIdentifier", "reviewStatus"};
FetchSourceFilter projection = FetchSourceFilterBuilder.filter(true, targetFields);

SearchQuery searchCriteria = new NativeSearchQueryBuilder()
        .withSourceFilter(projection)
        .withQuery(QueryBuilders.termsQuery("recordIdentifier", identifierList))
        .build();

List<ArticleDocument> matchedDocs = restTemplate.queryForList(searchCriteria, ArticleDocument.class);

インラインスクリプトによるフィールド更新

Map<String, Object> scriptArgs = Map.of("archiveFlag", true);
String scriptBody = "ctx._source.isArchived = params.archiveFlag";
Script inlineScript = new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, scriptBody, scriptArgs);

UpdateByQueryRequest updateReq = new UpdateByQueryRequest("documents_v2");
updateReq.setDocTypes("_doc");
updateReq.setQuery(QueryBuilders.termQuery("recordIdentifier", identifierList));
updateReq.setScript(inlineScript);
updateReq.setConflicts("proceed");
updateReq.setRefresh(true);

restTemplate.getClient().updateByQuery(updateReq, RequestOptions.DEFAULT);

ストリーム処理を活用した一括更新

Map<String, Object> updatePayload = Map.of(
    "isArchived", true,
    "modifiedTimestamp", Instant.now().toString()
);
UpdateRequest baseUpdateOp = new UpdateRequest().doc(updatePayload);

List<UpdateQuery> bulkOperations = identifierList.stream()
        .map(id -> new UpdateQueryBuilder()
                .withUpdateRequest(baseUpdateOp)
                .withIndexName("documents_v2")
                .withType("_doc")
                .withId(id)
                .withDoUpsert(true)
                .build())
        .collect(Collectors.toList());

restTemplate.bulkUpdate(bulkOperations, 
    BulkOptions.builder().refreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).build());

複合条件検索とエンティティマッピング

BoolQueryBuilder conditionRoot = QueryBuilders.boolQuery();
conditionRoot.must(QueryBuilders.termQuery("creatorId", operatorId));
conditionRoot.must(QueryBuilders.termQuery("isDeleted", false));

// ネステド型フィールドの深層検索
conditionRoot.must(QueryBuilders.nestedQuery(
    "referenceNodes",
    QueryBuilders.termQuery("referenceNodes.categoryCode", targetCategory),
    ScoreMode.None
));

// オプショナル条件の追加
BoolQueryBuilder optionalFilter = QueryBuilders.boolQuery();
optionalFilter.should(QueryBuilders.wildcardQuery(
    "referenceNodes.hierarchyPath", "*." + targetNodeId + ".*"
));
conditionRoot.must(optionalFilter);

FieldSortBuilder sortConfig = new FieldSortBuilder("modifiedTimestamp").order(SortOrder.DESC);
HighlightBuilder highlightConfig = new HighlightBuilder()
        .preTags("<mark class='hit'>")
        .postTags("</mark>")
        .field("documentTitle");

SearchQuery finalQuery = new NativeSearchQueryBuilder()
        .withQuery(conditionRoot)
        .withSort(sortConfig)
        .withHighlightBuilder(highlightConfig)
        .withPageable(PageRequest.of(pageIndex, pageSize))
        .build();

restTemplate.queryForPage(finalQuery, ArticleDocument.class, new HighlightResultMapper(defaultMapper));

対応するJavaエンティティクラスでは、@Document@Fieldアノテーションを用いてインデックス名やデータ型を定義します。多言語対応や分析器の分離が必要な場合は@MultiFieldを活用し、設定ファイル経由でシャード数やリフレッシュインターバルを管理できます。

@Data
@Document(indexName = "documents_v2", type = "_doc")
@Setting(settingPath = "/config/es-config.json")
public class ArticleDocument {

    @Id
    @Field(type = FieldType.Keyword, name = "recordIdentifier")
    private String recordIdentifier;

    @MultiField(mainField = @Field(type = FieldType.Text, name = "documentTitle", analyzer = "standard"),
            otherFields = {
                @InnerField(suffix = "jp", type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart"),
                @InnerField(suffix = "en", type = FieldType.Text, analyzer = "english")
            })
    private String documentTitle;

    @Field(type = FieldType.Keyword, name = "creatorId")
    private String creatorId;

    @Field(type = FieldType.Date, name = "createdTimestamp", format = DateFormat.custom, pattern = "yyyy-MM-dd HH:mm:ss")
    private String createdTimestamp;

    @Field(type = FieldType.Nested, name = "referenceNodes")
    private List<NodeReference> referenceNodes;
}
{
  "index": {
    "number_of_shards": 3,
    "number_of_replicas": 2,
    "refresh_interval": "5s"
  },
  "analysis": {
    "filter": {
      "custom_stopwords": {
        "type": "stop",
        "stopwords": ["this", "that", "the"]
      }
    },
    "analyzer": {
      "custom_text": {
        "tokenizer": "standard",
        "filter": ["lowercase", "custom_stopwords"]
      }
    }
  }
}

分散ログ収集基盤の構築

マイクロサービスや分散デプロイ環境では、各ノードのログを一元管理する仕組みが必要です。ファイル監視エージェントとストリームプロセッサ、検索エンジンを組み合わせることで、リアルタイムのログ可視化を実現できます。

Logstashの設定では、ファイルパスの監視とBeats受信ポートの定義をinputセクションで行い、マルチラインコーデックを使用してスタックトレースや改行を含むログを単一のドキュメントとして結合します。outputでは対象Elasticsearchクラスタへの送信先とインデックスパターンを指定します。

input {
  file {
    path => ["/var/log/services/app-*.log"]
    start_position => "beginning"
    codec => multiline {
      pattern => "^\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}"
      negate => true
      what => "previous"
    }
  }
  beats {
    port => 5044
  }
}

output {
  if [type] == "service_log" {
    elasticsearch {
      hosts => ["http://127.0.0.1:9200"]
      index => "svc-logs-%{+YYYY.MM.dd}"
    }
  }
}

設定ファイルを直接起動コマンドに渡す際、インデントや特殊文字の混入は構文エラーの原因となるため、手動入力やエディタの正規表現チェックを推奨します。インデックスが正常に投入されたら、KibanaのDiscover機能でインデックスパターンを登録し、フィールドマッピングの同期完了後にログ検索を開始できます。

タグ: Elasticsearch Spring Data Elasticsearch Kibana Logstash Java

5月26日 22:54 投稿