Elasticsearch – Ein praktischer Einstieg

Tweets mit Logstash indizieren

Im Buch wird für die Demonstration der Eigenschaften der unterschiedlichen Aggregationen auf Kurznachrichten von Twitter zugegriffen. In Anhang B ist die Installation des Twitter-Rivers beschrieben, der genutzt werden kann, um Tweets in Elasticsearch zu indizieren. Da die River mittlerweile als Deprecated markiert sind und schon nicht mehr in der Version 2.0 von Elasticsearch enthalten sind, ist angebracht, auch noch eine alternative Möglichkeit zu beschreiben, um Tweets in Elasticsearch zu indizieren.

Dieser Artikel beschreibt, wie wir Logstash nutzen können, um über den Twitter-Input Daten einzulesen und über den Elasticsearch-Output nach Elasticsearch zu schreiben. Große Teile sind von einem englischsprachigen Artikel von David Pilato zu diesem Thema inspiriert.

Um Logstash zu nutzen, müssen wir das passende Archiv von der Elasticsearch-Webseite herunterladen und entpacken. Im Beispiel kommt Logstash 1.5.4 zum Einsatz. Die Konfiguration erfolgt in einer einzelnen Datei, beispielsweise namens twitter.conf, die wir an beliebiger Stelle im Dateisystem ablegen können.

Eine Logstash-Konfigurationsdatei besteht normalerweise aus drei Sektionen:

Für unser Beispiel benötigen wir nur die input- und die output-Sektion, da wir die Daten nicht modifizieren, sondern sie wie sie sind nach Elasticsearch schreiben. Das komplette Beispiel findet sich in den Konfigurationsdateien twitter.conf und twitter_template.json im Repository mit den Beispieldaten zum Buch.

Twitter-Input konfigurieren

In der input-Sektion konfigurieren wir den Twitter-Input.

input {
  twitter {
      consumer_key => "..."
      consumer_secret => "..."
      oauth_token => "..."
      oauth_token_secret => "..."
      keywords => [ "logstash", "elasticsearch" ]
      full_tweet => true
  }
}

Der Zugriff auf die Twitter-API war früher offen, mittlerweile muss man sich allerdings einen Zugang im Twitter-Developer-Portal anlegen. Dort sind dann die unterschiedlichen für den Zugriff notwendigen Daten erhältlich. Allerdings sind auch für einen solchen Zugang nur manche der Daten frei, wenn zu viele Daten abgerufen werden kann es passieren, dass der Zugriff temporär gedrosselt oder gesperrt wird.

Seit Kurzem ist es mit dem Plugin auch möglich, auf den Twitter-Sample-Stream zuzugreifen, der im Buch auch mit dem Twitter-River verwendet wird. In diesem Beispiel geben wir allerdings an, dass wir nur an Tweets mit den Schlagwörtern logstash und elasticsearch interessiert sind.

Über full_tweet geben wir schließlich noch an, dass wir alle Daten erhalten wollen. Dadurch werden alle notwendigen Felder zurückgegeben.

Twitter-Input testen

Wir können den Twitter-Input testen, indem wir den stdout-Output konfigurieren, der die einkommenden Daten einfach auf der Kommandozeile ausgibt.

output {
  stdout {
    codec => rubydebug
  }
}

Dadurch werden die Daten als Ruby-Hash ausgegeben, einer Datenstruktur die JSON ähnelt. Ein ankommender Tweet kann in Auszügen folgendermaßen aussehen.

{
  "created_at": "Wed Aug 26 11:45:59 +0000 2015",
  "id": 636504862134521900,
  "text": "Looking forward  to be at #elasticsearch FFM this evening. I'll be giving a short talk on how to index tweets with #logstash",
  "user": {
    "id": 313122677,
    "name": "Florian Hopf",
    "screen_name": "fhopf",
    "location": "Karlsruhe"
  }
}

Wichtige Felder für die Auswertung können das text-Feld sein, das den gesamten Text enthält. Datum, Hashtag und User-Informationen können über Aggregationen ausgewertet werden.

Elasticsearch-Output konfigurieren

Um die Daten wirklich zu indizieren konfigurieren wir den Elasticsearch-Output. Eine Konfiguration für Elasticsearch auf dem lokalen Host kann folgendermaßen aussehen.

output {
  elasticsearch {
    protocol => "http"
    host => "localhost"
    index => "twitter"
    document_type => "tweet"
    template => "twitter_template.json"
    template_name => "twitter"
  }
}

Wir geben an, dass wir über HTTP auf Elasticsearch auf demselben System über den Standardport 9200 zugreifen wollen. Als Indexname wird twitter vergeben, als Typ tweet. Damit die Daten sinnvoll gemappt werden können wird noch ein Index-Template unter dem Namen twitter registriert, das unter dem Namen twitter_template.json im selben Verzeichnis wie die Konfigurationsdatei liegen muss. In dieser Datei werden die Index-Einstellungen und das Mapping für den Typ beschrieben.

Im folgenden gehen wir die Datei nach und nach durch. Um eine korrekte Darstellung zu ermöglichen sind in den Blöcken öffnende und schließende Klammern angegeben die in der Originaldatei nicht vorkommen.

{
  "template": "twitter",
  "order":    1,
  "settings": {
    "number_of_shards": 1
  },

Zu Beginn wird der Name festgelegt, der dem Namen in der Konfiguration entsprechen muss. order beschreibt die Reihenfolge, die verwendet werden soll, wenn mehrere Index-Templates gefunden werden. Im settings-Block ist schließlich noch beschrieben, dass nur ein einzelner Shard für den Index verwendet werden soll.

{
  "mappings": {
    "tweet": {
      "_all": {
        "enabled": false
      }
    }
  }

Der Mapping-Bereich beginnt damit, dass das _all-Feld deaktiviert wird, da der Zugriff immer über Felder erfolgen kann.

    {
      "dynamic_templates" : [ {
         "message_field" : {
           "match" : "message",
           "match_mapping_type" : "string",
           "mapping" : {
             "type" : "string", "index" : "analyzed", "omit_norms" : true
           }
         }
       }, 
       {
         "string_fields" : {
           "match" : "*",
           "match_mapping_type" : "string",
           "mapping" : {
             "type" : "string", "index" : "analyzed", "omit_norms" : true,
               "fields" : {
                 "raw" : {"type": "string", "index" : "not_analyzed", "ignore_above" : 256}
               }
           }
         }
       } ]
    }

Über den interessanten Trick der dynamic_templates kann für Gruppen von Feldern ein Mapping hinterlegt werden. Im Beispiel sind zwei Gruppen hinterlegt, wobei message_field sich nur auf ein einzelnes Feld bezieht, das anders behandelt werden soll. Für alle anderen String-Felder wird ein zusätzliches Unterfeld namens raw hinzugefügt, das die ersten 256 Zeichen des Originalinhalts enthält. Diese Felder können dann vor allem von Aggregationen verwendet werden, die auf den Index-Termen arbeiten.

    {
      "properties": {
        "text": {
          "type": "string"
        },
          "coordinates": {
          "properties": {
             "coordinates": {
                "type": "geo_point"
             },
             "type": {
                "type": "string"
             }
          }
       }
      }
    }

Abschließend werden noch zwei weitere spezielle Felder konfiguriert: text enthält den gesamten Inhalt der Nachricht, coordinates die von Twitter übermittelten Geodaten. Durch die Konfiguration als geo_point können auch die Geofunktionalitäten von Elasitcsearch auf die Daten angewandt werden.

Um unsere Tweets in Elasticsearch zu testen können wir beispielsweise eine Terms-Aggregation auf dem Originalinhalt des Benutzernamens anfordern.

{
    "aggs": {
        "users": {
            "terms": {
                "field": "user.name.raw"    
            }
        }
    }
}

Nicht alle Aggregations-Beispiele im Buch lassen sich 1:1 auf die über Logstash indizierten Daten abbilden da über den Twitter-River einzelen Felder wie der Benutzername anders abgebildet werden. Ein Aufruf des Mappings unter /twitter/tweet/_mapping sollte allerdings als Anhaltspunkt ausreichen. Die Datenstruktur ist auf jeden Fall ähnlich genug zum Twitter-River.

Viel Spaß beim Aggregieren!