# Copyright (C) 2012 Anaconda, Inc
# SPDX-License-Identifier: BSD-3-Clause
"""Collection of pytest fixtures used in conda.gateways tests."""

import os
import socket
from pathlib import Path
from shutil import which

import pytest
from xprocess import ProcessStarter

from ...common.serialize import json

MINIO_EXE = which("minio")


# We rely on tests not requesting this fixture (and therefore on pytest never
# creating it) when MINIO_EXE was not found.
@pytest.fixture()
def minio_s3_server(xprocess, tmp_path):
    """
    Mock a local S3 server using `minio`

    This requires:
    - pytest-xprocess: runs the background process
    - minio: the executable must be in PATH

    Note, the given S3 server will be EMPTY! The test function needs
    to populate it. You can use
    `conda.testing.helpers.populate_s3_server` for that.

    The fixture starts ``minio server`` on ``tmp_path`` through the
    ``xprocess`` fixture, yields a ``Minio`` helper object (exposing
    ``name``, ``port``, ``endpoint``, ``server_url`` and
    ``populate_bucket``) and terminates the server process on teardown.
    """

    class Minio:
        # The 'name' below will be the name of the S3 bucket containing
        # keys like `noarch/repodata.json`
        # see https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html
        name = "minio-s3-server"
        # TCP port passed to `minio server --address` and used to build `endpoint`
        port = 9000

        def __init__(self):
            # Create a directory named after the bucket inside the directory
            # handed to `minio server` (presumably so MinIO serves it as a
            # bucket -- confirm against the MinIO docs).
            (Path(tmp_path) / self.name).mkdir()

        @property
        def server_url(self):
            """URL of the bucket: ``<endpoint>/<name>``."""
            return f"{self.endpoint}/{self.name}"

        @property
        def endpoint(self):
            """Base HTTP URL of the server on localhost."""
            return f"http://localhost:{self.port}"

        def populate_bucket(self, endpoint, bucket_name, channel_dir):
            """Prepare the s3 connection for our minio instance

            Creates a boto3 S3 client for ``endpoint``, attaches a policy to
            ``bucket_name`` that allows ``s3:GetObject`` for every principal,
            then uploads every file found under ``channel_dir``. Each object
            key is the file's path relative to ``channel_dir``, written with
            forward slashes.
            """
            # Imported lazily so boto3 is only needed by tests that upload
            from boto3.session import Session
            from botocore.client import Config

            # Make the minio bucket public first
            # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-example-bucket-policies.html#set-a-bucket-policy
            session = Session()
            client = session.client(
                "s3",
                endpoint_url=endpoint,
                aws_access_key_id="minioadmin",
                aws_secret_access_key="minioadmin",
                config=Config(signature_version="s3v4"),
                region_name="us-east-1",
            )
            bucket_policy = json.dumps(
                {
                    "Version": "2012-10-17",
                    "Statement": [
                        {
                            "Sid": "AddPerm",
                            "Effect": "Allow",
                            "Principal": "*",
                            "Action": ["s3:GetObject"],
                            "Resource": f"arn:aws:s3:::{bucket_name}/*",
                        }
                    ],
                }
            )
            client.put_bucket_policy(Bucket=bucket_name, Policy=bucket_policy)

            # Minio has to start with an empty directory; once available,
            # we can import all channel files by "uploading" them
            for current, _, files in os.walk(channel_dir):
                for f in files:
                    path = Path(current, f)
                    key = path.relative_to(channel_dir)
                    client.upload_file(
                        str(path),
                        bucket_name,
                        # MinIO expects Unix paths; as_posix() converts Windows
                        # separators without touching literal backslashes that
                        # are part of a POSIX file name.
                        key.as_posix(),
                        ExtraArgs={"ACL": "public-read"},
                    )

    print("Starting mock_s3_server")
    minio = Minio()

    class Starter(ProcessStarter):
        # Settings read by pytest-xprocess (see its ProcessStarter
        # documentation for the exact semantics of each attribute).
        pattern = "MinIO Object Storage Server"
        terminate_on_interrupt = True
        timeout = 10
        # Command line: `minio server --address=:<port> <tmp_path>`
        args = [
            MINIO_EXE,
            "server",
            f"--address=:{minio.port}",
            tmp_path,
        ]

        def startup_check(self, port=minio.port):
            """Return True if a TCP connection to localhost:``port`` succeeds.

            A connection failure is printed and reported as False instead of
            being raised.
            """
            address = "localhost"
            try:
                # The context manager closes the socket; the timeout keeps a
                # hung connection attempt from blocking indefinitely.
                with socket.create_connection((address, port), timeout=2):
                    return True
            except OSError as e:
                print(f"something's wrong with {address}:{port}. Exception is {e}")
                return False

    try:
        # ensure process is running and return its logfile
        pid, logfile = xprocess.ensure(minio.name, Starter)
        print(f"Server (PID: {pid}) log file can be found here: {logfile}")
        yield minio
    finally:
        # Terminate on normal teardown and also when start-up itself failed,
        # so a half-started server is not left behind under the same name.
        # NOTE(review): assumes terminate() is harmless when no process was
        # recorded -- confirm against the installed pytest-xprocess version.
        xprocess.getinfo(minio.name).terminate()
