scala on stackoverflow

most recent 30 from stackoverflow.com 2016-07-24T02:56:33Z

Non-Terminating ZeroMQ Load Balancer

23 July 2016 - 6:25pm

Given the following slightly modified Load Balancer ZeroMQ code:

package net.broker

import org.zeromq.ZMQ

object LruQueue2 {
  class ClientTask extends Runnable {
    override def run(): Unit = {
      val context = ZMQ.context(1)
      val client = context.socket(ZMQ.REQ)
      client.connect("tcp://localhost:5555")

      // send request, get reply
      client.send("HELLO".getBytes, 0)
      val reply = client.recv(0)
      println(s"Client id: received: ${new String(reply)}")
    }
  }

  class WorkerTask() extends Runnable {
    override def run(): Unit = {
      val context = ZMQ.context(1)
      val worker = context.socket(ZMQ.REQ)
      worker.connect("tcp://localhost:5556")

      worker.send("READY".getBytes, 0)

      while(true) {
        val clientAddr = worker.recv(0)
        val empty = worker.recv(0)
        val clientMsg = worker.recv(0)

        worker.send(clientAddr, ZMQ.SNDMORE)
        worker.send("".getBytes, ZMQ.SNDMORE)
        worker.send("WORLD".getBytes, 0)
        println(s"Worker sent: 3-frames to client: ${new String(clientAddr)}")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val NOFLAGS = 0

    // worker using REQ socket to do LRU routing
    val NBR_CLIENTS = 10
    val NBR_WORKERS = 3

    val context = ZMQ.context(1)
    val frontend = context.socket(ZMQ.ROUTER)
    val backend = context.socket(ZMQ.ROUTER)
    frontend.bind("tcp://*:5555")
    backend.bind("tcp://*:5556")

    val clients = List.fill(NBR_CLIENTS)(new Thread(new ClientTask))
    val workers = List.fill(NBR_WORKERS)(new Thread(new WorkerTask))

    clients.foreach(_.start)
    workers.foreach(_.start)

    val workerQueue = scala.collection.mutable.Queue[Array[Byte]]()

    val poller = context.poller(2)
    poller.register(backend, ZMQ.Poller.POLLIN)
    poller.register(frontend, ZMQ.Poller.POLLIN)

    var clientNbr = NBR_CLIENTS

    while(true) {
      poller.poll()
      println("clientNbr:" + clientNbr)
      println("workerQueue.length: " + workerQueue.length)

      if(clientNbr == 0) {
        sys.exit(0)
      }
      else if(poller.pollin(0) && clientNbr > 0) {
        val workerAddr = backend.recv(NOFLAGS)
        val empty = backend.recv(NOFLAGS)
        val clientAddrOrReadyMsg = backend.recv(NOFLAGS)

        workerQueue.enqueue(workerAddr)

        if(new String(clientAddrOrReadyMsg) == "READY") {
          // nothing to do - worker is letting us know that he's ready to work
        }
        else {
          // retrieve remaining 2 frames of client message
          // [Empty][Client Message of "HELLO"]
          val empty = backend.recv(0)
          val workerResponse = backend.recv(0)

          frontend.send(clientAddrOrReadyMsg, ZMQ.SNDMORE)
          frontend.send("".getBytes, ZMQ.SNDMORE)
          frontend.send(workerResponse, NOFLAGS)
          clientNbr -= 1
        }
      }
      else if (poller.pollin(1) && workerQueue.nonEmpty) {
        val clientAddr = frontend.recv(0)
        val empty = frontend.recv(0)
        val clientMsg = frontend.recv(0)

        backend.send(workerQueue.dequeue(), ZMQ.SNDMORE)
        backend.send("".getBytes, ZMQ.SNDMORE)
        backend.send(clientAddr, ZMQ.SNDMORE)
        backend.send("".getBytes, ZMQ.SNDMORE)
        backend.send(clientMsg, NOFLAGS)
      }
      else {}
    }
  }
}

When I run the above code, I see the following output:

[info] Running net.broker.LruQueue2
[info] clientNbr:10
[info] workerQueue.length: 0
[info] clientNbr:10
[info] workerQueue.length: 1
[info] clientNbr:10
[info] workerQueue.length: 2
[info] clientNbr:10
[info] workerQueue.length: 3
[info] clientNbr:10
[info] workerQueue.length: 2
[info] clientNbr:10
[info] workerQueue.length: 1
[info] clientNbr:9
[info] workerQueue.length: 2
[info] clientNbr:8
[info] workerQueue.length: 3
[info] Client id: received: WORLD
[info] Worker sent: 3-frames to client: πb`
[info] Client id: received: WORLD
[info] Worker sent: 3-frames to client: πb_
[info] Worker sent: 3-frames to client: πba
[info] clientNbr:8
[info] workerQueue.length: 2
[info] clientNbr:8
[info] workerQueue.length: 1
[info] Worker sent: 3-frames to client: πbb
[info] clientNbr:7
[info] workerQueue.length: 2
[info] Client id: received: WORLD
[info] clientNbr:6
[info] workerQueue.length: 3
[info] Client id: received: WORLD
[info] clientNbr:6
[info] Worker sent: 3-frames to client: πbc
[info] workerQueue.length: 2
[info] clientNbr:6
[info] workerQueue.length: 1
[info] Worker sent: 3-frames to client: πbd
[info] clientNbr:5
[info] workerQueue.length: 2
[info] Client id: received: WORLD
[info] clientNbr:4
[info] workerQueue.length: 3
[info] Client id: received: WORLD
[info] clientNbr:4
[info] Worker sent: 3-frames to client: πbe
[info] workerQueue.length: 2
[info] clientNbr:4
[info] workerQueue.length: 1
[info] Worker sent: 3-frames to client: πbf
[info] clientNbr:3
[info] workerQueue.length: 2
[info] Client id: received: WORLD
[info] clientNbr:2
[info] workerQueue.length: 3
[info] Client id: received: WORLD
[info] clientNbr:2
[info] Worker sent: 3-frames to client: πbg
[info] workerQueue.length: 2
[info] clientNbr:2
[info] Worker sent: 3-frames to client: πbh
[info] workerQueue.length: 1
[info] clientNbr:1
[info] workerQueue.length: 2
[info] Client id: received: WORLD
[info] Client id: received: WORLD

I count 10 occurrences of Client id: received: WORLD, so I would have expected the program to exit gracefully via sys.exit(0). But it appears that, after printing the Client id: received: WORLD messages, it hangs somewhere and never terminates.

Why doesn't this program terminate?
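For reference, a hedged sketch of one possible adjustment, assuming the hang is the blocking poll(): once the last reply has been forwarded and clientNbr reaches 0, the loop is already waiting inside poller.poll() for events that never come, so the clientNbr == 0 check is never reached. Polling with a timeout (value illustrative) keeps that check reachable:

// Sketch only: move the exit condition into the loop test and poll with a timeout.
while (clientNbr > 0) {
  poller.poll(1000) // wait at most one second for events instead of blocking forever
  // ... same frontend/backend frame handling as above ...
}
sys.exit(0)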

Preserve method parameter names in scala macro

23 July 2016 - 6:21pm

I have an interface:

trait MyInterface { def doSomething(usefulName : Int) : Unit }

I have a macro that iterates over the methods of the interface and does stuff with the method names and parameters. I access the method names by doing something like this:

val tpe = typeOf[MyInterface]

// Get lists of parameter names for each method
val listOfParamLists = tpe.decls
  .filter(_.isMethod)
  .map(_.asMethod.paramLists.head.map(sym => sym.asTerm.name))

If I print out the names for doSomething's parameters, usefulName has become x$1. Why is this happening and is there a way to preserve the original parameter names?

I am using scala version 2.11.8, macros paradise version 2.1.0, and the blackbox context.

found : scala.collection.immutable.Nil.type

23 July 2016 - 5:43pm

This is a snippet from a Coursera course. I can't seem to figure out why it isn't working. I thought Nil was a member of List. What is going on here?

scala> :paste
// Entering paste mode (ctrl-D to finish)

abstract class List[+T] {
  def map[U](f: T => U): List[U] = this match {
    case x :: xs => f(x) :: xs.map(f)
    case Nil => Nil
  }
}

I get the following error:

// Exiting paste mode, now interpreting.

<console>:9: error: constructor cannot be instantiated to expected type;
 found   : scala.collection.immutable.::[B]
 required: List[T]
    case x :: xs => f(x) :: xs.map(f)
         ^
<console>:9: error: not found: value x
    case x :: xs => f(x) :: xs.map(f)
                    ^
<console>:9: error: not found: value xs
    case x :: xs => f(x) :: xs.map(f)
                            ^
<console>:10: error: pattern type is incompatible with expected type;
 found   : scala.collection.immutable.Nil.type
 required: List[T]
Note: if you intended to match against the class, try `case _: <none>`
    case Nil => Nil
         ^
<console>:10: error: type mismatch;
 found   : scala.collection.immutable.Nil.type
 required: List[U]
    case Nil => Nil
                ^
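For reference, a minimal sketch of the usual fix, under the assumption that the exercise wants a self-contained list type: the patterns :: and Nil above refer to scala.collection.immutable, which can never be an instance of this custom List[T], hence the errors. Giving the custom type its own constructors (Cons and Empty are illustrative names) makes the match well-typed:

abstract class List[+T] {
  def map[U](f: T => U): List[U] = this match {
    case Cons(x, xs) => Cons(f(x), xs.map(f))
    case Empty       => Empty
  }
}
case class Cons[+T](head: T, tail: List[T]) extends List[T]
case object Empty extends List[Nothing]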

Spark - Conditional grouping within a single RDD

23 July 2016 - 4:05pm

I have information that was originally grouped in a single pairRDD as (num, letter) tuples and that is now grouped as (letter, set(num)) tuples. This representation is open to change, although I think it makes the most sense for now.

I am attempting to group together the latter tuples, all contained within a single RDD, if the set corresponding to one tuple contains an element that is present within another tuple. For example, if I had the tuples ("A", [1, 2, 3, 4]) and ("B", [4, 5, 6]) they should combine to ("A", [1, 2, 3, 4, 5, 6]), because they both contain 4. However, if I had the tuples ("A", [1, 2]) and ("B", [3, 4]), they should remain separate, because there are no common values.

I have very limited knowledge of Scala, but I am attempting to do this in Scala anyway. There may be an easier alternative in Java or Python, or something obvious in Scala that I just don't know about. My main idea has been to replace all keys with the same key (e.g. just "A") and then perform a reduceByKey on the pairRDD, but I am unable to figure out how to keep the tuples separate if they fail the combination condition. Specifically, I have tried the following:

val secondReduce = firstReduce.values.map(x => ("A", x)).reduceByKey((x, y) => if ((x & y) == x) x ++ y else ???)

where firstReduce contains the aforementioned (letter, set(num)) tuples. I have no idea what to put in the else branch, or even whether this is a workable idea. What should I put in the else branch to make this work? Or, if it can't possibly work, what are my other options?

Let me know if I need to post more information or examples.
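For reference, a minimal plain-Scala sketch of the merge rule itself (not RDD-based, names illustrative): every existing group that shares at least one element with the incoming set is unioned into it, and groups with no common values stay separate. Distributing this over an RDD is a separate problem:

def mergeOverlapping(groups: List[(String, Set[Int])]): List[(String, Set[Int])] =
  groups.foldLeft(List.empty[(String, Set[Int])]) { case (acc, (key, nums)) =>
    // split existing groups into those sharing an element with `nums` and the rest
    val (overlapping, disjoint) = acc.partition { case (_, s) => s.exists(nums.contains) }
    // union all overlapping sets with the incoming one, keeping one representative key
    val merged = overlapping.map(_._2).foldLeft(nums)(_ ++ _)
    (overlapping.headOption.map(_._1).getOrElse(key), merged) :: disjoint
  }

// mergeOverlapping(List("A" -> Set(1, 2, 3, 4), "B" -> Set(4, 5, 6), "C" -> Set(8, 9)))
// gives ("A", Set(1, 2, 3, 4, 5, 6)) and ("C", Set(8, 9)), in some order.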

Monitoring Tools for Play Framework Application

23 July 2016 - 1:01pm

What are some existing tools for monitoring a Play Framework application?

Of course JVisualVM can be used, but I mean tools specific to a Play application.

Play 2.4 or higher.

Twirl template default parameter value is not applied

23 July 2016 - 12:34pm

I have a Play 2.5 template which starts from the following declaration:

@(title: String)(content: Html)(menu:Html = HtmlFormat.empty)(implicit request:Request[AnyContent])

So the menu parameter is declared with a default value.

Now in the controller I have this action generator:

def document(title: String) = Action.async { implicit request =>
  documentService.findByTitle(title).map {
    case Some(d) => Ok(views.html.document(d))
    case None    => Ok(main("No document found")(content = Html("There is no such document")))
  }
}

So I do not pass a value for the menu parameter in the template invocation, and I expect this to compile and work according to the usual default-parameter semantics, but I am getting this compilation error:

[error] D:\Projects\feed\app\controllers\MainController.scala:28: missing arguments for method apply in class main;
[error] follow this method with `_' if you want to treat it as a partially applied function
[error]       case None => Ok(main("No document found")(content = Html("There is no such document")))
[error]                           ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed

Could you explain what is wrong here?
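For comparison, a hedged sketch of the same case branch with the menu parameter list written out explicitly (main and HtmlFormat.empty are taken from the question). Whether a default in a later parameter list can simply be omitted depends on how Twirl generates the template's apply method, so supplying it explicitly side-steps the issue:

case None =>
  Ok(main("No document found")(content = Html("There is no such document"))(menu = HtmlFormat.empty))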

java.net.ConnectException when trying to use Redis from Scala

23 July 2016 - 12:31pm

I created a Scala object for testing Redis client.

import com.redis._

object Test {
  def main(args: Array[String]) {
    val redis = new RedisClient("localhost", 6379)
    val mySet = Set(4, 5, 6, 7, 8)
    redis.set("myKey", mySet)
    redis.get("myKey")
  }
}

The Redis server is running in Docker. It was installed and launched as follows:

## get official redis docker image
docker pull redis:latest

## start a redis instance
docker run --name some-redis -d redis

Using docker ps I see that the container of Redis is running:

CONTAINER ID   IMAGE   COMMAND                  CREATED         STATUS         PORTS      NAMES
xxxxxxxxxxxx   redis   "docker-entrypoint.sh"   3 seconds ago   Up 3 seconds   6379/tcp

However when I execute the test program in Scala, it fails with the error message:

Exception in thread "main" java.lang.RuntimeException: java.net.ConnectException: Connection refused
    at com.redis.IO$class.connect(IO.scala:37)
    at com.redis.RedisClient.connect(RedisClient.scala:94)
    at com.redis.RedisCommand$class.initialize(RedisClient.scala:71)
    at com.redis.RedisClient.initialize(RedisClient.scala:94)
    at com.redis.RedisClient.<init>(RedisClient.scala:98)
    at main.scala.Test$.main(Test.scala:12)
    at main.scala.Test.main(Test.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at java.net.Socket.connect(Socket.java:538)
    at java.net.Socket.<init>(Socket.java:434)
    at java.net.Socket.<init>(Socket.java:211)
    at com.redis.IO$class.connect(IO.scala:25)

What is wrong with my installation? It looks like the Redis server is not running...

Annotation.tailrec is throwing compilation error in scala

23 July 2016 - 11:17am

I am writing a merge sort program in Scala 2.11.8.

Below is my program:

object ListMergeSort {
  def main(args: Array[String]): Unit = {
    val list: List[Int] = List(1,5,7,2,9,3,8,6,4)
    println(list)
    println(sort(list))
  }

  def sort(l: List[Int]): List[Int] = {
    l match {
      case Nil => l
      case h :: Nil => l
      case _ =>
        val (l1, l2) = l splitAt (l.length / 2)
        listMergeSort(sort(l1), sort(l2))
    }
  }

  //@annotation.tailrec
  def listMergeSort(l1: List[Int], l2: List[Int]): List[Int] = {
    (l1, l2) match {
      case (Nil, l2) => l2
      case (l1, Nil) => l1
      case (h1 :: t1, h2 :: t2) =>
        if (h1 < h2) h1 :: listMergeSort(t1, l2)
        else h2 :: listMergeSort(l1, t2)
    }
  }
}

The above program works fine and the output gives me the sorted list.

I want to annotate the listMergeSort function as tail-recursive, but the compiler gives the error "recursive call not in tail position".

import scala.util.Random

object test {
  def msort[T <% Ordered[T]](xs: List[T]): List[T] = {
    @annotation.tailrec
    def merge(res: List[T], xs: List[T], ys: List[T]): List[T] = (xs, ys) match {
      case (_, Nil) => res.reverse ::: xs
      case (Nil, _) => res.reverse ::: ys
      case (x :: xs1, y :: ys1) =>
        if (x < y) merge(x :: res, xs1, ys)
        else merge(y :: res, xs, ys1)
    }

    val n = xs.length / 2
    if (n == 0) xs
    else {
      val (ys, zs) = xs splitAt n
      merge(Nil, msort(ys), msort(zs))
    }
  }

  def main(args: Array[String]) {
    val list = Seq.fill(10)(Random.nextInt(500)).toList
    println(list)
    println(msort(list))
  }
}

The program above has a similar merge function, but there @annotation.tailrec does not produce any error.

Can anyone help me find the root cause of the error in my program?

Need to pull a specific message using spark

23 July 2016 - 10:18am

I am very new to Spark. I am trying to pull the date and the error message from the following error log:

Nov 11 09:44:53 www httpd[1933]: [error] [client 10.2.23.89] (36)File name too long

Nov 11 09:49:38 www httpd[2728]: [error] [client 10.2.23.128] (36)File name too long

Nov 11 10:14:23 www httpd[4530]: [error] [client 10.2.23.243] (36)File name too long

Nov 11 10:15:24 www httpd[4630]: [error] [client 10.2.23.42] (36)File name too long

Nov 11 12:05:07 www httpd[12062]: [error] [client 10.2.23.148] (36)File name too long

This is the script I have used in the Scala shell:

val inputfile = sc.textFile("hdfs://localhost:8020/kirthi/errorlog.txt")
val elog = inputfile.map(line => (line.substring(0, 6), line.substring(65, 83)))
elog.collect()

This is the output I am getting, but it's not what I expected: the message is not pulled completely in all the tuples, since the length of each log line varies.

Array[(String, String)] = Array((Nov 11,File name too long), (Nov 11,)File name too lon), (Nov 11,)File name too lon), (Nov 11,File name too long), (Nov 11,6)File name too lo))

But I want the output as posted below

Array[(String, String)] = Array((Nov 11,File name too long), (Nov 11,File name too long), (Nov 11,File name too long), (Nov 11,File name too long), (Nov 11,File name too long))

If I increase the length of the substring, I hit an array-out-of-bounds exception.

Could someone please help me out in getting the expected result.

Thanks
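For reference, a hedged sketch that extracts the two fields with a regular expression instead of fixed substring offsets, assuming every line has the shape shown above (month and day, then a "(NN)" code followed by the message); the pattern is illustrative:

val inputfile = sc.textFile("hdfs://localhost:8020/kirthi/errorlog.txt")
// capture "Nov 11" and whatever follows the "(36)"-style code; assumes every line matches
val pattern = """^(\w{3} \d{1,2}).*\(\d+\)(.*)$""".r
val elog = inputfile.map { case pattern(date, msg) => (date, msg) }
elog.collect()
// Array((Nov 11,File name too long), (Nov 11,File name too long), ...)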

Scala breakable function

23 July 2016 - 10:06am

I have a function like this one: def process(update:(Any) => Any, break:terminate)

Class terminate :

class terminate() {
  private var terminate = false
  def apply() = terminate = true
  def break() = terminate
}

The update function holds a reference to the break object, so it can call methods on the same terminate instance that process receives.

The process function does some heavy, expensive computation, calling update() along the way with progress updates and intermediate results.

Since process may run for a long time, the update function may break the entire process through the terminate class.

How should I exit process() (by checking if (break.break()) inside it) while keeping the best performance possible? Throwing an exception is probably not the best option, and doing many if checks inside process() is not good either. What do you think?

Any other solutions to this problem that do not use separate termination class to break a function call are appreciated.
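For reference, a minimal sketch of the cheapest common approach, assuming a periodic check is acceptable: a single boolean read per chunk of work costs next to nothing compared to the heavy computation itself, so checking a flag (an AtomicBoolean or a @volatile var) once per iteration usually has no measurable overhead. Names and the loop shape are illustrative:

import java.util.concurrent.atomic.AtomicBoolean

class Terminate {
  private val flag = new AtomicBoolean(false)
  def apply(): Unit = flag.set(true) // request termination
  def break: Boolean = flag.get()    // has termination been requested?
}

def process(update: Any => Any, break: Terminate): Unit = {
  var i = 0
  while (i < 1000000 && !break.break) { // one cheap boolean read per chunk
    // ... heavy computation for chunk i ...
    if (i % 10000 == 0) update(i)       // occasional progress update
    i += 1
  }
}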

What is the efficient way of reading and processing very large CSV file in scala (> 1GB)?

23 July 2016 - 9:57am

In Scala, how do you efficiently (in terms of memory consumption and performance) read a very large CSV file? Is it fast enough to just stream it line by line and process each line at each iteration?

What I need to do with the CSV data: in my application, a single line of the CSV file is treated as one record, and all the records are to be converted into XML elements and JSON format and saved to another file in each of those formats.

So the question is: while reading the CSV file, is it a good idea to read it in chunks and hand each chunk to another thread that converts those CSV records into XML/JSON and writes them to the file? If yes, how?

The CSV data can be anything; there is no restriction on its type: it can be numeric, big decimal, string, or date. Is there an easy way to handle these different data types before saving to XML, or do we not need to take care of the types at all?

Many Thanks
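For reference, a minimal sketch of plain line-by-line streaming, assuming a simple comma-separated layout with no quoted fields (file name illustrative); memory use stays roughly constant because only one line is held at a time, and the record-to-XML/JSON conversion is left abstract:

import scala.io.Source

val source = Source.fromFile("big.csv")
try
  source.getLines().foreach { line =>
    val fields = line.split(',')
    // convert `fields` to an XML element / JSON object here and append it to the output files
  }
finally source.close()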

how to get paginated select on slick + postgresql

23 July 2016 - 9:07am

In a PostgreSQL database, with Slick 3, what's the best way to do pagination?

  • get all rows and do pagination in Scala (seems not very efficient)?
  • a static query with limit and offset (see the sketch below)?
  • is there any other way?
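For reference, a minimal Slick 3 sketch of the limit/offset option; the table definition and driver import are illustrative. drop and take are translated to OFFSET and LIMIT in the generated SQL, so only one page is fetched from PostgreSQL:

import slick.driver.PostgresDriver.api._

// illustrative table; the real schema will differ
class Users(tag: Tag) extends Table[(Long, String)](tag, "users") {
  def id   = column[Long]("id", O.PrimaryKey)
  def name = column[String]("name")
  def *    = (id, name)
}
val users = TableQuery[Users]

// page numbers start at 0; sortBy gives the pages a stable order
def page(db: Database, pageNo: Int, pageSize: Int) =
  db.run(users.sortBy(_.id).drop(pageNo * pageSize).take(pageSize).result)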

Trying to get a list with the Typesafe config library

23 July 2016 - 8:01am

I am trying to get a list from a config, following this example: How to get a list with the Typesafe config library

However, I get the following exception:

Exception in thread "main" com.typesafe.config.ConfigException$WrongType: application.properties @ file:/xxx/application.properties: configYYY has type STRING rather than LIST
    at com.typesafe.config.impl.SimpleConfig.findKeyOrNull(SimpleConfig.java:159)
    at com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:170)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:184)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:189)
    at com.typesafe.config.impl.SimpleConfig.getList(SimpleConfig.java:252)
    at com.typesafe.config.impl.SimpleConfig.getHomogeneousUnwrappedList(SimpleConfig.java:323)
    at com.typesafe.config.impl.SimpleConfig.getStringList(SimpleConfig.java:381)

How can I get a list with Typesafe Config?
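For reference, a minimal sketch of the distinction the exception is pointing at, using the key name from the stack trace: getStringList only works when the value is written as a HOCON list; a plain comma-separated string parses as STRING and produces exactly this WrongType error:

import com.typesafe.config.ConfigFactory
import scala.collection.JavaConverters._

// configYYY = ["a", "b", "c"]   parses as a LIST
// configYYY = "a, b, c"         parses as a STRING
val config = ConfigFactory.parseString("""configYYY = ["a", "b", "c"]""")
val values: List[String] = config.getStringList("configYYY").asScala.toList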

Need help on Spark MLlib Regression Algorithm explanation

23 July 2016 - 7:20am

I have started learning Spark MLlib in Scala, looking at the link below for the linear regression algorithm:

http://spark.apache.org/docs/latest/mllib-linear-methods.html#linear-least-squares-lasso-and-ridge-regression

and the training data at: https://github.com/apache/spark/blob/master/data/mllib/ridge-data/lpsa.data

That example says, "We compute the mean squared error at the end to evaluate goodness of fit." Could you please explain what that means?
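For reference, a minimal sketch of the quantity being described, assuming valuesAndPreds holds (label, prediction) pairs as in the linked example: the mean squared error is the average of the squared differences between actual and predicted values, and a smaller value means a better fit on the training data:

import org.apache.spark.rdd.RDD

// (actual label, model prediction) pairs, as built in the linked example
def meanSquaredError(valuesAndPreds: RDD[(Double, Double)]): Double =
  valuesAndPreds.map { case (v, p) => math.pow(v - p, 2) }.mean()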

In that example, I understood:

// Load and parse the data

// Building the model

I did not understand the parts below; could you please explain them?

// Evaluate model on training examples and compute training error

// Save and load model

Thanks, Raghav

Scala- writing list to file using foreach

23 July 2016 - 6:53am

I'm trying to write a list I have into a file, and I'm trying to do it with a foreach call, as can be done with println. This works:

list.foreach(println)

but this won't work:

val file = "whatever.txt"
val writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file)))
list.foreach(writer.write)

I've tried some other ways to print to a file and had no luck with any of them. What am I doing wrong?
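For reference, a hedged sketch covering the most common pitfalls, assuming list is the list from the question (the sample value is illustrative): turn each element into a String explicitly (Writer.write(Int) writes a single character code, not the number), add a newline per element, and close the writer so the buffered output actually reaches the file:

import java.io.{BufferedWriter, FileOutputStream, OutputStreamWriter}

val list = List("first line", "second line") // illustrative stand-in for the real list
val file = "whatever.txt"
val writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file)))
try
  list.foreach { item =>
    writer.write(item.toString)
    writer.newLine()
  }
finally writer.close() // without closing (or flushing), the file can stay empty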

Materialize mapWithState stateSnapShots to database for later resume of spark streaming app

23 July 2016 - 4:46am

I have a Spark Scala streaming app that sessionizes user-generated events coming from Kafka, using mapWithState. I want to mature the setup by making it possible to pause and resume the app for maintenance. I'm already writing Kafka offset information to a database, so when restarting the app I can pick up at the last offset processed. But I also want to keep the state information.

So my goal is to:

  1. materialize session information after a key identifying the user times out.
  2. materialize a .stateSnapshot() when I gracefully shut down the application, so I can use that data when restarting the app by feeding it as a parameter to StateSpec.

Goal 1 is working; with goal 2 I have issues.

For the sake of completeness, I also describe 1 because I’m always interested in a better solution for it:

1) materializing session info after key time out

Inside my update function for mapWithState, I have:

if (state.isTimingOut()) { // if key is timing out.
  val output = (key, stateFilterable(isTimingOut = true
    , start = state.get().start
    , end = state.get().end
    , duration = state.get().duration
  ))

I then use that isTimingOut boolean later on as follows:

streamParsed
  .filter(a => a._2.isTimingOut)
  .foreachRDD(rdd => rdd
    .map(stuff => Model(key = stuff._1,
                        start = stuff._2.start,
                        duration = stuff._2.duration)
    .saveToCassandra(keyspaceName, tableName)
  )

2) materialize a .stateSnapshot() with graceful shutdown

Materializing the snapshot info doesn't work. Here is what I have tried:

// define a class Listener
class Listener(ssc: StreamingContext, state: DStream[(String, stateFilterable)]) extends Runnable {
  def run {
    if( ssc == null )
      System.out.println("The spark context is null")
    else
      System.out.println("The spark context is fine!!!")

    var input = "continue"
    while( !input.equals("D")) {
      input = readLine("Press D to kill: ")
      System.out.println(input + " " + input.equals("D"))
    }

    System.out.println("Accessing snapshot and saving:")
    state.foreachRDD(rdd => rdd
      .map(stuff => Model(key = stuff._1,
                          start = stuff._2.start,
                          duration = stuff._2.duration)
      .saveToCassandra("some_keyspace", "some_table")
    )

    System.out.println("Stopping context!")
    ssc.stop(true, true)
    System.out.println("We have stopped!")
  }
}

// Inside the app object:
val state = streamParsed.stateSnapshots()
var listener = new Thread(new Listener(ssc, state))
listener.start()

But when running this (launching the app, waiting several minutes for some state information to build up, and then entering 'D'), I get the exception below. So I can't do anything "new" with a DStream after stopping the ssc. I hoped to move from a DStream RDD to a regular RDD, quit the streaming context, and wrap up by saving the normal RDD, but I don't know how. I hope someone can help!

Exception in thread "Thread-52" java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported
    at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:222)
    at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
    at org.apache.spark.streaming.dstream.ForEachDStream.<init>(ForEachDStream.scala:34)
    at org.apache.spark.streaming.dstream.DStream.org$apache$spark$streaming$dstream$DStream$$foreachRDD(DStream.scala:687)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply$mcV$sp(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:659)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:659)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:714)
    at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:260)
    at org.apache.spark.streaming.dstream.DStream.foreachRDD(DStream.scala:659)
    at main.scala.feaUS.Listener.run(feaUS.scala:119)
    at java.lang.Thread.run(Thread.java:745)

Mapping JsArray in EsSpark and Spark Streaming

23 July 2016 - 4:36am

I want to save the following JsValue (play.api.libs.json.JsValue) object into Elasticsearch using Spark Streaming:

{"id":"123:123-123","title":"Bla-bla-bla","files":[{"file":"5678"},{"file":"1234"}]}

Given a DStream[JsValue] called MyDStream, I do the following operation:

MyDStream.foreachRDD { rdd =>
  rdd.map(line => HashMap(
      "id" -> (line \ "id").as[JsString].value,
      "title" -> (line \ "title").as[JsString].value,
      "files" -> (line \ "files").as[JsArray].value))
    .distinct()
    .saveToEs("texts/text", Map("es.mapping.id" -> "id"))
}

It works fine, except that the JsArray is not mapped properly. This is what I get in Elasticsearch:

},
{
  "underlying$1": {
    "file": {
      "value": "1234"
    }
  }
}
]

I want to exclude underlying$1 and value so that the result looks as follows:

"files": [
  { "file": "5678" },
  { "file": "1234" }
]

Converting RDD to DataFrame scala - NoSuchMethodError

23 July 2016 - 4:03am

I am trying to convert an RDD to a DataFrame in Scala as follows:

val posts = spark.textFile("~/allPosts/part-02064.xml.gz")

import org.apache.spark.SparkContext._
import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.SparkContext._

val sqlContext = new org.apache.spark.sql.SQLContext(spark)
import sqlContext.implicits._

posts.map(identity).toDF()

When I do this I get the following error.

java.lang.NoSuchMethodError: org.apache.spark.sql.SQLContext$implicits$.stringRddToDataFrameHolder(Lorg/apache/spark/rdd/RDD;)Lorg/apache/spark/sql/DataFrameHolder;

I can't for the life of me figure out what I'm doing wrong.
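For reference, a hedged sketch of the usual first suspect with a NoSuchMethodError like this: a mismatch between the Spark version the code was compiled against and the Spark present at runtime (or mixed spark-core/spark-sql versions). One way to keep them aligned in sbt (version number purely illustrative):

// build.sbt sketch; "provided" assumes the cluster or spark-submit supplies Spark at runtime
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.2" % "provided",
  "org.apache.spark" %% "spark-sql"  % "1.6.2" % "provided"
)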

What does the Aux pattern accomplish in Scala?

23 July 2016 - 3:30am

I have a bit of a sense of the Aux pattern (as used in shapeless and elsewhere), in which a type member is extracted into a type parameter, and I know it's a workaround for the fact that arguments in the same argument list can't depend on each other -- but I'm not clear on what it's generally used for and what problems it solves.

For example, I'm currently trying to figure out how to preserve and work with the more specific type returned by a whitebox macro -- is this a use case for Aux?

Is there a simple description?
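For reference, a minimal self-contained sketch of the pattern (names illustrative): the Aux alias exposes the type member Out as a type parameter, so a later implicit in the same parameter list can depend on the result type that another implicit computes:

trait Foo[A] {
  type Out
  def apply(a: A): Out
}

object Foo {
  type Aux[A, B] = Foo[A] { type Out = B }

  implicit val intFoo: Aux[Int, String] = new Foo[Int] {
    type Out = String
    def apply(a: Int): String = a.toString
  }
}

// `ord` can refer to B, the Out type fixed by `foo`, within the same implicit list
def useFoo[A, B](a: A)(implicit foo: Foo.Aux[A, B], ord: Ordering[B]): B = foo(a)

// useFoo(42) resolves B = String and then finds Ordering[String]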

in scala, comparing Int 7 with char '7' gives false

23 July 2016 - 3:27am

Why do the following expressions evaluate to false?

scala> 7 == '7'
res0: Boolean = false

scala> 7.toChar == '7'
res1: Boolean = false

scala> 7.toChar equals '7'
res2: Boolean = false

What is the correct way to compare numbers with characters?

My problem is that I have the following Map[Char, Int]:

Map(7 -> 2, 1 -> 1, 5 -> 1, 0 -> 1)

and

map getOrElse(7.toChar, 0)

returns 0. I would expect the result to be 2, since my map contains 7 -> 2.
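For reference, a minimal sketch of the distinction, with the map written out using explicit Char literals: the character '7' is code point 55, not the number 7, so either convert before comparing or look the Char up directly:

val map = Map('7' -> 2, '1' -> 1, '5' -> 1, '0' -> 1)

'7'.toInt                            // 55: the code point, not the digit value
'7'.asDigit                          // 7
'7'.asDigit == 7                     // true

map.getOrElse('7', 0)                // 2: look up the Char key directly
map.getOrElse((7 + '0').toChar, 0)   // 2: convert the Int 7 to the character '7' first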
