Show HN: Pyper – Concurrent Python Made Simple

(github.com)

156 points by pyper-dev 9 months ago

37 comments

Hello and happy new year!

We're excited to introduce the Pyper package for concurrency & parallelism in Python. Pyper is a flexible framework for concurrent / parallel data processing, following the functional paradigm.

Source code can be found on [github](https://github.com/pyper-dev/pyper)

Key features:

Intuitive API: Easy to learn, easy to think about. Implements clean abstractions to seamlessly unify threaded, multiprocessed, and asynchronous work.

Functional Paradigm: Python functions are the building blocks of data pipelines. Lets you write clean, reusable code naturally.

Safety: Hides the heavy lifting of underlying task execution and resource clean-up. No more worrying about race conditions, memory leaks, or thread-level error handling.

Efficiency: Designed from the ground up for lazy execution, using queues, workers, and generators.

Pure Python: Lightweight, with zero sub-dependencies.

We'd love to hear any feedback on this project!

solidasparagus 9 months ago

Nice work! There is a gap when it comes to writing single-machine, concurrent CPU-bound python code. Ray is too big, pykka is threads only, builtins are poorly abstracted. The syntax is also very nice!

But I'm not sure I can use this even though I have a specific use-case that feels like it would work well (high-performance pure Python downloading from cloud object storage). The examples are a bit too simple and I don't understand how I can do more complicated things.

I chunk up my work, run it in parallel and then I need to do a fan-in step to reduce my chunks - how do you do that in Pyper?

Can the processes have state? Pure functions are nice, but if I'm reaching for multiprocess, I need performance and if I need performance, I'll often want a cache of some sort (I don't want to pickle and re-instantiate a cloud client every time I download some bytes for instance).

How do exceptions work? Observability? Logs/prints?

Then there's stuff that is probably asking too much from this project, but I get it if I write my own python pipeline so it matters to me - rate limiting WIP, cancellation, progress bars.

But if some of these problems are/were solved and it offers an easy way to use multiprocessing in python, I would probably use it!
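
For reference, the chunk / fan-out / fan-in pattern with per-worker state described above can be sketched with the standard library alone. This is not Pyper's API; `FakeClient`, `init_worker`, `process_chunk` and `run` are hypothetical names, and threads are used only to keep the sketch short.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from functools import reduce

_local = threading.local()  # per-worker storage


class FakeClient:
    """Hypothetical stand-in for an expensive-to-build cloud client."""

    instances = 0
    _lock = threading.Lock()

    def __init__(self):
        with FakeClient._lock:
            FakeClient.instances += 1

    def download(self, key: int) -> int:
        return key * 2  # pretend this is the payload for `key`


def init_worker():
    # Runs once per worker, so the client is built once and then reused.
    _local.client = FakeClient()


def process_chunk(chunk):
    # Fan-out step: each chunk is handled by whichever worker is free.
    return sum(_local.client.download(key) for key in chunk)


def run(keys, chunk_size=3, workers=4):
    chunks = [keys[i:i + chunk_size] for i in range(0, len(keys), chunk_size)]
    with ThreadPoolExecutor(max_workers=workers, initializer=init_worker) as pool:
        partials = list(pool.map(process_chunk, chunks))
    # Fan-in step: reduce the per-chunk results to a single value.
    return reduce(lambda a, b: a + b, partials, 0)


total = run(list(range(10)))
```

`ProcessPoolExecutor` accepts the same `initializer` argument, so for CPU-bound work the same shape should carry over, with a module-level global taking the place of `threading.local()`.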

  • pyper-dev 9 months ago

    Great feedback, thank you. We'll certainly be working on adding more examples to illustrate more complex use cases.

    One thing I'd mention is that we don't really imagine Pyper as a whole observability and orchestration platform. It's really a package for writing Python functions and executing them concurrently, in a flexible pattern that can be integrated with other tools.

    For example, I'm personally a fan of Prefect as an observability platform-- you could define pipelines in Pyper then wrap it in a Prefect flow for orchestration logic.

    Exception handling and logging can also be handled by orchestration tools (or in the business logic if appropriate, literally using try... except...)

    For a simple progress bar, tqdm is probably the first thing to try. As it wraps anything iterable, applying it to a pipeline might look like:

      import time
      from pyper import task
      from tqdm import tqdm
    
    
      @task(branch=True)
      def func(limit: int):
          for i in range(limit):
              time.sleep(0.1)
              yield i
    
    
      def main():
          for _ in tqdm(func(limit=20), total=20):
              pass
    
    
      if __name__ == "__main__":
          main()
  • halfcat 9 months ago

    > I don't want to pickle and re-instantiate a cloud client every time I download some bytes for instance

    Have you tried multiprocessing.shared_memory to address this?

    • solidasparagus 9 months ago

      I haven't played with that much! This isn't really a problem in general for my approach to writing this sort of code - when I use multiprocessing, I use a Process class or a worker task function with a setup step followed by a while loop that pulls from a work/control queue. But in the Pyper functional programming world, it would be a concern.

      IIRC multiprocessing.shared_memory is a much lower level of abstraction than most Python stuff, so I think I'd need to figure out how to make the client use the shared memory and I'm not sure if I could.
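
A minimal sketch of the "setup step followed by a while loop that pulls from a work/control queue" pattern mentioned above, using threads and `queue.Queue` so it stays self-contained. The names `worker`, `run` and `STOP` are hypothetical, and the squaring is only a stand-in for real work.

```python
import queue
import threading

STOP = None  # control message telling a worker to exit


def worker(tasks: queue.Queue, results: queue.Queue) -> None:
    # Setup step: runs once per worker, before any work is pulled.
    cache = {}
    while True:
        item = tasks.get()
        if item is STOP:
            break
        if item not in cache:
            cache[item] = item * item  # stand-in for expensive work
        results.put(cache[item])


def run(items, n_workers=3):
    tasks, results = queue.Queue(), queue.Queue()
    threads = [
        threading.Thread(target=worker, args=(tasks, results))
        for _ in range(n_workers)
    ]
    for t in threads:
        t.start()
    for item in items:
        tasks.put(item)
    for _ in threads:
        tasks.put(STOP)  # one stop message per worker
    for t in threads:
        t.join()
    # Completion order is not deterministic, so sort before returning.
    return sorted(results.get() for _ in range(len(items)))


squares = run([1, 2, 3, 2, 1])
```

The same loop should work with `multiprocessing.Process` and `multiprocessing.Queue`; using `None` as the stop message (rather than a unique `object()`) keeps the identity check valid after pickling.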

  • globular-toast 9 months ago

    Do you really need to reinvent the wheel every time for parallel workloads? Just learn GNU parallel and write single-threaded code.

    Concurrency in general isn't about parallelism. It's just about doing multiple things at the same time.

    • cess11 9 months ago

      GNU Parallel is really neat, software that's so good it's boring. Closing in on being a quarter century old by now, no? I remember first reading about it in 2003 maybe?

      I've also used 'fork in Picolisp a lot for this kind of thing, and also Elixir, which arguably has much nicer pipes.

      But hey, it's good that Python after like thirty years or so is trying to get decent concurrency. Eventually people that use it as a first language might learn about such things too.

      • solidasparagus 9 months ago

        Why did your comment need to be so condescending?

        • cess11 9 months ago

          I'm sorry. It's the trauma of preaching the virtues of crude but efficient concurrency and similar multiprocessing for many years, in many workplaces, and only rarely meeting anything but distrust, disinterest or uninformed rebuttals like "OS processes are very heavy, can't do concurrency that way".

          However, it's a real problem that 'beginner languages' like Python and Javascript don't readily do multithreaded computation, something which has been the default on personal computers for quite a while now and available for at least twenty years.

rtpg 9 months ago

You really should dive more into the `multiprocess` support option and highlight how this gets around issues with the GIL. This feels like a major value add, and "does this help with CPU-bound work" being "yes" is a big deal!

I don't really need pipelining that much, but pipelining along with a certain level of durability and easy multiprocessing support? Now we're talking

  • t43562 9 months ago

    ...although python 3.13 can be built without the GIL and it really does make threading useful. I did some comparisons with and without.

    I suppose one excellent thing about this would be if you could just change 1 parameter and switch from multiprocessing to threaded.

    • rtpg 9 months ago

      I think you could build off of threading. I do think here it's good to acknowledge that Python async is fundamentally single-threaded (or rather, "single thread per event loop"), so if you do go a multi-thread version you might have to do some bookkeeping to make it all work well.

      I'm not sure how well async Python libs are tested against working in a world with multiple event loops, but I bet there are a _lot_ of latent bugs in that space.

giancarlostoro 9 months ago

Lowkey I hate the "\" line continuation in Python to force PEP-8 compliance in a way... Are there any Pythonistas who would write the examples in there differently to achieve a similar level of readability?

> pipeline = task(get_data, branch=True) \

> | task(step1, workers=20) \

> | task(step2, workers=20) \

> | task(step3, workers=20, multiprocess=True)

  • dec0dedab0de 9 months ago

    I really don’t like overloading pipes like this. I would rather chain methods like how the django orm does it.

    you could reassign every line, but it would look nicer with chained functions.

      pipeline = task(get_data, branch=True)
      pipeline = pipeline | task(step1, workers=20)
      pipeline = pipeline | task(step2, workers=20)
      pipeline = pipeline | task(step3, workers=20, multiprocess=True)
    
    edit:

    I would be tempted to do something like this:

      steps = [task(step1, workers=20),
               task(step2, workers=20),
               task(step3, workers=20, multiprocess=True)]
      pipeline = task(get_data, branch=True)
    
      for step in steps:
          pipeline = pipeline.__or__(step)
    • Rickster35 9 months ago

      According to the docs, | is syntactic sugar for the .pipe method.

        pipeline = task(get_data, branch=True).pipe(
            task(step1, workers=20)).pipe(
            task(step2, workers=20)).pipe(
            task(step3, workers=20, multiprocess=True))
      
      That's probably the chained method approach for those with this preference.
    • me-vs-cat 9 months ago

      This style looks pretty good to me:

          pipeline = task(...)
          pipeline |= task(...)
      
      So does this style:

          steps = [task(...), task(...)]
          pipeline = functools.reduce(operator.or_, steps)
      
      But it appears you can just change "task" to "Task" and then:

          pipeline = pyper.Pipeline([Task(...), Task(...)])
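
To make the relationship between these styles concrete, here is a toy sketch of how `|` can be sugar for a `.pipe` method. `Stage` is a hypothetical class written for illustration; it is not Pyper's implementation and only composes plain functions.

```python
import functools
import operator


class Stage:
    """Hypothetical toy stage that wraps a single-argument function."""

    def __init__(self, func):
        self.func = func

    def pipe(self, other: "Stage") -> "Stage":
        # Compose: feed this stage's output into the next stage.
        return Stage(lambda x: other.func(self.func(x)))

    def __or__(self, other: "Stage") -> "Stage":
        # `a | b` is sugar for `a.pipe(b)`.
        return self.pipe(other)

    def __call__(self, x):
        return self.func(x)


stages = [Stage(lambda x: x + 1), Stage(lambda x: x * 2), Stage(str)]

chained = stages[0].pipe(stages[1]).pipe(stages[2])
piped = stages[0] | stages[1] | stages[2]
reduced = functools.reduce(operator.or_, stages)

pipeline = stages[0]
pipeline |= stages[1]  # falls back to __or__ because __ior__ is not defined
pipeline |= stages[2]
```

All four spellings build the same composition here, so under this assumption the choice between them is purely stylistic.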
  • morkalork 9 months ago

    Wrap the statement in (...) and you can drop the backslashes. See also how people split up complicated loc queries in pandas on multiple lines too.

    • giancarlostoro 9 months ago

      Thank you! It seems this works for not just parens but square brackets and curly braces too! Only special requirement is indentation is consistent (duh though).

      I've not been doing Python day-to-day so I'm starting to lose my touch on all the nice little tricks.

      • Rickster35 9 months ago

        Yeah, this seems neatest if you don't like backslash continuations:

          pipeline = (
              task(get_data, branch=True)
              | task(step1, workers=20)
              | task(step2, workers=20)
              | task(step3, workers=20, multiprocess=True)
          )

        Square brackets would create a list and braces would create a set of course. The contents still can be split over different lines-- just pointing out that this syntax doesn't do the same thing.

minig33 9 months ago

This is cool - I’ve been looking for something like this. I really liked the syntax of Prefect v1 but it was overcomplicated with execution configuration in subsequent versions. I just want something to help me just run async pipelines and prevent AsyncIO weirdness - going to test this out.

JackC 9 months ago

Cool, I was just looking for something like this!

It's surprisingly annoying in built-in python to do something like this. The most recent thing I was trying to do was:

- load URLs from a file

- hand them out to one subprocess per cpu

- download them concurrently in threads or async within each subprocess

- pull the results back into a single process for formatting and storing

Getting this to work and handle queues, ctrl-c, exceptions etc. is just a whole mess involving python builtins created at different times with different interfaces; I hacked until I kind of got it working, but didn't love it. Bundling it all in a single tested package would be great.

  • zenapollo 9 months ago

    I stumbled on grequests for this use case and it just works.

grandma_tea 9 months ago

Nice! I'm looking forward to trying it out. This seems very similar to https://github.com/cgarciae/pypeln/

  • pyper-dev 9 months ago

    We came across this at one point and thought it was a very innovative and interesting package!

    The important design point we're differing on is that Pyper implements 'pipelines' as functions, whereas pypeln seems to implement 'pipelines' as iterable objects.

gpderetta 9 months ago

From just a short skimming of the docs:

- my biggest issue with concurrency in python (especially with asyncio) is leaking tasks. Pyper should provide structured concurrency support a-la trio.

- I don't see the opposite of branch to collect the output of multiple sub pipelines into a single stage. I need this pretty much always and it is a chore to implement.

- Async need not force the full pipeline to be async. There should be an option to run async functions in background event loops. Especially as you already support threaded executions.

  • pyper-dev 9 months ago

    Your third point intrigues me a lot. I imagine for the majority of cases, it's generally more useful to work with async functions in the structure of async syntax, but I suppose it's possible to run async functions in a synchronous pipeline.

    Even though there's currently no built-in support for this, a workaround could be to just define synchronous helper functions to handle running your async logic in an event loop.
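
A minimal sketch of that workaround using only the standard library. The names `fetch` and `fetch_sync` are hypothetical, and a plain `ThreadPoolExecutor` stands in for whatever synchronous, threaded stage would call the helper.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def fetch(n: int) -> int:
    # Hypothetical async step; the sleep stands in for real I/O.
    await asyncio.sleep(0.01)
    return n * 10


def fetch_sync(n: int) -> int:
    # Synchronous helper: creates an event loop, runs the coroutine
    # to completion, then closes the loop.
    return asyncio.run(fetch(n))


# Each worker thread runs its own short-lived event loop per call.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(fetch_sync, range(5)))
```

Note that `asyncio.run` raises an error if the calling thread already has a running event loop, and creating a loop per call has some overhead, so this fits best when each call does a meaningful amount of I/O.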

  • d0mine 9 months ago

    asyncio.TaskGroup from stdlib provides structured concurrency

    • gpderetta 9 months ago

      Since 3.11 it seems; I'm currently stuck on 3.8, but hopefully we should be able to upgrade soon. Thanks.

urduntupu 9 months ago

Very good README.md, teasing and explaining the provided value very well. Well done!

ge96 9 months ago

I still gotta wrap my head around concurrency, I'm not sure if using threads counts as concurrency

  • gpderetta 9 months ago

    Of course it does, why wouldn't it?

    • ge96 9 months ago

      Something about GIL and how code is written (one statement comes before the other), true parallelism, anyway I gotta do my hw

      I have not done anything significant like HFT to really dig deep into this

      Also coming from JS async/await (nodeJS has one thread)