diff --git a/changelog.d/0-release-notes/elasticsearch-to-opensearch b/changelog.d/0-release-notes/elasticsearch-to-opensearch new file mode 100644 index 00000000000..733b90d7d07 --- /dev/null +++ b/changelog.d/0-release-notes/elasticsearch-to-opensearch @@ -0,0 +1,10 @@ +Switch from ElasticSearch 6.8 to OpenSearch 1.3 (we're testing with OpenSearch +1.3.19). Please note that the APIs of ElasticSearch 6.8 and OpenSearch 1.3 are +not compatible. That is, either a migration is required +(https://opensearch.org/docs/latest/upgrade-to/) or the indexes need to be +rebuilt from scratch. The latter is done automatically when the +`elasticsearch-index-create` job of the `elasticsearch-index` Helm chart faces +an OpenSearch instance where Wire's indexes are missing. Alternatively, this can +be started manually; see +https://docs.wire.com/developer/reference/elastic-search.html#refill-es-documents-from-cassandra +. Depending on the number of users in the database, this may take a long time. diff --git a/charts/integration/values.yaml b/charts/integration/values.yaml index 36305b2be75..92bd8807a8c 100644 --- a/charts/integration/values.yaml +++ b/charts/integration/values.yaml @@ -113,7 +113,7 @@ config: replicationFactor: 1 elasticsearch: - host: elasticsearch-ephemeral + host: opensearch-cluster-master sqsEndpointUrl: http://fake-aws-sqs:4568 sesEndpointUrl: http://fake-aws-ses:4569 diff --git a/deploy/dockerephemeral/docker-compose.yaml b/deploy/dockerephemeral/docker-compose.yaml index debbdb32fa4..86c9e23fc42 100644 --- a/deploy/dockerephemeral/docker-compose.yaml +++ b/deploy/dockerephemeral/docker-compose.yaml @@ -204,15 +204,9 @@ services: redis: ipv4_address: 172.20.0.36 - elasticsearch: - container_name: demo_wire_elasticsearch - build: - context: . - dockerfile_inline: | - FROM quay.io/wire/elasticsearch:0.0.9-amd64 - RUN /usr/share/elasticsearch/bin/elasticsearch-plugin install x-pack -b - # this seems to be necessary to run X-Pack on Alpine (https://discuss.elastic.co/t/elasticsearch-failing-to-start-due-to-x-pack/85125/7) - RUN rm -rf /usr/share/elasticsearch/plugins/x-pack/platform/linux-x86_64 + opensearch: + container_name: opensearch + image: opensearchproject/opensearch:1.3.19 ulimits: nofile: soft: 65536 @@ -221,17 +215,37 @@ services: - "127.0.0.1:9200:9200" - "127.0.0.1:9300:9300" environment: - - "xpack.ml.enabled=false" - - "xpack.security.enabled=true" - - "xpack.security.http.ssl.enabled=true" - - "xpack.ssl.certificate=certs/elasticsearch-cert.pem" - - "xpack.ssl.key=certs/elasticsearch-key.pem" - "bootstrap.system_call_filter=false" - "JVM_OPTIONS_ES=-Xmx512m -Xms512m" - "discovery.type=single-node" + + - "DISABLE_INSTALL_DEMO_CONFIG=true" + - "OPENSEARCH_INITIAL_ADMIN_PASSWORD=Ch4ng3m3Secr3t!"
+ volumes: + - ./docker/elasticsearch-cert.pem:/usr/share/opensearch/config/certs/tls.crt + - ./docker/elasticsearch-key.pem:/usr/share/opensearch/config/certs/tls.key + - ./docker/elasticsearch-ca.pem:/usr/share/opensearch/config/certs/ca.crt + - ./docker/opensearch/opensearch.yml:/usr/share/opensearch/config/opensearch.yml + - ./docker/opensearch/opensearch-security/config.yml:/usr/share/opensearch/plugins/opensearch-security/securityconfig/config.yml + - ./docker/opensearch/opensearch-security/internal_users.yml:/usr/share/opensearch/plugins/opensearch-security/securityconfig/internal_users.yml + - ./docker/opensearch/opensearch-security/roles_mapping.yml:/usr/share/opensearch/plugins/opensearch-security/securityconfig/roles_mapping.yml + - ./docker/opensearch/opensearch-security/allowlist.yml:/usr/share/opensearch/plugins/opensearch-security/securityconfig/allowlist.yml + - ./docker/opensearch/opensearch-security/roles.yml:/usr/share/opensearch/plugins/opensearch-security/securityconfig/roles.yml + - ./docker/opensearch/opensearch-security/nodes_dn.yml:/usr/share/opensearch/plugins/opensearch-security/securityconfig/nodes_dn.yml + - ./docker/opensearch/opensearch-security/action_groups.yml:/usr/share/opensearch/plugins/opensearch-security/securityconfig/action_groups.yml + - ./docker/opensearch/opensearch-security/tenants.yml:/usr/share/opensearch/plugins/opensearch-security/securityconfig/tenants.yml + networks: + - demo_wire + + opensearch-dashboard: + image: opensearchproject/opensearch-dashboards:1 + container_name: opensearch-dashboards + ports: + - 5601:5601 + expose: + - "5601" volumes: - - ./docker/elasticsearch-cert.pem:/usr/share/elasticsearch/config/certs/elasticsearch-cert.pem - - ./docker/elasticsearch-key.pem:/usr/share/elasticsearch/config/certs/elasticsearch-key.pem + - ./docker/opensearch/opensearch_dashboards.yml:/usr/share/opensearch-dashboards/config/opensearch_dashboards.yml networks: - demo_wire diff --git a/deploy/dockerephemeral/docker/opensearch/opensearch-security/action_groups.yml b/deploy/dockerephemeral/docker/opensearch/opensearch-security/action_groups.yml new file mode 100644 index 00000000000..7c40612b836 --- /dev/null +++ b/deploy/dockerephemeral/docker/opensearch/opensearch-security/action_groups.yml @@ -0,0 +1,3 @@ +_meta: + type: "actiongroups" + config_version: 2 diff --git a/deploy/dockerephemeral/docker/opensearch/opensearch-security/allowlist.yml b/deploy/dockerephemeral/docker/opensearch/opensearch-security/allowlist.yml new file mode 100644 index 00000000000..dd09dc80656 --- /dev/null +++ b/deploy/dockerephemeral/docker/opensearch/opensearch-security/allowlist.yml @@ -0,0 +1,6 @@ +_meta: + type: "allowlist" + config_version: 2 + +config: + enabled: false diff --git a/deploy/dockerephemeral/docker/opensearch/opensearch-security/config.yml b/deploy/dockerephemeral/docker/opensearch/opensearch-security/config.yml new file mode 100644 index 00000000000..fdbeb97420b --- /dev/null +++ b/deploy/dockerephemeral/docker/opensearch/opensearch-security/config.yml @@ -0,0 +1,17 @@ +_meta: + type: "config" + config_version: 2 + +config: + dynamic: + authc: + basic_internal_auth_domain: + description: "Authenticate using HTTP basic against the internal users database" + http_enabled: true + transport_enabled: true + order: 1 + http_authenticator: + type: basic + challenge: true + authentication_backend: + type: internal diff --git a/deploy/dockerephemeral/docker/opensearch/opensearch-security/internal_users.yml 
b/deploy/dockerephemeral/docker/opensearch/opensearch-security/internal_users.yml new file mode 100644 index 00000000000..947738cf0dd --- /dev/null +++ b/deploy/dockerephemeral/docker/opensearch/opensearch-security/internal_users.yml @@ -0,0 +1,10 @@ +_meta: + type: "internalusers" + config_version: 2 + +elastic: + hash: "$2y$12$GRc68jkEX1m4uQpTVbwURu79xHxZ7vsbyEctOAADQwPjlhYS4LJVa" + reserved: true + description: "Wire User" + backend_roles: + - index_manager diff --git a/deploy/dockerephemeral/docker/opensearch/opensearch-security/nodes_dn.yml b/deploy/dockerephemeral/docker/opensearch/opensearch-security/nodes_dn.yml new file mode 100644 index 00000000000..09afda4a1f3 --- /dev/null +++ b/deploy/dockerephemeral/docker/opensearch/opensearch-security/nodes_dn.yml @@ -0,0 +1,3 @@ +_meta: + type: "nodesdn" + config_version: 2 diff --git a/deploy/dockerephemeral/docker/opensearch/opensearch-security/roles.yml b/deploy/dockerephemeral/docker/opensearch/opensearch-security/roles.yml new file mode 100644 index 00000000000..9bbe7b23f39 --- /dev/null +++ b/deploy/dockerephemeral/docker/opensearch/opensearch-security/roles.yml @@ -0,0 +1,3 @@ +_meta: + type: "roles" + config_version: 2 diff --git a/deploy/dockerephemeral/docker/opensearch/opensearch-security/roles_mapping.yml b/deploy/dockerephemeral/docker/opensearch/opensearch-security/roles_mapping.yml new file mode 100644 index 00000000000..e7627c3e67b --- /dev/null +++ b/deploy/dockerephemeral/docker/opensearch/opensearch-security/roles_mapping.yml @@ -0,0 +1,9 @@ +_meta: + type: "rolesmapping" + config_version: 2 + +all_access: + reserved: false + backend_roles: + - index_manager + description: "Map index_manager to full_access" diff --git a/deploy/dockerephemeral/docker/opensearch/opensearch-security/tenants.yml b/deploy/dockerephemeral/docker/opensearch/opensearch-security/tenants.yml new file mode 100644 index 00000000000..e9582d70b59 --- /dev/null +++ b/deploy/dockerephemeral/docker/opensearch/opensearch-security/tenants.yml @@ -0,0 +1,3 @@ +_meta: + type: "tenants" + config_version: 2 diff --git a/deploy/dockerephemeral/docker/opensearch/opensearch.yml b/deploy/dockerephemeral/docker/opensearch/opensearch.yml new file mode 100644 index 00000000000..b02910412b9 --- /dev/null +++ b/deploy/dockerephemeral/docker/opensearch/opensearch.yml @@ -0,0 +1,45 @@ +cluster.name: opensearch-cluster + +# Bind to all interfaces because we don't know what IP address Docker will assign to us. +network.host: 0.0.0.0 + +# Setting network.host to a non-loopback address enables the annoying bootstrap checks. "Single-node" mode disables them again. +discovery.type: single-node + +path.data: /usr/share/opensearch/data + +# WARNING: This is not a production-ready config! (Good enough for testing, +# though.) 
+plugins: + security: + ssl: + transport: + pemcert_filepath: certs/tls.crt + pemkey_filepath: certs/tls.key + pemtrustedcas_filepath: certs/ca.crt + enforce_hostname_verification: false + http: + enabled: true + pemcert_filepath: certs/tls.crt + pemkey_filepath: certs/tls.key + pemtrustedcas_filepath: certs/ca.crt + allow_unsafe_democertificates: true + allow_default_init_securityindex: true + audit.type: internal_opensearch + restapi: + roles_enabled: ["all_access", "security_rest_api_access"] + system_indices: + enabled: true + indices: + [ + ".opendistro-alerting-config", + ".opendistro-alerting-alert*", + ".opendistro-anomaly-results*", + ".opendistro-anomaly-detector*", + ".opendistro-anomaly-checkpoints", + ".opendistro-anomaly-detection-state", + ".opendistro-reports-*", + ".opendistro-notifications-*", + ".opendistro-notebooks", + ".opendistro-asynchronous-search-response*", + ] diff --git a/deploy/dockerephemeral/docker/opensearch/opensearch_dashboards.yml b/deploy/dockerephemeral/docker/opensearch/opensearch_dashboards.yml new file mode 100644 index 00000000000..240fb646f57 --- /dev/null +++ b/deploy/dockerephemeral/docker/opensearch/opensearch_dashboards.yml @@ -0,0 +1,8 @@ +opensearch.hosts: [https://opensearch:9200] +opensearch.ssl.verificationMode: none +opensearch.username: elastic +opensearch.password: changeme + +# Use this setting if you are running opensearch-dashboards without https +opensearch_security.cookie.secure: false +server.host: '0.0.0.0' diff --git a/hack/bin/integration-setup-federation.sh b/hack/bin/integration-setup-federation.sh index d25ae2138dd..a520aff597c 100755 --- a/hack/bin/integration-setup-federation.sh +++ b/hack/bin/integration-setup-federation.sh @@ -53,6 +53,7 @@ echo "Installing charts..." set +e # This exists because we need to run `helmfile` with `--skip-deps`, without that it doesn't work. helm repo add bedag https://bedag.github.io/helm-charts/ +helm repo add opensearch https://opensearch-project.github.io/helm-charts/ helmfile --environment "$HELMFILE_ENV" --file "${TOP_LEVEL}/hack/helmfile.yaml" sync --skip-deps --concurrency 0 EXIT_CODE=$? diff --git a/hack/helm_vars/opensearch/values.yaml.gotmpl b/hack/helm_vars/opensearch/values.yaml.gotmpl new file mode 100644 index 00000000000..72fa275b163 --- /dev/null +++ b/hack/helm_vars/opensearch/values.yaml.gotmpl @@ -0,0 +1,192 @@ +singleNode: true + +# Helm labels and annotations are automatically added for these Kubernetes +# manifests. +extraObjects: + - apiVersion: cert-manager.io/v1 + kind: Certificate + metadata: + name: opensearch-cert + namespace: {{ .Release.Namespace }} + spec: + issuerRef: + name: elasticsearch + kind: Issuer + + usages: + - server auth + - client auth + duration: 2160h # 90d + renewBefore: 360h # 15d + isCA: false + secretName: opensearch-ephemeral-certificate + + privateKey: + algorithm: ECDSA + size: 384 + encoding: PKCS8 + rotationPolicy: Always + + dnsNames: + - opensearch-cluster-master + - opensearch-cluster-master.{{ .Release.Namespace }}.svc.cluster.local + + commonName: opensearch-cluster-master + +opensearchHome: /usr/share/opensearch + +config: + opensearch.yml: | + cluster.name: opensearch-cluster + + # Bind to all interfaces because we don't know what IP address Docker will assign to us. + network.host: 0.0.0.0 + + discovery.type: single-node + + action.auto_create_index: true + + # WARNING: This config is not meant to be used as prod setup! Revise all + # lines before you copy them. 
+ plugins: + security: + nodes_dn: + - '/CN=opensearch-cluster-master.*/' + ssl: + transport: + pemcert_filepath: esnode.pem + pemkey_filepath: esnode-key.pem + pemtrustedcas_filepath: root-ca.pem + enforce_hostname_verification: false + http: + enabled: true + pemcert_filepath: esnode.pem + pemkey_filepath: esnode-key.pem + pemtrustedcas_filepath: root-ca.pem + allow_unsafe_democertificates: true + allow_default_init_securityindex: true + audit.type: internal_opensearch + enable_snapshot_restore_privilege: true + check_snapshot_restore_write_privileges: true + restapi: + roles_enabled: ["all_access", "security_rest_api_access"] + system_indices: + enabled: true + indices: + [ + ".opendistro-alerting-config", + ".opendistro-alerting-alert*", + ".opendistro-anomaly-results*", + ".opendistro-anomaly-detector*", + ".opendistro-anomaly-checkpoints", + ".opendistro-anomaly-detection-state", + ".opendistro-reports-*", + ".opendistro-notifications-*", + ".opendistro-notebooks", + ".opendistro-asynchronous-search-response*", + ] + +securityConfig: + enabled: true + # The path will be different for OpenSearch 2.x.x! + path: "/usr/share/opensearch/plugins/opensearch-security/securityconfig" + + # Configure one user with full access (this could be refined in future.) + # Credentials: elastic:changeme + config: + dataComplete: "true" + data: + config.yml: | + _meta: + type: "config" + config_version: 2 + + config: + dynamic: + authc: + basic_internal_auth_domain: + description: "Authenticate using HTTP basic against the internal users database" + http_enabled: true + transport_enabled: true + order: 1 + http_authenticator: + type: basic + challenge: true + authentication_backend: + type: internal + + internal_users.yml: | + _meta: + type: "internalusers" + config_version: 2 + + elastic: + hash: "$2y$12$GRc68jkEX1m4uQpTVbwURu79xHxZ7vsbyEctOAADQwPjlhYS4LJVa" + reserved: true + description: "Wire User" + backend_roles: + - index_manager + + roles_mapping.yml: | + _meta: + type: "rolesmapping" + config_version: 2 + + all_access: + reserved: false + backend_roles: + - index_manager + description: "Map index_manager to full_access" + + allowlist.yml: | + _meta: + type: "allowlist" + config_version: 2 + + config: + enabled: false + + roles.yml: | + _meta: + type: "roles" + config_version: 2 + + nodes_dn.yml: | + _meta: + type: "nodesdn" + config_version: 2 + + action_groups.yml: | + _meta: + type: "actiongroups" + config_version: 2 + + tenants.yml: | + _meta: + type: "tenants" + config_version: 2 + +extraEnvs: + - name: OPENSEARCH_INITIAL_ADMIN_PASSWORD + value: "Ch4ng3m3Secr3t!" 
+ - name: DISABLE_INSTALL_DEMO_CONFIG + value: "true" + +persistence: + enabled: false + +secretMounts: + - name: node-pem + secretName: opensearch-ephemeral-certificate + path: /usr/share/opensearch/config/esnode.pem + subPath: tls.crt + + - name: node-key + secretName: opensearch-ephemeral-certificate + path: /usr/share/opensearch/config/esnode-key.pem + subPath: tls.key + + - name: root-cacert + secretName: opensearch-ephemeral-certificate + path: /usr/share/opensearch/config/root-ca.pem + subPath: ca.crt diff --git a/hack/helm_vars/wire-federation-v0/values.yaml.gotmpl b/hack/helm_vars/wire-federation-v0/values.yaml.gotmpl index c012a3b19f1..4b4ba6d8e84 100644 --- a/hack/helm_vars/wire-federation-v0/values.yaml.gotmpl +++ b/hack/helm_vars/wire-federation-v0/values.yaml.gotmpl @@ -21,7 +21,7 @@ cassandra-migrations: replicationFactor: 1 elasticsearch-index: elasticsearch: - host: elasticsearch-ephemeral + host: opensearch-cluster-master index: directory_test cassandra: host: cassandra-ephemeral @@ -41,7 +41,7 @@ brig: host: cassandra-ephemeral replicaCount: 1 elasticsearch: - host: elasticsearch-ephemeral + host: opensearch-cluster-master index: directory_test authSettings: userTokenTimeout: 120 diff --git a/hack/helm_vars/wire-server/values.yaml.gotmpl b/hack/helm_vars/wire-server/values.yaml.gotmpl index 83cd888dbf9..2bab8da5e43 100644 --- a/hack/helm_vars/wire-server/values.yaml.gotmpl +++ b/hack/helm_vars/wire-server/values.yaml.gotmpl @@ -28,10 +28,10 @@ elasticsearch-index: imagePullPolicy: {{ .Values.imagePullPolicy }} elasticsearch: scheme: https - host: elasticsearch-ephemeral + host: opensearch-cluster-master index: directory_test tlsCaSecretRef: - name: "elasticsearch-ephemeral-certificate" + name: "opensearch-ephemeral-certificate" key: "ca.crt" cassandra: host: {{ .Values.cassandraHost }} @@ -67,13 +67,13 @@ brig: {{- end }} elasticsearch: scheme: https - host: elasticsearch-ephemeral + host: opensearch-cluster-master index: directory_test tlsCaSecretRef: - name: "elasticsearch-ephemeral-certificate" + name: "opensearch-ephemeral-certificate" key: "ca.crt" additionalTlsCaSecretRef: - name: "elasticsearch-ephemeral-certificate" + name: "opensearch-ephemeral-certificate" key: "ca.crt" rabbitmq: port: 5671 @@ -531,7 +531,7 @@ integration: {{- end }} elasticsearch: tlsCaSecretRef: - name: "elasticsearch-ephemeral-certificate" + name: "opensearch-ephemeral-certificate" key: "ca.crt" redis: tlsCaSecretRef: diff --git a/hack/helmfile.yaml b/hack/helmfile.yaml index 3581c373a78..7068989d07f 100644 --- a/hack/helmfile.yaml +++ b/hack/helmfile.yaml @@ -58,6 +58,9 @@ repositories: - name: bedag url: 'https://bedag.github.io/helm-charts/' + - name: opensearch + url: 'https://opensearch-project.github.io/helm-charts/' + releases: - name: 'fake-aws' namespace: '{{ .Values.namespace1 }}' @@ -77,30 +80,66 @@ releases: values: - './helm_vars/certs/values.yaml.gotmpl' - - name: 'databases-ephemeral' + - name: 'redis-ephemeral' + namespace: '{{ .Values.namespace1 }}' + chart: '../.local/charts/redis-ephemeral' + values: + - redis-ephemeral: + usePassword: true + password: very-secure-redis-master-password + tls: + enabled: true + certificatesSecret: redis-certificate + certFilename: "tls.crt" + certKeyFilename: "tls.key" + certCAFilename: "ca.crt" + authClients: false + needs: + - certs + + - name: 'cassandra-ephemeral' namespace: '{{ .Values.namespace1 }}' - chart: '../.local/charts/databases-ephemeral' + chart: '../.local/charts/cassandra-ephemeral' + + - name: 'redis-ephemeral-2' + 
namespace: '{{ .Values.namespace2 }}' + chart: '../.local/charts/redis-ephemeral' values: - redis-ephemeral: - redis-ephemeral: - usePassword: true - password: very-secure-redis-master-password - tls: - enabled: true - certificatesSecret: redis-certificate - certFilename: "tls.crt" - certKeyFilename: "tls.key" - certCAFilename: "ca.crt" - authClients: false - elasticsearch-ephemeral: + usePassword: true + password: very-secure-redis-master-password tls: enabled: true - issuerRef: - name: elasticsearch - kind: Issuer + certificatesSecret: redis-certificate + certFilename: "tls.crt" + certKeyFilename: "tls.key" + certCAFilename: "ca.crt" + authClients: false needs: - certs + - name: 'cassandra-ephemeral-2' + namespace: '{{ .Values.namespace2 }}' + chart: '../.local/charts/cassandra-ephemeral' + + - name: 'opensearch-ephemeral' + namespace: '{{ .Values.namespace1 }}' + chart: 'opensearch/opensearch' + # The 1.x.x and 2.x.x chart versions belong to the OpenSearch versions 1.x.x + # and 2.x.x respectively. I.e. both strains are actively maintained. + version: "1.31.0" + values: + - './helm_vars/opensearch/values.yaml.gotmpl' + + - name: 'opensearch-ephemeral-2' + namespace: '{{ .Values.namespace2 }}' + chart: 'opensearch/opensearch' + # The 1.x.x and 2.x.x chart versions belong to the OpenSearch versions 1.x.x + # and 2.x.x respectively. I.e. both strains are actively maintained. + version: "1.31.0" + values: + - './helm_vars/opensearch/values.yaml.gotmpl' + # Required for testing redis migration - name: 'redis-ephemeral-2' namespace: '{{ .Values.namespace1 }}' @@ -117,30 +156,6 @@ releases: values: - './helm_vars/certs/values.yaml.gotmpl' - - name: 'databases-ephemeral' - namespace: '{{ .Values.namespace2 }}' - chart: '../.local/charts/databases-ephemeral' - values: - - redis-ephemeral: - redis-ephemeral: - usePassword: true - password: very-secure-redis-master-password - tls: - enabled: true - certificatesSecret: redis-certificate - certFilename: "tls.crt" - certKeyFilename: "tls.key" - certCAFilename: "ca.crt" - authClients: false - elasticsearch-ephemeral: - tls: - enabled: true - issuerRef: - name: elasticsearch - kind: Issuer - needs: - - certs - - name: k8ssandra-test-cluster chart: '../.local/charts/k8ssandra-test-cluster' namespace: '{{ .Values.namespace1 }}' @@ -230,7 +245,9 @@ releases: - name: cargohold.config.settings.federationDomain value: {{ .Values.federationDomain1 }} needs: - - 'databases-ephemeral' + - 'opensearch-ephemeral' + - 'cassandra-ephemeral' + - 'redis-ephemeral' - name: 'wire-server' namespace: '{{ .Values.namespace2 }}' @@ -246,4 +263,6 @@ releases: - name: cargohold.config.settings.federationDomain value: {{ .Values.federationDomain2 }} needs: - - 'databases-ephemeral' + - 'opensearch-ephemeral-2' + - 'cassandra-ephemeral-2' + - 'redis-ephemeral-2' diff --git a/integration/test/Test/FeatureFlags/SearchVisibilityInbound.hs b/integration/test/Test/FeatureFlags/SearchVisibilityInbound.hs index 55d40c5c2f7..b8a909018f6 100644 --- a/integration/test/Test/FeatureFlags/SearchVisibilityInbound.hs +++ b/integration/test/Test/FeatureFlags/SearchVisibilityInbound.hs @@ -2,6 +2,7 @@ module Test.FeatureFlags.SearchVisibilityInbound where import qualified API.Galley as Public import qualified API.GalleyInternal as Internal +import Control.Retry import SetupHelpers import Test.FeatureFlags.Util import Testlib.Prelude @@ -29,4 +30,6 @@ testSearchVisibilityInboundInternal access = do void $ withWebSocket alice $ \ws -> do setFlag access ws tid featureName enabled - setFlag access 
ws tid featureName disabled + -- Wait until the change is reflected in OpenSearch. + recoverAll (exponentialBackoff 500000 <> limitRetries 5) + $ \_ -> setFlag access ws tid featureName disabled diff --git a/libs/wire-subsystems/src/Wire/IndexedUserStore.hs b/libs/wire-subsystems/src/Wire/IndexedUserStore.hs index c3fe401f4f8..12dc1f078aa 100644 --- a/libs/wire-subsystems/src/Wire/IndexedUserStore.hs +++ b/libs/wire-subsystems/src/Wire/IndexedUserStore.hs @@ -12,8 +12,8 @@ import Wire.API.User.Search import Wire.UserSearch.Types data IndexedUserStoreError - = IndexUpdateError ES.EsError - | IndexLookupError ES.EsError + = IndexUpdateError (Either ES.EsProtocolException ES.EsError) + | IndexLookupError (Either ES.EsProtocolException ES.EsError) | IndexError Text deriving (Show) diff --git a/libs/wire-subsystems/src/Wire/IndexedUserStore/ElasticSearch.hs b/libs/wire-subsystems/src/Wire/IndexedUserStore/ElasticSearch.hs index 6f8dd26e89f..78928780bdf 100644 --- a/libs/wire-subsystems/src/Wire/IndexedUserStore/ElasticSearch.hs +++ b/libs/wire-subsystems/src/Wire/IndexedUserStore/ElasticSearch.hs @@ -2,7 +2,7 @@ module Wire.IndexedUserStore.ElasticSearch where -import Control.Error (lastMay) +import Control.Error (ExceptT (..), lastMay, runExceptT) import Control.Exception (throwIO) import Data.Aeson import Data.Aeson.Key qualified as Key @@ -13,7 +13,9 @@ import Data.Id import Data.Text qualified as Text import Data.Text.Ascii import Data.Text.Encoding qualified as Text +import Database.Bloodhound (BHResponse (BHResponse)) import Database.Bloodhound qualified as ES +import Database.Bloodhound.Common.Requests qualified as ESR import Imports import Network.HTTP.Client import Network.HTTP.Types @@ -65,7 +67,7 @@ getTeamSizeImpl :: getTeamSizeImpl cfg tid = do let indexName = cfg.conn.indexName countResEither <- embed $ ES.runBH cfg.conn.env $ ES.countByIndex indexName (ES.CountQuery query) - countRes <- either (liftIO . throwIO . IndexLookupError) pure countResEither + countRes <- either (liftIO . throwIO . IndexLookupError . Right) pure countResEither pure . TeamSize $ ES.crCount countRes where query = @@ -91,31 +93,45 @@ upsertImpl cfg docId userDoc versioning = do where indexDoc :: ES.IndexName -> ES.BH (Sem r) () indexDoc idx = do - r <- ES.indexDocument idx mappingName settings userDoc docId + r <- + hoistBH (embed @IO) $ + ES.performBHRequest . fmap fst . ES.keepBHResponse $ + ESR.indexDocument idx settings userDoc docId unless (ES.isSuccess r || ES.isVersionConflict r) $ do lift $ Metrics.incCounter indexUpdateErrorCounter - res <- liftIO $ ES.parseEsResponse r - liftIO . throwIO . IndexUpdateError . either id id $ res + liftIO . throwIO . IndexUpdateError $ parseESError r lift $ Metrics.incCounter indexUpdateSuccessCounter settings = ES.defaultIndexDocumentSettings {ES.idsVersionControl = versioning} +hoistBH :: (forall x. m x -> n x) -> ES.BH m a -> ES.BH n a +hoistBH nat (ES.BH action) = ES.BH $ hoistReaderT (hoistExceptT nat) action + +hoistReaderT :: (forall x. m x -> n x) -> ReaderT r m a -> ReaderT r n a +hoistReaderT nat (ReaderT f) = ReaderT $ \r -> nat (f r) + +-- Hoist a natural transformation from m to n through ExceptT +hoistExceptT :: (forall x. m x -> n x) -> ExceptT e m a -> ExceptT e n a +hoistExceptT nat (ExceptT ema) = ExceptT (nat ema) + +castResponse :: forall context1 val1 context2 val2. BHResponse context1 val1 -> BHResponse context2 val2 +castResponse BHResponse {..} = BHResponse {..} + updateTeamSearchVisibilityInboundImpl :: forall r. 
(Member (Embed IO) r) => IndexedUserStoreConfig -> TeamId -> SearchVisibilityInbound -> Sem r () updateTeamSearchVisibilityInboundImpl cfg tid vis = void $ runInBothES cfg updateAllDocs where updateAllDocs :: ES.IndexName -> ES.BH (Sem r) () updateAllDocs idx = do - r <- ES.updateByQuery idx query (Just script) + r <- hoistBH (embed @IO) $ ES.performBHRequest . fmap fst . ES.keepBHResponse $ ESR.updateByQuery @Value idx query (Just script) unless (ES.isSuccess r || ES.isVersionConflict r) $ do - res <- liftIO $ ES.parseEsResponse r - liftIO . throwIO . IndexUpdateError . either id id $ res + liftIO . throwIO . IndexUpdateError $ parseESError r query :: ES.Query query = ES.TermQuery (ES.Term "team" $ idToText tid) Nothing script :: ES.Script - script = ES.Script (Just (ES.ScriptLanguage "painless")) (Just (ES.ScriptInline scriptText)) Nothing Nothing + script = ES.Script (Just (ES.ScriptLanguage "painless")) (ES.ScriptInline scriptText) Nothing -- Unfortunately ES disallows updating ctx._version with a "Update By Query" scriptText = @@ -128,10 +144,9 @@ updateTeamSearchVisibilityInboundImpl cfg tid vis = bulkUpsertImpl :: (Member (Embed IO) r) => IndexedUserStoreConfig -> [(ES.DocId, UserDoc, ES.VersionControl)] -> Sem r () bulkUpsertImpl cfg docs = do let bhe = cfg.conn.env - ES.IndexName idx = cfg.conn.indexName - ES.MappingName mpp = mappingName + idx = ES.unIndexName cfg.conn.indexName (ES.Server base) = ES.bhServer bhe - baseReq <- embed $ parseRequest (Text.unpack $ base <> "/" <> idx <> "/" <> mpp <> "/_bulk") + baseReq <- embed $ parseRequest (Text.unpack $ base <> "/" <> idx <> "/_bulk") let reqWithoutCreds = baseReq { method = "POST", @@ -139,10 +154,9 @@ bulkUpsertImpl cfg docs = do requestBody = RequestBodyLBS (toLazyByteString (foldMap encodeActionAndData docs)) } req <- embed $ bhe.bhRequestHook reqWithoutCreds - res <- embed $ httpLbs req (ES.bhManager bhe) + res <- fmap (BHResponse @ES.StatusDependant @ES.BulkResponse) . embed $ httpLbs req (ES.bhManager bhe) unless (ES.isSuccess res) $ do - parsedRes <- liftIO $ ES.parseEsResponse res - liftIO . throwIO . IndexUpdateError . either id id $ parsedRes + liftIO . throwIO . IndexUpdateError $ parseESError res where encodeJSONToString :: (ToJSON a) => a -> Builder encodeJSONToString = fromEncoding . toEncoding @@ -171,8 +185,11 @@ bulkUpsertImpl cfg docs = do ] ] +parseESError :: BHResponse context a -> Either ES.EsProtocolException ES.EsError +parseESError res = either id id <$> ES.parseEsResponse (castResponse @_ @_ @_ @ES.EsError res) + doesIndexExistImpl :: (Member (Embed IO) r) => IndexedUserStoreConfig -> Sem r Bool -doesIndexExistImpl cfg = do +doesIndexExistImpl cfg = embed $ do (mainExists, fromMaybe True -> additionalExists) <- runInBothES cfg ES.indexExists pure $ mainExists && additionalExists @@ -264,7 +281,7 @@ paginateTeamMembersImpl cfg BrowseTeamFilters {..} maxResults mPagingState = do mps = fromSearchAfterKey <$> lastMay (mapMaybe ES.hitSort hits) results = mapMaybe ES.hitSource hits in SearchResult - { searchFound = ES.hitsTotal . ES.searchHits $ es, + { searchFound = es.searchHits.hitsTotal.value, searchReturned = length results, searchTook = ES.took es, searchResults = results, @@ -274,11 +291,9 @@ paginateTeamMembersImpl cfg BrowseTeamFilters {..} maxResults mPagingState = do } searchInMainIndex :: forall r. 
(Member (Embed IO) r) => IndexedUserStoreConfig -> ES.Search -> Sem r (ES.SearchResult UserDoc) -searchInMainIndex cfg search = do - r <- ES.runBH cfg.conn.env $ do - res <- ES.searchByType cfg.conn.indexName mappingName search - liftIO $ ES.parseEsResponse res - either (embed . throwIO . IndexLookupError) pure r +searchInMainIndex cfg search = embed $ do + r <- ES.runBH cfg.conn.env $ ES.searchByIndex @UserDoc cfg.conn.indexName search + either (throwIO . IndexLookupError . Right) pure r queryIndex :: (Member (Embed IO) r) => @@ -293,7 +308,7 @@ queryIndex cfg s (IndexQuery q f _) = do mkResult es = let results = mapMaybe ES.hitSource . ES.hits . ES.searchHits $ es in SearchResult - { searchFound = ES.hitsTotal . ES.searchHits $ es, + { searchFound = es.searchHits.hitsTotal.value, searchReturned = length results, searchTook = ES.took es, searchResults = results, @@ -423,7 +438,7 @@ termQ :: Text -> Text -> ES.Query termQ f v = ES.TermQuery ES.Term - { ES.termField = f, + { ES.termField = Key.fromText f, ES.termValue = v } Nothing @@ -487,7 +502,7 @@ matchTeamMembersSearchableByAllTeams = boolQuery { ES.boolQueryMustMatch = [ ES.QueryExistsQuery $ ES.FieldName "team", - ES.TermQuery (ES.Term (Key.toText searchVisibilityInboundFieldName) "searchable-by-all-teams") Nothing + ES.TermQuery (ES.Term searchVisibilityInboundFieldName "searchable-by-all-teams") Nothing ] } @@ -508,15 +523,13 @@ matchUsersNotInTeam tid = -------------------------------------------- -- Utils -runInBothES :: (Monad m) => IndexedUserStoreConfig -> (ES.IndexName -> ES.BH m a) -> m (a, Maybe a) -runInBothES cfg f = do - x <- ES.runBH cfg.conn.env $ f cfg.conn.indexName - y <- forM cfg.additionalConn $ \additional -> - ES.runBH additional.env $ f additional.indexName - pure (x, y) - -mappingName :: ES.MappingName -mappingName = ES.MappingName "user" +runInBothES :: forall m a. (MonadIO m) => IndexedUserStoreConfig -> (ES.IndexName -> ES.BH m a) -> m (a, Maybe a) +runInBothES cfg f = + either (liftIO . 
throwIO) pure =<< runExceptT do + x <- ExceptT $ ES.runBH cfg.conn.env $ f cfg.conn.indexName + y <- forM @Maybe @(ExceptT ES.EsError m) cfg.additionalConn $ \additional -> + ExceptT $ ES.runBH additional.env $ f additional.indexName + pure (x, y) boolQuery :: ES.BoolQuery boolQuery = ES.mkBoolQuery [] [] [] [] diff --git a/libs/wire-subsystems/src/Wire/IndexedUserStore/MigrationStore/ElasticSearch.hs b/libs/wire-subsystems/src/Wire/IndexedUserStore/MigrationStore/ElasticSearch.hs index 9532a54246c..5d067a09939 100644 --- a/libs/wire-subsystems/src/Wire/IndexedUserStore/MigrationStore/ElasticSearch.hs +++ b/libs/wire-subsystems/src/Wire/IndexedUserStore/MigrationStore/ElasticSearch.hs @@ -1,8 +1,15 @@ +-- 'putMapping' is incorrectly deprecated in bloodhound +{-# OPTIONS_GHC -fno-warn-deprecations #-} + module Wire.IndexedUserStore.MigrationStore.ElasticSearch where import Data.Aeson +import Data.ByteString.Char8 qualified as BS +import Data.Either import Data.Text qualified as Text +import Data.Text.Encoding (encodeUtf8) import Database.Bloodhound qualified as ES +import Database.Bloodhound.Common.Requests qualified as ESR import Imports import Polysemy import Polysemy.Error @@ -20,24 +27,35 @@ interpretIndexedUserMigrationStoreES env = interpret $ \case ensureMigrationIndexImpl :: (Member TinyLog r, Member (Embed IO) r, Member (Error MigrationException) r) => ES.BHEnv -> Sem r () ensureMigrationIndexImpl env = do - unlessM (ES.runBH env $ ES.indexExists migrationIndexName) $ do + exists <- + either (\l -> logAndThrow CreateMigrationIndexFailed l) pure + <$> liftIO (ES.runBH env (ES.indexExists migrationIndexName)) + unlessM exists $ do Log.info $ Log.msg (Log.val "Creating migrations index, used for tracking which migrations have run") - ES.runBH env (ES.createIndexWith [] 1 migrationIndexName) + liftIO (ES.runBH env . ES.performBHRequest . ES.keepBHResponse $ (ESR.createIndexWith [] 1 migrationIndexName)) >>= throwIfNotCreated CreateMigrationIndexFailed - ES.runBH env (ES.putMapping migrationIndexName migrationMappingName migrationIndexMapping) + liftIO (ES.runBH env . ES.performBHRequest . ES.keepBHResponse $ (ESR.putMapping @Value migrationIndexName migrationIndexMapping)) >>= throwIfNotCreated PutMappingFailed where - throwIfNotCreated mkErr response = - unless (ES.isSuccess response) $ - throw $ - mkErr (show response) + throwIfNotCreated :: (Member TinyLog r, Member (Error MigrationException) r) => (String -> MigrationException) -> Either ES.EsError (ES.BHResponse a b, c) -> Sem r () + throwIfNotCreated mkErr (Left e) = logAndThrow mkErr e + throwIfNotCreated mkErr (Right (resp, _)) = + if ES.isSuccess resp + then pure () + else logAndThrow mkErr resp + + logAndThrow :: (Member TinyLog r, Member (Error MigrationException) r, Show e) => (String -> MigrationException) -> e -> Sem r a + logAndThrow mkErr errMsg = do + Log.warn $ + Log.msg (Log.val ("An OpenSearch/ElasticSearch error appeared: " `BS.append` (encodeUtf8 . Text.pack . show) errMsg)) + throw $ + mkErr (show errMsg) getLatestMigrationVersionImpl :: (Member (Embed IO) r, Member (Error MigrationException) r) => ES.BHEnv -> Sem r MigrationVersion getLatestMigrationVersionImpl env = do - reply <- ES.runBH env $ ES.searchByIndex migrationIndexName (ES.mkSearch Nothing Nothing) - resp <- liftIO $ ES.parseEsResponse reply - result <- either (throw . FetchMigrationVersionsFailed . 
show) pure resp + reply <- liftIO $ ES.runBH env $ ES.searchByIndex @MigrationVersion migrationIndexName (ES.mkSearch Nothing Nothing) + result <- either (throw . FetchMigrationVersionsFailed . show) pure reply let versions = map ES.hitSource $ ES.hits . ES.searchHits $ result case versions of [] -> @@ -49,20 +67,21 @@ getLatestMigrationVersionImpl env = do persistMigrationVersionImpl :: (Member (Embed IO) r, Member TinyLog r, Member (Error MigrationException) r) => ES.BHEnv -> MigrationVersion -> Sem r () persistMigrationVersionImpl env v = do - let docId = ES.DocId . Text.pack . show $ migrationVersion v - persistResponse <- ES.runBH env $ ES.indexDocument migrationIndexName migrationMappingName ES.defaultIndexDocumentSettings v docId - if ES.isCreated persistResponse - then do - Log.info $ - Log.msg (Log.val "Migration success recorded") - . Log.field "migrationVersion" v - else throw $ PersistVersionFailed v $ show persistResponse + let docIdText = Text.pack . show $ migrationVersion v + docId = ES.DocId docIdText + persistResponse <- liftIO $ ES.runBH env $ ES.indexDocument migrationIndexName ES.defaultIndexDocumentSettings v docId + case persistResponse of + Left _ -> throw $ PersistVersionFailed v $ show persistResponse + Right r -> + if ES.idxDocId r == docIdText + then do + Log.info $ + Log.msg (Log.val "Migration success recorded") + . Log.field "migrationVersion" v + else throw $ PersistVersionFailed v $ show persistResponse migrationIndexName :: ES.IndexName -migrationIndexName = ES.IndexName "wire_brig_migrations" - -migrationMappingName :: ES.MappingName -migrationMappingName = ES.MappingName "wire_brig_migrations" +migrationIndexName = [ES.qqIndexName|wire_brig_migrations|] migrationIndexMapping :: Value migrationIndexMapping = diff --git a/nix/haskell-pins.nix b/nix/haskell-pins.nix index 810bdbaa8f0..7112e003599 100644 --- a/nix/haskell-pins.nix +++ b/nix/haskell-pins.nix @@ -94,9 +94,10 @@ let bloodhound = { src = fetchgit { - url = "https://github.com/wireapp/bloodhound"; - rev = "abf819a4a6ec7601f1e58cb8da13b2fdad377d9e"; - hash = "sha256-m1O+F/mOJN5z5WNChmeyHP4dtmLRkl2YnLlTuwzRelk="; + url = "https://github.com/bitemyapp/bloodhound"; + # v0.23.0.0 + rev = "76a27a44223e8c24ec2c6a13504ea671887f2672"; + hash = "sha256-R3uemM/4rc6hwsT/bDimMDmxzb+nKlmTxJu4DsKeXC8="; }; }; diff --git a/services/brig/brig.cabal b/services/brig/brig.cabal index 103cfd80c8a..702f4b2a18b 100644 --- a/services/brig/brig.cabal +++ b/services/brig/brig.cabal @@ -221,7 +221,7 @@ library , base16-bytestring >=0.1 , base64-bytestring >=1.0 , bilge >=0.21.1 - , bloodhound >=0.13 + , bloodhound , brig-types >=0.91.1 , bytestring >=0.10 , bytestring-conversion >=0.2 diff --git a/services/brig/src/Brig/Index/Eval.hs b/services/brig/src/Brig/Index/Eval.hs index 64dd9ebca59..68f9a3bf988 100644 --- a/services/brig/src/Brig/Index/Eval.hs +++ b/services/brig/src/Brig/Index/Eval.hs @@ -36,8 +36,9 @@ import Data.Aeson qualified as Aeson import Data.ByteString.Lazy.UTF8 qualified as UTF8 import Data.Credentials (Credentials (..)) import Data.Id +import Database.Bloodhound (tryPerformBHRequest) import Database.Bloodhound qualified as ES -import Database.Bloodhound.Internal.Client (BHEnv (..)) +import Database.Bloodhound.Common.Requests qualified as ESR import Imports import Polysemy import Polysemy.Embed (runEmbedded) @@ -106,7 +107,7 @@ runSem esConn cas galleyEndpoint logger action = do mEsCreds :: Maybe Credentials <- for esConn.esCredentials initCredentials casClient <- defInitCassandra (toCassandraOpts cas) 
logger let bhEnv = - BHEnv + ES.BHEnv { bhServer = toESServer esConn.esServer, bhManager = mgr, bhRequestHook = maybe pure (\creds -> ES.basicAuthHook (ES.EsUsername creds.username) (ES.EsPassword creds.password)) mEsCreds @@ -178,7 +179,7 @@ runCommand l = \case (reindexSettings ^. reindexEsConnection . to esCaCert) mCreds <- for (reindexSettings ^. reindexEsConnection . to esCredentials) initCredentials let bhEnv = initES (reindexSettings ^. reindexEsConnection . to esServer) mgr mCreds - ES.runBH bhEnv $ do + esRes <- (ES.runBH bhEnv) $ do let src = reindexSettings ^. reindexEsConnection . to esIndex dest = view reindexDestIndex reindexSettings timeoutSeconds = view reindexTimeoutSeconds reindexSettings @@ -192,13 +193,14 @@ runCommand l = \case throwM $ ReindexFromAnotherIndexError $ "Destination index " <> show dest <> " doesn't exist" Log.info l $ Log.msg ("Reindexing" :: ByteString) . Log.field "from" (show src) . Log.field "to" (show dest) - eitherTaskNodeId <- ES.reindexAsync $ ES.mkReindexRequest src dest + eitherTaskNodeId <- tryPerformBHRequest $ ESR.reindexAsync $ ES.mkReindexRequest src dest case eitherTaskNodeId of Left e -> throwM $ ReindexFromAnotherIndexError $ "Error occurred while running reindex: " <> show e Right taskNodeId -> do Log.info l $ Log.field "taskNodeId" (show taskNodeId) waitForTaskToComplete @ES.ReindexResponse timeoutSeconds taskNodeId Log.info l $ Log.msg ("Finished reindexing" :: ByteString) + either throwM pure esRes where initIndex :: ESConnectionSettings -> Endpoint -> IO IndexEnv initIndex esConn gly = do @@ -223,12 +225,12 @@ runCommand l = \case let env = ES.mkBHEnv (toESServer esURI) mgr in maybe env (\(creds :: Credentials) -> env {ES.bhRequestHook = ES.basicAuthHook (ES.EsUsername creds.username) (ES.EsPassword creds.password)}) mCreds -waitForTaskToComplete :: forall a m. (ES.MonadBH m, MonadThrow m, FromJSON a) => Int -> ES.TaskNodeId -> m () +waitForTaskToComplete :: forall a m. (ES.MonadBH m, FromJSON a) => Int -> ES.TaskNodeId -> m () waitForTaskToComplete timeoutSeconds taskNodeId = do -- Delay is 0.1 seconds, so retries are limited to timeoutSeconds * 10 let policy = constantDelay 100000 <> limitRetries (timeoutSeconds * 10) let retryCondition _ = fmap not . isTaskComplete - taskEither <- retrying policy retryCondition (const $ ES.getTask @m @a taskNodeId) + taskEither <- retrying policy retryCondition (const . 
ES.tryPerformBHRequest $ ESR.getTask @a taskNodeId) task <- either errTaskGet pure taskEither unless (ES.taskResponseCompleted task) $ do throwM $ ReindexFromAnotherIndexError $ "Timed out waiting for task: " <> show taskNodeId diff --git a/services/brig/src/Brig/Index/Options.hs b/services/brig/src/Brig/Index/Options.hs index f9f382c0043..dc74ff8e0d2 100644 --- a/services/brig/src/Brig/Index/Options.hs +++ b/services/brig/src/Brig/Index/Options.hs @@ -134,7 +134,7 @@ localElasticSettings = { _esConnection = ESConnectionSettings { esServer = [uri|https://localhost:9200|], - esIndex = ES.IndexName "directory_test", + esIndex = [ES.qqIndexName|directory_test|], esCaCert = Just "test/resources/elasticsearch-ca.pem", esInsecureSkipVerifyTls = False, esCredentials = Just "test/resources/elasticsearch-credentials.yaml" @@ -188,29 +188,35 @@ restrictedElasticSettingsParser = do { _esConnection = localElasticSettings._esConnection { esServer = server, - esIndex = ES.IndexName (prefix <> "_test"), + esIndex = mkIndexName (prefix <> "_test"), esCredentials = mCreds, esCaCert = mCaCert, esInsecureSkipVerifyTls = verifyCa } } -indexNameParser :: Parser ES.IndexName +indexNameParser :: Parser String indexNameParser = - ES.IndexName . view packed - <$> strOption - ( long "elasticsearch-index" - <> metavar "STRING" - <> help "Elasticsearch Index Name." - <> value (view (_IndexName . unpacked) localElasticSettings._esConnection.esIndex) - <> showDefault - ) + strOption + ( long "elasticsearch-index" + <> metavar "STRING" + <> help "Elasticsearch Index Name." + <> value + ( Text.unpack + ( ES.unIndexName (localElasticSettings._esConnection.esIndex) + ) + ) + <> showDefault + ) + +mkIndexName :: String -> ES.IndexName +mkIndexName = either (error "invalid index name") id . ES.mkIndexName . Text.pack connectionSettingsParser :: Parser ESConnectionSettings connectionSettingsParser = ESConnectionSettings <$> elasticServerParser - <*> indexNameParser + <*> fmap mkIndexName indexNameParser <*> caCertParser <*> verifyCaParser <*> credentialsPathParser @@ -332,13 +338,14 @@ reindexToAnotherIndexSettingsParser :: Parser ReindexFromAnotherIndexSettings reindexToAnotherIndexSettingsParser = ReindexFromAnotherIndexSettings <$> connectionSettingsParser - <*> ( ES.IndexName . view packed - <$> strOption - ( long "destination-index" - <> metavar "STRING" - <> help "Elasticsearch index name to reindex to" - ) - ) + <*> fmap + mkIndexName + ( strOption + ( long "destination-index" + <> metavar "STRING" + <> help "Elasticsearch index name to reindex to" + ) + ) <*> option auto ( long "timeout" @@ -416,9 +423,6 @@ commandParser = ) ) -_IndexName :: Iso' ES.IndexName Text -_IndexName = iso (\(ES.IndexName n) -> n) ES.IndexName - _Keyspace :: Iso' C.Keyspace Text _Keyspace = iso C.unKeyspace C.Keyspace diff --git a/services/brig/src/Brig/User/Search/Index.hs b/services/brig/src/Brig/User/Search/Index.hs index a067b296324..f1ea27c816f 100644 --- a/services/brig/src/Brig/User/Search/Index.hs +++ b/services/brig/src/Brig/User/Search/Index.hs @@ -1,5 +1,6 @@ {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE StrictData #-} +{-# OPTIONS_GHC -Wno-deprecations #-} -- This file is part of the Wire Server implementation. 
-- @@ -36,15 +37,14 @@ module Brig.User.Search.Index -- * Re-exports ES.IndexSettings (..), - ES.IndexName (..), + ES.IndexName, ) where import Bilge.IO (MonadHttp) import Bilge.IO qualified as RPC import Brig.Index.Types (CreateIndexSettings (..)) -import Control.Lens hiding ((#), (.=)) -import Control.Monad.Catch (MonadCatch, MonadMask, MonadThrow, throwM) +import Control.Monad.Catch (MonadCatch, MonadMask, MonadThrow, catch, throwM) import Control.Monad.Except import Data.Aeson as Aeson import Data.Credentials @@ -52,7 +52,9 @@ import Data.Id import Data.Map qualified as Map import Data.Text qualified as Text import Data.Text.Encoding +import Database.Bloodhound (BHResponse (getResponse)) import Database.Bloodhound qualified as ES +import Database.Bloodhound.Common.Requests qualified as ESR import Imports hiding (log, searchable) import Network.HTTP.Client hiding (host, path, port) import Network.HTTP.Types (statusCode) @@ -112,7 +114,11 @@ instance MonadLogger (ExceptT e IndexIO) where log l m = lift (log l m) instance ES.MonadBH IndexIO where - getBHEnv = asks idxElastic + dispatch req = do + bhEnv <- asks idxElastic + either throwM pure =<< ES.runBH bhEnv (ES.dispatch req) + tryEsError action = (Right <$> action) `catch` \e -> pure (Left e) + throwEsError = throwM instance MonadHttp IndexIO where handleRequestWithCont req handler = do @@ -167,16 +173,16 @@ createIndex' failIfExists (CreateIndexSettings settings shardCount mbDeleteTempl ( encodeUtf8 ("Delete index template " <> "\"" <> tname <> "\"") ) - $ ES.deleteTemplate templateName + $ fmap fst (ES.performBHRequest $ ES.keepBHResponse $ ESR.deleteTemplate templateName) unless (ES.isSuccess dr) $ throwM (IndexError "Deleting index template failed.") - cr <- traceES "Create index" $ ES.createIndexWith fullSettings shardCount idx + cr <- traceES "Create index" $ fmap fst (ES.performBHRequest $ ES.keepBHResponse $ ESR.createIndexWith fullSettings shardCount idx) unless (ES.isSuccess cr) $ throwM (IndexError "Index creation failed.") mr <- traceES "Put mapping" $ - ES.putMapping idx (ES.MappingName "user") indexMapping + fmap fst (ES.performBHRequest $ ES.keepBHResponse $ ESR.putMapping @Value idx indexMapping) unless (ES.isSuccess mr) $ throwM (IndexError "Put Mapping failed.") @@ -205,7 +211,7 @@ updateMapping = liftIndexIO $ do -- https://github.com/wireapp/wire-server-deploy/blob/92311d189818ffc5e26ff589f81b95c95de8722c/charts/elasticsearch-index/templates/create-index.yaml void $ traceES "Put mapping" $ - ES.putMapping idx (ES.MappingName "user") indexMapping + fmap fst (ES.performBHRequest $ ES.keepBHResponse $ ESR.putMapping @Value idx indexMapping) resetIndex :: (MonadIndexIO m) => @@ -213,22 +219,21 @@ resetIndex :: m () resetIndex ciSettings = liftIndexIO $ do idx <- asks idxName - gone <- - ES.indexExists idx >>= \case - True -> ES.isSuccess <$> traceES "Delete Index" (ES.deleteIndex idx) - False -> pure True - if gone - then createIndex ciSettings - else throwM (IndexError "Index deletion failed.") + ES.indexExists idx >>= \case + True -> do + info $ msg ("Delete Index" :: String) + void $ ES.deleteIndex idx + False -> pure () + createIndex ciSettings -------------------------------------------------------------------------------- -- Internal -traceES :: (MonadIndexIO m) => ByteString -> IndexIO ES.Reply -> m ES.Reply +traceES :: (MonadIndexIO m) => ByteString -> IndexIO (ES.BHResponse contextualized body) -> m (ES.BHResponse contextualized body) traceES descr act = liftIndexIO $ do info (msg descr) r <- act - info . 
msg $ (r & statusCode . responseStatus) +++ val " - " +++ responseBody r + info . msg $ (statusCode . responseStatus $ getResponse r) +++ val " - " +++ responseBody (getResponse r) pure r -- | This mapping defines how elasticsearch will treat each field in a document. Here diff --git a/services/brig/src/Brig/User/Search/SearchIndex.hs b/services/brig/src/Brig/User/Search/SearchIndex.hs index f45006c8387..9634fb41b33 100644 --- a/services/brig/src/Brig/User/Search/SearchIndex.hs +++ b/services/brig/src/Brig/User/Search/SearchIndex.hs @@ -34,11 +34,11 @@ import Data.Handle (Handle (fromHandle)) import Data.Id import Data.Qualified (Qualified (Qualified)) import Database.Bloodhound qualified as ES +import Database.Bloodhound.Common.Requests as ESR import Imports hiding (log, searchable) import Wire.API.User (ColourId (..), Name (fromName)) import Wire.API.User.Search import Wire.IndexedUserStore (IndexedUserStoreError (..)) -import Wire.IndexedUserStore.ElasticSearch (mappingName) import Wire.UserSearch.Types import Wire.UserStore.IndexUser (normalized) @@ -77,15 +77,17 @@ queryIndex (IndexQuery q f _) s = do liftIndexIO $ do idx <- asks idxName let search = (ES.mkSearch (Just q) (Just f)) {ES.size = ES.Size (fromIntegral s)} - r <- - ES.searchByType idx mappingName search - >>= ES.parseEsResponse @_ @(ES.SearchResult UserDoc) - either (throwM . IndexLookupError) (traverse (userDocToContact localDomain) . mkResult) r + resp <- ES.tryPerformBHRequest . ES.keepBHResponse $ ESR.searchByIndex idx search + resp' <- either (throwM . IndexLookupError . Right) pure resp + let parsedResult = ES.parseEsResponse . fst $ resp' + r <- either (throwM . IndexLookupError . Left) pure parsedResult + either (throwM . IndexLookupError . Right) (traverse (userDocToContact localDomain) . mkResult) r where + mkResult :: ES.SearchResult UserDoc -> SearchResult UserDoc mkResult es = let results = mapMaybe ES.hitSource . ES.hits . ES.searchHits $ es in SearchResult - { searchFound = ES.hitsTotal . ES.searchHits $ es, + { searchFound = (ES.value . ES.hitsTotal . 
ES.searchHits) es, searchReturned = length results, searchTook = ES.took es, searchResults = results, @@ -183,7 +185,7 @@ termQ :: Text -> Text -> ES.Query termQ f v = ES.TermQuery ES.Term - { ES.termField = f, + { ES.termField = Key.fromText f, ES.termValue = v } Nothing @@ -248,7 +250,7 @@ matchTeamMembersSearchableByAllTeams = boolQuery { ES.boolQueryMustMatch = [ ES.QueryExistsQuery $ ES.FieldName "team", - ES.TermQuery (ES.Term (Key.toText searchVisibilityInboundFieldName) "searchable-by-all-teams") Nothing + ES.TermQuery (ES.Term searchVisibilityInboundFieldName "searchable-by-all-teams") Nothing ] } diff --git a/services/brig/test/integration/API/Search.hs b/services/brig/test/integration/API/Search.hs index 14832d5e377..7360047f090 100644 --- a/services/brig/test/integration/API/Search.hs +++ b/services/brig/test/integration/API/Search.hs @@ -1,6 +1,8 @@ {-# LANGUAGE OverloadedRecordDot #-} {-# LANGUAGE PartialTypeSignatures #-} {-# LANGUAGE QuasiQuotes #-} +-- 'putMapping' is incorrectly deprecated in bloodhound +{-# OPTIONS_GHC -Wno-deprecations #-} {-# OPTIONS_GHC -Wno-incomplete-uni-patterns #-} {-# OPTIONS_GHC -Wno-partial-type-signatures #-} {-# OPTIONS_GHC -Wno-redundant-constraints #-} @@ -49,6 +51,7 @@ import Data.String.Conversions import Data.Text qualified as Text import Data.Text.Encoding qualified as Text import Database.Bloodhound qualified as ES +import Database.Bloodhound.Common.Requests qualified as ESR import Federation.Util import Imports import Network.HTTP.ReverseProxy (waiProxyTo) @@ -611,9 +614,10 @@ testSearchOtherDomain opts brig = do testMigrationToNewIndex :: (TestConstraints m, MonadUnliftIO m) => Opt.Opts -> Brig -> m () testMigrationToNewIndex opts brig = do withOldESProxy opts $ \oldESUrl oldESIndex -> do + oldIndexName :: ES.IndexName <- either (\v -> fail ("Invalid index name" ++ Text.unpack v)) pure $ ES.mkIndexName oldESIndex let optsOldIndex = opts - & Opt.elasticsearchLens . Opt.indexLens .~ (ES.IndexName oldESIndex) + & Opt.elasticsearchLens . Opt.indexLens .~ oldIndexName & Opt.elasticsearchLens . Opt.urlLens .~ (ES.Server oldESUrl) -- Phase 1: Using old index only (phase1NonTeamUser, teamOwner, phase1TeamUser1, phase1TeamUser2, tid) <- withSettingsOverrides optsOldIndex $ do @@ -653,8 +657,8 @@ testMigrationToNewIndex opts brig = do -- Run Migrations let newIndexName = opts ^. Opt.elasticsearchLens . 
Opt.indexLens - taskNodeId <- assertRight =<< runBH opts (ES.reindexAsync $ ES.mkReindexRequest (ES.IndexName oldESIndex) newIndexName) - runBH opts $ waitForTaskToComplete @ES.ReindexResponse taskNodeId + taskNodeId <- assertRight =<< runBH opts (ES.reindexAsync $ ES.mkReindexRequest oldIndexName newIndexName) + void $ runBH opts $ waitForTaskToComplete @ES.ReindexResponse taskNodeId -- Phase 3: Using old index for search, writing to both indices, migrations have run refreshIndex brig @@ -690,14 +694,15 @@ testMigrationToNewIndex opts brig = do withOldESProxy :: (TestConstraints m, MonadUnliftIO m, HasCallStack) => Opt.Opts -> (Text -> Text -> m a) -> m a withOldESProxy opts f = do - indexName <- randomHandle + indexNameText <- randomHandle + indexName <- either (\v -> fail ("Invalid index name " ++ Text.unpack v)) pure $ ES.mkIndexName indexNameText createIndexWithMapping opts indexName oldMapping mgr <- liftIO $ initHttpManagerWithTLSConfig opts.elasticsearch.insecureSkipVerifyTls opts.elasticsearch.caCert (proxyPort, sock) <- liftIO Warp.openFreePort bracket - (async $ liftIO $ Warp.runSettingsSocket Warp.defaultSettings sock $ indexProxyServer indexName opts mgr) + (async $ liftIO $ Warp.runSettingsSocket Warp.defaultSettings sock $ indexProxyServer indexNameText opts mgr) cancel - (\_ -> f ("http://localhost:" <> Text.pack (show proxyPort)) indexName) -- f undefined indexName + (\_ -> f ("http://localhost:" <> Text.pack (show proxyPort)) indexNameText) -- f undefined indexName indexProxyServer :: Text -> Opt.Opts -> Manager -> Wai.Application indexProxyServer idx opts mgr = @@ -716,7 +721,7 @@ waitForTaskToComplete :: forall a m. (ES.MonadBH m, MonadThrow m, FromJSON a) => waitForTaskToComplete taskNodeId = do let policy = constantDelay 100000 <> limitRetries 30 let retryCondition _ = fmap not . isTaskComplete - task <- retrying policy retryCondition (const $ ES.getTask @m @a taskNodeId) + task <- retrying policy retryCondition (const $ ES.tryPerformBHRequest $ ESR.getTask @a taskNodeId) taskCompleted <- isTaskComplete task liftIO $ assertBool "Timed out waiting for task" taskCompleted where @@ -742,36 +747,40 @@ testWithBothIndicesAndOpts opts mgr name f = f newOpts <* deleteIndex opts indexName ] -withOldIndex :: (MonadIO m, HasCallStack) => Opt.Opts -> WaiTest.Session a -> m a +withOldIndex :: (MonadIO m, MonadCatch m, HasCallStack) => Opt.Opts -> WaiTest.Session a -> m a withOldIndex opts f = do - indexName <- randomHandle + indexNameText <- randomHandle + indexName <- either (\v -> error ("Invalid index name " ++ Text.unpack v)) pure $ ES.mkIndexName indexNameText createIndexWithMapping opts indexName oldMapping - let newOpts = opts & Opt.elasticsearchLens . Opt.indexLens .~ (ES.IndexName indexName) - withSettingsOverrides newOpts f <* deleteIndex opts indexName + let newOpts = opts & Opt.elasticsearchLens . Opt.indexLens .~ indexName + withSettingsOverrides newOpts f <* deleteIndex opts indexNameText -optsForOldIndex :: (MonadIO m, HasCallStack) => Opt.Opts -> m (Opt.Opts, Text) +optsForOldIndex :: (MonadIO m, MonadCatch m, HasCallStack) => Opt.Opts -> m (Opt.Opts, Text) optsForOldIndex opts = do - indexName <- randomHandle + indexNameText <- randomHandle + indexName <- either (\v -> error ("Invalid index name " ++ Text.unpack v)) pure $ ES.mkIndexName indexNameText createIndexWithMapping opts indexName oldMapping - pure (opts & Opt.elasticsearchLens . 
Opt.indexLens .~ (ES.IndexName indexName), indexName) - -createIndexWithMapping :: (MonadIO m, HasCallStack) => Opt.Opts -> Text -> Value -> m () -createIndexWithMapping opts name val = do - let indexName = ES.IndexName name - createReply <- runBH opts $ ES.createIndexWith [ES.AnalysisSetting analysisSettings] 1 indexName - unless (ES.isCreated createReply || ES.isSuccess createReply) $ do - liftIO $ assertFailure $ "failed to create index: " <> show name <> " with error: " <> show createReply - mappingReply <- runBH opts $ ES.putMapping indexName (ES.MappingName "user") val - unless (ES.isCreated mappingReply || ES.isSuccess createReply) $ do - liftIO $ assertFailure $ "failed to create mapping: " <> show name + pure (opts & Opt.elasticsearchLens . Opt.indexLens .~ indexName, indexNameText) + +createIndexWithMapping :: (MonadIO m, MonadCatch m, HasCallStack) => Opt.Opts -> ES.IndexName -> Value -> m () +createIndexWithMapping opts indexName val = do + createReply <- runBH opts . ES.performBHRequest . ES.keepBHResponse $ ESR.createIndexWith [ES.AnalysisSetting analysisSettings] 1 indexName + createReply' <- fmap fst $ either (error . show) pure $ createReply + unless (ES.isCreated createReply' || ES.isSuccess createReply') $ do + liftIO $ assertFailure $ "failed to create index: " <> show indexName <> " with error: " <> show createReply + + res <- runBH opts . ES.performBHRequest $ ES.keepBHResponse $ ESR.putMapping @Value indexName val + mappingReply <- fst <$> either (error . show) pure res + unless (ES.isCreated mappingReply || ES.isSuccess createReply') $ do + liftIO $ assertFailure $ "failed to create mapping: " <> show indexName -- | This doesn't fail if ES returns error because we don't really want to fail the tests for this -deleteIndex :: (MonadIO m, HasCallStack) => Opt.Opts -> Text -> m () +deleteIndex :: (MonadIO m, MonadCatch m, HasCallStack) => Opt.Opts -> Text -> m () deleteIndex opts name = do - let indexName = ES.IndexName name + let indexName = either (\v -> error ("Invalid index name" ++ show v)) id $ ES.mkIndexName name void $ runBH opts $ ES.deleteIndex indexName -runBH :: (MonadIO m, HasCallStack) => Opt.Opts -> ES.BH m a -> m a +runBH :: (MonadIO m, HasCallStack) => Opt.Opts -> ES.BH m a -> m (Either ES.EsError a) runBH opts action = do let (ES.Server esURL) = opts ^. Opt.elasticsearchLens . Opt.urlLens mgr <- liftIO $ initHttpManagerWithTLSConfig opts.elasticsearch.insecureSkipVerifyTls opts.elasticsearch.caCert diff --git a/services/brig/test/integration/Index/Create.hs b/services/brig/test/integration/Index/Create.hs index fab0ae91860..4dafdf6b952 100644 --- a/services/brig/test/integration/Index/Create.hs +++ b/services/brig/test/integration/Index/Create.hs @@ -53,7 +53,8 @@ testCreateIndexWhenNotPresent brigOpts = do case parseURI strictURIParserOptions (Text.encodeUtf8 esURL) of Left e -> fail $ "Invalid ES URL: " <> show esURL <> "\nerror: " <> show e Right esURI -> do - indexName <- ES.IndexName . Text.pack <$> replicateM 20 (Random.randomRIO ('a', 'z')) + indexNameE <- (ES.mkIndexName . 
diff --git a/services/brig/test/integration/Index/Create.hs b/services/brig/test/integration/Index/Create.hs
index fab0ae91860..4dafdf6b952 100644
--- a/services/brig/test/integration/Index/Create.hs
+++ b/services/brig/test/integration/Index/Create.hs
@@ -53,7 +53,8 @@ testCreateIndexWhenNotPresent brigOpts = do
   case parseURI strictURIParserOptions (Text.encodeUtf8 esURL) of
     Left e -> fail $ "Invalid ES URL: " <> show esURL <> "\nerror: " <> show e
     Right esURI -> do
-      indexName <- ES.IndexName . Text.pack <$> replicateM 20 (Random.randomRIO ('a', 'z'))
+      indexNameE <- (ES.mkIndexName . Text.pack <$> replicateM 20 (Random.randomRIO ('a', 'z')))
+      indexName <- either (\v -> fail ("Invalid (auto-generated) index name: " ++ show v)) (pure) indexNameE
       let replicas = 2
           shards = 2
           refreshInterval = 5
@@ -75,18 +76,17 @@ testCreateIndexWhenNotPresent brigOpts = do
       IndexEval.runCommand devNullLogger (IndexOpts.Create esSettings (galley brigOpts))
       mgr <- liftIO $ initHttpManagerWithTLSConfig connSettings.esInsecureSkipVerifyTls connSettings.esCaCert
       let bEnv = (mkBHEnv esURL mgr) {ES.bhRequestHook = ES.basicAuthHook (ES.EsUsername "elastic") (ES.EsPassword "changeme")}
-      ES.runBH bEnv $ do
+      eitherIndexSettings <- ES.runBH bEnv $ do
         indexExists <- ES.indexExists indexName
         lift $ assertBool "Index should exist" indexExists
-        eitherIndexSettings <- ES.getIndexSettings indexName
-        lift $ do
-          case eitherIndexSettings of
-            Left err -> fail $ "Failed to fetch index settings with error: " <> show err
-            Right indexSettings -> do
-              assertEqual "Shard count should be set" (ES.ShardCount replicas) (ES.indexShards . ES.sSummaryFixedSettings $ indexSettings)
-              assertEqual "Replica count should be set" (ES.ReplicaCount replicas) (ES.indexReplicas . ES.sSummaryFixedSettings $ indexSettings)
-              assertEqual "Refresh interval should be set" [ES.RefreshInterval refreshInterval] (ES.sSummaryUpdateable indexSettings)
+        ES.getIndexSettings indexName
+      case eitherIndexSettings of
+        Left err -> fail $ "Failed to fetch index settings with error: " <> show err
+        Right indexSettings -> do
+          assertEqual "Shard count should be set" (ES.ShardCount replicas) (ES.indexShards . ES.sSummaryFixedSettings $ indexSettings)
+          assertEqual "Replica count should be set" (ES.ReplicaCount replicas) (ES.indexReplicas . ES.sSummaryFixedSettings $ indexSettings)
+          assertEqual "Refresh interval should be set" [ES.RefreshInterval refreshInterval] (ES.sSummaryUpdateable indexSettings)

 testCreateIndexWhenPresent :: BrigOpts.Opts -> Assertion
 testCreateIndexWhenPresent brigOpts = do
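
Both tests now build their index name through the ES.mkIndexName smart constructor instead of wrapping the raw Text in ES.IndexName. A 20-character lowercase ASCII name always passes validation, but the Either still has to be unwrapped; a standalone sketch of that generation step follows (randomIndexName is an illustrative helper, and ES is assumed to be the same qualified Database.Bloodhound import the tests use).

import Control.Monad (replicateM)
import qualified Data.Text as Text
import qualified Database.Bloodhound as ES
import qualified System.Random as Random

-- Build a random 20-character lowercase name and validate it with
-- mkIndexName, failing loudly if the smart constructor rejects it.
randomIndexName :: IO ES.IndexName
randomIndexName = do
  raw <- Text.pack <$> replicateM 20 (Random.randomRIO ('a', 'z'))
  either (\e -> fail ("Invalid index name: " <> show e)) pure (ES.mkIndexName raw)
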
@@ -94,7 +94,8 @@ testCreateIndexWhenPresent brigOpts = do
   case parseURI strictURIParserOptions (Text.encodeUtf8 esURL) of
     Left e -> fail $ "Invalid ES URL: " <> show esURL <> "\nerror: " <> show e
     Right esURI -> do
-      indexName <- ES.IndexName . Text.pack <$> replicateM 20 (Random.randomRIO ('a', 'z'))
+      indexNameE <- (ES.mkIndexName . Text.pack <$> replicateM 20 (Random.randomRIO ('a', 'z')))
+      indexName <- either (\v -> fail ("Invalid (auto-generated) index name: " ++ show v)) (pure) indexNameE
       let replicas = 2
           shards = 2
           refreshInterval = 5
@@ -114,22 +115,21 @@ testCreateIndexWhenPresent brigOpts = do
           & IndexOpts.esIndexRefreshInterval .~ refreshInterval
       mgr <- liftIO $ initHttpManagerWithTLSConfig connSettings.esInsecureSkipVerifyTls connSettings.esCaCert
       let bEnv = (mkBHEnv esURL mgr) {ES.bhRequestHook = ES.basicAuthHook (ES.EsUsername "elastic") (ES.EsPassword "changeme")}
-      ES.runBH bEnv $ do
-        _ <- ES.createIndex (ES.IndexSettings (ES.ShardCount 1) (ES.ReplicaCount 1)) indexName
+      void $ ES.runBH bEnv $ do
+        void $ ES.createIndex (ES.IndexSettings (ES.ShardCount 1) (ES.ReplicaCount 1) ES.defaultIndexMappingsLimits) indexName
         indexExists <- ES.indexExists indexName
         lift $ assertBool "Index should exist" indexExists
       devNullLogger <- Log.create (Log.Path "/dev/null")
       IndexEval.runCommand devNullLogger (IndexOpts.Create esSettings (galley brigOpts))
-      ES.runBH bEnv $ do
+      eitherIndexSettings <- ES.runBH bEnv $ do
         indexExists <- ES.indexExists indexName
         lift $ assertBool "Index should still exist" indexExists
-        eitherIndexSettings <- ES.getIndexSettings indexName
-        lift $ do
-          case eitherIndexSettings of
-            Left err -> fail $ "Failed to fetch index settings with error: " <> show err
-            Right indexSettings -> do
-              assertEqual "Shard count should not be updated" (ES.ShardCount 1) (ES.indexShards . ES.sSummaryFixedSettings $ indexSettings)
-              assertEqual "Replica count should not be updated" (ES.ReplicaCount 1) (ES.indexReplicas . ES.sSummaryFixedSettings $ indexSettings)
-              assertEqual "Refresh interval should not be updated" [] (ES.sSummaryUpdateable indexSettings)
+        ES.getIndexSettings indexName
+      case eitherIndexSettings of
+        Left err -> fail $ "Failed to fetch index settings with error: " <> show err
+        Right indexSettings -> do
+          assertEqual "Shard count should not be updated" (ES.ShardCount 1) (ES.indexShards . ES.sSummaryFixedSettings $ indexSettings)
+          assertEqual "Replica count should not be updated" (ES.ReplicaCount 1) (ES.indexReplicas . ES.sSummaryFixedSettings $ indexSettings)
+          assertEqual "Refresh interval should not be updated" [] (ES.sSummaryUpdateable indexSettings)
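
A second API difference visible in this file: the ES.IndexSettings constructor now carries a third field, so explicit index creation gains ES.defaultIndexMappingsLimits. A small sketch of creating a throwaway index with exactly those arguments (createTinyIndex is an illustrative helper, assuming the same qualified Database.Bloodhound import as the tests):

import Control.Monad (void)
import qualified Database.Bloodhound as ES

-- One shard, one replica, default mappings limits: the same arguments the
-- test above passes to ES.createIndex, wrapped in a tiny helper.
createTinyIndex :: ES.BHEnv -> ES.IndexName -> IO ()
createTinyIndex bEnv idx =
  void $ ES.runBH bEnv $
    void $
      ES.createIndex
        (ES.IndexSettings (ES.ShardCount 1) (ES.ReplicaCount 1) ES.defaultIndexMappingsLimits)
        idx
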
Error: " ++ show err)) pure $ + ES.mkIndexName $ + Text.pack indexStr + let transform :: IO (Either ES.EsError a) -> IO a + transform res = + res + >>= \r -> either (\err -> (fail ("ElasticSearch error: " ++ show err))) pure r runConduit $ - transPipe (ES.runBH es) $ - getScrolled index mapping + transPipe (transform <$> ES.runBH es) $ + getScrolled index .| C.iterM (logProgress l) .| C.mapM ( \uuids -> do @@ -74,10 +80,10 @@ logUUID l f (uuid, _, time) = . Log.field "uuid" (show uuid) . Log.field "write time" (show $ writetimeToUTC <$> time) -getScrolled :: (ES.MonadBH m, MonadThrow m) => ES.IndexName -> ES.MappingName -> ConduitM () [UUID] m () -getScrolled index mapping = processRes =<< lift (ES.getInitialScroll index mapping esSearch) +getScrolled :: (ES.MonadBH m) => ES.IndexName -> ConduitM () [UUID] m () +getScrolled index = processRes =<< lift (ES.getInitialScroll index esSearch) where - processRes :: (ES.MonadBH m, MonadThrow m) => Either ES.EsError (ES.SearchResult User) -> ConduitM () [UUID] m () + processRes :: (ES.MonadBH m) => Either ES.EsError (ES.SearchResult User) -> ConduitM () [UUID] m () processRes = \case Left e -> throwM $ EsError e Right res -> @@ -86,7 +92,7 @@ getScrolled index mapping = processRes =<< lift (ES.getInitialScroll index mappi ids -> do yield ids processRes - =<< (\scrollId -> lift (ES.advanceScroll scrollId 120)) + =<< (\scrollId -> lift (ES.tryEsError (ES.advanceScroll scrollId 120))) =<< extractScrollId res esFilter :: ES.Filter