withCreateConsumer and withCreateProducer methods have been removed #1085

Open
noelwelsh opened this issue Oct 21, 2022 · 13 comments
Labels
docs New or improved documentation

Comments

@noelwelsh

How does one use Kafka's MockConsumer and MockProducer with fs2-kafka?

The documentation (e.g. for ConsumerSettings) mentions withCreateConsumer and withCreateProducer, but these methods have been removed. This commit removed the withCreateConsumer method from ConsumerSettings, and this commit removed the withCreateProducer method from ProducerSettings.

What is the replacement method? Thanks!

@MrKustra94

Have you tried with com.evolutiongaming.skafka.producer.Producer#fromProducerJ2 and com.evolutiongaming.skafka.consumer.Consumer#fromConsumerJ1?

@noelwelsh
Author

I'm happy to work on this issue if someone can tell me what the design should be.

@bplommer
Member

Sorry for the slow response to this - good question, I don't think we really thought about this when making the changes you've linked to. I don't have all the context on this currently in my head but it looks like you'd need to override the default instances of MkConsumer/MkProducer with custom instances that create instances of the relevant mock classes. It would be good to have those in a testkit module and we definitely need to update the docs. I'd be happy to take a PR on this - is that enough to go on?

bplommer added the docs (New or improved documentation) label on Nov 18, 2022
@noelwelsh
Author

I'll work on something and then we can discuss in a PR.

@noelwelsh
Author

Umm ... the build doesn't work out of the box due to this:

ThisBuild / latestVersion := tlLatestVersion.value.getOrElse(
  throw new IllegalStateException("No tagged version found")
)

Any suggestions?

@bplommer
Member

That's surprising, I haven't seen that problem - I'll try checking out a fresh copy of the repo later and see if I can reproduce this

@bplommer
Member

In the meantime you can probably just type a value in to be able to start hacking?

@noelwelsh
Author

👍

I've made a bit of progress. Perhaps you can take a look and tell me if this is in keeping with what you're thinking of:

https://github.com/noelwelsh/fs2-kafka/tree/feature/public-mk-consumer

I'm having a lot of trouble testing this code. The basic problem seems to be that the MockConsumer is not assigned to any of the partitions. I've tried a lot of variations on the code below, which is following the pattern of existing tests. Any insights here would be appreciated!

    it("should use the MkConsumer instance in scope") {
      withTopic { topic =>
        createCustomTopic(topic, partitions = 3)
        val mockConsumer = new MockConsumer[Array[Byte], Array[Byte]](OffsetResetStrategy.NONE)

        implicit val mockMkConsumer = MkConsumer.fromKafkaByteConsumer[IO](mockConsumer)

        val consumed =
          KafkaConsumer[IO]
            .stream(consumerSettings[IO])
            .subscribeTo(topic)
            .evalMap(IO.sleep(3.seconds).as(_)) // sleep a bit to trigger potential race condition with _.stream
            .evalTap(
              _ =>
                IO.delay {
                  // mockConsumer.assign(java.util.Collections.singleton(new TopicPartition(topic, 0)))
                  // val partitionInfo = mockConsumer.partitionsFor(topic).get(0)
                  val record = new clients.consumer.ConsumerRecord(
                    topic,
                    0,
                    0L,
                    "Hello".getBytes(),
                    "Kafka".getBytes()
                  )
                  mockConsumer.addRecord(record)
                }
            )
            .records
            .map(committable => committable.record.key -> committable.record.value)
            .interruptAfter(10.seconds)
            .compile
            .toVector
            .unsafeRunSync()

        consumed.size shouldEqual 1
        consumed(0) shouldEqual "Hello" -> "Kafka"
      }
    }
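
For context on the assignment problem described above: with the plain Kafka client, a MockConsumer that has only been subscribed to a topic has no assigned partitions until rebalance is called on it, and addRecord rejects records for partitions that are not assigned. The following is a minimal sketch of that sequence outside fs2-kafka, assuming kafka-clients 3.x and Scala 2.13 on the classpath. The names MockConsumerSketch and pollOnce are hypothetical, and EARLIEST is used instead of NONE so that the initial position can be resolved from the beginning offsets.

```scala
import java.time.Duration
import java.util.Collections

import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetResetStrategy}
import org.apache.kafka.common.TopicPartition

import scala.jdk.CollectionConverters._

// Hypothetical helper, not part of fs2-kafka: drives a MockConsumer directly.
object MockConsumerSketch {
  def pollOnce(topic: String): List[(String, String)] = {
    val consumer  = new MockConsumer[Array[Byte], Array[Byte]](OffsetResetStrategy.EARLIEST)
    val partition = new TopicPartition(topic, 0)

    // Subscribing alone assigns nothing; the mock needs an explicit rebalance.
    consumer.subscribe(Collections.singletonList(topic))
    consumer.rebalance(Collections.singletonList(partition))

    // Give the mock a starting offset so the first poll can resolve a position.
    consumer.updateBeginningOffsets(
      Collections.singletonMap(partition, java.lang.Long.valueOf(0L))
    )

    // Records can only be added for partitions that are currently assigned.
    consumer.addRecord(
      new ConsumerRecord[Array[Byte], Array[Byte]](
        topic,
        0,
        0L,
        "Hello".getBytes("UTF-8"),
        "Kafka".getBytes("UTF-8")
      )
    )

    // Keys and values are byte arrays, so decode them before comparing.
    consumer
      .poll(Duration.ofMillis(100))
      .asScala
      .toList
      .map(r => new String(r.key, "UTF-8") -> new String(r.value, "UTF-8"))
  }
}
```

Whether the same sequence can be driven through fs2-kafka's subscribeTo is not established by this sketch. fs2-kafka performs the subscription itself, so the rebalance call would presumably have to happen after the stream has subscribed.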

@noelwelsh
Author

noelwelsh commented Dec 2, 2022

I've tried playing around with creating custom MkConsumer and MkProducer instances in the code I'm trying to update. I'm afraid to say I think this new design is really the wrong thing.

The settings don't encapsulate all the settings any more. Previously the ConsumerSettings and ProducerSettings would encapsulate all the details needed to create a consumer or producer. Now the settings are split across the settings and the MkConsumer and MkProducer instances. So now I have to track down all the points of use of the settings and make sure the right instances are also used.

MkConsumer and MkProducer are not unique for a given type, as clearly shown by this issue, and therefore are not good candidates for type classes.

@bplommer
Member

bplommer commented Dec 2, 2022

@noelwelsh that's a fair point about the ergonomics (though the traits are intended as capability traits rather than type classes so I don't see non-uniqueness as an argument against them in itself). The motivation for introducing the MkX traits was to decouple the effect type of consumer/producer creation from the effect type of the consumer/producer itself - see #588. One other way I can see to do this is reintroduce withCreateX but with type () => Consumer etc instead of F[Consumer] - what do you think?
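
A minimal sketch of the withCreateX alternative mentioned above, where the settings carry a plain () => Consumer thunk and the effect type is only chosen at the point of creation. Everything in it is a simplified stand-in rather than fs2-kafka code: ByteConsumer, Suspend (in place of cats-effect's Sync), and this ConsumerSettings are hypothetical.

```scala
import scala.util.Try

object ThunkSettingsSketch {
  // Hypothetical stand-in for Kafka's byte-array consumer.
  final case class ByteConsumer(description: String)

  // Hypothetical stand-in for an effect type class such as cats-effect's Sync.
  trait Suspend[F[_]] {
    def delay[A](a: => A): F[A]
  }

  implicit val suspendTry: Suspend[Try] = new Suspend[Try] {
    def delay[A](a: => A): Try[A] = Try(a)
  }

  // The settings no longer mention an effect type for consumer creation.
  final case class ConsumerSettings(
    properties: Map[String, String] = Map.empty,
    createOverride: Option[() => ByteConsumer] = None
  ) {
    // The thunk is effect-agnostic; it is only run when a consumer is created.
    def withCreateConsumer(create: () => ByteConsumer): ConsumerSettings =
      copy(createOverride = Some(create))

    // The caller picks F here, independently of how the settings were built.
    def createConsumer[F[_]](implicit F: Suspend[F]): F[ByteConsumer] =
      F.delay(
        createOverride.fold(
          ByteConsumer(s"real consumer, ${properties.size} properties")
        )(thunk => thunk())
      )
  }
}
```

In this shape a test would call withCreateConsumer(() => mock) once on the settings, and every point of use would pick up the mock without needing an extra instance in scope.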

@bplommer
Member

bplommer commented Dec 2, 2022

As another option, would it help if there were an option to explicitly pass a MkX instance when creating a consumer/producer?

@bplommer
Member

bplommer commented Dec 3, 2022

> Umm ... the build doesn't work out of the box due to this:
>
> ThisBuild / latestVersion := tlLatestVersion.value.getOrElse(
>   throw new IllegalStateException("No tagged version found")
> )
>
> Any suggestions?

I'm unable to reproduce this when cloning the repo in the normal way:

git clone git@github.com:fd4s/fs2-kafka.git
cd fs2-kafka
sbt compile

Is there anything unusual about your development environment that might cause the tags to be missing?

@noelwelsh
Author

Sorry about the delay. I don't get time to work on this most of the week.

Summary: I think the best solution is to add a method to pass a MkX instance when creating the settings.

Here's my reasoning:

  • The root problem is that the consumer (or producer; but I'm just going to talk about consumers to keep it simpler) may be created with a different effect type than the settings.

  • Scala 2 functions cannot have type parameters, so we cannot create a builder method on ConsumerSettings that accepts a function to build the consumer.

  • MkConsumer is essentially a function parameterised by an effect type, solving that issue.

  • The issue with the current design is that the settings are split across the ConsumerSettings and the MkConsumer, which must both be provided at the point where we create a consumer.

  • We want to encapsulate all the settings in a single object, and hence should move to providing a MkConsumer instance at the point where we construct the ConsumerSettings.

So the best way forward seems to be:

  • ConsumerSettings includes a MkConsumer field
  • this field defaults to the default MkConsumer (implicit) value
  • there is a builder method to change the value to a custom value
  • there is a builder method to use the MockConsumer

I don't think there is much value in keeping MkConsumer as an implicit value in this design.
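
The proposal above can be sketched roughly as follows. This is a simplified model under stated assumptions, not fs2-kafka's actual definitions: ByteConsumer, Suspend (standing in for cats-effect's Sync), the shape of MkConsumer, and the builder name withMkConsumer are all hypothetical. It shows MkConsumer as a "function with a type parameter" stored as an ordinary field of the settings, with a default value and a builder method to replace it.

```scala
import scala.util.Try

object SettingsSketch {
  // Hypothetical stand-in for Kafka's byte-array consumer.
  final case class ByteConsumer(description: String)

  // Hypothetical stand-in for an effect type class such as cats-effect's Sync.
  trait Suspend[F[_]] {
    def delay[A](a: => A): F[A]
  }

  implicit val suspendTry: Suspend[Try] = new Suspend[Try] {
    def delay[A](a: => A): Try[A] = Try(a)
  }

  // A method can take the effect type as a parameter, which a Scala 2
  // function value cannot; wrapping it in a trait makes it passable.
  trait MkConsumer {
    def apply[F[_]](properties: Map[String, String])(implicit F: Suspend[F]): F[ByteConsumer]
  }

  object MkConsumer {
    // Default: builds a (pretend) real consumer from the properties.
    val default: MkConsumer = new MkConsumer {
      def apply[F[_]](properties: Map[String, String])(implicit F: Suspend[F]): F[ByteConsumer] =
        F.delay(ByteConsumer(s"real consumer, ${properties.size} properties"))
    }

    // Test variant: always hands back a pre-built consumer, e.g. a mock.
    def fromExisting(consumer: ByteConsumer): MkConsumer = new MkConsumer {
      def apply[F[_]](properties: Map[String, String])(implicit F: Suspend[F]): F[ByteConsumer] =
        F.delay(consumer)
    }
  }

  // All details needed to create a consumer live in one object again.
  final case class ConsumerSettings(
    properties: Map[String, String] = Map.empty,
    mkConsumer: MkConsumer = MkConsumer.default
  ) {
    def withProperty(key: String, value: String): ConsumerSettings =
      copy(properties = properties + (key -> value))

    def withMkConsumer(mk: MkConsumer): ConsumerSettings =
      copy(mkConsumer = mk)

    // The effect type is still chosen at the point of creation.
    def createConsumer[F[_]](implicit F: Suspend[F]): F[ByteConsumer] =
      mkConsumer.apply[F](properties)
  }
}
```

With this shape, swapping in a mock is a single call such as ConsumerSettings().withMkConsumer(MkConsumer.fromExisting(mock)), and no implicit MkConsumer has to be in scope where the consumer is created.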
